mirror of
https://github.com/netbox-community/netbox.git
synced 2024-05-10 07:54:54 +00:00
Replace legacy trace() method
This commit is contained in:
@@ -1,5 +1,3 @@
|
||||
import logging
|
||||
|
||||
from django.contrib.contenttypes.fields import GenericRelation
|
||||
from django.core.exceptions import ObjectDoesNotExist, ValidationError
|
||||
from django.core.validators import MaxValueValidator, MinValueValidator
|
||||
@@ -11,8 +9,8 @@ from taggit.managers import TaggableManager
|
||||
|
||||
from dcim.choices import *
|
||||
from dcim.constants import *
|
||||
from dcim.exceptions import CableTraceSplit
|
||||
from dcim.fields import MACAddressField
|
||||
from dcim.utils import path_node_to_object
|
||||
from extras.models import ObjectChange, TaggedItem
|
||||
from extras.utils import extras_features
|
||||
from utilities.fields import NaturalOrderingField
|
||||
@@ -117,114 +115,6 @@ class CableTermination(models.Model):
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
def trace(self):
|
||||
"""
|
||||
Return three items: the traceable portion of a cable path, the termination points where it splits (if any), and
|
||||
the remaining positions on the position stack (if any). Splits occur when the trace is initiated from a midpoint
|
||||
along a path which traverses a RearPort. In cases where the originating endpoint is unknown, it is not possible
|
||||
to know which corresponding FrontPort to follow. Remaining positions occur when tracing a path that traverses
|
||||
a FrontPort without traversing a RearPort again.
|
||||
|
||||
The path is a list representing a complete cable path, with each individual segment represented as a
|
||||
three-tuple:
|
||||
|
||||
[
|
||||
(termination A, cable, termination B),
|
||||
(termination C, cable, termination D),
|
||||
(termination E, cable, termination F)
|
||||
]
|
||||
"""
|
||||
endpoint = self
|
||||
path = []
|
||||
position_stack = []
|
||||
|
||||
def get_peer_port(termination):
|
||||
from circuits.models import CircuitTermination
|
||||
|
||||
# Map a front port to its corresponding rear port
|
||||
if isinstance(termination, FrontPort):
|
||||
# Retrieve the corresponding RearPort from database to ensure we have an up-to-date instance
|
||||
peer_port = RearPort.objects.get(pk=termination.rear_port.pk)
|
||||
|
||||
# Don't use the stack for RearPorts with a single position. Only remember the position at
|
||||
# many-to-one points so we can select the correct FrontPort when we reach the corresponding
|
||||
# one-to-many point.
|
||||
if peer_port.positions > 1:
|
||||
position_stack.append(termination)
|
||||
|
||||
return peer_port
|
||||
|
||||
# Map a rear port/position to its corresponding front port
|
||||
elif isinstance(termination, RearPort):
|
||||
if termination.positions > 1:
|
||||
# Can't map to a FrontPort without a position if there are multiple options
|
||||
if not position_stack:
|
||||
raise CableTraceSplit(termination)
|
||||
|
||||
front_port = position_stack.pop()
|
||||
position = front_port.rear_port_position
|
||||
|
||||
# Validate the position
|
||||
if position not in range(1, termination.positions + 1):
|
||||
raise Exception("Invalid position for {} ({} positions): {})".format(
|
||||
termination, termination.positions, position
|
||||
))
|
||||
else:
|
||||
# Don't use the stack for RearPorts with a single position. The only possible position is 1.
|
||||
position = 1
|
||||
|
||||
try:
|
||||
peer_port = FrontPort.objects.get(
|
||||
rear_port=termination,
|
||||
rear_port_position=position,
|
||||
)
|
||||
return peer_port
|
||||
except ObjectDoesNotExist:
|
||||
return None
|
||||
|
||||
# Follow a circuit to its other termination
|
||||
elif isinstance(termination, CircuitTermination):
|
||||
peer_termination = termination.get_peer_termination()
|
||||
if peer_termination is None:
|
||||
return None
|
||||
return peer_termination
|
||||
|
||||
# Termination is not a pass-through port
|
||||
else:
|
||||
return None
|
||||
|
||||
logger = logging.getLogger('netbox.dcim.cable.trace')
|
||||
logger.debug("Tracing cable from {} {}".format(self.parent, self))
|
||||
|
||||
while endpoint is not None:
|
||||
|
||||
# No cable connected; nothing to trace
|
||||
if not endpoint.cable:
|
||||
path.append((endpoint, None, None))
|
||||
logger.debug("No cable connected")
|
||||
return path, None, position_stack
|
||||
|
||||
# Check for loops
|
||||
if endpoint.cable in [segment[1] for segment in path]:
|
||||
logger.debug("Loop detected!")
|
||||
return path, None, position_stack
|
||||
|
||||
# Record the current segment in the path
|
||||
far_end = endpoint.get_cable_peer()
|
||||
path.append((endpoint, endpoint.cable, far_end))
|
||||
logger.debug("{}[{}] --- Cable {} ---> {}[{}]".format(
|
||||
endpoint.parent, endpoint, endpoint.cable.pk, far_end.parent, far_end
|
||||
))
|
||||
|
||||
# Get the peer port of the far end termination
|
||||
try:
|
||||
endpoint = get_peer_port(far_end)
|
||||
except CableTraceSplit as e:
|
||||
return path, e.termination.frontports.all(), position_stack
|
||||
|
||||
if endpoint is None:
|
||||
return path, None, position_stack
|
||||
|
||||
def get_cable_peer(self):
|
||||
if self.cable is None:
|
||||
return None
|
||||
@@ -233,23 +123,6 @@ class CableTermination(models.Model):
|
||||
if self._cabled_as_b.exists():
|
||||
return self.cable.termination_a
|
||||
|
||||
def get_path_endpoints(self):
|
||||
"""
|
||||
Return all endpoints of paths which traverse this object.
|
||||
"""
|
||||
endpoints = []
|
||||
|
||||
# Get the far end of the last path segment
|
||||
path, split_ends, position_stack = self.trace()
|
||||
endpoint = path[-1][2]
|
||||
if split_ends is not None:
|
||||
for termination in split_ends:
|
||||
endpoints.extend(termination.get_path_endpoints())
|
||||
elif endpoint is not None:
|
||||
endpoints.append(endpoint)
|
||||
|
||||
return endpoints
|
||||
|
||||
|
||||
class PathEndpoint(models.Model):
|
||||
"""
|
||||
@@ -265,6 +138,17 @@ class PathEndpoint(models.Model):
|
||||
class Meta:
|
||||
abstract = True
|
||||
|
||||
def trace(self):
|
||||
if self.path is None:
|
||||
return []
|
||||
|
||||
# Construct the complete path
|
||||
path = [self, *[path_node_to_object(obj) for obj in self.path.path], self.path.destination]
|
||||
assert not len(path) % 3, f"Invalid path length for CablePath #{self.pk}: {len(self.path)} elements in path"
|
||||
|
||||
# Return the path as a list of three-tuples (A termination, cable, B termination)
|
||||
return list(zip(*[iter(path)] * 3))
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
"""
|
||||
|
Reference in New Issue
Block a user