1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00

improve external data handling

This commit is contained in:
checktheroads
2020-04-18 23:18:50 -07:00
parent a00be434cf
commit 945059a0ed
9 changed files with 354 additions and 121 deletions

View File

@@ -2,7 +2,6 @@
# Standard Library
import os
import json
import time
# Third Party
@@ -11,10 +10,11 @@ from starlette.requests import Request
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html
# Project
from hyperglass.log import log, query_hook
from hyperglass.util import adonothing, clean_name, get_network_info, import_public_key
from hyperglass.log import log
from hyperglass.util import clean_name, process_headers, import_public_key
from hyperglass.cache import Cache
from hyperglass.encode import jwt_decode
from hyperglass.external import Webhook, RIPEStat
from hyperglass.exceptions import HyperglassError
from hyperglass.configuration import REDIS_CONFIG, params, devices
from hyperglass.api.models.query import Query
@@ -23,39 +23,25 @@ from hyperglass.api.models.cert_import import EncodedRequest
APP_PATH = os.environ["hyperglass_directory"]
if params.logging.http is not None and params.logging.http.enable:
log_query = query_hook
else:
log_query = adonothing
async def query(query_data: Query, request: Request):
"""Ingest request data pass it to the backend application to perform the query."""
network_info = get_network_info(request.client.host, serialize=True)
headers = await process_headers(headers=request.headers)
header_keys = (
"content-length",
"accept",
"user-agent",
"content-type",
"referer",
"accept-encoding",
"accept-language",
)
async with RIPEStat() as ripe:
network_info = await ripe.network_info(request.client.host, serialize=True)
await log_query(
{
**json.loads(query_data.export_json()),
"headers": {
k: v for k, v in dict(request.headers).items() if k in header_keys
async with Webhook(params.logging.http.provider) as hook:
await hook.send(
query={
**query_data.export_dict(),
"headers": headers,
"source": request.client.host,
"network": network_info,
},
"source": request.client.host,
"network": network_info,
},
params.logging.http,
log,
)
provider=params.logging.http,
)
# Initialize cache
cache = Cache(db=params.cache.database, **REDIS_CONFIG)

5
hyperglass/external/__init__.py vendored Normal file
View File

@@ -0,0 +1,5 @@
"""Functions & handlers for external data."""
# Project
from hyperglass.external.ripestat import RIPEStat # noqa: F401
from hyperglass.external.webhooks import Webhook # noqa: F401

196
hyperglass/external/_base.py vendored Normal file
View File

@@ -0,0 +1,196 @@
"""Session handler for RIPEStat Data API."""
# Standard Library
import re
import json as _json
import asyncio
from json import JSONDecodeError
from socket import gaierror
# Third Party
import httpx
from httpx.status_codes import StatusCode
# Project
from hyperglass.log import log
from hyperglass.util import make_repr, parse_exception
from hyperglass.exceptions import HyperglassError
def _prepare_dict(_dict):
return _json.loads(_json.dumps(_dict, default=str))
def _parse_response(response):
parsed = {}
try:
parsed = response.json()
except JSONDecodeError:
try:
parsed = _json.loads(response)
except JSONDecodeError:
log.error("Error parsing JSON for response {}", repr(response))
parsed = {"data": response.text}
return parsed
class BaseExternal:
"""Base session handler."""
def __init__(
self, base_url, uri_prefix="", uri_suffix="", verify_ssl=True, timeout=10,
):
"""Initialize connection instance."""
self.__name__ = self.name
self.base_url = base_url.strip("/")
self.uri_prefix = uri_prefix.strip("/")
self.uri_suffix = uri_suffix.strip("/")
self.verify_ssl = verify_ssl
self.timeout = timeout
self._session = httpx.AsyncClient(
verify=self.verify_ssl, base_url=self.base_url, timeout=self.timeout
)
@classmethod
def __init_subclass__(cls, name=None, **kwargs):
"""Set correct subclass name."""
super().__init_subclass__(**kwargs)
cls.name = name or cls.__name__
async def __aenter__(self):
"""Test connection on entry."""
available = await self._test()
if available:
log.debug("Initialized session with {}", self.base_url)
return self
else:
raise self._exception(f"Unable to create session to {self.name}")
async def __aexit__(self, exc_type=None, exc_value=None, traceback=None):
"""Close connection on exit."""
log.debug("Closing session with {}", self.base_url)
await self._session.aclose()
return True
def __repr__(self):
"""Return user friendly representation of instance."""
return make_repr(self)
def _exception(self, message, exc=None, level="warning", **kwargs):
"""Add stringified exception to message if passed."""
if exc is not None:
message = f"{str(message)}: {str(exc)}"
return HyperglassError(message, str(level), **kwargs)
async def _test(self):
"""Open a low-level connection to the base URL to ensure its port is open."""
log.debug("Testing connection to {}", self.base_url)
try:
test_host = re.sub(r"http(s)?\:\/\/", "", self.base_url)
_reader, _writer = await asyncio.open_connection(test_host, 443)
except gaierror as err:
raise self._exception(
f"{self.name} appears to be unreachable", err
) from None
if _reader or _writer:
return True
else:
return False
async def _request( # noqa: C901
self,
method,
endpoint,
item=None,
params=None,
data=None,
timeout=None,
response_required=False,
):
"""Run HTTP POST operation."""
supported_methods = ("GET", "POST", "PUT", "DELETE", "HEAD", "PATCH")
if method.upper() not in supported_methods:
raise self._exception(
f'Method must be one of {", ".join(supported_methods)}. '
f"Got: {str(method)}"
)
endpoint = "/".join(
i
for i in (
"",
self.uri_prefix.strip("/"),
endpoint.strip("/"),
self.uri_suffix.strip("/"),
item,
)
if i
)
request = {
"method": method,
"url": endpoint,
}
if params is not None:
params = {str(k): str(v) for k, v in params.items() if v is not None}
request["params"] = params
if data is not None:
if not isinstance(data, dict):
raise self._exception(f"Data must be a dict, got: {str(data)}")
request["json"] = _prepare_dict(data)
if timeout is not None:
if not isinstance(timeout, int):
try:
timeout = int(timeout)
except TypeError:
raise self._exception(
f"Timeout must be an int, got: {str(timeout)}"
)
request["timeout"] = timeout
log.debug("Constructed url {}", "".join((self.base_url, endpoint)))
log.debug("Constructed request parameters {}", request)
try:
response = await self._session.request(**request)
if response.status_code not in range(200, 300):
status = StatusCode(response.status_code)
error = _parse_response(response)
raise self._exception(
f'{status.name.replace("_", " ")}: {error}', level="danger"
) from None
except httpx.HTTPError as http_err:
raise self._exception(parse_exception(http_err), level="danger") from None
return _parse_response(response)
async def _get(self, endpoint, **kwargs):
return await self._request(method="GET", endpoint=endpoint, **kwargs)
async def _post(self, endpoint, **kwargs):
return await self._request(method="POST", endpoint=endpoint, **kwargs)
async def _put(self, endpoint, **kwargs):
return await self._request(method="PUT", endpoint=endpoint, **kwargs)
async def _delete(self, endpoint, **kwargs):
return await self._request(method="DELETE", endpoint=endpoint, **kwargs)
async def _patch(self, endpoint, **kwargs):
return await self._request(method="PATCH", endpoint=endpoint, **kwargs)
async def _head(self, endpoint, **kwargs):
return await self._request(method="HEAD", endpoint=endpoint, **kwargs)

49
hyperglass/external/ripestat.py vendored Normal file
View File

@@ -0,0 +1,49 @@
"""Session handler for RIPEStat Data API."""
# Standard Library
from ipaddress import ip_address, ip_network
# Project
from hyperglass.log import log
from hyperglass.external._base import BaseExternal
class RIPEStat(BaseExternal, name="RIPEStat"):
"""RIPEStat session handler."""
def __init__(self):
"""Initialize external base class with RIPEStat connection details."""
super().__init__(
base_url="https://stat.ripe.net", uri_prefix="/data", uri_suffix="data.json"
)
async def network_info(self, resource, serialize=False):
"""Get network info via RIPE's Network Info API endpoint.
See: https://stat.ripe.net/docs/data_api#network-info
"""
try:
valid_ip = ip_address(resource)
if not valid_ip.is_global:
log.debug("IP {ip} is not a global address", ip=str(valid_ip))
return {"prefix": None, "asn": None}
except ValueError:
log.debug("'{resource}' is not a valid IP address", resource=resource)
return {"prefix": None, "asn": None}
raw = await self._get(endpoint="network-info", params={"resource": valid_ip})
data = {
"asns": raw["data"]["asns"],
"prefix": ip_network(raw["data"]["prefix"]),
}
if serialize:
data["prefix"] = str(data["prefix"])
data["asns"] = data["asns"][0]
log.debug("Collected network info from RIPEState: {i}", i=str(data))
return data

24
hyperglass/external/slack.py vendored Normal file
View File

@@ -0,0 +1,24 @@
"""Session handler for Slack API."""
# Project
from hyperglass.log import log
from hyperglass.models import Webhook
from hyperglass.external._base import BaseExternal
class SlackHook(BaseExternal, name="Slack"):
"""Slack session handler."""
def __init__(self):
"""Initialize external base class with Slack connection details."""
super().__init__(base_url="https://hooks.slack.com")
async def send(self, query, provider):
"""Send an incoming webhook to Slack."""
payload = Webhook(**query)
log.debug("Sending query data to Slack:\n{}", payload)
return await self._post(endpoint=provider.host.path, data=payload.slack())

24
hyperglass/external/webhooks.py vendored Normal file
View File

@@ -0,0 +1,24 @@
"""Convenience functions for webhooks."""
# Project
from hyperglass.exceptions import HyperglassError
from hyperglass.external._base import BaseExternal
from hyperglass.external.slack import SlackHook
PROVIDER_MAP = {
"slack": SlackHook,
}
class Webhook(BaseExternal):
"""Get webhook for provider name."""
def __new__(cls, provider):
"""Return instance for correct provider handler."""
try:
provider_class = PROVIDER_MAP[provider]
return provider_class()
except KeyError:
raise HyperglassError(
f"{provider} is not yet supported as a webhook target."
)

View File

@@ -100,31 +100,3 @@ def enable_syslog_logging(logger, syslog_host, syslog_port):
p=str(syslog_port),
)
return True
async def query_hook(query, http_logging, log):
"""Log a query to an http server."""
import httpx
from hyperglass.models import Webhook
from hyperglass.util import parse_exception
valid_webhook = Webhook(**query)
format_map = {"generic": valid_webhook.export_dict, "slack": valid_webhook.slack}
format_func = format_map[http_logging.provider]
async with httpx.AsyncClient(**http_logging.decoded()) as client:
payload = format_func()
log.debug("Sending query data to webhook:\n{}", payload)
try:
response = await client.post(str(http_logging.host), json=payload)
if response.status_code not in range(200, 300):
log.error(f"{response.status_code} error: {response.text}")
except httpx.HTTPError as err:
parsed = parse_exception(err)
log.error(parsed)
return True

View File

@@ -147,6 +147,8 @@ class WebhookHeaders(HyperglassModel):
referer: Optional[StrictStr]
accept_encoding: Optional[StrictStr]
accept_language: Optional[StrictStr]
x_real_ip: Optional[StrictStr]
x_forwarded_for: Optional[StrictStr]
class Config:
"""Pydantic model config."""
@@ -157,6 +159,8 @@ class WebhookHeaders(HyperglassModel):
"content_type": "content-type",
"accept_encoding": "accept-encoding",
"accept_language": "accept-language",
"x_real_ip": "x-real-ip",
"x_forwarded_for": "x-forwarded-for",
}

View File

@@ -695,64 +695,6 @@ def parse_exception(exc):
return ", caused by ".join(parsed)
def get_network_info(ip, serialize=False):
"""Get containing prefix for an IP host query from RIPEstat API.
Arguments:
valid_ip {IPv4Address|IPv6Address} -- Valid IP Address object
Raises:
InputInvalid: Raised if an http error occurs
InputInvalid: Raised if RIPEstat response doesn't contain a prefix.
Returns:
{IPv4Network|IPv6Network} -- Valid IP Network object
"""
import httpx
from ipaddress import ip_network, ip_address
from hyperglass.exceptions import InputInvalid
log.debug("Attempting to find network details for {ip}", ip=str(ip))
try:
valid_ip = ip_address(ip)
if not valid_ip.is_global:
return {"prefix": None, "asn": None}
except ValueError:
return {"prefix": None, "asn": None}
try:
response = httpx.get(
"https://stat.ripe.net/data/network-info/data.json",
params={"resource": str(valid_ip)},
)
except httpx.HTTPError as error:
msg = parse_exception(error)
raise InputInvalid(msg)
network_info = response.json().get("data", {})
if not network_info.get("prefix", ""):
raise InputInvalid(f"{str(valid_ip)} has no containing prefix")
elif not network_info.get("asns", []):
raise InputInvalid(f"{str(valid_ip)} is not announced")
log.debug(
"Network info for IP '{i}': Announced as '{p}' from AS {a}",
p=network_info.get("prefix"),
a=", ".join(network_info.get("asns")),
i=str(valid_ip),
)
if not serialize:
network_info["prefix"] = ip_network(network_info["prefix"])
if serialize:
network_info["asns"] = network_info["asns"][0]
return network_info
def set_cache_env(host, port, db):
"""Set basic cache config parameters to environment variables.
@@ -782,11 +724,42 @@ def get_cache_env():
return host, port, db
def donothing(*args, **kwargs):
"""Do nothing."""
pass
async def process_headers(headers):
"""Filter out unwanted headers and return as a dictionary."""
headers = dict(headers)
header_keys = (
"content-length",
"accept",
"user-agent",
"content-type",
"referer",
"accept-encoding",
"accept-language",
"x-real-ip",
"x-forwarded-for",
)
return {k: headers.get(k) for k in header_keys}
async def adonothing(*args, **kwargs):
"""Do nothing."""
pass
def make_repr(_class):
"""Create a user-friendly represention of an object."""
from asyncio import iscoroutine
def _process_attrs(_dir):
for attr in _dir:
if not attr.startswith("_"):
attr_val = getattr(_class, attr)
if callable(attr_val):
yield f'{attr}=<function name="{attr_val.__name__}">'
elif iscoroutine(attr_val):
yield f'{attr}=<coroutine name="{attr_val.__name__}">'
elif isinstance(attr_val, str):
yield f'{attr}="{attr_val}"'
else:
yield f"{attr}={str(attr_val)}"
return f'{_class.__name__}({", ".join(_process_attrs(dir(_class)))})'