From a2ee4b50fa27e7d8f5f8ff0947f48df11512b495 Mon Sep 17 00:00:00 2001 From: thatmattlove Date: Wed, 15 Sep 2021 00:57:45 -0700 Subject: [PATCH] Implement global state --- .flake8 | 2 +- hyperglass/api/__init__.py | 95 +++++++------ hyperglass/api/error_handlers.py | 7 +- hyperglass/api/events.py | 10 +- hyperglass/api/routes.py | 67 +++++----- hyperglass/cache/aio.py | 62 +++++++-- hyperglass/cache/base.py | 35 ++--- hyperglass/cache/sync.py | 72 ++++++++-- hyperglass/configuration/main.py | 18 +-- hyperglass/exceptions/private.py | 4 + hyperglass/execution/drivers/__init__.py | 7 + hyperglass/execution/drivers/_construct.py | 18 ++- hyperglass/execution/drivers/agent.py | 7 +- hyperglass/execution/drivers/ssh.py | 5 +- hyperglass/execution/drivers/ssh_netmiko.py | 8 +- hyperglass/execution/drivers/ssh_scrapli.py | 5 +- hyperglass/execution/main.py | 8 +- hyperglass/main.py | 141 ++++++++------------ hyperglass/models/commands/generic.py | 50 +++---- hyperglass/models/system.py | 116 ++++++++++++++++ hyperglass/plugins/_manager.py | 67 ++++------ hyperglass/plugins/main.py | 2 +- hyperglass/settings.py | 17 +++ hyperglass/state/__init__.py | 6 + hyperglass/state/redis.py | 133 ++++++++++++++++++ hyperglass/util/__init__.py | 24 +--- hyperglass/util/frontend.py | 22 +-- poetry.lock | 34 ++++- pyproject.toml | 5 +- 29 files changed, 702 insertions(+), 345 deletions(-) create mode 100644 hyperglass/models/system.py create mode 100644 hyperglass/settings.py create mode 100644 hyperglass/state/__init__.py create mode 100644 hyperglass/state/redis.py diff --git a/.flake8 b/.flake8 index 1745540..5800db7 100644 --- a/.flake8 +++ b/.flake8 @@ -18,7 +18,7 @@ per-file-ignores= hyperglass/models/*/__init__.py:F401 # Disable assertion and docstring checks on tests. hyperglass/**/test_*.py:S101,D103 -ignore=W503,C0330,R504,D202,S403,S301,S404,E731 +ignore=W503,C0330,R504,D202,S403,S301,S404,E731,D402 select=B, BLK, C, D, E, F, I, II, N, P, PIE, S, R, W disable-noqa=False hang-closing=False diff --git a/hyperglass/api/__init__.py b/hyperglass/api/__init__.py index 88f7cbc..1c347c7 100644 --- a/hyperglass/api/__init__.py +++ b/hyperglass/api/__init__.py @@ -22,8 +22,9 @@ from hyperglass.constants import __version__ from hyperglass.models.ui import UIParameters from hyperglass.api.events import on_startup, on_shutdown from hyperglass.api.routes import docs, info, query, router, queries, routers, ui_props +from hyperglass.state import use_state from hyperglass.exceptions import HyperglassError -from hyperglass.configuration import URL_DEV, STATIC_PATH, params +from hyperglass.configuration import URL_DEV, STATIC_PATH from hyperglass.api.error_handlers import ( app_handler, http_handler, @@ -39,6 +40,8 @@ from hyperglass.models.api.response import ( SupportedQueryResponse, ) +STATE = use_state() + WORKING_DIR = Path(__file__).parent EXAMPLES_DIR = WORKING_DIR / "examples" @@ -54,18 +57,18 @@ EXAMPLE_QUERIES_CURL = EXAMPLES_DIR / "queries.sh" EXAMPLE_QUERY_CURL = EXAMPLES_DIR / "query.sh" ASGI_PARAMS = { - "host": str(params.listen_address), - "port": params.listen_port, - "debug": params.debug, + "host": str(STATE.settings.host), + "port": STATE.settings.port, + "debug": STATE.settings.debug, "workers": cpu_count(2), } DOCS_PARAMS = {} -if params.docs.enable: - DOCS_PARAMS.update({"openapi_url": params.docs.openapi_uri}) - if params.docs.mode == "redoc": - DOCS_PARAMS.update({"docs_url": None, "redoc_url": params.docs.uri}) - elif params.docs.mode == "swagger": - DOCS_PARAMS.update({"docs_url": params.docs.uri, "redoc_url": None}) +if STATE.params.docs.enable: + DOCS_PARAMS.update({"openapi_url": STATE.params.docs.openapi_uri}) + if STATE.params.docs.mode == "redoc": + DOCS_PARAMS.update({"docs_url": None, "redoc_url": STATE.params.docs.uri}) + elif STATE.params.docs.mode == "swagger": + DOCS_PARAMS.update({"docs_url": STATE.params.docs.uri, "redoc_url": None}) for directory in (UI_DIR, IMAGES_DIR): if not directory.exists(): @@ -74,9 +77,9 @@ for directory in (UI_DIR, IMAGES_DIR): # Main App Definition app = FastAPI( - debug=params.debug, - title=params.site_title, - description=params.site_description, + debug=STATE.settings.debug, + title=STATE.params.site_title, + description=STATE.params.site_description, version=__version__, default_response_class=JSONResponse, **DOCS_PARAMS, @@ -108,12 +111,12 @@ app.add_exception_handler(Exception, default_handler) def _custom_openapi(): """Generate custom OpenAPI config.""" openapi_schema = get_openapi( - title=params.docs.title.format(site_title=params.site_title), + title=STATE.params.docs.title.format(site_title=STATE.params.site_title), version=__version__, - description=params.docs.description, + description=STATE.params.docs.description, routes=app.routes, ) - openapi_schema["info"]["x-logo"] = {"url": "/images/light" + params.web.logo.light.suffix} + openapi_schema["info"]["x-logo"] = {"url": "/images/light" + STATE.params.web.logo.light.suffix} query_samples = [] queries_samples = [] @@ -121,26 +124,36 @@ def _custom_openapi(): with EXAMPLE_QUERY_CURL.open("r") as e: example = e.read() - query_samples.append({"lang": "cURL", "source": example % str(params.docs.base_url)}) + query_samples.append({"lang": "cURL", "source": example % str(STATE.params.docs.base_url)}) with EXAMPLE_QUERY_PY.open("r") as e: example = e.read() - query_samples.append({"lang": "Python", "source": example % str(params.docs.base_url)}) + query_samples.append( + {"lang": "Python", "source": example % str(STATE.params.docs.base_url)} + ) with EXAMPLE_DEVICES_CURL.open("r") as e: example = e.read() - queries_samples.append({"lang": "cURL", "source": example % str(params.docs.base_url)}) + queries_samples.append( + {"lang": "cURL", "source": example % str(STATE.params.docs.base_url)} + ) with EXAMPLE_DEVICES_PY.open("r") as e: example = e.read() - queries_samples.append({"lang": "Python", "source": example % str(params.docs.base_url)}) + queries_samples.append( + {"lang": "Python", "source": example % str(STATE.params.docs.base_url)} + ) with EXAMPLE_QUERIES_CURL.open("r") as e: example = e.read() - devices_samples.append({"lang": "cURL", "source": example % str(params.docs.base_url)}) + devices_samples.append( + {"lang": "cURL", "source": example % str(STATE.params.docs.base_url)} + ) with EXAMPLE_QUERIES_PY.open("r") as e: example = e.read() - devices_samples.append({"lang": "Python", "source": example % str(params.docs.base_url)}) + devices_samples.append( + {"lang": "Python", "source": example % str(STATE.params.docs.base_url)} + ) openapi_schema["paths"]["/api/query/"]["post"]["x-code-samples"] = query_samples openapi_schema["paths"]["/api/devices"]["get"]["x-code-samples"] = devices_samples @@ -150,8 +163,8 @@ def _custom_openapi(): return app.openapi_schema -CORS_ORIGINS = params.cors_origins.copy() -if params.developer_mode: +CORS_ORIGINS = STATE.params.cors_origins.copy() +if STATE.settings.dev_mode: CORS_ORIGINS = [*CORS_ORIGINS, URL_DEV, "http://localhost:3000"] # CORS Configuration @@ -171,9 +184,9 @@ app.add_api_route( methods=["GET"], response_model=InfoResponse, response_class=JSONResponse, - summary=params.docs.info.summary, - description=params.docs.info.description, - tags=[params.docs.info.title], + summary=STATE.params.docs.info.summary, + description=STATE.params.docs.info.description, + tags=[STATE.params.docs.info.title], ) app.add_api_route( @@ -182,9 +195,9 @@ app.add_api_route( methods=["GET"], response_model=List[RoutersResponse], response_class=JSONResponse, - summary=params.docs.devices.summary, - description=params.docs.devices.description, - tags=[params.docs.devices.title], + summary=STATE.params.docs.devices.summary, + description=STATE.params.docs.devices.description, + tags=[STATE.params.docs.devices.title], ) app.add_api_route( @@ -193,9 +206,9 @@ app.add_api_route( methods=["GET"], response_model=RoutersResponse, response_class=JSONResponse, - summary=params.docs.devices.summary, - description=params.docs.devices.description, - tags=[params.docs.devices.title], + summary=STATE.params.docs.devices.summary, + description=STATE.params.docs.devices.description, + tags=[STATE.params.docs.devices.title], ) app.add_api_route( @@ -204,24 +217,24 @@ app.add_api_route( methods=["GET"], response_class=JSONResponse, response_model=List[SupportedQueryResponse], - summary=params.docs.queries.summary, - description=params.docs.queries.description, - tags=[params.docs.queries.title], + summary=STATE.params.docs.queries.summary, + description=STATE.params.docs.queries.description, + tags=[STATE.params.docs.queries.title], ) app.add_api_route( path="/api/query/", endpoint=query, methods=["POST"], - summary=params.docs.query.summary, - description=params.docs.query.description, + summary=STATE.params.docs.query.summary, + description=STATE.params.docs.query.description, responses={ 400: {"model": QueryError, "description": "Request Content Error"}, 422: {"model": QueryError, "description": "Request Format Error"}, 500: {"model": QueryError, "description": "Server Error"}, }, response_model=QueryResponse, - tags=[params.docs.query.title], + tags=[STATE.params.docs.query.title], response_class=JSONResponse, ) @@ -235,8 +248,8 @@ app.add_api_route( ) -if params.docs.enable: - app.add_api_route(path=params.docs.uri, endpoint=docs, include_in_schema=False) +if STATE.params.docs.enable: + app.add_api_route(path=STATE.params.docs.uri, endpoint=docs, include_in_schema=False) app.openapi = _custom_openapi log.debug("API Docs config: {}", app.openapi()) diff --git a/hyperglass/api/error_handlers.py b/hyperglass/api/error_handlers.py index 3dd959f..fbf8df9 100644 --- a/hyperglass/api/error_handlers.py +++ b/hyperglass/api/error_handlers.py @@ -4,18 +4,21 @@ from starlette.responses import JSONResponse # Project -from hyperglass.configuration import params +from hyperglass.state import use_state async def default_handler(request, exc): """Handle uncaught errors.""" + state = use_state() return JSONResponse( - {"output": params.messages.general, "level": "danger", "keywords": []}, status_code=500, + {"output": state.params.messages.general, "level": "danger", "keywords": []}, + status_code=500, ) async def http_handler(request, exc): """Handle web server errors.""" + return JSONResponse( {"output": exc.detail, "level": "danger", "keywords": []}, status_code=exc.status_code, ) diff --git a/hyperglass/api/events.py b/hyperglass/api/events.py index 418010a..b59e8ea 100644 --- a/hyperglass/api/events.py +++ b/hyperglass/api/events.py @@ -1,15 +1,13 @@ """API Events.""" # Project -from hyperglass.cache import AsyncCache -from hyperglass.configuration import REDIS_CONFIG, params +from hyperglass.state import use_state -async def check_redis() -> bool: +def check_redis() -> bool: """Ensure Redis is running before starting server.""" - cache = AsyncCache(db=params.cache.database, **REDIS_CONFIG) - await cache.test() - return True + state = use_state() + return state._redis.ping() on_startup = (check_redis,) diff --git a/hyperglass/api/routes.py b/hyperglass/api/routes.py index 94c2da4..2919926 100644 --- a/hyperglass/api/routes.py +++ b/hyperglass/api/routes.py @@ -1,9 +1,9 @@ """API Routes.""" # Standard Library -import os import json import time +import typing as t from datetime import datetime # Third Party @@ -13,25 +13,28 @@ from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html # Project from hyperglass.log import log -from hyperglass.cache import AsyncCache +from hyperglass.state import use_state from hyperglass.external import Webhook, bgptools from hyperglass.api.tasks import process_headers from hyperglass.constants import __version__ from hyperglass.exceptions import HyperglassError -from hyperglass.models.api import Query -from hyperglass.configuration import REDIS_CONFIG, params, devices, ui_params from hyperglass.execution.main import execute # Local from .fake_output import fake_output -APP_PATH = os.environ["hyperglass_directory"] +if t.TYPE_CHECKING: + # Project + from hyperglass.models.api import Query -async def send_webhook(query_data: Query, request: Request, timestamp: datetime): +STATE = use_state() + + +async def send_webhook(query_data: "Query", request: Request, timestamp: datetime): """If webhooks are enabled, get request info and send a webhook.""" try: - if params.logging.http is not None: + if STATE.params.logging.http is not None: headers = await process_headers(headers=request.headers) if headers.get("x-real-ip") is not None: @@ -43,7 +46,7 @@ async def send_webhook(query_data: Query, request: Request, timestamp: datetime) network_info = await bgptools.network_info(host) - async with Webhook(params.logging.http) as hook: + async with Webhook(STATE.params.logging.http) as hook: await hook.send( query={ @@ -55,30 +58,30 @@ async def send_webhook(query_data: Query, request: Request, timestamp: datetime) } ) except Exception as err: - log.error("Error sending webhook to {}: {}", params.logging.http.provider, str(err)) + log.error("Error sending webhook to {}: {}", STATE.params.logging.http.provider, str(err)) -async def query(query_data: Query, request: Request, background_tasks: BackgroundTasks): +async def query(query_data: "Query", request: Request, background_tasks: BackgroundTasks): """Ingest request data pass it to the backend application to perform the query.""" timestamp = datetime.utcnow() background_tasks.add_task(send_webhook, query_data, request, timestamp) # Initialize cache - cache = AsyncCache(db=params.cache.database, **REDIS_CONFIG) + cache = STATE.redis log.debug("Initialized cache {}", repr(cache)) # Use hashed query_data string as key for for k/v cache store so # each command output value is unique. - cache_key = query_data.digest() + cache_key = f"hyperglass.query.{query_data.digest()}" # Define cache entry expiry time - cache_timeout = params.cache.timeout + cache_timeout = STATE.params.cache.timeout log.debug("Cache Timeout: {}", cache_timeout) log.info("Starting query execution for query {}", query_data.summary) - cache_response = await cache.get_dict(cache_key, "output") + cache_response = cache.get_dict(cache_key, "output") json_output = False @@ -95,11 +98,11 @@ async def query(query_data: Query, request: Request, background_tasks: Backgroun log.debug("Query {} exists in cache", cache_key) # If a cached response exists, reset the expiration time. - await cache.expire(cache_key, seconds=cache_timeout) + cache.expire(cache_key, seconds=cache_timeout) cached = True runtime = 0 - timestamp = await cache.get_dict(cache_key, "timestamp") + timestamp = cache.get_dict(cache_key, "timestamp") elif not cache_response: log.debug("No existing cache entry for query {}", cache_key) @@ -109,7 +112,7 @@ async def query(query_data: Query, request: Request, background_tasks: Backgroun starttime = time.time() - if params.fake_output: + if STATE.params.fake_output: # Return fake, static data for development purposes, if enabled. cache_output = await fake_output(json_output) else: @@ -121,23 +124,23 @@ async def query(query_data: Query, request: Request, background_tasks: Backgroun log.debug("Query {} took {} seconds to run.", cache_key, elapsedtime) if cache_output is None: - raise HyperglassError(message=params.messages.general, alert="danger") + raise HyperglassError(message=STATE.params.messages.general, alert="danger") # Create a cache entry if json_output: raw_output = json.dumps(cache_output) else: raw_output = str(cache_output) - await cache.set_dict(cache_key, "output", raw_output) - await cache.set_dict(cache_key, "timestamp", timestamp) - await cache.expire(cache_key, seconds=cache_timeout) + cache.set_dict(cache_key, "output", raw_output) + cache.set_dict(cache_key, "timestamp", timestamp) + cache.expire(cache_key, seconds=cache_timeout) log.debug("Added cache entry for query: {}", cache_key) runtime = int(round(elapsedtime, 0)) # If it does, return the cached entry - cache_response = await cache.get_dict(cache_key, "output") + cache_response = cache.get_dict(cache_key, "output") response_format = "text/plain" if json_output: @@ -161,11 +164,11 @@ async def query(query_data: Query, request: Request, background_tasks: Backgroun async def docs(): """Serve custom docs.""" - if params.docs.enable: + if STATE.params.docs.enable: docs_func_map = {"swagger": get_swagger_ui_html, "redoc": get_redoc_html} - docs_func = docs_func_map[params.docs.mode] + docs_func = docs_func_map[STATE.params.docs.mode] return docs_func( - openapi_url=params.docs.openapi_url, title=params.site_title + " - API Docs" + openapi_url=STATE.params.docs.openapi_url, title=STATE.params.site_title + " - API Docs" ) else: raise HTTPException(detail="Not found", status_code=404) @@ -173,32 +176,32 @@ async def docs(): async def router(id: str): """Get a device's API-facing attributes.""" - return devices[id].export_api() + return STATE.devices[id].export_api() async def routers(): """Serve list of configured routers and attributes.""" - return devices.export_api() + return STATE.devices.export_api() async def queries(): """Serve list of enabled query types.""" - return params.queries.list + return STATE.params.queries.list async def info(): """Serve general information about this instance of hyperglass.""" return { - "name": params.site_title, - "organization": params.org_name, - "primary_asn": int(params.primary_asn), + "name": STATE.params.site_title, + "organization": STATE.params.org_name, + "primary_asn": int(STATE.params.primary_asn), "version": __version__, } async def ui_props(): """Serve UI configration.""" - return ui_params + return STATE.ui_params endpoints = [query, docs, routers, info, ui_props] diff --git a/hyperglass/cache/aio.py b/hyperglass/cache/aio.py index 87ddb42..3f3fb38 100644 --- a/hyperglass/cache/aio.py +++ b/hyperglass/cache/aio.py @@ -4,25 +4,49 @@ import json import time import pickle +import typing as t import asyncio -from typing import Any, Dict # Third Party from aredis import StrictRedis as AsyncRedis # type: ignore -from aredis.pubsub import PubSub as AsyncPubSub # type: ignore +from pydantic import SecretStr from aredis.exceptions import RedisError # type: ignore # Project from hyperglass.cache.base import BaseCache from hyperglass.exceptions.private import DependencyError +if t.TYPE_CHECKING: + # Third Party + from aredis.pubsub import PubSub as AsyncPubSub # type: ignore + + # Project + from hyperglass.models.config.params import Params + from hyperglass.models.config.devices import Devices + class AsyncCache(BaseCache): """Asynchronous Redis cache handler.""" - def __init__(self, *args, **kwargs): + def __init__( + self, + *, + db: int, + host: str = "localhost", + port: int = 6379, + password: t.Optional[SecretStr] = None, + decode_responses: bool = False, + **kwargs: t.Any, + ): """Initialize Redis connection.""" - super().__init__(*args, **kwargs) + super().__init__( + db=db, + host=host, + port=port, + password=password, + decode_responses=decode_responses, + **kwargs, + ) password = self.password if password is not None: @@ -62,7 +86,7 @@ class AsyncCache(BaseCache): e=err_msg, ) - async def get(self, *args: str) -> Any: + async def get(self, *args: str) -> t.Any: """Get item(s) from cache.""" if len(args) == 1: raw = await self.instance.get(args[0]) @@ -70,7 +94,7 @@ class AsyncCache(BaseCache): raw = await self.instance.mget(args) return self.parse_types(raw) - async def get_dict(self, key: str, field: str = "") -> Any: + async def get_dict(self, key: str, field: str = "") -> t.Any: """Get hash map (dict) item(s).""" if not field: raw = await self.instance.hgetall(key) @@ -87,7 +111,7 @@ class AsyncCache(BaseCache): """Set hash map (dict) values.""" success = False - if isinstance(value, Dict): + if isinstance(value, t.Dict): value = json.dumps(value) else: value = str(value) @@ -99,7 +123,7 @@ class AsyncCache(BaseCache): return success - async def wait(self, pubsub: AsyncPubSub, timeout: int = 30, **kwargs) -> Any: + async def wait(self, pubsub: "AsyncPubSub", timeout: int = 30, **kwargs) -> t.Any: """Wait for pub/sub messages & return posted message.""" now = time.time() timeout = now + timeout @@ -117,7 +141,7 @@ class AsyncCache(BaseCache): return None - async def pubsub(self) -> AsyncPubSub: + async def pubsub(self) -> "AsyncPubSub": """Provide an aredis.pubsub.Pubsub instance.""" return self.instance.pubsub() @@ -139,8 +163,20 @@ class AsyncCache(BaseCache): for key in keys: await self.instance.expire(key, seconds) - async def get_config(self) -> Dict: - """Get picked config object from cache.""" + async def get_params(self: "AsyncCache") -> "Params": + """Get Params object from the cache.""" + params = await self.instance.get(self.CONFIG_KEY) + return pickle.loads(params) - pickled = await self.instance.get("HYPERGLASS_CONFIG") - return pickle.loads(pickled) + async def get_devices(self: "AsyncCache") -> "Devices": + """Get Devices object from the cache.""" + devices = await self.instance.get(self.DEVICES_KEY) + return pickle.loads(devices) + + async def set_config(self: "AsyncCache", config: "Params") -> None: + """Add a params instance to the cache.""" + await self.instance.set(self.CONFIG_KEY, pickle.dumps(config)) + + async def set_devices(self: "AsyncCache", devices: "Devices") -> None: + """Add a devices instance to the cache.""" + await self.instance.set(self.DEVICES_KEY, pickle.dumps(devices)) diff --git a/hyperglass/cache/base.py b/hyperglass/cache/base.py index 87fa43f..60998b6 100644 --- a/hyperglass/cache/base.py +++ b/hyperglass/cache/base.py @@ -3,7 +3,7 @@ # Standard Library import re import json -from typing import Any, Optional +import typing as t # Third Party from pydantic import SecretStr @@ -12,30 +12,35 @@ from pydantic import SecretStr class BaseCache: """Redis cache handler.""" + CONFIG_KEY: str = "hyperglass.config" + DEVICES_KEY: str = "hyperglass.devices" + def __init__( self, + *, db: int, host: str = "localhost", port: int = 6379, - password: Optional[SecretStr] = None, - decode_responses: bool = True, - **kwargs: Any, + password: t.Optional[SecretStr] = None, + decode_responses: bool = False, + **kwargs: t.Any, ) -> None: """Initialize Redis connection.""" - self.db: int = db - self.host: str = str(host) - self.port: int = port - self.password: Optional[SecretStr] = password - self.decode_responses: bool = decode_responses - self.redis_args: dict = kwargs + self.db = db + self.host = str(host) + self.port = port + self.password = password + self.decode_responses = decode_responses + self.redis_args = kwargs def __repr__(self) -> str: """Represent class state.""" - return "HyperglassCache(db={}, host={}, port={}, password={})".format( + + return "HyperglassCache(db={!s}, host={}, port={!s}, password={})".format( self.db, self.host, self.port, self.password ) - def parse_types(self, value: str) -> Any: + def parse_types(self, value: str) -> t.Any: """Parse a string to standard python types.""" def parse_string(str_value: str): @@ -56,11 +61,11 @@ class BaseCache: value = parse_string(value) elif isinstance(value, bytes): value = parse_string(value.decode("utf-8")) - elif isinstance(value, list): + elif isinstance(value, t.List): value = [parse_string(i) for i in value] - elif isinstance(value, tuple): + elif isinstance(value, t.Tuple): value = tuple(parse_string(i) for i in value) - elif isinstance(value, dict): + elif isinstance(value, t.Dict): value = {k: self.parse_types(v) for k, v in value.items()} return value diff --git a/hyperglass/cache/sync.py b/hyperglass/cache/sync.py index 0579a25..a4dc13f 100644 --- a/hyperglass/cache/sync.py +++ b/hyperglass/cache/sync.py @@ -4,24 +4,48 @@ import json import time import pickle -from typing import Any, Dict +import typing as t # Third Party from redis import Redis as SyncRedis -from redis.client import PubSub as SyncPubsSub +from pydantic import SecretStr from redis.exceptions import RedisError # Project from hyperglass.cache.base import BaseCache from hyperglass.exceptions.private import DependencyError +if t.TYPE_CHECKING: + # Third Party + from redis.client import PubSub as SyncPubsSub + + # Project + from hyperglass.models.config.params import Params + from hyperglass.models.config.devices import Devices + class SyncCache(BaseCache): """Synchronous Redis cache handler.""" - def __init__(self, *args, **kwargs): + def __init__( + self, + *, + db: int, + host: str = "localhost", + port: int = 6379, + password: t.Optional[SecretStr] = None, + decode_responses: bool = False, + **kwargs: t.Any, + ): """Initialize Redis connection.""" - super().__init__(*args, **kwargs) + super().__init__( + db=db, + host=host, + port=port, + password=password, + decode_responses=decode_responses, + **kwargs, + ) password = self.password if password is not None: @@ -60,15 +84,25 @@ class SyncCache(BaseCache): e=err_msg, ) - def get(self, *args: str) -> Any: + def get(self, *args: str, decode: bool = True) -> t.Any: """Get item(s) from cache.""" if len(args) == 1: raw = self.instance.get(args[0]) else: raw = self.instance.mget(args) + if decode and isinstance(raw, bytes): + raw = raw.decode() + return self.parse_types(raw) - def get_dict(self, key: str, field: str = "") -> Any: + GetObj = t.TypeVar("GetObj") + + def get_object(self, name: str, _type: t.Type[GetObj] = t.Any) -> GetObj: + raw = self.instance.get(name) + obj: _type = pickle.loads(raw) + return obj + + def get_dict(self, key: str, field: str = "", *, decode: bool = True) -> t.Any: """Get hash map (dict) item(s).""" if not field: raw = self.instance.hgetall(key) @@ -85,7 +119,7 @@ class SyncCache(BaseCache): """Set hash map (dict) values.""" success = False - if isinstance(value, Dict): + if isinstance(value, t.Dict): value = json.dumps(value) else: value = str(value) @@ -97,7 +131,7 @@ class SyncCache(BaseCache): return success - def wait(self, pubsub: SyncPubsSub, timeout: int = 30, **kwargs) -> Any: + def wait(self, pubsub: "SyncPubsSub", timeout: int = 30, **kwargs) -> t.Any: """Wait for pub/sub messages & return posted message.""" now = time.time() timeout = now + timeout @@ -115,7 +149,7 @@ class SyncCache(BaseCache): return None - def pubsub(self) -> SyncPubsSub: + def pubsub(self) -> "SyncPubsSub": """Provide a redis.client.Pubsub instance.""" return self.instance.pubsub() @@ -137,8 +171,20 @@ class SyncCache(BaseCache): for key in keys: self.instance.expire(key, seconds) - def get_config(self) -> Dict: - """Get picked config object from cache.""" + def get_params(self) -> "Params": + """Get Params object from the cache.""" + return self.get_object(self.CONFIG_KEY, "Params") + # return pickle.loads(self.get(self.CONFIG_KEY, decode=False, parse=False)) - pickled = self.instance.get("HYPERGLASS_CONFIG") - return pickle.loads(pickled) + def get_devices(self) -> "Devices": + """Get Devices object from the cache.""" + return self.get_object(self.DEVICES_KEY, "Devices") + # return pickle.loads(self.get(self.DEVICES_KEY, decode=False, parse=False)) + + def set_config(self: "SyncCache", config: "Params") -> None: + """Add a params instance to the cache.""" + self.instance.set(self.CONFIG_KEY, pickle.dumps(config)) + + def set_devices(self: "SyncCache", devices: "Devices") -> None: + """Add a devices instance to the cache.""" + self.instance.set(self.DEVICES_KEY, pickle.dumps(devices)) diff --git a/hyperglass/configuration/main.py b/hyperglass/configuration/main.py index 4b46160..1b20216 100644 --- a/hyperglass/configuration/main.py +++ b/hyperglass/configuration/main.py @@ -10,13 +10,8 @@ import yaml from pydantic import ValidationError # Project -from hyperglass.log import ( - log, - set_log_level, - enable_file_logging, - enable_syslog_logging, -) -from hyperglass.util import set_app_path, set_cache_env, current_log_level +from hyperglass.log import log, enable_file_logging, enable_syslog_logging +from hyperglass.util import set_app_path, set_cache_env from hyperglass.defaults import CREDIT from hyperglass.constants import PARSED_RESPONSE_FIELDS, __version__ from hyperglass.models.ui import UIParameters @@ -135,20 +130,11 @@ def _get_devices(data: List[Dict], directives: List[Directive]) -> Devices: user_config = _config_optional(CONFIG_MAIN) # Read raw debug value from config to enable debugging quickly. -set_log_level(logger=log, debug=user_config.get("debug", True)) # Map imported user configuration to expected schema. log.debug("Unvalidated configuration from {}: {}", CONFIG_MAIN, user_config) params = validate_config(config=user_config, importer=Params) -# Re-evaluate debug state after config is validated -log_level = current_log_level(log) - -if params.debug and log_level != "debug": - set_log_level(logger=log, debug=True) -elif not params.debug and log_level == "debug": - set_log_level(logger=log, debug=False) - # Map imported user commands to expected schema. _user_commands = _config_optional(CONFIG_COMMANDS) log.debug("Unvalidated commands from {}: {}", CONFIG_COMMANDS, _user_commands) diff --git a/hyperglass/exceptions/private.py b/hyperglass/exceptions/private.py index 4172ff8..255bc17 100644 --- a/hyperglass/exceptions/private.py +++ b/hyperglass/exceptions/private.py @@ -92,3 +92,7 @@ class DependencyError(PrivateHyperglassError): class PluginError(PrivateHyperglassError): """Raised when a plugin error occurs.""" + + +class StateError(PrivateHyperglassError): + """Raised when an error occurs while fetching state from Redis.""" diff --git a/hyperglass/execution/drivers/__init__.py b/hyperglass/execution/drivers/__init__.py index 73a6ada..93a1f0e 100644 --- a/hyperglass/execution/drivers/__init__.py +++ b/hyperglass/execution/drivers/__init__.py @@ -5,3 +5,10 @@ from .agent import AgentConnection from ._common import Connection from .ssh_netmiko import NetmikoConnection from .ssh_scrapli import ScrapliConnection + +__all__ = ( + "AgentConnection", + "Connection", + "NetmikoConnection", + "ScrapliConnection", +) diff --git a/hyperglass/execution/drivers/_construct.py b/hyperglass/execution/drivers/_construct.py index 1033c57..b2b8520 100644 --- a/hyperglass/execution/drivers/_construct.py +++ b/hyperglass/execution/drivers/_construct.py @@ -8,28 +8,32 @@ hyperglass API modules. # Standard Library import re import json as _json +import typing as t # Project from hyperglass.log import log from hyperglass.util import get_fmt_keys from hyperglass.constants import TRANSPORT_REST, TARGET_FORMAT_SPACE -from hyperglass.models.api.query import Query from hyperglass.exceptions.public import InputInvalid from hyperglass.exceptions.private import ConfigError -from hyperglass.models.config.devices import Device -from hyperglass.models.commands.generic import Directive + +if t.TYPE_CHECKING: + # Project + from hyperglass.models.api.query import Query + from hyperglass.models.config.devices import Device + from hyperglass.models.commands.generic import Directive class Construct: """Construct SSH commands/REST API parameters from validated query data.""" - directive: Directive - device: Device - query: Query + directive: "Directive" + device: "Device" + query: "Query" transport: str target: str - def __init__(self, device, query): + def __init__(self, device: "Device", query: "Query"): """Initialize command construction.""" log.debug( "Constructing '{}' query for '{}'", query.query_type, str(query.query_target), diff --git a/hyperglass/execution/drivers/agent.py b/hyperglass/execution/drivers/agent.py index 0465596..8439230 100644 --- a/hyperglass/execution/drivers/agent.py +++ b/hyperglass/execution/drivers/agent.py @@ -16,8 +16,8 @@ import httpx # Project from hyperglass.log import log from hyperglass.util import parse_exception +from hyperglass.state import use_state from hyperglass.encode import jwt_decode, jwt_encode -from hyperglass.configuration import params from hyperglass.exceptions.public import RestError, ResponseEmpty # Local @@ -38,10 +38,11 @@ class AgentConnection(Connection): async def collect(self) -> Iterable: # noqa: C901 """Connect to a device running hyperglass-agent via HTTP.""" log.debug("Query parameters: {}", self.query) + state = use_state() client_params = { "headers": {"Content-Type": "application/json"}, - "timeout": params.request_timeout, + "timeout": state.params.request_timeout, } if self.device.ssl is not None and self.device.ssl.enable: with self.device.ssl.cert.open("r") as file: @@ -76,7 +77,7 @@ class AgentConnection(Connection): encoded_query = await jwt_encode( payload=query, secret=self.device.credential.password.get_secret_value(), - duration=params.request_timeout, + duration=state.params.request_timeout, ) log.debug("Encoded JWT: {}", encoded_query) diff --git a/hyperglass/execution/drivers/ssh.py b/hyperglass/execution/drivers/ssh.py index 11f27cb..940a0ff 100644 --- a/hyperglass/execution/drivers/ssh.py +++ b/hyperglass/execution/drivers/ssh.py @@ -5,7 +5,7 @@ from typing import TYPE_CHECKING # Project from hyperglass.log import log -from hyperglass.configuration import params +from hyperglass.state import use_state from hyperglass.compat._sshtunnel import BaseSSHTunnelForwarderError, open_tunnel from hyperglass.exceptions.public import ScrapeError @@ -24,6 +24,7 @@ class SSHConnection(Connection): """Return a preconfigured sshtunnel.SSHTunnelForwarder instance.""" proxy = self.device.proxy + state = use_state() def opener(): """Set up an SSH tunnel according to a device's configuration.""" @@ -32,7 +33,7 @@ class SSHConnection(Connection): "remote_bind_address": (self.device._target, self.device.port), "local_bind_address": ("localhost", 0), "skip_tunnel_checkup": False, - "gateway_timeout": params.request_timeout - 2, + "gateway_timeout": state.params.request_timeout - 2, } if proxy.credential._method == "password": # Use password auth if no key is defined. diff --git a/hyperglass/execution/drivers/ssh_netmiko.py b/hyperglass/execution/drivers/ssh_netmiko.py index 5758194..f099a60 100644 --- a/hyperglass/execution/drivers/ssh_netmiko.py +++ b/hyperglass/execution/drivers/ssh_netmiko.py @@ -16,7 +16,7 @@ from netmiko import ( # type: ignore # Project from hyperglass.log import log -from hyperglass.configuration import params +from hyperglass.state import state from hyperglass.exceptions.public import AuthError, DeviceTimeout, ResponseEmpty # Local @@ -65,9 +65,9 @@ class NetmikoConnection(SSHConnection): "port": port or self.device.port, "device_type": self.device.type, "username": self.device.credential.username, - "global_delay_factor": params.netmiko_delay_factor, - "timeout": math.floor(params.request_timeout * 1.25), - "session_timeout": math.ceil(params.request_timeout - 1), + "global_delay_factor": state.params.netmiko_delay_factor, + "timeout": math.floor(state.params.request_timeout * 1.25), + "session_timeout": math.ceil(state.params.request_timeout - 1), **global_args, } diff --git a/hyperglass/execution/drivers/ssh_scrapli.py b/hyperglass/execution/drivers/ssh_scrapli.py index 581883a..3ac9a16 100644 --- a/hyperglass/execution/drivers/ssh_scrapli.py +++ b/hyperglass/execution/drivers/ssh_scrapli.py @@ -24,7 +24,7 @@ from scrapli.driver.core import ( # Project from hyperglass.log import log -from hyperglass.configuration import params +from hyperglass.state import use_state from hyperglass.exceptions.public import ( AuthError, ScrapeError, @@ -71,6 +71,7 @@ class ScrapliConnection(SSHConnection): Directly connects to the router via Netmiko library, returns the command output. """ + state = use_state() driver = _map_driver(self.device.type) if host is not None: @@ -89,7 +90,7 @@ class ScrapliConnection(SSHConnection): "host": host or self.device._target, "port": port or self.device.port, "auth_username": self.device.credential.username, - "timeout_ops": math.floor(params.request_timeout * 1.25), + "timeout_ops": math.floor(state.params.request_timeout * 1.25), "transport": "asyncssh", "auth_strict_key": False, "ssh_known_hosts_file": False, diff --git a/hyperglass/execution/main.py b/hyperglass/execution/main.py index 8d49d74..174f661 100644 --- a/hyperglass/execution/main.py +++ b/hyperglass/execution/main.py @@ -12,7 +12,7 @@ from typing import TYPE_CHECKING, Any, Dict, Union, Callable # Project from hyperglass.log import log -from hyperglass.configuration import params +from hyperglass.state import use_state from hyperglass.exceptions.public import DeviceTimeout, ResponseEmpty if TYPE_CHECKING: @@ -47,8 +47,8 @@ def handle_timeout(**exc_args: Any) -> Callable: async def execute(query: "Query") -> Union["OutputDataModel", str]: """Initiate query validation and execution.""" - - output = params.messages.general + state = use_state() + output = state.params.messages.general log.debug("Received query for {}", query.json()) log.debug("Matched device config: {}", query.device) @@ -60,7 +60,7 @@ async def execute(query: "Query") -> Union["OutputDataModel", str]: signal.SIGALRM, handle_timeout(error=TimeoutError("Connection timed out"), device=query.device), ) - signal.alarm(params.request_timeout - 1) + signal.alarm(state.params.request_timeout - 1) if query.device.proxy: proxy = driver.setup_proxy() diff --git a/hyperglass/main.py b/hyperglass/main.py index c01a16e..57bbdec 100644 --- a/hyperglass/main.py +++ b/hyperglass/main.py @@ -2,18 +2,17 @@ # Standard Library import sys -import math import shutil +import typing as t import logging import platform -from typing import TYPE_CHECKING # Third Party from gunicorn.app.base import BaseApplication # type: ignore from gunicorn.glogging import Logger # type: ignore # Local -from .log import log, setup_lib_logging +from .log import log, set_log_level, setup_lib_logging from .plugins import ( InputPluginManager, OutputPluginManager, @@ -23,7 +22,7 @@ from .plugins import ( from .constants import MIN_NODE_VERSION, MIN_PYTHON_VERSION, __version__ from .util.frontend import get_node_version -if TYPE_CHECKING: +if t.TYPE_CHECKING: # Third Party from gunicorn.arbiter import Arbiter # type: ignore @@ -39,33 +38,19 @@ if sys.version_info < MIN_PYTHON_VERSION: node_major, _, __ = get_node_version() if node_major != MIN_NODE_VERSION: - raise RuntimeError(f"NodeJS {MIN_NODE_VERSION}+ is required.") + raise RuntimeError(f"NodeJS {MIN_NODE_VERSION!s}+ is required.") # Project from hyperglass.compat._asyncio import aiorun # Local -from .util import cpu_count, clear_redis_cache, format_listen_address -from .cache import SyncCache -from .configuration import ( - URL_DEV, - URL_PROD, - CONFIG_PATH, - REDIS_CONFIG, - params, - devices, - ui_params, -) +from .util import cpu_count +from .state import use_state +from .settings import Settings +from .configuration import URL_DEV, URL_PROD from .util.frontend import build_frontend -if params.debug: - workers = 1 - loglevel = "DEBUG" -else: - workers = cpu_count(2) - loglevel = "WARNING" - class StubbedGunicornLogger(Logger): """Custom logging to direct Gunicorn/Uvicorn logs to Loguru/Rich. @@ -73,58 +58,30 @@ class StubbedGunicornLogger(Logger): See: https://pawamoy.github.io/posts/unify-logging-for-a-gunicorn-uvicorn-app/ """ - def setup(self, cfg): + def setup(self, cfg: t.Any) -> None: """Override Gunicorn setup.""" handler = logging.NullHandler() self.error_logger = logging.getLogger("gunicorn.error") self.error_logger.addHandler(handler) self.access_logger = logging.getLogger("gunicorn.access") self.access_logger.addHandler(handler) - self.error_logger.setLevel(loglevel) - self.access_logger.setLevel(loglevel) - - -def check_redis_instance() -> bool: - """Ensure Redis is running before starting server.""" - - cache = SyncCache(db=params.cache.database, **REDIS_CONFIG) - cache.test() - log.debug("Redis is running at: {}:{}", REDIS_CONFIG["host"], REDIS_CONFIG["port"]) - return True + self.error_logger.setLevel(Settings.log_level) + self.access_logger.setLevel(Settings.log_level) async def build_ui() -> bool: """Perform a UI build prior to starting the application.""" + state = use_state() await build_frontend( - dev_mode=params.developer_mode, + dev_mode=Settings.dev_mode, dev_url=URL_DEV, prod_url=URL_PROD, - params=ui_params, - app_path=CONFIG_PATH, + params=state.ui_params, + app_path=Settings.app_path, ) return True -async def clear_cache(): - """Clear the Redis cache on shutdown.""" - try: - await clear_redis_cache(db=params.cache.database, config=REDIS_CONFIG) - except RuntimeError as e: - log.error(str(e)) - pass - - -def cache_config() -> bool: - """Add configuration to Redis cache as a pickled object.""" - # Standard Library - import pickle - - cache = SyncCache(db=params.cache.database, **REDIS_CONFIG) - cache.set("HYPERGLASS_CONFIG", pickle.dumps(params)) - - return True - - def register_all_plugins(devices: "Devices") -> None: """Validate and register configured plugins.""" @@ -149,23 +106,21 @@ def unregister_all_plugins() -> None: def on_starting(server: "Arbiter"): """Gunicorn pre-start tasks.""" - setup_lib_logging() - python_version = platform.python_version() required = ".".join((str(v) for v in MIN_PYTHON_VERSION)) - log.info("Python {} detected ({} required)", python_version, required) + log.debug("Python {} detected ({} required)", python_version, required) + + state = use_state() + + register_all_plugins(state.devices) - check_redis_instance() aiorun(build_ui()) - cache_config() - register_all_plugins(devices) log.success( - "Started hyperglass {v} on http://{h}:{p} with {w} workers", - v=__version__, - h=format_listen_address(params.listen_address), - p=str(params.listen_port), - w=server.app.cfg.settings["workers"].value, + "Started hyperglass {} on http://{} with {!s} workers", + __version__, + Settings.bind(), + server.app.cfg.settings["workers"].value, ) @@ -174,11 +129,10 @@ def on_exit(server: "Arbiter"): log.critical("Stopping hyperglass {}", __version__) - async def runner(): - if not params.developer_mode: - await clear_cache() + state = use_state() + if not Settings.dev_mode: + state.clear() - aiorun(runner()) unregister_all_plugins() @@ -210,24 +164,29 @@ class HyperglassWSGI(BaseApplication): def start(**kwargs): """Start hyperglass via gunicorn.""" + set_log_level(log, Settings.debug) + + log.debug("System settings: {!r}", Settings) + setup_lib_logging() + + workers, log_level = 1, "DEBUG" + if Settings.debug is False: + workers, log_level = cpu_count(2), "WARNING" + HyperglassWSGI( app="hyperglass.api:app", options={ - "worker_class": "uvicorn.workers.UvicornWorker", "preload": True, - "keepalive": 10, - "command": shutil.which("gunicorn"), - "bind": ":".join( - (format_listen_address(params.listen_address), str(params.listen_port)) - ), - "workers": workers, - "loglevel": loglevel, - "timeout": math.ceil(params.request_timeout * 1.25), - "on_starting": on_starting, - "on_exit": on_exit, - "logger_class": StubbedGunicornLogger, - "accesslog": "-", "errorlog": "-", + "accesslog": "-", + "workers": workers, + "on_exit": on_exit, + "loglevel": log_level, + "bind": Settings.bind(), + "on_starting": on_starting, + "command": shutil.which("gunicorn"), + "logger_class": StubbedGunicornLogger, + "worker_class": "uvicorn.workers.UvicornWorker", "logconfig_dict": {"formatters": {"generic": {"format": "%(message)s"}}}, **kwargs, }, @@ -235,4 +194,12 @@ def start(**kwargs): if __name__ == "__main__": - start() + try: + start() + except Exception as error: + if not Settings.dev_mode: + state = use_state() + state.clear() + log.info("Cleared Redis cache") + unregister_all_plugins() + raise error diff --git a/hyperglass/models/commands/generic.py b/hyperglass/models/commands/generic.py index 9c4789a..c1deee6 100644 --- a/hyperglass/models/commands/generic.py +++ b/hyperglass/models/commands/generic.py @@ -3,7 +3,7 @@ # Standard Library import os import re -from typing import Dict, List, Union, Literal, Optional +import typing as t from pathlib import Path from ipaddress import IPv4Network, IPv6Network, ip_network @@ -25,15 +25,18 @@ from hyperglass.exceptions.private import InputValidationError # Local from ..main import HyperglassModel, HyperglassModelWithId from ..fields import Action -from ..config.params import Params + +if t.TYPE_CHECKING: + # Local + from ..config.params import Params IPv4PrefixLength = conint(ge=0, le=32) IPv6PrefixLength = conint(ge=0, le=128) -IPNetwork = Union[IPv4Network, IPv6Network] -StringOrArray = Union[StrictStr, List[StrictStr]] -Condition = Union[IPv4Network, IPv6Network, StrictStr] -RuleValidation = Union[Literal["ipv4", "ipv6", "pattern"], None] -PassedValidation = Union[bool, None] +IPNetwork = t.Union[IPv4Network, IPv6Network] +StringOrArray = t.Union[StrictStr, t.List[StrictStr]] +Condition = t.Union[IPv4Network, IPv6Network, StrictStr] +RuleValidation = t.Union[t.Literal["ipv4", "ipv6", "pattern"], None] +PassedValidation = t.Union[bool, None] class Input(HyperglassModel): @@ -57,14 +60,14 @@ class Text(Input): """Text/input field model.""" _type: PrivateAttr = PrivateAttr("text") - validation: Optional[StrictStr] + validation: t.Optional[StrictStr] class Option(HyperglassModel): """Select option model.""" - name: Optional[StrictStr] - description: Optional[StrictStr] + name: t.Optional[StrictStr] + description: t.Optional[StrictStr] value: StrictStr @@ -72,7 +75,7 @@ class Select(Input): """Select field model.""" _type: PrivateAttr = PrivateAttr("select") - options: List[Option] + options: t.List[Option] class Rule(HyperglassModel, allow_population_by_field_name=True): @@ -82,10 +85,10 @@ class Rule(HyperglassModel, allow_population_by_field_name=True): _passed: PassedValidation = PrivateAttr(None) condition: Condition action: Action = Action("permit") - commands: List[str] = Field([], alias="command") + commands: t.List[str] = Field([], alias="command") @validator("commands", pre=True, allow_reuse=True) - def validate_commands(cls, value: Union[str, List[str]]) -> List[str]: + def validate_commands(cls, value: t.Union[str, t.List[str]]) -> t.List[str]: """Ensure commands is a list.""" if isinstance(value, str): return [value] @@ -215,13 +218,13 @@ class RuleWithoutValidation(Rule): _validation: RuleValidation = PrivateAttr(None) condition: None - def validate_target(self, target: str) -> Literal[True]: + def validate_target(self, target: str) -> t.Literal[True]: """Don't validate a target. Always returns `True`.""" self._passed = True return True -Rules = Union[RuleWithIPv4, RuleWithIPv6, RuleWithPattern, RuleWithoutValidation] +Rules = t.Union[RuleWithIPv4, RuleWithIPv6, RuleWithPattern, RuleWithoutValidation] class Directive(HyperglassModelWithId): @@ -229,11 +232,12 @@ class Directive(HyperglassModelWithId): id: StrictStr name: StrictStr - rules: List[Rules] - field: Union[Text, Select, None] - info: Optional[FilePath] - plugins: List[StrictStr] = [] - groups: List[ + rules: t.List[Rules] + field: t.Union[Text, Select, None] + info: t.Optional[FilePath] + plugins: t.List[StrictStr] = [] + disable_builtins: StrictBool = False + groups: t.List[ StrictStr ] = [] # TODO: Flesh this out. Replace VRFs, but use same logic in React to filter available commands for multi-device queries. @@ -247,7 +251,7 @@ class Directive(HyperglassModelWithId): raise InputValidationError(error="No matched validation rules", target=target) @property - def field_type(self) -> Literal["text", "select", None]: + def field_type(self) -> t.Literal["text", "select", None]: """Get the linked field type.""" if self.field.is_select: @@ -257,7 +261,7 @@ class Directive(HyperglassModelWithId): return None @validator("plugins") - def validate_plugins(cls: "Directive", plugins: List[str]) -> List[str]: + def validate_plugins(cls: "Directive", plugins: t.List[str]) -> t.List[str]: """Validate and register configured plugins.""" plugin_dir = Path(os.environ["hyperglass_directory"]) / "plugins" if plugin_dir.exists(): @@ -271,7 +275,7 @@ class Directive(HyperglassModelWithId): return [str(f) for f in matching_plugins] return [] - def frontend(self, params: Params) -> Dict: + def frontend(self: "Directive", params: "Params") -> t.Dict[str, t.Any]: """Prepare a representation of the directive for the UI.""" value = { diff --git a/hyperglass/models/system.py b/hyperglass/models/system.py new file mode 100644 index 0000000..a2fedee --- /dev/null +++ b/hyperglass/models/system.py @@ -0,0 +1,116 @@ +"""hyperglass System Settings model.""" + +# Standard Library +import typing as t +from ipaddress import ip_address + +# Third Party +from pydantic import ( + RedisDsn, + SecretStr, + BaseSettings, + DirectoryPath, + IPvAnyAddress, + validator, +) + +# Project +from hyperglass.util import at_least, cpu_count + +ListenHost = t.Union[None, IPvAnyAddress, t.Literal["localhost"]] + + +class HyperglassSystem(BaseSettings): + """hyperglass system settings, required to start hyperglass.""" + + class Config: + """hyperglass system settings configuration.""" + + env_prefix = "hyperglass_" + + debug: bool = False + dev_mode: bool = False + app_path: DirectoryPath + redis_host: str = "localhost" + redis_password: t.Optional[SecretStr] + redis_db: int = 1 + redis_dsn: RedisDsn = None + host: IPvAnyAddress = None + port: int = 8001 + + @validator("host", pre=True, always=True) + def validate_host( + cls: "HyperglassSystem", value: t.Any, values: t.Dict[str, t.Any] + ) -> IPvAnyAddress: + """Set default host based on debug mode.""" + + if value is None: + if values["debug"] is False: + return ip_address("127.0.0.1") + elif values["debug"] is True: + return ip_address("0.0.0.0") + + if isinstance(value, str): + if value != "localhost": + try: + return ip_address(value) + except ValueError: + raise ValueError(str(value)) + + elif value == "localhost": + return ip_address("127.0.0.1") + + raise ValueError(str(value)) + + @validator("redis_dsn", always=True) + def validate_redis_dsn( + cls: "HyperglassSystem", value: t.Any, values: t.Dict[str, t.Any] + ) -> RedisDsn: + """Construct a Redis DSN if none is provided.""" + if value is None: + dsn = "redis://{}/{!s}".format(values["redis_host"], values["redis_db"]) + password = values.get("redis_password") + if password is not None: + dsn = "redis://:{}@{}/{!s}".format( + password.get_secret_value(), values["redis_host"], values["redis_db"], + ) + return dsn + return value + + def bind(self: "HyperglassSystem") -> str: + """Format a listen_address. Wraps IPv6 address in brackets.""" + if self.host.version == 6: + return f"[{self.host!s}]:{self.port!s}" + return f"{self.host!s}:{self.port!s}" + + @property + def log_level(self: "HyperglassSystem") -> str: + """Get log level as string, inferred from debug mode.""" + if self.debug: + return "DEBUG" + return "WARNING" + + @property + def workers(self: "HyperglassSystem") -> int: + """Get worker count, inferred from debug mode.""" + if self.debug: + return 1 + return cpu_count(2) + + @property + def redis(self: "HyperglassSystem") -> t.Dict[str, t.Union[None, int, str]]: + """Get redis parameters as a dict for convenient connection setups.""" + password = None + if self.redis_password is not None: + password = self.redis_password.get_secret_value() + + return { + "db": self.redis_db, + "host": self.redis_host, + "password": password, + } + + @property + def redis_connection_pool(self: "HyperglassSystem") -> t.Dict[str, t.Any]: + """Get Redis ConnectionPool keyword arguments.""" + return {"url": str(self.redis_dsn), "max_connections": at_least(8, cpu_count(2))} diff --git a/hyperglass/plugins/_manager.py b/hyperglass/plugins/_manager.py index 9c7e6df..33743dc 100644 --- a/hyperglass/plugins/_manager.py +++ b/hyperglass/plugins/_manager.py @@ -1,16 +1,12 @@ """Plugin manager definition.""" # Standard Library -import json -import codecs -import pickle -from typing import TYPE_CHECKING, Any, List, Generic, TypeVar, Callable, Generator +import typing as t from inspect import isclass # Project from hyperglass.log import log -from hyperglass.cache import SyncCache -from hyperglass.configuration import REDIS_CONFIG, params +from hyperglass.state.redis import use_state from hyperglass.exceptions.private import PluginError # Local @@ -18,26 +14,27 @@ from ._base import PluginType, HyperglassPlugin from ._input import InputPlugin, InputPluginReturn from ._output import OutputType, OutputPlugin -if TYPE_CHECKING: +if t.TYPE_CHECKING: # Project + from hyperglass.state.redis import HyperglassState from hyperglass.models.api.query import Query from hyperglass.models.config.devices import Device from hyperglass.models.commands.generic import Directive -PluginT = TypeVar("PluginT") +PluginT = t.TypeVar("PluginT", bound=HyperglassPlugin) -class PluginManager(Generic[PluginT]): +class PluginManager(t.Generic[PluginT]): """Manage all plugins.""" _type: PluginType - _cache: SyncCache + _state: "HyperglassState" _index: int = 0 _cache_key: str def __init__(self: "PluginManager") -> None: """Initialize plugin manager.""" - self._cache = SyncCache(db=params.cache.database, **REDIS_CONFIG) + self._state = use_state() self._cache_key = f"hyperglass.plugins.{self._type}" def __init_subclass__(cls: "PluginManager", **kwargs: PluginType) -> None: @@ -61,20 +58,19 @@ class PluginManager(Generic[PluginT]): self._index = 0 raise StopIteration - def _get_plugins(self: "PluginManager") -> List[PluginT]: - """Retrieve plugins from cache.""" - cached = self._cache.get(self._cache_key) - return list({pickle.loads(codecs.decode(plugin.encode(), "base64")) for plugin in cached}) - - def _clear_plugins(self: "PluginManager") -> None: - """Remove all plugins.""" - self._cache.set(self._cache_key, json.dumps([])) - @property - def plugins(self: "PluginManager") -> List[PluginT]: + def plugins(self: "PluginManager", builtins: bool = True) -> t.List[PluginT]: """Get all plugins, with built-in plugins last.""" + plugins = self._state.plugins(self._type) + if builtins is False: + plugins = [p for p in plugins if p.__hyperglass_builtin__ is False] + + # Sort plugins by their name attribute, which is the name of the class by default. + sorted_by_name = sorted(plugins, key=lambda p: str(p)) + + # Sort with built-in plugins last. return sorted( - self._get_plugins(), + sorted_by_name, key=lambda p: -1 if p.__hyperglass_builtin__ else 1, # flake8: noqa IF100 reverse=True, ) @@ -84,7 +80,7 @@ class PluginManager(Generic[PluginT]): """Get this plugin manager's name.""" return self.__class__.__name__ - def methods(self: "PluginManager", name: str) -> Generator[Callable, None, None]: + def methods(self: "PluginManager", name: str) -> t.Generator[t.Callable, None, None]: """Get methods of all registered plugins matching `name`.""" for plugin in self.plugins: if hasattr(plugin, name): @@ -99,39 +95,24 @@ class PluginManager(Generic[PluginT]): def reset(self: "PluginManager") -> None: """Remove all plugins.""" self._index = 0 - self._cache = SyncCache(db=params.cache.database, **REDIS_CONFIG) - return self._clear_plugins() + self._state.reset_plugins(self._type) def unregister(self: "PluginManager", plugin: PluginT) -> None: """Remove a plugin from currently active plugins.""" if isclass(plugin): if issubclass(plugin, HyperglassPlugin): - plugins = { - # Create a base64 representation of a picked plugin. - codecs.encode(pickle.dumps(p), "base64").decode() - # Merge current plugins with the new plugin. - for p in self._get_plugins() - if p != plugin - } - # Add plugins from cache. - self._cache.set(f"hyperglass.plugins.{self._type}", json.dumps(list(plugins))) + self._state.remove_plugin(self._type, plugin) + return raise PluginError("Plugin '{}' is not a valid hyperglass plugin", repr(plugin)) - def register(self: "PluginManager", plugin: PluginT, *args: Any, **kwargs: Any) -> None: + def register(self: "PluginManager", plugin: PluginT, *args: t.Any, **kwargs: t.Any) -> None: """Add a plugin to currently active plugins.""" # Create a set of plugins so duplicate plugins are not mistakenly added. try: if issubclass(plugin, HyperglassPlugin): instance = plugin(*args, **kwargs) - plugins = { - # Create a base64 representation of a picked plugin. - codecs.encode(pickle.dumps(p), "base64").decode() - # Merge current plugins with the new plugin. - for p in [*self._get_plugins(), instance] - } - # Add plugins from cache. - self._cache.set(f"hyperglass.plugins.{self._type}", json.dumps(list(plugins))) + self._state.add_plugin(self._type, instance) if instance.__hyperglass_builtin__ is True: log.debug("Registered built-in plugin '{}'", instance.name) else: diff --git a/hyperglass/plugins/main.py b/hyperglass/plugins/main.py index c71c7fc..66905f4 100644 --- a/hyperglass/plugins/main.py +++ b/hyperglass/plugins/main.py @@ -34,6 +34,7 @@ def _register_from_module(module: Any, **kwargs: Any) -> Tuple[str, ...]: """Register defined classes from the module.""" failures = () defs = getmembers(module, lambda o: _is_class(module, o)) + sys.modules[module.__name__] = module for name, plugin in defs: if issubclass(plugin, OutputPlugin): manager = OutputPluginManager() @@ -55,7 +56,6 @@ def _module_from_file(file: Path) -> Any: for k, v in _PLUGIN_GLOBALS.items(): setattr(module, k, v) spec.loader.exec_module(module) - sys.modules[module.__name__] = module return module diff --git a/hyperglass/settings.py b/hyperglass/settings.py new file mode 100644 index 0000000..8653591 --- /dev/null +++ b/hyperglass/settings.py @@ -0,0 +1,17 @@ +# Standard Library +import typing as t + +if t.TYPE_CHECKING: + # Local + from .models.system import HyperglassSystem + + +def _system_settings() -> "HyperglassSystem": + """Get system settings from local environment.""" + # Local + from .models.system import HyperglassSystem + + return HyperglassSystem() + + +Settings = _system_settings() diff --git a/hyperglass/state/__init__.py b/hyperglass/state/__init__.py new file mode 100644 index 0000000..b33dcfe --- /dev/null +++ b/hyperglass/state/__init__.py @@ -0,0 +1,6 @@ +"""hyperglass global state management.""" + +# Local +from .redis import use_state + +__all__ = ("use_state",) diff --git a/hyperglass/state/redis.py b/hyperglass/state/redis.py new file mode 100644 index 0000000..d926a81 --- /dev/null +++ b/hyperglass/state/redis.py @@ -0,0 +1,133 @@ +"""hyperglass global state.""" + +# Standard Library +import codecs +import pickle +import typing as t +from functools import lru_cache + +# Third Party +from redis import Redis, ConnectionPool + +# Project +from hyperglass.configuration import params, devices, ui_params +from hyperglass.exceptions.private import StateError + +# Local +from ..settings import Settings + +if t.TYPE_CHECKING: + # Project + from hyperglass.models.ui import UIParameters + from hyperglass.models.system import HyperglassSystem + from hyperglass.plugins._base import HyperglassPlugin + from hyperglass.models.config.params import Params + from hyperglass.models.config.devices import Devices + +PluginT = t.TypeVar("PluginT", bound="HyperglassPlugin") + + +class HyperglassState: + """Global State Manager. + + Maintains configuration objects in Redis cache and accesses them as needed. + """ + + settings: "HyperglassSystem" + redis: Redis + _connection_pool: ConnectionPool + _namespace: str = "hyperglass.state" + + def __init__(self, *, settings: "HyperglassSystem") -> None: + """Set up Redis connection and add configuration objects.""" + + self.settings = settings + self._connection_pool = ConnectionPool.from_url(**self.settings.redis_connection_pool) + self.redis = Redis(connection_pool=self._connection_pool) + + # Add configuration objects. + self.set_object("params", params) + self.set_object("devices", devices) + self.set_object("ui_params", ui_params) + + # Ensure plugins are empty. + self.reset_plugins("output") + self.reset_plugins("input") + + def key(self, *keys: str) -> str: + """Format keys with state namespace.""" + return ".".join((*self._namespace.split("."), *keys)) + + def get_object(self, name: str, raise_if_none: bool = False) -> t.Any: + """Get an object (class instance) from the cache.""" + value = self.redis.get(name) + + if isinstance(value, bytes): + return pickle.loads(value) + elif isinstance(value, str): + return pickle.loads(value.encode()) + if raise_if_none is True: + raise StateError("'{key}' does not exist in Redis store", key=name) + return None + + def set_object(self, name: str, obj: t.Any) -> None: + """Add an object (class instance) to the cache.""" + value = pickle.dumps(obj) + self.redis.set(self.key(name), value) + + def add_plugin(self, _type: str, plugin: "HyperglassPlugin") -> None: + """Add a plugin to its list by type.""" + current = self.plugins(_type) + plugins = { + # Create a base64 representation of a picked plugin. + codecs.encode(pickle.dumps(p), "base64").decode() + # Merge current plugins with the new plugin. + for p in [*current, plugin] + } + self.set_object(self.key("plugins", _type), list(plugins)) + + def remove_plugin(self, _type: str, plugin: "HyperglassPlugin") -> None: + """Remove a plugin from its list by type.""" + current = self.plugins(_type) + plugins = { + # Create a base64 representation of a picked plugin. + codecs.encode(pickle.dumps(p), "base64").decode() + # Merge current plugins with the new plugin. + for p in current + if p != plugin + } + self.set_object(self.key("plugins", _type), list(plugins)) + + def reset_plugins(self, _type: str) -> None: + """Remove all plugins of `_type`.""" + self.set_object(self.key("plugins", _type), []) + + def clear(self) -> None: + """Delete all cache keys.""" + self.redis.flushdb(asynchronous=True) + + @property + def params(self) -> "Params": + """Get hyperglass configuration parameters (`hyperglass.yaml`).""" + return self.get_object(self.key("params"), raise_if_none=True) + + @property + def devices(self) -> "Devices": + """Get hyperglass devices (`devices.yaml`).""" + return self.get_object(self.key("devices"), raise_if_none=True) + + @property + def ui_params(self) -> "UIParameters": + """UI parameters, built from params.""" + return self.get_object(self.key("ui_params"), raise_if_none=True) + + def plugins(self, _type: str) -> t.List[PluginT]: + """Get plugins by type.""" + current = self.get_object(self.key("plugins", _type), raise_if_none=False) or [] + return list({pickle.loads(codecs.decode(plugin.encode(), "base64")) for plugin in current}) + + +@lru_cache(maxsize=None) +def use_state() -> "HyperglassState": + """Access hyperglass global state.""" + return HyperglassState(settings=Settings) diff --git a/hyperglass/util/__init__.py b/hyperglass/util/__init__.py index 1df0d5f..774b491 100644 --- a/hyperglass/util/__init__.py +++ b/hyperglass/util/__init__.py @@ -146,23 +146,6 @@ to access the following directories: return matched_path -def format_listen_address(listen_address: Union[IPv4Address, IPv6Address, str]) -> str: - """Format a listen_address. Wraps IPv6 address in brackets.""" - fmt = str(listen_address) - - if isinstance(listen_address, str): - try: - listen_address = ip_address(listen_address) - except ValueError as err: - log.error(err) - pass - - if isinstance(listen_address, (IPv4Address, IPv6Address)) and listen_address.version == 6: - fmt = f"[{str(listen_address)}]" - - return fmt - - def split_on_uppercase(s): """Split characters by uppercase letters. @@ -363,3 +346,10 @@ def deep_convert_keys(_dict: Type[DeepConvert], predicate: Callable[[str], str]) converted[predicate(key)] = get_value(value) return converted + + +def at_least(minimum: int, value: int,) -> int: + """Get a number value that is at least a specified minimum.""" + if value < minimum: + return minimum + return value diff --git a/hyperglass/util/frontend.py b/hyperglass/util/frontend.py index 9ed8b9e..b9f8d6f 100644 --- a/hyperglass/util/frontend.py +++ b/hyperglass/util/frontend.py @@ -5,20 +5,23 @@ import os import json import math import shutil +import typing as t import asyncio import subprocess -from typing import Dict, Tuple, Optional from pathlib import Path # Project from hyperglass.log import log -from hyperglass.models.ui import UIParameters # Local from .files import copyfiles, check_path +if t.TYPE_CHECKING: + # Project + from hyperglass.models.ui import UIParameters -def get_node_version() -> Tuple[int, int, int]: + +def get_node_version() -> t.Tuple[int, int, int]: """Get the system's NodeJS version.""" node_path = shutil.which("node") @@ -30,7 +33,7 @@ def get_node_version() -> Tuple[int, int, int]: return tuple((int(v) for v in version.split("."))) -def get_ui_build_timeout() -> Optional[int]: +def get_ui_build_timeout() -> t.Optional[int]: """Read the UI build timeout from environment variables or set a default.""" timeout = None @@ -60,7 +63,7 @@ async def check_node_modules() -> bool: return valid -async def read_package_json() -> Dict: +async def read_package_json() -> t.Dict[str, t.Any]: """Import package.json as a python dict.""" package_json_file = Path(__file__).parent.parent / "ui" / "package.json" @@ -114,7 +117,7 @@ async def node_initial(timeout: int = 180, dev_mode: bool = False) -> str: return "\n".join(all_messages) -async def build_ui(app_path): +async def build_ui(app_path: Path): """Execute `next build` & `next export` from UI directory. Raises: @@ -216,7 +219,7 @@ def generate_opengraph( return True -def migrate_images(app_path: Path, params: UIParameters): +def migrate_images(app_path: Path, params: "UIParameters"): """Migrate images from source code to install directory.""" images_dir = app_path / "static" / "images" favicon_dir = images_dir / "favicons" @@ -236,7 +239,7 @@ async def build_frontend( # noqa: C901 dev_mode: bool, dev_url: str, prod_url: str, - params: UIParameters, + params: "UIParameters", app_path: Path, force: bool = False, timeout: int = 180, @@ -264,8 +267,6 @@ async def build_frontend( # noqa: C901 # Project from hyperglass.constants import __version__ - log.info("Starting UI build") - # Create temporary file. json file extension is added for easy # webpack JSON parsing. env_file = Path("/tmp/hyperglass.env.json") # noqa: S108 @@ -344,6 +345,7 @@ async def build_frontend( # noqa: C901 # Initiate Next.JS export process. if any((not dev_mode, force, full)): + log.info("Starting UI build") initialize_result = await node_initial(timeout, dev_mode) build_result = await build_ui(app_path=app_path) diff --git a/poetry.lock b/poetry.lock index 66d37e7..e855802 100644 --- a/poetry.lock +++ b/poetry.lock @@ -6,6 +6,21 @@ category = "main" optional = false python-versions = "*" +[[package]] +name = "aioredis" +version = "2.0.0" +description = "asyncio (PEP 3156) Redis support" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.dependencies] +async-timeout = "*" +typing-extensions = "*" + +[package.extras] +hiredis = ["hiredis (>=1.0)"] + [[package]] name = "ansicon" version = "1.89.0" @@ -41,6 +56,14 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [package.dependencies] pyyaml = "*" +[[package]] +name = "async-timeout" +version = "3.0.1" +description = "Timeout context manager for asyncio programs" +category = "main" +optional = false +python-versions = ">=3.5.3" + [[package]] name = "asyncssh" version = "2.7.0" @@ -880,6 +903,7 @@ optional = false python-versions = ">=3.6.1" [package.dependencies] +python-dotenv = {version = ">=0.10.4", optional = true, markers = "extra == \"dotenv\""} typing-extensions = ">=3.7.4.3" [package.extras] @@ -1398,13 +1422,17 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [metadata] lock-version = "1.1" python-versions = ">=3.8.1,<4.0" -content-hash = "b99fec86745b99f5b0c132dadf90e07f8529aa751c8fbd582c36d6b82cd79dd3" +content-hash = "34e21443d0af22b763bd715875da90ca519cde388af0e54b4d9a71180b14ca13" [metadata.files] aiofiles = [ {file = "aiofiles-0.6.0-py3-none-any.whl", hash = "sha256:bd3019af67f83b739f8e4053c6c0512a7f545b9a8d91aaeab55e6e0f9d123c27"}, {file = "aiofiles-0.6.0.tar.gz", hash = "sha256:e0281b157d3d5d59d803e3f4557dcc9a3dff28a4dd4829a9ff478adae50ca092"}, ] +aioredis = [ + {file = "aioredis-2.0.0-py3-none-any.whl", hash = "sha256:9921d68a3df5c5cdb0d5b49ad4fc88a4cfdd60c108325df4f0066e8410c55ffb"}, + {file = "aioredis-2.0.0.tar.gz", hash = "sha256:3a2de4b614e6a5f8e104238924294dc4e811aefbe17ddf52c04a93cbf06e67db"}, +] ansicon = [ {file = "ansicon-1.89.0-py2.py3-none-any.whl", hash = "sha256:f1def52d17f65c2c9682cf8370c03f541f410c1752d6a14029f97318e4b9dfec"}, {file = "ansicon-1.89.0.tar.gz", hash = "sha256:e4d039def5768a47e4afec8e89e83ec3ae5a26bf00ad851f914d1240b444d2b1"}, @@ -1420,6 +1448,10 @@ aredis = [ {file = "aspy.yaml-1.3.0-py2.py3-none-any.whl", hash = "sha256:463372c043f70160a9ec950c3f1e4c3a82db5fca01d334b6bc89c7164d744bdc"}, {file = "aspy.yaml-1.3.0.tar.gz", hash = "sha256:e7c742382eff2caed61f87a39d13f99109088e5e93f04d76eb8d4b28aa143f45"}, ] +async-timeout = [ + {file = "async-timeout-3.0.1.tar.gz", hash = "sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f"}, + {file = "async_timeout-3.0.1-py3-none-any.whl", hash = "sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3"}, +] asyncssh = [ {file = "asyncssh-2.7.0-py3-none-any.whl", hash = "sha256:ccc62a1b311c71d4bf8e4bc3ac141eb00ebb28b324e375aed1d0a03232893ca1"}, {file = "asyncssh-2.7.0.tar.gz", hash = "sha256:185013d8e67747c3c0f01b72416b8bd78417da1df48c71f76da53c607ef541b6"}, diff --git a/pyproject.toml b/pyproject.toml index a9f1b44..080d9ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,7 @@ netmiko = "^3.4.0" paramiko = "^2.7.2" psutil = "^5.7.2" py-cpuinfo = "^7.0.0" -pydantic = "1.8.2" +pydantic = {extras = ["dotenv"], version = "^1.8.2"} python = ">=3.8.1,<4.0" redis = "^3.5.3" scrapli = {version = "2021.07.30", extras = ["asyncssh"]} @@ -55,6 +55,7 @@ typing-extensions = "^3.7.4" uvicorn = {extras = ["standard"], version = "^0.13.4"} uvloop = "^0.14.0" xmltodict = "^0.12.0" +aioredis = "^2.0.0" [tool.poetry.dev-dependencies] bandit = "^1.6.2" @@ -97,7 +98,7 @@ reportMissingTypeStubs = true check = {cmd = "task lint && task ui-lint", help = "Run all lint checks"} lint = {cmd = "flake8 hyperglass", help = "Run Flake8"} sort = {cmd = "isort hyperglass", help = "Run iSort"} -start = {cmd = "python3 -m hyperglass.console start", help = "Start hyperglass"} +start = {cmd = "python3 -m hyperglass.main", help = "Start hyperglass"} start-asgi = {cmd = "uvicorn hyperglass.api:app", help = "Start hyperglass via Uvicorn"} test = {cmd = "pytest hyperglass", help = "Run hyperglass tests"} ui-build = {cmd = "python3 -m hyperglass.console build-ui", help = "Run a UI Build"}