1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00

complete overhaul of configuration module

This commit is contained in:
checktheroads
2019-05-26 18:46:43 -07:00
parent 892b9ebcfc
commit 3ca26037aa
7 changed files with 391 additions and 926 deletions

View File

@@ -1,524 +1,246 @@
#!/usr/bin/env python3
# Module Imports
import os
import math
import toml
# Project Imports
import hyperglass
# Project Directories
dir = os.path.dirname(os.path.abspath(__file__))
hyperglass_root = os.path.dirname(hyperglass.__file__)
# TOML Imports
configuration = toml.load(os.path.join(dir, "configuration.toml"))
devices = toml.load(os.path.join(dir, "devices.toml"))
def blacklist():
f = os.path.join(dir, "blacklist.toml")
t = toml.load(f)
return t
b = toml.load(os.path.join(dir, "blacklist.toml"))
return b["blacklist"]
def commands():
f = os.path.join(dir, "commands.toml")
t = toml.load(f)
return t
def requires_ipv6_cidr(nos):
r = toml.load(os.path.join(dir, "requires_ipv6_cidr.toml"))
nos_list = r["requires_ipv6_cidr"]
if nos in nos_list:
return True
else:
return False
def configuration():
f = os.path.join(dir, "configuration.toml")
t = toml.load(f)
return t
def networks():
"""Returns dictionary of ASNs as keys, list of associated locations as values.
Used for populating the /routers/<asn> Flask route."""
asn_dict = {}
rl = devices["router"]
for r in rl.values():
asn = r["asn"]
if asn in asn_dict:
asn_dict[asn].append(r["location"])
else:
asn_dict[asn] = [r["location"]]
return asn_dict
def devices():
f = os.path.join(dir, "devices.toml")
t = toml.load(f)
return t
def requires_ipv6_cidr():
f = os.path.join(dir, "requires_ipv6_cidr.toml")
t = toml.load(f)
return t["requires_ipv6_cidr"]
# Filter config to branding variables
branding = configuration()["branding"]
# Filter config to general variables
general = configuration()["general"]
routers_list = devices()["router"]
class dev:
"""Functions to import device variables"""
def networks():
asn_dict = dict()
for r in routers_list:
asn = r["asn"]
if asn in asn_dict:
asn_dict[asn].append(r["location"])
else:
asn_dict[asn] = [r["location"]]
return asn_dict
def name():
list = []
for r in routers_list:
list.append(str(r["name"]))
return list
def display_name():
list = []
for r in routers_list:
list.appen(str(r["display_name"]))
return list
class gen:
"""Functions to import config variables and return default values if undefined"""
def primary_asn():
list = []
for g in general:
if len(g["primary_asn"]) == 0:
return "65000"
else:
return g["primary_asn"]
def org_name():
list = []
for g in general:
if len(g["org_name"]) == 0:
return "The Company"
else:
return g["org_name"]
def debug():
list = []
for a in general:
try:
return a["debug"]
except:
return True
def google_analytics():
list = []
for a in general:
if len(a["google_analytics"]) == 0:
return ""
else:
return a["google_analytics"]
def enable_recaptcha():
list = []
for a in general:
try:
return a["enable_recaptcha"]
except:
return True
def message_error():
list = []
for a in general:
if len(a["message_error"]) == 0:
return "{input} is invalid."
else:
return a["message_error"]
def message_blacklist():
list = []
for a in general:
if len(a["message_blacklist"]) == 0:
return "{input} is not allowed."
else:
return a["message_blacklist"]
def message_rate_limit_query():
list = []
for a in general:
if len(a["message_rate_limit_query"]) == 0:
return "Query limit of {rate_limit_query} per minute reached. Please wait one minute and try again.".format(
rate_limit_query=gen.rate_limit_query()
def networks_list():
networks_dict = {}
rl = devices["router"]
for r in rl.values():
asn = r["asn"]
if asn in networks_dict:
networks_dict[asn].append(
dict(
location=r["location"],
hostname=r["name"],
display_name=r["display_name"],
requires_ipv6_cidr=requires_ipv6_cidr(r["type"]),
)
else:
return a["message_rate_limit_query"]
def enable_bgp_route():
list = []
for a in general:
try:
return a["enable_bgp_route"]
except:
return True
def enable_bgp_community():
list = []
for a in general:
try:
return a["enable_bgp_community"]
except:
return True
def enable_bgp_aspath():
list = []
for a in general:
try:
return a["enable_bgp_aspath"]
except:
return True
def enable_ping():
list = []
for a in general:
try:
return a["enable_ping"]
except:
return True
def enable_traceroute():
list = []
for a in general:
try:
return a["enable_traceroute"]
except:
return True
def rate_limit_query():
list = []
for a in general:
if len(a["rate_limit_query"]) == 0:
return "5"
else:
return a["rate_limit_query"]
def rate_limit_site():
list = []
for a in general:
if len(a["rate_limit_site"]) == 0:
return "120"
else:
return a["rate_limit_site"]
def cache_timeout():
list = []
for a in general:
try:
return a["cache_timeout"]
except:
return 120
def cache_directory():
list = []
for a in general:
if len(a["cache_directory"]) == 0:
d = ".flask_cache"
return os.path.join(hyperglass_root, d)
else:
return a["cache_directory"]
class brand:
"""Functions to import branding variables and return default values if undefined"""
def site_title():
list = []
for t in branding:
if len(t["site_title"]) == 0:
return "hyperglass"
else:
return t["site_title"]
def title():
list = []
for t in branding:
if len(t["title"]) == 0:
return "hyperglass"
else:
return t["title"]
def subtitle():
list = []
for t in branding:
if len(t["subtitle"]) == 0:
return "AS" + gen.primary_asn()
else:
return t["subtitle"]
def title_mode():
list = []
for t in branding:
if len(t["title_mode"]) == 0:
return "logo_only"
else:
return t["title_mode"]
def enable_footer():
list = []
for t in branding:
try:
return t["enable_footer"]
except:
return True
def enable_credit():
list = []
for t in branding:
try:
return t["enable_credit"]
except:
return True
def color_btn_submit():
list = []
for t in branding:
if len(t["color_btn_submit"]) == 0:
return "#40798c"
else:
return t["color_btn_submit"]
def color_tag_loctitle():
list = []
for t in branding:
if len(t["color_tag_loctitle"]) == 0:
return "#330036"
else:
return t["color_tag_loctitle"]
def color_tag_cmdtitle():
list = []
for t in branding:
if len(t["color_tag_cmdtitle"]) == 0:
return "#330036"
else:
return t["color_tag_cmdtitle"]
def color_tag_cmd():
list = []
for t in branding:
if len(t["color_tag_cmd"]) == 0:
return "#ff5e5b"
else:
return t["color_tag_cmd"]
def color_tag_loc():
list = []
for t in branding:
if len(t["color_tag_loc"]) == 0:
return "#40798c"
else:
return t["color_tag_loc"]
def color_progressbar():
list = []
for t in branding:
if len(t["color_progressbar"]) == 0:
return "#40798c"
else:
return t["color_progressbar"]
def color_bg():
list = []
for t in branding:
if len(t["color_bg"]) == 0:
return "#fbfffe"
else:
return t["color_bg"]
def color_danger():
list = []
for t in branding:
if len(t["color_danger"]) == 0:
return "#ff3860"
else:
return t["color_danger"]
def logo_path():
list = []
for t in branding:
if len(t["logo_path"]) == 0:
f = "static/images/hyperglass-dark.png"
return os.path.join(hyperglass_root, f)
else:
return t["logo_path"]
def favicon16_path():
list = []
for t in branding:
if len(t["favicon16_path"]) == 0:
f = "static/images/favicon/favicon-16x16.png"
return f
else:
return t["favicon16_path"]
def favicon32_path():
list = []
for t in branding:
if len(t["favicon32_path"]) == 0:
f = "static/images/favicon/favicon-32x32.png"
return f
else:
return t["favicon32_path"]
def logo_width():
list = []
for t in branding:
if len(t["logo_width"]) == 0:
return "384"
else:
return t["logo_width"]
def placeholder_prefix():
list = []
for t in branding:
if len(t["placeholder_prefix"]) == 0:
return "Prefix, IP, Community, or AS_PATH"
else:
return t["placeholder_prefix"]
def show_peeringdb():
list = []
for t in branding:
try:
return a["show_peeringdb"]
except:
return True
def text_results():
list = []
for t in branding:
if len(t["text_results"]) == 0:
return "Results"
else:
return t["text_results"]
def text_location():
list = []
for t in branding:
if len(t["text_location"]) == 0:
return "Location"
else:
return t["text_location"]
def text_cache():
list = []
for t in branding:
if len(t["text_cache"]) == 0:
cache_timeout_exact = gen.cache_timeout() / 60
return "Results will be cached for {cache_timeout} minutes.".format(
cache_timeout=math.ceil(cache_timeout_exact)
)
else:
networks_dict[asn] = [
dict(
location=r["location"],
hostname=r["name"],
display_name=r["display_name"],
requires_ipv6_cidr=requires_ipv6_cidr(r["type"]),
)
else:
return t["text_cache"]
]
return networks_dict
def primary_font_url():
list = []
for t in branding:
if len(t["primary_font_url"]) == 0:
return "https://fonts.googleapis.com/css?family=Nunito:400,600,700"
else:
return t["primary_font_url"]
def primary_font_name():
list = []
for t in branding:
if len(t["primary_font_name"]) == 0:
return "Nunito"
else:
return t["primary_font_name"]
class codes:
"""Class for easy calling & recalling of http success/error codes"""
def mono_font_url():
list = []
for t in branding:
if len(t["mono_font_url"]) == 0:
return "https://fonts.googleapis.com/css?family=Fira+Mono"
else:
return t["mono_font_url"]
def __init__(self):
# 200 OK: renders standard display text
self.success = 200
# 405 Method Not Allowed: Renders Bulma "warning" class notification message with message text
self.warning = 405
# 415 Unsupported Media Type: Renders Bulma "danger" class notification message with message text
self.danger = 415
def mono_font_name():
list = []
for t in branding:
if len(t["mono_font_name"]) == 0:
return "Fira Mono"
else:
return t["mono_font_name"]
def text_limiter_title():
list = []
for t in branding:
if len(t["text_limiter_title"]) == 0:
return "Limit Reached"
else:
return t["text_limiter_title"]
class command:
def __init__(self, nos):
c = toml.load(os.path.join(dir, "configuration.toml"))
self.dual = c[nos][0]["dual"]
self.ipv4 = c[nos][0]["ipv4"]
self.ipv6 = c[nos][0]["ipv6"]
def text_limiter_subtitle():
list = []
for t in branding:
if len(t["text_limiter_subtitle"]) == 0:
return "You have accessed this site more than {rate_limit_site} times in the last minute.".format(
rate_limit_site=gen.rate_limit_site()
)
else:
return t["text_limiter_subtitle"]
def __call__(self):
return vars(self)
def text_415_title():
list = []
for t in branding:
if len(t["text_415_title"]) == 0:
return "Error"
else:
return t["text_415_title"]
def text_415_subtitle():
list = []
for t in branding:
if len(t["text_415_subtitle"]) == 0:
return "Something went wrong."
else:
return t["text_415_subtitle"]
class credential:
def __init__(self, cred):
c_list = devices["credential"]
self.username = c_list[cred]["username"]
self.password = c_list[cred]["password"]
def text_415_button():
list = []
for t in branding:
if len(t["text_415_button"]) == 0:
return "Home"
else:
return t["text_415_button"]
def __call__(self):
return vars(self)
def text_help_bgp_route():
list = []
for t in branding:
if len(t["text_help_bgp_route"]) == 0:
return "Performs BGP table lookup based on IPv4/IPv6 prefix."
else:
return t["text_help_bgp_route"]
def text_help_bgp_community():
list = []
for t in branding:
if len(t["text_help_bgp_community"]) == 0:
return 'Performs BGP table lookup based on <a href="https://tools.ietf.org/html/rfc4360">Extended</a> or <a href="https://tools.ietf.org/html/rfc8195">Large</a> community value.'
else:
return t["text_help_bgp_community"]
class device:
"""Class to define & export all device variables"""
def text_help_bgp_aspath():
list = []
for t in branding:
if len(t["text_help_bgp_aspath"]) == 0:
return 'Performs BGP table lookup based on <code>AS_PATH</code> regular expression.<br>For commonly used BGP regular expressions, <a href="https://hyperglass.readthedocs.io/en/latest/Extras/common_as_path_regex/">click here</a>.'
else:
return t["text_help_bgp_aspath"]
def __init__(self, device):
d = devices["router"][device]
self.address = d.get("address")
self.asn = d.get("asn")
self.src_addr_ipv4 = d.get("src_addr_ipv4")
self.src_addr_ipv6 = d.get("src_addr_ipv6")
self.credential = d.get("credential")
self.location = d.get("location")
self.name = d.get("name")
self.display_name = d.get("display_name")
self.port = d.get("port")
self.type = d.get("type")
self.proxy = d.get("proxy")
def text_help_ping():
list = []
for t in branding:
if len(t["text_help_ping"]) == 0:
return "Sends 5 ICMP echo requests to the target."
else:
return t["text_help_ping"]
def __call__(self):
return vars(self)
def text_help_traceroute():
list = []
for t in branding:
if len(t["text_help_traceroute"]) == 0:
return 'Performs UDP Based traceroute to the target.<br>For information about how to interpret traceroute results, <a href="https://www.nanog.org/meetings/nanog45/presentations/Sunday/RAS_traceroute_N45.pdf">click here</a>.'
else:
return t["text_help_traceroute"]
class proxy:
def __init__(self, proxy):
self.address = proxies_list[proxy]["address"]
self.username = proxies_list[proxy]["username"]
self.password = proxies_list[proxy]["password"]
self.type = proxies_list[proxy]["type"]
self.ssh_command = proxies_list[proxy]["ssh_command"]
class general:
"""Class to define and export config variables and export default values if undefined"""
def __init__(self):
g = configuration["general"][0]
self.primary_asn = g.get("primary_asn", "65000")
self.org_name = g.get("org_name", "The Company")
self.debug = g.get("debug", False)
self.google_analytics = g.get("google_analytics", "")
self.message_error = g.get("message_error", "{input} is invalid.")
self.message_blacklist = g.get("message_blacklist", "{input} is not allowed.")
self.message_general_error = g.get(
"message_general_error", "Error connecting to device."
)
self.rate_limit_query = g.get("rate_limit_query", "5")
self.message_rate_limit_query = g.get(
"message_rate_limit_query",
f"Query limit of {self.rate_limit_query} per minute reached. Please wait one minute and try again.",
)
self.enable_bgp_route = g.get("enable_bgp_route", True)
self.enable_bgp_community = g.get("enable_bgp_community", True)
self.enable_bgp_aspath = g.get("enable_bgp_aspath", True)
self.enable_ping = g.get("enable_ping", True)
self.enable_traceroute = g.get("enable_traceroute", True)
self.rate_limit_site = g.get("rate_limit_site", "120")
self.cache_timeout = g.get("cache_timeout", 120)
self.cache_directory = g.get(
"cache_directory", os.path.join(hyperglass_root, ".flask_cache")
)
self.enable_max_prefix = g.get("enable_max_prefix", False)
self.max_prefix_length_ipv4 = g.get("max_prefix_length_ipv4", 24)
self.max_prefix_length_ipv6 = g.get("max_prefix_length_ipv6", 29)
class branding:
"""Class to define and export branding variables and export default values if undefined"""
def __init__(self):
b = configuration["branding"][0]
self.site_title = b.get("site_title", "hyperglass")
self.title = b.get("title", "hyperglass")
self.subtitle = b.get("subtitle", f"AS{general().primary_asn}")
self.title_mode = b.get("title_mode", "logo_only")
self.enable_footer = b.get("enable_footer", True)
self.enable_credit = b.get("enable_credit", True)
self.color_btn_submit = b.get("color_btn_submit", "#40798c")
self.color_tag_loctitle = b.get("color_tag_loctitle", "#330036")
self.color_tag_cmdtitle = b.get("color_tag_cmdtitle", "#330036")
self.color_tag_cmd = b.get("color_tag_cmd", "#ff5e5b")
self.color_tag_loc = b.get("color_tag_loc", "#40798c")
self.color_progressbar = b.get("color_progressbar", "#40798c")
self.color_bg = b.get("color_bg", "#fbfffe")
self.color_danger = b.get("color_danger", "#ff3860")
self.logo_path = b.get(
"logo_path",
os.path.join(hyperglass_root, "static/images/hyperglass-dark.png"),
)
self.favicon16_path = b.get(
"favicon16_path", "static/images/favicon/favicon-16x16.png"
)
self.favicon32_path = b.get(
"favicon32_path", "static/images/favicon/favicon-32x32.png"
)
self.logo_width = b.get("logo_width", "384")
self.placeholder_prefix = b.get(
"placeholder_prefix", "IP, Prefix, Community, or AS_PATH"
)
self.show_peeringdb = b.get("show_peeringdb", True)
self.text_results = b.get("text_results", "Results")
self.text_location = b.get("text_location", "Select Location...")
self.text_cache = b.get(
"text_cache",
f"Results will be cached for {math.ceil(general().cache_timeout / 60)} minutes.",
)
self.primary_font_name = b.get("primary_font_name", "Nunito")
self.primary_font_url = b.get(
"primary_font_url",
"https://fonts.googleapis.com/css?family=Nunito:400,600,700",
)
self.mono_font_name = b.get("mono_font_name", "Fira Mono")
self.mono_font_url = b.get(
"mono_font_url", "https://fonts.googleapis.com/css?family=Fira+Mono"
)
self.text_limiter_title = b.get("text_limiter_title", "Limit Reached")
self.text_limiter_subtitle = b.get(
"text_limiter_subtitle",
f"You have accessed this site more than {general().rate_limit_site} times in the last minute.",
)
self.text_415_title = b.get("text_415_title", "Error")
self.text_415_subtitle = b.get("text_415_subtitle", "Something went wrong.")
self.text_415_button = b.get("text_415_button", "Home")
self.text_help_bgp_route = b.get(
"text_help_bgp_route",
"Performs BGP table lookup based on IPv4/IPv6 prefix.",
)
self.text_help_bgp_community = b.get(
"text_help_bgp_community",
'Performs BGP table lookup based on <a href="https://tools.ietf.org/html/rfc4360">Extended</a> or <a href="https://tools.ietf.org/html/rfc8195">Large</a> community value.',
)
self.text_help_bgp_aspath = b.get(
"text_help_bgp_aspath",
'Performs BGP table lookup based on <code>AS_PATH</code> regular expression.<br>For commonly used BGP regular expressions, <a href="https://hyperglass.readthedocs.io/en/latest/Extras/common_as_path_regex/">click here</a>.',
)
self.text_help_ping = b.get(
"text_help_ping", "Sends 5 ICMP echo requests to the target."
)
self.text_help_traceroute = b.get(
"text_help_traceroute",
'Performs UDP Based traceroute to the target.<br>For information about how to interpret traceroute results, <a href="https://www.nanog.org/meetings/nanog45/presentations/Sunday/RAS_traceroute_N45.pdf">click here</a>.',
)