1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Merge branch 'develop' into feature

This commit is contained in:
jeremystretch
2021-06-02 20:35:38 -04:00
38 changed files with 959 additions and 212 deletions

View File

@ -3,7 +3,10 @@ blank_issues_enabled: false
contact_links:
- name: 📖 Contributing Policy
url: https://github.com/netbox-community/netbox/blob/develop/CONTRIBUTING.md
about: Please read through our contributing policy before opening an issue or pull request
- name: 💬 Discussion Group
url: https://groups.google.com/g/netbox-discuss
about: Join our discussion group for assistance with installation issues and other problems
about: "Please read through our contributing policy before opening an issue or pull request"
- name: Discussion
url: https://github.com/netbox-community/netbox/discussions
about: "If you're just looking for help, try starting a discussion instead"
- name: 💬 Community Slack
url: https://netdev.chat/
about: "Join #netbox on the NetDev Community Slack for assistance with installation issues and other problems"

View File

@ -24,7 +24,7 @@ The video below demonstrates the installation of NetBox v2.10.3 on Ubuntu 20.04
| Redis | 4.0 |
!!! note
Python 3.7 or later will be required in NetBox v2.12. Users are strongly encouraged to install NetBox using Python 3.7 or later for new deployments.
Python 3.7 or later will be required in NetBox v3.0. Users are strongly encouraged to install NetBox using Python 3.7 or later for new deployments.
Below is a simplified overview of the NetBox application stack for reference:

View File

@ -1,5 +1,26 @@
# NetBox v2.11
## v2.11.5 (FUTURE)
### Enhancements
* [#6087](https://github.com/netbox-community/netbox/issues/6087) - Improved prefix hierarchy rendering
* [#6487](https://github.com/netbox-community/netbox/issues/6487) - Add location filter to cable connection form
* [#6501](https://github.com/netbox-community/netbox/issues/6501) - Expose prefix depth and children on REST API serializer
### Bug Fixes
* [#6064](https://github.com/netbox-community/netbox/issues/6064) - Fix object permission assignments for user and group models
* [#6217](https://github.com/netbox-community/netbox/issues/6217) - Disallow passing of string values for integer custom fields
* [#6284](https://github.com/netbox-community/netbox/issues/6284) - Avoid sending redundant webhooks when adding/removing tags
* [#6492](https://github.com/netbox-community/netbox/issues/6492) - Correct tag population in post-change data resulting from REST API changes
* [#6496](https://github.com/netbox-community/netbox/issues/6496) - Fix upgrade script when Python installed in nonstandard path
* [#6502](https://github.com/netbox-community/netbox/issues/6502) - Correct permissions evaluation for running a report via the REST API
* [#6517](https://github.com/netbox-community/netbox/issues/6517) - Fix assignment of user when creating rack reservations via REST API
* [#6525](https://github.com/netbox-community/netbox/issues/6525) - Paginate related IPs table under IP address view
---
## v2.11.4 (2021-05-25)
### Enhancements
@ -93,7 +114,7 @@
## v2.11.0 (2021-04-16)
**Note:** NetBox v2.11 is the last major release that will support Python 3.6. Beginning with NetBox v2.12, Python 3.7 or later will be required.
**Note:** NetBox v2.11 is the last major release that will support Python 3.6. Beginning with NetBox v3.0, Python 3.7 or later will be required.
### Breaking Changes
@ -151,7 +172,7 @@ Devices can now be assigned to locations (formerly known as rack groups) within
When exporting a list of objects in NetBox, users now have the option of selecting the "current view". This will render CSV output matching the current configuration of the table being viewed. For example, if you modify the sites list to display only the site name, tenant, and status, the rendered CSV will include only these columns, and they will appear in the order chosen.
The legacy static export behavior has been retained to ensure backward compatibility for dependent integrations. However, users are strongly encouraged to adapt custom export templates where needed as this functionality will be removed in v2.12.
The legacy static export behavior has been retained to ensure backward compatibility for dependent integrations. However, users are strongly encouraged to adapt custom export templates where needed as this functionality will be removed in v3.0.
#### Variable Scope Support for VLAN Groups ([#5284](https://github.com/netbox-community/netbox/issues/5284))

View File

@ -246,10 +246,6 @@ class RackReservationViewSet(ModelViewSet):
serializer_class = serializers.RackReservationSerializer
filterset_class = filtersets.RackReservationFilterSet
# Assign user from request
def perform_create(self, serializer):
serializer.save(user=self.request.user)
#
# Manufacturers

View File

@ -3968,13 +3968,23 @@ class ConnectCableToDeviceForm(BootstrapMixin, CustomFieldModelForm):
'group_id': '$termination_b_site_group',
}
)
termination_b_location = DynamicModelChoiceField(
queryset=Location.objects.all(),
label='Location',
required=False,
null_option='None',
query_params={
'site_id': '$termination_b_site'
}
)
termination_b_rack = DynamicModelChoiceField(
queryset=Rack.objects.all(),
label='Rack',
required=False,
null_option='None',
query_params={
'site_id': '$termination_b_site'
'site_id': '$termination_b_site',
'location_id': '$termination_b_location',
}
)
termination_b_device = DynamicModelChoiceField(

View File

@ -349,40 +349,36 @@ class RackReservationTest(APIViewTestCases.APIViewTestCase):
user = User.objects.create(username='user1', is_active=True)
site = Site.objects.create(name='Test Site 1', slug='test-site-1')
cls.racks = (
racks = (
Rack(site=site, name='Rack 1'),
Rack(site=site, name='Rack 2'),
)
Rack.objects.bulk_create(cls.racks)
Rack.objects.bulk_create(racks)
rack_reservations = (
RackReservation(rack=cls.racks[0], units=[1, 2, 3], user=user, description='Reservation #1'),
RackReservation(rack=cls.racks[0], units=[4, 5, 6], user=user, description='Reservation #2'),
RackReservation(rack=cls.racks[0], units=[7, 8, 9], user=user, description='Reservation #3'),
RackReservation(rack=racks[0], units=[1, 2, 3], user=user, description='Reservation #1'),
RackReservation(rack=racks[0], units=[4, 5, 6], user=user, description='Reservation #2'),
RackReservation(rack=racks[0], units=[7, 8, 9], user=user, description='Reservation #3'),
)
RackReservation.objects.bulk_create(rack_reservations)
def setUp(self):
super().setUp()
# We have to set creation data under setUp() because we need access to the test user.
self.create_data = [
cls.create_data = [
{
'rack': self.racks[1].pk,
'rack': racks[1].pk,
'units': [10, 11, 12],
'user': self.user.pk,
'user': user.pk,
'description': 'Reservation #4',
},
{
'rack': self.racks[1].pk,
'rack': racks[1].pk,
'units': [13, 14, 15],
'user': self.user.pk,
'user': user.pk,
'description': 'Reservation #5',
},
{
'rack': self.racks[1].pk,
'rack': racks[1].pk,
'units': [16, 17, 18],
'user': self.user.pk,
'user': user.pk,
'description': 'Reservation #6',
},
]

View File

@ -239,7 +239,7 @@ class ReportViewSet(ViewSet):
Run a Report identified as "<module>.<script>" and return the pending JobResult as the result
"""
# Check that the user has permission to run reports.
if not request.user.has_perm('extras.run_script'):
if not request.user.has_perm('extras.run_report'):
raise PermissionDenied("This user does not have permission to run reports.")
# Check that at least one RQ worker is running

View File

@ -4,6 +4,7 @@ from django.db.models.signals import m2m_changed, pre_delete, post_save
from extras.signals import _handle_changed_object, _handle_deleted_object
from utilities.utils import curry
from .webhooks import flush_webhooks
@contextmanager
@ -14,9 +15,11 @@ def change_logging(request):
:param request: WSGIRequest object with a unique `id` set
"""
webhook_queue = []
# Curry signals receivers to pass the current request
handle_changed_object = curry(_handle_changed_object, request)
handle_deleted_object = curry(_handle_deleted_object, request)
handle_changed_object = curry(_handle_changed_object, request, webhook_queue)
handle_deleted_object = curry(_handle_deleted_object, request, webhook_queue)
# Connect our receivers to the post_save and post_delete signals.
post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object')
@ -30,3 +33,7 @@ def change_logging(request):
post_save.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
m2m_changed.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
pre_delete.disconnect(handle_deleted_object, dispatch_uid='handle_deleted_object')
# Flush queued webhooks to RQ
flush_webhooks(webhook_queue)
del webhook_queue

View File

@ -286,9 +286,7 @@ class CustomField(BigIDModel):
# Validate integer
if self.type == CustomFieldTypeChoices.TYPE_INTEGER:
try:
int(value)
except ValueError:
if type(value) is not int:
raise ValidationError("Value must be an integer.")
if self.validation_minimum is not None and value < self.validation_minimum:
raise ValidationError(f"Value must be at least {self.validation_minimum}")

View File

@ -12,17 +12,27 @@ from prometheus_client import Counter
from .choices import ObjectChangeActionChoices
from .models import CustomField, ObjectChange
from .webhooks import enqueue_webhooks
from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook
#
# Change logging/webhooks
#
def _handle_changed_object(request, sender, instance, **kwargs):
def _handle_changed_object(request, webhook_queue, sender, instance, **kwargs):
"""
Fires when an object is created or updated.
"""
def is_same_object(instance, webhook_data):
return (
ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
instance.pk == webhook_data['object_id'] and
request.id == webhook_data['request_id']
)
if not hasattr(instance, 'to_objectchange'):
return
m2m_changed = False
# Determine the type of change being made
@ -53,8 +63,13 @@ def _handle_changed_object(request, sender, instance, **kwargs):
objectchange.request_id = request.id
objectchange.save()
# Enqueue webhooks
enqueue_webhooks(instance, request.user, request.id, action)
# If this is an M2M change, update the previously queued webhook (from post_save)
if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]):
instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
webhook_queue[-1]['data'] = serialize_for_webhook(instance)
webhook_queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
else:
enqueue_object(webhook_queue, instance, request.user, request.id, action)
# Increment metric counters
if action == ObjectChangeActionChoices.ACTION_CREATE:
@ -68,10 +83,13 @@ def _handle_changed_object(request, sender, instance, **kwargs):
ObjectChange.objects.filter(time__lt=cutoff)._raw_delete(using=DEFAULT_DB_ALIAS)
def _handle_deleted_object(request, sender, instance, **kwargs):
def _handle_deleted_object(request, webhook_queue, sender, instance, **kwargs):
"""
Fires when an object is deleted.
"""
if not hasattr(instance, 'to_objectchange'):
return
# Record an ObjectChange if applicable
if hasattr(instance, 'to_objectchange'):
objectchange = instance.to_objectchange(ObjectChangeActionChoices.ACTION_DELETE)
@ -80,7 +98,7 @@ def _handle_deleted_object(request, sender, instance, **kwargs):
objectchange.save()
# Enqueue webhooks
enqueue_webhooks(instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
# Increment metric counters
model_deletes.labels(instance._meta.model_name).inc()

View File

@ -11,8 +11,8 @@ from rest_framework import status
from dcim.models import Site
from extras.choices import ObjectChangeActionChoices
from extras.models import Webhook
from extras.webhooks import enqueue_webhooks, generate_signature
from extras.models import Tag, Webhook
from extras.webhooks import enqueue_object, flush_webhooks, generate_signature
from extras.webhooks_worker import process_webhook
from utilities.testing import APITestCase
@ -20,11 +20,10 @@ from utilities.testing import APITestCase
class WebhookTest(APITestCase):
def setUp(self):
super().setUp()
self.queue = django_rq.get_queue('default')
self.queue.empty() # Begin each test with an empty queue
self.queue.empty()
@classmethod
def setUpTestData(cls):
@ -34,38 +33,104 @@ class WebhookTest(APITestCase):
DUMMY_SECRET = "LOOKATMEIMASECRETSTRING"
webhooks = Webhook.objects.bulk_create((
Webhook(name='Site Create Webhook', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers='X-Foo: Bar'),
Webhook(name='Site Update Webhook', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
Webhook(name='Site Delete Webhook', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
Webhook(name='Webhook 1', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers='X-Foo: Bar'),
Webhook(name='Webhook 2', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
Webhook(name='Webhook 3', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
))
for webhook in webhooks:
webhook.content_types.set([site_ct])
Tag.objects.bulk_create((
Tag(name='Foo', slug='foo'),
Tag(name='Bar', slug='bar'),
Tag(name='Baz', slug='baz'),
))
def test_enqueue_webhook_create(self):
# Create an object via the REST API
data = {
'name': 'Test Site',
'slug': 'test-site',
'name': 'Site 1',
'slug': 'site-1',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
}
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.add_site')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Site.objects.count(), 1)
self.assertEqual(Site.objects.first().tags.count(), 2)
# Verify that a job was queued for the object creation webhook
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True))
self.assertEqual(job.kwargs['data']['id'], response.data['id'])
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_bulk_create(self):
# Create multiple objects via the REST API
data = [
{
'name': 'Site 1',
'slug': 'site-1',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
{
'name': 'Site 2',
'slug': 'site-2',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
{
'name': 'Site 3',
'slug': 'site-3',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.add_site')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Site.objects.count(), 3)
self.assertEqual(Site.objects.first().tags.count(), 2)
# Verify that a webhook was queued for each object
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_update(self):
# Update an object via the REST API
site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))
# Update an object via the REST API
data = {
'name': 'Site X',
'comments': 'Updated the site',
'tags': [
{'name': 'Baz'}
]
}
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
self.add_permissions('dcim.change_site')
@ -76,13 +141,72 @@ class WebhookTest(APITestCase):
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True))
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
def test_enqueue_webhook_bulk_update(self):
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
for site in sites:
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))
# Update three objects via the REST API
data = [
{
'id': sites[0].pk,
'name': 'Site X',
'tags': [
{'name': 'Baz'}
]
},
{
'id': sites[1].pk,
'name': 'Site Y',
'tags': [
{'name': 'Baz'}
]
},
{
'id': sites[2].pk,
'name': 'Site Z',
'tags': [
{'name': 'Baz'}
]
},
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.change_site')
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
def test_enqueue_webhook_delete(self):
# Delete an object via the REST API
site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))
# Delete an object via the REST API
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
self.add_permissions('dcim.delete_site')
response = self.client.delete(url, **self.header)
@ -92,9 +216,40 @@ class WebhookTest(APITestCase):
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True))
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
def test_enqueue_webhook_bulk_delete(self):
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
for site in sites:
site.tags.set(*Tag.objects.filter(name__in=['Foo', 'Bar']))
# Delete three objects via the REST API
data = [
{'id': site.pk} for site in sites
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.delete_site')
response = self.client.delete(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
# Verify that a job was queued for the object update webhook
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], sites[i].pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
def test_webhooks_worker(self):
@ -125,13 +280,16 @@ class WebhookTest(APITestCase):
return HttpResponse()
# Enqueue a webhook for processing
webhooks_queue = []
site = Site.objects.create(name='Site 1', slug='site-1')
enqueue_webhooks(
enqueue_object(
webhooks_queue,
instance=site,
user=self.user,
request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE
)
flush_webhooks(webhooks_queue)
# Retrieve the job from queue
job = self.queue.jobs[0]

View File

@ -1,5 +1,6 @@
import hashlib
import hmac
from collections import defaultdict
from django.contrib.contenttypes.models import ContentType
from django.utils import timezone
@ -12,6 +13,26 @@ from .models import Webhook
from .registry import registry
def serialize_for_webhook(instance):
"""
Return a serialized representation of the given instance suitable for use in a webhook.
"""
serializer_class = get_serializer_for_model(instance.__class__)
serializer_context = {
'request': None,
}
serializer = serializer_class(instance, context=serializer_context)
return serializer.data
def get_snapshots(instance, action):
return {
'prechange': getattr(instance, '_prechange_snapshot', None),
'postchange': serialize_object(instance) if action != ObjectChangeActionChoices.ACTION_DELETE else None,
}
def generate_signature(request_body, secret):
"""
Return a cryptographic signature that can be used to verify the authenticity of webhook data.
@ -24,10 +45,10 @@ def generate_signature(request_body, secret):
return hmac_prep.hexdigest()
def enqueue_webhooks(instance, user, request_id, action):
def enqueue_object(queue, instance, user, request_id, action):
"""
Find Webhook(s) assigned to this instance + action and enqueue them
to be processed
Enqueue a serialized representation of a created/updated/deleted object for the processing of
webhooks once the request has completed.
"""
# Determine whether this type of object supports webhooks
app_label = instance._meta.app_label
@ -35,41 +56,55 @@ def enqueue_webhooks(instance, user, request_id, action):
if model_name not in registry['model_features']['webhooks'].get(app_label, []):
return
# Retrieve any applicable Webhooks
content_type = ContentType.objects.get_for_model(instance)
action_flag = {
ObjectChangeActionChoices.ACTION_CREATE: 'type_create',
ObjectChangeActionChoices.ACTION_UPDATE: 'type_update',
ObjectChangeActionChoices.ACTION_DELETE: 'type_delete',
}[action]
webhooks = Webhook.objects.filter(content_types=content_type, enabled=True, **{action_flag: True})
queue.append({
'content_type': ContentType.objects.get_for_model(instance),
'object_id': instance.pk,
'event': action,
'data': serialize_for_webhook(instance),
'snapshots': get_snapshots(instance, action),
'username': user.username,
'request_id': request_id
})
if webhooks.exists():
# Get the Model's API serializer class and serialize the object
serializer_class = get_serializer_for_model(instance.__class__)
serializer_context = {
'request': None,
}
serializer = serializer_class(instance, context=serializer_context)
def flush_webhooks(queue):
"""
Flush a list of object representation to RQ for webhook processing.
"""
rq_queue = get_queue('default')
webhooks_cache = {
'type_create': {},
'type_update': {},
'type_delete': {},
}
# Gather pre- and post-change snapshots
snapshots = {
'prechange': getattr(instance, '_prechange_snapshot', None),
'postchange': serialize_object(instance) if action != ObjectChangeActionChoices.ACTION_DELETE else None,
}
for data in queue:
action_flag = {
ObjectChangeActionChoices.ACTION_CREATE: 'type_create',
ObjectChangeActionChoices.ACTION_UPDATE: 'type_update',
ObjectChangeActionChoices.ACTION_DELETE: 'type_delete',
}[data['event']]
content_type = data['content_type']
# Cache applicable Webhooks
if content_type not in webhooks_cache[action_flag]:
webhooks_cache[action_flag][content_type] = Webhook.objects.filter(
**{action_flag: True},
content_types=content_type,
enabled=True
)
webhooks = webhooks_cache[action_flag][content_type]
# Enqueue the webhooks
webhook_queue = get_queue('default')
for webhook in webhooks:
webhook_queue.enqueue(
rq_queue.enqueue(
"extras.webhooks_worker.process_webhook",
webhook=webhook,
model_name=instance._meta.model_name,
event=action,
data=serializer.data,
snapshots=snapshots,
model_name=content_type.model,
event=data['event'],
data=data['data'],
snapshots=data['snapshots'],
timestamp=str(timezone.now()),
username=user.username,
request_id=request_id
username=data['username'],
request_id=data['request_id']
)

View File

@ -102,10 +102,11 @@ class NestedVLANSerializer(WritableNestedSerializer):
class NestedPrefixSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:prefix-detail')
family = serializers.IntegerField(read_only=True)
_depth = serializers.IntegerField(read_only=True)
class Meta:
model = models.Prefix
fields = ['id', 'url', 'display', 'family', 'prefix']
fields = ['id', 'url', 'display', 'family', 'prefix', '_depth']
#

View File

@ -196,12 +196,14 @@ class PrefixSerializer(PrimaryModelSerializer):
vlan = NestedVLANSerializer(required=False, allow_null=True)
status = ChoiceField(choices=PrefixStatusChoices, required=False)
role = NestedRoleSerializer(required=False, allow_null=True)
children = serializers.IntegerField(read_only=True)
_depth = serializers.IntegerField(read_only=True)
class Meta:
model = Prefix
fields = [
'id', 'url', 'display', 'family', 'prefix', 'site', 'vrf', 'tenant', 'vlan', 'status', 'role', 'is_pool',
'mark_utilized', 'description', 'tags', 'custom_fields', 'created', 'last_updated',
'mark_utilized', 'description', 'tags', 'custom_fields', 'created', 'last_updated', 'children', '_depth',
]
read_only_fields = ['family']

View File

@ -209,6 +209,12 @@ class PrefixFilterSet(PrimaryModelFilterSet, TenancyFilterSet):
method='search_contains',
label='Prefixes which contain this prefix or IP',
)
depth = MultiValueNumberFilter(
field_name='_depth'
)
children = MultiValueNumberFilter(
field_name='_children'
)
mask_length = django_filters.NumberFilter(
field_name='prefix',
lookup_expr='net_mask_length'

View File

View File

@ -0,0 +1,27 @@
from django.core.management.base import BaseCommand
from ipam.models import Prefix, VRF
from ipam.utils import rebuild_prefixes
class Command(BaseCommand):
help = "Rebuild the prefix hierarchy (depth and children counts)"
def handle(self, *model_names, **options):
self.stdout.write(f'Rebuilding {Prefix.objects.count()} prefixes...')
# Reset existing counts
Prefix.objects.update(_depth=0, _children=0)
# Rebuild the global table
global_count = Prefix.objects.filter(vrf__isnull=True).count()
self.stdout.write(f'Global: {global_count} prefixes...')
rebuild_prefixes(None)
# Rebuild each VRF
for vrf in VRF.objects.all():
vrf_count = Prefix.objects.filter(vrf=vrf).count()
self.stdout.write(f'VRF {vrf}: {vrf_count} prefixes...')
rebuild_prefixes(vrf)
self.stdout.write(self.style.SUCCESS('Finished.'))

View File

@ -0,0 +1,21 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ipam', '0046_set_vlangroup_scope_types'),
]
operations = [
migrations.AddField(
model_name='prefix',
name='_children',
field=models.PositiveBigIntegerField(default=0, editable=False),
),
migrations.AddField(
model_name='prefix',
name='_depth',
field=models.PositiveSmallIntegerField(default=0, editable=False),
),
]

View File

@ -0,0 +1,48 @@
import sys
from django.db import migrations
from ipam.utils import rebuild_prefixes
def push_to_stack(stack, prefix):
# Increment child count on parent nodes
for n in stack:
n['children'] += 1
stack.append({
'pk': prefix['pk'],
'prefix': prefix['prefix'],
'children': 0,
})
def populate_prefix_hierarchy(apps, schema_editor):
"""
Populate _depth and _children attrs for all Prefixes.
"""
Prefix = apps.get_model('ipam', 'Prefix')
VRF = apps.get_model('ipam', 'VRF')
total_count = Prefix.objects.count()
if 'test' not in sys.argv:
print(f'\nUpdating {total_count} prefixes...')
# Rebuild the global table
rebuild_prefixes(None)
# Iterate through all VRFs, rebuilding each
for vrf in VRF.objects.all():
rebuild_prefixes(vrf)
class Migration(migrations.Migration):
dependencies = [
('ipam', '0047_prefix_depth_children'),
]
operations = [
migrations.RunPython(
code=populate_prefix_hierarchy,
reverse_code=migrations.RunPython.noop
),
]

View File

@ -4,7 +4,7 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ipam', '0046_set_vlangroup_scope_types'),
('ipam', '0048_prefix_populate_depth_children'),
]
operations = [

View File

@ -297,6 +297,16 @@ class Prefix(PrimaryModel):
blank=True
)
# Cached depth & child counts
_depth = models.PositiveSmallIntegerField(
default=0,
editable=False
)
_children = models.PositiveBigIntegerField(
default=0,
editable=False
)
objects = PrefixQuerySet.as_manager()
csv_headers = [
@ -311,6 +321,13 @@ class Prefix(PrimaryModel):
ordering = (F('vrf').asc(nulls_first=True), 'prefix', 'pk') # (vrf, prefix) may be non-unique
verbose_name_plural = 'prefixes'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Cache the original prefix and VRF so we can check if they have changed on post_save
self._prefix = self.prefix
self._vrf = self.vrf
def __str__(self):
return str(self.prefix)
@ -379,6 +396,14 @@ class Prefix(PrimaryModel):
return self.prefix.version
return None
@property
def depth(self):
return self._depth
@property
def children(self):
return self._children
def _set_prefix_length(self, value):
"""
Expose the IPNetwork object's prefixlen attribute on the parent model so that it can be manipulated directly,
@ -391,6 +416,26 @@ class Prefix(PrimaryModel):
def get_status_class(self):
return PrefixStatusChoices.CSS_CLASSES.get(self.status)
def get_parents(self, include_self=False):
"""
Return all containing Prefixes in the hierarchy.
"""
lookup = 'net_contains_or_equals' if include_self else 'net_contains'
return Prefix.objects.filter(**{
'vrf': self.vrf,
f'prefix__{lookup}': self.prefix
})
def get_children(self, include_self=False):
"""
Return all covered Prefixes in the hierarchy.
"""
lookup = 'net_contained_or_equal' if include_self else 'net_contained'
return Prefix.objects.filter(**{
'vrf': self.vrf,
f'prefix__{lookup}': self.prefix
})
def get_duplicates(self):
return Prefix.objects.filter(vrf=self.vrf, prefix=str(self.prefix)).exclude(pk=self.pk)

View File

@ -1,27 +1,32 @@
from django.contrib.contenttypes.models import ContentType
from django.db.models import Q
from django.db.models.expressions import RawSQL
from utilities.querysets import RestrictedQuerySet
class PrefixQuerySet(RestrictedQuerySet):
def annotate_tree(self):
def annotate_hierarchy(self):
"""
Annotate the number of parent and child prefixes for each Prefix. Raw SQL is needed for these subqueries
because we need to cast NULL VRF values to integers for comparison. (NULL != NULL).
Annotate the depth and number of child prefixes for each Prefix. Cast null VRF values to zero for
comparison. (NULL != NULL).
"""
return self.extra(
select={
'parents': 'SELECT COUNT(U0."prefix") AS "c" '
'FROM "ipam_prefix" U0 '
'WHERE (U0."prefix" >> "ipam_prefix"."prefix" '
'AND COALESCE(U0."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
'children': 'SELECT COUNT(U1."prefix") AS "c" '
'FROM "ipam_prefix" U1 '
'WHERE (U1."prefix" << "ipam_prefix"."prefix" '
'AND COALESCE(U1."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
}
return self.annotate(
hierarchy_depth=RawSQL(
'SELECT COUNT(DISTINCT U0."prefix") AS "c" '
'FROM "ipam_prefix" U0 '
'WHERE (U0."prefix" >> "ipam_prefix"."prefix" '
'AND COALESCE(U0."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
()
),
hierarchy_children=RawSQL(
'SELECT COUNT(U1."prefix") AS "c" '
'FROM "ipam_prefix" U1 '
'WHERE (U1."prefix" << "ipam_prefix"."prefix" '
'AND COALESCE(U1."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
()
)
)

View File

@ -1,9 +1,52 @@
from django.db.models.signals import pre_delete
from django.db.models.signals import post_delete, post_save, pre_delete
from django.dispatch import receiver
from dcim.models import Device
from virtualization.models import VirtualMachine
from .models import IPAddress
from .models import IPAddress, Prefix
def update_parents_children(prefix):
"""
Update depth on prefix & containing prefixes
"""
parents = prefix.get_parents(include_self=True).annotate_hierarchy()
for parent in parents:
parent._children = parent.hierarchy_children
Prefix.objects.bulk_update(parents, ['_children'], batch_size=100)
def update_children_depth(prefix):
"""
Update children count on prefix & contained prefixes
"""
children = prefix.get_children(include_self=True).annotate_hierarchy()
for child in children:
child._depth = child.hierarchy_depth
Prefix.objects.bulk_update(children, ['_depth'], batch_size=100)
@receiver(post_save, sender=Prefix)
def handle_prefix_saved(instance, created, **kwargs):
# Prefix has changed (or new instance has been created)
if created or instance.vrf != instance._vrf or instance.prefix != instance._prefix:
update_parents_children(instance)
update_children_depth(instance)
# If this is not a new prefix, clean up parent/children of previous prefix
if not created:
old_prefix = Prefix(vrf=instance._vrf, prefix=instance._prefix)
update_parents_children(old_prefix)
update_children_depth(old_prefix)
@receiver(post_delete, sender=Prefix)
def handle_prefix_deleted(instance, **kwargs):
update_parents_children(instance)
update_children_depth(instance)
@receiver(pre_delete, sender=IPAddress)

View File

@ -15,7 +15,7 @@ AVAILABLE_LABEL = mark_safe('<span class="badge bg-success">Available</span>')
PREFIX_LINK = """
{% load helpers %}
{% for i in record.parents|as_range %}
{% for i in record.depth|as_range %}
<i class="mdi mdi-circle-small"></i>
{% endfor %}
<a href="{% if record.pk %}{% url 'ipam:prefix' pk=record.pk %}{% else %}{% url 'ipam:prefix_add' %}?prefix={{ record }}{% if object.vrf %}&vrf={{ object.vrf.pk }}{% endif %}{% if object.site %}&site={{ object.site.pk }}{% endif %}{% if object.tenant %}&tenant_group={{ object.tenant.group.pk }}&tenant={{ object.tenant.pk }}{% endif %}{% endif %}">{{ record.prefix }}</a>
@ -277,6 +277,19 @@ class PrefixTable(BaseTable):
template_code=PREFIX_LINK,
attrs={'td': {'class': 'text-nowrap'}}
)
depth = tables.Column(
accessor=Accessor('_depth'),
verbose_name='Depth'
)
children = LinkedCountColumn(
accessor=Accessor('_children'),
viewname='ipam:prefix_list',
url_params={
'vrf_id': 'vrf_id',
'within': 'prefix',
},
verbose_name='Children'
)
status = ChoiceFieldColumn(
default=AVAILABLE_LABEL
)
@ -305,8 +318,8 @@ class PrefixTable(BaseTable):
class Meta(BaseTable.Meta):
model = Prefix
fields = (
'pk', 'prefix', 'status', 'children', 'vrf', 'tenant', 'site', 'vlan', 'role', 'is_pool', 'mark_utilized',
'description',
'pk', 'prefix', 'status', 'depth', 'children', 'vrf', 'tenant', 'site', 'vlan', 'role', 'is_pool',
'mark_utilized', 'description',
)
default_columns = ('pk', 'prefix', 'status', 'vrf', 'tenant', 'site', 'vlan', 'role', 'description')
row_attrs = {

View File

@ -186,7 +186,7 @@ class RoleTest(APIViewTestCases.APIViewTestCase):
class PrefixTest(APIViewTestCases.APIViewTestCase):
model = Prefix
brief_fields = ['display', 'family', 'id', 'prefix', 'url']
brief_fields = ['_depth', 'display', 'family', 'id', 'prefix', 'url']
create_data = [
{
'prefix': '192.168.4.0/24',

View File

@ -400,7 +400,8 @@ class PrefixTestCase(TestCase, ChangeLoggedFilterSetTests):
Prefix(prefix='10.0.0.0/16'),
Prefix(prefix='2001:db8::/32'),
)
Prefix.objects.bulk_create(prefixes)
for prefix in prefixes:
prefix.save()
def test_family(self):
params = {'family': '6'}
@ -437,6 +438,18 @@ class PrefixTestCase(TestCase, ChangeLoggedFilterSetTests):
params = {'contains': '2001:db8:0:1::/64'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_depth(self):
params = {'depth': '0'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 8)
params = {'depth__gt': '0'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_children(self):
params = {'children': '0'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 8)
params = {'children__gt': '0'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_mask_length(self):
params = {'mask_length': '24'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)

View File

@ -1,4 +1,4 @@
import netaddr
from netaddr import IPNetwork, IPSet
from django.core.exceptions import ValidationError
from django.test import TestCase, override_settings
@ -10,27 +10,27 @@ class TestAggregate(TestCase):
def test_get_utilization(self):
rir = RIR.objects.create(name='RIR 1', slug='rir-1')
aggregate = Aggregate(prefix=netaddr.IPNetwork('10.0.0.0/8'), rir=rir)
aggregate = Aggregate(prefix=IPNetwork('10.0.0.0/8'), rir=rir)
aggregate.save()
# 25% utilization
Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/12')),
Prefix(prefix=netaddr.IPNetwork('10.16.0.0/12')),
Prefix(prefix=netaddr.IPNetwork('10.32.0.0/12')),
Prefix(prefix=netaddr.IPNetwork('10.48.0.0/12')),
Prefix(prefix=IPNetwork('10.0.0.0/12')),
Prefix(prefix=IPNetwork('10.16.0.0/12')),
Prefix(prefix=IPNetwork('10.32.0.0/12')),
Prefix(prefix=IPNetwork('10.48.0.0/12')),
))
self.assertEqual(aggregate.get_utilization(), 25)
# 50% utilization
Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.64.0.0/10')),
Prefix(prefix=IPNetwork('10.64.0.0/10')),
))
self.assertEqual(aggregate.get_utilization(), 50)
# 100% utilization
Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.128.0.0/9')),
Prefix(prefix=IPNetwork('10.128.0.0/9')),
))
self.assertEqual(aggregate.get_utilization(), 100)
@ -39,9 +39,9 @@ class TestPrefix(TestCase):
def test_get_duplicates(self):
prefixes = Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24')),
Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24')),
Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24')),
Prefix(prefix=IPNetwork('192.0.2.0/24')),
Prefix(prefix=IPNetwork('192.0.2.0/24')),
Prefix(prefix=IPNetwork('192.0.2.0/24')),
))
duplicate_prefix_pks = [p.pk for p in prefixes[0].get_duplicates()]
@ -54,11 +54,11 @@ class TestPrefix(TestCase):
VRF(name='VRF 3'),
))
prefixes = Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER),
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/24'), vrf=None),
Prefix(prefix=netaddr.IPNetwork('10.0.1.0/24'), vrf=vrfs[0]),
Prefix(prefix=netaddr.IPNetwork('10.0.2.0/24'), vrf=vrfs[1]),
Prefix(prefix=netaddr.IPNetwork('10.0.3.0/24'), vrf=vrfs[2]),
Prefix(prefix=IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER),
Prefix(prefix=IPNetwork('10.0.0.0/24'), vrf=None),
Prefix(prefix=IPNetwork('10.0.1.0/24'), vrf=vrfs[0]),
Prefix(prefix=IPNetwork('10.0.2.0/24'), vrf=vrfs[1]),
Prefix(prefix=IPNetwork('10.0.3.0/24'), vrf=vrfs[2]),
))
child_prefix_pks = {p.pk for p in prefixes[0].get_child_prefixes()}
@ -79,13 +79,13 @@ class TestPrefix(TestCase):
VRF(name='VRF 3'),
))
parent_prefix = Prefix.objects.create(
prefix=netaddr.IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER
prefix=IPNetwork('10.0.0.0/16'), status=PrefixStatusChoices.STATUS_CONTAINER
)
ips = IPAddress.objects.bulk_create((
IPAddress(address=netaddr.IPNetwork('10.0.0.1/24'), vrf=None),
IPAddress(address=netaddr.IPNetwork('10.0.1.1/24'), vrf=vrfs[0]),
IPAddress(address=netaddr.IPNetwork('10.0.2.1/24'), vrf=vrfs[1]),
IPAddress(address=netaddr.IPNetwork('10.0.3.1/24'), vrf=vrfs[2]),
IPAddress(address=IPNetwork('10.0.0.1/24'), vrf=None),
IPAddress(address=IPNetwork('10.0.1.1/24'), vrf=vrfs[0]),
IPAddress(address=IPNetwork('10.0.2.1/24'), vrf=vrfs[1]),
IPAddress(address=IPNetwork('10.0.3.1/24'), vrf=vrfs[2]),
))
child_ip_pks = {p.pk for p in parent_prefix.get_child_ips()}
@ -102,16 +102,16 @@ class TestPrefix(TestCase):
def test_get_available_prefixes(self):
prefixes = Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/16')), # Parent prefix
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/20')),
Prefix(prefix=netaddr.IPNetwork('10.0.32.0/20')),
Prefix(prefix=netaddr.IPNetwork('10.0.128.0/18')),
Prefix(prefix=IPNetwork('10.0.0.0/16')), # Parent prefix
Prefix(prefix=IPNetwork('10.0.0.0/20')),
Prefix(prefix=IPNetwork('10.0.32.0/20')),
Prefix(prefix=IPNetwork('10.0.128.0/18')),
))
missing_prefixes = netaddr.IPSet([
netaddr.IPNetwork('10.0.16.0/20'),
netaddr.IPNetwork('10.0.48.0/20'),
netaddr.IPNetwork('10.0.64.0/18'),
netaddr.IPNetwork('10.0.192.0/18'),
missing_prefixes = IPSet([
IPNetwork('10.0.16.0/20'),
IPNetwork('10.0.48.0/20'),
IPNetwork('10.0.64.0/18'),
IPNetwork('10.0.192.0/18'),
])
available_prefixes = prefixes[0].get_available_prefixes()
@ -119,17 +119,17 @@ class TestPrefix(TestCase):
def test_get_available_ips(self):
parent_prefix = Prefix.objects.create(prefix=netaddr.IPNetwork('10.0.0.0/28'))
parent_prefix = Prefix.objects.create(prefix=IPNetwork('10.0.0.0/28'))
IPAddress.objects.bulk_create((
IPAddress(address=netaddr.IPNetwork('10.0.0.1/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.3/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.5/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.7/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.9/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.11/26')),
IPAddress(address=netaddr.IPNetwork('10.0.0.13/26')),
IPAddress(address=IPNetwork('10.0.0.1/26')),
IPAddress(address=IPNetwork('10.0.0.3/26')),
IPAddress(address=IPNetwork('10.0.0.5/26')),
IPAddress(address=IPNetwork('10.0.0.7/26')),
IPAddress(address=IPNetwork('10.0.0.9/26')),
IPAddress(address=IPNetwork('10.0.0.11/26')),
IPAddress(address=IPNetwork('10.0.0.13/26')),
))
missing_ips = netaddr.IPSet([
missing_ips = IPSet([
'10.0.0.2/32',
'10.0.0.4/32',
'10.0.0.6/32',
@ -145,39 +145,39 @@ class TestPrefix(TestCase):
def test_get_first_available_prefix(self):
prefixes = Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/16')), # Parent prefix
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/24')),
Prefix(prefix=netaddr.IPNetwork('10.0.1.0/24')),
Prefix(prefix=netaddr.IPNetwork('10.0.2.0/24')),
Prefix(prefix=IPNetwork('10.0.0.0/16')), # Parent prefix
Prefix(prefix=IPNetwork('10.0.0.0/24')),
Prefix(prefix=IPNetwork('10.0.1.0/24')),
Prefix(prefix=IPNetwork('10.0.2.0/24')),
))
self.assertEqual(prefixes[0].get_first_available_prefix(), netaddr.IPNetwork('10.0.3.0/24'))
self.assertEqual(prefixes[0].get_first_available_prefix(), IPNetwork('10.0.3.0/24'))
Prefix.objects.create(prefix=netaddr.IPNetwork('10.0.3.0/24'))
self.assertEqual(prefixes[0].get_first_available_prefix(), netaddr.IPNetwork('10.0.4.0/22'))
Prefix.objects.create(prefix=IPNetwork('10.0.3.0/24'))
self.assertEqual(prefixes[0].get_first_available_prefix(), IPNetwork('10.0.4.0/22'))
def test_get_first_available_ip(self):
parent_prefix = Prefix.objects.create(prefix=netaddr.IPNetwork('10.0.0.0/24'))
parent_prefix = Prefix.objects.create(prefix=IPNetwork('10.0.0.0/24'))
IPAddress.objects.bulk_create((
IPAddress(address=netaddr.IPNetwork('10.0.0.1/24')),
IPAddress(address=netaddr.IPNetwork('10.0.0.2/24')),
IPAddress(address=netaddr.IPNetwork('10.0.0.3/24')),
IPAddress(address=IPNetwork('10.0.0.1/24')),
IPAddress(address=IPNetwork('10.0.0.2/24')),
IPAddress(address=IPNetwork('10.0.0.3/24')),
))
self.assertEqual(parent_prefix.get_first_available_ip(), '10.0.0.4/24')
IPAddress.objects.create(address=netaddr.IPNetwork('10.0.0.4/24'))
IPAddress.objects.create(address=IPNetwork('10.0.0.4/24'))
self.assertEqual(parent_prefix.get_first_available_ip(), '10.0.0.5/24')
def test_get_utilization(self):
# Container Prefix
prefix = Prefix.objects.create(
prefix=netaddr.IPNetwork('10.0.0.0/24'),
prefix=IPNetwork('10.0.0.0/24'),
status=PrefixStatusChoices.STATUS_CONTAINER
)
Prefix.objects.bulk_create((
Prefix(prefix=netaddr.IPNetwork('10.0.0.0/26')),
Prefix(prefix=netaddr.IPNetwork('10.0.0.128/26')),
Prefix(prefix=IPNetwork('10.0.0.0/26')),
Prefix(prefix=IPNetwork('10.0.0.128/26')),
))
self.assertEqual(prefix.get_utilization(), 50)
@ -186,7 +186,7 @@ class TestPrefix(TestCase):
prefix.save()
IPAddress.objects.bulk_create(
# Create 32 IPAddresses within the Prefix
[IPAddress(address=netaddr.IPNetwork('10.0.0.{}/24'.format(i))) for i in range(1, 33)]
[IPAddress(address=IPNetwork('10.0.0.{}/24'.format(i))) for i in range(1, 33)]
)
self.assertEqual(prefix.get_utilization(), 12) # ~= 12%
@ -196,36 +196,234 @@ class TestPrefix(TestCase):
@override_settings(ENFORCE_GLOBAL_UNIQUE=False)
def test_duplicate_global(self):
Prefix.objects.create(prefix=netaddr.IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24'))
Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(prefix=IPNetwork('192.0.2.0/24'))
self.assertIsNone(duplicate_prefix.clean())
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_global_unique(self):
Prefix.objects.create(prefix=netaddr.IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(prefix=netaddr.IPNetwork('192.0.2.0/24'))
Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(prefix=IPNetwork('192.0.2.0/24'))
self.assertRaises(ValidationError, duplicate_prefix.clean)
def test_duplicate_vrf(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=False)
Prefix.objects.create(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
Prefix.objects.create(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
self.assertIsNone(duplicate_prefix.clean())
def test_duplicate_vrf_unique(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=True)
Prefix.objects.create(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(vrf=vrf, prefix=netaddr.IPNetwork('192.0.2.0/24'))
Prefix.objects.create(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
duplicate_prefix = Prefix(vrf=vrf, prefix=IPNetwork('192.0.2.0/24'))
self.assertRaises(ValidationError, duplicate_prefix.clean)
class TestPrefixHierarchy(TestCase):
"""
Test the automatic updating of depth and child count in response to changes made within
the prefix hierarchy.
"""
@classmethod
def setUpTestData(cls):
prefixes = (
# IPv4
Prefix(prefix='10.0.0.0/8', _depth=0, _children=2),
Prefix(prefix='10.0.0.0/16', _depth=1, _children=1),
Prefix(prefix='10.0.0.0/24', _depth=2, _children=0),
# IPv6
Prefix(prefix='2001:db8::/32', _depth=0, _children=2),
Prefix(prefix='2001:db8::/40', _depth=1, _children=1),
Prefix(prefix='2001:db8::/48', _depth=2, _children=0),
)
Prefix.objects.bulk_create(prefixes)
def test_create_prefix4(self):
# Create 10.0.0.0/12
Prefix(prefix='10.0.0.0/12').save()
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/12'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 2)
self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('10.0.0.0/24'))
self.assertEqual(prefixes[3]._depth, 3)
self.assertEqual(prefixes[3]._children, 0)
def test_create_prefix6(self):
# Create 2001:db8::/36
Prefix(prefix='2001:db8::/36').save()
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/36'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 2)
self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[3]._depth, 3)
self.assertEqual(prefixes[3]._children, 0)
def test_update_prefix4(self):
# Change 10.0.0.0/24 to 10.0.0.0/12
p = Prefix.objects.get(prefix='10.0.0.0/24')
p.prefix = '10.0.0.0/12'
p.save()
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 2)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/12'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 0)
def test_update_prefix6(self):
# Change 2001:db8::/48 to 2001:db8::/36
p = Prefix.objects.get(prefix='2001:db8::/48')
p.prefix = '2001:db8::/36'
p.save()
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 2)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/36'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[2]._depth, 2)
self.assertEqual(prefixes[2]._children, 0)
def test_update_prefix_vrf4(self):
vrf = VRF(name='VRF A')
vrf.save()
# Move 10.0.0.0/16 to a VRF
p = Prefix.objects.get(prefix='10.0.0.0/16')
p.vrf = vrf
p.save()
prefixes = Prefix.objects.filter(vrf__isnull=True, prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/24'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
prefixes = Prefix.objects.filter(vrf=vrf)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 0)
def test_update_prefix_vrf6(self):
vrf = VRF(name='VRF A')
vrf.save()
# Move 2001:db8::/40 to a VRF
p = Prefix.objects.get(prefix='2001:db8::/40')
p.vrf = vrf
p.save()
prefixes = Prefix.objects.filter(vrf__isnull=True, prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
prefixes = Prefix.objects.filter(vrf=vrf)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 0)
def test_delete_prefix4(self):
# Delete 10.0.0.0/16
Prefix.objects.filter(prefix='10.0.0.0/16').delete()
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/24'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
def test_delete_prefix6(self):
# Delete 2001:db8::/40
Prefix.objects.filter(prefix='2001:db8::/40').delete()
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 1)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 0)
def test_duplicate_prefix4(self):
# Duplicate 10.0.0.0/16
Prefix(prefix='10.0.0.0/16').save()
prefixes = Prefix.objects.filter(prefix__family=4)
self.assertEqual(prefixes[0].prefix, IPNetwork('10.0.0.0/8'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('10.0.0.0/16'))
self.assertEqual(prefixes[2]._depth, 1)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('10.0.0.0/24'))
self.assertEqual(prefixes[3]._depth, 2)
self.assertEqual(prefixes[3]._children, 0)
def test_duplicate_prefix6(self):
# Duplicate 2001:db8::/40
Prefix(prefix='2001:db8::/40').save()
prefixes = Prefix.objects.filter(prefix__family=6)
self.assertEqual(prefixes[0].prefix, IPNetwork('2001:db8::/32'))
self.assertEqual(prefixes[0]._depth, 0)
self.assertEqual(prefixes[0]._children, 3)
self.assertEqual(prefixes[1].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[1]._depth, 1)
self.assertEqual(prefixes[1]._children, 1)
self.assertEqual(prefixes[2].prefix, IPNetwork('2001:db8::/40'))
self.assertEqual(prefixes[2]._depth, 1)
self.assertEqual(prefixes[2]._children, 1)
self.assertEqual(prefixes[3].prefix, IPNetwork('2001:db8::/48'))
self.assertEqual(prefixes[3]._depth, 2)
self.assertEqual(prefixes[3]._children, 0)
class TestIPAddress(TestCase):
def test_get_duplicates(self):
ips = IPAddress.objects.bulk_create((
IPAddress(address=netaddr.IPNetwork('192.0.2.1/24')),
IPAddress(address=netaddr.IPNetwork('192.0.2.1/24')),
IPAddress(address=netaddr.IPNetwork('192.0.2.1/24')),
IPAddress(address=IPNetwork('192.0.2.1/24')),
IPAddress(address=IPNetwork('192.0.2.1/24')),
IPAddress(address=IPNetwork('192.0.2.1/24')),
))
duplicate_ip_pks = [p.pk for p in ips[0].get_duplicates()]
@ -237,44 +435,44 @@ class TestIPAddress(TestCase):
@override_settings(ENFORCE_GLOBAL_UNIQUE=False)
def test_duplicate_global(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'))
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'))
self.assertIsNone(duplicate_ip.clean())
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_global_unique(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'))
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'))
self.assertRaises(ValidationError, duplicate_ip.clean)
def test_duplicate_vrf(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=False)
IPAddress.objects.create(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
self.assertIsNone(duplicate_ip.clean())
def test_duplicate_vrf_unique(self):
vrf = VRF.objects.create(name='Test', rd='1:1', enforce_unique=True)
IPAddress.objects.create(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=netaddr.IPNetwork('192.0.2.1/24'))
IPAddress.objects.create(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(vrf=vrf, address=IPNetwork('192.0.2.1/24'))
self.assertRaises(ValidationError, duplicate_ip.clean)
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_nonunique_nonrole_role(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'))
duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
self.assertRaises(ValidationError, duplicate_ip.clean)
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_nonunique_role_nonrole(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
duplicate_ip = IPAddress(address=netaddr.IPNetwork('192.0.2.1/24'))
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
duplicate_ip = IPAddress(address=IPNetwork('192.0.2.1/24'))
self.assertRaises(ValidationError, duplicate_ip.clean)
@override_settings(ENFORCE_GLOBAL_UNIQUE=True)
def test_duplicate_nonunique_role(self):
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
IPAddress.objects.create(address=netaddr.IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), role=IPAddressRoleChoices.ROLE_VIP)
class TestVLANGroup(TestCase):

View File

@ -91,3 +91,63 @@ def add_available_vlans(vlan_group, vlans):
vlans.sort(key=lambda v: v.vid if type(v) == VLAN else v['vid'])
return vlans
def rebuild_prefixes(vrf):
"""
Rebuild the prefix hierarchy for all prefixes in the specified VRF (or global table).
"""
def contains(parent, child):
return child in parent and child != parent
def push_to_stack(prefix):
# Increment child count on parent nodes
for n in stack:
n['children'] += 1
stack.append({
'pk': [prefix['pk']],
'prefix': prefix['prefix'],
'children': 0,
})
stack = []
update_queue = []
prefixes = Prefix.objects.filter(vrf=vrf).values('pk', 'prefix')
# Iterate through all Prefixes in the VRF, growing and shrinking the stack as we go
for i, p in enumerate(prefixes):
# Grow the stack if this is a child of the most recent prefix
if not stack or contains(stack[-1]['prefix'], p['prefix']):
push_to_stack(p)
# Handle duplicate prefixes
elif stack[-1]['prefix'] == p['prefix']:
stack[-1]['pk'].append(p['pk'])
# If this is a sibling or parent of the most recent prefix, pop nodes from the
# stack until we reach a parent prefix (or the root)
else:
while stack and not contains(stack[-1]['prefix'], p['prefix']):
node = stack.pop()
for pk in node['pk']:
update_queue.append(
Prefix(pk=pk, _depth=len(stack), _children=node['children'])
)
push_to_stack(p)
# Flush the update queue once it reaches 100 Prefixes
if len(update_queue) >= 100:
Prefix.objects.bulk_update(update_queue, ['_depth', '_children'])
update_queue = []
# Clear out any prefixes remaining in the stack
while stack:
node = stack.pop()
for pk in node['pk']:
update_queue.append(
Prefix(pk=pk, _depth=len(stack), _children=node['children'])
)
# Final flush of any remaining Prefixes
Prefix.objects.bulk_update(update_queue, ['_depth', '_children'])

View File

@ -238,7 +238,7 @@ class AggregateView(generic.ObjectView):
'site', 'role'
).order_by(
'prefix'
).annotate_tree()
)
# Add available prefixes to the table if requested
if request.GET.get('show_available', 'true') == 'true':
@ -352,7 +352,7 @@ class RoleBulkDeleteView(generic.BulkDeleteView):
#
class PrefixListView(generic.ObjectListView):
queryset = Prefix.objects.annotate_tree()
queryset = Prefix.objects.all()
filterset = filtersets.PrefixFilterSet
filterset_form = forms.PrefixFilterForm
table = tables.PrefixDetailTable
@ -377,7 +377,7 @@ class PrefixView(generic.ObjectView):
prefix__net_contains=str(instance.prefix)
).prefetch_related(
'site', 'role'
).annotate_tree()
)
parent_prefix_table = tables.PrefixTable(list(parent_prefixes), orderable=False)
parent_prefix_table.exclude = ('vrf',)
@ -407,7 +407,7 @@ class PrefixPrefixesView(generic.ObjectView):
# Child prefixes table
child_prefixes = instance.get_child_prefixes().restrict(request.user, 'view').prefetch_related(
'site', 'vlan', 'role',
).annotate_tree()
)
# Add available prefixes to the table if requested
if child_prefixes and request.GET.get('show_available', 'true') == 'true':
@ -551,6 +551,7 @@ class IPAddressView(generic.ObjectView):
vrf=instance.vrf, address__net_contained_or_equal=str(instance.address)
)
related_ips_table = tables.IPAddressTable(related_ips, orderable=False)
paginate_table(related_ips_table, request)
return {
'parent_prefixes_table': parent_prefixes_table,

View File

@ -30,6 +30,10 @@
<input class="form-control" value="{{ termination_a.device.site }}" disabled />
<label>Site</label>
</div>
<div class="form-floating mb-3">
<input class="form-control" value="{{ termination_a.device.location|default:"None" }}" disabled />
<label>Location</label>
</div>
<div class="form-floating mb-3">
<input class="form-control" value="{{ termination_a.device.rack|default:"None" }}" disabled />
<label>Rack</label>
@ -95,8 +99,8 @@
{% if 'termination_b_site' in form.fields %}
{% render_field form.termination_b_site %}
{% endif %}
{% if 'termination_b_rackgroup' in form.fields %}
{% render_field form.termination_b_rackgroup %}
{% if 'termination_b_location' in form.fields %}
{% render_field form.termination_b_location %}
{% endif %}
{% if 'termination_b_rack' in form.fields %}
{% render_field form.termination_b_rack %}

View File

@ -2,6 +2,23 @@
{% load helpers %}
{% block extra_controls %}
<div class="dropdown m-1">
<button class="btn btn-sm btn-outline-secondary dropdown-toggle" type="button" id="max_depth" data-bs-toggle="dropdown" aria-haspopup="true" aria-expanded="true">
Max Depth{% if "depth__lte" in request.GET %}: {{ request.GET.depth__lte }}{% endif %}
</button>
<ul class="dropdown-menu" aria-labelledby="max_depth">
{% if request.GET.depth__lte %}
<li>
<a class="dropdown-item" href="{% url 'ipam:prefix_list' %}{% querystring request depth__lte=None page=1 %}">Clear</a>
</li>
{% endif %}
{% for i in 16|as_range %}
<li><a class="dropdown-item" href="{% url 'ipam:prefix_list' %}{% querystring request depth__lte=i page=1 %}">
{{ i }} {% if request.GET.depth__lte == i %}<i class="mdi mdi-check-bold"></i>{% endif %}
</a></li>
{% endfor %}
</ul>
</div>
<div class="dropdown m-1">
<button class="btn btn-sm btn-outline-secondary dropdown-toggle" type="button" id="max_length" data-bs-toggle="dropdown" aria-haspopup="true" aria-expanded="true">
Max Length{% if "mask_length__lte" in request.GET %}: {{ request.GET.mask_length__lte }}{% endif %}

View File

@ -7,7 +7,7 @@ from django.core.exceptions import FieldError, ValidationError
from utilities.forms.fields import ContentTypeMultipleChoiceField
from .constants import *
from .models import AdminGroup, AdminUser, ObjectPermission, Token, UserConfig
from .models import ObjectPermission, Token, UserConfig
#
@ -39,11 +39,11 @@ class ObjectPermissionInline(admin.TabularInline):
class GroupObjectPermissionInline(ObjectPermissionInline):
model = AdminGroup.object_permissions.through
model = Group.object_permissions.through
class UserObjectPermissionInline(ObjectPermissionInline):
model = AdminUser.object_permissions.through
model = User.object_permissions.through
class UserConfigInline(admin.TabularInline):
@ -62,7 +62,7 @@ admin.site.unregister(Group)
admin.site.unregister(User)
@admin.register(AdminGroup)
@admin.register(Group)
class GroupAdmin(admin.ModelAdmin):
fields = ('name',)
list_display = ('name', 'user_count')
@ -75,7 +75,7 @@ class GroupAdmin(admin.ModelAdmin):
return obj.user_set.count()
@admin.register(AdminUser)
@admin.register(User)
class UserAdmin(UserAdmin_):
list_display = [
'username', 'email', 'first_name', 'last_name', 'is_superuser', 'is_staff', 'is_active'

View File

@ -17,8 +17,6 @@ from .constants import *
__all__ = (
'AdminGroup',
'AdminUser',
'ObjectPermission',
'Token',
'UserConfig',
@ -163,7 +161,6 @@ class UserConfig(models.Model):
@receiver(post_save, sender=User)
@receiver(post_save, sender=AdminUser)
def create_userconfig(instance, created, **kwargs):
"""
Automatically create a new UserConfig when a new User is created.

View File

@ -6,10 +6,10 @@ from io import StringIO
import django_filters
from django import forms
from django.conf import settings
from django.forms.fields import JSONField as _JSONField, InvalidJSONInput
from django.core.exceptions import MultipleObjectsReturned, ObjectDoesNotExist
from django.db.models import Count
from django.forms import BoundField
from django.forms.fields import JSONField as _JSONField, InvalidJSONInput
from django.urls import reverse
from utilities.choices import unpack_grouped_choices

View File

@ -1,4 +1,5 @@
import django_tables2 as tables
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
@ -293,7 +294,10 @@ class LinkedCountColumn(tables.Column):
if value:
url = reverse(self.viewname, kwargs=self.view_kwargs)
if self.url_params:
url += '?' + '&'.join([f'{k}={getattr(record, v)}' for k, v in self.url_params.items()])
url += '?' + '&'.join([
f'{k}={getattr(record, v) or settings.FILTERS_NULL_CHOICE_VALUE}'
for k, v in self.url_params.items()
])
return mark_safe(f'<a href="{url}">{value}</a>')
return value

View File

@ -105,7 +105,7 @@ def serialize_object(obj, extra=None):
# Include any tags. Check for tags cached on the instance; fall back to using the manager.
if is_taggable(obj):
tags = getattr(obj, '_tags', obj.tags.all())
tags = getattr(obj, '_tags', None) or obj.tags.all()
data['tags'] = [tag.name for tag in tags]
# Append any extra data

View File

@ -15,7 +15,7 @@ else
fi
# Create a new virtual environment
COMMAND="/usr/bin/python3 -m venv ${VIRTUALENV}"
COMMAND="python3 -m venv ${VIRTUALENV}"
echo "Creating a new virtual environment at ${VIRTUALENV}..."
eval $COMMAND || {
echo "--------------------------------------------------------------------"