mirror of
https://github.com/checktheroads/hyperglass
synced 2024-05-11 05:55:08 +00:00
move models to api
This commit is contained in:
@@ -1,10 +0,0 @@
|
||||
"""Query & Response Validation Models."""
|
||||
|
||||
# Project Imports
|
||||
# flake8: noqa: F401
|
||||
|
||||
from hyperglass.models import query
|
||||
from hyperglass.models import response
|
||||
from hyperglass.models import rfc8522
|
||||
from hyperglass.models import types
|
||||
from hyperglass.models import validators
|
@@ -1,135 +0,0 @@
|
||||
"""Input query validation model."""
|
||||
|
||||
# Standard Library Imports
|
||||
import hashlib
|
||||
|
||||
# Third Party Imports
|
||||
from pydantic import BaseModel
|
||||
from pydantic import StrictStr
|
||||
from pydantic import validator
|
||||
|
||||
# Project Imports
|
||||
from hyperglass.configuration import devices
|
||||
from hyperglass.configuration import params
|
||||
from hyperglass.configuration.models.vrfs import Vrf
|
||||
from hyperglass.exceptions import InputInvalid
|
||||
from hyperglass.models.types import SupportedQuery
|
||||
from hyperglass.models.validators import validate_aspath
|
||||
from hyperglass.models.validators import validate_community
|
||||
from hyperglass.models.validators import validate_ip
|
||||
|
||||
|
||||
def get_vrf_object(vrf_name):
|
||||
"""Match VRF object from VRF name.
|
||||
|
||||
Arguments:
|
||||
vrf_name {str} -- VRF name
|
||||
|
||||
Raises:
|
||||
InputInvalid: Raised if no VRF is matched.
|
||||
|
||||
Returns:
|
||||
{object} -- Valid VRF object
|
||||
"""
|
||||
matched = None
|
||||
for vrf_obj in devices.vrf_objects:
|
||||
if vrf_name is not None:
|
||||
if vrf_name == vrf_obj.name or vrf_name == vrf_obj.display_name:
|
||||
matched = vrf_obj
|
||||
break
|
||||
elif vrf_name is None:
|
||||
if vrf_obj.name == "default":
|
||||
matched = vrf_obj
|
||||
break
|
||||
if matched is None:
|
||||
raise InputInvalid(params.messages.vrf_not_found, vrf_name=vrf_name)
|
||||
return matched
|
||||
|
||||
|
||||
class Query(BaseModel):
|
||||
"""Validation model for input query parameters."""
|
||||
|
||||
query_location: StrictStr
|
||||
query_type: SupportedQuery
|
||||
query_vrf: Vrf
|
||||
query_target: StrictStr
|
||||
|
||||
def digest(self):
|
||||
"""Create SHA256 hash digest of model representation."""
|
||||
return hashlib.sha256(repr(self).encode()).hexdigest()
|
||||
|
||||
@validator("query_location", pre=True, always=True)
|
||||
def validate_query_location(cls, value):
|
||||
"""Ensure query_location is defined.
|
||||
|
||||
Arguments:
|
||||
value {str} -- Unvalidated query_location
|
||||
|
||||
Raises:
|
||||
InputInvalid: Raised if query_location is not defined.
|
||||
|
||||
Returns:
|
||||
{str} -- Valid query_location
|
||||
"""
|
||||
if value not in devices.hostnames:
|
||||
raise InputInvalid(
|
||||
params.messages.invalid_field,
|
||||
level="warning",
|
||||
input=value,
|
||||
field=params.web.text.query_location,
|
||||
)
|
||||
return value
|
||||
|
||||
@validator("query_vrf", always=True, pre=True)
|
||||
def validate_query_vrf(cls, value, values):
|
||||
"""Ensure query_vrf is defined.
|
||||
|
||||
Arguments:
|
||||
value {str} -- Unvalidated query_vrf
|
||||
|
||||
Raises:
|
||||
InputInvalid: Raised if query_vrf is not defined.
|
||||
|
||||
Returns:
|
||||
{str} -- Valid query_vrf
|
||||
"""
|
||||
vrf_object = get_vrf_object(value)
|
||||
device = getattr(devices, values["query_location"])
|
||||
device_vrf = None
|
||||
for vrf in device.vrfs:
|
||||
if vrf == vrf_object:
|
||||
device_vrf = vrf
|
||||
break
|
||||
if device_vrf is None:
|
||||
raise InputInvalid(
|
||||
params.messages.vrf_not_associated,
|
||||
vrf_name=vrf_object.display_name,
|
||||
device_name=device.display_name,
|
||||
)
|
||||
return device_vrf
|
||||
|
||||
@validator("query_target", always=True)
|
||||
def validate_query_target(cls, value, values):
|
||||
"""Validate query target value based on query_type."""
|
||||
|
||||
query_type = values["query_type"]
|
||||
|
||||
# Use relevant function based on query_type.
|
||||
validator_map = {
|
||||
"bgp_aspath": validate_aspath,
|
||||
"bgp_community": validate_community,
|
||||
"bgp_route": validate_ip,
|
||||
"ping": validate_ip,
|
||||
"traceroute": validate_ip,
|
||||
}
|
||||
validator_args_map = {
|
||||
"bgp_aspath": (value,),
|
||||
"bgp_community": (value,),
|
||||
"bgp_route": (value, values["query_type"], values["query_vrf"]),
|
||||
"ping": (value, values["query_type"], values["query_vrf"]),
|
||||
"traceroute": (value, values["query_type"], values["query_vrf"]),
|
||||
}
|
||||
validate_func = validator_map[query_type]
|
||||
validate_args = validator_args_map[query_type]
|
||||
|
||||
return validate_func(*validate_args)
|
@@ -1,24 +0,0 @@
|
||||
"""Response model."""
|
||||
# Standard Library Imports
|
||||
from typing import List
|
||||
|
||||
# Third Party Imports
|
||||
from pydantic import BaseModel
|
||||
from pydantic import StrictStr
|
||||
from pydantic import constr
|
||||
|
||||
|
||||
class QueryError(BaseModel):
|
||||
"""Query response model."""
|
||||
|
||||
output: StrictStr
|
||||
level: constr(regex=r"(success|warning|error|danger)")
|
||||
keywords: List[StrictStr]
|
||||
|
||||
|
||||
class QueryResponse(BaseModel):
|
||||
"""Query response model."""
|
||||
|
||||
output: StrictStr
|
||||
level: constr(regex=r"(success|warning|error|danger)")
|
||||
keywords: List[StrictStr] = []
|
@@ -1,87 +0,0 @@
|
||||
"""Response model."""
|
||||
|
||||
# Standard Library Imports
|
||||
# flake8: noqa
|
||||
import math
|
||||
import secrets
|
||||
from datetime import datetime
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
# Third Party Imports
|
||||
import ujson
|
||||
from pydantic import BaseModel
|
||||
from pydantic import StrictFloat
|
||||
from pydantic import StrictInt
|
||||
from pydantic import StrictStr
|
||||
from pydantic import constr
|
||||
from pydantic import validator
|
||||
|
||||
|
||||
"""Patterns:
|
||||
GET /.well-known/looking-glass/v1/ping/2001:DB8::35?protocol=2,1
|
||||
GET /.well-known/looking-glass/v1/traceroute/192.0.2.8?routerindex=5
|
||||
GET /.well-known/looking-glass/v1/show/route/2001:DB8::/48?protocol=2,1
|
||||
GET /.well-known/looking-glass/v1/show/bgp/192.0.2.0/24
|
||||
GET /.well-known/looking-glass/v1/show/bgp/summary?protocol=2&routerindex=3
|
||||
GET /.well-known/looking-glass/v1/show/bgp/neighbors/192.0.2.226
|
||||
GET /.well-known/looking-glass/v1/routers
|
||||
GET /.well-known/looking-glass/v1/routers/1
|
||||
GET /.well-known/looking-glass/v1/cmd
|
||||
"""
|
||||
|
||||
|
||||
class _HyperglassQuery(BaseModel):
|
||||
class Config:
|
||||
json_loads = ujson.loads
|
||||
json_dumps = ujson.dumps
|
||||
validate_all = True
|
||||
validate_assignment = True
|
||||
|
||||
|
||||
class BaseQuery(_HyperglassQuery):
|
||||
protocol: StrictStr = "1,1"
|
||||
router: StrictStr
|
||||
routerindex: StrictInt
|
||||
random: StrictStr = secrets.token_urlsafe(16)
|
||||
vrf: Optional[StrictStr]
|
||||
runtime: StrictInt = 30
|
||||
query_format: constr(regex=r"(text\/plain|application\/json)") = "text/plain"
|
||||
|
||||
@validator("runtime")
|
||||
def validate_runtime(cls, value):
|
||||
if isinstance(value, float) and math.modf(value)[0] == 0:
|
||||
value = math.ceil(value)
|
||||
return value
|
||||
|
||||
class Config:
|
||||
fields = {"query_format": "format"}
|
||||
|
||||
|
||||
class BaseData(_HyperglassQuery):
|
||||
router: StrictStr
|
||||
performed_at: datetime
|
||||
runtime: Union[StrictFloat, StrictInt]
|
||||
output: List[StrictStr]
|
||||
data_format: StrictStr
|
||||
|
||||
@validator("runtime")
|
||||
def validate_runtime(cls, value):
|
||||
if isinstance(value, float) and math.modf(value)[0] == 0:
|
||||
value = math.ceil(value)
|
||||
return value
|
||||
|
||||
class Config:
|
||||
fields = {"data_format": "format"}
|
||||
extra = "allow"
|
||||
|
||||
|
||||
class QueryError(_HyperglassQuery):
|
||||
status: constr(regex=r"error")
|
||||
message: StrictStr
|
||||
|
||||
|
||||
class QueryResponse(_HyperglassQuery):
|
||||
status: constr(regex=r"success|fail")
|
||||
data: BaseData
|
@@ -1,26 +0,0 @@
|
||||
"""Custom validation types."""
|
||||
|
||||
# Project Imports
|
||||
from hyperglass.constants import SUPPORTED_QUERY_TYPES
|
||||
|
||||
|
||||
class SupportedQuery(str):
|
||||
"""Query Type Validation Model."""
|
||||
|
||||
@classmethod
|
||||
def __get_validators__(cls):
|
||||
"""Pydantic custom type method."""
|
||||
yield cls.validate
|
||||
|
||||
@classmethod
|
||||
def validate(cls, value):
|
||||
"""Ensure query type is supported by hyperglass."""
|
||||
if not isinstance(value, str):
|
||||
raise TypeError("query_type must be a string")
|
||||
if value not in SUPPORTED_QUERY_TYPES:
|
||||
raise ValueError(f"'{value}' is not a supported query type")
|
||||
return value
|
||||
|
||||
def __repr__(self):
|
||||
"""Stringify custom field representation."""
|
||||
return f"SupportedQuery({super().__repr__()})"
|
@@ -1,185 +0,0 @@
|
||||
"""Input validation functions for submitted queries."""
|
||||
|
||||
# Standard Library Imports
|
||||
import operator
|
||||
import re
|
||||
from ipaddress import ip_network
|
||||
|
||||
# Project Imports
|
||||
from hyperglass.configuration import params
|
||||
from hyperglass.exceptions import InputInvalid
|
||||
from hyperglass.exceptions import InputNotAllowed
|
||||
from hyperglass.util import log
|
||||
|
||||
|
||||
def _member_of(target, network):
|
||||
"""Check if IP address belongs to network.
|
||||
|
||||
Arguments:
|
||||
target {object} -- Target IPv4/IPv6 address
|
||||
network {object} -- ACL network
|
||||
|
||||
Returns:
|
||||
{bool} -- True if target is a member of network, False if not
|
||||
"""
|
||||
log.debug(f"Checking membership of {target} for {network}")
|
||||
|
||||
membership = False
|
||||
if (
|
||||
network.network_address <= target.network_address
|
||||
and network.broadcast_address >= target.broadcast_address # NOQA: W503
|
||||
):
|
||||
log.debug(f"{target} is a member of {network}")
|
||||
membership = True
|
||||
return membership
|
||||
|
||||
|
||||
def _prefix_range(target, ge, le):
|
||||
"""Verify if target prefix length is within ge/le threshold.
|
||||
|
||||
Arguments:
|
||||
target {IPv4Network|IPv6Network} -- Valid IPv4/IPv6 Network
|
||||
ge {int} -- Greater than
|
||||
le {int} -- Less than
|
||||
|
||||
Returns:
|
||||
{bool} -- True if target in range; False if not
|
||||
"""
|
||||
matched = False
|
||||
if target.prefixlen <= le and target.prefixlen >= ge:
|
||||
matched = True
|
||||
return matched
|
||||
|
||||
|
||||
def validate_ip(value, query_type, query_vrf): # noqa: C901
|
||||
"""Ensure input IP address is both valid and not within restricted allocations.
|
||||
|
||||
Arguments:
|
||||
value {str} -- Unvalidated IP Address
|
||||
query_type {str} -- Valid query type
|
||||
query_vrf {object} -- Matched query vrf
|
||||
Raises:
|
||||
ValueError: Raised if input IP address is not an IP address.
|
||||
ValueError: Raised if IP address is valid, but is within a restricted range.
|
||||
Returns:
|
||||
Union[IPv4Address, IPv6Address] -- Validated IP address object
|
||||
"""
|
||||
query_type_params = getattr(params.queries, query_type)
|
||||
try:
|
||||
|
||||
# Attempt to use IP object factory to create an IP address object
|
||||
valid_ip = ip_network(value)
|
||||
|
||||
except ValueError:
|
||||
raise InputInvalid(
|
||||
params.messages.invalid_input,
|
||||
target=value,
|
||||
query_type=query_type_params.display_name,
|
||||
)
|
||||
|
||||
"""
|
||||
Test the valid IP address to determine if it is:
|
||||
- Unspecified (See RFC5735, RFC2373)
|
||||
- Loopback (See RFC5735, RFC2373)
|
||||
- Otherwise IETF Reserved
|
||||
...and returns an error if so.
|
||||
"""
|
||||
if valid_ip.is_reserved or valid_ip.is_unspecified or valid_ip.is_loopback:
|
||||
raise InputInvalid(
|
||||
params.messages.invalid_input,
|
||||
target=value,
|
||||
query_type=query_type_params.display_name,
|
||||
)
|
||||
|
||||
ip_version = valid_ip.version
|
||||
if valid_ip.num_addresses == 1:
|
||||
|
||||
if query_type in ("ping", "traceroute"):
|
||||
new_ip = valid_ip.network_address
|
||||
|
||||
log.debug(
|
||||
"Converted '{o}' to '{n}' for '{q}' query",
|
||||
o=valid_ip,
|
||||
n=new_ip,
|
||||
q=query_type,
|
||||
)
|
||||
|
||||
valid_ip = new_ip
|
||||
|
||||
elif query_type in ("bgp_route",):
|
||||
max_le = max(
|
||||
ace.le
|
||||
for ace in query_vrf[ip_version].access_list
|
||||
if ace.action == "permit"
|
||||
)
|
||||
new_ip = valid_ip.supernet(new_prefix=max_le)
|
||||
|
||||
log.debug(
|
||||
"Converted '{o}' to '{n}' for '{q}' query",
|
||||
o=valid_ip,
|
||||
n=new_ip,
|
||||
q=query_type,
|
||||
)
|
||||
|
||||
valid_ip = new_ip
|
||||
|
||||
vrf_acl = operator.attrgetter(f"ipv{ip_version}.access_list")(query_vrf)
|
||||
|
||||
for ace in [a for a in vrf_acl if a.network.version == ip_version]:
|
||||
if _member_of(valid_ip, ace.network):
|
||||
if query_type == "bgp_route" and _prefix_range(valid_ip, ace.ge, ace.le):
|
||||
pass
|
||||
|
||||
if ace.action == "permit":
|
||||
log.debug(
|
||||
"{t} is allowed by access-list {a}", t=str(valid_ip), a=repr(ace)
|
||||
)
|
||||
break
|
||||
elif ace.action == "deny":
|
||||
raise InputNotAllowed(
|
||||
params.messages.acl_denied,
|
||||
target=str(valid_ip),
|
||||
denied_network=str(ace.network),
|
||||
)
|
||||
log.debug("Validation passed for {ip}", ip=value)
|
||||
return valid_ip
|
||||
|
||||
|
||||
def validate_community(value):
|
||||
"""Validate input communities against configured or default regex pattern."""
|
||||
|
||||
# RFC4360: Extended Communities (New Format)
|
||||
if re.match(params.queries.bgp_community.pattern.extended_as, value):
|
||||
pass
|
||||
|
||||
# RFC4360: Extended Communities (32 Bit Format)
|
||||
elif re.match(params.queries.bgp_community.pattern.decimal, value):
|
||||
pass
|
||||
|
||||
# RFC8092: Large Communities
|
||||
elif re.match(params.queries.bgp_community.pattern.large, value):
|
||||
pass
|
||||
|
||||
else:
|
||||
raise InputInvalid(
|
||||
params.messages.invalid_input,
|
||||
target=value,
|
||||
query_type=params.queries.bgp_community.display_name,
|
||||
)
|
||||
return value
|
||||
|
||||
|
||||
def validate_aspath(value):
|
||||
"""Validate input AS_PATH against configured or default regext pattern."""
|
||||
|
||||
mode = params.queries.bgp_aspath.pattern.mode
|
||||
pattern = getattr(params.queries.bgp_aspath.pattern, mode)
|
||||
|
||||
if not re.match(pattern, value):
|
||||
raise InputInvalid(
|
||||
params.messages.invalid_input,
|
||||
target=value,
|
||||
query_type=params.queries.bgp_aspath.display_name,
|
||||
)
|
||||
|
||||
return value
|
Reference in New Issue
Block a user