1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00

add outgoing http webhook for queries

This commit is contained in:
checktheroads
2020-04-15 02:12:01 -07:00
parent 5f3c516f86
commit b35040c0a2
11 changed files with 288 additions and 102 deletions

View File

@@ -13,7 +13,7 @@ per-file-ignores=
hyperglass/api/models/*.py:N805,E0213,R0903
hyperglass/configuration/models/*.py:N805,E0213,R0903,E501,C0301
hyperglass/api/models/response.py:E501,C0301
ignore=W503,C0330,R504,D202
ignore=W503,C0330,R504,D202,S403,S301
select=B, BLK, C, D, E, F, I, II, N, P, PIE, S, R, W
disable-noqa=False
hang-closing=False

View File

@@ -1,6 +1,7 @@
"""Input query validation model."""
# Standard Library
import json
import hashlib
# Third Party
@@ -97,6 +98,19 @@ class Query(BaseModel):
)
return f'Query({", ".join(items)})'
def export_dict(self):
"""Create dictionary representation of instance."""
return {
"query_location": self.query_location,
"query_type": self.query_type,
"query_vrf": self.query_vrf.name,
"query_target": str(self.query_target),
}
def export_json(self):
"""Create JSON representation of instance."""
return json.dumps(self.export_dict(), default=str)
@validator("query_type")
def validate_query_type(cls, value):
"""Ensure query_type is enabled.

View File

@@ -6,7 +6,7 @@ from ipaddress import ip_network
# Project
from hyperglass.log import log
from hyperglass.util import get_containing_prefix
from hyperglass.util import get_network_info
from hyperglass.exceptions import InputInvalid, InputNotAllowed
from hyperglass.configuration import params
@@ -138,7 +138,7 @@ def validate_ip(value, query_type, query_vrf): # noqa: C901
# query.
elif query_type in ("bgp_route",) and vrf_afi.force_cidr:
valid_ip = get_containing_prefix(valid_ip.network_address)
valid_ip = get_network_info(valid_ip.network_address).get("prefix")
# For a host query with bgp_route query type and force_cidr
# disabled, convert the host query to a single IP address.

View File

@@ -2,7 +2,9 @@
# Standard Library
import os
import json
import time
from ipaddress import ip_address
# Third Party
from fastapi import HTTPException
@@ -10,8 +12,8 @@ from starlette.requests import Request
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html
# Project
from hyperglass.log import log
from hyperglass.util import clean_name, import_public_key
from hyperglass.log import log, query_hook
from hyperglass.util import adonothing, clean_name, get_network_info, import_public_key
from hyperglass.cache import Cache
from hyperglass.encode import jwt_decode
from hyperglass.exceptions import HyperglassError
@@ -22,10 +24,47 @@ from hyperglass.api.models.cert_import import EncodedRequest
APP_PATH = os.environ["hyperglass_directory"]
if params.logging.http is not None and params.logging.http.enable:
log_query = query_hook
else:
log_query = adonothing
async def query(query_data: Query, request: Request):
"""Ingest request data pass it to the backend application to perform the query."""
if ip_address(request.client.host).is_loopback:
network_info = {"prefix": None, "asn": None}
else:
network_info = get_network_info("199.34.92.64")
network_info = {
"prefix": str(network_info["prefix"]),
"asn": network_info["asns"][0],
}
header_keys = (
"content-length",
"accept",
"user-agent",
"content-type",
"referer",
"accept-encoding",
"accept-language",
)
await log_query(
{
**json.loads(query_data.export_json()),
"headers": {
k: v for k, v in dict(request.headers).items() if k in header_keys
},
"source": request.client.host,
"network": network_info,
},
params.logging.http,
)
# Initialize cache
cache = Cache(db=params.cache.database, **REDIS_CONFIG)
log.debug("Initialized cache {}", repr(cache))

View File

@@ -110,3 +110,18 @@ class Cache:
"""Set timeout of key in seconds."""
for key in keys:
await self.instance.expire(key, seconds)
async def aget_config(self):
"""Get picked config object from cache."""
import pickle
pickled = await self.instance.get("HYPERGLASS_CONFIG")
return pickle.loads(pickled)
def get_config(self):
"""Get picked config object from cache."""
import pickle
from hyperglass.compat._asyncio import aiorun
pickled = aiorun(self.instance.get("HYPERGLASS_CONFIG"))
return pickle.loads(pickled)

View File

@@ -51,6 +51,32 @@ def _patch_loop(loop):
return tasks
def _cancel_all_tasks(loop, tasks):
to_cancel = [task for task in tasks if not task.done()]
if not to_cancel:
return
for task in to_cancel:
task.cancel()
loop.run_until_complete(
asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
)
for task in to_cancel:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler(
{
"message": "unhandled exception during asyncio.run() shutdown",
"exception": task.exception(),
"task": task,
}
)
def _patched_run(main, *, debug=False):
try:
loop = get_running_loop()
@@ -79,32 +105,6 @@ def _patched_run(main, *, debug=False):
loop.close()
def _cancel_all_tasks(loop, tasks):
to_cancel = [task for task in tasks if not task.done()]
if not to_cancel:
return
for task in to_cancel:
task.cancel()
loop.run_until_complete(
asyncio.gather(*to_cancel, loop=loop, return_exceptions=True)
)
for task in to_cancel:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler(
{
"message": "unhandled exception during asyncio.run() shutdown",
"exception": task.exception(),
"task": task,
}
)
# If local system's python version is at least 3.6, use the backported
# asyncio runner.
if RUNNING_PYTHON_VERSION >= (3, 6):

View File

@@ -9,7 +9,6 @@ from pathlib import Path
# Third Party
import yaml
from aiofile import AIOFile
from pydantic import ValidationError
# Project
@@ -19,7 +18,7 @@ from hyperglass.log import (
enable_file_logging,
enable_syslog_logging,
)
from hyperglass.util import check_path, set_app_path
from hyperglass.util import check_path, set_app_path, set_cache_env
from hyperglass.constants import (
CREDIT,
DEFAULT_HELP,
@@ -115,57 +114,6 @@ def _config_optional(config_path: Path) -> dict:
return config
async def _config_main():
"""Open main config file and load YAML to dict.
Returns:
{dict} -- Main config file
"""
config = {}
try:
async with AIOFile(CONFIG_MAIN, "r") as cf:
raw = await cf.read()
config = yaml.safe_load(raw) or {}
except (yaml.YAMLError, yaml.MarkedYAMLError) as yaml_error:
raise ConfigError(error_msg=str(yaml_error)) from None
return config
async def _config_commands():
"""Open commands config file and load YAML to dict.
Returns:
{dict} -- Commands config file
"""
if CONFIG_COMMANDS is None:
config = {}
else:
try:
async with AIOFile(CONFIG_COMMANDS, "r") as cf:
raw = await cf.read()
config = yaml.safe_load(raw) or {}
except (yaml.YAMLError, yaml.MarkedYAMLError) as yaml_error:
raise ConfigError(error_msg=str(yaml_error)) from None
log.debug("Unvalidated commands: {c}", c=config)
return config
async def _config_devices():
"""Open devices config file and load YAML to dict.
Returns:
{dict} -- Devices config file
"""
try:
async with AIOFile(CONFIG_DEVICES, "r") as cf:
raw = await cf.read()
config = yaml.safe_load(raw)
log.debug("Unvalidated device config: {c}", c=config)
except (yaml.YAMLError, yaml.MarkedYAMLError) as yaml_error:
raise ConfigError(error_msg=str(yaml_error)) from None
return config
user_config = _config_optional(CONFIG_MAIN)
# Read raw debug value from config to enable debugging quickly.
@@ -188,6 +136,8 @@ except ValidationError as validation_errors:
error_msg=error["msg"],
)
set_cache_env(db=params.cache.database, host=params.cache.host, port=params.cache.port)
# Re-evaluate debug state after config is validated
set_log_level(logger=log, debug=params.debug)

View File

@@ -1,14 +1,28 @@
"""Validate logging configuration."""
# Standard Library
from typing import Optional
import base64
from ast import literal_eval
from typing import Dict, Union, Optional
from pathlib import Path
# Third Party
from pydantic import ByteSize, StrictInt, StrictStr, StrictBool, DirectoryPath, constr
from pydantic import (
ByteSize,
SecretStr,
StrictInt,
StrictStr,
AnyHttpUrl,
StrictBool,
StrictFloat,
DirectoryPath,
constr,
validator,
)
# Project
from hyperglass.configuration.models._utils import HyperglassModel
from hyperglass.constants import __version__
from hyperglass.configuration.models._utils import HyperglassModel, HyperglassModelExtra
class Syslog(HyperglassModel):
@@ -19,10 +33,70 @@ class Syslog(HyperglassModel):
port: StrictInt = 514
class HttpAuth(HyperglassModel):
"""HTTP hook authentication parameters."""
mode: constr(regex=r"(basic|api_key)") = "basic"
username: Optional[StrictStr]
password: SecretStr
def api_key(self, header_name="X-API-Key"):
"""Represent authentication as an API key header."""
return {header_name: self.password.get_secret_value()}
def basic(self):
"""Represent HTTP basic authentication."""
return (self.username, self.password.get_secret_value())
class Http(HyperglassModelExtra):
"""HTTP logging parameters."""
enable: StrictBool = True
host: AnyHttpUrl
authentication: Optional[HttpAuth]
headers: Dict[StrictStr, Union[StrictStr, StrictInt, StrictBool, None]] = {}
params: Dict[StrictStr, Union[StrictStr, StrictInt, StrictBool, None]] = {}
verify_ssl: StrictBool = True
timeout: Union[StrictFloat, StrictInt] = 5.0
@validator("headers", "params")
def stringify_headers_params(cls, value):
"""Ensure headers and URL parameters are strings."""
for k, v in value.items():
if not isinstance(v, str):
value[k] = str(v)
return value
def __init__(self, **kwargs):
"""Initialize model, add obfuscated connection details as attribute."""
super().__init__(**kwargs)
dumped = {
"headers": self.headers,
"params": self.params,
"verify": self.verify_ssl,
"timeout": self.timeout,
}
dumped["headers"].update({"user-agent": f"hyperglass/{__version__}"})
if self.authentication is not None:
if self.authentication.mode == "api_key":
dumped["headers"].update(self.authentication.api_key())
else:
dumped["auth"] = self.authentication.basic()
self._obscured_params = base64.encodestring(str(dumped).encode())
def decoded(self):
"""Decode connection details."""
return literal_eval(base64.decodestring(self._obscured_params).decode())
class Logging(HyperglassModel):
"""Validation model for logging configuration."""
directory: DirectoryPath = Path("/tmp") # noqa: S108
format: constr(regex=r"(text|json)") = "text"
syslog: Optional[Syslog]
max_size: ByteSize = "50MB"
syslog: Optional[Syslog]
http: Optional[Http]

View File

@@ -26,7 +26,7 @@ _LOG_LEVELS = [
def base_logger():
"""Initialize hyperglass logging instance."""
_loguru_logger.remove()
_loguru_logger.add(sys.stdout, format=_LOG_FMT, level="INFO")
_loguru_logger.add(sys.stdout, format=_LOG_FMT, level="INFO", enqueue=True)
_loguru_logger.configure(levels=_LOG_LEVELS)
return _loguru_logger
@@ -39,7 +39,7 @@ def set_log_level(logger, debug):
if debug:
os.environ["HYPERGLASS_LOG_LEVEL"] = "DEBUG"
logger.remove()
logger.add(sys.stdout, format=_LOG_FMT, level="DEBUG")
logger.add(sys.stdout, format=_LOG_FMT, level="DEBUG", enqueue=True)
logger.configure(levels=_LOG_LEVELS)
if debug:
@@ -78,7 +78,7 @@ def enable_file_logging(logger, log_directory, log_format, log_max_size):
with log_file.open("a+") as lf:
lf.write(f'\n\n{"".join(log_break)}\n\n')
logger.add(log_file, rotation=log_max_size, serialize=structured)
logger.add(log_file, rotation=log_max_size, serialize=structured, enqueue=True)
logger.debug("Logging to file enabled")
@@ -90,7 +90,9 @@ def enable_syslog_logging(logger, syslog_host, syslog_port):
from logging.handlers import SysLogHandler
logger.add(
SysLogHandler(address=(str(syslog_host), syslog_port)), format="{message}"
SysLogHandler(address=(str(syslog_host), syslog_port)),
format="{message}",
enqueue=True,
)
logger.debug(
"Logging to syslog target {h}:{p} enabled",
@@ -98,3 +100,23 @@ def enable_syslog_logging(logger, syslog_host, syslog_port):
p=str(syslog_port),
)
return True
async def query_hook(query, http_logging):
"""Log a query to an http server."""
import httpx
from hyperglass.util import parse_exception
async with httpx.AsyncClient(**http_logging.decoded()) as client:
try:
response = await client.post(str(http_logging.host), json=query)
if response.status_code not in range(200, 300):
print(f"{response.status_code}: {response.text}", file=sys.stderr)
except httpx.HTTPError as err:
parsed = parse_exception(err)
print(parsed, file=sys.stderr)
return True

View File

@@ -12,13 +12,13 @@ from gunicorn.app.base import BaseApplication
# Project
from hyperglass.log import log
from hyperglass.cache import Cache
from hyperglass.constants import MIN_PYTHON_VERSION, __version__
pretty_version = ".".join(tuple(str(v) for v in MIN_PYTHON_VERSION))
if sys.version_info < MIN_PYTHON_VERSION:
raise RuntimeError(f"Python {pretty_version}+ is required.")
from hyperglass.configuration import ( # isort:skip
params,
URL_DEV,
@@ -81,6 +81,18 @@ async def clear_cache():
pass
async def cache_config():
"""Add configuration to Redis cache as a pickled object."""
import pickle
cache = Cache(
db=params.cache.database, host=params.cache.host, port=params.cache.port
)
await cache.set("HYPERGLASS_CONFIG", pickle.dumps(params))
return True
def on_starting(server: Arbiter):
"""Gunicorn pre-start tasks."""
@@ -88,8 +100,13 @@ def on_starting(server: Arbiter):
required = ".".join((str(v) for v in MIN_PYTHON_VERSION))
log.info(f"Python {python_version} detected ({required} required)")
aiorun(check_redis_instance())
aiorun(build_ui())
async def runner():
from asyncio import gather
await gather(check_redis_instance(), build_ui(), cache_config())
# await log.complete()
aiorun(runner())
log.success(
"Started hyperglass {v} on http://{h}:{p} with {w} workers",
@@ -102,8 +119,14 @@ def on_starting(server: Arbiter):
def on_exit(server: Arbiter):
"""Gunicorn shutdown tasks."""
aiorun(clear_cache())
log.critical("Stopped hyperglass {}", __version__)
log.critical("Stopping hyperglass {}", __version__)
async def runner():
await clear_cache()
# await log.complete()
aiorun(runner())
class HyperglassWSGI(BaseApplication):
@@ -122,6 +145,7 @@ class HyperglassWSGI(BaseApplication):
for key, value in self.options.items()
if key in self.cfg.settings and value is not None
}
for key, value in config.items():
self.cfg.set(key.lower(), value)
@@ -149,6 +173,7 @@ def start(**kwargs):
"timeout": math.ceil(params.request_timeout * 1.25),
"on_starting": on_starting,
"on_exit": on_exit,
"raw_env": ["testing=test"],
**kwargs,
},
).run()

View File

@@ -1,5 +1,6 @@
"""Utility functions."""
# Project
from hyperglass.log import log
@@ -694,7 +695,7 @@ def parse_exception(exc):
return ", caused by ".join(parsed)
def get_containing_prefix(valid_ip):
def get_network_info(valid_ip):
"""Get containing prefix for an IP host query from RIPEstat API.
Arguments:
@@ -722,13 +723,59 @@ def get_containing_prefix(valid_ip):
msg = parse_exception(error)
raise InputInvalid(msg)
containing = response.json().get("data", {}).get("prefix")
network_info = response.json().get("data", {})
if containing is None:
if not network_info.get("prefix", ""):
raise InputInvalid(f"{str(valid_ip)} has no containing prefix")
elif not network_info.get("asns", []):
raise InputInvalid(f"{str(valid_ip)} is not announced")
log.debug(
"Found containing prefix '{p}' for IP '{i}'", p=containing, i=str(valid_ip)
"Network info for IP '{i}': Announced as '{p}' from AS {a}",
p=network_info.get("prefix"),
a=", ".join(network_info.get("asns")),
i=str(valid_ip),
)
return ip_network(containing)
network_info["prefix"] = ip_network(network_info["prefix"])
return network_info
def set_cache_env(host, port, db):
"""Set basic cache config parameters to environment variables.
Functions using Redis to access the pickled config need to be able
to access Redis without reading the config.
"""
import os
os.environ["HYPERGLASS_CACHE_HOST"] = str(host)
os.environ["HYPERGLASS_CACHE_PORT"] = str(port)
os.environ["HYPERGLASS_CACHE_DB"] = str(db)
return True
def get_cache_env():
"""Get basic cache config from environment variables."""
import os
host = os.environ.get("HYPERGLASS_CACHE_HOST")
port = os.environ.get("HYPERGLASS_CACHE_PORT")
db = os.environ.get("HYPERGLASS_CACHE_DB")
for i in (host, port, db):
if i is None:
raise LookupError(
"Unable to find cache configuration in environment variables"
)
return host, port, db
def donothing(*args, **kwargs):
"""Do nothing."""
pass
async def adonothing(*args, **kwargs):
"""Do nothing."""
pass