1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00
Files
checktheroads-hyperglass/hyperglass/command/construct.py
2019-10-01 01:04:23 -07:00

293 lines
9.6 KiB
Python

"""
Accepts filtered & validated input from execute.py, constructs SSH
command for Netmiko library or API call parameters for supported
hyperglass API modules.
"""
# Standard Library Imports
import re
import ipaddress
import json
import operator
# Third Party Imports
from logzero import logger as log
# Project Imports
from hyperglass.configuration import vrfs
from hyperglass.configuration import commands
from hyperglass.configuration import logzero_config # noqa: F401
from hyperglass.constants import target_format_space
class Construct:
    """
    Constructs SSH commands or REST API queries based on validated
    input parameters.

    Every public query method (ping, traceroute, bgp_route,
    bgp_community, bgp_aspath) returns a list of strings: JSON
    documents when the transport is "rest", formatted device commands
    when the transport is "scrape", and an empty list for any other
    transport value.
    """

    def __init__(self, device, query_data, transport):
        """
        Stores the query context.

        device: object exposing the `nos`, `commands` and `vrfs`
            attributes read by the methods below.
        query_data: mapping with "query_target" and "query_vrf" keys.
        transport: "rest" or "scrape".
        """
        self.device = device
        self.query_data = query_data
        self.transport = transport
        self.query_target = self.query_data["query_target"]
        self.query_vrf = self.query_data["query_vrf"]

    def format_target(self, target):
        """
        Formats query target based on NOS requirement.

        For a NOS listed in target_format_space, every "/" in the
        target is replaced by a space. For every other NOS the target
        is returned unchanged.
        """
        if self.device.nos in target_format_space:
            _target = re.sub(r"/", " ", target)
        else:
            # Previously nothing was bound on this path, so a NOS that
            # needs no reformatting never got a usable target back.
            _target = target
        log.debug(f"Formatted target: {_target}")
        return _target

    @staticmethod
    def device_commands(nos, afi, query_type):
        """
        Builds the dotted attribute path "<nos>.<afi>.<query_type>" and
        returns the value found at that path on the imported `commands`
        object. The path is only known at runtime, so it is resolved
        with operator.attrgetter rather than direct attribute access.

        Raises AttributeError if any segment of the path is missing.
        """
        cmd_path = f"{nos}.{afi}.{query_type}"
        return operator.attrgetter(cmd_path)(commands)

    @staticmethod
    def get_cmd_type(query_protocol, query_vrf):
        """
        Constructs the command group name for a protocol. If query_vrf
        is set and is not "default", the result is
        "<query_protocol>_vrf"; otherwise it is
        "<query_protocol>_default".
        """
        if query_vrf and query_vrf != "default":
            return f"{query_protocol}_vrf"
        return f"{query_protocol}_default"

    def _log_start(self, query_type):
        """Logs the start of query construction for query_type."""
        log.debug(
            f"Constructing {query_type} query for "
            f"{self.query_target} via {self.transport}"
        )

    def _target_afi(self):
        """
        Returns the AFI object of the queried VRF that matches the IP
        version of the query target.

        Raises ValueError if the target is not parseable by
        ipaddress.ip_network.
        """
        query_protocol = f"ipv{ipaddress.ip_network(self.query_target).version}"
        vrf = getattr(self.device.vrfs, self.query_vrf)
        return getattr(vrf, query_protocol)

    def _enabled_afis(self):
        """
        Returns the AFI objects of the queried VRF for each of "ipv4"
        and "ipv6" that has a truthy value in the VRF configuration.
        """
        vrf = getattr(self.device.vrfs, self.query_vrf)
        # NOTE(review): which AFIs are enabled is read from the
        # module-level `vrfs` configuration, while the AFI objects come
        # from the device's own `vrfs` - confirm both always agree.
        vrf_dict = getattr(vrfs, self.query_vrf).dict()
        return [
            getattr(vrf, afi_key)
            for afi_key, afi_value in vrf_dict.items()
            if afi_key in ("ipv4", "ipv6") and afi_value
        ]

    def _build_query(self, query_type, afi, target):
        """
        Builds a single query string for one AFI object, or returns
        None when the transport is neither "rest" nor "scrape".
        """
        if self.transport == "rest":
            return json.dumps(
                {
                    "query_type": query_type,
                    "afi": afi.afi_name,
                    "vrf": afi.vrf_name,
                    "source": afi.source_address,
                    "target": target,
                }
            )
        if self.transport == "scrape":
            cmd_type = self.get_cmd_type(afi.afi_name, self.query_vrf)
            cmd = self.device_commands(self.device.commands, cmd_type, query_type)
            return cmd.format(
                target=target,
                source=afi.source_address,
                vrf=afi.vrf_name,
                afi=afi.afi_name,
            )
        return None

    def _collect(self, query_type, afis, target=None):
        """
        Builds one query per AFI object in afis and returns them as a
        list. target defaults to the raw query target.
        """
        if target is None:
            target = self.query_target
        query = []
        for afi in afis:
            entry = self._build_query(query_type, afi, target)
            if entry is not None:
                query.append(entry)
        log.debug(f"Constructed query: {query}")
        return query

    def ping(self):
        """Constructs ping query parameters from pre-validated input"""
        self._log_start("ping")
        return self._collect("ping", [self._target_afi()])

    def traceroute(self):
        """
        Constructs traceroute query parameters from pre-validated input.
        """
        self._log_start("traceroute")
        return self._collect("traceroute", [self._target_afi()])

    def bgp_route(self):
        """
        Constructs bgp_route query parameters from pre-validated input.

        Only the "scrape" transport passes the target through
        format_target; the "rest" payload carries the raw target.
        """
        self._log_start("bgp_route")
        afi = self._target_afi()
        target = self.query_target
        if self.transport == "scrape":
            target = self.format_target(target)
        return self._collect("bgp_route", [afi], target)

    def bgp_community(self):
        """
        Constructs bgp_community query parameters from pre-validated
        input. One query is built per enabled address family of the
        queried VRF.
        """
        self._log_start("bgp_community")
        return self._collect("bgp_community", self._enabled_afis())

    def bgp_aspath(self):
        """
        Constructs bgp_aspath query parameters from pre-validated input.
        One query is built per enabled address family of the queried
        VRF.
        """
        self._log_start("bgp_aspath")
        return self._collect("bgp_aspath", self._enabled_afis())