1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Merge branch 'develop' into feature

This commit is contained in:
jeremystretch
2022-11-16 11:42:32 -05:00
28 changed files with 307 additions and 206 deletions

View File

@@ -1,10 +1,6 @@
from contextlib import contextmanager
from django.db.models.signals import m2m_changed, pre_delete, post_save
from extras.signals import clear_webhooks, clear_webhook_queue, handle_changed_object, handle_deleted_object
from netbox import thread_locals
from netbox.request_context import set_request
from netbox.context import current_request, webhooks_queue
from .webhooks import flush_webhooks
@@ -16,27 +12,14 @@ def change_logging(request):
:param request: WSGIRequest object with a unique `id` set
"""
set_request(request)
thread_locals.webhook_queue = []
# Connect our receivers to the post_save and post_delete signals.
post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object')
m2m_changed.connect(handle_changed_object, dispatch_uid='handle_changed_object')
pre_delete.connect(handle_deleted_object, dispatch_uid='handle_deleted_object')
clear_webhooks.connect(clear_webhook_queue, dispatch_uid='clear_webhook_queue')
current_request.set(request)
webhooks_queue.set([])
yield
# Disconnect change logging signals. This is necessary to avoid recording any errant
# changes during test cleanup.
post_save.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
m2m_changed.disconnect(handle_changed_object, dispatch_uid='handle_changed_object')
pre_delete.disconnect(handle_deleted_object, dispatch_uid='handle_deleted_object')
clear_webhooks.disconnect(clear_webhook_queue, dispatch_uid='clear_webhook_queue')
# Flush queued webhooks to RQ
flush_webhooks(thread_locals.webhook_queue)
del thread_locals.webhook_queue
flush_webhooks(webhooks_queue.get())
# Clear the request from thread-local storage
set_request(None)
# Clear context vars
current_request.set(None)
webhooks_queue.set([])

View File

@@ -7,14 +7,14 @@ from django.dispatch import receiver, Signal
from django_prometheus.models import model_deletes, model_inserts, model_updates
from extras.validators import CustomValidator
from netbox import thread_locals
from netbox.config import get_config
from netbox.request_context import get_request
from netbox.context import current_request, webhooks_queue
from netbox.signals import post_clean
from .choices import ObjectChangeActionChoices
from .models import ConfigRevision, CustomField, ObjectChange
from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook
#
# Change logging/webhooks
#
@@ -23,22 +23,32 @@ from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook
clear_webhooks = Signal()
def is_same_object(instance, webhook_data, request_id):
"""
Compare the given instance to the most recent queued webhook object, returning True
if they match. This check is used to avoid creating duplicate webhook entries.
"""
return (
ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
instance.pk == webhook_data['object_id'] and
request_id == webhook_data['request_id']
)
@receiver((post_save, m2m_changed))
def handle_changed_object(sender, instance, **kwargs):
"""
Fires when an object is created or updated.
"""
m2m_changed = False
if not hasattr(instance, 'to_objectchange'):
return
request = get_request()
m2m_changed = False
def is_same_object(instance, webhook_data):
return (
ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
instance.pk == webhook_data['object_id'] and
request.id == webhook_data['request_id']
)
# Get the current request, or bail if not set
request = current_request.get()
if request is None:
return
# Determine the type of change being made
if kwargs.get('created'):
@@ -69,13 +79,14 @@ def handle_changed_object(sender, instance, **kwargs):
objectchange.save()
# If this is an M2M change, update the previously queued webhook (from post_save)
webhook_queue = thread_locals.webhook_queue
if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]):
queue = webhooks_queue.get()
if m2m_changed and queue and is_same_object(instance, queue[-1], request.id):
instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
webhook_queue[-1]['data'] = serialize_for_webhook(instance)
webhook_queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
queue[-1]['data'] = serialize_for_webhook(instance)
queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
else:
enqueue_object(webhook_queue, instance, request.user, request.id, action)
enqueue_object(queue, instance, request.user, request.id, action)
webhooks_queue.set(queue)
# Increment metric counters
if action == ObjectChangeActionChoices.ACTION_CREATE:
@@ -84,6 +95,7 @@ def handle_changed_object(sender, instance, **kwargs):
model_updates.labels(instance._meta.model_name).inc()
@receiver(pre_delete)
def handle_deleted_object(sender, instance, **kwargs):
"""
Fires when an object is deleted.
@@ -91,7 +103,10 @@ def handle_deleted_object(sender, instance, **kwargs):
if not hasattr(instance, 'to_objectchange'):
return
request = get_request()
# Get the current request, or bail if not set
request = current_request.get()
if request is None:
return
# Record an ObjectChange if applicable
if hasattr(instance, 'to_objectchange'):
@@ -101,22 +116,22 @@ def handle_deleted_object(sender, instance, **kwargs):
objectchange.save()
# Enqueue webhooks
webhook_queue = thread_locals.webhook_queue
enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
queue = webhooks_queue.get()
enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
webhooks_queue.set(queue)
# Increment metric counters
model_deletes.labels(instance._meta.model_name).inc()
@receiver(clear_webhooks)
def clear_webhook_queue(sender, **kwargs):
"""
Delete any queued webhooks (e.g. because of an aborted bulk transaction)
"""
logger = logging.getLogger('webhooks')
webhook_queue = thread_locals.webhook_queue
logger.info(f"Clearing {len(webhook_queue)} queued webhooks ({sender})")
webhook_queue.clear()
logger.info(f"Clearing {len(webhooks_queue.get())} queued webhooks ({sender})")
webhooks_queue.set([])
#