mirror of
https://github.com/checktheroads/hyperglass
synced 2024-05-11 05:55:08 +00:00
Improve MultiModel API
This commit is contained in:
@@ -1,9 +1,10 @@
|
|||||||
"""All Data Models used by hyperglass."""
|
"""All Data Models used by hyperglass."""
|
||||||
|
|
||||||
# Local
|
# Local
|
||||||
from .main import HyperglassModel, HyperglassModelWithId
|
from .main import MultiModel, HyperglassModel, HyperglassModelWithId
|
||||||
|
|
||||||
__all__ = (
|
__all__ = (
|
||||||
|
"MultiModel",
|
||||||
"HyperglassModel",
|
"HyperglassModel",
|
||||||
"HyperglassModelWithId",
|
"HyperglassModelWithId",
|
||||||
)
|
)
|
||||||
|
@@ -234,7 +234,7 @@ class Device(HyperglassModelWithId, extra="allow"):
|
|||||||
if isinstance(statement, str) and not statement.startswith("__")
|
if isinstance(statement, str) and not statement.startswith("__")
|
||||||
]
|
]
|
||||||
# Directives matching provided IDs.
|
# Directives matching provided IDs.
|
||||||
device_directives = directives.filter_by_ids(*directive_ids)
|
device_directives = directives.filter(*directive_ids)
|
||||||
# Matching built-in directives for this device's platform.
|
# Matching built-in directives for this device's platform.
|
||||||
builtins = directives.device_builtins(platform=platform)
|
builtins = directives.device_builtins(platform=platform)
|
||||||
|
|
||||||
|
@@ -23,7 +23,7 @@ from hyperglass.settings import Settings
|
|||||||
from hyperglass.exceptions.private import InputValidationError
|
from hyperglass.exceptions.private import InputValidationError
|
||||||
|
|
||||||
# Local
|
# Local
|
||||||
from .main import HyperglassModel, HyperglassMultiModel, HyperglassModelWithId
|
from .main import MultiModel, HyperglassModel, HyperglassModelWithId
|
||||||
from .fields import Action
|
from .fields import Action
|
||||||
|
|
||||||
if t.TYPE_CHECKING:
|
if t.TYPE_CHECKING:
|
||||||
@@ -315,35 +315,9 @@ class BuiltinDirective(Directive):
|
|||||||
DirectiveT = t.Union[BuiltinDirective, Directive]
|
DirectiveT = t.Union[BuiltinDirective, Directive]
|
||||||
|
|
||||||
|
|
||||||
class Directives(HyperglassMultiModel[Directive]):
|
class Directives(MultiModel[Directive], model=Directive, unique_by="id"):
|
||||||
"""Collection of directives."""
|
"""Collection of directives."""
|
||||||
|
|
||||||
def __init__(self, *items: t.Union[DirectiveT, t.Dict[str, t.Any]]) -> None:
|
|
||||||
"""Initialize base class and validate objects."""
|
|
||||||
super().__init__(*items, model=Directive, accessor="id")
|
|
||||||
|
|
||||||
def __add__(self, other: "Directives") -> "Directives":
|
|
||||||
"""Create a new `Directives` instance by merging this instance with another."""
|
|
||||||
valid = all(
|
|
||||||
(
|
|
||||||
isinstance(other, self.__class__),
|
|
||||||
hasattr(other, "model"),
|
|
||||||
getattr(other, "model", None) == self.model,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
if not valid:
|
|
||||||
raise TypeError(f"Cannot add {other!r} to {self.__class__.__name__}")
|
|
||||||
merged = self._merge_with(*other, unique_by=self.accessor)
|
|
||||||
return Directives(*merged)
|
|
||||||
|
|
||||||
def ids(self) -> t.Tuple[str]:
|
|
||||||
"""Get all directive IDs."""
|
|
||||||
return tuple(sorted(directive.id for directive in self))
|
|
||||||
|
|
||||||
def filter_by_ids(self, *ids: str) -> "Directives":
|
|
||||||
"""Filter directives by directive IDs."""
|
|
||||||
return Directives(*(directive for directive in self if directive.id in ids))
|
|
||||||
|
|
||||||
def device_builtins(self, *, platform: str):
|
def device_builtins(self, *, platform: str):
|
||||||
"""Get builtin directives for a device."""
|
"""Get builtin directives for a device."""
|
||||||
return Directives(
|
return Directives(
|
||||||
|
@@ -115,12 +115,14 @@ class HyperglassModelWithId(HyperglassModel):
|
|||||||
return hash(self.id)
|
return hash(self.id)
|
||||||
|
|
||||||
|
|
||||||
class HyperglassMultiModel(GenericModel, t.Generic[MultiModelT]):
|
class MultiModel(GenericModel, t.Generic[MultiModelT]):
|
||||||
"""Extension of HyperglassModel for managing multiple models as a list."""
|
"""Extension of HyperglassModel for managing multiple models as a list."""
|
||||||
|
|
||||||
|
model: t.ClassVar[MultiModelT]
|
||||||
|
unique_by: t.ClassVar[str]
|
||||||
|
model_name: t.ClassVar[str] = "MultiModel"
|
||||||
|
|
||||||
__root__: t.List[MultiModelT] = []
|
__root__: t.List[MultiModelT] = []
|
||||||
_accessor: str = PrivateAttr()
|
|
||||||
_model: MultiModelT = PrivateAttr()
|
|
||||||
_count: int = PrivateAttr()
|
_count: int = PrivateAttr()
|
||||||
|
|
||||||
class Config(BaseConfig):
|
class Config(BaseConfig):
|
||||||
@@ -130,41 +132,48 @@ class HyperglassMultiModel(GenericModel, t.Generic[MultiModelT]):
|
|||||||
extra = "forbid"
|
extra = "forbid"
|
||||||
validate_assignment = True
|
validate_assignment = True
|
||||||
|
|
||||||
def __init__(
|
def __init__(self, *items: t.Union[MultiModelT, t.Dict[str, t.Any]]) -> None:
|
||||||
self, *items: t.Union[MultiModelT, t.Dict[str, t.Any]], model: MultiModelT, accessor: str
|
|
||||||
) -> None:
|
|
||||||
"""Validate items."""
|
"""Validate items."""
|
||||||
self._accessor = accessor
|
for cls_var in ("model", "unique_by"):
|
||||||
self._model = model
|
if getattr(self, cls_var, None) is None:
|
||||||
|
raise AttributeError(f"MultiModel is missing class variable '{cls_var}'")
|
||||||
valid = self._valid_items(*items)
|
valid = self._valid_items(*items)
|
||||||
super().__init__(__root__=valid)
|
super().__init__(__root__=valid)
|
||||||
self._count = len(self.__root__)
|
self._count = len(self.__root__)
|
||||||
|
|
||||||
|
def __init_subclass__(cls, **kw: t.Any) -> None:
|
||||||
|
"""Add class variables from keyword arguments."""
|
||||||
|
model = kw.pop("model", None)
|
||||||
|
cls.model = model
|
||||||
|
cls.unique_by = kw.pop("unique_by", None)
|
||||||
|
cls.model_name = getattr(model, "__name__", "MultiModel")
|
||||||
|
super().__init_subclass__()
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
"""Represent model."""
|
"""Represent model."""
|
||||||
return repr_from_attrs(self, ["_count", "_accessor"], strip="_")
|
return repr_from_attrs(self, ["_count", "unique_by", "model_name"], strip="_")
|
||||||
|
|
||||||
def __iter__(self) -> t.Iterator[MultiModelT]:
|
def __iter__(self) -> t.Iterator[MultiModelT]:
|
||||||
"""Iterate items."""
|
"""Iterate items."""
|
||||||
return iter(self.__root__)
|
return iter(self.__root__)
|
||||||
|
|
||||||
def __getitem__(self, value: t.Union[int, str]) -> MultiModelT:
|
def __getitem__(self, value: t.Union[int, str]) -> MultiModelT:
|
||||||
"""Get an item by accessor value."""
|
"""Get an item by its `unique_by` property."""
|
||||||
if not isinstance(value, (str, int)):
|
if not isinstance(value, (str, int)):
|
||||||
raise TypeError(
|
raise TypeError(
|
||||||
"Value of {}.{!s} should be a string or integer. Got {!r} ({!s})".format(
|
"Value of {}.{!s} should be a string or integer. Got {!r} ({!s})".format(
|
||||||
self.__class__.__name__, self.accessor, value, type(value)
|
self.__class__.__name__, self.unique_by, value, type(value)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
if isinstance(value, int):
|
if isinstance(value, int):
|
||||||
return self.__root__[value]
|
return self.__root__[value]
|
||||||
|
|
||||||
for item in self:
|
for item in self:
|
||||||
if hasattr(item, self.accessor) and getattr(item, self.accessor) == value:
|
if hasattr(item, self.unique_by) and getattr(item, self.unique_by) == value:
|
||||||
return item
|
return item
|
||||||
raise IndexError(
|
raise IndexError(
|
||||||
"No match found for {!s}.{!s}={!r}".format(
|
"No match found for {!s}.{!s}={!r}".format(
|
||||||
self.model.__class__.__name__, self.accessor, value
|
self.model.__class__.__name__, self.unique_by, value
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -183,30 +192,38 @@ class HyperglassMultiModel(GenericModel, t.Generic[MultiModelT]):
|
|||||||
)
|
)
|
||||||
if not valid:
|
if not valid:
|
||||||
raise TypeError(f"Cannot add {other!r} to {self.__class__.__name__}")
|
raise TypeError(f"Cannot add {other!r} to {self.__class__.__name__}")
|
||||||
merged = self._merge_with(*other, unique_by=self.accessor)
|
merged = self._merge_with(*other, unique_by=self.unique_by)
|
||||||
|
|
||||||
if compare_init(self.__class__, other.__class__):
|
if compare_init(self.__class__, other.__class__):
|
||||||
return self.__class__(*merged, model=self.model, accessor=self.accessor)
|
return self.__class__(*merged)
|
||||||
raise TypeError(
|
raise TypeError(
|
||||||
f"{self.__class__.__name__} and {other.__class__.__name__} have different `__init__` "
|
f"{self.__class__.__name__} and {other.__class__.__name__} have different `__init__` "
|
||||||
"signatures. You probably need to override `HyperglassMultiModel.__add__`"
|
"signatures. You probably need to override `MultiModel.__add__`"
|
||||||
)
|
)
|
||||||
|
|
||||||
@property
|
def __len__(self) -> int:
|
||||||
def accessor(self) -> str:
|
"""Get number of items."""
|
||||||
"""Access item accessor."""
|
return len(self.__root__)
|
||||||
return self._accessor
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def model(self) -> MultiModelT:
|
def ids(self) -> t.Tuple[t.Any, ...]:
|
||||||
"""Access item model class."""
|
"""Get values of all items by `unique_by` property."""
|
||||||
return self._model
|
return tuple(sorted(getattr(item, self.unique_by) for item in self))
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def count(self) -> int:
|
def count(self) -> int:
|
||||||
"""Access item count."""
|
"""Access item count."""
|
||||||
return self._count
|
return self._count
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def create(cls, name: str, *, model: MultiModelT, unique_by: str) -> "MultiModel":
|
||||||
|
"""Create a MultiModel."""
|
||||||
|
new = type(name, (cls,), cls.__dict__)
|
||||||
|
new.model = model
|
||||||
|
new.unique_by = unique_by
|
||||||
|
new.model_name = getattr(model, "__name__", "MultiModel")
|
||||||
|
return new
|
||||||
|
|
||||||
def _valid_items(
|
def _valid_items(
|
||||||
self, *to_validate: t.List[t.Union[MultiModelT, t.Dict[str, t.Any]]]
|
self, *to_validate: t.List[t.Union[MultiModelT, t.Dict[str, t.Any]]]
|
||||||
) -> t.List[MultiModelT]:
|
) -> t.List[MultiModelT]:
|
||||||
@@ -215,8 +232,8 @@ class HyperglassMultiModel(GenericModel, t.Generic[MultiModelT]):
|
|||||||
for item in to_validate
|
for item in to_validate
|
||||||
if any(
|
if any(
|
||||||
(
|
(
|
||||||
(isinstance(item, self.model) and hasattr(item, self.accessor)),
|
(isinstance(item, self.model) and hasattr(item, self.unique_by)),
|
||||||
(isinstance(item, t.Dict) and self.accessor in item),
|
(isinstance(item, t.Dict) and self.unique_by in item),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
@@ -225,23 +242,6 @@ class HyperglassMultiModel(GenericModel, t.Generic[MultiModelT]):
|
|||||||
items[index] = self.model(**item)
|
items[index] = self.model(**item)
|
||||||
return items
|
return items
|
||||||
|
|
||||||
def matching(self, *accessors: str) -> MultiModelT:
|
|
||||||
"""Get a new instance containing partial matches from `accessors`."""
|
|
||||||
|
|
||||||
def matches(*searches: str) -> t.Generator[MultiModelT, None, None]:
|
|
||||||
"""Get any matching items by accessor value.
|
|
||||||
|
|
||||||
For example, if `accessors` is `('one', 'two')`, and `Model.<accessor>` is `'one'`,
|
|
||||||
`Model` is yielded.
|
|
||||||
"""
|
|
||||||
for search in searches:
|
|
||||||
pattern = re.compile(fr".*{search}.*", re.IGNORECASE)
|
|
||||||
for item in self:
|
|
||||||
if pattern.match(getattr(item, self.accessor)):
|
|
||||||
yield item
|
|
||||||
|
|
||||||
return self.__class__(*matches(*accessors))
|
|
||||||
|
|
||||||
def _merge_with(self, *items, unique_by: t.Optional[str] = None) -> Series[MultiModelT]:
|
def _merge_with(self, *items, unique_by: t.Optional[str] = None) -> Series[MultiModelT]:
|
||||||
to_add = self._valid_items(*items)
|
to_add = self._valid_items(*items)
|
||||||
if unique_by is not None:
|
if unique_by is not None:
|
||||||
@@ -257,6 +257,29 @@ class HyperglassMultiModel(GenericModel, t.Generic[MultiModelT]):
|
|||||||
return tuple(unique_by_objects.values())
|
return tuple(unique_by_objects.values())
|
||||||
return (*self.__root__, *to_add)
|
return (*self.__root__, *to_add)
|
||||||
|
|
||||||
|
def filter(self, *properties: str) -> MultiModelT:
|
||||||
|
"""Get only items with `unique_by` properties matching values in `properties`."""
|
||||||
|
return self.__class__(
|
||||||
|
*(item for item in self if getattr(item, self.unique_by, None) in properties)
|
||||||
|
)
|
||||||
|
|
||||||
|
def matching(self, *unique: str) -> MultiModelT:
|
||||||
|
"""Get a new instance containing partial matches from `accessors`."""
|
||||||
|
|
||||||
|
def matches(*searches: str) -> t.Generator[MultiModelT, None, None]:
|
||||||
|
"""Get any matching items by unique_by property.
|
||||||
|
|
||||||
|
For example, if `unique` is `('one', 'two')`, and `Model.<unique_by>` is `'one'`,
|
||||||
|
`Model` is yielded.
|
||||||
|
"""
|
||||||
|
for search in searches:
|
||||||
|
pattern = re.compile(fr".*{search}.*", re.IGNORECASE)
|
||||||
|
for item in self:
|
||||||
|
if pattern.match(getattr(item, self.unique_by)):
|
||||||
|
yield item
|
||||||
|
|
||||||
|
return self.__class__(*matches(*unique))
|
||||||
|
|
||||||
def add(self, *items, unique_by: t.Optional[str] = None) -> None:
|
def add(self, *items, unique_by: t.Optional[str] = None) -> None:
|
||||||
"""Add an item to the model."""
|
"""Add an item to the model."""
|
||||||
new = self._merge_with(*items, unique_by=unique_by)
|
new = self._merge_with(*items, unique_by=unique_by)
|
||||||
@@ -266,6 +289,6 @@ class HyperglassMultiModel(GenericModel, t.Generic[MultiModelT]):
|
|||||||
log.debug(
|
log.debug(
|
||||||
"Added {} '{!s}' to {}",
|
"Added {} '{!s}' to {}",
|
||||||
item.__class__.__name__,
|
item.__class__.__name__,
|
||||||
getattr(item, self.accessor),
|
getattr(item, self.unique_by),
|
||||||
self.__class__.__name__,
|
self.__class__.__name__,
|
||||||
)
|
)
|
||||||
|
@@ -221,7 +221,7 @@ def repr_from_attrs(obj: object, attrs: Series[str], strip: t.Optional[str] = No
|
|||||||
if hasattr((v := getattr(obj, f)), "__repr__")
|
if hasattr((v := getattr(obj, f)), "__repr__")
|
||||||
}
|
}
|
||||||
pairs = (f"{k}={v!r}" for k, v in attr_values.items())
|
pairs = (f"{k}={v!r}" for k, v in attr_values.items())
|
||||||
return f"{obj.__class__.__name__}({','.join(pairs)})"
|
return f"{obj.__class__.__name__}({', '.join(pairs)})"
|
||||||
|
|
||||||
|
|
||||||
def validate_platform(_type: str) -> t.Tuple[bool, t.Union[None, str]]:
|
def validate_platform(_type: str) -> t.Tuple[bool, t.Union[None, str]]:
|
||||||
|
Reference in New Issue
Block a user