mirror of
				https://github.com/netbox-community/netbox.git
				synced 2024-05-10 07:54:54 +00:00 
			
		
		
		
	Update post-change snapshot for M2M changes
This commit is contained in:
		@@ -12,7 +12,7 @@ from prometheus_client import Counter
 | 
			
		||||
 | 
			
		||||
from .choices import ObjectChangeActionChoices
 | 
			
		||||
from .models import CustomField, ObjectChange
 | 
			
		||||
from .webhooks import enqueue_object, serialize_for_webhook
 | 
			
		||||
from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
#
 | 
			
		||||
@@ -23,6 +23,13 @@ def _handle_changed_object(request, webhook_queue, sender, instance, **kwargs):
 | 
			
		||||
    """
 | 
			
		||||
    Fires when an object is created or updated.
 | 
			
		||||
    """
 | 
			
		||||
    def is_same_object(instance, webhook_data):
 | 
			
		||||
        return (
 | 
			
		||||
            ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
 | 
			
		||||
            instance.pk == webhook_data['object_id'] and
 | 
			
		||||
            request.id == webhook_data['request_id']
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    if not hasattr(instance, 'to_objectchange'):
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
@@ -57,11 +64,10 @@ def _handle_changed_object(request, webhook_queue, sender, instance, **kwargs):
 | 
			
		||||
            objectchange.save()
 | 
			
		||||
 | 
			
		||||
    # If this is an M2M change, update the previously queued webhook (from post_save)
 | 
			
		||||
    if m2m_changed and webhook_queue:
 | 
			
		||||
        # TODO: Need more validation here
 | 
			
		||||
        # TODO: Need to account for snapshot changes
 | 
			
		||||
    if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]):
 | 
			
		||||
        instance.refresh_from_db()  # Ensure that we're working with fresh M2M assignments
 | 
			
		||||
        webhook_queue[-1]['data'] = serialize_for_webhook(instance)
 | 
			
		||||
        webhook_queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
 | 
			
		||||
    else:
 | 
			
		||||
        enqueue_object(webhook_queue, instance, request.user, request.id, action)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -67,10 +67,12 @@ class WebhookTest(APITestCase):
 | 
			
		||||
        self.assertEqual(self.queue.count, 1)
 | 
			
		||||
        job = self.queue.jobs[0]
 | 
			
		||||
        self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True))
 | 
			
		||||
        self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
 | 
			
		||||
        self.assertEqual(job.kwargs['model_name'], 'site')
 | 
			
		||||
        self.assertEqual(job.kwargs['data']['id'], response.data['id'])
 | 
			
		||||
        self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
 | 
			
		||||
        self.assertEqual(job.kwargs['model_name'], 'site')
 | 
			
		||||
        self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
 | 
			
		||||
        self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site 1')
 | 
			
		||||
        self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
 | 
			
		||||
 | 
			
		||||
    def test_enqueue_webhook_update(self):
 | 
			
		||||
        site = Site.objects.create(name='Site 1', slug='site-1')
 | 
			
		||||
@@ -78,6 +80,7 @@ class WebhookTest(APITestCase):
 | 
			
		||||
 | 
			
		||||
        # Update an object via the REST API
 | 
			
		||||
        data = {
 | 
			
		||||
            'name': 'Site X',
 | 
			
		||||
            'comments': 'Updated the site',
 | 
			
		||||
            'tags': [
 | 
			
		||||
                {'name': 'Baz'}
 | 
			
		||||
@@ -92,10 +95,14 @@ class WebhookTest(APITestCase):
 | 
			
		||||
        self.assertEqual(self.queue.count, 1)
 | 
			
		||||
        job = self.queue.jobs[0]
 | 
			
		||||
        self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True))
 | 
			
		||||
        self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
 | 
			
		||||
        self.assertEqual(job.kwargs['model_name'], 'site')
 | 
			
		||||
        self.assertEqual(job.kwargs['data']['id'], site.pk)
 | 
			
		||||
        self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
 | 
			
		||||
        self.assertEqual(job.kwargs['model_name'], 'site')
 | 
			
		||||
        self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
 | 
			
		||||
        self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
 | 
			
		||||
        self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
 | 
			
		||||
        self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X')
 | 
			
		||||
        self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
 | 
			
		||||
 | 
			
		||||
    def test_enqueue_webhook_delete(self):
 | 
			
		||||
        site = Site.objects.create(name='Site 1', slug='site-1')
 | 
			
		||||
@@ -111,9 +118,11 @@ class WebhookTest(APITestCase):
 | 
			
		||||
        self.assertEqual(self.queue.count, 1)
 | 
			
		||||
        job = self.queue.jobs[0]
 | 
			
		||||
        self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True))
 | 
			
		||||
        self.assertEqual(job.kwargs['data']['id'], site.pk)
 | 
			
		||||
        self.assertEqual(job.kwargs['model_name'], 'site')
 | 
			
		||||
        self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
 | 
			
		||||
        self.assertEqual(job.kwargs['model_name'], 'site')
 | 
			
		||||
        self.assertEqual(job.kwargs['data']['id'], site.pk)
 | 
			
		||||
        self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
 | 
			
		||||
        self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
 | 
			
		||||
 | 
			
		||||
    # TODO: Replace webhook worker test
 | 
			
		||||
    # def test_webhooks_worker(self):
 | 
			
		||||
 
 | 
			
		||||
@@ -25,6 +25,13 @@ def serialize_for_webhook(instance):
 | 
			
		||||
    return serializer.data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_snapshots(instance, action):
 | 
			
		||||
    return {
 | 
			
		||||
        'prechange': getattr(instance, '_prechange_snapshot', None),
 | 
			
		||||
        'postchange': serialize_object(instance) if action != ObjectChangeActionChoices.ACTION_DELETE else None,
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def generate_signature(request_body, secret):
 | 
			
		||||
    """
 | 
			
		||||
    Return a cryptographic signature that can be used to verify the authenticity of webhook data.
 | 
			
		||||
@@ -48,18 +55,12 @@ def enqueue_object(queue, instance, user, request_id, action):
 | 
			
		||||
    if model_name not in registry['model_features']['webhooks'].get(app_label, []):
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
    # Gather pre- and post-change snapshots
 | 
			
		||||
    snapshots = {
 | 
			
		||||
        'prechange': getattr(instance, '_prechange_snapshot', None),
 | 
			
		||||
        'postchange': serialize_object(instance) if action != ObjectChangeActionChoices.ACTION_DELETE else None,
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    queue.append({
 | 
			
		||||
        'content_type': ContentType.objects.get_for_model(instance),
 | 
			
		||||
        'object_id': instance.pk,
 | 
			
		||||
        'event': action,
 | 
			
		||||
        'data': serialize_for_webhook(instance),
 | 
			
		||||
        'snapshots': snapshots,
 | 
			
		||||
        'snapshots': get_snapshots(instance, action),
 | 
			
		||||
        'username': user.username,
 | 
			
		||||
        'request_id': request_id
 | 
			
		||||
    })
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user