mirror of
https://github.com/checktheroads/hyperglass
synced 2024-05-11 05:55:08 +00:00
397 lines
14 KiB
Python
397 lines
14 KiB
Python
"""Validate query data.
|
|
|
|
Accepts raw input data from execute.py, passes it through specific
|
|
filters based on query type, returns validity boolean and specific
|
|
error message.
|
|
"""
|
|
# Standard Library Imports
|
|
import ipaddress
|
|
import re
|
|
|
|
# Project Imports
|
|
from hyperglass.configuration import params
|
|
from hyperglass.exceptions import HyperglassError
|
|
from hyperglass.exceptions import InputInvalid
|
|
from hyperglass.exceptions import InputNotAllowed
|
|
from hyperglass.util import log
|
|
|
|
|
|
class IPType:
    """Classify an input target as an IPv4/IPv6 host or CIDR prefix.

    The instance exposes four anchored regex patterns (ipv4_host,
    ipv4_cidr, ipv6_host, ipv6_cidr) describing the textual form of a
    host (e.g. 192.0.2.1) or a CIDR prefix (e.g. 192.0.2.0/24).
    is_host() and is_cidr() return a boolean.
    """

    def __init__(self):
        """Build the host & CIDR patterns for both address families."""
        ipv4_octet = r"(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"
        ipv4_address = "(" + ipv4_octet + r"\.){3}" + ipv4_octet

        # Every octet is mandatory: a trailing "?" here would accept
        # truncated input such as "192.0.2.".
        self.ipv4_host = "^" + ipv4_address + "$"
        # The prefix length (0-32) is mandatory after the slash.
        self.ipv4_cidr = "^" + ipv4_address + r"\/(3[0-2]|2[0-9]|1[0-9]|[0-9])$"

        # Dotted quad used by the IPv4-mapped / IPv4-embedded forms.
        embedded_ipv4 = (
            r"((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}"
            r"(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])"
        )
        # One alternative per textual form; host and CIDR patterns share
        # this list so the two can not drift apart.
        ipv6_forms = (
            # Eight hextets, no "::" compression.
            r"([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}",
            # "::" at the end.
            r"([0-9a-fA-F]{1,4}:){1,7}:",
            # "::" in the middle, by number of leading/trailing hextets.
            r"([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}",
            r"([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}",
            r"([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}",
            r"([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}",
            r"([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}",
            r"[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})",
            # "::" at the start, or "::" alone.
            r":((:[0-9a-fA-F]{1,4}){1,7}|:)",
            # fe80: address followed by "%" and a zone identifier.
            r"fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}",
            # "::" (optionally "::ffff:" / "::ffff:0:") followed by a dotted quad.
            r"::(ffff(:0{1,4}){0,1}:){0,1}" + embedded_ipv4,
            # Leading hextets, "::", then a dotted quad.
            r"([0-9a-fA-F]{1,4}:){1,4}:" + embedded_ipv4,
        )
        ipv6_address = "|".join(ipv6_forms)

        self.ipv6_host = "^(" + ipv6_address + ")$"
        # Prefix length 0-128, including 100-109.
        self.ipv6_cidr = (
            "^(" + ipv6_address + r")\/(12[0-8]|1[01][0-9]|[0-9]{1,2})$"
        )

    @staticmethod
    def _ip_version(target):
        """Return 4 or 6 for a parsable target, None otherwise.

        strict=False is used because only the address family matters
        here; a prefix with host bits set is still formatted as CIDR.
        """
        try:
            return ipaddress.ip_network(target, strict=False).version
        except ValueError:
            return None

    def is_host(self, target):
        """Test target to see if it is formatted as a host address.

        Arguments:
            target {str} -- Target IPv4/IPv6 address

        Returns:
            {bool} -- True if host, False if not (including when the
            target can not be parsed at all)
        """
        ip_version = self._ip_version(target)
        pattern = {4: self.ipv4_host, 6: self.ipv6_host}.get(ip_version)
        if pattern is not None and re.match(pattern, target):
            log.debug(f"{target} is an IPv{ip_version} host.")
            return True
        return False

    def is_cidr(self, target):
        """Test target to see if it is formatted as CIDR.

        Arguments:
            target {str} -- Target IPv4/IPv6 address

        Returns:
            {bool} -- True if CIDR, False if not (including when the
            target can not be parsed at all)
        """
        ip_version = self._ip_version(target)
        pattern = {4: self.ipv4_cidr, 6: self.ipv6_cidr}.get(ip_version)
        return pattern is not None and re.match(pattern, target) is not None
|
|
|
|
|
|
def ip_validate(target):
    """Parse the target and reject addresses that may not be queried.

    Arguments:
        target {str} -- Unvalidated IPv4/IPv6 address

    Raises:
        ValueError: Raised, with an empty `details` dict attached, if
        target can not be parsed as a network or if the parsed network
        is reserved, unspecified, or loopback

    Returns:
        {object} -- Valid IPv4Network/IPv6Network object
    """

    def _with_details(reason):
        # Callers unpack `.details` into their own exception, so it
        # must always be present.
        error = ValueError(reason)
        error.details = {}
        return error

    try:
        network = ipaddress.ip_network(target)
        if network.is_reserved or network.is_unspecified or network.is_loopback:
            raise _with_details(params.messages.invalid_input)
    except ValueError as ip_error:
        # AddressValueError is a ValueError subclass, so this clause
        # covers both parse failures and the rejection raised above.
        log.debug(f"IP {target} is invalid")
        raise _with_details(ip_error)

    return network
|
|
|
|
|
|
def ip_access_list(query_data, device):
    """Check VRF access list for matching prefixes.

    The access list of the VRF named by the query is walked in order;
    the first entry of the target's IP version that contains the target
    decides the outcome.

    Arguments:
        query_data {object} -- Query object; query_target and query_vrf
        are read from it
        device {object} -- Device object; its vrfs are searched by name

    Raises:
        HyperglassError: Raised if no access list is found for the
        query VRF (no VRF name matched, or the matched list is empty)
        ValueError: Raised if an ACL deny match is found
        ValueError: Raised if no ACL permit match is found

    Returns:
        {object} -- Allowed target, as the parsed IPv4Network/IPv6Network
    """
    log.debug(f"Checking Access List for: {query_data.query_target}")

    def _member_of(target, network):
        """Check if IP address belongs to network.

        Arguments:
            target {object} -- Target IPv4/IPv6 network
            network {object} -- ACL network

        Returns:
            {bool} -- True if target is a member of network, False if not
        """
        log.debug(f"Checking membership of {target} for {network}")

        # The target is contained when both of its ends fall inside
        # the ACL network's address range.
        membership = (
            network.network_address <= target.network_address
            and network.broadcast_address >= target.broadcast_address
        )
        if membership:
            log.debug(f"{target} is a member of {network}")
        return membership

    target = ipaddress.ip_network(query_data.query_target)

    vrf_acl = None
    for vrf in device.vrfs:
        if vrf.name == query_data.query_vrf:
            vrf_acl = vrf.access_list
    if not vrf_acl:
        raise HyperglassError(
            message="Unable to match query VRF to any configured VRFs",
            alert="danger",
            keywords=[query_data.query_vrf],
        )

    target_ver = target.version

    log.debug(f"Access List: {vrf_acl}")

    for ace in vrf_acl:
        for action, net in ace.items():
            # Addresses of different versions can not be compared, so
            # only entries of the target's version are considered.
            if net.version != target_ver:
                continue

            # Membership is checked (and logged) once per entry.
            if not _member_of(target, net):
                continue

            # If the target is a member of an allowed network, exit successfully.
            if action == "allow":
                log.debug(f"{target} is specifically allowed")
                return target

            # If the target is a member of a denied network, return an error.
            if action == "deny":
                log.debug(f"{target} is specifically denied")
                _exception = ValueError(params.messages.acl_denied)
                _exception.details = {"denied_network": str(net)}
                raise _exception

    # Implicitly deny queries if an allow statement does not exist.
    log.debug(f"{target} is implicitly denied")
    _exception = ValueError(params.messages.acl_not_allowed)
    _exception.details = {"denied_network": ""}
    raise _exception
|
|
|
|
|
|
def ip_attributes(target):
    """Collect the attributes of a target that later checks reuse.

    Arguments:
        target {str} -- Target IPv4/IPv6 address

    Returns:
        {dict} -- IP attribute dict with the keys prefix (the target as
        given), network (parsed network object), version, length
        (prefix length), afi ("ipv4"/"ipv6") and afi_pretty
        ("IPv4"/"IPv6")
    """
    parsed = ipaddress.ip_network(target)
    version = parsed.network_address.version

    attributes = {"prefix": target, "network": parsed}
    attributes["version"] = version
    attributes["length"] = parsed.prefixlen
    attributes["afi"] = "ipv" + str(version)
    attributes["afi_pretty"] = "IPv" + str(version)
    return attributes
|
|
|
|
|
|
def ip_type_check(query_type, target, device):
    """Apply the query-type specific restrictions to a valid target.

    Arguments:
        query_type {str} -- Query type
        target {str} -- Query target
        device {object} -- Device

    Raises:
        ValueError: Raised if max prefix length check fails
        ValueError: Raised if Requires IPv6 CIDR check fails
        ValueError: Raised if directed CIDR check fails

    Returns:
        {str} -- target if checks pass
    """

    def _rejection(message, details):
        # Each failure carries a `details` dict alongside its message.
        error = ValueError(message)
        error.details = details
        return error

    attrs = ip_attributes(target)
    log.debug(f"IP Attributes:\n{attrs}")

    is_bgp_route = query_type == "bgp_route"

    # With the max_prefix feature enabled, a BGP Route query may not be
    # longer than the limit configured for its address family.
    if is_bgp_route and params.features.max_prefix.enable:
        limit = getattr(params.features.max_prefix, attrs["afi"])
        if attrs["length"] > limit:
            log.debug("Failed max prefix length check")
            raise _rejection(params.messages.max_prefix, {"max_length": limit})

    # A device whose NOS is listed in requires_ipv6_cidr may not be
    # sent a BGP Route query for an IPv6 host address.
    needs_cidr = (
        is_bgp_route
        and attrs["version"] == 6
        and device.nos in params.general.requires_ipv6_cidr
        and IPType().is_host(target)
    )
    if needs_cidr:
        log.debug("Failed requires IPv6 CIDR check")
        raise _rejection(
            params.messages.requires_ipv6_cidr, {"device_name": device.display_name}
        )

    # Ping and traceroute may not be directed at a CIDR formatted target.
    if query_type in ("ping", "traceroute") and IPType().is_cidr(target):
        log.debug("Failed CIDR format for ping/traceroute check")
        raise _rejection(
            params.messages.directed_cidr,
            {"query_type": getattr(params.branding.text, query_type)},
        )

    return target
|
|
|
|
|
|
class Validate:
    """Validate a query target against the selected device.

    Holds the device, query data and target handed over by the caller
    and validates the target according to the query type. The
    validation methods return the target on success and raise
    InputInvalid or InputNotAllowed on failure.
    """

    def __init__(self, device, query_data, target):
        """Store the device, query data, query type and target."""
        self.device = device
        self.query_data = query_data
        self.query_type = query_data.query_type
        self.target = target

    def validate_ip(self):
        """Validate IPv4/IPv6 Input.

        Raises:
            InputInvalid: Raised if IP validation fails
            InputNotAllowed: Raised if ACL checks fail
            InputNotAllowed: Raised if IP type checks fail

        Returns:
            {str} -- target if validation passes
        """
        log.debug(f"Validating {self.query_type} query for target {self.target}...")

        # Stage 1: the target has to be a valid IP at all.
        try:
            ip_validate(self.target)
        except ValueError as invalid:
            raise InputInvalid(
                params.messages.invalid_input,
                target=self.target,
                query_type=getattr(params.branding.text, self.query_type),
                **invalid.details,
            )

        # Stages 2 & 3: the access list check, then the query-type
        # specific checks. Both report failures the same way.
        checks = (
            lambda: ip_access_list(self.query_data, self.device),
            lambda: ip_type_check(self.query_type, self.target, self.device),
        )
        for check in checks:
            try:
                check()
            except ValueError as denied:
                raise InputNotAllowed(
                    str(denied), target=self.target, **denied.details
                )

        return self.target

    def validate_dual(self):
        """Validate dual-stack input such as bgp_community & bgp_aspath.

        Raises:
            InputInvalid: Raised if target community is invalid.
            InputInvalid: Raised if target AS_PATH is invalid.

        Returns:
            {str} -- target if validation passes.
        """
        log.debug(f"Validating {self.query_type} query for target {self.target}...")

        if self.query_type == "bgp_community":
            # A community is accepted as soon as it matches one of the
            # configured or default patterns, tried in this order:
            # extended (new-format), 32 bit decimal, RFC 8092 large.
            community = params.features.bgp_community.regex
            accepted = any(
                re.match(getattr(community, form), self.target)
                for form in ("extended_as", "decimal", "large")
            )
        elif self.query_type == "bgp_aspath":
            # The AS_PATH pattern is selected by the configured mode.
            aspath = params.features.bgp_aspath.regex
            accepted = re.match(getattr(aspath, aspath.mode), self.target)
        else:
            # Other query types are passed through unchecked.
            accepted = True

        if not accepted:
            raise InputInvalid(
                params.messages.invalid_input,
                target=self.target,
                query_type=getattr(params.branding.text, self.query_type),
            )
        return self.target

    def validate_query(self):
        """Validate input.

        Returns:
            {str} -- target if validation passes
        """
        dual_stack = self.query_type in ("bgp_community", "bgp_aspath")
        validator = self.validate_dual if dual_stack else self.validate_ip
        return validator()
|