diff --git a/hyperglass/api/models/validators.py b/hyperglass/api/models/validators.py index fad42bc..e8339e6 100644 --- a/hyperglass/api/models/validators.py +++ b/hyperglass/api/models/validators.py @@ -6,9 +6,9 @@ from ipaddress import ip_network # Project from hyperglass.log import log -from hyperglass.external import bgptools from hyperglass.exceptions import InputInvalid, InputNotAllowed from hyperglass.configuration import params +from hyperglass.external.bgptools import network_info_sync def _member_of(target, network): @@ -144,9 +144,17 @@ def validate_ip(value, query_type, query_vrf): # noqa: C901 ): log.debug("Getting containing prefix for {q}", q=str(valid_ip)) - containing_prefix = bgptools.network_info_sync( - valid_ip.network_address - ).get("prefix") + ip_str = str(valid_ip.network_address) + network_info = network_info_sync(ip_str) + containing_prefix = network_info.get(ip_str, {}).get("prefix") + + if containing_prefix is None: + log.error( + "Unable to find containing prefix for {}. Got: {}", + str(valid_ip), + network_info, + ) + raise InputInvalid("{q} does not have a containing prefix", q=ip_str) try: diff --git a/hyperglass/api/routes.py b/hyperglass/api/routes.py index 078b8b9..cf75316 100644 --- a/hyperglass/api/routes.py +++ b/hyperglass/api/routes.py @@ -14,7 +14,7 @@ from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html # Project from hyperglass.log import log from hyperglass.util import clean_name, process_headers, import_public_key -from hyperglass.cache import Cache +from hyperglass.cache import AsyncCache from hyperglass.encode import jwt_decode from hyperglass.external import Webhook, bgptools from hyperglass.exceptions import HyperglassError @@ -56,7 +56,7 @@ async def send_webhook(query_data: Query, request: Request, timestamp: datetime) **query_data.export_dict(pretty=True), "headers": headers, "source": host, - "network": network_info, + "network": network_info.get(host, {}), "timestamp": timestamp, } ) @@ -73,7 +73,7 @@ async def query(query_data: Query, request: Request, background_tasks: Backgroun background_tasks.add_task(send_webhook, query_data, request, timestamp) # Initialize cache - cache = Cache(db=params.cache.database, **REDIS_CONFIG) + cache = AsyncCache(db=params.cache.database, **REDIS_CONFIG) log.debug("Initialized cache {}", repr(cache)) # Use hashed query_data string as key for for k/v cache store so @@ -131,12 +131,10 @@ async def query(query_data: Query, request: Request, background_tasks: Backgroun # If it does, return the cached entry cache_response = await cache.get_dict(cache_key, "output") + response_format = "text/plain" if query_data.device.structured_output: response_format = "application/json" - cache_response = json.loads(cache_response) - else: - response_format = "text/plain" log.debug(f"Cache match for {cache_key}:\n {cache_response}") log.success(f"Completed query execution for {query_data.summary}") diff --git a/hyperglass/cache.py b/hyperglass/cache.py deleted file mode 100644 index 71d21f9..0000000 --- a/hyperglass/cache.py +++ /dev/null @@ -1,139 +0,0 @@ -"""Redis cache handler.""" - -# Standard Library -import time -import asyncio - -# Third Party -from aredis import StrictRedis - - -class Cache: - """Redis cache handler.""" - - def __init__( - self, db, host="localhost", port=6379, decode_responses=True, **kwargs - ): - """Initialize Redis connection.""" - self.db: int = db - self.host: str = host - self.port: int = port - self.instance: StrictRedis = StrictRedis( - db=self.db, - host=self.host, - port=self.port, - decode_responses=decode_responses, - **kwargs, - ) - - def __repr__(self): - """Represent class state.""" - return f"ConfigCache(db={self.db}, host={self.host}, port={self.port})" - - def __getitem__(self, item): - """Enable subscriptable syntax.""" - return self.get(item) - - @staticmethod - async def _parse_types(value): - """Parse a string to standard python types.""" - import re - - async def _parse_string(str_value): - - is_float = (re.compile(r"^(\d+\.\d+)$"), float) - is_int = (re.compile(r"^(\d+)$"), int) - is_bool = (re.compile(r"^(True|true|False|false)$"), bool) - is_none = (re.compile(r"^(None|none|null|nil|\(nil\))$"), lambda v: None) - - for pattern, factory in (is_float, is_int, is_bool, is_none): - if isinstance(str_value, str) and bool(re.match(pattern, str_value)): - str_value = factory(str_value) - break - return str_value - - if isinstance(value, str): - value = await _parse_string(value) - elif isinstance(value, bytes): - value = await _parse_string(value.decode("utf-8")) - elif isinstance(value, list): - value = [await _parse_string(i) for i in value] - elif isinstance(value, tuple): - value = tuple(await _parse_string(i) for i in value) - - return value - - async def get(self, *args): - """Get item(s) from cache.""" - if len(args) == 1: - raw = await self.instance.get(args[0]) - else: - raw = await self.instance.mget(args) - return await self._parse_types(raw) - - async def get_dict(self, key, field=None): - """Get hash map (dict) item(s).""" - if field is None: - raw = await self.instance.hgetall(key) - else: - raw = await self.instance.hget(key, field) - return await self._parse_types(raw) - - async def set(self, key, value): - """Set cache values.""" - return await self.instance.set(key, value) - - async def set_dict(self, key, field, value): - """Set hash map (dict) values.""" - return await self.instance.hset(key, field, value) - - async def wait(self, pubsub, timeout=30, **kwargs): - """Wait for pub/sub messages & return posted message.""" - now = time.time() - timeout = now + timeout - while now < timeout: - message = await pubsub.get_message(ignore_subscribe_messages=True, **kwargs) - if message is not None and message["type"] == "message": - data = message["data"] - return await self._parse_types(data) - - await asyncio.sleep(0.01) - now = time.time() - return None - - async def pubsub(self): - """Provide an aredis.pubsub.Pubsub instance.""" - return self.instance.pubsub() - - async def pub(self, key, value): - """Publish a value.""" - await asyncio.sleep(1) - await self.instance.publish(key, value) - - async def clear(self): - """Clear the cache.""" - await self.instance.flushdb() - - async def delete(self, *keys): - """Delete a cache key.""" - await self.instance.delete(*keys) - - async def expire(self, *keys, seconds): - """Set timeout of key in seconds.""" - for key in keys: - await self.instance.expire(key, seconds) - - async def aget_config(self): - """Get picked config object from cache.""" - import pickle - - pickled = await self.instance.get("HYPERGLASS_CONFIG") - return pickle.loads(pickled) - - def get_config(self): - """Get picked config object from cache.""" - import pickle - from hyperglass.compat._asyncio import aiorun - - pickled = aiorun(self.instance.get("HYPERGLASS_CONFIG")) - return pickle.loads(pickled) diff --git a/hyperglass/cache/__init__.py b/hyperglass/cache/__init__.py new file mode 100644 index 0000000..04c3efd --- /dev/null +++ b/hyperglass/cache/__init__.py @@ -0,0 +1,7 @@ +"""Redis cache handlers.""" + +# Project +from hyperglass.cache.aio import AsyncCache +from hyperglass.cache.sync import SyncCache + +__all__ = ("AsyncCache", "SyncCache") diff --git a/hyperglass/cache/aio.py b/hyperglass/cache/aio.py new file mode 100644 index 0000000..9691e6a --- /dev/null +++ b/hyperglass/cache/aio.py @@ -0,0 +1,113 @@ +"""Asyncio Redis cache handler.""" + +# Standard Library +import json +import time +import pickle +import asyncio +from typing import Any, Dict + +# Third Party +from aredis import StrictRedis as AsyncRedis +from aredis.pubsub import PubSub as AsyncPubSub + +# Project +from hyperglass.cache.base import BaseCache + + +class AsyncCache(BaseCache): + """Asynchronous Redis cache handler.""" + + def __init__(self, *args, **kwargs): + """Initialize Redis connection.""" + super().__init__(*args, **kwargs) + self.instance: AsyncRedis = AsyncRedis( + db=self.db, + host=self.host, + port=self.port, + decode_responses=self.decode_responses, + **self.redis_args, + ) + + async def get(self, *args: str) -> Any: + """Get item(s) from cache.""" + if len(args) == 1: + raw = await self.instance.get(args[0]) + else: + raw = await self.instance.mget(args) + return self.parse_types(raw) + + async def get_dict(self, key: str, field: str = "") -> Any: + """Get hash map (dict) item(s).""" + if not field: + raw = await self.instance.hgetall(key) + else: + raw = await self.instance.hget(key, field) + + return self.parse_types(raw) + + async def set(self, key: str, value: str) -> bool: + """Set cache values.""" + return await self.instance.set(key, value) + + async def set_dict(self, key: str, field: str, value: str) -> bool: + """Set hash map (dict) values.""" + success = False + + if isinstance(value, Dict): + value = json.dumps(value) + else: + value = str(value) + + response = await self.instance.hset(key, field, value) + + if response in (0, 1): + success = True + + return success + + async def wait(self, pubsub: AsyncPubSub, timeout: int = 30, **kwargs) -> Any: + """Wait for pub/sub messages & return posted message.""" + now = time.time() + timeout = now + timeout + + while now < timeout: + + message = await pubsub.get_message(ignore_subscribe_messages=True, **kwargs) + + if message is not None and message["type"] == "message": + data = message["data"] + return self.parse_types(data) + + await asyncio.sleep(0.01) + now = time.time() + + return None + + async def pubsub(self) -> AsyncPubSub: + """Provide an aredis.pubsub.Pubsub instance.""" + return self.instance.pubsub() + + async def pub(self, key: str, value: str) -> None: + """Publish a value.""" + await asyncio.sleep(1) + await self.instance.publish(key, value) + + async def clear(self) -> None: + """Clear the cache.""" + await self.instance.flushdb() + + async def delete(self, *keys: str) -> None: + """Delete a cache key.""" + await self.instance.delete(*keys) + + async def expire(self, *keys: str, seconds: int) -> None: + """Set timeout of key in seconds.""" + for key in keys: + await self.instance.expire(key, seconds) + + async def get_config(self) -> Dict: + """Get picked config object from cache.""" + + pickled = await self.instance.get("HYPERGLASS_CONFIG") + return pickle.loads(pickled) diff --git a/hyperglass/cache/base.py b/hyperglass/cache/base.py new file mode 100644 index 0000000..66ed278 --- /dev/null +++ b/hyperglass/cache/base.py @@ -0,0 +1,59 @@ +"""Base Redis cache handler.""" + +# Standard Library +import re +import json +from typing import Any + + +class BaseCache: + """Redis cache handler.""" + + def __init__( + self, + db: int, + host: str = "localhost", + port: int = 6379, + decode_responses: bool = True, + **kwargs: Any, + ) -> None: + """Initialize Redis connection.""" + self.db: int = db + self.host: str = host + self.port: int = port + self.decode_responses: bool = decode_responses + self.redis_args: dict = kwargs + + def __repr__(self) -> str: + """Represent class state.""" + return f"HyperglassCache(db={self.db}, host={self.host}, port={self.port})" + + def parse_types(self, value: str) -> Any: + """Parse a string to standard python types.""" + + def parse_string(str_value: str): + + is_float = (re.compile(r"^(\d+\.\d+)$"), float) + is_int = (re.compile(r"^(\d+)$"), int) + is_bool = (re.compile(r"^(True|true|False|false)$"), bool) + is_none = (re.compile(r"^(None|none|null|nil|\(nil\))$"), lambda v: None) + is_jsonable = (re.compile(r"^[\{\[].*[\}\]]$"), json.loads) + + for pattern, factory in (is_float, is_int, is_bool, is_none, is_jsonable): + if isinstance(str_value, str) and bool(re.match(pattern, str_value)): + str_value = factory(str_value) + break + return str_value + + if isinstance(value, str): + value = parse_string(value) + elif isinstance(value, bytes): + value = parse_string(value.decode("utf-8")) + elif isinstance(value, list): + value = [parse_string(i) for i in value] + elif isinstance(value, tuple): + value = tuple(parse_string(i) for i in value) + elif isinstance(value, dict): + value = {k: self.parse_types(v) for k, v in value.items()} + + return value diff --git a/hyperglass/cache/sync.py b/hyperglass/cache/sync.py new file mode 100644 index 0000000..5905a2f --- /dev/null +++ b/hyperglass/cache/sync.py @@ -0,0 +1,112 @@ +"""Non-asyncio Redis cache handler.""" + +# Standard Library +import json +import time +import pickle +from typing import Any, Dict + +# Third Party +from redis import Redis as SyncRedis +from redis.client import PubSub as SyncPubsSub + +# Project +from hyperglass.cache.base import BaseCache + + +class SyncCache(BaseCache): + """Synchronous Redis cache handler.""" + + def __init__(self, *args, **kwargs): + """Initialize Redis connection.""" + super().__init__(*args, **kwargs) + self.instance: SyncRedis = SyncRedis( + db=self.db, + host=self.host, + port=self.port, + decode_responses=self.decode_responses, + **self.redis_args, + ) + + def get(self, *args: str) -> Any: + """Get item(s) from cache.""" + if len(args) == 1: + raw = self.instance.get(args[0]) + else: + raw = self.instance.mget(args) + return self.parse_types(raw) + + def get_dict(self, key: str, field: str = "") -> Any: + """Get hash map (dict) item(s).""" + if not field: + raw = self.instance.hgetall(key) + else: + raw = self.instance.hget(key, str(field)) + + return self.parse_types(raw) + + def set(self, key: str, value: str) -> bool: + """Set cache values.""" + return self.instance.set(key, str(value)) + + def set_dict(self, key: str, field: str, value: str) -> bool: + """Set hash map (dict) values.""" + success = False + + if isinstance(value, Dict): + value = json.dumps(value) + else: + value = str(value) + + response = self.instance.hset(key, str(field), value) + + if response in (0, 1): + success = True + + return success + + def wait(self, pubsub: SyncPubsSub, timeout: int = 30, **kwargs) -> Any: + """Wait for pub/sub messages & return posted message.""" + now = time.time() + timeout = now + timeout + + while now < timeout: + + message = pubsub.get_message(ignore_subscribe_messages=True, **kwargs) + + if message is not None and message["type"] == "message": + data = message["data"] + return self.parse_types(data) + + time.sleep(0.01) + now = time.time() + + return None + + def pubsub(self) -> SyncPubsSub: + """Provide a redis.client.Pubsub instance.""" + return self.instance.pubsub() + + def pub(self, key: str, value: str) -> None: + """Publish a value.""" + time.sleep(1) + self.instance.publish(key, value) + + def clear(self) -> None: + """Clear the cache.""" + self.instance.flushdb() + + def delete(self, *keys: str) -> None: + """Delete a cache key.""" + self.instance.delete(*keys) + + def expire(self, *keys: str, seconds: int) -> None: + """Set timeout of key in seconds.""" + for key in keys: + self.instance.expire(key, seconds) + + def get_config(self) -> Dict: + """Get picked config object from cache.""" + + pickled = self.instance.get("HYPERGLASS_CONFIG") + return pickle.loads(pickled) diff --git a/hyperglass/main.py b/hyperglass/main.py index 001f782..367d1f5 100644 --- a/hyperglass/main.py +++ b/hyperglass/main.py @@ -12,7 +12,7 @@ from gunicorn.app.base import BaseApplication # Project from hyperglass.log import log -from hyperglass.cache import Cache +from hyperglass.cache import AsyncCache from hyperglass.constants import MIN_PYTHON_VERSION, __version__ pretty_version = ".".join(tuple(str(v) for v in MIN_PYTHON_VERSION)) @@ -85,7 +85,7 @@ async def cache_config(): """Add configuration to Redis cache as a pickled object.""" import pickle - cache = Cache( + cache = AsyncCache( db=params.cache.database, host=params.cache.host, port=params.cache.port ) await cache.set("HYPERGLASS_CONFIG", pickle.dumps(params)) @@ -123,7 +123,8 @@ def on_exit(server: Arbiter): log.critical("Stopping hyperglass {}", __version__) async def runner(): - await clear_cache() + if not params.developer_mode: + await clear_cache() aiorun(runner())