diff --git a/.flake8 b/.flake8 index 34b4bd7..a94e7e0 100644 --- a/.flake8 +++ b/.flake8 @@ -13,7 +13,7 @@ per-file-ignores= hyperglass/api/models/*.py:N805,E0213,R0903 hyperglass/configuration/models/*.py:N805,E0213,R0903,E501,C0301 hyperglass/api/models/response.py:E501,C0301 -ignore=W503,C0330,R504,D202 +ignore=W503,C0330,R504,D202,S403,S301 select=B, BLK, C, D, E, F, I, II, N, P, PIE, S, R, W disable-noqa=False hang-closing=False diff --git a/hyperglass/api/models/query.py b/hyperglass/api/models/query.py index 4e7a614..1d8c882 100644 --- a/hyperglass/api/models/query.py +++ b/hyperglass/api/models/query.py @@ -1,6 +1,7 @@ """Input query validation model.""" # Standard Library +import json import hashlib # Third Party @@ -97,6 +98,19 @@ class Query(BaseModel): ) return f'Query({", ".join(items)})' + def export_dict(self): + """Create dictionary representation of instance.""" + return { + "query_location": self.query_location, + "query_type": self.query_type, + "query_vrf": self.query_vrf.name, + "query_target": str(self.query_target), + } + + def export_json(self): + """Create JSON representation of instance.""" + return json.dumps(self.export_dict(), default=str) + @validator("query_type") def validate_query_type(cls, value): """Ensure query_type is enabled. diff --git a/hyperglass/api/models/validators.py b/hyperglass/api/models/validators.py index 32c974f..1e3bed5 100644 --- a/hyperglass/api/models/validators.py +++ b/hyperglass/api/models/validators.py @@ -6,7 +6,7 @@ from ipaddress import ip_network # Project from hyperglass.log import log -from hyperglass.util import get_containing_prefix +from hyperglass.util import get_network_info from hyperglass.exceptions import InputInvalid, InputNotAllowed from hyperglass.configuration import params @@ -138,7 +138,7 @@ def validate_ip(value, query_type, query_vrf): # noqa: C901 # query. elif query_type in ("bgp_route",) and vrf_afi.force_cidr: - valid_ip = get_containing_prefix(valid_ip.network_address) + valid_ip = get_network_info(valid_ip.network_address).get("prefix") # For a host query with bgp_route query type and force_cidr # disabled, convert the host query to a single IP address. diff --git a/hyperglass/api/routes.py b/hyperglass/api/routes.py index 830c489..fd240a7 100644 --- a/hyperglass/api/routes.py +++ b/hyperglass/api/routes.py @@ -2,7 +2,9 @@ # Standard Library import os +import json import time +from ipaddress import ip_address # Third Party from fastapi import HTTPException @@ -10,8 +12,8 @@ from starlette.requests import Request from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html # Project -from hyperglass.log import log -from hyperglass.util import clean_name, import_public_key +from hyperglass.log import log, query_hook +from hyperglass.util import adonothing, clean_name, get_network_info, import_public_key from hyperglass.cache import Cache from hyperglass.encode import jwt_decode from hyperglass.exceptions import HyperglassError @@ -22,10 +24,47 @@ from hyperglass.api.models.cert_import import EncodedRequest APP_PATH = os.environ["hyperglass_directory"] +if params.logging.http is not None and params.logging.http.enable: + log_query = query_hook +else: + log_query = adonothing + async def query(query_data: Query, request: Request): """Ingest request data pass it to the backend application to perform the query.""" + if ip_address(request.client.host).is_loopback: + network_info = {"prefix": None, "asn": None} + else: + network_info = get_network_info("199.34.92.64") + + network_info = { + "prefix": str(network_info["prefix"]), + "asn": network_info["asns"][0], + } + + header_keys = ( + "content-length", + "accept", + "user-agent", + "content-type", + "referer", + "accept-encoding", + "accept-language", + ) + + await log_query( + { + **json.loads(query_data.export_json()), + "headers": { + k: v for k, v in dict(request.headers).items() if k in header_keys + }, + "source": request.client.host, + "network": network_info, + }, + params.logging.http, + ) + # Initialize cache cache = Cache(db=params.cache.database, **REDIS_CONFIG) log.debug("Initialized cache {}", repr(cache)) diff --git a/hyperglass/cache.py b/hyperglass/cache.py index d7fb769..f330a08 100644 --- a/hyperglass/cache.py +++ b/hyperglass/cache.py @@ -110,3 +110,18 @@ class Cache: """Set timeout of key in seconds.""" for key in keys: await self.instance.expire(key, seconds) + + async def aget_config(self): + """Get picked config object from cache.""" + import pickle + + pickled = await self.instance.get("HYPERGLASS_CONFIG") + return pickle.loads(pickled) + + def get_config(self): + """Get picked config object from cache.""" + import pickle + from hyperglass.compat._asyncio import aiorun + + pickled = aiorun(self.instance.get("HYPERGLASS_CONFIG")) + return pickle.loads(pickled) diff --git a/hyperglass/compat/_asyncio.py b/hyperglass/compat/_asyncio.py index 7ba156a..68e5961 100644 --- a/hyperglass/compat/_asyncio.py +++ b/hyperglass/compat/_asyncio.py @@ -51,6 +51,32 @@ def _patch_loop(loop): return tasks +def _cancel_all_tasks(loop, tasks): + to_cancel = [task for task in tasks if not task.done()] + + if not to_cancel: + return + + for task in to_cancel: + task.cancel() + + loop.run_until_complete( + asyncio.gather(*to_cancel, loop=loop, return_exceptions=True) + ) + + for task in to_cancel: + if task.cancelled(): + continue + if task.exception() is not None: + loop.call_exception_handler( + { + "message": "unhandled exception during asyncio.run() shutdown", + "exception": task.exception(), + "task": task, + } + ) + + def _patched_run(main, *, debug=False): try: loop = get_running_loop() @@ -79,32 +105,6 @@ def _patched_run(main, *, debug=False): loop.close() -def _cancel_all_tasks(loop, tasks): - to_cancel = [task for task in tasks if not task.done()] - - if not to_cancel: - return - - for task in to_cancel: - task.cancel() - - loop.run_until_complete( - asyncio.gather(*to_cancel, loop=loop, return_exceptions=True) - ) - - for task in to_cancel: - if task.cancelled(): - continue - if task.exception() is not None: - loop.call_exception_handler( - { - "message": "unhandled exception during asyncio.run() shutdown", - "exception": task.exception(), - "task": task, - } - ) - - # If local system's python version is at least 3.6, use the backported # asyncio runner. if RUNNING_PYTHON_VERSION >= (3, 6): diff --git a/hyperglass/configuration/__init__.py b/hyperglass/configuration/__init__.py index 0bead73..28a2827 100644 --- a/hyperglass/configuration/__init__.py +++ b/hyperglass/configuration/__init__.py @@ -9,7 +9,6 @@ from pathlib import Path # Third Party import yaml -from aiofile import AIOFile from pydantic import ValidationError # Project @@ -19,7 +18,7 @@ from hyperglass.log import ( enable_file_logging, enable_syslog_logging, ) -from hyperglass.util import check_path, set_app_path +from hyperglass.util import check_path, set_app_path, set_cache_env from hyperglass.constants import ( CREDIT, DEFAULT_HELP, @@ -115,57 +114,6 @@ def _config_optional(config_path: Path) -> dict: return config -async def _config_main(): - """Open main config file and load YAML to dict. - - Returns: - {dict} -- Main config file - """ - config = {} - try: - async with AIOFile(CONFIG_MAIN, "r") as cf: - raw = await cf.read() - config = yaml.safe_load(raw) or {} - except (yaml.YAMLError, yaml.MarkedYAMLError) as yaml_error: - raise ConfigError(error_msg=str(yaml_error)) from None - return config - - -async def _config_commands(): - """Open commands config file and load YAML to dict. - - Returns: - {dict} -- Commands config file - """ - if CONFIG_COMMANDS is None: - config = {} - else: - try: - async with AIOFile(CONFIG_COMMANDS, "r") as cf: - raw = await cf.read() - config = yaml.safe_load(raw) or {} - except (yaml.YAMLError, yaml.MarkedYAMLError) as yaml_error: - raise ConfigError(error_msg=str(yaml_error)) from None - log.debug("Unvalidated commands: {c}", c=config) - return config - - -async def _config_devices(): - """Open devices config file and load YAML to dict. - - Returns: - {dict} -- Devices config file - """ - try: - async with AIOFile(CONFIG_DEVICES, "r") as cf: - raw = await cf.read() - config = yaml.safe_load(raw) - log.debug("Unvalidated device config: {c}", c=config) - except (yaml.YAMLError, yaml.MarkedYAMLError) as yaml_error: - raise ConfigError(error_msg=str(yaml_error)) from None - return config - - user_config = _config_optional(CONFIG_MAIN) # Read raw debug value from config to enable debugging quickly. @@ -188,6 +136,8 @@ except ValidationError as validation_errors: error_msg=error["msg"], ) +set_cache_env(db=params.cache.database, host=params.cache.host, port=params.cache.port) + # Re-evaluate debug state after config is validated set_log_level(logger=log, debug=params.debug) diff --git a/hyperglass/configuration/models/logging.py b/hyperglass/configuration/models/logging.py index f371828..2dcd726 100644 --- a/hyperglass/configuration/models/logging.py +++ b/hyperglass/configuration/models/logging.py @@ -1,14 +1,28 @@ """Validate logging configuration.""" # Standard Library -from typing import Optional +import base64 +from ast import literal_eval +from typing import Dict, Union, Optional from pathlib import Path # Third Party -from pydantic import ByteSize, StrictInt, StrictStr, StrictBool, DirectoryPath, constr +from pydantic import ( + ByteSize, + SecretStr, + StrictInt, + StrictStr, + AnyHttpUrl, + StrictBool, + StrictFloat, + DirectoryPath, + constr, + validator, +) # Project -from hyperglass.configuration.models._utils import HyperglassModel +from hyperglass.constants import __version__ +from hyperglass.configuration.models._utils import HyperglassModel, HyperglassModelExtra class Syslog(HyperglassModel): @@ -19,10 +33,70 @@ class Syslog(HyperglassModel): port: StrictInt = 514 +class HttpAuth(HyperglassModel): + """HTTP hook authentication parameters.""" + + mode: constr(regex=r"(basic|api_key)") = "basic" + username: Optional[StrictStr] + password: SecretStr + + def api_key(self, header_name="X-API-Key"): + """Represent authentication as an API key header.""" + return {header_name: self.password.get_secret_value()} + + def basic(self): + """Represent HTTP basic authentication.""" + return (self.username, self.password.get_secret_value()) + + +class Http(HyperglassModelExtra): + """HTTP logging parameters.""" + + enable: StrictBool = True + host: AnyHttpUrl + authentication: Optional[HttpAuth] + headers: Dict[StrictStr, Union[StrictStr, StrictInt, StrictBool, None]] = {} + params: Dict[StrictStr, Union[StrictStr, StrictInt, StrictBool, None]] = {} + verify_ssl: StrictBool = True + timeout: Union[StrictFloat, StrictInt] = 5.0 + + @validator("headers", "params") + def stringify_headers_params(cls, value): + """Ensure headers and URL parameters are strings.""" + for k, v in value.items(): + if not isinstance(v, str): + value[k] = str(v) + return value + + def __init__(self, **kwargs): + """Initialize model, add obfuscated connection details as attribute.""" + super().__init__(**kwargs) + dumped = { + "headers": self.headers, + "params": self.params, + "verify": self.verify_ssl, + "timeout": self.timeout, + } + dumped["headers"].update({"user-agent": f"hyperglass/{__version__}"}) + + if self.authentication is not None: + if self.authentication.mode == "api_key": + dumped["headers"].update(self.authentication.api_key()) + else: + dumped["auth"] = self.authentication.basic() + + self._obscured_params = base64.encodestring(str(dumped).encode()) + + def decoded(self): + """Decode connection details.""" + return literal_eval(base64.decodestring(self._obscured_params).decode()) + + class Logging(HyperglassModel): """Validation model for logging configuration.""" directory: DirectoryPath = Path("/tmp") # noqa: S108 format: constr(regex=r"(text|json)") = "text" - syslog: Optional[Syslog] max_size: ByteSize = "50MB" + syslog: Optional[Syslog] + http: Optional[Http] diff --git a/hyperglass/log.py b/hyperglass/log.py index c5d7681..9d12e00 100644 --- a/hyperglass/log.py +++ b/hyperglass/log.py @@ -26,7 +26,7 @@ _LOG_LEVELS = [ def base_logger(): """Initialize hyperglass logging instance.""" _loguru_logger.remove() - _loguru_logger.add(sys.stdout, format=_LOG_FMT, level="INFO") + _loguru_logger.add(sys.stdout, format=_LOG_FMT, level="INFO", enqueue=True) _loguru_logger.configure(levels=_LOG_LEVELS) return _loguru_logger @@ -39,7 +39,7 @@ def set_log_level(logger, debug): if debug: os.environ["HYPERGLASS_LOG_LEVEL"] = "DEBUG" logger.remove() - logger.add(sys.stdout, format=_LOG_FMT, level="DEBUG") + logger.add(sys.stdout, format=_LOG_FMT, level="DEBUG", enqueue=True) logger.configure(levels=_LOG_LEVELS) if debug: @@ -78,7 +78,7 @@ def enable_file_logging(logger, log_directory, log_format, log_max_size): with log_file.open("a+") as lf: lf.write(f'\n\n{"".join(log_break)}\n\n') - logger.add(log_file, rotation=log_max_size, serialize=structured) + logger.add(log_file, rotation=log_max_size, serialize=structured, enqueue=True) logger.debug("Logging to file enabled") @@ -90,7 +90,9 @@ def enable_syslog_logging(logger, syslog_host, syslog_port): from logging.handlers import SysLogHandler logger.add( - SysLogHandler(address=(str(syslog_host), syslog_port)), format="{message}" + SysLogHandler(address=(str(syslog_host), syslog_port)), + format="{message}", + enqueue=True, ) logger.debug( "Logging to syslog target {h}:{p} enabled", @@ -98,3 +100,23 @@ def enable_syslog_logging(logger, syslog_host, syslog_port): p=str(syslog_port), ) return True + + +async def query_hook(query, http_logging): + """Log a query to an http server.""" + import httpx + + from hyperglass.util import parse_exception + + async with httpx.AsyncClient(**http_logging.decoded()) as client: + try: + response = await client.post(str(http_logging.host), json=query) + + if response.status_code not in range(200, 300): + print(f"{response.status_code}: {response.text}", file=sys.stderr) + + except httpx.HTTPError as err: + parsed = parse_exception(err) + print(parsed, file=sys.stderr) + + return True diff --git a/hyperglass/main.py b/hyperglass/main.py index 0a1ffc5..f7df65c 100644 --- a/hyperglass/main.py +++ b/hyperglass/main.py @@ -12,13 +12,13 @@ from gunicorn.app.base import BaseApplication # Project from hyperglass.log import log +from hyperglass.cache import Cache from hyperglass.constants import MIN_PYTHON_VERSION, __version__ pretty_version = ".".join(tuple(str(v) for v in MIN_PYTHON_VERSION)) if sys.version_info < MIN_PYTHON_VERSION: raise RuntimeError(f"Python {pretty_version}+ is required.") - from hyperglass.configuration import ( # isort:skip params, URL_DEV, @@ -81,6 +81,18 @@ async def clear_cache(): pass +async def cache_config(): + """Add configuration to Redis cache as a pickled object.""" + import pickle + + cache = Cache( + db=params.cache.database, host=params.cache.host, port=params.cache.port + ) + await cache.set("HYPERGLASS_CONFIG", pickle.dumps(params)) + + return True + + def on_starting(server: Arbiter): """Gunicorn pre-start tasks.""" @@ -88,8 +100,13 @@ def on_starting(server: Arbiter): required = ".".join((str(v) for v in MIN_PYTHON_VERSION)) log.info(f"Python {python_version} detected ({required} required)") - aiorun(check_redis_instance()) - aiorun(build_ui()) + async def runner(): + from asyncio import gather + + await gather(check_redis_instance(), build_ui(), cache_config()) + # await log.complete() + + aiorun(runner()) log.success( "Started hyperglass {v} on http://{h}:{p} with {w} workers", @@ -102,8 +119,14 @@ def on_starting(server: Arbiter): def on_exit(server: Arbiter): """Gunicorn shutdown tasks.""" - aiorun(clear_cache()) - log.critical("Stopped hyperglass {}", __version__) + + log.critical("Stopping hyperglass {}", __version__) + + async def runner(): + await clear_cache() + # await log.complete() + + aiorun(runner()) class HyperglassWSGI(BaseApplication): @@ -122,6 +145,7 @@ class HyperglassWSGI(BaseApplication): for key, value in self.options.items() if key in self.cfg.settings and value is not None } + for key, value in config.items(): self.cfg.set(key.lower(), value) @@ -149,6 +173,7 @@ def start(**kwargs): "timeout": math.ceil(params.request_timeout * 1.25), "on_starting": on_starting, "on_exit": on_exit, + "raw_env": ["testing=test"], **kwargs, }, ).run() diff --git a/hyperglass/util.py b/hyperglass/util.py index 5ec5d7d..f0fd4aa 100644 --- a/hyperglass/util.py +++ b/hyperglass/util.py @@ -1,5 +1,6 @@ """Utility functions.""" +# Project from hyperglass.log import log @@ -694,7 +695,7 @@ def parse_exception(exc): return ", caused by ".join(parsed) -def get_containing_prefix(valid_ip): +def get_network_info(valid_ip): """Get containing prefix for an IP host query from RIPEstat API. Arguments: @@ -722,13 +723,59 @@ def get_containing_prefix(valid_ip): msg = parse_exception(error) raise InputInvalid(msg) - containing = response.json().get("data", {}).get("prefix") + network_info = response.json().get("data", {}) - if containing is None: + if not network_info.get("prefix", ""): raise InputInvalid(f"{str(valid_ip)} has no containing prefix") + elif not network_info.get("asns", []): + raise InputInvalid(f"{str(valid_ip)} is not announced") log.debug( - "Found containing prefix '{p}' for IP '{i}'", p=containing, i=str(valid_ip) + "Network info for IP '{i}': Announced as '{p}' from AS {a}", + p=network_info.get("prefix"), + a=", ".join(network_info.get("asns")), + i=str(valid_ip), ) - return ip_network(containing) + network_info["prefix"] = ip_network(network_info["prefix"]) + + return network_info + + +def set_cache_env(host, port, db): + """Set basic cache config parameters to environment variables. + + Functions using Redis to access the pickled config need to be able + to access Redis without reading the config. + """ + import os + + os.environ["HYPERGLASS_CACHE_HOST"] = str(host) + os.environ["HYPERGLASS_CACHE_PORT"] = str(port) + os.environ["HYPERGLASS_CACHE_DB"] = str(db) + return True + + +def get_cache_env(): + """Get basic cache config from environment variables.""" + import os + + host = os.environ.get("HYPERGLASS_CACHE_HOST") + port = os.environ.get("HYPERGLASS_CACHE_PORT") + db = os.environ.get("HYPERGLASS_CACHE_DB") + for i in (host, port, db): + if i is None: + raise LookupError( + "Unable to find cache configuration in environment variables" + ) + return host, port, db + + +def donothing(*args, **kwargs): + """Do nothing.""" + pass + + +async def adonothing(*args, **kwargs): + """Do nothing.""" + pass