2020-04-16 11:12:37 -07:00
|
|
|
"""Data models used throughout hyperglass."""
|
|
|
|
|
2020-04-16 09:29:57 -07:00
|
|
|
# Standard Library
|
|
|
|
import re
|
2020-04-16 11:12:37 -07:00
|
|
|
from typing import TypeVar, Optional
|
2020-04-16 09:29:57 -07:00
|
|
|
|
|
|
|
# Third Party
|
2020-04-16 11:12:37 -07:00
|
|
|
from pydantic import HttpUrl, BaseModel, StrictInt, StrictStr, StrictFloat
|
2020-04-16 09:29:57 -07:00
|
|
|
|
|
|
|
# Project
|
2020-04-16 11:12:37 -07:00
|
|
|
from hyperglass.log import log
|
2020-04-16 09:29:57 -07:00
|
|
|
from hyperglass.util import clean_name
|
|
|
|
|
|
|
|
IntFloat = TypeVar("IntFloat", StrictInt, StrictFloat)
|
|
|
|
|
|
|
|
|
|
|
|
class HyperglassModel(BaseModel):
|
|
|
|
"""Base model for all hyperglass configuration models."""
|
|
|
|
|
|
|
|
class Config:
|
|
|
|
"""Default Pydantic configuration.
|
|
|
|
|
|
|
|
See https://pydantic-docs.helpmanual.io/usage/model_config
|
|
|
|
"""
|
|
|
|
|
|
|
|
validate_all = True
|
|
|
|
extra = "forbid"
|
|
|
|
validate_assignment = True
|
|
|
|
alias_generator = clean_name
|
|
|
|
json_encoders = {HttpUrl: lambda v: str(v)}
|
|
|
|
|
|
|
|
def export_json(self, *args, **kwargs):
|
|
|
|
"""Return instance as JSON.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
{str} -- Stringified JSON.
|
|
|
|
"""
|
|
|
|
return self.json(by_alias=True, exclude_unset=False, *args, **kwargs)
|
|
|
|
|
|
|
|
def export_dict(self, *args, **kwargs):
|
|
|
|
"""Return instance as dictionary.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
{dict} -- Python dictionary.
|
|
|
|
"""
|
|
|
|
return self.dict(by_alias=True, exclude_unset=False, *args, **kwargs)
|
|
|
|
|
|
|
|
def export_yaml(self, *args, **kwargs):
|
|
|
|
"""Return instance as YAML.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
{str} -- Stringified YAML.
|
|
|
|
"""
|
|
|
|
import json
|
|
|
|
import yaml
|
|
|
|
|
|
|
|
return yaml.safe_dump(json.loads(self.export_json()), *args, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
class HyperglassModelExtra(HyperglassModel):
|
|
|
|
"""Model for hyperglass configuration models with dynamic fields."""
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
class Config:
|
|
|
|
"""Default pydantic configuration."""
|
|
|
|
|
|
|
|
extra = "allow"
|
|
|
|
|
|
|
|
|
|
|
|
class AnyUri(str):
|
|
|
|
"""Custom field type for HTTP URI, e.g. /example."""
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def __get_validators__(cls):
|
|
|
|
"""Pydantic custim field method."""
|
|
|
|
yield cls.validate
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def validate(cls, value):
|
|
|
|
"""Ensure URI string contains a leading forward-slash."""
|
|
|
|
uri_regex = re.compile(r"^(\/.*)$")
|
|
|
|
if not isinstance(value, str):
|
|
|
|
raise TypeError("AnyUri type must be a string")
|
|
|
|
match = uri_regex.fullmatch(value)
|
|
|
|
if not match:
|
|
|
|
raise ValueError(
|
|
|
|
"Invalid format. A URI must begin with a forward slash, e.g. '/example'"
|
|
|
|
)
|
|
|
|
return cls(match.group())
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
"""Stringify custom field representation."""
|
|
|
|
return f"AnyUri({super().__repr__()})"
|
|
|
|
|
|
|
|
|
|
|
|
class StrictBytes(bytes):
|
|
|
|
"""Custom data type for a strict byte string.
|
|
|
|
|
|
|
|
Used for validating the encoded JWT request payload.
|
|
|
|
"""
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def __get_validators__(cls):
|
|
|
|
"""Yield Pydantic validator function.
|
|
|
|
|
|
|
|
See: https://pydantic-docs.helpmanual.io/usage/types/#custom-data-types
|
|
|
|
|
|
|
|
Yields:
|
|
|
|
{function} -- Validator
|
|
|
|
"""
|
|
|
|
yield cls.validate
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def validate(cls, value):
|
|
|
|
"""Validate type.
|
|
|
|
|
|
|
|
Arguments:
|
|
|
|
value {Any} -- Pre-validated input
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
TypeError: Raised if value is not bytes
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
{object} -- Instantiated class
|
|
|
|
"""
|
|
|
|
if not isinstance(value, bytes):
|
|
|
|
raise TypeError("bytes required")
|
|
|
|
return cls()
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
"""Return representation of object.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
{str} -- Representation
|
|
|
|
"""
|
|
|
|
return f"StrictBytes({super().__repr__()})"
|
2020-04-16 11:12:37 -07:00
|
|
|
|
|
|
|
|
|
|
|
class WebhookHeaders(HyperglassModel):
|
|
|
|
"""Webhook data model."""
|
|
|
|
|
|
|
|
content_length: Optional[StrictStr]
|
|
|
|
accept: Optional[StrictStr]
|
|
|
|
user_agent: Optional[StrictStr]
|
|
|
|
content_type: Optional[StrictStr]
|
|
|
|
referer: Optional[StrictStr]
|
|
|
|
accept_encoding: Optional[StrictStr]
|
|
|
|
accept_language: Optional[StrictStr]
|
|
|
|
|
|
|
|
class Config:
|
|
|
|
"""Pydantic model config."""
|
|
|
|
|
|
|
|
fields = {
|
|
|
|
"content_length": "content-length",
|
|
|
|
"user_agent": "user-agent",
|
|
|
|
"content_type": "content-type",
|
|
|
|
"accept_encoding": "accept-encoding",
|
|
|
|
"accept_language": "accept-language",
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
class WebhookNetwork(HyperglassModel):
|
|
|
|
"""Webhook data model."""
|
|
|
|
|
|
|
|
prefix: Optional[StrictStr]
|
|
|
|
asn: Optional[StrictStr]
|
|
|
|
|
|
|
|
|
|
|
|
class Webhook(HyperglassModel):
|
|
|
|
"""Webhook data model."""
|
|
|
|
|
|
|
|
query_location: StrictStr
|
|
|
|
query_type: StrictStr
|
|
|
|
query_vrf: StrictStr
|
|
|
|
query_target: StrictStr
|
|
|
|
headers: WebhookHeaders
|
|
|
|
source: StrictStr
|
|
|
|
network: WebhookNetwork
|
|
|
|
|
|
|
|
def slack(self):
|
|
|
|
"""Format the webhook data as a Slack message."""
|
|
|
|
|
|
|
|
def make_field(key, value, code=False):
|
|
|
|
if code:
|
|
|
|
value = f"`{value}`"
|
|
|
|
return f"*{key}*\n{value}"
|
|
|
|
|
|
|
|
try:
|
|
|
|
header_data = []
|
|
|
|
for k, v in self.headers.dict(by_alias=True).items():
|
|
|
|
field = make_field(k, v, code=True)
|
|
|
|
header_data.append(field)
|
|
|
|
|
|
|
|
query_details = (
|
|
|
|
("Query Location", self.query_location),
|
|
|
|
("Query Type", self.query_type),
|
|
|
|
("Query VRF", self.query_vrf),
|
|
|
|
("Query Target", self.query_target),
|
|
|
|
)
|
|
|
|
query_data = []
|
|
|
|
for k, v in query_details:
|
|
|
|
field = make_field(k, v)
|
|
|
|
query_data.append({"type": "mrkdwn", "text": field})
|
|
|
|
|
|
|
|
source_details = (
|
|
|
|
("Source IP", self.source),
|
|
|
|
("Source Prefix", self.network.prefix),
|
|
|
|
("Source ASN", self.network.asn),
|
|
|
|
)
|
|
|
|
|
|
|
|
source_data = []
|
|
|
|
for k, v in source_details:
|
|
|
|
field = make_field(k, v, code=True)
|
|
|
|
source_data.append({"type": "mrkdwn", "text": field})
|
|
|
|
|
|
|
|
payload = {
|
|
|
|
"text": "hyperglass received a valid query with the following data",
|
|
|
|
"blocks": [
|
|
|
|
{"type": "section", "fields": query_data},
|
|
|
|
{"type": "divider"},
|
|
|
|
{"type": "section", "fields": source_data},
|
|
|
|
{"type": "divider"},
|
|
|
|
{
|
|
|
|
"type": "section",
|
|
|
|
"text": {
|
|
|
|
"type": "mrkdwn",
|
|
|
|
"text": "*Headers*\n" + "\n".join(header_data),
|
|
|
|
},
|
|
|
|
},
|
|
|
|
],
|
|
|
|
}
|
|
|
|
log.debug("Created Slack webhook: {}", str(payload))
|
|
|
|
except Exception as err:
|
|
|
|
log.error("Error while creating webhook: {}", str(err))
|
|
|
|
payload = {}
|
|
|
|
return payload
|