1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00
Files
checktheroads-hyperglass/hyperglass/command/construct.py

172 lines
6.0 KiB
Python
Raw Normal View History

"""
2019-07-07 23:28:23 -07:00
Accepts filtered & validated input from execute.py, constructs SSH
command for Netmiko library or API call parameters for supported
hyperglass API modules.
"""
# Standard Library Imports
import ipaddress
import json
import operator
2019-06-10 12:22:38 -07:00
# Third Party Imports
2019-06-10 12:22:38 -07:00
from logzero import logger
# Project Imports
from hyperglass.configuration import commands
from hyperglass.configuration import logzero_config # noqa: F401
class Construct:
    """
    Constructs SSH commands or REST API queries based on validated
    input parameters.

    Every public query method returns:

    * a JSON-encoded string when ``transport`` is ``"rest"``,
    * the device command template (looked up via ``device_commands``)
      with its fields filled in when ``transport`` is ``"scrape"``,
    * ``None`` for any other transport value.
    """

    def __init__(self, device, transport):
        """
        Stores the target device and the transport name.

        :param device: Object describing the queried device. The methods
            of this class read its ``src_addr_ipv4``, ``src_addr_ipv6``,
            ``commands`` and ``afis`` attributes.
        :param transport: Transport name; ``"rest"`` and ``"scrape"`` are
            the values acted upon.
        """
        self.device = device
        self.transport = transport

    def get_src(self, ver):
        """
        Returns source IP based on IP version of query destination.

        :param ver: IP version number (``4`` or ``6``).
        :return: ``exploded`` form of the device's source address for
            that version, or ``None`` if ``ver`` is neither 4 nor 6.
        """
        src = None
        if ver == 4:
            src = self.device.src_addr_ipv4.exploded
        elif ver == 6:
            src = self.device.src_addr_ipv6.exploded
        logger.debug(f"IPv{ver} Source: {src}")
        return src

    @staticmethod
    def device_commands(nos, afi, query_type):
        """
        Constructs class attribute path from input parameters, returns
        class attribute value for command. This is required because
        class attributes are set dynamically when devices.yaml is
        imported, so the attribute path is unknown until runtime.

        :param nos: First segment of the attribute path on ``commands``.
        :param afi: Second segment (e.g. ``"ipv4"``, ``"ipv6"``, ``"dual"``).
        :param query_type: Final segment (e.g. ``"ping"``).
        :return: Value found at ``commands.<nos>.<afi>.<query_type>``.
        :raises AttributeError: If any segment of the path is missing.
        """
        cmd_path = f"{nos}.{afi}.{query_type}"
        return operator.attrgetter(cmd_path)(commands)

    def _ip_query(self, query_type, target, with_source):
        """
        Shared builder for queries whose AFI is derived from the target.

        :param query_type: Name of the query (``"ping"``, ...).
        :param target: Pre-validated address or prefix string.
        :param with_source: When true, the device source address for the
            target's IP version is included as ``source``.
        :return: Constructed query, or ``None`` for an unhandled transport.
        :raises ValueError: If ``ipaddress.ip_network`` rejects ``target``.
        """
        logger.debug(
            f"Constructing {query_type} query for {target} via {self.transport}..."
        )
        ip_version = ipaddress.ip_network(target).version
        afi = f"ipv{ip_version}"
        # The source lookup happens before the transport check so it is
        # resolved (and logged) regardless of transport.
        extra = {"source": self.get_src(ip_version)} if with_source else {}

        query = None
        if self.transport == "rest":
            # Key order is query_type, afi, [source], target.
            query = json.dumps(
                {"query_type": query_type, "afi": afi, **extra, "target": target}
            )
        elif self.transport == "scrape":
            conf_command = self.device_commands(self.device.commands, afi, query_type)
            query = conf_command.format(target=target, **extra)

        logger.debug(f"Constructed query: {query}")
        return query

    def _dual_query(self, query_type, target):
        """
        Shared builder for queries that use the fixed ``"dual"`` AFI.

        For the scrape transport, each entry of ``self.device.afis`` is
        split on ``"v"`` and rendered as ``"<PREFIX>v<suffix> Unicast|"``
        (``"ipv4"`` becomes ``"IPv4 Unicast|"``); the concatenation is
        passed to the command template as ``afis``.

        :param query_type: Name of the query (``"bgp_community"``, ...).
        :param target: Pre-validated query target string.
        :return: Constructed query, or ``None`` for an unhandled transport.
        """
        logger.debug(
            f"Constructing {query_type} query for {target} via {self.transport}..."
        )
        afi = "dual"

        query = None
        if self.transport == "rest":
            query = json.dumps({"query_type": query_type, "afi": afi, "target": target})
        elif self.transport == "scrape":
            conf_command = self.device_commands(self.device.commands, afi, query_type)
            labels = []
            # A distinct loop name keeps the "dual" afi above intact.
            for device_afi in self.device.afis:
                parts = device_afi.split("v")
                labels.append(
                    "".join([parts[0].upper(), "v", parts[1], " Unicast|"])
                )
            query = conf_command.format(target=target, afis="".join(labels))

        logger.debug(f"Constructed query: {query}")
        return query

    def ping(self, target):
        """Constructs ping query parameters from pre-validated input"""
        return self._ip_query("ping", target, with_source=True)

    def traceroute(self, target):
        """
        Constructs traceroute query parameters from pre-validated input.
        """
        return self._ip_query("traceroute", target, with_source=True)

    def bgp_route(self, target):
        """
        Constructs bgp_route query parameters from pre-validated input.
        """
        return self._ip_query("bgp_route", target, with_source=False)

    def bgp_community(self, target):
        """
        Constructs bgp_community query parameters from pre-validated
        input.
        """
        return self._dual_query("bgp_community", target)

    def bgp_aspath(self, target):
        """
        Constructs bgp_aspath query parameters from pre-validated input.
        """
        return self._dual_query("bgp_aspath", target)