1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00

restructure location of models for cleaner importing/exporting

This commit is contained in:
checktheroads
2020-10-05 12:07:34 -07:00
parent 1128d1f902
commit 69cb304b6d
31 changed files with 240 additions and 219 deletions

View File

@@ -0,0 +1,3 @@
"""All Data Models used by hyperglass."""
from .main import HyperglassModel, HyperglassModelExtra

View File

@@ -0,0 +1,11 @@
"""Query & Response Validation Models."""
from .query import Query
from .response import (
QueryError,
InfoResponse,
QueryResponse,
RoutersResponse,
CommunityResponse,
SupportedQueryResponse,
)
from .cert_import import EncodedRequest

View File

@@ -0,0 +1,16 @@
"""hyperglass-agent certificate import models."""
# Standard Library
from typing import Union
# Third Party
from pydantic import BaseModel, StrictStr
# Project
from hyperglass.models.fields import StrictBytes
class EncodedRequest(BaseModel):
"""Certificate request model."""
device: StrictStr
encoded: Union[StrictStr, StrictBytes]

View File

@@ -0,0 +1,257 @@
"""Input query validation model."""
# Standard Library
import json
import hashlib
import secrets
from datetime import datetime
# Third Party
from pydantic import BaseModel, StrictStr, constr, validator
# Project
from hyperglass.exceptions import InputInvalid
from hyperglass.configuration import params, devices
from .types import SupportedQuery
from .validators import (
validate_ip,
validate_aspath,
validate_community_input,
validate_community_select,
)
def get_vrf_object(vrf_name):
"""Match VRF object from VRF name.
Arguments:
vrf_name {str} -- VRF name
Raises:
InputInvalid: Raised if no VRF is matched.
Returns:
{object} -- Valid VRF object
"""
matched = None
for vrf_obj in devices.vrf_objects:
if vrf_name is not None:
if vrf_name == vrf_obj.name or vrf_name == vrf_obj.display_name:
matched = vrf_obj
break
elif vrf_name is None:
if vrf_obj.name == "default":
matched = vrf_obj
break
if matched is None:
raise InputInvalid(params.messages.vrf_not_found, vrf_name=vrf_name)
return matched
class Query(BaseModel):
"""Validation model for input query parameters."""
query_location: StrictStr
query_type: SupportedQuery
query_vrf: StrictStr
query_target: constr(strip_whitespace=True, min_length=1)
class Config:
"""Pydantic model configuration."""
extra = "allow"
fields = {
"query_location": {
"title": params.web.text.query_location,
"description": "Router/Location Name",
"example": "router01",
},
"query_type": {
"title": params.web.text.query_type,
"description": "Type of Query to Execute",
"example": "bgp_route",
},
"query_vrf": {
"title": params.web.text.query_vrf,
"description": "Routing Table/VRF",
"example": "default",
},
"query_target": {
"title": params.web.text.query_target,
"description": "IP Address, Community, or AS Path",
"example": "1.1.1.0/24",
},
}
schema_extra = {
"x-code-samples": [{"lang": "Python", "source": "print('stuff')"}]
}
def __init__(self, **kwargs):
"""Initialize the query with a UTC timestamp at initialization time."""
super().__init__(**kwargs)
self.timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
def __repr__(self):
"""Represent only the query fields."""
return (
f"Query(query_location={str(self.query_location)}, "
f"query_type={str(self.query_type)}, query_vrf={str(self.query_vrf)}, "
f"query_target={str(self.query_target)})"
)
def digest(self):
"""Create SHA256 hash digest of model representation."""
return hashlib.sha256(repr(self).encode()).hexdigest()
def random(self):
"""Create a random string to prevent client or proxy caching."""
return hashlib.sha256(
secrets.token_bytes(8) + repr(self).encode() + secrets.token_bytes(8)
).hexdigest()
@property
def summary(self):
"""Create abbreviated representation of instance."""
items = (
f"query_location={self.query_location}",
f"query_type={self.query_type}",
f"query_vrf={self.query_vrf.name}",
f"query_target={str(self.query_target)}",
)
return f'Query({", ".join(items)})'
@property
def device(self):
"""Get this query's device object by query_location."""
return devices[self.query_location]
@property
def query(self):
"""Get this query's configuration object."""
return params.queries[self.query_type]
def export_dict(self, pretty=False):
"""Create dictionary representation of instance."""
if pretty:
items = {
"query_location": self.device.display_name,
"query_type": self.query.display_name,
"query_vrf": self.query_vrf.display_name,
"query_target": str(self.query_target),
}
else:
items = {
"query_location": self.query_location,
"query_type": self.query_type,
"query_vrf": self.query_vrf.name,
"query_target": str(self.query_target),
}
return items
def export_json(self):
"""Create JSON representation of instance."""
return json.dumps(self.export_dict(), default=str)
@validator("query_type")
def validate_query_type(cls, value):
"""Ensure query_type is enabled.
Arguments:
value {str} -- Query Type
Raises:
InputInvalid: Raised if query_type is disabled.
Returns:
{str} -- Valid query_type
"""
query = params.queries[value]
if not query.enable:
raise InputInvalid(
params.messages.feature_not_enabled,
level="warning",
feature=query.display_name,
)
return value
@validator("query_location")
def validate_query_location(cls, value):
"""Ensure query_location is defined.
Arguments:
value {str} -- Unvalidated query_location
Raises:
InputInvalid: Raised if query_location is not defined.
Returns:
{str} -- Valid query_location
"""
if value not in devices.hostnames:
raise InputInvalid(
params.messages.invalid_field,
level="warning",
input=value,
field=params.web.text.query_location,
)
return value
@validator("query_vrf")
def validate_query_vrf(cls, value, values):
"""Ensure query_vrf is defined.
Arguments:
value {str} -- Unvalidated query_vrf
Raises:
InputInvalid: Raised if query_vrf is not defined.
Returns:
{str} -- Valid query_vrf
"""
vrf_object = get_vrf_object(value)
device = devices[values["query_location"]]
device_vrf = None
for vrf in device.vrfs:
if vrf == vrf_object:
device_vrf = vrf
break
if device_vrf is None:
raise InputInvalid(
params.messages.vrf_not_associated,
vrf_name=vrf_object.display_name,
device_name=device.display_name,
)
return device_vrf
@validator("query_target")
def validate_query_target(cls, value, values):
"""Validate query target value based on query_type."""
query_type = values["query_type"]
value = value.strip()
# Use relevant function based on query_type.
validator_map = {
"bgp_aspath": validate_aspath,
"bgp_community": validate_community_input,
"bgp_route": validate_ip,
"ping": validate_ip,
"traceroute": validate_ip,
}
validator_args_map = {
"bgp_aspath": (value,),
"bgp_community": (value,),
"bgp_route": (value, values["query_type"], values["query_vrf"]),
"ping": (value, values["query_type"], values["query_vrf"]),
"traceroute": (value, values["query_type"], values["query_vrf"]),
}
if params.queries.bgp_community.mode == "select":
validator_map["bgp_community"] = validate_community_select
validate_func = validator_map[query_type]
validate_args = validator_args_map[query_type]
return validate_func(*validate_args)

View File

@@ -0,0 +1,251 @@
"""Response model."""
# Standard Library
from typing import Dict, List, Union, Optional
# Third Party
from pydantic import BaseModel, StrictInt, StrictStr, StrictBool, constr
# Project
from hyperglass.configuration import params
ErrorName = constr(regex=r"(success|warning|error|danger)")
ResponseLevel = constr(regex=r"success")
ResponseFormat = constr(regex=r"(application\/json|text\/plain)")
class QueryError(BaseModel):
"""Query response model."""
output: StrictStr = params.messages.general
level: ErrorName = "danger"
id: Optional[StrictStr]
keywords: List[StrictStr] = []
class Config:
"""Pydantic model configuration."""
title = "Query Error"
description = (
"Response received when there is an error executing the requested query."
)
fields = {
"output": {
"title": "Output",
"description": "Error Details",
"example": "192.0.2.1/32 is not allowed.",
},
"level": {
"title": "Level",
"description": "Error Severity",
"example": "danger",
},
"keywords": {
"title": "Keywords",
"description": "Relevant keyword values contained in the `output` field, which can be used for formatting.",
"example": ["192.0.2.1/32"],
},
}
schema_extra = {
"examples": [
{
"output": "192.0.2.1/32 is not allowed.",
"level": "danger",
"keywords": ["192.0.2.1/32"],
}
]
}
class QueryResponse(BaseModel):
"""Query response model."""
output: Union[Dict, StrictStr]
level: ResponseLevel = "success"
random: StrictStr
cached: StrictBool
runtime: StrictInt
keywords: List[StrictStr] = []
timestamp: StrictStr
format: ResponseFormat = "text/plain"
class Config:
"""Pydantic model configuration."""
title = "Query Response"
description = "Looking glass response"
fields = {
"level": {"title": "Level", "description": "Severity"},
"cached": {
"title": "Cached",
"description": "`true` if the response is from a previously cached query.",
},
"random": {
"title": "Random",
"description": "Random string to prevent client or intermediate caching.",
"example": "504cbdb47eb8310ca237bf512c3e10b44b0a3d85868c4b64a20037dc1c3ef857",
},
"runtime": {
"title": "Runtime",
"description": "Time it took to run the query in seconds.",
"example": 6,
},
"timestamp": {
"title": "Timestamp",
"description": "UTC Time at which the backend application received the query.",
"example": "2020-04-18 14:45:37",
},
"format": {
"title": "Format",
"description": "Response [MIME Type](http://www.iana.org/assignments/media-types/media-types.xhtml). Supported values: `text/plain` and `application/json`.",
"example": "text/plain",
},
"keywords": {
"title": "Keywords",
"description": "Relevant keyword values contained in the `output` field, which can be used for formatting.",
"example": ["1.1.1.0/24", "best #1"],
},
"output": {
"title": "Output",
"description": "Looking Glass Response",
"example": """
BGP routing table entry for 1.1.1.0/24, version 224184946
BGP Bestpath: deterministic-med
Paths: (12 available, best #1, table default)
Advertised to update-groups:
1 40
13335, (aggregated by 13335 172.68.129.1), (received & used)
192.0.2.1 (metric 51) from 192.0.2.1 (192.0.2.1)
Origin IGP, metric 0, localpref 250, valid, internal
Community: 65000:1 65000:2
""",
},
}
schema_extra = {
"examples": [
{
"output": """
BGP routing table entry for 1.1.1.0/24, version 224184946
BGP Bestpath: deterministic-med
Paths: (12 available, best #1, table default)
Advertised to update-groups:
1 40
13335, (aggregated by 13335 172.68.129.1), (received & used)
192.0.2.1 (metric 51) from 192.0.2.1 (192.0.2.1)
Origin IGP, metric 0, localpref 250, valid, internal
Community: 65000:1 65000:2
""",
"level": "success",
"keywords": ["1.1.1.0/24", "best #1"],
}
]
}
class Vrf(BaseModel):
"""Response model for /api/devices VRFs."""
name: StrictStr
display_name: StrictStr
class Config:
"""Pydantic model configuration."""
title = "VRF"
description = "VRF attributes"
schema_extra = {
"examples": [
{"name": "default", "display_name": "Global Routing Table"},
{"name": "customer_vrf_name", "display_name": "Customer Name"},
]
}
class Network(BaseModel):
"""Response model for /api/devices networks."""
name: StrictStr
display_name: StrictStr
class Config:
"""Pydantic model configuration."""
title = "Network"
description = "Network/ASN attributes"
schema_extra = {"examples": [{"name": "primary", "display_name": "AS65000"}]}
class RoutersResponse(BaseModel):
"""Response model for /api/devices list items."""
name: StrictStr
network: Network
display_name: StrictStr
vrfs: List[Vrf]
class Config:
"""Pydantic model configuration."""
title = "Device"
description = "Per-device attributes"
schema_extra = {
"examples": [
{
"name": "router01-nyc01",
"location": "nyc01",
"display_name": "New York City, NY",
}
]
}
class CommunityResponse(BaseModel):
"""Response model for /api/communities."""
community: StrictStr
display_name: StrictStr
description: StrictStr
class SupportedQueryResponse(BaseModel):
"""Response model for /api/queries list items."""
name: StrictStr
display_name: StrictStr
enable: StrictBool
class Config:
"""Pydantic model configuration."""
title = "Query Type"
description = "If enabled is `true`, the `name` field may be used to specify the query type."
schema_extra = {
"examples": [
{"name": "bgp_route", "display_name": "BGP Route", "enable": True}
]
}
class InfoResponse(BaseModel):
"""Response model for /api/info endpoint."""
name: StrictStr
organization: StrictStr
primary_asn: StrictInt
version: StrictStr
class Config:
"""Pydantic model configuration."""
title = "System Information"
description = "General information about this looking glass."
schema_extra = {
"examples": [
{
"name": "hyperglass",
"organization": "Company Name",
"primary_asn": 65000,
"version": "hyperglass 1.0.0-beta.52",
}
]
}

View File

@@ -0,0 +1,76 @@
"""Response model."""
# Standard Library
# flake8: noqa
import math
import secrets
from typing import List, Union, Optional
from datetime import datetime
# Third Party
from pydantic import BaseModel, StrictInt, StrictStr, StrictFloat, constr, validator
"""Patterns:
GET /.well-known/looking-glass/v1/ping/2001:DB8::35?protocol=2,1
GET /.well-known/looking-glass/v1/traceroute/192.0.2.8?routerindex=5
GET /.well-known/looking-glass/v1/show/route/2001:DB8::/48?protocol=2,1
GET /.well-known/looking-glass/v1/show/bgp/192.0.2.0/24
GET /.well-known/looking-glass/v1/show/bgp/summary?protocol=2&routerindex=3
GET /.well-known/looking-glass/v1/show/bgp/neighbors/192.0.2.226
GET /.well-known/looking-glass/v1/routers
GET /.well-known/looking-glass/v1/routers/1
GET /.well-known/looking-glass/v1/cmd
"""
class _HyperglassQuery(BaseModel):
class Config:
validate_all = True
validate_assignment = True
class BaseQuery(_HyperglassQuery):
protocol: StrictStr = "1,1"
router: StrictStr
routerindex: StrictInt
random: StrictStr = secrets.token_urlsafe(16)
vrf: Optional[StrictStr]
runtime: StrictInt = 30
query_format: constr(regex=r"(text\/plain|application\/json)") = "text/plain"
@validator("runtime")
def validate_runtime(cls, value):
if isinstance(value, float) and math.modf(value)[0] == 0:
value = math.ceil(value)
return value
class Config:
fields = {"query_format": "format"}
class BaseData(_HyperglassQuery):
router: StrictStr
performed_at: datetime
runtime: Union[StrictFloat, StrictInt]
output: List[StrictStr]
data_format: StrictStr
@validator("runtime")
def validate_runtime(cls, value):
if isinstance(value, float) and math.modf(value)[0] == 0:
value = math.ceil(value)
return value
class Config:
fields = {"data_format": "format"}
extra = "allow"
class QueryError(_HyperglassQuery):
status: constr(regex=r"error")
message: StrictStr
class QueryResponse(_HyperglassQuery):
status: constr(regex=r"success|fail")
data: BaseData

View File

@@ -0,0 +1,26 @@
"""Custom validation types."""
# Project
from hyperglass.constants import SUPPORTED_QUERY_TYPES
class SupportedQuery(str):
"""Query Type Validation Model."""
@classmethod
def __get_validators__(cls):
"""Pydantic custom type method."""
yield cls.validate
@classmethod
def validate(cls, value):
"""Ensure query type is supported by hyperglass."""
if not isinstance(value, str):
raise TypeError("query_type must be a string")
if value not in SUPPORTED_QUERY_TYPES:
raise ValueError(f"'{value}' is not a supported query type")
return value
def __repr__(self):
"""Stringify custom field representation."""
return f"SupportedQuery({super().__repr__()})"

View File

@@ -0,0 +1,234 @@
"""Input validation functions for submitted queries."""
# Standard Library
import re
from ipaddress import ip_network
# Project
from hyperglass.log import log
from hyperglass.exceptions import InputInvalid, InputNotAllowed
from hyperglass.configuration import params
from hyperglass.external.bgptools import network_info_sync
def _member_of(target, network):
"""Check if IP address belongs to network.
Arguments:
target {object} -- Target IPv4/IPv6 address
network {object} -- ACL network
Returns:
{bool} -- True if target is a member of network, False if not
"""
log.debug("Checking membership of {} for {}", target, network)
membership = False
if (
network.network_address <= target.network_address
and network.broadcast_address >= target.broadcast_address # NOQA: W503
):
log.debug("{} is a member of {}", target, network)
membership = True
return membership
def _prefix_range(target, ge, le):
"""Verify if target prefix length is within ge/le threshold.
Arguments:
target {IPv4Network|IPv6Network} -- Valid IPv4/IPv6 Network
ge {int} -- Greater than
le {int} -- Less than
Returns:
{bool} -- True if target in range; False if not
"""
matched = False
if target.prefixlen <= le and target.prefixlen >= ge:
matched = True
return matched
def validate_ip(value, query_type, query_vrf): # noqa: C901
"""Ensure input IP address is both valid and not within restricted allocations.
Arguments:
value {str} -- Unvalidated IP Address
query_type {str} -- Valid query type
query_vrf {object} -- Matched query vrf
Raises:
ValueError: Raised if input IP address is not an IP address.
ValueError: Raised if IP address is valid, but is within a restricted range.
Returns:
Union[IPv4Address, IPv6Address] -- Validated IP address object
"""
query_type_params = getattr(params.queries, query_type)
try:
# Attempt to use IP object factory to create an IP address object
valid_ip = ip_network(value)
except ValueError:
raise InputInvalid(
params.messages.invalid_input,
target=value,
query_type=query_type_params.display_name,
)
# Test the valid IP address to determine if it is:
# - Unspecified (See RFC5735, RFC2373)
# - Loopback (See RFC5735, RFC2373)
# - Otherwise IETF Reserved
# ...and returns an error if so.
if valid_ip.is_reserved or valid_ip.is_unspecified or valid_ip.is_loopback:
raise InputInvalid(
params.messages.invalid_input,
target=value,
query_type=query_type_params.display_name,
)
ip_version = valid_ip.version
vrf_afi = getattr(query_vrf, f"ipv{ip_version}")
if vrf_afi is None:
raise InputInvalid(
params.messages.feature_not_enabled,
feature=f"IPv{ip_version}",
device_name=f"VRF {query_vrf.display_name}",
)
for ace in [a for a in vrf_afi.access_list if a.network.version == ip_version]:
if _member_of(valid_ip, ace.network):
if query_type == "bgp_route" and _prefix_range(valid_ip, ace.ge, ace.le):
pass
if ace.action == "permit":
log.debug(
"{t} is allowed by access-list {a}", t=str(valid_ip), a=repr(ace)
)
break
elif ace.action == "deny":
raise InputNotAllowed(
params.messages.acl_denied,
target=str(valid_ip),
denied_network=str(ace.network),
)
# Handling logic for host queries, e.g. 192.0.2.1 vs. 192.0.2.0/24
if valid_ip.num_addresses == 1:
# For a host query with ping or traceroute query types, convert
# the query_target to an IP address instead of a network.
if query_type in ("ping", "traceroute"):
new_ip = valid_ip.network_address
log.debug(
"Converted '{o}' to '{n}' for '{q}' query",
o=valid_ip,
n=new_ip,
q=query_type,
)
valid_ip = new_ip
# Get the containing prefix for a host query if:
# - Query type is bgp_route
# - force_cidr option is enabled
# - Query target is not a private address/network
elif (
query_type in ("bgp_route",)
and vrf_afi.force_cidr
and not valid_ip.is_private
):
log.debug("Getting containing prefix for {q}", q=str(valid_ip))
ip_str = str(valid_ip.network_address)
network_info = network_info_sync(ip_str)
containing_prefix = network_info.get(ip_str, {}).get("prefix")
if containing_prefix is None:
log.error(
"Unable to find containing prefix for {}. Got: {}",
str(valid_ip),
network_info,
)
raise InputInvalid("{q} does not have a containing prefix", q=ip_str)
try:
valid_ip = ip_network(containing_prefix)
log.debug("Containing prefix: {p}", p=str(valid_ip))
except ValueError as err:
log.error(
"Unable to find containing prefix for {q}. Error: {e}",
q=str(valid_ip),
e=err,
)
raise InputInvalid(
"{q} does does not have a containing prefix", q=valid_ip
)
# For a host query with bgp_route query type and force_cidr
# disabled, convert the host query to a single IP address.
elif query_type in ("bgp_route",) and not vrf_afi.force_cidr:
valid_ip = valid_ip.network_address
log.debug("Validation passed for {ip}", ip=value)
return valid_ip
def validate_community_input(value):
"""Validate input communities against configured or default regex pattern."""
# RFC4360: Extended Communities (New Format)
if re.match(params.queries.bgp_community.pattern.extended_as, value):
pass
# RFC4360: Extended Communities (32 Bit Format)
elif re.match(params.queries.bgp_community.pattern.decimal, value):
pass
# RFC8092: Large Communities
elif re.match(params.queries.bgp_community.pattern.large, value):
pass
else:
raise InputInvalid(
params.messages.invalid_input,
target=value,
query_type=params.queries.bgp_community.display_name,
)
return value
def validate_community_select(value):
"""Validate selected community against configured communities."""
communities = tuple(c.community for c in params.queries.bgp_community.communities)
if value not in communities:
raise InputInvalid(
params.messages.invalid_input,
target=value,
query_type=params.queries.bgp_community.display_name,
)
return value
def validate_aspath(value):
"""Validate input AS_PATH against configured or default regext pattern."""
mode = params.queries.bgp_aspath.pattern.mode
pattern = getattr(params.queries.bgp_aspath.pattern, mode)
if not bool(re.match(pattern, value)):
raise InputInvalid(
params.messages.invalid_input,
target=value,
query_type=params.queries.bgp_aspath.display_name,
)
return value

View File

@@ -0,0 +1,63 @@
"""Validate command configuration variables."""
from .vyos import VyosCommands
from ..main import HyperglassModelExtra
from .arista import AristaCommands
from .common import CommandGroup
from .huawei import HuaweiCommands
from .juniper import JuniperCommands
from .cisco_xr import CiscoXRCommands
from .cisco_ios import CiscoIOSCommands
from .cisco_nxos import CiscoNXOSCommands
from .mikrotik_routeros import MikrotikRouterOS
from .mikrotik_switchos import MikrotikSwitchOS
_NOS_MAP = {
"juniper": JuniperCommands,
"cisco_ios": CiscoIOSCommands,
"cisco_xr": CiscoXRCommands,
"cisco_nxos": CiscoNXOSCommands,
"arista": AristaCommands,
"huawei": HuaweiCommands,
"mikrotik_routeros": MikrotikRouterOS,
"mikrotik_switchos": MikrotikSwitchOS,
"vyos": VyosCommands,
}
class Commands(HyperglassModelExtra):
"""Base class for command definitions."""
juniper: CommandGroup = JuniperCommands()
arista: CommandGroup = AristaCommands()
cisco_ios: CommandGroup = CiscoIOSCommands()
cisco_xr: CommandGroup = CiscoXRCommands()
cisco_nxos: CommandGroup = CiscoNXOSCommands()
huawei: CommandGroup = HuaweiCommands()
mikrotik_routeros: CommandGroup = MikrotikRouterOS()
mikortik_switchos: CommandGroup = MikrotikSwitchOS()
vyos: CommandGroup = VyosCommands()
@classmethod
def import_params(cls, **input_params):
"""Import loaded YAML, initialize per-command definitions.
Dynamically set attributes for the command class.
Arguments:
input_params {dict} -- Unvalidated command definitions
Returns:
{object} -- Validated commands object
"""
obj = Commands()
for nos, cmds in input_params.items():
nos_cmd_set = _NOS_MAP.get(nos, CommandGroup)
nos_cmds = nos_cmd_set(**cmds)
setattr(obj, nos, nos_cmds)
return obj
class Config:
"""Override pydantic config."""
validate_all = False

View File

@@ -0,0 +1,55 @@
"""Arista Command Model."""
# Third Party
from pydantic import StrictStr
from .common import CommandSet, CommandGroup
class _IPv4(CommandSet):
"""Validation model for non-default dual afi commands."""
bgp_route: StrictStr = "show ip bgp {target}"
bgp_aspath: StrictStr = "show ip bgp regexp {target}"
bgp_community: StrictStr = "show ip bgp community {target}"
ping: StrictStr = "ping ip {target} source {source}"
traceroute: StrictStr = "traceroute ip {target} source {source}"
class _IPv6(CommandSet):
"""Validation model for non-default ipv4 commands."""
bgp_route: StrictStr = "show ipv6 bgp {target}"
bgp_aspath: StrictStr = "show ipv6 bgp regexp {target}"
bgp_community: StrictStr = "show ipv6 bgp community {target}"
ping: StrictStr = "ping ipv6 {target} source {source}"
traceroute: StrictStr = "traceroute ipv6 {target} source {source}"
class _VPNIPv4(CommandSet):
"""Validation model for non-default ipv6 commands."""
bgp_route: StrictStr = "show ip bgp {target} vrf {vrf}"
bgp_aspath: StrictStr = "show ip bgp regexp {target} vrf {vrf}"
bgp_community: StrictStr = "show ip bgp community {target} vrf {vrf}"
ping: StrictStr = "ping vrf {vrf} ip {target} source {source}"
traceroute: StrictStr = "traceroute vrf {vrf} ip {target} source {source}"
class _VPNIPv6(CommandSet):
"""Validation model for non-default ipv6 commands."""
bgp_route: StrictStr = "show ipv6 bgp {target} vrf {vrf}"
bgp_aspath: StrictStr = "show ipv6 bgp regexp {target} vrf {vrf}"
bgp_community: StrictStr = "show ipv6 bgp community {target} vrf {vrf}"
ping: StrictStr = "ping vrf {vrf} ipv6 {target} source {source}"
traceroute: StrictStr = "traceroute vrf {vrf} ipv6 {target} source {source}"
class AristaCommands(CommandGroup):
"""Validation model for default arista commands."""
ipv4_default: _IPv4 = _IPv4()
ipv6_default: _IPv6 = _IPv6()
ipv4_vpn: _VPNIPv4 = _VPNIPv4()
ipv6_vpn: _VPNIPv6 = _VPNIPv6()

View File

@@ -0,0 +1,61 @@
"""Cisco IOS Command Model."""
# Third Party
from pydantic import StrictStr
from .common import CommandSet, CommandGroup
class _IPv4(CommandSet):
"""Default commands for ipv4 commands."""
bgp_community: StrictStr = "show bgp ipv4 unicast community {target}"
bgp_aspath: StrictStr = 'show bgp ipv4 unicast quote-regexp "{target}"'
bgp_route: StrictStr = "show bgp ipv4 unicast {target} | exclude pathid:|Epoch"
ping: StrictStr = "ping {target} repeat 5 source {source}"
traceroute: StrictStr = "traceroute {target} timeout 1 probe 2 source {source}"
class _IPv6(CommandSet):
"""Default commands for ipv6 commands."""
bgp_community: StrictStr = "show bgp ipv6 unicast community {target}"
bgp_aspath: StrictStr = 'show bgp ipv6 unicast quote-regexp "{target}"'
bgp_route: StrictStr = "show bgp ipv6 unicast {target} | exclude pathid:|Epoch"
ping: StrictStr = "ping ipv6 {target} repeat 5 source {source}"
traceroute: StrictStr = (
"traceroute ipv6 {target} timeout 1 probe 2 source {source}"
)
class _VPNIPv4(CommandSet):
"""Default commands for dual afi commands."""
bgp_community: StrictStr = "show bgp vpnv4 unicast vrf {vrf} community {target}"
bgp_aspath: StrictStr = 'show bgp vpnv4 unicast vrf {vrf} quote-regexp "{target}"'
bgp_route: StrictStr = "show bgp vpnv4 unicast vrf {vrf} {target}"
ping: StrictStr = "ping vrf {vrf} {target} repeat 5 source {source}"
traceroute: StrictStr = (
"traceroute vrf {vrf} {target} timeout 1 probe 2 source {source}"
)
class _VPNIPv6(CommandSet):
"""Default commands for dual afi commands."""
bgp_community: StrictStr = "show bgp vpnv6 unicast vrf {vrf} community {target}"
bgp_aspath: StrictStr = 'show bgp vpnv6 unicast vrf {vrf} quote-regexp "{target}"'
bgp_route: StrictStr = "show bgp vpnv6 unicast vrf {vrf} {target}"
ping: StrictStr = "ping vrf {vrf} {target} repeat 5 source {source}"
traceroute: StrictStr = (
"traceroute vrf {vrf} {target} timeout 1 probe 2 source {source}"
)
class CiscoIOSCommands(CommandGroup):
"""Validation model for default cisco_ios commands."""
ipv4_default: _IPv4 = _IPv4()
ipv6_default: _IPv6 = _IPv6()
ipv4_vpn: _VPNIPv4 = _VPNIPv4()
ipv6_vpn: _VPNIPv6 = _VPNIPv6()

View File

@@ -0,0 +1,55 @@
"""Cisco NX-OS Command Model."""
# Third Party
from pydantic import StrictStr
from .common import CommandSet, CommandGroup
class _IPv4(CommandSet):
"""Validation model for non-default dual afi commands."""
bgp_route: StrictStr = "show bgp ipv4 unicast {target}"
bgp_aspath: StrictStr = 'show bgp ipv4 unicast regexp "{target}"'
bgp_community: StrictStr = "show bgp ipv4 unicast community {target}"
ping: StrictStr = "ping {target} source {source}"
traceroute: StrictStr = "traceroute {target} source {source}"
class _IPv6(CommandSet):
"""Validation model for non-default ipv4 commands."""
bgp_route: StrictStr = "show bgp ipv6 unicast {target}"
bgp_aspath: StrictStr = 'show bgp ipv6 unicast regexp "{target}"'
bgp_community: StrictStr = "show bgp ipv6 unicast community {target}"
ping: StrictStr = "ping6 {target} source {source}"
traceroute: StrictStr = "traceroute6 {target} source {source}"
class _VPNIPv4(CommandSet):
"""Validation model for non-default ipv6 commands."""
bgp_route: StrictStr = "show bgp ipv4 unicast {target} vrf {vrf}"
bgp_aspath: StrictStr = 'show bgp ipv4 unicast regexp "{target}" vrf {vrf}'
bgp_community: StrictStr = "show bgp ipv4 unicast community {target} vrf {vrf}"
ping: StrictStr = "ping {target} source {source} vrf {vrf}"
traceroute: StrictStr = "traceroute {target} source {source} vrf {vrf}"
class _VPNIPv6(CommandSet):
"""Validation model for non-default ipv6 commands."""
bgp_route: StrictStr = "show bgp ipv6 unicast {target} vrf {vrf}"
bgp_aspath: StrictStr = 'show bgp ipv6 unicast regexp "{target}" vrf {vrf}'
bgp_community: StrictStr = "show bgp ipv6 unicast community {target} vrf {vrf}"
ping: StrictStr = "ping6 {target} source {source} vrf {vrf}"
traceroute: StrictStr = "traceroute6 {target} source {source} vrf {vrf}"
class CiscoNXOSCommands(CommandGroup):
"""Validation model for default cisco_nxos commands."""
ipv4_default: _IPv4 = _IPv4()
ipv6_default: _IPv6 = _IPv6()
ipv4_vpn: _VPNIPv4 = _VPNIPv4()
ipv6_vpn: _VPNIPv6 = _VPNIPv6()

View File

@@ -0,0 +1,55 @@
"""Cisco IOS XR Command Model."""
# Third Party
from pydantic import StrictStr
from .common import CommandSet, CommandGroup
class _IPv4(CommandSet):
"""Validation model for non-default dual afi commands."""
bgp_route: StrictStr = "show bgp ipv4 unicast {target}"
bgp_aspath: StrictStr = "show bgp ipv4 unicast regexp {target}"
bgp_community: StrictStr = "show bgp ipv4 unicast community {target}"
ping: StrictStr = "ping ipv4 {target} count 5 source {source}"
traceroute: StrictStr = "traceroute ipv4 {target} timeout 1 probe 2 source {source}"
class _IPv6(CommandSet):
"""Validation model for non-default ipv4 commands."""
bgp_route: StrictStr = "show bgp ipv6 unicast {target}"
bgp_aspath: StrictStr = "show bgp ipv6 unicast regexp {target}"
bgp_community: StrictStr = "show bgp ipv6 unicast community {target}"
ping: StrictStr = "ping ipv6 {target} count 5 source {source}"
traceroute: StrictStr = "traceroute ipv6 {target} timeout 1 probe 2 source {source}"
class _VPNIPv4(CommandSet):
"""Validation model for non-default ipv6 commands."""
bgp_route: StrictStr = "show bgp vpnv4 unicast vrf {vrf} {target}"
bgp_aspath: StrictStr = "show bgp vpnv4 unicast vrf {vrf} regexp {target}"
bgp_community: StrictStr = "show bgp vpnv4 unicast vrf {vrf} community {target}"
ping: StrictStr = "ping vrf {vrf} {target} count 5 source {source}"
traceroute: StrictStr = "traceroute vrf {vrf} {target} timeout 1 probe 2 source {source}"
class _VPNIPv6(CommandSet):
"""Validation model for non-default ipv6 commands."""
bgp_route: StrictStr = "show bgp vpnv6 unicast vrf {vrf} {target}"
bgp_aspath: StrictStr = "show bgp vpnv6 unicast vrf {vrf} regexp {target}"
bgp_community: StrictStr = "show bgp vpnv6 unicast vrf {vrf} community {target}"
ping: StrictStr = "ping vrf {vrf} {target} count 5 source {source}"
traceroute: StrictStr = "traceroute vrf {vrf} {target} timeout 1 probe 2 source {source}"
class CiscoXRCommands(CommandGroup):
"""Validation model for default cisco_xr commands."""
ipv4_default: _IPv4 = _IPv4()
ipv6_default: _IPv6 = _IPv6()
ipv4_vpn: _VPNIPv4 = _VPNIPv4()
ipv6_vpn: _VPNIPv6 = _VPNIPv6()

View File

@@ -0,0 +1,25 @@
"""Models common to entire commands module."""
# Third Party
from pydantic import StrictStr
from ..main import HyperglassModel, HyperglassModelExtra
class CommandSet(HyperglassModel):
"""Command set, defined per-AFI."""
bgp_route: StrictStr
bgp_aspath: StrictStr
bgp_community: StrictStr
ping: StrictStr
traceroute: StrictStr
class CommandGroup(HyperglassModelExtra):
"""Validation model for all commands."""
ipv4_default: CommandSet
ipv6_default: CommandSet
ipv4_vpn: CommandSet
ipv6_vpn: CommandSet

View File

@@ -0,0 +1,55 @@
"""Huawei Command Model."""
# Third Party
from pydantic import StrictStr
from .common import CommandSet, CommandGroup
class _IPv4(CommandSet):
"""Default commands for ipv4 commands."""
bgp_community: StrictStr = "display bgp routing-table regular-expression {target}"
bgp_aspath: StrictStr = "display bgp routing-table regular-expression {target}"
bgp_route: StrictStr = "display bgp routing-table {target}"
ping: StrictStr = "ping -c 5 -a {source} {target}"
traceroute: StrictStr = "tracert -q 2 -f 1 -a {source} {target}"
class _IPv6(CommandSet):
"""Default commands for ipv6 commands."""
bgp_community: StrictStr = "display bgp ipv6 routing-table community {target}"
bgp_aspath: StrictStr = "display bgp ipv6 routing-table regular-expression {target}"
bgp_route: StrictStr = "display bgp ipv6 routing-table {target}"
ping: StrictStr = "ping ipv6 -c 5 -a {source} {target}"
traceroute: StrictStr = "tracert ipv6 -q 2 -f 1 -a {source} {target}"
class _VPNIPv4(CommandSet):
"""Default commands for dual afi commands."""
bgp_community: StrictStr = "display bgp vpnv4 vpn-instance {vrf} routing-table regular-expression {target}"
bgp_aspath: StrictStr = "display bgp vpnv4 vpn-instance {vrf} routing-table regular-expression {target}"
bgp_route: StrictStr = "display bgp vpnv4 vpn-instance {vrf} routing-table {target}"
ping: StrictStr = "ping -vpn-instance {vrf} -c 5 -a {source} {target}"
traceroute: StrictStr = "tracert -q 2 -f 1 -vpn-instance {vrf} -a {source} {target}"
class _VPNIPv6(CommandSet):
"""Default commands for dual afi commands."""
bgp_community: StrictStr = "display bgp vpnv6 vpn-instance {vrf} routing-table regular-expression {target}"
bgp_aspath: StrictStr = "display bgp vpnv6 vpn-instance {vrf} routing-table regular-expression {target}"
bgp_route: StrictStr = "display bgp vpnv6 vpn-instance {vrf} routing-table {target}"
ping: StrictStr = "ping vpnv6 vpn-instance {vrf} -c 5 -a {source} {target}"
traceroute: StrictStr = "tracert -q 2 -f 1 vpn-instance {vrf} -a {source} {target}"
class HuaweiCommands(CommandGroup):
"""Validation model for default huawei commands."""
ipv4_default: _IPv4 = _IPv4()
ipv6_default: _IPv6 = _IPv6()
ipv4_vpn: _VPNIPv4 = _VPNIPv4()
ipv6_vpn: _VPNIPv6 = _VPNIPv6()

View File

@@ -0,0 +1,92 @@
"""Juniper Command Model."""
# Third Party
from pydantic import StrictStr
from .common import CommandSet, CommandGroup
class _IPv4(CommandSet):
"""Validation model for non-default dual afi commands."""
bgp_route: StrictStr = 'show route protocol bgp table inet.0 {target} detail | except Label | except Label | except "Next hop type" | except Task | except Address | except "Session Id" | except State | except "Next-hop reference" | except destinations | except "Announcement bits"'
bgp_aspath: StrictStr = 'show route protocol bgp table inet.0 aspath-regex "{target}"'
bgp_community: StrictStr = "show route protocol bgp table inet.0 community {target}"
ping: StrictStr = "ping inet {target} count 5 source {source}"
traceroute: StrictStr = "traceroute inet {target} wait 1 source {source}"
class _IPv6(CommandSet):
"""Validation model for non-default ipv4 commands."""
bgp_route: StrictStr = 'show route protocol bgp table inet6.0 {target} detail | except Label | except Label | except "Next hop type" | except Task | except Address | except "Session Id" | except State | except "Next-hop reference" | except destinations | except "Announcement bits"'
bgp_aspath: StrictStr = 'show route protocol bgp table inet6.0 aspath-regex "{target}"'
bgp_community: StrictStr = "show route protocol bgp table inet6.0 community {target}"
ping: StrictStr = "ping inet6 {target} count 5 source {source}"
traceroute: StrictStr = "traceroute inet6 {target} wait 2 source {source}"
class _VPNIPv4(CommandSet):
"""Validation model for non-default ipv6 commands."""
bgp_route: StrictStr = 'show route protocol bgp table {vrf}.inet.0 {target} detail | except Label | except Label | except "Next hop type" | except Task | except Address | except "Session Id" | except State | except "Next-hop reference" | except destinations | except "Announcement bits"'
bgp_aspath: StrictStr = 'show route protocol bgp table {vrf}.inet.0 aspath-regex "{target}"'
bgp_community: StrictStr = "show route protocol bgp table {vrf}.inet.0 community {target}"
ping: StrictStr = "ping inet routing-instance {vrf} {target} count 5 source {source}"
traceroute: StrictStr = "traceroute inet routing-instance {vrf} {target} wait 1 source {source}"
class _VPNIPv6(CommandSet):
"""Validation model for non-default ipv6 commands."""
bgp_route: StrictStr = 'show route protocol bgp table {vrf}.inet6.0 {target} detail | except Label | except Label | except "Next hop type" | except Task | except Address | except "Session Id" | except State | except "Next-hop reference" | except destinations | except "Announcement bits"'
bgp_aspath: StrictStr = 'show route protocol bgp table {vrf}.inet6.0 aspath-regex "{target}"'
bgp_community: StrictStr = "show route protocol bgp table {vrf}.inet6.0 community {target}"
ping: StrictStr = "ping inet6 routing-instance {vrf} {target} count 5 source {source}"
traceroute: StrictStr = "traceroute inet6 routing-instance {vrf} {target} wait 2 source {source}"
_structured = CommandGroup(
ipv4_default=CommandSet(
bgp_route="show route protocol bgp table inet.0 {target} detail | display xml",
bgp_aspath='show route protocol bgp table inet.0 aspath-regex "{target}" detail | display xml',
bgp_community="show route protocol bgp table inet.0 community {target} detail | display xml",
ping="ping inet {target} count 5 source {source}",
traceroute="traceroute inet {target} wait 1 source {source}",
),
ipv6_default=CommandSet(
bgp_route="show route protocol bgp table inet6.0 {target} detail | display xml",
bgp_aspath='show route protocol bgp table inet6.0 aspath-regex "{target}" detail | display xml',
bgp_community="show route protocol bgp table inet6.0 community {target} detail | display xml",
ping="ping inet6 {target} count 5 source {source}",
traceroute="traceroute inet6 {target} wait 2 source {source}",
),
ipv4_vpn=CommandSet(
bgp_route="show route protocol bgp table {vrf}.inet.0 {target} detail | display xml",
bgp_aspath='show route protocol bgp table {vrf}.inet.0 aspath-regex "{target}" detail | display xml',
bgp_community="show route protocol bgp table {vrf}.inet.0 community {target} detail | display xml",
ping="ping inet routing-instance {vrf} {target} count 5 source {source}",
traceroute="traceroute inet routing-instance {vrf} {target} wait 1 source {source}",
),
ipv6_vpn=CommandSet(
bgp_route="show route protocol bgp table {vrf}.inet6.0 {target} detail | display xml",
bgp_aspath='show route protocol bgp table {vrf}.inet6.0 aspath-regex "{target}" detail | display xml',
bgp_community="show route protocol bgp table {vrf}.inet6.0 community {target} detail | display xml",
ping="ping inet6 routing-instance {vrf} {target} count 5 source {source}",
traceroute="traceroute inet6 routing-instance {vrf} {target} wait 1 source {source}",
),
)
class JuniperCommands(CommandGroup):
"""Validation model for default juniper commands."""
ipv4_default: _IPv4 = _IPv4()
ipv6_default: _IPv6 = _IPv6()
ipv4_vpn: _VPNIPv4 = _VPNIPv4()
ipv6_vpn: _VPNIPv6 = _VPNIPv6()
def __init__(self, **kwargs):
"""Initialize command group, ensure structured fields are not overridden."""
super().__init__(**kwargs)
self.structured = _structured

View File

@@ -0,0 +1,55 @@
"""VyOS Command Model."""
# Third Party
from pydantic import StrictStr
from .common import CommandSet, CommandGroup
class _IPv4(CommandSet):
"""Validation model for non-default dual afi commands."""
bgp_route: StrictStr = "show ip bgp {target}"
bgp_aspath: StrictStr = 'show ip bgp regexp "{target}"'
bgp_community: StrictStr = "show ip bgp community {target}"
ping: StrictStr = "ping {target} count 5 interface {source}"
traceroute: StrictStr = "mtr -4 -G 1 -c 1 -w -o SAL -a {source} {target}"
class _IPv6(CommandSet):
"""Validation model for non-default ipv4 commands."""
bgp_route: StrictStr = "show ipv6 bgp {target}"
bgp_aspath: StrictStr = 'show ipv6 bgp regexp "{target}"'
bgp_community: StrictStr = "show ipv6 bgp community {target}"
ping: StrictStr = "ping {target} count 5 interface {source}"
traceroute: StrictStr = "mtr -6 -G 1 -c 1 -w -o SAL -a {source} {target}"
class _VPNIPv4(CommandSet):
"""Validation model for non-default ipv6 commands."""
bgp_route: StrictStr = "show ip bgp {target}"
bgp_aspath: StrictStr = 'show ip bgp regexp "{target}"'
bgp_community: StrictStr = "show ip bgp community {target}"
ping: StrictStr = "ping {target} count 5 vrf {vrf} interface {source}"
traceroute: StrictStr = "ip vrf exec {vrf} mtr -4 -G 1 -c 1 -w -o SAL -a {source} {target}"
class _VPNIPv6(CommandSet):
"""Validation model for non-default ipv6 commands."""
bgp_route: StrictStr = "show ipv6 bgp {target}"
bgp_aspath: StrictStr = 'show ipv6 bgp regexp "{target}"'
bgp_community: StrictStr = "show ipv6 bgp community {target}"
ping: StrictStr = "ping {target} count 5 interface {source}"
traceroute: StrictStr = "ip vrf exec {vrf} mtr -6 -G 1 -c 1 -w -o SAL -a {source} {target}"
class VyosCommands(CommandGroup):
"""Validation model for default juniper commands."""
ipv4_default: _IPv4 = _IPv4()
ipv6_default: _IPv6 = _IPv6()
ipv4_vpn: _VPNIPv4 = _VPNIPv4()
ipv6_vpn: _VPNIPv6 = _VPNIPv6()

View File

@@ -0,0 +1,79 @@
"""Custom Pydantic Fields/Types."""
# Standard Library
import re
from typing import TypeVar
# Third Party
from pydantic import StrictInt, StrictFloat
IntFloat = TypeVar("IntFloat", StrictInt, StrictFloat)
class StrictBytes(bytes):
"""Custom data type for a strict byte string.
Used for validating the encoded JWT request payload.
"""
@classmethod
def __get_validators__(cls):
"""Yield Pydantic validator function.
See: https://pydantic-docs.helpmanual.io/usage/types/#custom-data-types
Yields:
{function} -- Validator
"""
yield cls.validate
@classmethod
def validate(cls, value):
"""Validate type.
Arguments:
value {Any} -- Pre-validated input
Raises:
TypeError: Raised if value is not bytes
Returns:
{object} -- Instantiated class
"""
if not isinstance(value, bytes):
raise TypeError("bytes required")
return cls()
def __repr__(self):
"""Return representation of object.
Returns:
{str} -- Representation
"""
return f"StrictBytes({super().__repr__()})"
class AnyUri(str):
"""Custom field type for HTTP URI, e.g. /example."""
@classmethod
def __get_validators__(cls):
"""Pydantic custim field method."""
yield cls.validate
@classmethod
def validate(cls, value):
"""Ensure URI string contains a leading forward-slash."""
uri_regex = re.compile(r"^(\/.*)$")
if not isinstance(value, str):
raise TypeError("AnyUri type must be a string")
match = uri_regex.fullmatch(value)
if not match:
raise ValueError(
"Invalid format. A URI must begin with a forward slash, e.g. '/example'"
)
return cls(match.group())
def __repr__(self):
"""Stringify custom field representation."""
return f"AnyUri({super().__repr__()})"

99
hyperglass/models/main.py Normal file
View File

@@ -0,0 +1,99 @@
"""Data models used throughout hyperglass."""
# Standard Library
import re
# Third Party
from pydantic import HttpUrl, BaseModel
_WEBHOOK_TITLE = "hyperglass received a valid query with the following data"
_ICON_URL = "https://res.cloudinary.com/hyperglass/image/upload/v1593192484/icon.png"
def clean_name(_name: str) -> str:
"""Remove unsupported characters from field names.
Converts any "desirable" seperators to underscore, then removes all
characters that are unsupported in Python class variable names.
Also removes leading numbers underscores.
"""
_replaced = re.sub(r"[\-|\.|\@|\~|\:\/|\s]", "_", _name)
_scrubbed = "".join(re.findall(r"([a-zA-Z]\w+|\_+)", _replaced))
return _scrubbed.lower()
class HyperglassModel(BaseModel):
"""Base model for all hyperglass configuration models."""
class Config:
"""Default Pydantic configuration.
See https://pydantic-docs.helpmanual.io/usage/model_config
"""
validate_all = True
extra = "forbid"
validate_assignment = True
alias_generator = clean_name
json_encoders = {HttpUrl: lambda v: str(v)}
def export_json(self, *args, **kwargs):
"""Return instance as JSON.
Returns:
{str} -- Stringified JSON.
"""
export_kwargs = {
"by_alias": True,
"exclude_unset": False,
**kwargs,
}
return self.json(*args, **export_kwargs)
def export_dict(self, *args, **kwargs):
"""Return instance as dictionary.
Returns:
{dict} -- Python dictionary.
"""
export_kwargs = {
"by_alias": True,
"exclude_unset": False,
**kwargs,
}
return self.dict(*args, **export_kwargs)
def export_yaml(self, *args, **kwargs):
"""Return instance as YAML.
Returns:
{str} -- Stringified YAML.
"""
# Standard Library
import json
# Third Party
import yaml
export_kwargs = {
"by_alias": kwargs.pop("by_alias", True),
"exclude_unset": kwargs.pop("by_alias", False),
}
return yaml.safe_dump(
json.loads(self.export_json(**export_kwargs)), *args, **kwargs
)
class HyperglassModelExtra(HyperglassModel):
"""Model for hyperglass configuration models with dynamic fields."""
pass
class Config:
"""Default pydantic configuration."""
extra = "allow"

View File

@@ -0,0 +1,191 @@
"""Data models used throughout hyperglass."""
# Standard Library
from typing import Optional
from datetime import datetime
# Third Party
from pydantic import StrictStr, root_validator
# Project
from hyperglass.log import log
from .main import HyperglassModel, HyperglassModelExtra
_WEBHOOK_TITLE = "hyperglass received a valid query with the following data"
_ICON_URL = "https://res.cloudinary.com/hyperglass/image/upload/v1593192484/icon.png"
class WebhookHeaders(HyperglassModel):
"""Webhook data model."""
user_agent: Optional[StrictStr]
referer: Optional[StrictStr]
accept_encoding: Optional[StrictStr]
accept_language: Optional[StrictStr]
x_real_ip: Optional[StrictStr]
x_forwarded_for: Optional[StrictStr]
class Config:
"""Pydantic model config."""
fields = {
"user_agent": "user-agent",
"accept_encoding": "accept-encoding",
"accept_language": "accept-language",
"x_real_ip": "x-real-ip",
"x_forwarded_for": "x-forwarded-for",
}
class WebhookNetwork(HyperglassModelExtra):
"""Webhook data model."""
prefix: StrictStr = "Unknown"
asn: StrictStr = "Unknown"
org: StrictStr = "Unknown"
country: StrictStr = "Unknown"
class Webhook(HyperglassModel):
"""Webhook data model."""
query_location: StrictStr
query_type: StrictStr
query_vrf: StrictStr
query_target: StrictStr
headers: WebhookHeaders
source: StrictStr = "Unknown"
network: WebhookNetwork
timestamp: datetime
@root_validator(pre=True)
def validate_webhook(cls, values):
"""Reset network attributes if the source is localhost."""
if values.get("source") in ("127.0.0.1", "::1"):
values["network"] = {}
return values
def msteams(self):
"""Format the webhook data as a Microsoft Teams card."""
def code(value):
"""Wrap argument in backticks for markdown inline code formatting."""
return f"`{str(value)}`"
header_data = [
{"name": k, "value": code(v)}
for k, v in self.headers.dict(by_alias=True).items()
]
time_fmt = self.timestamp.strftime("%Y %m %d %H:%M:%S")
payload = {
"@type": "MessageCard",
"@context": "http://schema.org/extensions",
"themeColor": "118ab2",
"summary": _WEBHOOK_TITLE,
"sections": [
{
"activityTitle": _WEBHOOK_TITLE,
"activitySubtitle": f"{time_fmt} UTC",
"activityImage": _ICON_URL,
"facts": [
{"name": "Query Location", "value": self.query_location},
{"name": "Query Target", "value": code(self.query_target)},
{"name": "Query Type", "value": self.query_type},
{"name": "Query VRF", "value": self.query_vrf},
],
},
{"markdown": True, "text": "**Source Information**"},
{"markdown": True, "text": "---"},
{
"markdown": True,
"facts": [
{"name": "IP", "value": code(self.source)},
{"name": "Prefix", "value": code(self.network.prefix)},
{"name": "ASN", "value": code(self.network.asn)},
{"name": "Country", "value": self.network.country},
{"name": "Organization", "value": self.network.org},
],
},
{"markdown": True, "text": "**Request Headers**"},
{"markdown": True, "text": "---"},
{"markdown": True, "facts": header_data},
],
}
log.debug("Created MS Teams webhook: {}", str(payload))
return payload
def slack(self):
"""Format the webhook data as a Slack message."""
def make_field(key, value, code=False):
if code:
value = f"`{value}`"
return f"*{key}*\n{value}"
header_data = []
for k, v in self.headers.dict(by_alias=True).items():
field = make_field(k, v, code=True)
header_data.append(field)
query_data = [
{
"type": "mrkdwn",
"text": make_field("Query Location", self.query_location),
},
{
"type": "mrkdwn",
"text": make_field("Query Target", self.query_target, code=True),
},
{"type": "mrkdwn", "text": make_field("Query Type", self.query_type)},
{"type": "mrkdwn", "text": make_field("Query VRF", self.query_vrf)},
]
source_data = [
{
"type": "mrkdwn",
"text": make_field("Source IP", self.source, code=True),
},
{
"type": "mrkdwn",
"text": make_field("Source Prefix", self.network.prefix, code=True),
},
{
"type": "mrkdwn",
"text": make_field("Source ASN", self.network.asn, code=True),
},
{
"type": "mrkdwn",
"text": make_field("Source Country", self.network.country),
},
{
"type": "mrkdwn",
"text": make_field("Source Organization", self.network.org),
},
]
time_fmt = self.timestamp.strftime("%Y %m %d %H:%M:%S")
payload = {
"text": _WEBHOOK_TITLE,
"blocks": [
{
"type": "section",
"text": {"type": "mrkdwn", "text": f"*{time_fmt} UTC*"},
},
{"type": "section", "fields": query_data},
{"type": "divider"},
{"type": "section", "fields": source_data},
{"type": "divider"},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Headers*\n" + "\n".join(header_data),
},
},
],
}
log.debug("Created Slack webhook: {}", str(payload))
return payload