1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00

add customized redis caching handler

This commit is contained in:
checktheroads
2020-04-13 01:05:24 -07:00
parent 97484e16c2
commit eebe9b2f11
4 changed files with 140 additions and 13 deletions

View File

@@ -86,6 +86,17 @@ class Query(BaseModel):
"""Create SHA256 hash digest of model representation."""
return hashlib.sha256(repr(self).encode()).hexdigest()
@property
def summary(self):
"""Create abbreviated representation of instance."""
items = (
f"query_location={self.query_location}",
f"query_type={self.query_type}",
f"query_vrf={self.query_vrf.name}",
f"query_target={str(self.query_target)}",
)
return f'Query({", ".join(items)})'
@validator("query_type")
def validate_query_type(cls, value):
"""Ensure query_type is enabled.

View File

@@ -5,13 +5,13 @@ import os
import time
# Third Party
import aredis
from fastapi import HTTPException
from starlette.requests import Request
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html
# Project
from hyperglass.util import log, clean_name, import_public_key
from hyperglass.cache import Cache
from hyperglass.encode import jwt_decode
from hyperglass.exceptions import HyperglassError
from hyperglass.configuration import REDIS_CONFIG, params, devices
@@ -19,14 +19,16 @@ from hyperglass.api.models.query import Query
from hyperglass.execution.execute import Execute
from hyperglass.api.models.cert_import import EncodedRequest
Cache = aredis.StrictRedis(db=params.cache.database, **REDIS_CONFIG)
APP_PATH = os.environ["hyperglass_directory"]
async def query(query_data: Query, request: Request):
"""Ingest request data pass it to the backend application to perform the query."""
# Initialize cache
cache = Cache(db=params.cache.database, **REDIS_CONFIG)
log.debug("Initialized cache {}", repr(cache))
# Use hashed query_data string as key for for k/v cache store so
# each command output value is unique.
cache_key = query_data.digest()
@@ -35,10 +37,13 @@ async def query(query_data: Query, request: Request):
cache_timeout = params.cache.timeout
log.debug(f"Cache Timeout: {cache_timeout}")
log.info(f"Starting query execution for query {query_data.summary}")
# Check if cached entry exists
if not await Cache.get(cache_key):
log.debug(f"Created new cache key {cache_key} entry for query {query_data}")
log.debug("Beginning query execution...")
if not await cache.get(cache_key):
log.debug(f"No existing cache entry for query {cache_key}")
log.debug(
f"Created new cache key {cache_key} entry for query {query_data.summary}"
)
# Pass request to execution module
starttime = time.time()
@@ -51,16 +56,16 @@ async def query(query_data: Query, request: Request):
raise HyperglassError(message=params.messages.general, alert="danger")
# Create a cache entry
await Cache.set(cache_key, str(cache_value))
await Cache.expire(cache_key, cache_timeout)
await cache.set(cache_key, str(cache_value))
await cache.expire(cache_key, seconds=cache_timeout)
log.debug(f"Added cache entry for query: {cache_key}")
# If it does, return the cached entry
cache_response = await Cache.get(cache_key)
cache_response = await cache.get(cache_key)
log.debug(f"Cache match for: {cache_key}, returning cached entry")
log.debug(f"Cache Output: {cache_response}")
log.debug(f"Cache match for {cache_key}:\n {cache_response}")
log.success(f"Completed query execution for {query_data.summary}")
return {"output": cache_response, "level": "success", "keywords": []}

112
hyperglass/cache.py Normal file
View File

@@ -0,0 +1,112 @@
"""Redis cache handler."""
# Standard Library
import time
import asyncio
# Third Party
from aredis import StrictRedis
class Cache:
"""Redis cache handler."""
def __init__(
self, db, host="localhost", port=6379, decode_responses=True, **kwargs
):
"""Initialize Redis connection."""
self.db: int = db
self.host: str = host
self.port: int = port
self.instance: StrictRedis = StrictRedis(
db=self.db,
host=self.host,
port=self.port,
decode_responses=decode_responses,
**kwargs,
)
def __repr__(self):
"""Represent class state."""
return f"ConfigCache(db={self.db}, host={self.host}, port={self.port})"
def __getitem__(self, item):
"""Enable subscriptable syntax."""
return self.get(item)
@staticmethod
async def _parse_types(value):
"""Parse a string to standard python types."""
import re
async def _parse_string(str_value):
is_float = (re.compile(r"^(\d+\.\d+)$"), float)
is_int = (re.compile(r"^(\d+)$"), int)
is_bool = (re.compile(r"^(True|true|False|false)$"), bool)
is_none = (re.compile(r"^(None|none|null|nil|\(nil\))$"), lambda v: None)
for pattern, factory in (is_float, is_int, is_bool, is_none):
if isinstance(str_value, str) and bool(re.match(pattern, str_value)):
str_value = factory(str_value)
break
return str_value
if isinstance(value, str):
value = await _parse_string(value)
elif isinstance(value, bytes):
value = await _parse_string(value.decode("utf-8"))
elif isinstance(value, list):
value = [await _parse_string(i) for i in value]
elif isinstance(value, tuple):
value = tuple(await _parse_string(i) for i in value)
return value
async def get(self, *args):
"""Get item(s) from cache."""
if len(args) == 1:
raw = await self.instance.get(args[0])
else:
raw = await self.instance.mget(args)
return await self._parse_types(raw)
async def set(self, key, value):
"""Set cache values."""
return await self.instance.set(key, value)
async def wait(self, pubsub, timeout=30, **kwargs):
"""Wait for pub/sub messages & return posted message."""
now = time.time()
timeout = now + timeout
while now < timeout:
message = await pubsub.get_message(ignore_subscribe_messages=True, **kwargs)
if message is not None and message["type"] == "message":
data = message["data"]
return await self._parse_types(data)
await asyncio.sleep(0.01)
now = time.time()
return None
async def pubsub(self):
"""Provide an aredis.pubsub.Pubsub instance."""
return self.instance.pubsub()
async def pub(self, key, value):
"""Publish a value."""
await asyncio.sleep(1)
await self.instance.publish(key, value)
async def clear(self):
"""Clear the cache."""
await self.instance.flushdb()
async def delete(self, *keys):
"""Delete a cache key."""
await self.instance.delete(*keys)
async def expire(self, *keys, seconds):
"""Set timeout of key in seconds."""
for key in keys:
await self.instance.expire(key, seconds)

View File

@@ -18,7 +18,7 @@ class Cache(HyperglassModel):
)
port: StrictInt = Field(6379, title="Port", description="Redis server TCP port.")
database: StrictInt = Field(
0, title="Database ID", description="Redis server database ID."
1, title="Database ID", description="Redis server database ID."
)
timeout: StrictInt = Field(
120,
@@ -36,4 +36,3 @@ class Cache(HyperglassModel):
title = "Cache"
description = "Redis server & cache timeout configuration."
schema_extra = {"level": 2}