1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

548 lines
20 KiB
Python
Raw Normal View History

from collections import defaultdict
2020-10-14 16:54:30 -04:00
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ObjectDoesNotExist, ValidationError
from django.db import models
from django.db.models import Sum
from django.urls import reverse
from dcim.choices import *
from dcim.constants import *
from dcim.fields import PathField
2020-11-16 15:49:07 -05:00
from dcim.utils import decompile_path_node, object_to_path_node, path_node_to_object
2020-10-14 16:54:30 -04:00
from extras.utils import extras_features
from netbox.models import BigIDModel, PrimaryModel
2020-10-14 16:54:30 -04:00
from utilities.fields import ColorField
from utilities.querysets import RestrictedQuerySet
from utilities.utils import to_meters
from .devices import Device
from .device_components import FrontPort, RearPort
__all__ = (
'Cable',
'CablePath',
)
#
# Cables
#
@extras_features('custom_fields', 'custom_links', 'export_templates', 'webhooks')
class Cable(PrimaryModel):
2020-10-14 16:54:30 -04:00
"""
A physical connection between two endpoints.
"""
termination_a_type = models.ForeignKey(
to=ContentType,
limit_choices_to=CABLE_TERMINATION_MODELS,
on_delete=models.PROTECT,
related_name='+'
)
termination_a_id = models.PositiveIntegerField()
termination_a = GenericForeignKey(
ct_field='termination_a_type',
fk_field='termination_a_id'
)
termination_b_type = models.ForeignKey(
to=ContentType,
limit_choices_to=CABLE_TERMINATION_MODELS,
on_delete=models.PROTECT,
related_name='+'
)
termination_b_id = models.PositiveIntegerField()
termination_b = GenericForeignKey(
ct_field='termination_b_type',
fk_field='termination_b_id'
)
type = models.CharField(
max_length=50,
choices=CableTypeChoices,
blank=True
)
status = models.CharField(
max_length=50,
choices=CableStatusChoices,
default=CableStatusChoices.STATUS_CONNECTED
)
label = models.CharField(
max_length=100,
blank=True
)
color = ColorField(
blank=True
)
length = models.PositiveSmallIntegerField(
blank=True,
null=True
)
length_unit = models.CharField(
max_length=50,
choices=CableLengthUnitChoices,
blank=True,
)
# Stores the normalized length (in meters) for database ordering
_abs_length = models.DecimalField(
max_digits=10,
decimal_places=4,
blank=True,
null=True
)
# Cache the associated device (where applicable) for the A and B terminations. This enables filtering of Cables by
# their associated Devices.
_termination_a_device = models.ForeignKey(
to=Device,
on_delete=models.CASCADE,
related_name='+',
blank=True,
null=True
)
_termination_b_device = models.ForeignKey(
to=Device,
on_delete=models.CASCADE,
related_name='+',
blank=True,
null=True
)
objects = RestrictedQuerySet.as_manager()
csv_headers = [
'termination_a_type', 'termination_a_id', 'termination_b_type', 'termination_b_id', 'type', 'status', 'label',
'color', 'length', 'length_unit',
]
class Meta:
ordering = ['pk']
unique_together = (
('termination_a_type', 'termination_a_id'),
('termination_b_type', 'termination_b_id'),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# A copy of the PK to be used by __str__ in case the object is deleted
self._pk = self.pk
# Cache the original status so we can check later if it's been changed
self._orig_status = self.status
@classmethod
def from_db(cls, db, field_names, values):
"""
Cache the original A and B terminations of existing Cable instances for later reference inside clean().
"""
instance = super().from_db(db, field_names, values)
instance._orig_termination_a_type_id = instance.termination_a_type_id
instance._orig_termination_a_id = instance.termination_a_id
instance._orig_termination_b_type_id = instance.termination_b_type_id
instance._orig_termination_b_id = instance.termination_b_id
return instance
def __str__(self):
pk = self.pk or self._pk
return self.label or f'#{pk}'
2020-10-14 16:54:30 -04:00
def get_absolute_url(self):
return reverse('dcim:cable', args=[self.pk])
def clean(self):
from circuits.models import CircuitTermination
super().clean()
2020-10-14 16:54:30 -04:00
# Validate that termination A exists
if not hasattr(self, 'termination_a_type'):
raise ValidationError('Termination A type has not been specified')
try:
self.termination_a_type.model_class().objects.get(pk=self.termination_a_id)
except ObjectDoesNotExist:
raise ValidationError({
'termination_a': 'Invalid ID for type {}'.format(self.termination_a_type)
})
# Validate that termination B exists
if not hasattr(self, 'termination_b_type'):
raise ValidationError('Termination B type has not been specified')
try:
self.termination_b_type.model_class().objects.get(pk=self.termination_b_id)
except ObjectDoesNotExist:
raise ValidationError({
'termination_b': 'Invalid ID for type {}'.format(self.termination_b_type)
})
# If editing an existing Cable instance, check that neither termination has been modified.
if self.pk:
err_msg = 'Cable termination points may not be modified. Delete and recreate the cable instead.'
if (
self.termination_a_type_id != self._orig_termination_a_type_id or
self.termination_a_id != self._orig_termination_a_id
):
raise ValidationError({
'termination_a': err_msg
})
if (
self.termination_b_type_id != self._orig_termination_b_type_id or
self.termination_b_id != self._orig_termination_b_id
):
raise ValidationError({
'termination_b': err_msg
})
type_a = self.termination_a_type.model
type_b = self.termination_b_type.model
# Validate interface types
if type_a == 'interface' and self.termination_a.type in NONCONNECTABLE_IFACE_TYPES:
raise ValidationError({
'termination_a_id': 'Cables cannot be terminated to {} interfaces'.format(
self.termination_a.get_type_display()
)
})
if type_b == 'interface' and self.termination_b.type in NONCONNECTABLE_IFACE_TYPES:
raise ValidationError({
'termination_b_id': 'Cables cannot be terminated to {} interfaces'.format(
self.termination_b.get_type_display()
)
})
# Check that termination types are compatible
if type_b not in COMPATIBLE_TERMINATION_TYPES.get(type_a):
raise ValidationError(
f"Incompatible termination types: {self.termination_a_type} and {self.termination_b_type}"
)
2020-11-17 10:26:03 -05:00
# Check that two connected RearPorts have the same number of positions (if both are >1)
2020-11-16 15:49:07 -05:00
if isinstance(self.termination_a, RearPort) and isinstance(self.termination_b, RearPort):
2020-11-17 10:26:03 -05:00
if self.termination_a.positions > 1 and self.termination_b.positions > 1:
if self.termination_a.positions != self.termination_b.positions:
raise ValidationError(
f"{self.termination_a} has {self.termination_a.positions} position(s) but "
f"{self.termination_b} has {self.termination_b.positions}. "
f"Both terminations must have the same number of positions (if greater than one)."
)
2020-10-14 16:54:30 -04:00
# A termination point cannot be connected to itself
if self.termination_a == self.termination_b:
raise ValidationError(f"Cannot connect {self.termination_a_type} to itself")
# A front port cannot be connected to its corresponding rear port
if (
type_a in ['frontport', 'rearport'] and
type_b in ['frontport', 'rearport'] and
(
getattr(self.termination_a, 'rear_port', None) == self.termination_b or
getattr(self.termination_b, 'rear_port', None) == self.termination_a
)
):
raise ValidationError("A front port cannot be connected to it corresponding rear port")
2021-04-01 10:21:41 -04:00
# A CircuitTermination attached to a ProviderNetwork cannot have a Cable
if isinstance(self.termination_a, CircuitTermination) and self.termination_a.provider_network is not None:
raise ValidationError({
2021-04-01 10:21:41 -04:00
'termination_a_id': "Circuit terminations attached to a provider network may not be cabled."
})
2021-04-01 10:21:41 -04:00
if isinstance(self.termination_b, CircuitTermination) and self.termination_b.provider_network is not None:
raise ValidationError({
2021-04-01 10:21:41 -04:00
'termination_b_id': "Circuit terminations attached to a provider network may not be cabled."
})
2020-10-14 16:54:30 -04:00
# Check for an existing Cable connected to either termination object
if self.termination_a.cable not in (None, self):
raise ValidationError("{} already has a cable attached (#{})".format(
self.termination_a, self.termination_a.cable_id
))
if self.termination_b.cable not in (None, self):
raise ValidationError("{} already has a cable attached (#{})".format(
self.termination_b, self.termination_b.cable_id
))
# Validate length and length_unit
if self.length is not None and not self.length_unit:
raise ValidationError("Must specify a unit when setting a cable length")
elif self.length is None:
self.length_unit = ''
def save(self, *args, **kwargs):
# Store the given length (if any) in meters for use in database ordering
if self.length and self.length_unit:
self._abs_length = to_meters(self.length, self.length_unit)
else:
self._abs_length = None
# Store the parent Device for the A and B terminations (if applicable) to enable filtering
if hasattr(self.termination_a, 'device'):
self._termination_a_device = self.termination_a.device
if hasattr(self.termination_b, 'device'):
self._termination_b_device = self.termination_b.device
super().save(*args, **kwargs)
# Update the private pk used in __str__ in case this is a new object (i.e. just got its pk)
self._pk = self.pk
def to_csv(self):
return (
'{}.{}'.format(self.termination_a_type.app_label, self.termination_a_type.model),
self.termination_a_id,
'{}.{}'.format(self.termination_b_type.app_label, self.termination_b_type.model),
self.termination_b_id,
self.get_type_display(),
self.get_status_display(),
self.label,
self.color,
self.length,
self.length_unit,
)
def get_status_class(self):
return CableStatusChoices.CSS_CLASSES.get(self.status)
def get_compatible_types(self):
"""
Return all termination types compatible with termination A.
"""
if self.termination_a is None:
return
return COMPATIBLE_TERMINATION_TYPES[self.termination_a._meta.model_name]
class CablePath(BigIDModel):
2020-10-14 16:54:30 -04:00
"""
A CablePath instance represents the physical path from an origin to a destination, including all intermediate
elements in the path. Every instance must specify an `origin`, whereas `destination` may be null (for paths which do
not terminate on a PathEndpoint).
`path` contains a list of nodes within the path, each represented by a tuple of (type, ID). The first element in the
path must be a Cable instance, followed by a pair of pass-through ports. For example, consider the following
topology:
1 2 3
Interface A --- Front Port A | Rear Port A --- Rear Port B | Front Port B --- Interface B
This path would be expressed as:
CablePath(
origin = Interface A
destination = Interface B
path = [Cable 1, Front Port A, Rear Port A, Cable 2, Rear Port B, Front Port B, Cable 3]
)
`is_active` is set to True only if 1) `destination` is not null, and 2) every Cable within the path has a status of
"connected".
"""
origin_type = models.ForeignKey(
to=ContentType,
on_delete=models.CASCADE,
related_name='+'
)
origin_id = models.PositiveIntegerField()
origin = GenericForeignKey(
ct_field='origin_type',
fk_field='origin_id'
)
destination_type = models.ForeignKey(
to=ContentType,
on_delete=models.CASCADE,
related_name='+',
blank=True,
null=True
)
destination_id = models.PositiveIntegerField(
blank=True,
null=True
)
destination = GenericForeignKey(
ct_field='destination_type',
fk_field='destination_id'
)
path = PathField()
is_active = models.BooleanField(
default=False
)
2020-11-16 15:49:07 -05:00
is_split = models.BooleanField(
default=False
)
2020-10-14 16:54:30 -04:00
class Meta:
unique_together = ('origin_type', 'origin_id')
def __str__(self):
2020-11-16 15:49:07 -05:00
status = ' (active)' if self.is_active else ' (split)' if self.is_split else ''
return f"Path #{self.pk}: {self.origin} to {self.destination} via {len(self.path)} nodes{status}"
2020-10-14 16:54:30 -04:00
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
# Record a direct reference to this CablePath on its originating object
model = self.origin._meta.model
model.objects.filter(pk=self.origin.pk).update(_path=self.pk)
@property
def segment_count(self):
total_length = 1 + len(self.path) + (1 if self.destination else 0)
return int(total_length / 3)
2020-11-16 15:49:07 -05:00
@classmethod
def from_origin(cls, origin):
"""
Create a new CablePath instance as traced from the given path origin.
"""
from circuits.models import CircuitTermination
2020-11-16 15:49:07 -05:00
if origin is None or origin.cable is None:
return None
destination = None
path = []
position_stack = []
is_active = True
is_split = False
node = origin
while node.cable is not None:
if node.cable.status != CableStatusChoices.STATUS_CONNECTED:
is_active = False
# Follow the cable to its far-end termination
path.append(object_to_path_node(node.cable))
peer_termination = node.get_cable_peer()
# Follow a FrontPort to its corresponding RearPort
if isinstance(peer_termination, FrontPort):
path.append(object_to_path_node(peer_termination))
node = peer_termination.rear_port
if node.positions > 1:
position_stack.append(peer_termination.rear_port_position)
path.append(object_to_path_node(node))
# Follow a RearPort to its corresponding FrontPort (if any)
2020-11-16 15:49:07 -05:00
elif isinstance(peer_termination, RearPort):
path.append(object_to_path_node(peer_termination))
# Determine the peer FrontPort's position
2020-11-16 15:49:07 -05:00
if peer_termination.positions == 1:
position = 1
2020-11-16 15:49:07 -05:00
elif position_stack:
position = position_stack.pop()
else:
# No position indicated: path has split, so we stop at the RearPort
is_split = True
break
try:
node = FrontPort.objects.get(rear_port=peer_termination, rear_port_position=position)
path.append(object_to_path_node(node))
except ObjectDoesNotExist:
# No corresponding FrontPort found for the RearPort
break
# Follow a CircuitTermination to its corresponding CircuitTermination (A to Z or vice versa)
elif isinstance(peer_termination, CircuitTermination):
path.append(object_to_path_node(peer_termination))
# Get peer CircuitTermination
node = peer_termination.get_peer_termination()
if node:
path.append(object_to_path_node(node))
if node.provider_network:
destination = node.provider_network
break
elif node.site and not node.cable:
destination = node.site
break
else:
# No peer CircuitTermination exists; halt the trace
break
2020-11-16 15:49:07 -05:00
# Anything else marks the end of the path
else:
destination = peer_termination
break
if destination is None:
is_active = False
return cls(
origin=origin,
destination=destination,
path=path,
is_active=is_active,
is_split=is_split
)
def get_path(self):
"""
Return the path as a list of prefetched objects.
"""
# Compile a list of IDs to prefetch for each type of model in the path
to_prefetch = defaultdict(list)
for node in self.path:
ct_id, object_id = decompile_path_node(node)
to_prefetch[ct_id].append(object_id)
# Prefetch path objects using one query per model type. Prefetch related devices where appropriate.
prefetched = {}
for ct_id, object_ids in to_prefetch.items():
model_class = ContentType.objects.get_for_id(ct_id).model_class()
queryset = model_class.objects.filter(pk__in=object_ids)
if hasattr(model_class, 'device'):
queryset = queryset.prefetch_related('device')
prefetched[ct_id] = {
obj.id: obj for obj in queryset
}
# Replicate the path using the prefetched objects.
path = []
for node in self.path:
ct_id, object_id = decompile_path_node(node)
path.append(prefetched[ct_id][object_id])
return path
@property
def last_node(self):
"""
Return either the destination or the last node within the path.
"""
return self.destination or path_node_to_object(self.path[-1])
def get_cable_ids(self):
"""
Return all Cable IDs within the path.
"""
cable_ct = ContentType.objects.get_for_model(Cable).pk
cable_ids = []
for node in self.path:
ct, id = decompile_path_node(node)
if ct == cable_ct:
cable_ids.append(id)
return cable_ids
2020-10-14 16:54:30 -04:00
def get_total_length(self):
"""
Return a tuple containing the sum of the length of each cable in the path
and a flag indicating whether the length is definitive.
2020-10-14 16:54:30 -04:00
"""
cable_ids = self.get_cable_ids()
cables = Cable.objects.filter(id__in=cable_ids, _abs_length__isnull=False)
total_length = cables.aggregate(total=Sum('_abs_length'))['total']
is_definitive = len(cables) == len(cable_ids)
2021-03-25 11:51:02 -04:00
return total_length, is_definitive
2020-11-16 15:49:07 -05:00
def get_split_nodes(self):
"""
2020-11-17 10:26:03 -05:00
Return all available next segments in a split cable path.
2020-11-16 15:49:07 -05:00
"""
rearport = path_node_to_object(self.path[-1])
2021-03-25 11:51:02 -04:00
2020-11-16 15:49:07 -05:00
return FrontPort.objects.filter(rear_port=rearport)