1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00
Files
checktheroads-hyperglass/hyperglass/models/api/query.py

193 lines
6.0 KiB
Python
Raw Normal View History

2020-01-16 02:52:00 -07:00
"""Input query validation model."""
2020-02-03 02:34:50 -07:00
# Standard Library
2020-04-15 02:12:01 -07:00
import json
2020-01-26 02:15:19 -07:00
import hashlib
2020-04-18 07:58:46 -07:00
import secrets
2021-06-23 19:11:59 -07:00
from typing import Optional
from datetime import datetime
2020-01-16 02:52:00 -07:00
2020-02-03 02:34:50 -07:00
# Third Party
from pydantic import BaseModel, StrictStr, constr, validator
2020-01-16 02:52:00 -07:00
2020-02-03 02:34:50 -07:00
# Project
from hyperglass.log import log
from hyperglass.util import snake_to_camel
2021-09-15 18:25:37 -07:00
from hyperglass.state import use_state
from hyperglass.exceptions.public import (
InputInvalid,
QueryTypeNotFound,
QueryGroupNotFound,
QueryLocationNotFound,
)
from hyperglass.exceptions.private import InputValidationError
2020-10-11 13:14:57 -07:00
# Local
from ..config.devices import Device
2021-06-23 19:11:59 -07:00
from ..commands.generic import Directive
2020-01-16 02:52:00 -07:00
(TEXT := use_state("params").web.text)
2021-06-23 19:11:59 -07:00
2020-01-16 02:52:00 -07:00
class Query(BaseModel):
"""Validation model for input query parameters."""
# Device `name` field
2020-01-16 02:52:00 -07:00
query_location: StrictStr
# Directive `id` field
query_type: StrictStr
# Directive `groups` member
query_group: Optional[StrictStr]
query_target: constr(strip_whitespace=True, min_length=1)
2020-01-26 02:15:19 -07:00
2020-02-03 02:34:50 -07:00
class Config:
"""Pydantic model configuration."""
2020-04-18 07:58:46 -07:00
extra = "allow"
alias_generator = snake_to_camel
2020-02-03 02:34:50 -07:00
fields = {
"query_location": {
2021-09-15 18:25:37 -07:00
"title": TEXT.query_location,
2020-02-03 02:34:50 -07:00
"description": "Router/Location Name",
"example": "router01",
},
"query_type": {
2021-09-15 18:25:37 -07:00
"title": TEXT.query_type,
2020-02-03 02:34:50 -07:00
"description": "Type of Query to Execute",
"example": "bgp_route",
},
2021-06-23 19:11:59 -07:00
"query_group": {
2021-09-15 18:25:37 -07:00
"title": TEXT.query_group,
2020-02-03 02:34:50 -07:00
"description": "Routing Table/VRF",
"example": "default",
},
"query_target": {
2021-09-15 18:25:37 -07:00
"title": TEXT.query_target,
2020-02-03 02:34:50 -07:00
"description": "IP Address, Community, or AS Path",
"example": "1.1.1.0/24",
},
}
2021-09-12 15:09:24 -07:00
schema_extra = {"x-code-samples": [{"lang": "Python", "source": "print('stuff')"}]}
2020-02-03 02:34:50 -07:00
2020-04-18 07:58:46 -07:00
def __init__(self, **kwargs):
"""Initialize the query with a UTC timestamp at initialization time."""
super().__init__(**kwargs)
self.timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
2021-09-15 18:25:37 -07:00
state = use_state()
2021-09-16 13:59:39 -07:00
self._state = state
try:
self.validate_query_target()
except InputValidationError as err:
raise InputInvalid(**err.kwargs)
2020-04-18 07:58:46 -07:00
def __repr__(self):
"""Represent only the query fields."""
return (
f'Query(query_location="{str(self.query_location)}", '
f'query_type="{str(self.query_type)}", query_group="{str(self.query_group)}", '
f'query_target="{str(self.query_target)}")'
2020-04-18 07:58:46 -07:00
)
2020-01-26 02:15:19 -07:00
def digest(self):
"""Create SHA256 hash digest of model representation."""
return hashlib.sha256(repr(self).encode()).hexdigest()
2020-01-16 02:52:00 -07:00
2020-04-18 07:58:46 -07:00
def random(self):
"""Create a random string to prevent client or proxy caching."""
return hashlib.sha256(
secrets.token_bytes(8) + repr(self).encode() + secrets.token_bytes(8)
).hexdigest()
def validate_query_target(self):
"""Validate a query target after all fields/relationships havebeen initialized."""
self.directive.validate_target(self.query_target)
log.debug("Validation passed for query {}", repr(self))
2020-04-13 01:05:24 -07:00
@property
def summary(self):
"""Create abbreviated representation of instance."""
items = (
f"query_location={self.query_location}",
f"query_type={self.query_type}",
2021-06-23 19:11:59 -07:00
f"query_group={self.query_group}",
2020-04-13 01:05:24 -07:00
f"query_target={str(self.query_target)}",
)
return f'Query({", ".join(items)})'
2020-05-29 17:47:53 -07:00
@property
def device(self) -> Device:
2020-05-29 17:50:30 -07:00
"""Get this query's device object by query_location."""
2021-09-16 13:59:39 -07:00
return self._state.devices[self.query_location]
2020-07-30 01:30:01 -07:00
@property
def directive(self) -> Directive:
"""Get this query's directive."""
for command in self.device.commands:
if command.id == self.query_type:
return command
raise QueryTypeNotFound(query_type=self.query_type)
2020-05-29 17:47:53 -07:00
2020-06-26 12:23:11 -07:00
def export_dict(self, pretty=False):
2020-04-15 02:12:01 -07:00
"""Create dictionary representation of instance."""
2020-06-26 12:23:11 -07:00
if pretty:
items = {
"query_location": self.device.name,
2020-07-30 01:30:01 -07:00
"query_type": self.query.display_name,
2021-06-23 19:11:59 -07:00
"query_group": self.query_group,
2020-06-26 12:23:11 -07:00
"query_target": str(self.query_target),
}
else:
items = {
"query_location": self.query_location,
"query_type": self.query_type,
2021-06-23 19:11:59 -07:00
"query_group": self.query_group,
2020-06-26 12:23:11 -07:00
"query_target": str(self.query_target),
}
return items
2020-04-15 02:12:01 -07:00
def export_json(self):
"""Create JSON representation of instance."""
return json.dumps(self.export_dict(), default=str)
2020-03-23 09:44:50 -07:00
@validator("query_type")
def validate_query_type(cls, value):
"""Ensure a requested query type exists."""
devices = use_state("devices")
2021-09-15 18:25:37 -07:00
directive_ids = [
directive.id for device in devices.objects for directive in device.commands
]
if value in directive_ids:
return value
raise QueryTypeNotFound(name=value)
2020-03-23 09:44:50 -07:00
2020-02-03 02:34:50 -07:00
@validator("query_location")
2020-01-16 02:52:00 -07:00
def validate_query_location(cls, value):
"""Ensure query_location is defined."""
devices = use_state("devices")
2021-09-12 18:27:33 -07:00
valid_id = value in devices.ids
valid_hostname = value in devices.hostnames
if not any((valid_id, valid_hostname)):
raise QueryLocationNotFound(location=value)
2021-06-23 19:11:59 -07:00
return value
2020-04-18 11:34:23 -07:00
@validator("query_group")
def validate_query_group(cls, value):
"""Ensure query_group is defined."""
devices = use_state("devices")
2021-09-15 18:25:37 -07:00
groups = {
group
for device in devices.objects
for directive in device.commands
for group in directive.groups
}
if value in groups:
return value
2020-01-26 02:15:19 -07:00
raise QueryGroupNotFound(group=value)