1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00
Files
checktheroads-hyperglass/hyperglass/models/config/devices.py

321 lines
11 KiB
Python
Raw Normal View History

"""Validate router configuration variables."""
2020-02-03 02:35:11 -07:00
# Standard Library
import os
import re
2021-09-12 18:27:33 -07:00
from typing import Any, Set, Dict, List, Tuple, Union, Optional
from pathlib import Path
2020-07-30 01:30:01 -07:00
from ipaddress import IPv4Address, IPv6Address
2019-10-04 16:54:32 -07:00
2020-02-03 02:35:11 -07:00
# Third Party
2021-09-13 14:11:07 -07:00
from pydantic import StrictInt, StrictStr, StrictBool, validator, root_validator
2020-02-03 02:35:11 -07:00
# Project
2020-04-14 10:24:20 -07:00
from hyperglass.log import log
2021-09-13 14:11:07 -07:00
from hyperglass.util import (
get_driver,
get_fmt_keys,
resolve_hostname,
validate_device_type,
)
from hyperglass.constants import SCRAPE_HELPERS, SUPPORTED_STRUCTURED_OUTPUT
from hyperglass.exceptions.private import ConfigError, UnsupportedDevice
2021-05-29 21:26:03 -07:00
2020-10-11 13:14:07 -07:00
# Local
from .ssl import Ssl
2021-09-12 18:27:33 -07:00
from ..main import HyperglassModel, HyperglassModelWithId
2021-09-13 14:11:07 -07:00
from ..util import check_legacy_fields
2020-10-11 13:14:07 -07:00
from .proxy import Proxy
2021-09-10 01:18:38 -07:00
from .params import Params
from ..fields import SupportedDriver
2020-10-11 13:14:07 -07:00
from .network import Network
from .credential import Credential
2021-09-13 14:11:07 -07:00
from ..commands.generic import Directive
2019-09-13 00:36:58 -07:00
2021-09-12 18:27:33 -07:00
class Device(HyperglassModelWithId, extra="allow"):
"""Validation model for per-router config in devices.yaml."""
2021-09-12 18:27:33 -07:00
id: StrictStr
name: StrictStr
2020-07-30 01:30:01 -07:00
address: Union[IPv4Address, IPv6Address, StrictStr]
network: Network
credential: Credential
2020-01-05 00:36:12 -07:00
proxy: Optional[Proxy]
display_name: Optional[StrictStr]
2021-02-06 00:19:29 -07:00
port: StrictInt = 22
2020-01-05 00:36:12 -07:00
ssl: Optional[Ssl]
2021-09-13 14:11:07 -07:00
type: StrictStr
commands: List[Directive]
structured_output: Optional[StrictBool]
driver: Optional[SupportedDriver]
attrs: Dict[str, str] = {}
def __init__(self, **kwargs) -> None:
"""Set the device ID."""
2021-09-13 14:11:07 -07:00
kwargs = check_legacy_fields("Device", **kwargs)
_id, values = self._generate_id(kwargs)
2021-09-12 18:27:33 -07:00
super().__init__(id=_id, **values)
self._validate_directive_attrs()
2020-07-30 01:30:01 -07:00
@property
def _target(self):
return str(self.address)
@staticmethod
def _generate_id(values: Dict) -> Tuple[str, Dict]:
"""Generate device id & handle legacy display_name field."""
def generate_id(name: str) -> str:
scrubbed = re.sub(r"[^A-Za-z0-9\_\-\s]", "", name)
return "_".join(scrubbed.split()).lower()
name = values.pop("name", None)
if name is None:
raise ValueError("name is required.")
legacy_display_name = values.pop("display_name", None)
if legacy_display_name is not None:
2021-09-12 15:09:24 -07:00
log.warning("The 'display_name' field is deprecated. Use the 'name' field instead.")
device_id = generate_id(legacy_display_name)
display_name = legacy_display_name
else:
device_id = generate_id(name)
display_name = name
return device_id, {"name": display_name, "display_name": None, **values}
2021-09-11 13:56:20 -07:00
def export_api(self) -> Dict[str, Any]:
"""Export API-facing device fields."""
return {
2021-09-12 18:27:33 -07:00
"id": self.id,
2021-09-11 13:56:20 -07:00
"name": self.name,
"network": self.network.display_name,
}
2021-09-11 00:47:01 -07:00
@property
def directive_commands(self) -> List[str]:
"""Get all commands associated with the device."""
return [
command
for directive in self.commands
for rule in directive.rules
for command in rule.commands
]
2021-09-13 14:11:07 -07:00
@property
def directive_ids(self) -> List[str]:
"""Get all directive IDs associated with the device."""
return [directive.id for directive in self.commands]
def has_directives(self, *directive_ids: str) -> bool:
"""Determine if a directive is used on this device."""
for directive_id in directive_ids:
if directive_id in self.directive_ids:
return True
return False
2021-09-11 00:47:01 -07:00
def _validate_directive_attrs(self) -> None:
# Set of all keys except for built-in key `target`.
keys = {
key
2021-09-11 00:47:01 -07:00
for group in [get_fmt_keys(command) for command in self.directive_commands]
for key in group
if key != "target"
}
attrs = {k: v for k, v in self.attrs.items() if k in keys}
# Verify all keys in associated commands contain values in device's `attrs`.
for key in keys:
if key not in attrs:
raise ConfigError(
"Device '{d}' has a command that references attribute '{a}', but '{a}' is missing from device attributes",
d=self.name,
a=key,
)
2020-07-30 01:30:01 -07:00
@validator("address")
def validate_address(cls, value, values):
"""Ensure a hostname is resolvable."""
2020-07-30 01:30:01 -07:00
if not isinstance(value, (IPv4Address, IPv6Address)):
if not any(resolve_hostname(value)):
raise ConfigError(
"Device '{d}' has an address of '{a}', which is not resolvable.",
d=values["name"],
a=value,
)
return value
@validator("structured_output", pre=True, always=True)
def validate_structured_output(cls, value: bool, values: Dict) -> bool:
"""Validate structured output is supported on the device & set a default."""
if value is True:
if values["type"] not in SUPPORTED_STRUCTURED_OUTPUT:
raise ConfigError(
"The 'structured_output' field is set to 'true' on device '{d}' with "
+ "NOS '{n}', which does not support structured output",
d=values["name"],
n=values["type"],
)
return value
elif value is None and values["type"] in SUPPORTED_STRUCTURED_OUTPUT:
value = True
else:
value = False
return value
@validator("ssl")
def validate_ssl(cls, value, values):
"""Set default cert file location if undefined."""
if value is not None:
if value.enable and value.cert is None:
app_path = Path(os.environ["hyperglass_directory"])
cert_file = app_path / "certs" / f'{values["name"]}.pem'
if not cert_file.exists():
log.warning("No certificate found for device {d}", d=values["name"])
cert_file.touch()
value.cert = cert_file
return value
@root_validator(pre=True)
def validate_device_commands(cls, values: Dict) -> Dict:
"""Validate & rewrite device type, set default commands."""
2021-09-13 14:11:07 -07:00
_type = values.get("type")
if _type is None:
# Ensure device type is defined.
raise ValueError(
f"Device {values['name']} is missing a 'type' (Network Operating System) property."
)
if _type in SCRAPE_HELPERS.keys():
# Rewrite NOS to helper value if needed.
_type = SCRAPE_HELPERS[_type]
# Verify device type is supported by hyperglass.
supported, _ = validate_device_type(_type)
if not supported:
raise UnsupportedDevice(_type)
values["type"] = _type
commands = values.get("commands")
if commands is None:
# If no commands are defined, set commands to the NOS.
inferred = values["type"]
2020-11-02 23:08:07 -07:00
# If the _telnet prefix is added, remove it from the command
# profile so the commands are the same regardless of
# protocol.
if "_telnet" in inferred:
inferred = inferred.replace("_telnet", "")
2021-05-29 21:26:03 -07:00
values["commands"] = [inferred]
return values
@validator("driver")
def validate_driver(cls, value: Optional[str], values: Dict) -> Dict:
"""Set the correct driver and override if supported."""
return get_driver(values["type"], value)
2019-09-13 00:36:58 -07:00
class Devices(HyperglassModel, extra="allow"):
"""Validation model for device configurations."""
2021-09-12 18:27:33 -07:00
ids: List[StrictStr] = []
hostnames: List[StrictStr] = []
2020-07-30 01:30:01 -07:00
objects: List[Device] = []
2020-07-30 01:30:01 -07:00
def __init__(self, input_params: List[Dict]) -> None:
"""Import loaded YAML, initialize per-network definitions.
Remove unsupported characters from device names, dynamically
set attributes for the devices class. Builds lists of common
attributes for easy access in other modules.
"""
2020-07-30 01:30:01 -07:00
objects = set()
hostnames = set()
2021-09-12 18:27:33 -07:00
ids = set()
2020-07-30 01:30:01 -07:00
init_kwargs = {}
for definition in input_params:
# Validate each router config against Router() model/schema
2020-07-30 01:30:01 -07:00
device = Device(**definition)
# Add router-level attributes (assumed to be unique) to
# class lists, e.g. so all hostnames can be accessed as a
# list with `devices.hostnames`, same for all router
# classes, for when iteration over all routers is required.
2020-07-30 01:30:01 -07:00
hostnames.add(device.name)
2021-09-12 18:27:33 -07:00
ids.add(device.id)
2020-07-30 01:30:01 -07:00
objects.add(device)
# Convert the de-duplicated sets to a standard list, add lists
2020-07-30 01:30:01 -07:00
# as class attributes. Sort router list by router name attribute
2021-09-12 18:27:33 -07:00
init_kwargs["ids"] = list(ids)
2020-07-30 01:30:01 -07:00
init_kwargs["hostnames"] = list(hostnames)
init_kwargs["objects"] = sorted(objects, key=lambda x: x.name)
2020-07-30 01:30:01 -07:00
super().__init__(**init_kwargs)
def __getitem__(self, accessor: str) -> Device:
"""Get a device by its name."""
for device in self.objects:
2021-09-12 18:27:33 -07:00
if device.id == accessor:
2020-07-30 01:30:01 -07:00
return device
elif device.name == accessor:
return device
2020-07-30 01:30:01 -07:00
raise AttributeError(f"No device named '{accessor}'")
2021-09-10 01:18:38 -07:00
2021-09-11 13:56:20 -07:00
def export_api(self) -> List[Dict[str, Any]]:
"""Export API-facing device fields."""
return [d.export_api() for d in self.objects]
2021-09-10 01:18:38 -07:00
def networks(self, params: Params) -> List[Dict[str, Any]]:
"""Group devices by network."""
names = {device.network.display_name for device in self.objects}
return [
{
"display_name": name,
"locations": [
{
2021-09-12 18:27:33 -07:00
"id": device.id,
2021-09-10 01:18:38 -07:00
"name": device.name,
"network": device.network.display_name,
"directives": [c.frontend(params) for c in device.commands],
}
for device in self.objects
if device.network.display_name == name
2021-09-10 01:18:38 -07:00
],
}
for name in names
]
2021-09-12 18:27:33 -07:00
def directive_plugins(self) -> Dict[Path, Tuple[StrictStr]]:
"""Get a mapping of plugin paths to associated directive IDs."""
result: Dict[Path, Set[StrictStr]] = {}
# Unique set of all directives.
directives = {directive for device in self.objects for directive in device.commands}
# Unique set of all plugin file names.
plugin_names = {plugin for directive in directives for plugin in directive.plugins}
for directive in directives:
# Convert each plugin file name to a `Path` object.
for plugin in (Path(p) for p in directive.plugins if p in plugin_names):
if plugin not in result:
result[plugin] = set()
result[plugin].add(directive.id)
# Convert the directive set to a tuple.
return {k: tuple(v) for k, v in result.items()}