1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Initial work on #6087

This commit is contained in:
jeremystretch
2021-05-24 16:51:05 -04:00
parent da1fb4f969
commit da558de769
9 changed files with 515 additions and 102 deletions

View File

@ -209,6 +209,12 @@ class PrefixFilterSet(PrimaryModelFilterSet, TenancyFilterSet):
method='search_contains',
label='Prefixes which contain this prefix or IP',
)
depth = django_filters.NumberFilter(
field_name='_depth'
)
children = django_filters.NumberFilter(
field_name='_children'
)
mask_length = django_filters.NumberFilter(
field_name='prefix',
lookup_expr='net_mask_length'

View File

@ -0,0 +1,21 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ipam', '0046_set_vlangroup_scope_types'),
]
operations = [
migrations.AddField(
model_name='prefix',
name='_children',
field=models.PositiveBigIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='prefix',
name='_depth',
field=models.PositiveSmallIntegerField(default=0, editable=False),
),
]

View File

@ -0,0 +1,86 @@
from django.db import migrations
def push_to_stack(stack, prefix):
# Increment child count on parent nodes
for n in stack:
n['children'] += 1
stack.append({
'pk': prefix['pk'],
'prefix': prefix['prefix'],
'children': 0,
})
def populate_prefix_hierarchy(apps, schema_editor):
"""
Populate _depth and _children attrs for all Prefixes.
"""
Prefix = apps.get_model('ipam', 'Prefix')
VRF = apps.get_model('ipam', 'VRF')
total_count = Prefix.objects.count()
print(f'\nUpdating {total_count} prefixes...')
# Iterate through all VRFs and the global table
vrfs = [None] + list(VRF.objects.values_list('pk', flat=True))
for vrf in vrfs:
stack = []
update_queue = []
# Iterate through all Prefixes in the VRF, growing and shrinking the stack as we go
prefixes = Prefix.objects.filter(vrf=vrf).values('pk', 'prefix')
for i, p in enumerate(prefixes):
# Grow the stack if this is a child of the most recent prefix
if not stack or p['prefix'] in stack[-1]['prefix']:
push_to_stack(stack, p)
# If this is a sibling or parent of the most recent prefix, pop nodes from the
# stack until we reach a parent prefix (or the root)
else:
while stack and p['prefix'] not in stack[-1]['prefix'] and p['prefix'] != stack[-1]['prefix']:
node = stack.pop()
update_queue.append(
Prefix(
pk=node['pk'],
_depth=len(stack),
_children=node['children']
)
)
push_to_stack(stack, p)
# Flush the update queue once it reaches 100 Prefixes
if len(update_queue) >= 100:
Prefix.objects.bulk_update(update_queue, ['_depth', '_children'])
update_queue = []
print(f' [{i}/{total_count}]')
# Clear out any prefixes remaining in the stack
while stack:
node = stack.pop()
update_queue.append(
Prefix(
pk=node['pk'],
_depth=len(stack),
_children=node['children']
)
)
# Final flush of any remaining Prefixes
Prefix.objects.bulk_update(update_queue, ['_depth', '_children'])
class Migration(migrations.Migration):
dependencies = [
('ipam', '0047_prefix_depth_children'),
]
operations = [
migrations.RunPython(
code=populate_prefix_hierarchy,
reverse_code=migrations.RunPython.noop
),
]

View File

@ -293,6 +293,16 @@ class Prefix(PrimaryModel):
blank=True
)
# Cached depth & child counts
_depth = models.PositiveSmallIntegerField(
default=0,
editable=False
)
_children = models.PositiveBigIntegerField(
default=0,
editable=False
)
objects = PrefixQuerySet.as_manager()
csv_headers = [
@ -306,6 +316,13 @@ class Prefix(PrimaryModel):
ordering = (F('vrf').asc(nulls_first=True), 'prefix', 'pk') # (vrf, prefix) may be non-unique
verbose_name_plural = 'prefixes'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Cache the original prefix and VRF so we can check if they have changed on post_save
self._prefix = self.prefix
self._vrf = self.vrf
def __str__(self):
return str(self.prefix)
@ -373,6 +390,14 @@ class Prefix(PrimaryModel):
return self.prefix.version
return None
@property
def depth(self):
return self._depth
@property
def children(self):
return self._children
def _set_prefix_length(self, value):
"""
Expose the IPNetwork object's prefixlen attribute on the parent model so that it can be manipulated directly,
@ -385,6 +410,26 @@ class Prefix(PrimaryModel):
def get_status_class(self):
return PrefixStatusChoices.CSS_CLASSES.get(self.status)
def get_parents(self, include_self=False):
"""
Return all containing Prefixes in the hierarchy.
"""
lookup = 'net_contains_or_equals' if include_self else 'net_contains'
return Prefix.objects.filter(**{
'vrf': self.vrf,
f'prefix__{lookup}': self.prefix
})
def get_children(self, include_self=False):
"""
Return all covered Prefixes in the hierarchy.
"""
lookup = 'net_contained_or_equal' if include_self else 'net_contained'
return Prefix.objects.filter(**{
'vrf': self.vrf,
f'prefix__{lookup}': self.prefix
})
def get_duplicates(self):
return Prefix.objects.filter(vrf=self.vrf, prefix=str(self.prefix)).exclude(pk=self.pk)

View File

@ -1,27 +1,32 @@
from django.contrib.contenttypes.models import ContentType
from django.db.models import Q
from django.db.models.expressions import RawSQL
from utilities.querysets import RestrictedQuerySet
class PrefixQuerySet(RestrictedQuerySet):
def annotate_tree(self):
def annotate_hierarchy(self):
"""
Annotate the number of parent and child prefixes for each Prefix. Raw SQL is needed for these subqueries
because we need to cast NULL VRF values to integers for comparison. (NULL != NULL).
Annotate the depth and number of child prefixes for each Prefix. Cast null VRF values to zero for
comparison. (NULL != NULL).
"""
return self.extra(
select={
'parents': 'SELECT COUNT(U0."prefix") AS "c" '
'FROM "ipam_prefix" U0 '
'WHERE (U0."prefix" >> "ipam_prefix"."prefix" '
'AND COALESCE(U0."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
'children': 'SELECT COUNT(U1."prefix") AS "c" '
'FROM "ipam_prefix" U1 '
'WHERE (U1."prefix" << "ipam_prefix"."prefix" '
'AND COALESCE(U1."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
}
return self.annotate(
hierarchy_depth=RawSQL(
'SELECT COUNT(DISTINCT U0."prefix") AS "c" '
'FROM "ipam_prefix" U0 '
'WHERE (U0."prefix" >> "ipam_prefix"."prefix" '
'AND COALESCE(U0."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
()
),
hierarchy_children=RawSQL(
'SELECT COUNT(U1."prefix") AS "c" '
'FROM "ipam_prefix" U1 '
'WHERE (U1."prefix" << "ipam_prefix"."prefix" '
'AND COALESCE(U1."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
()
)
)

View File

@ -1,9 +1,52 @@
from django.db.models.signals import pre_delete
from django.db.models.signals import post_delete, post_save, pre_delete
from django.dispatch import receiver
from dcim.models import Device
from virtualization.models import VirtualMachine
from .models import IPAddress
from .models import IPAddress, Prefix
def update_parents_children(prefix):
"""
Update depth on prefix & containing prefixes
"""
parents = prefix.get_parents(include_self=True).annotate_hierarchy()
for parent in parents:
parent._children = parent.hierarchy_children
Prefix.objects.bulk_update(parents, ['_children'])
def update_children_depth(prefix):
"""
Update children count on prefix & contained prefixes
"""
children = prefix.get_children(include_self=True).annotate_hierarchy()
for child in children:
child._depth = child.hierarchy_depth
Prefix.objects.bulk_update(children, ['_depth'])
@receiver(post_save, sender=Prefix)
def handle_prefix_saved(instance, created, **kwargs):
# Prefix has changed (or new instance has been created)
if created or instance.vrf != instance._vrf or instance.prefix != instance._prefix:
update_parents_children(instance)
update_children_depth(instance)
# If this is not a new prefix, clean up parent/children of previous prefix
if not created:
old_prefix = Prefix(vrf=instance._vrf, prefix=instance._prefix)
update_parents_children(old_prefix)
update_children_depth(old_prefix)
@receiver(post_delete, sender=Prefix)
def handle_prefix_deleted(instance, **kwargs):
update_parents_children(instance)
update_children_depth(instance)
@receiver(pre_delete, sender=IPAddress)

View File

@ -15,7 +15,7 @@ AVAILABLE_LABEL = mark_safe('<span class="label label-success">Available</span>'
PREFIX_LINK = """
{% load helpers %}
{% for i in record.parents|as_range %}
{% for i in record.depth|as_range %}
<i class="mdi mdi-circle-small"></i>
{% endfor %}
<a href="{% if record.pk %}{% url 'ipam:prefix' pk=record.pk %}{% else %}{% url 'ipam:prefix_add' %}?prefix={{ record }}{% if object.vrf %}&vrf={{ object.vrf.pk }}{% endif %}{% if object.site %}&site={{ object.site.pk }}{% endif %}{% if object.tenant %}&tenant_group={{ object.tenant.group.pk }}&tenant={{ object.tenant.pk }}{% endif %}{% endif %}">{{ record.prefix }}</a>
@ -262,6 +262,14 @@ class PrefixTable(BaseTable):
template_code=PREFIX_LINK,
attrs={'td': {'class': 'text-nowrap'}}
)
depth = tables.Column(
accessor=Accessor('_depth'),
verbose_name='Depth'
)
children = tables.Column(
accessor=Accessor('_children'),
verbose_name='Children'
)
status = ChoiceFieldColumn(
default=AVAILABLE_LABEL
)
@ -287,7 +295,8 @@ class PrefixTable(BaseTable):
class Meta(BaseTable.Meta):
model = Prefix
fields = (
'pk', 'prefix', 'status', 'children', 'vrf', 'tenant', 'site', 'vlan', 'role', 'is_pool', 'description',
'pk', 'prefix', 'status', 'depth', 'children', 'vrf', 'tenant', 'site', 'vlan', 'role', 'is_pool',
'description',
)
default_columns = ('pk', 'prefix', 'status', 'vrf', 'tenant', 'site', 'vlan', 'role', 'description')
row_attrs = {

View File

@ -1,4 +1,4 @@
import netaddr
from netaddr import IPNetwork, IPSet
from django.core.exceptions import ValidationError
from django.test import TestCase, override_settings
@ -10,27 +10,27 @@ class TestAggregate(TestCase):
def test_get_utilization(self):
rir = RIR.objects.create(name='RIR 1', slug='rir-1')
aggregate = Aggregate(prefix=netaddr.IPNetwork('10.0.0.0/8'), rir=rir)
aggregate = Aggregate(prefix=IPNetwork('10.0.0.0/8'), rir=rir)
aggregate.save()
# 25% utilization
Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/12')),
Prefix(prefix=netaddr.IPNetwork('10.16.0.0/12')),
Prefix(prefix=netaddr.IPNetwork('10.32.0.0/12')),
Prefix(prefix=netaddr.IPNetwork('10.48.0.0/12')),
Prefix(prefix=IPNetwork('10.0.0.0/12')),
Prefix(prefix=IPNetwork('10.16.0.0/12')),
Prefix(prefix=IPNetwork('10.32.0.0/12')),
Prefix(prefix=IPNetwork('10.48.0.0/12')),
))
self.assertEqual(aggregate.get_utilization(), 25)
# 50% utilization
Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.64.0.0/10')),
Prefix(prefix=IPNetwork('10.64.0.0/10')),
))
self.assertEqual(aggregate.get_utilization(), 50)
# 100% utilization
Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.128.0.0/9')),
Prefix(prefix=IPNetwork('10.128.0.0/9')),
))
self.assertEqual(aggregate.get_utilization(), 100)
@ -39,9 +39,9 @@ class TestPrefix(TestCase):
def test_get_duplicates(self):
prefixes = Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24')),
Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24')),
Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24')),
Prefix(prefix=IPNetwork('192.0.2.0/24')),
Prefix(prefix=IPNetwork('192.0.2.0/24')),
Prefix(prefix=IPNetwork('192.0.2.0/24')),
))
duplicate_prefix_pks = [p.pk for p in prefixes[0].get_duplicates()]
@ -54,11 +54,11 @@ class TestPrefix(TestCase):
VRF(name='VRF 3'),
))
prefixes = Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER),
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/24'), vrf=None),
Prefix(prefix=netaddr.IPNetwork('10.0.1.0/24'), vrf=vrfs[0]),
Prefix(prefix=netaddr.IPNetwork('10.0.2.0/24'), vrf=vrfs[1]),
Prefix(prefix=netaddr.IPNetwork('10.0.3.0/24'), vrf=vrfs[2]),
Prefix(prefix=IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER),
Prefix(prefix=IPNetwork('10.0.0.0/24'), vrf=None),
Prefix(prefix=IPNetwork('10.0.1.0/24'), vrf=vrfs[0]),
Prefix(prefix=IPNetwork('10.0.2.0/24'), vrf=vrfs[1]),
Prefix(prefix=IPNetwork('10.0.3.0/24'), vrf=vrfs[2]),
))
child_prefix_pks = {p.pk for p in prefixes[0].get_child_prefixes()}
@ -79,13 +79,13 @@ class TestPrefix(TestCase):
VRF(name='VRF 3'),
))
parent_prefix = Prefix.objects.create(
prefix=netaddr.IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER
prefix=IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER
)
ips = IPAddress.objects.bulk_create((
IPAddress(address=netaddr.IPNetwork('10.0.0.1/24'), vrf=None),
IPAddress(address=netaddr.IPNetwork('10.0.1.1/24'), vrf=vrfs[0]),
IPAddress(address=netaddr.IPNetwork('10.0.2.1/24'), vrf=vrfs[1]),
IPAddress(address=netaddr.IPNetwork('10.0.3.1/24'), vrf=vrfs[2]),
IPAddress(address=IPNetwork('10.0.0.1/24'), vrf=None),
IPAddress(address=IPNetwork('10.0.1.1/24'), vrf=vrfs[0]),
IPAddress(address=IPNetwork('10.0.2.1/24'), vrf=vrfs[1]),
IPAddress(address=IPNetwork('10.0.3.1/24'), vrf=vrfs[2]),
))
child_ip_pks = {p.pk for p in parent_prefix.get_child_ips()}
@ -102,16 +102,16 @@ class TestPrefix(TestCase):
def test_get_available_prefixes(self):
prefixes = Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/16')), # Parent prefix
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/20')),
Prefix(prefix=netaddr.IPNetwork('10.0.32.0/20')),
Prefix(prefix=netaddr.IPNetwork('10.0.128.0/18')),
Prefix(prefix=IPNetwork('10.0.0.0/16')), # Parent prefix
Prefix(prefix=IPNetwork('10.0.0.0/20')),
Prefix(prefix=IPNetwork('10.0.32.0/20')),
Prefix(prefix=IPNetwork('10.0.128.0/18')),
))
missing_prefixes = netaddr.IPSet([
netaddr.IPNetwork('10.0.16.0/20'),
netaddr.IPNetwork('10.0.48.0/20'),
netaddr.IPNetwork('10.0.64.0/18'),
netaddr.IPNetwork('10.0.192.0/18'),
missing_prefixes = IPSet([
IPNetwork('10.0.16.0/20'),
IPNetwork('10.0.48.0/20'),
IPNetwork('10.0.64.0/18'),
IPNetwork('10.0.192.0/18'),
])
available_prefixes = prefixes[0].get_available_prefixes()
@ -119,17 +119,17 @@ class TestPrefix(TestCase):
def test_get_available_ips(self):
parent_prefix = Prefix.objects.create(prefix=netaddr.IPNetwork('10.0.0.0/28'))
parent_prefix = Prefix.objects.create(prefix=IPNetwork('10.0.0.0/28'))
IPAddress.objects.bulk_create((
IPAddress(address=netaddr.IPNetwork('10.0.0.1/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.3/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.5/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.7/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.9/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.11/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.13/26')),
IPAddress(address=IPNetwork('10.0.0.1/26')),
IPAddress(address=IPNetwork('10.0.0.3/26')),
IPAddress(address=IPNetwork('10.0.0.5/26')),
IPAddress(address=IPNetwork('10.0.0.7/26')),
IPAddress(address=IPNetwork('10.0.0.9/26')),
IPAddress(address=IPNetwork('10.0.0.11/26')),
IPAddress(address=IPNetwork('10.0.0.13/26')),
))
missing_ips = netaddr.IPSet([
missing_ips = IPSet([
'10.0.0.2/32',
'10.0.0.4/32',
'10.0.0.6/32',
@ -145,39 +145,39 @@ class TestPrefix(TestCase):
def test_get_first_available_prefix(self):
prefixes = Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/16')), # Parent prefix
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/24')),
Prefix(prefix=netaddr.IPNetwork('10.0.1.0/24')),
Prefix(prefix=netaddr.IPNetwork('10.0.2.0/24')),
Prefix(prefix=IPNetwork('10.0.0.0/16')), # Parent prefix
Prefix(prefix=IPNetwork('10.0.0.0/24')),
Prefix(prefix=IPNetwork('10.0.1.0/24')),
Prefix(prefix=IPNetwork('10.0.2.0/24')),
))
self.assertEqual(prefixes[0].get_first_available_prefix(), netaddr.IPNetwork('10.0.3.0/24'))
self.assertEqual(prefixes[0].get_first_available_prefix(), IPNetwork('10.0.3.0/24'))
Prefix.objects.create(prefix=netaddr.IPNetwork('10.0.3.0/24'))
self.assertEqual(prefixes[0].get_first_available_prefix(), netaddr.IPNetwork('10.0.4.0/22'))
Prefix.objects.create(prefix=IPNetwork('10.0.3.0/24'))
self.assertEqual(prefixes[0].get_first_available_prefix(), IPNetwork('10.0.4.0/22'))
def test_get_first_available_ip(self):
parent_prefix = Prefix.objects.create(prefix=netaddr.IPNetwork('10.0.0.0/24'))
parent_prefix = Prefix.objects.create(prefix=IPNetwork('10.0.0.0/24'))
IPAddress.objects.bulk_create((
IPAddress(address=netaddr.IPNetwork('10.0.0.1/24')),
IPAddress(address=netaddr.IPNetwork('10.0.0.2/24')),
IPAddress(address=netaddr.IPNetwork('10.0.0.3/24')),
IPAddress(address=IPNetwork('10.0.0.1/24')),
IPAddress(address=IPNetwork('10.0.0.2/24')),
IPAddress(address=IPNetwork('10.0.0.3/24')),
))
self.assertEqual(parent_prefix.get_first_available_ip(), '10.0.0.4/24')
IPAddress.objects.create(address=netaddr.IPNetwork('10.0.0.4/24'))
IPAddress.objects.create(address=IPNetwork('10.0.0.4/24'))
self.assertEqual(parent_prefix.get_first_available_ip(), '10.0.0.5/24')
def test_get_utilization(self):
# Container Prefix
prefix = Prefix.objects.create(
prefix=netaddr.IPNetwork('10.0.0.0/24'),
prefix=IPNetwork('10.0.0.0/24'),
status=PrefixStatusChoices.STATUS_CONTAINER
)
Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/26')),
Prefix(prefix=netaddr.IPNetwork('10.0.0.128/26')),
Prefix(prefix=IPNetwork('10.0.0.0/26')),
Prefix(prefix=IPNetwork('10.0.0.128/26')),
))
self.assertEqual(prefix.get_utilization(), 50)
@ -186,7 +186,7 @@ class TestPrefix(TestCase):
prefix.save()
IPAddress.objects.bulk_create(
# Create 32 IPAddresses within the Prefix
[IPAddress(address=netaddr.IPNetwork('10.0.0.{}/24'.format(i))) for i in range(1, 33)]
[IPAddress(address=IPNetwork('10.0.0.{}/24'.format(i))) for i in range(1, 33)]
)
self.assertEqual(prefix.get_utilization(), 12) # ~= 12%
@ -196,36 +196,234 @@ class TestPrefix(TestCase):
@override_settings(ENFORCE_GLOBAL_UNIQUE=False)
def test_duplicate_global(self):
Prefix.objects.create(prefix=netaddr.IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24'))
Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(prefix=IPNetwork('192.0.2.0/24'))
self.assertIsNone(duplicate_prefix.clean())
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_global_unique(self):
Prefix.objects.create(prefix=netaddr.IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24'))
Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(prefix=IPNetwork('192.0.2.0/24'))
self.assertRaises(ValidationError, duplicate_prefix.clean)
def test_duplicate_vrf(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=False)
Prefix.objects.create(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
Prefix.objects.create(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
self.assertIsNone(duplicate_prefix.clean())
def test_duplicate_vrf_unique(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=True)
Prefix.objects.create(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
Prefix.objects.create(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
self.assertRaises(ValidationError, duplicate_prefix.clean)
class TestPrefixHierarchy(TestCase):
"""
Test the automatic updating of depth and child count in response to changes made within
the prefix hierarchy.
"""
@classmethod
def setUpTestData(cls):
prefixes = (
# IPv4
Prefix(prefix='10.0.0.0/8', _depth=0, _children=2),
Prefix(prefix='10.0.0.0/16', _depth=1, _children=1),
Prefix(prefix='10.0.0.0/24', _depth=2, _children=0),
# IPv6
Prefix(prefix='2001:db8::/32', _depth=0, _children=2),
Prefix(prefix='2001:db8::/40', _depth=1, _children=1),
Prefix(prefix='2001:db8::/48', _depth=2, _children=0),
)
Prefix.objects.bulk_create(prefixes)
def test_create_prefix4(self):
# Create 10.0.0.0/12
Prefix(prefix='10.0.0.0/12').save()
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/12'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 2)
self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('10.0.0.0/24'))
self.assertEqual(prefixes[3]._depth, 3)
self.assertEqual(prefixes[3]._children, 0)
def test_create_prefix6(self):
# Create 2001:db8::/36
Prefix(prefix='2001:db8::/36').save()
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/36'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 2)
self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[3]._depth, 3)
self.assertEqual(prefixes[3]._children, 0)
def test_update_prefix4(self):
# Change 10.0.0.0/24 to 10.0.0.0/12
p = Prefix.objects.get(prefix='10.0.0.0/24')
p.prefix = '10.0.0.0/12'
p.save()
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 2)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/12'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 0)
def test_update_prefix6(self):
# Change 2001:db8::/48 to 2001:db8::/36
p = Prefix.objects.get(prefix='2001:db8::/48')
p.prefix = '2001:db8::/36'
p.save()
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 2)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/36'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 0)
def test_update_prefix_vrf4(self):
vrf = VRF(name='VRF A')
vrf.save()
# Move 10.0.0.0/16 to a VRF
p = Prefix.objects.get(prefix='10.0.0.0/16')
p.vrf = vrf
p.save()
prefixes = Prefix.objects.filter(vrf__isnull=True, prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/24'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
prefixes = Prefix.objects.filter(vrf=vrf)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 0)
def test_update_prefix_vrf6(self):
vrf = VRF(name='VRF A')
vrf.save()
# Move 2001:db8::/40 to a VRF
p = Prefix.objects.get(prefix='2001:db8::/40')
p.vrf = vrf
p.save()
prefixes = Prefix.objects.filter(vrf__isnull=True, prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
prefixes = Prefix.objects.filter(vrf=vrf)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 0)
def test_delete_prefix4(self):
# Delete 10.0.0.0/16
Prefix.objects.filter(prefix='10.0.0.0/16').delete()
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/24'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
def test_delete_prefix6(self):
# Delete 2001:db8::/40
Prefix.objects.filter(prefix='2001:db8::/40').delete()
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
def test_duplicate_prefix4(self):
# Duplicate 10.0.0.0/16
Prefix(prefix='10.0.0.0/16').save()
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[2]._depth, 1)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('10.0.0.0/24'))
self.assertEqual(prefixes[3]._depth, 2)
self.assertEqual(prefixes[3]._children, 0)
def test_duplicate_prefix6(self):
# Duplicate 2001:db8::/40
Prefix(prefix='2001:db8::/40').save()
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[2]._depth, 1)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[3]._depth, 2)
self.assertEqual(prefixes[3]._children, 0)
class TestIPAddress(TestCase):
def test_get_duplicates(self):
ips = IPAddress.objects.bulk_create((
IPAddress(address=netaddr.IPNetwork('192.0.2.1/24')),
IPAddress(address=netaddr.IPNetwork('192.0.2.1/24')),
IPAddress(address=netaddr.IPNetwork('192.0.2.1/24')),
IPAddress(address=IPNetwork('192.0.2.1/24')),
IPAddress(address=IPNetwork('192.0.2.1/24')),
IPAddress(address=IPNetwork('192.0.2.1/24')),
))
duplicate_ip_pks = [p.pk for p in ips[0].get_duplicates()]
@ -237,44 +435,44 @@ class TestIPAddress(TestCase):
@override_settings(ENFORCE_GLOBAL_UNIQUE=False)
def test_duplicate_global(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'))
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'))
self.assertIsNone(duplicate_ip.clean())
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_global_unique(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'))
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'))
self.assertRaises(ValidationError, duplicate_ip.clean)
def test_duplicate_vrf(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=False)
IPAddress.objects.create(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
self.assertIsNone(duplicate_ip.clean())
def test_duplicate_vrf_unique(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=True)
IPAddress.objects.create(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
self.assertRaises(ValidationError, duplicate_ip.clean)
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_nonunique_nonrole_role(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
self.assertRaises(ValidationError, duplicate_ip.clean)
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_nonunique_role_nonrole(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'))
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'))
self.assertRaises(ValidationError, duplicate_ip.clean)
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_nonunique_role(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
class TestVLANGroup(TestCase):

View File

@ -238,7 +238,7 @@ class AggregateView(generic.ObjectView):
'site', 'role'
).order_by(
'prefix'
).annotate_tree()
)
# Add available prefixes to the table if requested
if request.GET.get('show_available', 'true') == 'true':
@ -352,7 +352,7 @@ class RoleBulkDeleteView(generic.BulkDeleteView):
#
class PrefixListView(generic.ObjectListView):
queryset = Prefix.objects.annotate_tree()
queryset = Prefix.objects.all()
filterset = filtersets.PrefixFilterSet
filterset_form = forms.PrefixFilterForm
table = tables.PrefixDetailTable
@ -377,7 +377,7 @@ class PrefixView(generic.ObjectView):
prefix__net_contains=str(instance.prefix)
).prefetch_related(
'site', 'role'
).annotate_tree()
)
parent_prefix_table = tables.PrefixTable(list(parent_prefixes), orderable=False)
parent_prefix_table.exclude = ('vrf',)
@ -407,7 +407,7 @@ class PrefixPrefixesView(generic.ObjectView):
# Child prefixes table
child_prefixes = instance.get_child_prefixes().restrict(request.user, 'view').prefetch_related(
'site', 'vlan', 'role',
).annotate_tree()
)
# Add available prefixes to the table if requested
if child_prefixes and request.GET.get('show_available', 'true') == 'true':