#!/usr/bin/env python3
"""Management CLI for hyperglass.

Provides sub-commands for linting, configuration readiness checks, end-to-end
backend tests, API key generation, running the development server, and
copying example configuration/service files into place.
"""
# Standard Imports
import glob
import grp
import json
import os
import pwd
import random
import secrets
import shutil
import string
import sys
from pathlib import Path

# Module Imports
import click
import requests
from passlib.hash import pbkdf2_sha256

# Initialize shutil copy function
cp = shutil.copyfile

# Define working directory
working_directory = os.path.dirname(os.path.abspath(__file__))


def construct_test(test_query, location, test_target):
    """Constructs JSON POST data for test_hyperglass function.

    Returns a JSON string with the keys ``type``, ``location`` and ``target``.
    """
    constructed_query = json.dumps(
        {"type": test_query, "location": location, "target": test_target}
    )
    return constructed_query


@click.group()
def hg():
    """hyperglass management commands."""


@hg.command("pylint-check", help="Runs Pylint and generates a badge for GitHub")
@click.option(
    "-m", "--number", "num_only", is_flag=True, help="Output Pylint score as integer"
)
@click.option("-b", "--badge", "create_badge", is_flag=True, help="Create Pylint badge")
@click.option(
    "-e", "--print-errors", "errors", is_flag=True, help="Print pylint errors"
)
def pylint_check(num_only, create_badge, errors):
    """Runs Pylint against the hyperglass package and optionally writes a badge.

    Raises:
        RuntimeError: if no score can be found in the Pylint output, or if the
            score is anything other than "10.00".
    """
    try:
        import re
        import anybadge
        from pylint import epylint
    except ImportError as error_exception:
        click.secho(f"Import error:\n{error_exception}", fg="red", bold=True)
        return

    pylint_stdout, _pylint_stderr = epylint.py_run(
        "hyperglass --verbose --rcfile=.pylintrc", return_std=True
    )
    pylint_output = pylint_stdout.getvalue()
    # Allow a leading minus sign so a negative score is not reported as positive.
    score_match = re.search(
        r"Your code has been rated at (-?\d+\.\d+)\/10.*", pylint_output
    )
    if score_match is None:
        # Without this guard a missing score line surfaces as an opaque
        # AttributeError on None.
        if errors:
            click.echo(pylint_output)
        raise RuntimeError("Unable to find a Pylint score in the Pylint output.")
    pylint_score = score_match.group(1)

    if num_only:
        click.echo(pylint_score)
    if errors:
        click.echo(pylint_output)
    if pylint_score != "10.00":
        raise RuntimeError(f"Pylint score {pylint_score} not acceptable.")
    if create_badge:
        badge_file = os.path.join(working_directory, "pylint.svg")
        if os.path.exists(badge_file):
            os.remove(badge_file)
        ab_thresholds = {1: "red", 10: "green"}
        badge = anybadge.Badge("pylint", pylint_score, thresholds=ab_thresholds)
        # Write to the same path that was just cleared, not a cwd-relative one.
        badge.write_badge(badge_file)
        click.echo(
            click.style("Created Pylint badge for score: ", fg="white")
            + click.style(pylint_score, fg="blue", bold=True)
        )


@hg.command("pre-check", help="Check hyperglass config & readiness")
def pre_check():
    """Checks the Python version and that critical parameters are not defaults.

    Reports the first parameter that is still empty or set to its default
    value, along with remediation instructions.
    """
    current_python = sys.version.split()[0]
    if sys.version_info < (3, 6):
        click.secho(
            f"Hyperglass requires Python 3.6 or higher. Curren version: Python {current_python}",
            fg="red",
            bold=True,
        )
    else:
        click.secho(
            f"✓ Python Version Check passed (Current version: Python {current_python})",
            fg="green",
            bold=True,
        )
    try:
        from hyperglass import configuration

        config = configuration.params()
        primary_asn = config["general"]["primary_asn"]
        org_name = config["general"]["org_name"]

        # Each entry: (is_undefined, reason, remediation)
        checks = (
            (
                primary_asn in ("65000", ""),
                f'Primary ASN is not defined (Current: "{primary_asn}")',
                "\nTo define the Primary ASN paramter, modify your "
                "`configuration.toml` and add the following configuration:\n\n"
                "[general]\n"
                'primary_asn = ""\n'
                "\nIf you do not define a Primary ASN, "
                f'"{primary_asn}" will be used.',
            ),
            (
                org_name in ("The Company", ""),
                f'Org Name is not defined (Current: "{org_name}")',
                "\nTo define an Org Name paramter, modify your "
                "`configuration.toml` and add the following configuration:\n\n"
                "[general]\n"
                'org_name = ""\n'
                "\nIf you do not define an Org Name, "
                f'"{org_name}" will be displayed.',
            ),
        )
        for undefined, reason, remediation in checks:
            if undefined:
                # Stop at the first problem, but make sure it is reported.
                click.secho(reason, fg="red", bold=True)
                click.secho(remediation, fg="blue")
                return
        click.secho(
            "✓ All critical hyperglass parameters are defined!",
            fg="green",
            bold=True,
        )
    except Exception as e:
        click.secho(f"Exception occurred:\n{e}", fg="red")


def _run_query_test(
    endpoint, headers, name, query_type, location, target, expect_success,
    start_name=None,
):
    """Posts one query to the hyperglass endpoint and prints pass/fail.

    A test passes when the status code is 200 (``expect_success`` true) or in
    the 4xx range (``expect_success`` false). ``start_name`` overrides ``name``
    in the "Starting ..." message only. Returns True when the test passed.
    """
    try:
        click.secho(f"Starting {start_name or name} test...", fg="black")
        test_query = construct_test(query_type, location, target)
        hg_response = requests.post(endpoint, headers=headers, data=test_query)
        if expect_success:
            passed = hg_response.status_code == 200
        else:
            passed = hg_response.status_code in range(400, 500)
        if passed:
            click.secho(f"✓ {name} test passed", fg="green", bold=True)
        else:
            click.secho(f"✗ {name} test failed", fg="red", bold=True)
            click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
            click.secho(hg_response.text, fg="red")
        return passed
    except Exception as e:
        # Best effort: one failing request must not stop the remaining tests.
        click.secho(f"Exception occurred:\n{e}")
        return False


@hg.command("test", help="Full test of all backend features")
@click.option("-l", "--location", type=str, required=True, help="Location to query")
@click.option(
    "-4",
    "--target-ipv4",
    "target_ipv4",
    type=str,
    default="1.1.1.0/24",
    required=False,
    show_default=True,
    help="IPv4 Target Address",
)
@click.option(
    "-6",
    "--target-ipv6",
    "target_ipv6",
    type=str,
    default="2606:4700:4700::/48",
    required=False,
    show_default=True,
    help="IPv6 Target Address",
)
@click.option(
    "-c",
    "--community",
    "test_community",
    type=str,
    required=False,
    show_default=True,
    default="65000:1",
    help="BGP Community",
)
@click.option(
    "-a",
    "--aspath",
    "test_aspath",
    type=str,
    required=False,
    show_default=True,
    default="^65001$",
    help="BGP AS Path",
)
@click.option(
    "-r",
    "--requires-ipv6-cidr",
    "requires_ipv6_cidr",
    type=str,
    required=False,
    help="Location for testing IPv6 CIDR requirement",
)
@click.option(
    "-b",
    "--blacklist",
    "test_blacklist",
    type=str,
    default="100.64.0.1",
    required=False,
    show_default=True,
    help="Address to use for blacklist check",
)
@click.option(
    "-h",
    "--host",
    "test_host",
    type=str,
    default="localhost",
    required=False,
    show_default=True,
    help="Name or IP address of hyperglass server",
)
@click.option(
    "-p",
    "--port",
    "test_port",
    type=int,
    default=5000,
    required=False,
    show_default=True,
    help="Port hyperglass is running on",
)
def test_hyperglass(
    location,
    target_ipv4,
    target_ipv6,
    requires_ipv6_cidr,
    test_blacklist,
    test_community,
    test_aspath,
    test_host,
    test_port,
):
    """
    Fully tests hyperglass backend by making use of requests library to mimic
    the JS Ajax POST performed by the front end.
    """
    invalid_ip = "this_ain't_an_ip!"
    invalid_community = "192.0.2.1"
    invalid_aspath = ".*"
    ipv4_host = "1.1.1.1"
    ipv4_cidr = "1.1.1.0/24"
    ipv6_host = "2606:4700:4700::1111"
    ipv6_cidr = "2606:4700:4700::/48"
    test_headers = {"Content-Type": "application/json"}
    test_endpoint = f"http://{test_host}:{test_port}/lg"

    def run(name, query_type, query_location, target, expect_success, **kwargs):
        return _run_query_test(
            test_endpoint,
            test_headers,
            name,
            query_type,
            query_location,
            target,
            expect_success,
            **kwargs,
        )

    # Malformed requests: each must be rejected with a 4xx status.
    run("No Query Type", "", location, target_ipv4, False)
    run("No Location", "bgp_route", "", target_ipv6, False)
    run("No Target", "bgp_route", location, "", False)

    # BGP route lookups
    run("Valid BGP IPv4 Route", "bgp_route", location, target_ipv4, True)
    run("Valid BGP IPv6 Route", "bgp_route", location, target_ipv6, True)
    run("Invalid BGP IPv4 Route", "bgp_route", location, invalid_ip, False)

    # Only run when a location that requires IPv6 CIDR targets was supplied.
    if requires_ipv6_cidr:
        run("Requires IPv6 CIDR", "bgp_route", requires_ipv6_cidr, ipv6_host, False)

    # BGP community lookups
    run("Valid BGP Community", "bgp_community", location, test_community, True)
    # Use the dedicated invalid value instead of the user-supplied IPv4 target.
    run("Invalid BGP Community", "bgp_community", location, invalid_community, False)

    # BGP AS_PATH lookups
    run("Valid BGP AS_PATH", "bgp_aspath", location, test_aspath, True)
    run(
        "Invalid BGP AS_PATH",
        "bgp_aspath",
        location,
        invalid_aspath,
        False,
        start_name="invalid BGP AS_PATH",
    )

    # Ping: host addresses are valid targets, CIDR prefixes are not.
    run("Valid IPv4 Ping", "ping", location, ipv4_host, True)
    run("Valid IPv6 Ping", "ping", location, ipv6_host, True)
    run("Invalid IPv4 Ping", "ping", location, ipv4_cidr, False)
    run("Invalid IPv6 Ping", "ping", location, ipv6_cidr, False)

    # Blacklisted target must be rejected.
    run("Blacklist", "bgp_route", location, test_blacklist, False)


@hg.command("clear-cache", help="Clear Flask cache")
def clearcache():
    """Clears the Flask-Caching cache"""
    try:
        import hyperglass.hyperglass

        hyperglass.hyperglass.clear_cache()
        click.secho("✓ Successfully cleared cache.", fg="green", bold=True)
    except Exception:
        click.secho("✗ Failed to clear cache.", fg="red", bold=True)
        raise


@hg.command("generate-key", help="Generate API key & hash")
@click.option(
    "-l", "--length", "string_length", type=int, default=16, show_default=True
)
def generatekey(string_length):
    """
    Generates a random API Key (16 characters by default) for hyperglass-frr
    API, and a corresponding PBKDF2 SHA256 Hash.
    """
    alphabet = string.ascii_letters + string.digits
    nl = "\n"
    # The key is a credential, so draw it from the CSPRNG-backed secrets module.
    api_key = "".join(secrets.choice(alphabet) for _ in range(string_length))
    key_hash = pbkdf2_sha256.hash(api_key)

    ak_info = " Your API Key is: "
    ak_help1 = " Put this in the"
    ak_help2 = " configuration.yaml "
    ak_help3 = "of your API module."
    kh_info = " Your Key Hash is: "
    kh_help1 = " Use this as the password for the corresponding device in"
    kh_help2 = " devices.yaml"
    kh_help3 = "."

    ak_info_len = len(ak_info + api_key)
    ak_help_len = len(ak_help1 + ak_help2 + ak_help3)
    kh_info_len = len(kh_info + key_hash)
    kh_help_len = len(kh_help1 + kh_help2 + kh_help3)
    # Box interior is the widest row plus two columns of padding.
    longest_line = max(ak_info_len, ak_help_len, kh_info_len, kh_help_len) + 2

    s_box = {"fg": "white", "dim": True, "bold": True}
    s_txt = {"fg": "white"}
    s_ak = {"fg": "green", "bold": True}
    s_kh = {"fg": "blue", "bold": True}
    s_file = {"fg": "yellow"}

    horizontal = "─" * longest_line
    click.echo(
        click.style("┌" + horizontal + "┐", **s_box)
        + click.style(nl + "│", **s_box)
        + click.style(ak_info, **s_txt)
        + click.style(api_key, **s_ak)
        + click.style(" " * (longest_line - ak_info_len) + "│", **s_box)
        + click.style(nl + "│", **s_box)
        + click.style(ak_help1, **s_txt)
        + click.style(ak_help2, **s_file)
        + click.style(ak_help3, **s_txt)
        + click.style(" " * (longest_line - ak_help_len) + "│", **s_box)
        + click.style(nl + "├" + horizontal + "┤", **s_box)
        + click.style(nl + "│", **s_box)
        + click.style(kh_info, **s_txt)
        + click.style(key_hash, **s_kh)
        + click.style(" " * (longest_line - kh_info_len) + "│", **s_box)
        + click.style(nl + "│", **s_box)
        + click.style(kh_help1, **s_txt)
        + click.style(kh_help2, **s_file)
        + click.style(kh_help3, **s_txt)
        + click.style(" " * (longest_line - kh_help_len) + "│", **s_box)
        + click.style(nl + "└" + horizontal + "┘", **s_box)
    )


@hg.command("dev-server", help="Start Flask development server")
@click.option("--host", type=str, default="0.0.0.0", help="Listening IP")
@click.option("--port", type=int, default=5000, help="TCP Port")
def flask_dev_server(host, port):
    """Starts Flask development server for testing without WSGI/Reverse Proxy"""
    try:
        from hyperglass import render
        from hyperglass import hyperglass
        from hyperglass.configuration import params
    except ImportError as import_error:
        # str(): click.style only accepts non-string text from click 8.0 on.
        raise click.ClickException(
            click.style("✗ Error importing hyperlgass: ", fg="red", bold=True)
            + click.style(str(import_error), fg="blue")
        )
    try:
        render.css()
    except Exception as e:
        raise click.ClickException(
            click.style("✗ Error compiling Sass: ", fg="red", bold=True)
            + click.style(str(e), fg="blue")
        )
    try:
        click.secho(
            "✓ Starting hyperglass development server...", fg="green", bold=True
        )
        hyperglass.app.run(host=host, debug=params.general.debug, port=port)
    except Exception as e:
        raise click.ClickException(
            click.style("✗ Failed to start test server: ", fg="red", bold=True)
            + click.style(str(e), fg="red")
        )


@hg.command("compile-sass", help="Compile Sass templates to CSS")
def compile_sass():
    """Renders Jinja2 and Sass templates to HTML & CSS files"""
    try:
        from hyperglass import render
    except ImportError as import_error:
        raise click.ClickException(
            click.style("✗ Error importing hyperlgass: ", fg="red", bold=True)
            + click.style(str(import_error), fg="blue")
        )
    try:
        render.css()
    except Exception as e:
        raise click.ClickException(
            click.style("✗ Error compiling Sass: ", fg="red", bold=True)
            + click.style(str(e), fg="blue")
        )


@hg.command("migrate-configs", help="Copy TOML examples to usable config files")
def migrateconfig():
    """Copies example configuration files to usable config files"""
    try:
        click.secho("Migrating example config files...", fg="black")
        config_dir = os.path.join(working_directory, "hyperglass/configuration/")
        examples = glob.iglob(os.path.join(config_dir, "*.example"))
        for example in examples:
            # Strip the trailing ".example" to get the usable file name.
            basefile, _extension = os.path.splitext(example)
            if os.path.exists(basefile):
                click.secho(f"{basefile} already exists", fg="blue")
            else:
                try:
                    cp(example, basefile)
                    click.secho(f"✓ Migrated {basefile}", fg="green")
                except Exception:
                    click.secho(f"✗ Failed to migrate {basefile}", fg="red")
                    raise
        click.secho(
            "✓ Successfully migrated example config files", fg="green", bold=True
        )
    except Exception:
        click.secho("✗ Error migrating example config files", fg="red", bold=True)
        raise


@hg.command("migrate-gunicorn", help="Copy Gunicorn example to usable config file")
def migrategunicorn():
    """Copies example Gunicorn config file to a usable config"""
    try:
        import hyperglass
    except ImportError as error_exception:
        # Abort here: continuing would only fail later with a NameError.
        raise click.ClickException(
            click.style(
                f"Error while importing hyperglass:\n{error_exception}", fg="red"
            )
        ) from error_exception
    try:
        click.secho("Migrating example Gunicorn configuration...", fg="black")
        hyperglass_root = os.path.dirname(hyperglass.__file__)
        ex_file = os.path.join(hyperglass_root, "gunicorn_config.py.example")
        newfile, _extension = os.path.splitext(ex_file)
        if os.path.exists(newfile):
            click.secho(f"{newfile} already exists", fg="blue")
        else:
            try:
                cp(ex_file, newfile)
                click.secho(
                    f"✓ Successfully migrated Gunicorn configuration to: {newfile}",
                    fg="green",
                    bold=True,
                )
            except Exception:
                click.secho(f"✗ Failed to migrate {newfile}", fg="red")
                raise
    except Exception:
        click.secho(
            "✗ Error migrating example Gunicorn configuration", fg="red", bold=True
        )
        raise


@hg.command("migrate-systemd", help="Copy Systemd example to OS")
@click.option(
    "-d", "--directory", default="/etc/systemd/system", help="Destination Directory"
)
def migratesystemd(directory):
    """Copies example systemd service file to /etc/systemd/system/"""
    try:
        click.secho("Migrating example systemd service...", fg="black")
        ex_file_base = "hyperglass.service.example"
        ex_file = os.path.join(working_directory, f"hyperglass/{ex_file_base}")
        basefile, _extension = os.path.splitext(ex_file_base)
        newfile = os.path.join(directory, basefile)
        if os.path.exists(newfile):
            click.secho(f"{newfile} already exists", fg="blue")
        else:
            try:
                cp(ex_file, newfile)
                click.secho(
                    f"✓ Successfully migrated systemd service to: {newfile}",
                    fg="green",
                    bold=True,
                )
            except Exception:
                click.secho(f"✗ Failed to migrate {newfile}", fg="red")
                raise
    except Exception:
        click.secho("✗ Error migrating example systemd service", fg="red", bold=True)
        raise


def _apply_to_tree(top, action):
    """Calls ``action(path)`` on every directory and file under ``top``, and on ``top``."""
    for root, dirs, files in os.walk(top):
        for name in dirs + files:
            action(os.path.join(root, name))
        action(root)


@hg.command(
    "update-permissions",
    help="Fix ownership & permissions of hyperglass project directory",
)
@click.option("--user", default="www-data")
@click.option("--group", default="www-data")
def fixpermissions(user, group):
    """Effectively runs `chmod` and `chown` on the hyperglass/hyperglass directory"""
    try:
        import hyperglass
    except ImportError as error_exception:
        # Abort here: continuing would only fail later with a NameError.
        raise click.ClickException(
            f"Error importing hyperglass:\n{error_exception}"
        ) from error_exception
    hyperglass_root = os.path.dirname(hyperglass.__file__)
    try:
        uid = pwd.getpwnam(user).pw_uid
        gid = grp.getgrnam(group).gr_gid
    except KeyError as lookup_error:
        # getpwnam/getgrnam raise KeyError when the entry does not exist.
        raise click.ClickException(
            f"Unknown user or group: {lookup_error}"
        ) from lookup_error
    try:
        _apply_to_tree(hyperglass_root, lambda path: os.chown(path, uid, gid))
        click.secho(
            "✓ Successfully changed hyperglass/ ownership", fg="green", bold=True
        )
    except Exception:
        click.secho("✗ Failed to change hyperglass/ ownership", fg="red", bold=True)
        raise
    try:
        _apply_to_tree(hyperglass_root, lambda path: os.chmod(path, 0o744))
        click.secho(
            "✓ Successfully changed hyperglass/ permissions", fg="green", bold=True
        )
    except Exception:
        click.secho("✗ Failed to change hyperglass/ permissions", fg="red", bold=True)
        raise


if __name__ == "__main__":
    hg()