mirror of
				https://github.com/netbox-community/netbox.git
				synced 2024-05-10 07:54:54 +00:00 
			
		
		
		
	Merge branch 'develop' into 3995-navbar-overflow
This commit is contained in:
		| @@ -1,11 +1,19 @@ | ||||
| import json | ||||
| import uuid | ||||
| from unittest.mock import patch | ||||
|  | ||||
| import django_rq | ||||
| from django.contrib.contenttypes.models import ContentType | ||||
| from django.http import HttpResponse | ||||
| from django.urls import reverse | ||||
| from requests import Session | ||||
| from rest_framework import status | ||||
|  | ||||
| from dcim.models import Site | ||||
| from extras.choices import ObjectChangeActionChoices | ||||
| from extras.models import Webhook | ||||
| from extras.webhooks import enqueue_webhooks, generate_signature | ||||
| from extras.webhooks_worker import process_webhook | ||||
| from utilities.testing import APITestCase | ||||
|  | ||||
|  | ||||
| @@ -22,11 +30,13 @@ class WebhookTest(APITestCase): | ||||
|     def setUpTestData(cls): | ||||
|  | ||||
|         site_ct = ContentType.objects.get_for_model(Site) | ||||
|         PAYLOAD_URL = "http://localhost/" | ||||
|         DUMMY_URL = "http://localhost/" | ||||
|         DUMMY_SECRET = "LOOKATMEIMASECRETSTRING" | ||||
|  | ||||
|         webhooks = Webhook.objects.bulk_create(( | ||||
|             Webhook(name='Site Create Webhook', type_create=True, payload_url=PAYLOAD_URL), | ||||
|             Webhook(name='Site Update Webhook', type_update=True, payload_url=PAYLOAD_URL), | ||||
|             Webhook(name='Site Delete Webhook', type_delete=True, payload_url=PAYLOAD_URL), | ||||
|             Webhook(name='Site Create Webhook', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers={'X-Foo': 'Bar'}), | ||||
|             Webhook(name='Site Update Webhook', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET), | ||||
|             Webhook(name='Site Delete Webhook', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET), | ||||
|         )) | ||||
|         for webhook in webhooks: | ||||
|             webhook.obj_type.set([site_ct]) | ||||
| @@ -87,3 +97,47 @@ class WebhookTest(APITestCase): | ||||
|         self.assertEqual(job.args[1]['id'], site.pk) | ||||
|         self.assertEqual(job.args[2], 'site') | ||||
|         self.assertEqual(job.args[3], ObjectChangeActionChoices.ACTION_DELETE) | ||||
|  | ||||
|     def test_webhooks_worker(self): | ||||
|  | ||||
|         request_id = uuid.uuid4() | ||||
|  | ||||
|         def dummy_send(_, request): | ||||
|             """ | ||||
|             A dummy implementation of Session.send() to be used for testing. | ||||
|             Always returns a 200 HTTP response. | ||||
|             """ | ||||
|             webhook = Webhook.objects.get(type_create=True) | ||||
|             signature = generate_signature(request.body, webhook.secret) | ||||
|  | ||||
|             # Validate the outgoing request headers | ||||
|             self.assertEqual(request.headers['Content-Type'], webhook.http_content_type) | ||||
|             self.assertEqual(request.headers['X-Hook-Signature'], signature) | ||||
|             self.assertEqual(request.headers['X-Foo'], 'Bar') | ||||
|  | ||||
|             # Validate the outgoing request body | ||||
|             body = json.loads(request.body) | ||||
|             self.assertEqual(body['event'], 'created') | ||||
|             self.assertEqual(body['timestamp'], job.args[4]) | ||||
|             self.assertEqual(body['model'], 'site') | ||||
|             self.assertEqual(body['username'], 'testuser') | ||||
|             self.assertEqual(body['request_id'], str(request_id)) | ||||
|             self.assertEqual(body['data']['name'], 'Site 1') | ||||
|  | ||||
|             return HttpResponse() | ||||
|  | ||||
|         # Enqueue a webhook for processing | ||||
|         site = Site.objects.create(name='Site 1', slug='site-1') | ||||
|         enqueue_webhooks( | ||||
|             instance=site, | ||||
|             user=self.user, | ||||
|             request_id=request_id, | ||||
|             action=ObjectChangeActionChoices.ACTION_CREATE | ||||
|         ) | ||||
|  | ||||
|         # Retrieve the job from queue | ||||
|         job = self.queue.jobs[0] | ||||
|  | ||||
|         # Patch the Session object with our dummy_send() method, then process the webhook for sending | ||||
|         with patch.object(Session, 'send', dummy_send) as mock_send: | ||||
|             process_webhook(*job.args) | ||||
|   | ||||
| @@ -1,4 +1,6 @@ | ||||
| import datetime | ||||
| import hashlib | ||||
| import hmac | ||||
|  | ||||
| from django.contrib.contenttypes.models import ContentType | ||||
|  | ||||
| @@ -8,6 +10,18 @@ from .choices import * | ||||
| from .constants import * | ||||
|  | ||||
|  | ||||
| def generate_signature(request_body, secret): | ||||
|     """ | ||||
|     Return a cryptographic signature that can be used to verify the authenticity of webhook data. | ||||
|     """ | ||||
|     hmac_prep = hmac.new( | ||||
|         key=secret.encode('utf8'), | ||||
|         msg=request_body.encode('utf8'), | ||||
|         digestmod=hashlib.sha512 | ||||
|     ) | ||||
|     return hmac_prep.hexdigest() | ||||
|  | ||||
|  | ||||
| def enqueue_webhooks(instance, user, request_id, action): | ||||
|     """ | ||||
|     Find Webhook(s) assigned to this instance + action and enqueue them | ||||
|   | ||||
| @@ -1,5 +1,3 @@ | ||||
| import hashlib | ||||
| import hmac | ||||
| import json | ||||
|  | ||||
| import requests | ||||
| @@ -7,6 +5,7 @@ from django_rq import job | ||||
| from rest_framework.utils.encoders import JSONEncoder | ||||
|  | ||||
| from .choices import ObjectChangeActionChoices, WebhookContentTypeChoices | ||||
| from .webhooks import generate_signature | ||||
|  | ||||
|  | ||||
| @job('default') | ||||
| @@ -23,7 +22,7 @@ def process_webhook(webhook, data, model_name, event, timestamp, username, reque | ||||
|         'data': data | ||||
|     } | ||||
|     headers = { | ||||
|         'Content-Type': webhook.get_http_content_type_display(), | ||||
|         'Content-Type': webhook.http_content_type, | ||||
|     } | ||||
|     if webhook.additional_headers: | ||||
|         headers.update(webhook.additional_headers) | ||||
| @@ -43,12 +42,7 @@ def process_webhook(webhook, data, model_name, event, timestamp, username, reque | ||||
|  | ||||
|     if webhook.secret != '': | ||||
|         # Sign the request with a hash of the secret key and its content. | ||||
|         hmac_prep = hmac.new( | ||||
|             key=webhook.secret.encode('utf8'), | ||||
|             msg=prepared_request.body.encode('utf8'), | ||||
|             digestmod=hashlib.sha512 | ||||
|         ) | ||||
|         prepared_request.headers['X-Hook-Signature'] = hmac_prep.hexdigest() | ||||
|         prepared_request.headers['X-Hook-Signature'] = generate_signature(prepared_request.body, webhook.secret) | ||||
|  | ||||
|     with requests.Session() as session: | ||||
|         session.verify = webhook.ssl_verification | ||||
| @@ -56,7 +50,7 @@ def process_webhook(webhook, data, model_name, event, timestamp, username, reque | ||||
|             session.verify = webhook.ca_file_path | ||||
|         response = session.send(prepared_request) | ||||
|  | ||||
|     if response.status_code >= 200 and response.status_code <= 299: | ||||
|     if 200 <= response.status_code <= 299: | ||||
|         return 'Status {} returned, webhook successfully processed.'.format(response.status_code) | ||||
|     else: | ||||
|         raise requests.exceptions.RequestException( | ||||
|   | ||||
		Reference in New Issue
	
	Block a user