mirror of
https://github.com/netbox-community/netbox.git
synced 2024-05-10 07:54:54 +00:00
Restore webhooks worker test
This commit is contained in:
@ -12,7 +12,7 @@ from rest_framework import status
|
||||
from dcim.models import Site
|
||||
from extras.choices import ObjectChangeActionChoices
|
||||
from extras.models import Tag, Webhook
|
||||
from extras.webhooks import enqueue_object, generate_signature
|
||||
from extras.webhooks import enqueue_object, flush_webhooks, generate_signature
|
||||
from extras.webhooks_worker import process_webhook
|
||||
from utilities.testing import APITestCase
|
||||
|
||||
@ -251,48 +251,49 @@ class WebhookTest(APITestCase):
|
||||
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
|
||||
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
|
||||
|
||||
# TODO: Replace webhook worker test
|
||||
# def test_webhooks_worker(self):
|
||||
#
|
||||
# request_id = uuid.uuid4()
|
||||
#
|
||||
# def dummy_send(_, request, **kwargs):
|
||||
# """
|
||||
# A dummy implementation of Session.send() to be used for testing.
|
||||
# Always returns a 200 HTTP response.
|
||||
# """
|
||||
# webhook = Webhook.objects.get(type_create=True)
|
||||
# signature = generate_signature(request.body, webhook.secret)
|
||||
#
|
||||
# # Validate the outgoing request headers
|
||||
# self.assertEqual(request.headers['Content-Type'], webhook.http_content_type)
|
||||
# self.assertEqual(request.headers['X-Hook-Signature'], signature)
|
||||
# self.assertEqual(request.headers['X-Foo'], 'Bar')
|
||||
#
|
||||
# # Validate the outgoing request body
|
||||
# body = json.loads(request.body)
|
||||
# self.assertEqual(body['event'], 'created')
|
||||
# self.assertEqual(body['timestamp'], job.kwargs['timestamp'])
|
||||
# self.assertEqual(body['model'], 'site')
|
||||
# self.assertEqual(body['username'], 'testuser')
|
||||
# self.assertEqual(body['request_id'], str(request_id))
|
||||
# self.assertEqual(body['data']['name'], 'Site 1')
|
||||
#
|
||||
# return HttpResponse()
|
||||
#
|
||||
# # Enqueue a webhook for processing
|
||||
# site = Site.objects.create(name='Site 1', slug='site-1')
|
||||
# enqueue_webhooks(
|
||||
# queue=[],
|
||||
# instance=site,
|
||||
# user=self.user,
|
||||
# request_id=request_id,
|
||||
# action=ObjectChangeActionChoices.ACTION_CREATE
|
||||
# )
|
||||
#
|
||||
# # Retrieve the job from queue
|
||||
# job = self.queue.jobs[0]
|
||||
#
|
||||
# # Patch the Session object with our dummy_send() method, then process the webhook for sending
|
||||
# with patch.object(Session, 'send', dummy_send) as mock_send:
|
||||
# process_webhook(**job.kwargs)
|
||||
def test_webhooks_worker(self):
|
||||
|
||||
request_id = uuid.uuid4()
|
||||
|
||||
def dummy_send(_, request, **kwargs):
|
||||
"""
|
||||
A dummy implementation of Session.send() to be used for testing.
|
||||
Always returns a 200 HTTP response.
|
||||
"""
|
||||
webhook = Webhook.objects.get(type_create=True)
|
||||
signature = generate_signature(request.body, webhook.secret)
|
||||
|
||||
# Validate the outgoing request headers
|
||||
self.assertEqual(request.headers['Content-Type'], webhook.http_content_type)
|
||||
self.assertEqual(request.headers['X-Hook-Signature'], signature)
|
||||
self.assertEqual(request.headers['X-Foo'], 'Bar')
|
||||
|
||||
# Validate the outgoing request body
|
||||
body = json.loads(request.body)
|
||||
self.assertEqual(body['event'], 'created')
|
||||
self.assertEqual(body['timestamp'], job.kwargs['timestamp'])
|
||||
self.assertEqual(body['model'], 'site')
|
||||
self.assertEqual(body['username'], 'testuser')
|
||||
self.assertEqual(body['request_id'], str(request_id))
|
||||
self.assertEqual(body['data']['name'], 'Site 1')
|
||||
|
||||
return HttpResponse()
|
||||
|
||||
# Enqueue a webhook for processing
|
||||
webhooks_queue = []
|
||||
site = Site.objects.create(name='Site 1', slug='site-1')
|
||||
enqueue_object(
|
||||
webhooks_queue,
|
||||
instance=site,
|
||||
user=self.user,
|
||||
request_id=request_id,
|
||||
action=ObjectChangeActionChoices.ACTION_CREATE
|
||||
)
|
||||
flush_webhooks(webhooks_queue)
|
||||
|
||||
# Retrieve the job from queue
|
||||
job = self.queue.jobs[0]
|
||||
|
||||
# Patch the Session object with our dummy_send() method, then process the webhook for sending
|
||||
with patch.object(Session, 'send', dummy_send) as mock_send:
|
||||
process_webhook(**job.kwargs)
|
||||
|
Reference in New Issue
Block a user