1
0
mirror of https://github.com/checktheroads/hyperglass synced 2024-05-11 05:55:08 +00:00

Implement input-plugin-based validation, implement common plugins

This commit is contained in:
thatmattlove
2021-09-26 11:39:46 -07:00
parent c2240d92c6
commit e73de24904
10 changed files with 103 additions and 113 deletions

View File

@@ -101,7 +101,7 @@ async def query(
# each command output value is unique.
cache_key = f"hyperglass.query.{query_data.digest()}"
log.info("Starting query execution for query {}", query_data.summary)
log.info("Starting query execution for {!r}", query)
cache_response = cache.get_map(cache_key, "output")
json_output = False

5
hyperglass/configuration/.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
.DS_Store
*.toml
*.yaml
*.test
configuration_old

View File

@@ -114,7 +114,7 @@ def init_ui_params(*, params: "Params", devices: "Devices") -> "UIParameters":
from hyperglass.constants import PARSED_RESPONSE_FIELDS, __version__
content_greeting = get_markdown(
config_path=params.web.greeting, default="", params={"title": params.web.greeting.title},
config=params.web.greeting, default="", params={"title": params.web.greeting.title},
)
content_credit = CREDIT.format(version=__version__)

View File

@@ -22,10 +22,6 @@ from .plugins import (
from .constants import MIN_NODE_VERSION, MIN_PYTHON_VERSION, __version__
from .util.frontend import get_node_version
if t.TYPE_CHECKING:
# Local
from .models.config.devices import Devices
# Ensure the Python version meets the minimum requirements.
pretty_version = ".".join(tuple(str(v) for v in MIN_PYTHON_VERSION))
if sys.version_info < MIN_PYTHON_VERSION:
@@ -59,19 +55,28 @@ async def build_ui() -> bool:
return True
def register_all_plugins(devices: "Devices") -> None:
def register_all_plugins() -> None:
"""Validate and register configured plugins."""
state = use_state()
# Register built-in plugins.
init_builtin_plugins()
# Register external plugins.
for plugin_file, directives in devices.directive_plugins().items():
failures = register_plugin(plugin_file, directives=directives)
for failure in failures:
log.warning(
"Plugin '{}' is not a valid hyperglass plugin, and was not registered", failure,
)
failures = ()
# Register external directive-based plugins (defined in directives).
for plugin_file, directives in state.devices.directive_plugins().items():
failures += register_plugin(plugin_file, directives=directives)
# Register external global/common plugins (defined in config).
for plugin_file in state.params.common_plugins():
failures += register_plugin(plugin_file, common=True)
for failure in failures:
log.warning(
"Plugin {!r} is not a valid hyperglass plugin and was not registered", failure,
)
def unregister_all_plugins() -> None:
@@ -87,9 +92,7 @@ def on_starting(server: "Arbiter") -> None:
required = ".".join((str(v) for v in MIN_PYTHON_VERSION))
log.debug("Python {} detected ({} required)", python_version, required)
state = use_state()
register_all_plugins(state.devices)
register_all_plugins()
asyncio.run(build_ui())

View File

@@ -12,6 +12,7 @@ from pydantic import BaseModel, StrictStr, constr, validator
from hyperglass.log import log
from hyperglass.util import snake_to_camel, repr_from_attrs
from hyperglass.state import use_state
from hyperglass.plugins import InputPluginManager
from hyperglass.exceptions.public import (
InputInvalid,
QueryTypeNotFound,
@@ -91,7 +92,11 @@ class Query(BaseModel):
def validate_query_target(self):
"""Validate a query target after all fields/relationships havebeen initialized."""
# Run config/rule-based validations.
self.directive.validate_target(self.query_target)
# Run plugin-based validations.
manager = InputPluginManager()
manager.execute(query=self)
log.debug("Validation passed for query {!r}", self)
@property

View File

@@ -1,11 +1,14 @@
"""Configuration validation entry point."""
# Standard Library
from typing import Any, Dict, List, Union, Literal, Optional
from ipaddress import ip_address
from typing import Any, Dict, List, Tuple, Union, Literal, Optional
from pathlib import Path
# Third Party
from pydantic import Field, StrictInt, StrictStr, StrictBool, IPvAnyAddress, validator
from pydantic import Field, StrictInt, StrictStr, StrictBool, validator
# Project
from hyperglass.settings import Settings
# Local
from .web import Web
@@ -24,16 +27,6 @@ Localhost = Literal["localhost"]
class ParamsPublic(HyperglassModel):
"""Public configuration parameters."""
debug: StrictBool = Field(
False,
title="Debug",
description="Enable debug mode. Warning: this will generate a *lot* of log output.",
)
developer_mode: StrictBool = Field(
False,
title="Developer Mode",
description='Enable developer mode. If enabled, the hyperglass backend (Python) and frontend (React/Javascript) applications are "unlinked", so that React tools can be used for front end development. A `<Debugger />` convenience component is also displayed in the UI for easier UI development.',
)
request_timeout: StrictInt = Field(
90,
title="Request Timeout",
@@ -94,16 +87,6 @@ class Params(ParamsPublic, HyperglassModel):
title="Fake Output",
description="If enabled, the hyperglass backend will return static fake output for development/testing purposes.",
)
listen_address: Optional[Union[IPvAnyAddress, Localhost]] = Field(
None,
title="Listen Address",
description="Local IP Address or hostname the hyperglass application listens on to serve web traffic.",
)
listen_port: StrictInt = Field(
8001,
title="Listen Port",
description="Local TCP port the hyperglass application listens on to serve web traffic.",
)
cors_origins: List[StrictStr] = Field(
[],
title="Cross-Origin Resource Sharing",
@@ -112,6 +95,7 @@ class Params(ParamsPublic, HyperglassModel):
netmiko_delay_factor: IntFloat = Field(
0.1, title="Netmiko Delay Factor", description="Override the netmiko global delay factor.",
)
plugins: List[StrictStr] = []
# Sub Level Params
cache: Cache = Cache()
@@ -127,58 +111,35 @@ class Params(ParamsPublic, HyperglassModel):
schema_extra = {"level": 1}
@validator("listen_address", pre=True, always=True)
def validate_listen_address(cls, value, values):
"""Set default listen_address based on debug mode.
Arguments:
value {str|IPvAnyAddress|None} -- listen_address
values {dict} -- already-validated entries before listen_address
Returns:
{str} -- Validated listen_address
"""
if value is None and not values["debug"]:
listen_address = ip_address("127.0.0.1")
elif value is None and values["debug"]:
listen_address = ip_address("0.0.0.0") # noqa: S104
elif isinstance(value, str) and value != "localhost":
try:
listen_address = ip_address(value)
except ValueError:
raise ValueError(str(value))
elif isinstance(value, str) and value == "localhost":
listen_address = ip_address("127.0.0.1")
else:
raise ValueError(str(value))
return listen_address
@validator("site_description")
def validate_site_description(cls, value, values):
"""Format the site descripion with the org_name field.
Arguments:
value {str} -- site_description
values {str} -- Values before site_description
Returns:
{str} -- Formatted description
"""
def validate_site_description(cls: "Params", value: str, values: Dict[str, Any]) -> str:
"""Format the site descripion with the org_name field."""
return value.format(org_name=values["org_name"])
@validator("primary_asn")
def validate_primary_asn(cls, value):
"""Stringify primary_asn if passed as an integer.
def validate_primary_asn(cls: "Params", value: Union[int, str]) -> str:
"""Stringify primary_asn if passed as an integer."""
return str(value)
Arguments:
value {str|int} -- Unvalidated Primary ASN
@validator("plugins")
def validate_plugins(cls: "Params", value: List[str]) -> List[str]:
"""Validate and register configured plugins."""
plugin_dir = Settings.app_path / "plugins"
Returns:
{str} -- Stringified Primary ASN.
"""
if not isinstance(value, str):
value = str(value)
return value
if plugin_dir.exists():
# Path objects whose file names match configured file names, should work
# whether or not file extension is specified.
matching_plugins = (
f
for f in plugin_dir.iterdir()
if f.name.split(".")[0] in (p.split(".")[0] for p in value)
)
return [str(f) for f in matching_plugins]
return []
def common_plugins(self) -> Tuple[Path, ...]:
"""Get all validated external common plugins as Path objects."""
return tuple(Path(p) for p in self.plugins)
def content_params(self) -> Dict[str, Any]:
"""Export content-specific parameters."""

View File

@@ -1,15 +1,15 @@
"""Base Plugin Definition."""
# Standard Library
import typing as t
from abc import ABC
from typing import Any, Union, Literal, TypeVar, Sequence
from inspect import Signature
# Third Party
from pydantic import BaseModel, PrivateAttr
PluginType = Union[Literal["output"], Literal["input"]]
SupportedMethod = TypeVar("SupportedMethod")
PluginType = t.Union[t.Literal["output"], t.Literal["input"]]
SupportedMethod = t.TypeVar("SupportedMethod")
class HyperglassPlugin(BaseModel, ABC):
@@ -17,6 +17,8 @@ class HyperglassPlugin(BaseModel, ABC):
__hyperglass_builtin__: bool = PrivateAttr(False)
name: str
common: bool = False
ref: t.Optional[str] = None
@property
def _signature(self) -> Signature:
@@ -42,13 +44,13 @@ class HyperglassPlugin(BaseModel, ABC):
return self.name
@classmethod
def __init_subclass__(cls, **kwargs: Any) -> None:
def __init_subclass__(cls, **kwargs: t.Any) -> None:
"""Initialize plugin object."""
name = kwargs.pop("name", None) or cls.__name__
cls._name = name
cls.name = name
super().__init_subclass__()
def __init__(self, **kwargs: Any) -> None:
def __init__(self, **kwargs: t.Any) -> None:
"""Initialize plugin instance."""
name = kwargs.pop("name", None) or self.__class__.__name__
super().__init__(name=name, **kwargs)
@@ -60,7 +62,7 @@ class DirectivePlugin(BaseModel):
Should always be subclassed with `HyperglassPlugin`.
"""
directives: Sequence[str] = ()
directives: t.Sequence[str] = ()
class PlatformPlugin(BaseModel):
@@ -69,4 +71,4 @@ class PlatformPlugin(BaseModel):
Should always be subclassed with `HyperglassPlugin`.
"""
platforms: Sequence[str] = ()
platforms: t.Sequence[str] = ()

View File

@@ -4,7 +4,7 @@
import typing as t
# Local
from ._base import DirectivePlugin
from ._base import DirectivePlugin, HyperglassPlugin
if t.TYPE_CHECKING:
# Project
@@ -13,7 +13,7 @@ if t.TYPE_CHECKING:
InputPluginReturn = t.Union[None, bool]
class InputPlugin(DirectivePlugin):
class InputPlugin(HyperglassPlugin, DirectivePlugin):
"""Plugin to validate user input prior to running commands."""
failure_reason: t.Optional[str] = None

View File

@@ -7,7 +7,7 @@ from inspect import isclass
# Project
from hyperglass.log import log
from hyperglass.state import use_state
from hyperglass.exceptions.private import PluginError
from hyperglass.exceptions.private import PluginError, InputValidationError
# Local
from ._base import PluginType, HyperglassPlugin
@@ -18,7 +18,6 @@ if t.TYPE_CHECKING:
# Project
from hyperglass.state import HyperglassState
from hyperglass.models.api.query import Query
from hyperglass.models.directive import Directive
PluginT = t.TypeVar("PluginT", bound=HyperglassPlugin)
@@ -57,9 +56,10 @@ class PluginManager(t.Generic[PluginT]):
self._index = 0
raise StopIteration
def plugins(self: "PluginManager", builtins: bool = True) -> t.List[PluginT]:
def plugins(self: "PluginManager", *, builtins: bool = True) -> t.List[PluginT]:
"""Get all plugins, with built-in plugins last."""
plugins = self._state.plugins(self._type)
if builtins is False:
plugins = [p for p in plugins if p.__hyperglass_builtin__ is False]
@@ -68,9 +68,7 @@ class PluginManager(t.Generic[PluginT]):
# Sort with built-in plugins last.
return sorted(
sorted_by_name,
key=lambda p: -1 if p.__hyperglass_builtin__ else 1, # flake8: noqa IF100
reverse=True,
sorted_by_name, key=lambda p: -1 if p.__hyperglass_builtin__ else 1, reverse=True,
)
@property
@@ -112,9 +110,9 @@ class PluginManager(t.Generic[PluginT]):
instance = plugin(*args, **kwargs)
self._state.add_plugin(self._type, instance)
if instance.__hyperglass_builtin__ is True:
log.debug("Registered built-in plugin '{}'", instance.name)
log.debug("Registered {} built-in plugin {!r}", self._type, instance.name)
else:
log.success("Registered plugin '{}'", instance.name)
log.success("Registered {} plugin {!r}", self._type, instance.name)
return
except TypeError:
raise PluginError(
@@ -128,18 +126,30 @@ class PluginManager(t.Generic[PluginT]):
class InputPluginManager(PluginManager[InputPlugin], type="input"):
"""Manage Input Validation Plugins."""
def execute(
self: "InputPluginManager", *, directive: "Directive", query: "Query"
) -> InputPluginReturn:
def execute(self: "InputPluginManager", *, query: "Query") -> InputPluginReturn:
"""Execute all input validation plugins.
If any plugin returns `False`, execution is halted.
"""
result = None
for plugin in (plugin for plugin in self.plugins() if directive.id in plugin.directives):
if result is False:
return result
builtins = (
plugin
for plugin in self.plugins(builtins=True)
if plugin.directives and query.directive.id in plugin.directives
)
directives = (plugin for plugin in self.plugins() if plugin.ref in query.directive.plugins)
common = (plugin for plugin in self.plugins() if plugin.common is True)
for plugin in (*directives, *builtins, *common):
result = plugin.validate(query)
result_test = "valid" if result is True else "invalid" if result is False else "none"
log.debug("Input Plugin {!r} result={!r}", plugin.name, result_test)
if result is False:
raise InputValidationError(
error="No matched validation rules", target=query.query_target
)
if result is True:
return result
return result
@@ -152,14 +162,18 @@ class OutputPluginManager(PluginManager[OutputPlugin], type="output"):
The result of each plugin is passed to the next plugin.
"""
result = output
for plugin in (
directives = (
plugin
for plugin in self.plugins()
if query.directive.id in plugin.directives and query.device.platform in plugin.platforms
):
)
common = (plugin for plugin in self.plugins() if plugin.common is True)
for plugin in (*directives, *common):
log.debug("Output Plugin {!r} starting with\n{!r}", plugin.name, result)
result = plugin.process(output=result, query=query)
log.debug("Output Plugin {!r} completed with\n{!r}", plugin.name, result)
if result is False:
return result
# Pass the result of each plugin to the next plugin.

View File

@@ -68,6 +68,6 @@ def register_plugin(plugin_file: Path, **kwargs) -> t.Tuple[str, ...]:
"""Register an external plugin by file path."""
if plugin_file.exists():
module = _module_from_file(plugin_file)
results = _register_from_module(module, **kwargs)
results = _register_from_module(module, ref=plugin_file.stem, **kwargs)
return results
raise FileNotFoundError(str(plugin_file))