1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Closes #3952: Add test for webhooks_worker; introduce generate_signature()

This commit is contained in:
Jeremy Stretch
2020-01-23 15:04:40 -05:00
parent 7b517abdb6
commit 2e69037c29
3 changed files with 75 additions and 13 deletions

View File

@ -1,11 +1,19 @@
import json
import uuid
from unittest.mock import patch
import django_rq
from django.contrib.contenttypes.models import ContentType
from django.http import HttpResponse
from django.urls import reverse
from requests import Session
from rest_framework import status
from dcim.models import Site
from extras.choices import ObjectChangeActionChoices
from extras.models import Webhook
from extras.webhooks import enqueue_webhooks, generate_signature
from extras.webhooks_worker import process_webhook
from utilities.testing import APITestCase
@ -22,11 +30,13 @@ class WebhookTest(APITestCase):
def setUpTestData(cls):
site_ct = ContentType.objects.get_for_model(Site)
PAYLOAD_URL = "http://localhost/"
DUMMY_URL = "http://localhost/"
DUMMY_SECRET = "LOOKATMEIMASECRETSTRING"
webhooks = Webhook.objects.bulk_create((
Webhook(name='Site Create Webhook', type_create=True, payload_url=PAYLOAD_URL),
Webhook(name='Site Update Webhook', type_update=True, payload_url=PAYLOAD_URL),
Webhook(name='Site Delete Webhook', type_delete=True, payload_url=PAYLOAD_URL),
Webhook(name='Site Create Webhook', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers={'X-Foo': 'Bar'}),
Webhook(name='Site Update Webhook', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
Webhook(name='Site Delete Webhook', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
))
for webhook in webhooks:
webhook.obj_type.set([site_ct])
@ -87,3 +97,47 @@ class WebhookTest(APITestCase):
self.assertEqual(job.args[1]['id'], site.pk)
self.assertEqual(job.args[2], 'site')
self.assertEqual(job.args[3], ObjectChangeActionChoices.ACTION_DELETE)
def test_webhooks_worker(self):
request_id = uuid.uuid4()
def dummy_send(_, request):
"""
A dummy implementation of Session.send() to be used for testing.
Always returns a 200 HTTP response.
"""
webhook = Webhook.objects.get(type_create=True)
signature = generate_signature(request.body, webhook.secret)
# Validate the outgoing request headers
self.assertEqual(request.headers['Content-Type'], webhook.http_content_type)
self.assertEqual(request.headers['X-Hook-Signature'], signature)
self.assertEqual(request.headers['X-Foo'], 'Bar')
# Validate the outgoing request body
body = json.loads(request.body)
self.assertEqual(body['event'], 'created')
self.assertEqual(body['timestamp'], job.args[4])
self.assertEqual(body['model'], 'site')
self.assertEqual(body['username'], 'testuser')
self.assertEqual(body['request_id'], str(request_id))
self.assertEqual(body['data']['name'], 'Site 1')
return HttpResponse()
# Enqueue a webhook for processing
site = Site.objects.create(name='Site 1', slug='site-1')
enqueue_webhooks(
instance=site,
user=self.user,
request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE
)
# Retrieve the job from queue
job = self.queue.jobs[0]
# Patch the Session object with our dummy_send() method, then process the webhook for sending
with patch.object(Session, 'send', dummy_send) as mock_send:
process_webhook(*job.args)

View File

@ -1,4 +1,6 @@
import datetime
import hashlib
import hmac
from django.contrib.contenttypes.models import ContentType
@ -8,6 +10,18 @@ from .choices import *
from .constants import *
def generate_signature(request_body, secret):
"""
Return a cryptographic signature that can be used to verify the authenticity of webhook data.
"""
hmac_prep = hmac.new(
key=secret.encode('utf8'),
msg=request_body.encode('utf8'),
digestmod=hashlib.sha512
)
return hmac_prep.hexdigest()
def enqueue_webhooks(instance, user, request_id, action):
"""
Find Webhook(s) assigned to this instance + action and enqueue them

View File

@ -1,5 +1,3 @@
import hashlib
import hmac
import json
import requests
@ -7,6 +5,7 @@ from django_rq import job
from rest_framework.utils.encoders import JSONEncoder
from .choices import ObjectChangeActionChoices, WebhookContentTypeChoices
from .webhooks import generate_signature
@job('default')
@ -43,12 +42,7 @@ def process_webhook(webhook, data, model_name, event, timestamp, username, reque
if webhook.secret != '':
# Sign the request with a hash of the secret key and its content.
hmac_prep = hmac.new(
key=webhook.secret.encode('utf8'),
msg=prepared_request.body.encode('utf8'),
digestmod=hashlib.sha512
)
prepared_request.headers['X-Hook-Signature'] = hmac_prep.hexdigest()
prepared_request.headers['X-Hook-Signature'] = generate_signature(prepared_request.body, webhook.secret)
with requests.Session() as session:
session.verify = webhook.ssl_verification
@ -56,7 +50,7 @@ def process_webhook(webhook, data, model_name, event, timestamp, username, reque
session.verify = webhook.ca_file_path
response = session.send(prepared_request)
if response.status_code >= 200 and response.status_code <= 299:
if 200 <= response.status_code <= 299:
return 'Status {} returned, webhook successfully processed.'.format(response.status_code)
else:
raise requests.exceptions.RequestException(