1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Merge branch 'develop' into 568-csv-import-cf

This commit is contained in:
hSaria
2020-01-23 20:27:07 +00:00
committed by GitHub
20 changed files with 706 additions and 101 deletions

View File

@@ -14,10 +14,10 @@ from django.db import transaction
from mptt.forms import TreeNodeChoiceField, TreeNodeMultipleChoiceField
from mptt.models import MPTTModel
from ipam.formfields import IPFormField
from utilities.exceptions import AbortTransaction
from utilities.validators import MaxPrefixLengthValidator, MinPrefixLengthValidator
from ipam.formfields import IPAddressFormField, IPNetworkFormField
from ipam.validators import MaxPrefixLengthValidator, MinPrefixLengthValidator, prefix_validator
from .constants import LOG_DEFAULT, LOG_FAILURE, LOG_INFO, LOG_SUCCESS, LOG_WARNING
from utilities.exceptions import AbortTransaction
from .forms import ScriptForm
from .signals import purge_changelog
@@ -27,6 +27,8 @@ __all__ = [
'ChoiceVar',
'FileVar',
'IntegerVar',
'IPAddressVar',
'IPAddressWithMaskVar',
'IPNetworkVar',
'MultiObjectVar',
'ObjectVar',
@@ -48,15 +50,19 @@ class ScriptVariable:
def __init__(self, label='', description='', default=None, required=True):
# Default field attributes
self.field_attrs = {
'help_text': description,
'required': required
}
# Initialize field attributes
if not hasattr(self, 'field_attrs'):
self.field_attrs = {}
if description:
self.field_attrs['help_text'] = description
if label:
self.field_attrs['label'] = label
if default:
self.field_attrs['initial'] = default
if required:
self.field_attrs['required'] = True
if 'validators' not in self.field_attrs:
self.field_attrs['validators'] = []
def as_field(self):
"""
@@ -196,17 +202,32 @@ class FileVar(ScriptVariable):
form_field = forms.FileField
class IPAddressVar(ScriptVariable):
"""
An IPv4 or IPv6 address without a mask.
"""
form_field = IPAddressFormField
class IPAddressWithMaskVar(ScriptVariable):
"""
An IPv4 or IPv6 address with a mask.
"""
form_field = IPNetworkFormField
class IPNetworkVar(ScriptVariable):
"""
An IPv4 or IPv6 prefix.
"""
form_field = IPFormField
form_field = IPNetworkFormField
field_attrs = {
'validators': [prefix_validator]
}
def __init__(self, min_prefix_length=None, max_prefix_length=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.field_attrs['validators'] = list()
# Optional minimum/maximum prefix lengths
if min_prefix_length is not None:
self.field_attrs['validators'].append(

View File

@@ -1,6 +1,6 @@
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase
from netaddr import IPNetwork
from netaddr import IPAddress, IPNetwork
from dcim.models import DeviceRole
from extras.scripts import *
@@ -186,6 +186,54 @@ class ScriptVariablesTest(TestCase):
self.assertTrue(form.is_valid())
self.assertEqual(form.cleaned_data['var1'], testfile)
def test_ipaddressvar(self):
class TestScript(Script):
var1 = IPAddressVar()
# Validate IP network enforcement
data = {'var1': '1.2.3'}
form = TestScript().as_form(data, None)
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate IP mask exclusion
data = {'var1': '192.0.2.0/24'}
form = TestScript().as_form(data, None)
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate valid data
data = {'var1': '192.0.2.1'}
form = TestScript().as_form(data, None)
self.assertTrue(form.is_valid())
self.assertEqual(form.cleaned_data['var1'], IPAddress(data['var1']))
def test_ipaddresswithmaskvar(self):
class TestScript(Script):
var1 = IPAddressWithMaskVar()
# Validate IP network enforcement
data = {'var1': '1.2.3'}
form = TestScript().as_form(data, None)
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate IP mask requirement
data = {'var1': '192.0.2.0'}
form = TestScript().as_form(data, None)
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate valid data
data = {'var1': '192.0.2.0/24'}
form = TestScript().as_form(data, None)
self.assertTrue(form.is_valid())
self.assertEqual(form.cleaned_data['var1'], IPNetwork(data['var1']))
def test_ipnetworkvar(self):
class TestScript(Script):
@@ -198,6 +246,12 @@ class ScriptVariablesTest(TestCase):
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate host IP check
data = {'var1': '192.0.2.1/24'}
form = TestScript().as_form(data, None)
self.assertFalse(form.is_valid())
self.assertIn('var1', form.errors)
# Validate valid data
data = {'var1': '192.0.2.0/24'}
form = TestScript().as_form(data, None)

View File

@@ -1,11 +1,19 @@
import json
import uuid
from unittest.mock import patch
import django_rq
from django.contrib.contenttypes.models import ContentType
from django.http import HttpResponse
from django.urls import reverse
from requests import Session
from rest_framework import status
from dcim.models import Site
from extras.choices import ObjectChangeActionChoices
from extras.models import Webhook
from extras.webhooks import enqueue_webhooks, generate_signature
from extras.webhooks_worker import process_webhook
from utilities.testing import APITestCase
@@ -22,11 +30,13 @@ class WebhookTest(APITestCase):
def setUpTestData(cls):
site_ct = ContentType.objects.get_for_model(Site)
PAYLOAD_URL = "http://localhost/"
DUMMY_URL = "http://localhost/"
DUMMY_SECRET = "LOOKATMEIMASECRETSTRING"
webhooks = Webhook.objects.bulk_create((
Webhook(name='Site Create Webhook', type_create=True, payload_url=PAYLOAD_URL),
Webhook(name='Site Update Webhook', type_update=True, payload_url=PAYLOAD_URL),
Webhook(name='Site Delete Webhook', type_delete=True, payload_url=PAYLOAD_URL),
Webhook(name='Site Create Webhook', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers={'X-Foo': 'Bar'}),
Webhook(name='Site Update Webhook', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
Webhook(name='Site Delete Webhook', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
))
for webhook in webhooks:
webhook.obj_type.set([site_ct])
@@ -87,3 +97,47 @@ class WebhookTest(APITestCase):
self.assertEqual(job.args[1]['id'], site.pk)
self.assertEqual(job.args[2], 'site')
self.assertEqual(job.args[3], ObjectChangeActionChoices.ACTION_DELETE)
def test_webhooks_worker(self):
request_id = uuid.uuid4()
def dummy_send(_, request):
"""
A dummy implementation of Session.send() to be used for testing.
Always returns a 200 HTTP response.
"""
webhook = Webhook.objects.get(type_create=True)
signature = generate_signature(request.body, webhook.secret)
# Validate the outgoing request headers
self.assertEqual(request.headers['Content-Type'], webhook.http_content_type)
self.assertEqual(request.headers['X-Hook-Signature'], signature)
self.assertEqual(request.headers['X-Foo'], 'Bar')
# Validate the outgoing request body
body = json.loads(request.body)
self.assertEqual(body['event'], 'created')
self.assertEqual(body['timestamp'], job.args[4])
self.assertEqual(body['model'], 'site')
self.assertEqual(body['username'], 'testuser')
self.assertEqual(body['request_id'], str(request_id))
self.assertEqual(body['data']['name'], 'Site 1')
return HttpResponse()
# Enqueue a webhook for processing
site = Site.objects.create(name='Site 1', slug='site-1')
enqueue_webhooks(
instance=site,
user=self.user,
request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE
)
# Retrieve the job from queue
job = self.queue.jobs[0]
# Patch the Session object with our dummy_send() method, then process the webhook for sending
with patch.object(Session, 'send', dummy_send) as mock_send:
process_webhook(*job.args)

View File

@@ -1,4 +1,6 @@
import datetime
import hashlib
import hmac
from django.contrib.contenttypes.models import ContentType
@@ -8,6 +10,18 @@ from .choices import *
from .constants import *
def generate_signature(request_body, secret):
"""
Return a cryptographic signature that can be used to verify the authenticity of webhook data.
"""
hmac_prep = hmac.new(
key=secret.encode('utf8'),
msg=request_body.encode('utf8'),
digestmod=hashlib.sha512
)
return hmac_prep.hexdigest()
def enqueue_webhooks(instance, user, request_id, action):
"""
Find Webhook(s) assigned to this instance + action and enqueue them

View File

@@ -1,5 +1,3 @@
import hashlib
import hmac
import json
import requests
@@ -7,6 +5,7 @@ from django_rq import job
from rest_framework.utils.encoders import JSONEncoder
from .choices import ObjectChangeActionChoices, WebhookContentTypeChoices
from .webhooks import generate_signature
@job('default')
@@ -23,7 +22,7 @@ def process_webhook(webhook, data, model_name, event, timestamp, username, reque
'data': data
}
headers = {
'Content-Type': webhook.get_http_content_type_display(),
'Content-Type': webhook.http_content_type,
}
if webhook.additional_headers:
headers.update(webhook.additional_headers)
@@ -43,12 +42,7 @@ def process_webhook(webhook, data, model_name, event, timestamp, username, reque
if webhook.secret != '':
# Sign the request with a hash of the secret key and its content.
hmac_prep = hmac.new(
key=webhook.secret.encode('utf8'),
msg=prepared_request.body.encode('utf8'),
digestmod=hashlib.sha512
)
prepared_request.headers['X-Hook-Signature'] = hmac_prep.hexdigest()
prepared_request.headers['X-Hook-Signature'] = generate_signature(prepared_request.body, webhook.secret)
with requests.Session() as session:
session.verify = webhook.ssl_verification
@@ -56,7 +50,7 @@ def process_webhook(webhook, data, model_name, event, timestamp, username, reque
session.verify = webhook.ca_file_path
response = session.send(prepared_request)
if response.status_code >= 200 and response.status_code <= 299:
if 200 <= response.status_code <= 299:
return 'Status {} returned, webhook successfully processed.'.format(response.status_code)
else:
raise requests.exceptions.RequestException(