1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00
Files
netbox-community-netbox/netbox/dcim/models/cables.py

618 lines
22 KiB
Python
Raw Normal View History

from collections import defaultdict
2020-10-14 16:54:30 -04:00
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
2022-05-05 14:29:28 -04:00
from django.core.exceptions import ValidationError
2020-10-14 16:54:30 -04:00
from django.db import models
from django.db.models import Sum
from django.dispatch import Signal
2020-10-14 16:54:30 -04:00
from django.urls import reverse
from dcim.choices import *
from dcim.constants import *
2022-05-05 14:29:28 -04:00
from dcim.fields import PathField
from dcim.utils import decompile_path_node, flatten_path, object_to_path_node, path_node_to_object
2022-01-26 20:57:14 -05:00
from netbox.models import NetBoxModel
2020-10-14 16:54:30 -04:00
from utilities.fields import ColorField
from utilities.querysets import RestrictedQuerySet
2020-10-14 16:54:30 -04:00
from utilities.utils import to_meters
2022-05-10 09:53:55 -04:00
from wireless.models import WirelessLink
2020-10-14 16:54:30 -04:00
from .devices import Device
from .device_components import FrontPort, RearPort
__all__ = (
'Cable',
'CablePath',
'CableTermination',
2020-10-14 16:54:30 -04:00
)
trace_paths = Signal()
2020-10-14 16:54:30 -04:00
#
# Cables
#
2022-01-26 20:57:14 -05:00
class Cable(NetBoxModel):
2020-10-14 16:54:30 -04:00
"""
A physical connection between two endpoints.
"""
type = models.CharField(
max_length=50,
choices=CableTypeChoices,
blank=True
)
status = models.CharField(
max_length=50,
2021-10-13 14:31:30 -04:00
choices=LinkStatusChoices,
default=LinkStatusChoices.STATUS_CONNECTED
2020-10-14 16:54:30 -04:00
)
tenant = models.ForeignKey(
to='tenancy.Tenant',
on_delete=models.PROTECT,
related_name='cables',
blank=True,
null=True
)
2020-10-14 16:54:30 -04:00
label = models.CharField(
max_length=100,
blank=True
)
color = ColorField(
blank=True
)
length = models.DecimalField(
max_digits=8,
decimal_places=2,
2020-10-14 16:54:30 -04:00
blank=True,
null=True
)
length_unit = models.CharField(
max_length=50,
choices=CableLengthUnitChoices,
blank=True,
)
# Stores the normalized length (in meters) for database ordering
_abs_length = models.DecimalField(
max_digits=10,
decimal_places=4,
blank=True,
null=True
)
# Cache the associated device (where applicable) for the A and B terminations. This enables filtering of Cables by
# their associated Devices.
_termination_a_device = models.ForeignKey(
to=Device,
on_delete=models.CASCADE,
related_name='+',
blank=True,
null=True
)
_termination_b_device = models.ForeignKey(
to=Device,
on_delete=models.CASCADE,
related_name='+',
blank=True,
null=True
)
class Meta:
ordering = ('pk',)
2020-10-14 16:54:30 -04:00
2022-05-12 17:11:01 -04:00
def __init__(self, *args, a_terminations=None, b_terminations=None, **kwargs):
2020-10-14 16:54:30 -04:00
super().__init__(*args, **kwargs)
# A copy of the PK to be used by __str__ in case the object is deleted
self._pk = self.pk
# Cache the original status so we can check later if it's been changed
self._orig_status = self.status
2022-05-03 16:30:39 -04:00
# Assign associated CableTerminations (if any)
if a_terminations is not None:
self.a_terminations = a_terminations
if b_terminations is not None:
self.b_terminations = b_terminations
2022-05-03 16:30:39 -04:00
2020-10-14 16:54:30 -04:00
def __str__(self):
pk = self.pk or self._pk
return self.label or f'#{pk}'
2020-10-14 16:54:30 -04:00
def get_absolute_url(self):
return reverse('dcim:cable', args=[self.pk])
def clean(self):
super().clean()
# TODO: Is this validation still necessary?
# # Check that two connected RearPorts have the same number of positions (if both are >1)
# if isinstance(self.termination_a, RearPort) and isinstance(self.termination_b, RearPort):
# if self.termination_a.positions > 1 and self.termination_b.positions > 1:
# if self.termination_a.positions != self.termination_b.positions:
# raise ValidationError(
# f"{self.termination_a} has {self.termination_a.positions} position(s) but "
# f"{self.termination_b} has {self.termination_b.positions}. "
# f"Both terminations must have the same number of positions (if greater than one)."
# )
2020-10-14 16:54:30 -04:00
# Validate length and length_unit
if self.length is not None and not self.length_unit:
raise ValidationError("Must specify a unit when setting a cable length")
elif self.length is None:
self.length_unit = ''
def save(self, *args, **kwargs):
_created = self.pk is None
2020-10-14 16:54:30 -04:00
# Store the given length (if any) in meters for use in database ordering
if self.length and self.length_unit:
self._abs_length = to_meters(self.length, self.length_unit)
else:
self._abs_length = None
# Store the parent Device for the A and B terminations (if applicable) to enable filtering
if hasattr(self, 'a_terminations'):
self._termination_a_device = self.a_terminations[0].device
if hasattr(self, 'b_terminations'):
self._termination_b_device = self.b_terminations[0].device
2020-10-14 16:54:30 -04:00
super().save(*args, **kwargs)
# Update the private pk used in __str__ in case this is a new object (i.e. just got its pk)
self._pk = self.pk
# Retrieve existing A/B terminations for the Cable
a_terminations = {ct.termination: ct for ct in self.terminations.filter(cable_end='A')}
b_terminations = {ct.termination: ct for ct in self.terminations.filter(cable_end='B')}
# Delete stale CableTerminations
if hasattr(self, 'a_terminations'):
for termination, ct in a_terminations.items():
if termination not in self.a_terminations:
ct.delete()
if hasattr(self, 'b_terminations'):
for termination, ct in b_terminations.items():
if termination not in self.b_terminations:
ct.delete()
# Save new CableTerminations (if any)
if hasattr(self, 'a_terminations'):
for termination in self.a_terminations:
if termination not in a_terminations:
CableTermination(cable=self, cable_end='A', termination=termination).save()
if hasattr(self, 'b_terminations'):
for termination in self.b_terminations:
if termination not in b_terminations:
CableTermination(cable=self, cable_end='B', termination=termination).save()
trace_paths.send(Cable, instance=self, created=_created)
def get_status_color(self):
return LinkStatusChoices.colors.get(self.status)
2022-04-29 15:16:35 -04:00
def get_a_terminations(self):
return [
term.termination for term in CableTermination.objects.filter(cable=self, cable_end='A')
]
def get_b_terminations(self):
return [
term.termination for term in CableTermination.objects.filter(cable=self, cable_end='B')
]
2020-10-14 16:54:30 -04:00
class CableTermination(models.Model):
"""
A mapping between side A or B of a Cable and a terminating object (e.g. an Interface or CircuitTermination).
"""
cable = models.ForeignKey(
to='dcim.Cable',
on_delete=models.CASCADE,
related_name='terminations'
)
cable_end = models.CharField(
max_length=1,
choices=CableEndChoices,
verbose_name='End'
)
termination_type = models.ForeignKey(
to=ContentType,
limit_choices_to=CABLE_TERMINATION_MODELS,
on_delete=models.PROTECT,
related_name='+'
)
termination_id = models.PositiveBigIntegerField()
termination = GenericForeignKey(
ct_field='termination_type',
fk_field='termination_id'
)
objects = RestrictedQuerySet.as_manager()
class Meta:
ordering = ('cable', 'cable_end', 'pk')
constraints = (
models.UniqueConstraint(
fields=('termination_type', 'termination_id'),
name='unique_termination'
),
)
def __str__(self):
return f'Cable {self.cable} to {self.termination}'
def clean(self):
super().clean()
# Validate interface type (if applicable)
if self.termination_type.model == 'interface' and self.termination.type in NONCONNECTABLE_IFACE_TYPES:
raise ValidationError({
'termination': f'Cables cannot be terminated to {self.termination.get_type_display()} interfaces'
})
# A CircuitTermination attached to a ProviderNetwork cannot have a Cable
if self.termination_type.model == 'circuittermination' and self.termination.provider_network is not None:
raise ValidationError({
'termination': "Circuit terminations attached to a provider network may not be cabled."
})
# TODO
# # A front port cannot be connected to its corresponding rear port
# if (
# type_a in ['frontport', 'rearport'] and
# type_b in ['frontport', 'rearport'] and
# (
# getattr(self.termination_a, 'rear_port', None) == self.termination_b or
# getattr(self.termination_b, 'rear_port', None) == self.termination_a
# )
# ):
# raise ValidationError("A front port cannot be connected to it corresponding rear port")
# TODO
# # Check that termination types are compatible
# if type_b not in COMPATIBLE_TERMINATION_TYPES.get(type_a):
# raise ValidationError(
# f"Incompatible termination types: {self.termination_a_type} and {self.termination_b_type}"
# )
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
# Set the cable on the terminating object
termination_model = self.termination._meta.model
termination_model.objects.filter(pk=self.termination_id).update(cable=self.cable)
def delete(self, *args, **kwargs):
# Delete the cable association on the terminating object
termination_model = self.termination._meta.model
termination_model.objects.filter(pk=self.termination_id).update(cable=None)
super().delete(*args, **kwargs)
class CablePath(models.Model):
2020-10-14 16:54:30 -04:00
"""
A CablePath instance represents the physical path from an origin to a destination, including all intermediate
elements in the path. Every instance must specify an `origin`, whereas `destination` may be null (for paths which do
not terminate on a PathEndpoint).
`path` contains a list of nodes within the path, each represented by a tuple of (type, ID). The first element in the
path must be a Cable instance, followed by a pair of pass-through ports. For example, consider the following
topology:
1 2 3
Interface A --- Front Port A | Rear Port A --- Rear Port B | Front Port B --- Interface B
This path would be expressed as:
CablePath(
origin = Interface A
destination = Interface B
path = [Cable 1, Front Port A, Rear Port A, Cable 2, Rear Port B, Front Port B, Cable 3]
)
`is_active` is set to True only if 1) `destination` is not null, and 2) every Cable within the path has a status of
"connected".
"""
2022-05-05 14:29:28 -04:00
path = models.JSONField(
default=list
)
2020-10-14 16:54:30 -04:00
is_active = models.BooleanField(
default=False
)
is_complete = models.BooleanField(
default=False
)
2020-11-16 15:49:07 -05:00
is_split = models.BooleanField(
default=False
)
_nodes = PathField()
2020-10-14 16:54:30 -04:00
class Meta:
pass
2020-10-14 16:54:30 -04:00
def __str__(self):
2020-11-16 15:49:07 -05:00
status = ' (active)' if self.is_active else ' (split)' if self.is_split else ''
return f"Path #{self.pk}: {len(self.path)} nodes{status}"
2020-10-14 16:54:30 -04:00
def save(self, *args, **kwargs):
# Save the flattened nodes list
self._nodes = flatten_path(self.path)
2022-05-03 16:30:39 -04:00
super().save(*args, **kwargs)
# Record a direct reference to this CablePath on its originating object(s)
origin_model = self.origins[0]._meta.model
origin_ids = [o.id for o in self.origins]
2022-05-03 16:30:39 -04:00
origin_model.objects.filter(pk__in=origin_ids).update(_path=self.pk)
2020-10-14 16:54:30 -04:00
@property
def origin_type(self):
ct_id, _ = decompile_path_node(self.path[0][0])
return ContentType.objects.get_for_id(ct_id)
@property
def destination_type(self):
if not self.is_complete:
return None
ct_id, _ = decompile_path_node(self.path[-1][0])
return ContentType.objects.get_for_id(ct_id)
@property
def path_objects(self):
"""
Cache and return the complete path as lists of objects, derived from their annotation within the path.
"""
if not hasattr(self, '_path_objects'):
self._path_objects = self._get_path()
return self._path_objects
@property
def origins(self):
"""
Return the list of originating objects (from cache, if available).
"""
if hasattr(self, '_path_objects'):
return self.path_objects[0]
return [
path_node_to_object(node) for node in self.path[0]
]
@property
def destinations(self):
"""
Return the list of destination objects (from cache, if available), if the path is complete.
"""
if not self.is_complete:
return []
if hasattr(self, '_path_objects'):
return self.path_objects[-1]
return [
path_node_to_object(node) for node in self.path[-1]
]
@property
def segment_count(self):
return int(len(self.path) / 3)
2020-11-16 15:49:07 -05:00
@classmethod
2022-05-03 16:30:39 -04:00
def from_origin(cls, terminations):
2020-11-16 15:49:07 -05:00
"""
2022-05-10 09:53:55 -04:00
Create a new CablePath instance as traced from the given termination objects. These can be any object to which a
Cable or WirelessLink connects (interfaces, console ports, circuit termination, etc.). All terminations must be
of the same type and must belong to the same parent object.
2020-11-16 15:49:07 -05:00
"""
from circuits.models import CircuitTermination
2020-11-16 15:49:07 -05:00
path = []
position_stack = []
2022-05-05 16:41:30 -04:00
is_complete = False
2020-11-16 15:49:07 -05:00
is_active = True
is_split = False
2022-05-10 09:53:55 -04:00
while terminations:
2022-05-03 16:30:39 -04:00
2022-05-10 09:53:55 -04:00
# Terminations must all be of the same type and belong to the same parent
assert all(isinstance(t, type(terminations[0])) for t in terminations[1:])
2022-05-10 15:45:12 -04:00
assert all(t.parent_object == terminations[0].parent_object for t in terminations[1:])
2020-11-16 15:49:07 -05:00
2022-05-10 09:53:55 -04:00
# Step 1: Record the near-end termination object(s)
path.append([
object_to_path_node(t) for t in terminations
])
# Step 2: Determine the attached link (Cable or WirelessLink), if any
link = terminations[0].link
2022-05-10 15:45:12 -04:00
assert all(t.link == link for t in terminations[1:])
if link is None and len(path) == 1:
# If this is the start of the path and no link exists, return None
return None
elif link is None:
# Otherwise, halt the trace if no link exists
2022-05-10 09:53:55 -04:00
break
assert type(link) in (Cable, WirelessLink)
2020-11-16 15:49:07 -05:00
2022-05-10 09:53:55 -04:00
# Step 3: Record the link and update path status if not "connected"
path.append([object_to_path_node(link)])
if hasattr(link, 'status') and link.status != LinkStatusChoices.STATUS_CONNECTED:
is_active = False
# Step 4: Determine the far-end terminations
if isinstance(link, Cable):
termination_type = ContentType.objects.get_for_model(terminations[0])
local_cable_terminations = CableTermination.objects.filter(
termination_type=termination_type,
termination_id__in=[t.pk for t in terminations]
)
# Terminations must all belong to same end of Cable
local_cable_end = local_cable_terminations[0].cable_end
assert all(ct.cable_end == local_cable_end for ct in local_cable_terminations[1:])
remote_cable_terminations = CableTermination.objects.filter(
cable=link,
cable_end='A' if local_cable_end == 'B' else 'B'
)
remote_terminations = [ct.termination for ct in remote_cable_terminations]
2022-05-03 16:30:39 -04:00
else:
2022-05-10 09:53:55 -04:00
# WirelessLink
remote_terminations = [link.interface_b] if link.interface_a is terminations[0] else [link.interface_a]
# Step 5: Record the far-end termination object(s)
path.append([
object_to_path_node(t) for t in remote_terminations
])
# Step 6: Determine the "next hop" terminations, if applicable
if isinstance(remote_terminations[0], FrontPort):
# Follow FrontPorts to their corresponding RearPorts
rear_ports = RearPort.objects.filter(
pk__in=[t.rear_port_id for t in remote_terminations]
2022-05-03 16:30:39 -04:00
)
2022-05-10 15:45:12 -04:00
if len(rear_ports) > 1:
assert all(rp.positions == 1 for rp in rear_ports)
elif rear_ports[0].positions > 1:
position_stack.append([fp.rear_port_position for fp in remote_terminations])
2022-05-10 09:53:55 -04:00
terminations = rear_ports
elif isinstance(remote_terminations[0], RearPort):
2022-05-10 15:45:12 -04:00
if len(remote_terminations) > 1 or remote_terminations[0].positions == 1:
front_ports = FrontPort.objects.filter(
rear_port_id__in=[rp.pk for rp in remote_terminations],
rear_port_position=1
)
2020-11-16 15:49:07 -05:00
elif position_stack:
2022-05-10 15:45:12 -04:00
front_ports = FrontPort.objects.filter(
rear_port_id=remote_terminations[0].pk,
rear_port_position__in=position_stack.pop()
)
2020-11-16 15:49:07 -05:00
else:
2022-05-10 09:53:55 -04:00
# No position indicated: path has split, so we stop at the RearPorts
2020-11-16 15:49:07 -05:00
is_split = True
break
2022-05-10 09:53:55 -04:00
terminations = front_ports
elif isinstance(remote_terminations[0], CircuitTermination):
# Follow a CircuitTermination to its corresponding CircuitTermination (A to Z or vice versa)
term_side = remote_terminations[0].term_side
assert all(ct.term_side == term_side for ct in remote_terminations[1:])
circuit_termination = CircuitTermination.objects.filter(
circuit=remote_terminations[0].circuit,
term_side='Z' if term_side == 'A' else 'A'
2022-05-10 09:53:55 -04:00
).first()
if circuit_termination is None:
break
elif circuit_termination.provider_network:
# Circuit terminates to a ProviderNetwork
path.extend([
[object_to_path_node(circuit_termination)],
[object_to_path_node(circuit_termination.provider_network)],
2022-05-03 16:30:39 -04:00
])
break
2022-05-10 09:53:55 -04:00
elif circuit_termination.site and not circuit_termination.cable:
# Circuit terminates to a Site
path.extend([
[object_to_path_node(circuit_termination)],
[object_to_path_node(circuit_termination.site)],
2022-05-03 16:30:39 -04:00
])
break
2022-05-10 09:53:55 -04:00
terminations = [circuit_termination]
2020-11-16 15:49:07 -05:00
# Anything else marks the end of the path
else:
2022-05-05 16:41:30 -04:00
is_complete = True
2020-11-16 15:49:07 -05:00
break
return cls(
path=path,
2022-05-05 16:41:30 -04:00
is_complete=is_complete,
2020-11-16 15:49:07 -05:00
is_active=is_active,
is_split=is_split
)
def retrace(self):
"""
Retrace the path from the currently-defined originating termination(s)
"""
_new = self.from_origin(self.origins)
if _new:
self.path = _new.path
self.is_complete = _new.is_complete
self.is_active = _new.is_active
self.is_split = _new.is_split
self.save()
else:
self.delete()
def _get_path(self):
"""
Return the path as a list of prefetched objects.
"""
# Compile a list of IDs to prefetch for each type of model in the path
to_prefetch = defaultdict(list)
for node in self._nodes:
ct_id, object_id = decompile_path_node(node)
to_prefetch[ct_id].append(object_id)
# Prefetch path objects using one query per model type. Prefetch related devices where appropriate.
prefetched = {}
for ct_id, object_ids in to_prefetch.items():
model_class = ContentType.objects.get_for_id(ct_id).model_class()
queryset = model_class.objects.filter(pk__in=object_ids)
if hasattr(model_class, 'device'):
queryset = queryset.prefetch_related('device')
prefetched[ct_id] = {
obj.id: obj for obj in queryset
}
# Replicate the path using the prefetched objects.
path = []
for step in self.path:
nodes = []
for node in step:
ct_id, object_id = decompile_path_node(node)
nodes.append(prefetched[ct_id][object_id])
path.append(nodes)
return path
def get_cable_ids(self):
"""
Return all Cable IDs within the path.
"""
cable_ct = ContentType.objects.get_for_model(Cable).pk
cable_ids = []
for node in self._nodes:
ct, id = decompile_path_node(node)
if ct == cable_ct:
cable_ids.append(id)
return cable_ids
2020-10-14 16:54:30 -04:00
def get_total_length(self):
"""
Return a tuple containing the sum of the length of each cable in the path
and a flag indicating whether the length is definitive.
2020-10-14 16:54:30 -04:00
"""
cable_ids = self.get_cable_ids()
cables = Cable.objects.filter(id__in=cable_ids, _abs_length__isnull=False)
total_length = cables.aggregate(total=Sum('_abs_length'))['total']
is_definitive = len(cables) == len(cable_ids)
2021-03-25 11:51:02 -04:00
return total_length, is_definitive
2020-11-16 15:49:07 -05:00
def get_split_nodes(self):
"""
2020-11-17 10:26:03 -05:00
Return all available next segments in a split cable path.
2020-11-16 15:49:07 -05:00
"""
rearport = path_node_to_object(self._nodes[-1])
2021-03-25 11:51:02 -04:00
2020-11-16 15:49:07 -05:00
return FrontPort.objects.filter(rear_port=rearport)