1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00

split CLI into separate module

This commit is contained in:
checktheroads
2020-01-21 01:08:28 -07:00
parent ddb42b02a4
commit e84936b53a
9 changed files with 631 additions and 890 deletions

13
cli/__init__.py Normal file
View File

@@ -0,0 +1,13 @@
"""hyperglass cli module."""
import stackprinter
# Project Imports
from cli import commands
from cli import echo # noqa: F401
from cli import formatting # noqa: F401
from cli import static # noqa: F401
from cli import util # noqa: F401
stackprinter.set_excepthook(style="darkbg2")
CLI = commands.hg

154
cli/commands.py Normal file
View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""CLI Command definitions."""
# Standard Library Imports
from pathlib import Path
# Third Party Imports
import click
# Project Imports
from cli.echo import cmd_help
from cli.echo import error
from cli.echo import status
from cli.echo import success_info
from cli.echo import value
from cli.formatting import HelpColorsCommand
from cli.formatting import HelpColorsGroup
from cli.formatting import random_colors
from cli.static import CLI_HELP
from cli.static import LABEL
from cli.static import E
from cli.util import fix_ownership
from cli.util import fix_permissions
from cli.util import migrate_config
from cli.util import migrate_systemd
from cli.util import start_web_server
# Define working directory
WORKING_DIR = Path(__file__).parent
@click.group(
cls=HelpColorsGroup,
help=CLI_HELP,
help_headers_color=LABEL,
help_options_custom_colors=random_colors(
"build-ui", "start", "migrate-examples", "systemd", "permissions", "secret"
),
)
def hg():
"""Initialize Click Command Group."""
pass
@hg.command("build-ui", short_help=cmd_help(E.BUTTERFLY, "Create a new UI build"))
def build_ui():
"""Create a new UI build.
Raises:
click.ClickException: Raised on any errors.
"""
try:
import asyncio
from hyperglass.util import build_ui
except ImportError as e:
error("Error importing UI builder", e)
status("Starting new UI build...")
try:
success = asyncio.run(build_ui())
success_info("Completed build, ran", success)
except Exception as e:
error("Error building UI", e)
@hg.command(
"start",
help=cmd_help(E.ROCKET, "Start web server"),
cls=HelpColorsCommand,
help_options_custom_colors=random_colors("-b"),
)
@click.option(
"-b", "--build", is_flag=True, help="Render theme & build frontend assets"
)
def start(build):
"""Start web server and optionally build frontend assets."""
try:
from hyperglass.api import start, ASGI_PARAMS
except ImportError as e:
error("Error importing hyperglass", e)
if build:
build_complete = build_ui()
if build_complete:
start_web_server(start, ASGI_PARAMS)
if not build:
start_web_server(start, ASGI_PARAMS)
@hg.command(
"migrate-examples",
short_help=cmd_help(E.PAPERCLIP, "Copy example configs to production config files"),
help=cmd_help(E.PAPERCLIP, "Copy example configs to production config files"),
cls=HelpColorsCommand,
help_options_custom_colors=random_colors(),
)
def migrateconfig():
"""Copy example configuration files to usable config files."""
migrate_config(WORKING_DIR / "hyperglas/configuration/")
@hg.command(
"systemd",
help=cmd_help(E.CLAMP, "Copy systemd example to file system"),
cls=HelpColorsCommand,
help_options_custom_colors=random_colors("-d"),
)
@click.option(
"-d",
"--directory",
default="/etc/systemd/system",
help="Destination Directory [default: 'etc/systemd/system']",
)
def migratesystemd(directory):
"""Copy example systemd service file to /etc/systemd/system/."""
migrate_systemd(WORKING_DIR / "hyperglass/hyperglass.service.example", directory)
@hg.command(
"permissions",
help=cmd_help(E.KEY, "Fix ownership & permissions of 'hyperglass/'"),
cls=HelpColorsCommand,
help_options_custom_colors=random_colors("--user", "--group"),
)
@click.option("--user", default="www-data")
@click.option("--group", default="www-data")
def permissions(user, group):
"""Run `chmod` and `chown` on the hyperglass/hyperglass directory."""
fix_permissions(user, group, WORKING_DIR)
fix_ownership(WORKING_DIR)
@hg.command(
"secret",
help=cmd_help(E.LOCK, "Generate agent secret"),
cls=HelpColorsCommand,
help_options_custom_colors=random_colors("-l"),
)
@click.option(
"-l", "--length", "length", default=32, help="Number of characters [default: 32]"
)
def generate_secret(length):
"""Generate secret for hyperglass-agent.
Arguments:
length {int} -- Length of secret
"""
import secrets
gen_secret = secrets.token_urlsafe(length)
value("Secret", gen_secret)

71
cli/echo.py Normal file
View File

@@ -0,0 +1,71 @@
"""Helper functions for CLI message printing."""
# Third Party Imports
import click
# Project Imports
from cli.static import CL
from cli.static import CMD_HELP
from cli.static import ERROR
from cli.static import INFO
from cli.static import LABEL
from cli.static import NL
from cli.static import STATUS
from cli.static import SUCCESS
from cli.static import VALUE
from cli.static import WS
from cli.static import E
def cmd_help(emoji="", help_text=""):
"""Print formatted command help."""
return emoji + click.style(help_text, **CMD_HELP)
def success(msg):
"""Print formatted success messages."""
click.echo(E.CHECK + click.style(str(msg), **SUCCESS))
def success_info(label, msg):
"""Print formatted labeled success messages."""
click.echo(
E.CHECK
+ click.style(str(label), **SUCCESS)
+ CL[1]
+ WS[1]
+ click.style(str(msg), **INFO)
)
def info(msg):
"""Print formatted informational messages."""
click.echo(E.INFO + click.style(str(msg), **INFO))
def status(msg):
"""Print formatted status messages."""
click.echo(click.style(str(msg), **STATUS))
def error(msg, exc):
"""Raise click exception with formatted output."""
raise click.ClickException(
NL
+ E.ERROR
+ click.style(str(msg), **LABEL)
+ CL[1]
+ WS[1]
+ click.style(str(exc), **ERROR)
) from None
def value(label, msg):
"""Print formatted label: value."""
click.echo(
NL[1]
+ click.style(str(label), **LABEL)
+ CL[1]
+ WS[1]
+ click.style(str(msg), **VALUE)
+ NL[1]
)

178
cli/formatting.py Normal file
View File

@@ -0,0 +1,178 @@
"""Help formatting.
https://github.com/click-contrib/click-help-colors
MIT License
Copyright (c) 2016 Roman Tonkonozhko
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
# Standard Library Imports
import random
# Third Party Imports
import click
def random_colors(*commands):
"""From tuple of commands, generate random but unique colors."""
colors = ["blue", "green", "red", "yellow", "magenta", "cyan", "white"]
num_colors = len(colors)
num_commands = len(commands)
if num_commands >= num_colors:
colors += colors
unique_colors = random.sample(colors, num_commands)
commands_fmt = {}
for i, cmd in enumerate(commands):
commands_fmt.update({cmd: {"fg": unique_colors[i], "bold": True}})
commands_fmt.update({"--help": {"fg": "white"}})
return commands_fmt
class HelpColorsFormatter(click.HelpFormatter):
"""Click help formatting plugin. See file docstring for license.
Modified from original copy to support click.style() instead of
direct ANSII string formatting.
"""
def __init__(
self,
headers_color=None,
options_color=None,
options_custom_colors=None,
*args,
**kwargs
):
"""Initialize help formatter.
Keyword Arguments:
headers_color {dict} -- click.style() paramters for header
options_color {dict} -- click.style() paramters for options
options_custom_colors {dict} -- click.style() paramters for options by name
"""
self.headers_color = headers_color or {}
self.options_color = options_color or {}
self.options_custom_colors = options_custom_colors or {}
super().__init__(indent_increment=3, *args, **kwargs)
def _pick_color(self, option_name):
"""Filter options and pass relevant click.style() options for command."""
opt = option_name.split()[0].strip(",")
color = {}
if self.options_custom_colors and opt in self.options_custom_colors.keys():
color = self.options_custom_colors[opt]
else:
color = self.options_color
return color
def write_usage(self, prog, args="", prefix="Usage: "):
"""Write Usage: section."""
prefix_fmt = click.style(prefix, **self.headers_color)
super().write_usage(prog, args, prefix=prefix_fmt)
def write_heading(self, heading):
"""Write Heading section."""
heading_fmt = click.style(heading, **self.headers_color)
super().write_heading(heading_fmt)
def write_dl(self, rows, **kwargs):
"""Write Options section."""
colorized_rows = [
(click.style(row[0], **self._pick_color(row[0])), row[1]) for row in rows
]
super().write_dl(colorized_rows, **kwargs)
class HelpColorsMixin:
"""Click help formatting plugin. See file docstring for license.
Modified from original copy to support click.style() instead of
direct ANSII string formatting.
"""
def __init__(
self,
help_headers_color=None,
help_options_color=None,
help_options_custom_colors=None,
*args,
**kwargs
):
"""Initialize help mixin."""
self.help_headers_color = help_headers_color or {}
self.help_options_color = help_options_color or {}
self.help_options_custom_colors = help_options_custom_colors or {}
super().__init__(*args, **kwargs)
def get_help(self, ctx):
"""Format help."""
formatter = HelpColorsFormatter(
width=ctx.terminal_width,
max_width=ctx.max_content_width,
headers_color=self.help_headers_color,
options_color=self.help_options_color,
options_custom_colors=self.help_options_custom_colors,
)
self.format_help(ctx, formatter)
return formatter.getvalue().rstrip("\n")
class HelpColorsGroup(HelpColorsMixin, click.Group):
"""Click help formatting plugin. See file docstring for license.
Modified from original copy to support click.style() instead of
direct ANSII string formatting.
"""
def __init__(self, *args, **kwargs):
"""Initialize group formatter."""
super().__init__(*args, **kwargs)
def command(self, *args, **kwargs):
"""Set command values."""
kwargs.setdefault("cls", HelpColorsCommand)
kwargs.setdefault("help_headers_color", self.help_headers_color)
kwargs.setdefault("help_options_color", self.help_options_color)
kwargs.setdefault("help_options_custom_colors", self.help_options_custom_colors)
return super().command(*args, **kwargs)
def group(self, *args, **kwargs):
"""Set group values."""
kwargs.setdefault("cls", HelpColorsGroup)
kwargs.setdefault("help_headers_color", self.help_headers_color)
kwargs.setdefault("help_options_color", self.help_options_color)
kwargs.setdefault("help_options_custom_colors", self.help_options_custom_colors)
return super().group(*args, **kwargs)
class HelpColorsCommand(HelpColorsMixin, click.Command):
"""Click help formatting plugin. See file docstring for license.
Modified from original copy to support click.style() instead of
direct ANSII string formatting.
"""
def __init__(self, *args, **kwargs):
"""Initialize command formatter."""
super().__init__(*args, **kwargs)

63
cli/static.py Normal file
View File

@@ -0,0 +1,63 @@
"""Static string definitions."""
# Third Party Imports
import click
class Char:
"""Helper class for single-character strings."""
def __init__(self, char):
"""Set instance character."""
self.char = char
def __getitem__(self, i):
"""Subscription returns the instance's character * n."""
return self.char * i
def __str__(self):
"""Stringify the instance character."""
return str(self.char)
def __repr__(self):
"""Stringify the instance character for representation."""
return str(self.char)
def __add__(self, other):
"""Addition method for string concatenation."""
return str(self.char) + str(other)
class Emoji:
"""Helper class for unicode emoji."""
BUTTERFLY = "\U0001F98B" + " "
CHECK = "\U00002705" + " "
INFO = "\U00002755"
ERROR = "\U0000274C" + " "
ROCKET = "\U0001F680" + " "
SPARKLES = "\U00002728" + " "
PAPERCLIP = "\U0001F4CE" + " "
KEY = "\U0001F511"
LOCK = "\U0001F512"
CLAMP = "\U0001F5DC" + " "
WS = Char(" ")
NL = Char("\n")
CL = Char(":")
E = Emoji()
CLI_HELP = (
click.style("hyperglass", fg="magenta", bold=True)
+ WS[1]
+ click.style("CLI Management Tool", fg="white")
)
# Click Style Helpers
SUCCESS = {"fg": "green", "bold": True}
ERROR = {"fg": "red", "bold": True}
LABEL = {"fg": "white"}
INFO = {"fg": "blue", "bold": True}
STATUS = {"fg": "black"}
VALUE = {"fg": "magenta", "bold": True}
CMD_HELP = {"fg": "white"}

147
cli/util.py Normal file
View File

@@ -0,0 +1,147 @@
"""CLI utility functions."""
# Third Party Imports
import click
# Project Imports
from cli.echo import error
from cli.echo import info
from cli.echo import status
from cli.echo import success
from cli.static import CL
from cli.static import NL
from cli.static import WS
from cli.static import E
def async_command(func):
"""Decororator for to make async functions runable from syncronous code."""
import asyncio
from functools import update_wrapper
func = asyncio.coroutine(func)
def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
return loop.run_until_complete(func(*args, **kwargs))
return update_wrapper(wrapper, func)
def fix_ownership(user, group, directory):
"""Make user & group the owner of the directory."""
import grp
import pwd
import os
uid = pwd.getpwnam(user).pw_uid
gid = grp.getgrnam(group).gr_gid
try:
for root, dirs, files in os.walk(directory):
for d in dirs:
full_path = os.path.join(root, d)
os.chown(full_path, uid, gid)
for f in files:
full_path = os.path.join(root, f)
os.chown(full_path, uid, gid)
os.chown(root, uid, gid)
except Exception as e:
error("Failed to change 'hyperglass/' ownership", e)
success("Successfully changed 'hyperglass/' ownership")
def fix_permissions(directory):
"""Make directory readable by public."""
import os
try:
for root, dirs, files in os.walk(directory):
for d in dirs:
full_path = os.path.join(root, d)
os.chmod(full_path, 0o744)
for f in files:
full_path = os.path.join(root, f)
os.chmod(full_path, 0o744)
os.chmod(root, 0o744)
except Exception as e:
error("Failed to change 'hyperglass/' permissions", e)
success("Successfully changed 'hyperglass/' permissions")
def start_web_server(start, params):
"""Start web server."""
msg_start = "Starting hyperglass web server on"
msg_uri = "http://"
msg_host = str(params["host"])
msg_port = str(params["port"])
msg_len = len("".join([msg_start, WS[1], msg_uri, msg_host, CL[1], msg_port]))
try:
click.echo(
NL[1]
+ WS[msg_len + 8]
+ E.ROCKET
+ NL[1]
+ E.CHECK
+ click.style(msg_start, fg="green", bold=True)
+ WS[1]
+ click.style(msg_uri, fg="white")
+ click.style(msg_host, fg="blue", bold=True)
+ click.style(CL[1], fg="white")
+ click.style(msg_port, fg="magenta", bold=True)
+ WS[1]
+ E.ROCKET
+ NL[1]
+ WS[1]
+ NL[1]
)
start()
except Exception as e:
error("Failed to start test server", e)
def migrate_config(config_dir):
"""Copy example config files and remove .example extensions."""
status("Migrating example config files...")
import os
import glob
import shutil
examples = glob.iglob(os.path.join(config_dir, "*.example"))
for file in examples:
basefile, extension = os.path.splitext(file)
try:
if os.path.exists(basefile):
info(f"{basefile} already exists")
else:
shutil.copyfile(file, basefile)
success(f"Migrated {basefile}")
except Exception as e:
error(f"Failed to migrate {basefile}", e)
success("Successfully migrated example config files")
def migrate_systemd(source, destination):
"""Copy example systemd service file to /etc/systemd/system/."""
import os
import shutil
basefile, extension = os.path.splitext(source)
newfile = os.path.join(destination, basefile)
try:
status("Migrating example systemd service...")
if os.path.exists(newfile):
info(f"{newfile} already exists")
else:
shutil.copyfile(source, newfile)
except Exception as e:
error("Error migrating example systemd service", e)
success(f"Successfully migrated systemd service to: {newfile}")

853
manage.py
View File

@@ -1,853 +1,8 @@
#!/usr/bin/env python3
# flake8: noqa
# Standard Library Imports
# Standard Imports
import asyncio
import glob
import grp
import json
import os
import pwd
import random
import shutil
import string
import sys
from functools import update_wrapper
from pathlib import Path
# Third Party Imports
# Module Imports
import click
import requests
import stackprinter
stackprinter.set_excepthook(style="darkbg2")
# Initialize shutil copy function
cp = shutil.copyfile
# Define working directory
working_directory = os.path.dirname(os.path.abspath(__file__))
# Helpers
NL = "\n"
WS1 = " "
WS2 = " "
WS4 = " "
WS6 = " "
WS8 = " "
CL = ":"
E_CHECK = "\U00002705"
E_ERROR = "\U0000274C"
E_ROCKET = "\U0001F680"
E_SPARKLES = "\U00002728"
def async_command(func):
func = asyncio.coroutine(func)
def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
return loop.run_until_complete(func(*args, **kwargs))
return update_wrapper(wrapper, func)
def construct_test(test_query, location, test_target):
"""Constructs JSON POST data for test_hyperglass function"""
constructed_query = json.dumps(
{"type": test_query, "location": location, "target": test_target}
)
return constructed_query
@click.group()
def hg():
pass
@hg.command("pylint-check", help="Runs Pylint and generates a badge for GitHub")
@click.option(
"-m", "--number", "num_only", is_flag=True, help="Output Pylint score as integer"
)
@click.option("-b", "--badge", "create_badge", is_flag=True, help="Create Pylint badge")
@click.option(
"-e", "--print-errors", "errors", is_flag=True, help="Print pylint errors"
)
def pylint_check(num_only, create_badge, errors):
try:
import re
import anybadge
from pylint import epylint
pylint_ver = epylint.py_run("hyperglass --version", return_std=True)[
0
].getvalue()
click.echo("Current directory: " + str(Path.cwd().resolve()))
click.echo("Pylint Version: " + pylint_ver)
pylint_stdout, pylint_stderr = epylint.py_run(
"hyperglass --verbose --rcfile=.pylintrc", return_std=True
)
pylint_output = pylint_stdout.getvalue()
pylint_error = pylint_stderr.getvalue()
pylint_score = re.search(
r"Your code has been rated at (\d+\.\d+)\/10.*", pylint_output
).group(1)
if num_only:
click.echo(pylint_score)
if errors:
click.echo(pylint_error)
click.echo(pylint_output)
if not pylint_score == "10.00":
raise RuntimeError(f"Pylint score {pylint_score} not acceptable.")
if create_badge:
badge_file = os.path.join(working_directory, "pylint.svg")
if os.path.exists(badge_file):
os.remove(badge_file)
ab_thresholds = {1: "red", 10: "green"}
badge = anybadge.Badge("pylint", pylint_score, thresholds=ab_thresholds)
badge.write_badge("pylint.svg")
click.echo(
click.style("Created Pylint badge for score: ", fg="white")
+ click.style(pylint_score, fg="blue", bold=True)
)
except ImportError as error_exception:
click.secho(f"Import error:\n{error_exception}", fg="red", bold=True)
@hg.command("pre-check", help="Check hyperglass config & readiness")
def pre_check():
if sys.version_info < (3, 6):
click.secho(
f"Hyperglass requires Python 3.6 or higher. Curren version: Python {sys.version.split()[0]}",
fg="red",
bold=True,
)
if sys.version_info >= (3, 6):
click.secho(
f"✓ Python Version Check passed (Current version: Python {sys.version.split()[0]})",
fg="green",
bold=True,
)
try:
from hyperglass import configuration
config = configuration.params()
status = True
while status:
if config["general"]["primary_asn"] == "65000" or "":
status = False
reason = f'Primary ASN is not defined (Current: "{config["general"]["primary_asn"]}")'
remediation = f"""
To define the Primary ASN paramter, modify your `configuration.toml` and add the following \
configuration:\n
[general]
primary_asn = "<Your Primary AS Number>"
\nIf you do not define a Primary ASN, \"{config["general"]["primary_asn"]}\" will be used."""
break
click.secho(reason, fg="red", bold=True)
click.secho(remediation, fg="blue")
if config["general"]["org_name"] == "The Company" or "":
status = False
reason = f'Org Name is not defined (Current: "{config["general"]["org_name"]}")'
remediation = f"""
To define an Org Name paramter, modify your `configuration.toml` and add the following \
configuration:\n
[general]
org_name = "<Your Org Name>"
\nIf you do not define an Org Name, \"{config["general"]["org_name"]}\" will be displayed."""
break
click.secho(reason, fg="red", bold=True)
click.secho(remediation, fg="blue")
click.secho(
"✓ All critical hyperglass parameters are defined!",
fg="green",
bold=True,
)
break
except Exception as e:
click.secho(f"Exception occurred:\n{e}", fg="red")
@hg.command("test", help="Full test of all backend features")
@click.option("-l", "--location", type=str, required=True, help="Location to query")
@click.option(
"-4",
"--target-ipv4",
"target_ipv4",
type=str,
default="1.1.1.0/24",
required=False,
show_default=True,
help="IPv4 Target Address",
)
@click.option(
"-6",
"--target-ipv6",
"target_ipv6",
type=str,
default="2606:4700:4700::/48",
required=False,
show_default=True,
help="IPv6 Target Address",
)
@click.option(
"-c",
"--community",
"test_community",
type=str,
required=False,
show_default=True,
default="65000:1",
help="BGP Community",
)
@click.option(
"-a",
"--aspath",
"test_aspath",
type=str,
required=False,
show_default=True,
default="^65001$",
help="BGP AS Path",
)
@click.option(
"-r",
"--requires-ipv6-cidr",
"requires_ipv6_cidr",
type=str,
required=False,
help="Location for testing IPv6 CIDR requirement",
)
@click.option(
"-b",
"--blacklist",
"test_blacklist",
type=str,
default="100.64.0.1",
required=False,
show_default=True,
help="Address to use for blacklist check",
)
@click.option(
"-h",
"--host",
"test_host",
type=str,
default="localhost",
required=False,
show_default=True,
help="Name or IP address of hyperglass server",
)
@click.option(
"-p",
"--port",
"test_port",
type=int,
default=5000,
required=False,
show_default=True,
help="Port hyperglass is running on",
)
def test_hyperglass(
location,
target_ipv4,
target_ipv6,
requires_ipv6_cidr,
test_blacklist,
test_community,
test_aspath,
test_host,
test_port,
):
"""
Fully tests hyperglass backend by making use of requests library to
mimic the JS Ajax POST performed by the front end.
"""
test_target = None
invalid_ip = "this_ain't_an_ip!"
invalid_community = "192.0.2.1"
invalid_aspath = ".*"
ipv4_host = "1.1.1.1"
ipv4_cidr = "1.1.1.0/24"
ipv6_host = "2606:4700:4700::1111"
ipv6_cidr = "2606:4700:4700::/48"
test_headers = {"Content-Type": "application/json"}
test_endpoint = f"http://{test_host}:{test_port}/lg"
# No Query Type Test
try:
click.secho("Starting No Query Type test...", fg="black")
test_query = construct_test("", location, target_ipv4)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code in range(400, 500):
click.secho("✓ No Query Type test passed", fg="green", bold=True)
if not hg_response.status_code in range(400, 500):
click.secho("✗ No Query Type test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# No Location Test
try:
click.secho("Starting No Location test...", fg="black")
test_query = construct_test("bgp_route", "", target_ipv6)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code in range(400, 500):
click.secho("✓ No Location test passed", fg="green", bold=True)
if not hg_response.status_code in range(400, 500):
click.secho("✗ No Location test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# No Target Test
try:
click.secho("Starting No Target test...", fg="black")
test_query = construct_test("bgp_route", location, "")
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code in range(400, 500):
click.secho("✓ No Target test passed", fg="green", bold=True)
if not hg_response.status_code in range(400, 500):
click.secho("✗ No Target test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Valid BGP IPv4 Route Test
try:
click.secho("Starting Valid BGP IPv4 Route test...", fg="black")
test_query = construct_test("bgp_route", location, target_ipv4)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code == 200:
click.secho("✓ Valid BGP IPv4 Route test passed", fg="green", bold=True)
if not hg_response.status_code == 200:
click.secho("✗ Valid BGP IPv4 Route test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Valid BGP IPv6 Route Test
try:
click.secho("Starting Valid BGP IPv6 Route test...", fg="black")
test_query = construct_test("bgp_route", location, target_ipv6)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code == 200:
click.secho("✓ Valid BGP IPv6 Route test passed", fg="green", bold=True)
if not hg_response.status_code == 200:
click.secho("✗ Valid BGP IPv6 Route test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Invalid BGP Route Test
try:
click.secho("Starting Invalid BGP IPv4 Route test...", fg="black")
test_query = construct_test("bgp_route", location, invalid_ip)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code in range(400, 500):
click.secho("✓ Invalid BGP IPv4 Route test passed", fg="green", bold=True)
if not hg_response.status_code in range(400, 500):
click.secho("✗ Invalid BGP IPv4 Route test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Requires IPv6 CIDR Test
if requires_ipv6_cidr:
try:
click.secho("Starting Requires IPv6 CIDR test...", fg="black")
test_query = construct_test("bgp_route", requires_ipv6_cidr, ipv6_host)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code in range(400, 500):
click.secho("✓ Requires IPv6 CIDR test passed", fg="green", bold=True)
if not hg_response.status_code in range(400, 500):
click.secho("✗ Requires IPv6 CIDR test failed", fg="red", bold=True)
click.secho(
f"Status Code: {hg_response.status_code}", fg="red", bold=True
)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Valid BGP Community Test
try:
click.secho("Starting Valid BGP Community test...", fg="black")
test_query = construct_test("bgp_community", location, test_community)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code == 200:
click.secho("✓ Valid BGP Community test passed", fg="green", bold=True)
if not hg_response.status_code == 200:
click.secho("✗ Valid BGP Community test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Invalid BGP Community Test
try:
click.secho("Starting Invalid BGP Community test...", fg="black")
test_query = construct_test("bgp_community", location, target_ipv4)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code in range(400, 500):
click.secho("✓ Invalid BGP Community test passed", fg="green", bold=True)
if not hg_response.status_code in range(400, 500):
click.secho("✗ Invalid BGP Community test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Valid BGP AS_PATH Test
try:
click.secho("Starting Valid BGP AS_PATH test...", fg="black")
test_query = construct_test("bgp_aspath", location, test_aspath)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code == 200:
click.secho("✓ Valid BGP AS_PATH test passed", fg="green", bold=True)
if not hg_response.status_code == 200:
click.secho("✗ Valid BGP AS_PATH test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Invalid BGP AS_PATH Test
try:
click.secho("Starting invalid BGP AS_PATH test...", fg="black")
test_query = construct_test("bgp_aspath", location, invalid_aspath)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code in range(400, 500):
click.secho("✓ Invalid BGP AS_PATH test passed", fg="green", bold=True)
if not hg_response.status_code in range(400, 500):
click.secho("✗ Invalid BGP AS_PATH test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Valid IPv4 Ping Test
try:
click.secho("Starting Valid IPv4 Ping test...", fg="black")
test_query = construct_test("ping", location, ipv4_host)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code == 200:
click.secho("✓ Valid IPv4 Ping test passed", fg="green", bold=True)
if not hg_response.status_code == 200:
click.secho("✗ Valid IPv4 Ping test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Valid IPv6 Ping Test
try:
click.secho("Starting Valid IPv6 Ping test...", fg="black")
test_query = construct_test("ping", location, ipv6_host)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code == 200:
click.secho("✓ Valid IPv6 Ping test passed", fg="green", bold=True)
if not hg_response.status_code == 200:
click.secho("✗ Valid IPv6 Ping test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Invalid IPv4 Ping Test
try:
click.secho("Starting Invalid IPv4 Ping test...", fg="black")
test_query = construct_test("ping", location, ipv4_cidr)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code in range(400, 500):
click.secho("✓ Invalid IPv4 Ping test passed", fg="green", bold=True)
if not hg_response.status_code in range(400, 500):
click.secho("✗ Invalid IPv4 Ping test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Invalid IPv6 Ping Test
try:
click.secho("Starting Invalid IPv6 Ping test...", fg="black")
test_query = construct_test("ping", location, ipv6_cidr)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code in range(400, 500):
click.secho("✓ Invalid IPv6 Ping test passed", fg="green", bold=True)
if not hg_response.status_code in range(400, 500):
click.secho("✗ Invalid IPv6 Ping test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
# Blacklist Test
try:
click.secho("Starting Blacklist test...", fg="black")
test_query = construct_test("bgp_route", location, test_blacklist)
hg_response = requests.post(
test_endpoint, headers=test_headers, data=test_query
)
if hg_response.status_code in range(400, 500):
click.secho("✓ Blacklist test passed", fg="green", bold=True)
if not hg_response.status_code in range(400, 500):
click.secho("✗ Blacklist test failed", fg="red", bold=True)
click.secho(f"Status Code: {hg_response.status_code}", fg="red", bold=True)
click.secho(hg_response.text, fg="red")
except Exception as e:
click.secho(f"Exception occurred:\n{e}")
@hg.command("clear-cache", help="Clear Redis cache")
@async_command
async def clearcache():
"""Clears the Flask-Caching cache"""
try:
from hyperglass.util import clear_redis_cache
from hyperglass.configuration import params
await clear_redis_cache(
params.features.cache.redis_id,
{"host": str(params.general.redis_host), "port": params.general.redis_port},
)
except (ImportError, RuntimeWarning):
raise click.ClickException(
NL
+ E_ERROR
+ WS1
+ click.style("Failed to clear cache:", fg="white")
+ WS1
+ click.style(str(e), fg="red", bold=True)
)
click.echo(
NL
+ E_CHECK
+ WS1
+ click.style("Successfully cleared cache.", fg="green", bold=True)
)
def start_dev_server(start, params):
"""Starts Sanic development server for testing without WSGI/Reverse Proxy"""
msg_start = "Starting hyperglass web server on"
msg_uri = "http://"
msg_host = str(params["host"])
msg_port = str(params["port"])
msg_len = len("".join([msg_start, WS1, msg_uri, msg_host, CL, msg_port]))
try:
click.echo(
NL
+ WS1 * msg_len
+ WS8
+ E_ROCKET
+ NL
+ E_CHECK
+ WS1
+ click.style(msg_start, fg="green", bold=True)
+ WS1
+ click.style(msg_uri, fg="white")
+ click.style(msg_host, fg="blue", bold=True)
+ click.style(CL, fg="white")
+ click.style(msg_port, fg="magenta", bold=True)
+ WS1
+ E_ROCKET
+ NL
+ WS1
+ NL
)
start()
except Exception as e:
raise click.ClickException(
E_ERROR
+ WS1
+ click.style("Failed to start test server: ", fg="red", bold=True)
+ click.style(str(e), fg="red")
) from None
def write_env_variables(variables):
from hyperglass.util import write_env
result = asyncio.run(write_env(variables))
return result
@hg.command("build-ui", help="Create a new UI build")
def build_ui():
"""Create a new UI build.
Raises:
click.ClickException: Raised on any errors.
"""
from hyperglass.util import build_ui
click.secho("Starting new UI build...", fg="white")
try:
success = asyncio.run(build_ui())
click.echo(
click.style("Completed build, ran", fg="green", bold=True)
+ WS1
+ click.style(success, fg="blue", bold=True)
)
except Exception as e:
raise click.ClickException(str(e)) from None
@hg.command("dev-server", help="Start development web server")
@click.option("-b", "--build", is_flag=True, help="Render Theme & Build Web Assets")
def dev_server(build):
"""Renders theme and web build, then starts dev web server"""
try:
from hyperglass.api import start, ASGI_PARAMS
except ImportError as import_error:
raise click.ClickException(
E_ERROR
+ WS1
+ click.style("Error importing hyperglass:", fg="red", bold=True)
+ WS1
+ click.style(str(import_error), fg="blue")
)
if build:
try:
build_complete = build_ui()
except Exception as e:
raise click.ClickException(
click.style("✗ Error building: ", fg="red", bold=True)
+ click.style(e, fg="white")
) from None
if build_complete:
start_dev_server(start, ASGI_PARAMS)
if not build:
start_dev_server(start, ASGI_PARAMS)
@hg.command("migrate-configs", help="Copy YAML examples to usable config files")
def migrateconfig():
"""Copies example configuration files to usable config files"""
try:
click.secho("Migrating example config files...", fg="black")
config_dir = os.path.join(working_directory, "hyperglass/configuration/")
examples = glob.iglob(os.path.join(config_dir, "*.example"))
for f in examples:
basefile, extension = os.path.splitext(f)
if os.path.exists(basefile):
click.secho(f"{basefile} already exists", fg="blue")
else:
try:
cp(f, basefile)
click.secho(f"✓ Migrated {basefile}", fg="green")
except:
click.secho(f"✗ Failed to migrate {basefile}", fg="red")
raise
click.secho(
"✓ Successfully migrated example config files", fg="green", bold=True
)
except:
click.secho("✗ Error migrating example config files", fg="red", bold=True)
raise
@hg.command("migrate-gunicorn", help="Copy Gunicorn example to usable config file")
def migrategunicorn():
"""Copies example Gunicorn config file to a usable config"""
try:
import hyperglass
except ImportError as error_exception:
click.secho(f"Error while importing hyperglass:\n{error_exception}", fg="red")
try:
click.secho("Migrating example Gunicorn configuration...", fg="black")
hyperglass_root = os.path.dirname(hyperglass.__file__)
ex_file = os.path.join(hyperglass_root, "gunicorn_config.py.example")
basefile, extension = os.path.splitext(ex_file)
newfile = basefile
if os.path.exists(newfile):
click.secho(f"{newfile} already exists", fg="blue")
else:
try:
cp(ex_file, newfile)
click.secho(
f"✓ Successfully migrated Gunicorn configuration to: {newfile}",
fg="green",
bold=True,
)
except:
click.secho(f"✗ Failed to migrate {newfile}", fg="red")
raise
except:
click.secho(
"✗ Error migrating example Gunicorn configuration", fg="red", bold=True
)
raise
@hg.command("migrate-systemd", help="Copy Systemd example to OS")
@click.option(
"-d", "--directory", default="/etc/systemd/system", help="Destination Directory"
)
def migratesystemd(directory):
"""Copies example systemd service file to /etc/systemd/system/"""
try:
click.secho("Migrating example systemd service...", fg="black")
ex_file_base = "hyperglass.service.example"
ex_file = os.path.join(working_directory, f"hyperglass/{ex_file_base}")
basefile, extension = os.path.splitext(ex_file_base)
newfile = os.path.join(directory, basefile)
if os.path.exists(newfile):
click.secho(f"{newfile} already exists", fg="blue")
else:
try:
cp(ex_file, newfile)
click.secho(
f"✓ Successfully migrated systemd service to: {newfile}",
fg="green",
bold=True,
)
except:
click.secho(f"✗ Failed to migrate {newfile}", fg="red")
raise
except:
click.secho("✗ Error migrating example systemd service", fg="red", bold=True)
raise
@hg.command(
"update-permissions",
help="Fix ownership & permissions of hyperglass project directory",
)
@click.option("--user", default="www-data")
@click.option("--group", default="www-data")
def fixpermissions(user, group):
"""Effectively runs `chmod` and `chown` on the hyperglass/hyperglass directory"""
try:
import hyperglass
except ImportError as error_exception:
click.secho(f"Error importing hyperglass:\n{error_exception}")
hyperglass_root = os.path.dirname(hyperglass.__file__)
uid = pwd.getpwnam(user).pw_uid
gid = grp.getgrnam(group).gr_gid
try:
for root, dirs, files in os.walk(hyperglass_root):
for d in dirs:
full_path = os.path.join(root, d)
os.chown(full_path, uid, gid)
for f in files:
full_path = os.path.join(root, f)
os.chown(full_path, uid, gid)
os.chown(root, uid, gid)
click.secho(
"✓ Successfully changed hyperglass/ ownership", fg="green", bold=True
)
except:
click.secho("✗ Failed to change hyperglass/ ownership", fg="red", bold=True)
raise
try:
for root, dirs, files in os.walk(hyperglass_root):
for d in dirs:
full_path = os.path.join(root, d)
os.chmod(full_path, 0o744)
for f in files:
full_path = os.path.join(root, f)
os.chmod(full_path, 0o744)
os.chmod(root, 0o744)
click.secho(
"✓ Successfully changed hyperglass/ permissions", fg="green", bold=True
)
except:
click.secho("✗ Failed to change hyperglass/ permissions", fg="red", bold=True)
raise
@hg.command("generate-secret", help="Generate agent secret")
@click.option("-l", "--length", default=32, help="Secret length")
def generate_secret(length):
import secrets
gen_secret = secrets.token_urlsafe(length)
click.echo(
NL
+ click.style("Secret: ", fg="white")
+ click.style(gen_secret, fg="magenta", bold=True)
+ NL
)
@hg.command("line-count", help="Get line count for source code.")
@click.option(
"-d", "--directory", type=str, default="hyperglass", help="Source code directory"
)
def line_count(directory):
"""Get lines of code.
Arguments:
directory {str} -- Source code directory
"""
from develop import count_lines
count = count_lines(directory)
click.echo(
NL
+ click.style("Line Count: ", fg="blue")
+ click.style(str(count), fg="green", bold=True)
+ NL
)
@hg.command("line-count-badge", help="Generates line count badge")
@click.option(
"-d", "--directory", type=str, default="hyperglass", help="Source code directory"
)
def line_count_badge(directory):
"""Generate shields.io-like badge for lines of code.
Arguments:
directory {str} -- Source code directory
Returns:
{int} -- Exit status
"""
import anybadge
from develop import count_lines
this_dir = Path.cwd()
file_name = "line_count.svg"
badge_file = this_dir / file_name
if badge_file.exists():
badge_file.unlink()
count = count_lines(directory)
badge = anybadge.Badge(label="Lines of Code", value=count, default_color="#007ec6")
badge.write_badge(file_name)
click.echo(
click.style("Created line count badge. Lines: ", fg="white")
+ click.style(str(count), fg="green", bold=True)
)
return 0
"""hyperglass CLI management tool."""
# Project Imports
from cli import CLI
if __name__ == "__main__":
hg()
CLI()

41
poetry.lock generated
View File

@@ -847,24 +847,6 @@ optional = false
python-versions = "*"
version = "2019.12.20"
[[package]]
category = "main"
description = "Python HTTP for Humans."
name = "requests"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*"
version = "2.22.0"
[package.dependencies]
certifi = ">=2017.4.17"
chardet = ">=3.0.2,<3.1.0"
idna = ">=2.5,<2.9"
urllib3 = ">=1.21.1,<1.25.0 || >1.25.0,<1.25.1 || >1.25.1,<1.26"
[package.extras]
security = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)"]
socks = ["PySocks (>=1.5.6,<1.5.7 || >1.5.7)", "win-inet-pton"]
[[package]]
category = "main"
description = "Validating URI References per RFC 3986"
@@ -1015,19 +997,6 @@ optional = false
python-versions = "*"
version = "1.35"
[[package]]
category = "main"
description = "HTTP library with thread-safe connection pooling, file post, and more."
name = "urllib3"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <4"
version = "1.25.7"
[package.extras]
brotli = ["brotlipy (>=0.6.0)"]
secure = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "certifi", "ipaddress"]
socks = ["PySocks (>=1.5.6,<1.5.7 || >1.5.7,<2.0)"]
[[package]]
category = "main"
description = "The lightning-fast ASGI server."
@@ -1100,7 +1069,7 @@ docs = ["sphinx", "jaraco.packaging (>=3.2)", "rst.linker (>=1.9)"]
testing = ["pathlib2", "contextlib2", "unittest2"]
[metadata]
content-hash = "8a4dcbd8ac789a86e1ef255ee8f802454197eb91d25dcbd33142a8a97b454778"
content-hash = "cb351fd182d6d1828981ce49a0be5d871a269c54cdffd7042c0b081c102591cc"
python-versions = "^3.7"
[metadata.files]
@@ -1581,10 +1550,6 @@ regex = [
{file = "regex-2019.12.20-cp38-cp38-win_amd64.whl", hash = "sha256:d3ee0b035816e0520fac928de31b6572106f0d75597f6fa3206969a02baba06f"},
{file = "regex-2019.12.20.tar.gz", hash = "sha256:106e25a841921d8259dcef2a42786caae35bc750fb996f830065b3dfaa67b77e"},
]
requests = [
{file = "requests-2.22.0-py2.py3-none-any.whl", hash = "sha256:9cf5292fcd0f598c671cfc1e0d7d1a7f13bb8085e9a590f48c010551dc6c4b31"},
{file = "requests-2.22.0.tar.gz", hash = "sha256:11e007a8a2aa0323f5a921e9e6a2d7e4e67d9877e85773fba9ba6419025cbeb4"},
]
rfc3986 = [
{file = "rfc3986-1.3.2-py2.py3-none-any.whl", hash = "sha256:df4eba676077cefb86450c8f60121b9ae04b94f65f85b69f3f731af0516b7b18"},
{file = "rfc3986-1.3.2.tar.gz", hash = "sha256:0344d0bd428126ce554e7ca2b61787b6a28d2bbd19fc70ed2dd85efe31176405"},
@@ -1660,10 +1625,6 @@ typed-ast = [
ujson = [
{file = "ujson-1.35.tar.gz", hash = "sha256:f66073e5506e91d204ab0c614a148d5aa938bdbf104751be66f8ad7a222f5f86"},
]
urllib3 = [
{file = "urllib3-1.25.7-py2.py3-none-any.whl", hash = "sha256:a8a318824cc77d1fd4b2bec2ded92646630d7fe8619497b142c84a9e6f5a7293"},
{file = "urllib3-1.25.7.tar.gz", hash = "sha256:f3c5fd51747d450d4dcf6f923c81f78f811aab8205fda64b0aba34a4e48b0745"},
]
uvicorn = [
{file = "uvicorn-0.11.1-py3-none-any.whl", hash = "sha256:d07129d98440ef69e4fd3aaebf16ab9b96cbcdffd813b9889bf8ec001351f4b8"},
{file = "uvicorn-0.11.1.tar.gz", hash = "sha256:68a13fedeb38260ce663a1d01d367e6809b09b2dedd2a973af5d73291e010e28"},

View File

@@ -26,7 +26,6 @@ PyJWT = "^1.7.1"
python = "^3.7"
PyYAML = "^5.2"
redis = "^3.3.11"
requests = "^2.22.0"
sshtunnel = "^0.1.5"
stackprinter = "^0.2.3"
ujson = "^1.35"