mirror of
https://github.com/checktheroads/hyperglass
synced 2024-05-11 05:55:08 +00:00
Implement better __repr__ generator
This commit is contained in:
@@ -50,8 +50,8 @@ async def execute(query: "Query") -> Union["OutputDataModel", str]:
|
|||||||
params = use_state("params")
|
params = use_state("params")
|
||||||
output = params.messages.general
|
output = params.messages.general
|
||||||
|
|
||||||
log.debug("Received query {}", query.export_dict())
|
log.debug("Received query {!r}", query)
|
||||||
log.debug("Matched device config: {!s}", query.device)
|
log.debug("Matched device config: {!r}", query.device)
|
||||||
|
|
||||||
mapped_driver = map_driver(query.device.driver)
|
mapped_driver = map_driver(query.device.driver)
|
||||||
driver: "Connection" = mapped_driver(query.device, query)
|
driver: "Connection" = mapped_driver(query.device, query)
|
||||||
@@ -83,7 +83,7 @@ async def execute(query: "Query") -> Union["OutputDataModel", str]:
|
|||||||
if not output:
|
if not output:
|
||||||
raise ResponseEmpty(query=query)
|
raise ResponseEmpty(query=query)
|
||||||
|
|
||||||
log.debug("Output for query: {}:\n{}", query.json(), repr(output))
|
log.debug("Output for query {!r}:\n{!r}", query, output)
|
||||||
signal.alarm(0)
|
signal.alarm(0)
|
||||||
|
|
||||||
return output
|
return output
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ from pydantic import BaseModel, StrictStr, constr, validator
|
|||||||
|
|
||||||
# Project
|
# Project
|
||||||
from hyperglass.log import log
|
from hyperglass.log import log
|
||||||
from hyperglass.util import snake_to_camel
|
from hyperglass.util import snake_to_camel, repr_from_attrs
|
||||||
from hyperglass.state import use_state
|
from hyperglass.state import use_state
|
||||||
from hyperglass.exceptions.public import (
|
from hyperglass.exceptions.public import (
|
||||||
InputInvalid,
|
InputInvalid,
|
||||||
@@ -24,7 +24,6 @@ from hyperglass.exceptions.private import InputValidationError
|
|||||||
|
|
||||||
# Local
|
# Local
|
||||||
from ..config.devices import Device
|
from ..config.devices import Device
|
||||||
from ..commands.generic import Directive
|
|
||||||
|
|
||||||
(TEXT := use_state("params").web.text)
|
(TEXT := use_state("params").web.text)
|
||||||
|
|
||||||
@@ -75,6 +74,12 @@ class Query(BaseModel):
|
|||||||
self.timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
self.timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
state = use_state()
|
state = use_state()
|
||||||
self._state = state
|
self._state = state
|
||||||
|
for command in self.device.commands:
|
||||||
|
if command.id == self.query_type:
|
||||||
|
self.directive = command
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise QueryTypeNotFound(query_type=self.query_type)
|
||||||
try:
|
try:
|
||||||
self.validate_query_target()
|
self.validate_query_target()
|
||||||
except InputValidationError as err:
|
except InputValidationError as err:
|
||||||
@@ -82,11 +87,11 @@ class Query(BaseModel):
|
|||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
"""Represent only the query fields."""
|
"""Represent only the query fields."""
|
||||||
return (
|
return repr_from_attrs(self, self.__config__.fields.keys())
|
||||||
f'Query(query_location="{str(self.query_location)}", '
|
|
||||||
f'query_type="{str(self.query_type)}", query_group="{str(self.query_group)}", '
|
def __str__(self) -> str:
|
||||||
f'query_target="{str(self.query_target)}")'
|
"""Alias __str__ to __repr__."""
|
||||||
)
|
return repr(self)
|
||||||
|
|
||||||
def digest(self):
|
def digest(self):
|
||||||
"""Create SHA256 hash digest of model representation."""
|
"""Create SHA256 hash digest of model representation."""
|
||||||
@@ -101,7 +106,7 @@ class Query(BaseModel):
|
|||||||
def validate_query_target(self):
|
def validate_query_target(self):
|
||||||
"""Validate a query target after all fields/relationships havebeen initialized."""
|
"""Validate a query target after all fields/relationships havebeen initialized."""
|
||||||
self.directive.validate_target(self.query_target)
|
self.directive.validate_target(self.query_target)
|
||||||
log.debug("Validation passed for query {}", repr(self))
|
log.debug("Validation passed for query {!r}", self)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def summary(self):
|
def summary(self):
|
||||||
@@ -110,7 +115,7 @@ class Query(BaseModel):
|
|||||||
f"query_location={self.query_location}",
|
f"query_location={self.query_location}",
|
||||||
f"query_type={self.query_type}",
|
f"query_type={self.query_type}",
|
||||||
f"query_group={self.query_group}",
|
f"query_group={self.query_group}",
|
||||||
f"query_target={str(self.query_target)}",
|
f"query_target={self.query_target!s}",
|
||||||
)
|
)
|
||||||
return f'Query({", ".join(items)})'
|
return f'Query({", ".join(items)})'
|
||||||
|
|
||||||
@@ -119,15 +124,6 @@ class Query(BaseModel):
|
|||||||
"""Get this query's device object by query_location."""
|
"""Get this query's device object by query_location."""
|
||||||
return self._state.devices[self.query_location]
|
return self._state.devices[self.query_location]
|
||||||
|
|
||||||
@property
|
|
||||||
def directive(self) -> Directive:
|
|
||||||
"""Get this query's directive."""
|
|
||||||
|
|
||||||
for command in self.device.commands:
|
|
||||||
if command.id == self.query_type:
|
|
||||||
return command
|
|
||||||
raise QueryTypeNotFound(query_type=self.query_type)
|
|
||||||
|
|
||||||
def export_dict(self, pretty=False):
|
def export_dict(self, pretty=False):
|
||||||
"""Create dictionary representation of instance."""
|
"""Create dictionary representation of instance."""
|
||||||
|
|
||||||
|
|||||||
@@ -223,7 +223,7 @@ class RuleWithoutValidation(Rule):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
Rules = t.Union[RuleWithIPv4, RuleWithIPv6, RuleWithPattern, RuleWithoutValidation]
|
RuleType = t.Union[RuleWithIPv4, RuleWithIPv6, RuleWithPattern, RuleWithoutValidation]
|
||||||
|
|
||||||
|
|
||||||
class Directive(HyperglassModelWithId):
|
class Directive(HyperglassModelWithId):
|
||||||
@@ -231,7 +231,7 @@ class Directive(HyperglassModelWithId):
|
|||||||
|
|
||||||
id: StrictStr
|
id: StrictStr
|
||||||
name: StrictStr
|
name: StrictStr
|
||||||
rules: t.List[Rules]
|
rules: t.List[RuleType]
|
||||||
field: t.Union[Text, Select, None]
|
field: t.Union[Text, Select, None]
|
||||||
info: t.Optional[FilePath]
|
info: t.Optional[FilePath]
|
||||||
plugins: t.List[StrictStr] = []
|
plugins: t.List[StrictStr] = []
|
||||||
|
|||||||
@@ -9,7 +9,8 @@ from pydantic import HttpUrl, BaseModel, BaseConfig
|
|||||||
|
|
||||||
# Project
|
# Project
|
||||||
from hyperglass.log import log
|
from hyperglass.log import log
|
||||||
from hyperglass.util import snake_to_camel
|
from hyperglass.util import snake_to_camel, repr_from_attrs
|
||||||
|
from hyperglass.types import Series
|
||||||
|
|
||||||
|
|
||||||
class HyperglassModel(BaseModel):
|
class HyperglassModel(BaseModel):
|
||||||
@@ -45,6 +46,10 @@ class HyperglassModel(BaseModel):
|
|||||||
)
|
)
|
||||||
return snake_to_camel(snake_field)
|
return snake_to_camel(snake_field)
|
||||||
|
|
||||||
|
def _repr_from_attrs(self, attrs: Series[str]) -> str:
|
||||||
|
"""Alias to `hyperglass.util:repr_from_attrs` in the context of this model."""
|
||||||
|
return repr_from_attrs(self, attrs)
|
||||||
|
|
||||||
def export_json(self, *args, **kwargs):
|
def export_json(self, *args, **kwargs):
|
||||||
"""Return instance as JSON."""
|
"""Return instance as JSON."""
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import typing as t
|
|||||||
from redis import Redis, ConnectionPool
|
from redis import Redis, ConnectionPool
|
||||||
|
|
||||||
# Project
|
# Project
|
||||||
|
from hyperglass.util import repr_from_attrs
|
||||||
from hyperglass.configuration import params, devices, ui_params
|
from hyperglass.configuration import params, devices, ui_params
|
||||||
|
|
||||||
# Local
|
# Local
|
||||||
@@ -40,6 +41,14 @@ class StateManager:
|
|||||||
self.redis.set("devices", devices)
|
self.redis.set("devices", devices)
|
||||||
self.redis.set("ui_params", ui_params)
|
self.redis.set("ui_params", ui_params)
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
"""Represent state manager by name and namespace."""
|
||||||
|
return repr_from_attrs(self, ("redis", "namespace"))
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
"""Represent state manager by __repr__."""
|
||||||
|
return repr(self)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def properties(cls: "StateManager") -> t.Tuple[str, ...]:
|
def properties(cls: "StateManager") -> t.Tuple[str, ...]:
|
||||||
"""Get all read-only properties of the state manager."""
|
"""Get all read-only properties of the state manager."""
|
||||||
|
|||||||
@@ -29,6 +29,10 @@ class RedisManager:
|
|||||||
"""Alias repr to Redis instance's repr."""
|
"""Alias repr to Redis instance's repr."""
|
||||||
return repr(self.instance)
|
return repr(self.instance)
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
"""String-friendly redis manager."""
|
||||||
|
return repr(self)
|
||||||
|
|
||||||
def _key_join(self, *keys: str) -> str:
|
def _key_join(self, *keys: str) -> str:
|
||||||
"""Format keys with state namespace."""
|
"""Format keys with state namespace."""
|
||||||
key_in_parts = (k for key in keys for k in key.split("."))
|
key_in_parts = (k for key in keys for k in key.split("."))
|
||||||
|
|||||||
9
hyperglass/types.py
Normal file
9
hyperglass/types.py
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
"""Custom types."""
|
||||||
|
|
||||||
|
# Standard Library
|
||||||
|
import typing as _t
|
||||||
|
|
||||||
|
_S = _t.TypeVar("_S")
|
||||||
|
|
||||||
|
Series = _t.Union[_t.MutableSequence[_S], _t.Tuple[_S], _t.Set[_S]]
|
||||||
|
"""Like Sequence, but excludes `str`."""
|
||||||
@@ -17,6 +17,7 @@ from netmiko.ssh_dispatcher import CLASS_MAPPER # type: ignore
|
|||||||
|
|
||||||
# Project
|
# Project
|
||||||
from hyperglass.log import log
|
from hyperglass.log import log
|
||||||
|
from hyperglass.types import Series
|
||||||
from hyperglass.constants import DRIVER_MAP
|
from hyperglass.constants import DRIVER_MAP
|
||||||
|
|
||||||
ALL_DEVICE_TYPES = {*DRIVER_MAP.keys(), *CLASS_MAPPER.keys()}
|
ALL_DEVICE_TYPES = {*DRIVER_MAP.keys(), *CLASS_MAPPER.keys()}
|
||||||
@@ -205,6 +206,20 @@ def make_repr(_class):
|
|||||||
return f'{_class.__name__}({", ".join(_process_attrs(dir(_class)))})'
|
return f'{_class.__name__}({", ".join(_process_attrs(dir(_class)))})'
|
||||||
|
|
||||||
|
|
||||||
|
def repr_from_attrs(obj: object, attrs: Series[str]) -> str:
|
||||||
|
"""Generate a `__repr__()` value from a specific set of attribute names.
|
||||||
|
|
||||||
|
Useful for complex models/objects where `__repr__()` should only display specific fields.
|
||||||
|
"""
|
||||||
|
# Check the object to ensure each attribute actually exists, and deduplicate
|
||||||
|
attr_names = {a for a in attrs if hasattr(obj, a)}
|
||||||
|
# Dict representation of attr name to obj value (e.g. `obj.attr`), if the value has a
|
||||||
|
# `__repr__` method.
|
||||||
|
attr_values = {f: v for f in attr_names if hasattr((v := getattr(obj, f)), "__repr__")}
|
||||||
|
pairs = (f"{k}={v!r}" for k, v in attr_values.items())
|
||||||
|
return f"{obj.__class__.__name__}({','.join(pairs)})"
|
||||||
|
|
||||||
|
|
||||||
def validate_device_type(_type: str) -> t.Tuple[bool, t.Union[None, str]]:
|
def validate_device_type(_type: str) -> t.Tuple[bool, t.Union[None, str]]:
|
||||||
"""Validate device type is supported."""
|
"""Validate device type is supported."""
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user