1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00
Files
checktheroads-hyperglass/hyperglass/configuration/models/routers.py

295 lines
9.8 KiB
Python
Raw Normal View History

"""Validate router configuration variables."""
2020-02-03 02:35:11 -07:00
# Standard Library
import os
import re
2020-02-03 02:35:11 -07:00
from typing import List, Optional
from pathlib import Path
2019-10-04 16:54:32 -07:00
2020-02-03 02:35:11 -07:00
# Third Party
from pydantic import StrictInt, StrictStr, validator
2020-02-03 02:35:11 -07:00
# Project
2020-04-14 10:24:20 -07:00
from hyperglass.log import log
from hyperglass.util import clean_name
2020-04-12 02:17:16 -07:00
from hyperglass.constants import SCRAPE_HELPERS, TRANSPORT_REST, TRANSPORT_SCRAPE
2020-02-03 02:35:11 -07:00
from hyperglass.exceptions import ConfigError, UnsupportedDevice
from hyperglass.configuration.models.ssl import Ssl
from hyperglass.configuration.models.vrfs import Vrf, Info
from hyperglass.configuration.models._utils import HyperglassModel, HyperglassModelExtra
2020-02-03 02:35:11 -07:00
from hyperglass.configuration.models.proxies import Proxy
from hyperglass.configuration.models.commands import Command
from hyperglass.configuration.models.networks import Network
2020-02-03 02:35:11 -07:00
from hyperglass.configuration.models.credentials import Credential
2019-09-13 00:36:58 -07:00
2020-01-31 02:06:27 -10:00
_default_vrf = {
"name": "default",
"display_name": "Global",
"info": Info(),
"ipv4": {
"source_address": None,
"access_list": [
{"network": "0.0.0.0/0", "action": "permit", "ge": 0, "le": 32}
],
},
"ipv6": {
"source_address": None,
"access_list": [{"network": "::/0", "action": "permit", "ge": 0, "le": 128}],
},
}
2019-10-04 16:54:32 -07:00
class Router(HyperglassModel):
"""Validation model for per-router config in devices.yaml."""
name: StrictStr
address: StrictStr
network: Network
credential: Credential
2020-01-05 00:36:12 -07:00
proxy: Optional[Proxy]
display_name: StrictStr
port: StrictInt
2020-01-05 00:36:12 -07:00
ssl: Optional[Ssl]
nos: StrictStr
2020-01-05 00:36:12 -07:00
commands: Optional[Command]
2020-01-31 02:06:27 -10:00
vrfs: List[Vrf] = [_default_vrf]
display_vrfs: List[StrictStr] = []
vrf_names: List[StrictStr] = []
@validator("nos")
def supported_nos(cls, value):
"""Validate that nos is supported by hyperglass.
Raises:
UnsupportedDevice: Raised if nos is unsupported.
Returns:
{str} -- Valid NOS
2019-09-13 00:36:58 -07:00
"""
2020-04-12 02:17:16 -07:00
if value in SCRAPE_HELPERS.keys():
value = SCRAPE_HELPERS[value]
if value not in (*TRANSPORT_REST, *TRANSPORT_SCRAPE):
raise UnsupportedDevice('NOS "{n}" is not supported.', n=value)
return value
2020-02-08 10:02:24 -07:00
@validator("name")
def validate_name(cls, value):
"""Remove or replace unsupported characters from field values.
Arguments:
value {str} -- Raw name/location
Returns:
{} -- Valid name/location
2019-09-13 00:36:58 -07:00
"""
return clean_name(value)
@validator("ssl")
def validate_ssl(cls, value, values):
"""Set default cert file location if undefined.
Arguments:
value {object} -- SSL object
values {dict} -- Other already-valiated fields
Returns:
{object} -- SSL configuration
"""
if value is not None:
if value.enable and value.cert is None:
app_path = Path(os.environ["hyperglass_directory"])
cert_file = app_path / "certs" / f'{values["name"]}.pem'
if not cert_file.exists():
log.warning("No certificate found for device {d}", d=values["name"])
cert_file.touch()
value.cert = cert_file
return value
@validator("commands", always=True)
def validate_commands(cls, value, values):
"""If a named command profile is not defined, use the NOS name.
Arguments:
value {str} -- Reference to command profile
values {dict} -- Other already-validated fields
Returns:
{str} -- Command profile or NOS name
2019-09-13 00:36:58 -07:00
"""
if value is None:
value = values["nos"]
return value
2019-12-30 01:44:19 -07:00
@validator("vrfs", pre=True)
def validate_vrfs(cls, value, values):
"""Validate VRF definitions.
- Ensures source IP addresses are set for the default VRF
(global routing table).
- Initializes the default VRF with the DefaultVRF() class so
that specific defaults can be set for the global routing
table.
- If the 'display_name' is not set for a non-default VRF, try
to make one that looks pretty based on the 'name'.
Arguments:
value {list} -- List of VRFs
values {dict} -- Other already-validated fields
Raises:
ConfigError: Raised if the VRF is missing a source address
Returns:
{list} -- List of valid VRFs
2019-09-13 00:36:58 -07:00
"""
vrfs = []
for vrf in value:
vrf_name = vrf.get("name")
for afi in ("ipv4", "ipv6"):
vrf_afi = vrf.get(afi)
# If AFI is actually defined (enabled), and if the
# source_address field is not set, raise an error
if vrf_afi is not None and vrf_afi.get("source_address") is None:
2019-09-30 07:51:17 -07:00
raise ConfigError(
(
"VRF '{vrf}' in router '{router}' is missing a source "
"{afi} address."
),
vrf=vrf.get("name"),
router=values.get("name"),
afi=afi.replace("ip", "IP"),
2019-09-30 07:51:17 -07:00
)
# If no display_name is set for a non-default VRF, try
# to make one by replacing non-alphanumeric characters
# with whitespaces and using str.title() to make each
# word look "pretty".
2020-01-31 02:06:27 -10:00
if vrf_name != "default" and not isinstance(
vrf.get("display_name"), StrictStr
):
new_name = vrf["name"]
new_name = re.sub(r"[^a-zA-Z0-9]", " ", new_name)
new_name = re.split(" ", new_name)
vrf["display_name"] = " ".join([w.title() for w in new_name])
log.debug(
f'Field "display_name" for VRF "{vrf["name"]}" was not set. '
2020-01-16 02:52:00 -07:00
f"Generated '{vrf['display_name']}'"
)
2019-12-30 09:47:31 -07:00
elif vrf_name == "default" and vrf.get("display_name") is None:
vrf["display_name"] = "Global"
# Validate the non-default VRF against the standard
# Vrf() class.
2020-01-31 02:06:27 -10:00
vrf = Vrf(**vrf)
2019-09-30 07:51:17 -07:00
vrfs.append(vrf)
return vrfs
2019-09-30 07:51:17 -07:00
2019-09-13 00:36:58 -07:00
class Routers(HyperglassModelExtra):
"""Validation model for device configurations."""
hostnames: List[StrictStr] = []
vrfs: List[StrictStr] = []
display_vrfs: List[StrictStr] = []
routers: List[Router] = []
2020-01-16 02:52:00 -07:00
networks: List[StrictStr] = []
@classmethod
def _import(cls, input_params):
"""Import loaded YAML, initialize per-network definitions.
Remove unsupported characters from device names, dynamically
set attributes for the devices class. Builds lists of common
attributes for easy access in other modules.
Arguments:
input_params {dict} -- Unvalidated router definitions
Returns:
{object} -- Validated routers object
"""
vrfs = set()
2020-01-16 02:52:00 -07:00
networks = set()
display_vrfs = set()
2020-01-16 02:52:00 -07:00
vrf_objects = set()
2020-01-26 02:20:05 -07:00
router_objects = []
routers = Routers()
routers.hostnames = []
routers.vrfs = []
routers.display_vrfs = []
for definition in input_params:
# Validate each router config against Router() model/schema
router = Router(**definition)
2020-01-31 02:06:27 -10:00
"""
Set a class attribute for each router so each router's
attributes can be accessed with `devices.router_hostname`
"""
setattr(routers, router.name, router)
2020-01-31 02:06:27 -10:00
"""
Add router-level attributes (assumed to be unique) to
class lists, e.g. so all hostnames can be accessed as a
list with `devices.hostnames`, same for all router
classes, for when iteration over all routers is required.
"""
routers.hostnames.append(router.name)
2020-01-26 02:20:05 -07:00
router_objects.append(router)
for vrf in router.vrfs:
2020-01-31 02:06:27 -10:00
"""
For each configured router VRF, add its name and
display_name to a class set (for automatic de-duping).
"""
vrfs.add(vrf.name)
display_vrfs.add(vrf.display_name)
2020-01-31 02:06:27 -10:00
"""
Also add the names to a router-level list so each
router's VRFs and display VRFs can be easily accessed.
"""
router.display_vrfs.append(vrf.display_name)
router.vrf_names.append(vrf.name)
2020-01-31 02:06:27 -10:00
"""
Add a 'default_vrf' attribute to the devices class
which contains the configured default VRF display name.
"""
if vrf.name == "default" and not hasattr(cls, "default_vrf"):
routers.default_vrf = {
"name": vrf.name,
"display_name": vrf.display_name,
}
2019-10-04 16:54:32 -07:00
2020-01-31 02:06:27 -10:00
"""
Add the native VRF objects to a set (for automatic
de-duping), but exlcude device-specific fields.
"""
2020-01-16 02:52:00 -07:00
_copy_params = {
"deep": True,
"exclude": {"ipv4": {"source_address"}, "ipv6": {"source_address"}},
}
vrf_objects.add(vrf.copy(**_copy_params))
2020-01-31 02:06:27 -10:00
"""
Convert the de-duplicated sets to a standard list, add lists
as class attributes.
"""
routers.vrfs = list(vrfs)
routers.display_vrfs = list(display_vrfs)
2020-01-16 02:52:00 -07:00
routers.vrf_objects = list(vrf_objects)
routers.networks = list(networks)
2020-01-26 02:20:05 -07:00
# Sort router list by router name attribute
routers.routers = sorted(router_objects, key=lambda x: x.display_name)
return routers