# netbox-community-netbox/netbox/extras/tests/test_event_rules.py

import json
import uuid
from unittest.mock import patch
import django_rq
from dcim.choices import SiteStatusChoices
from dcim.models import Site
from django.contrib.contenttypes.models import ContentType
from django.http import HttpResponse
from django.urls import reverse
from extras.choices import EventRuleActionChoices, ObjectChangeActionChoices
from extras.events import enqueue_object, flush_events, serialize_for_event
from extras.models import EventRule, Tag, Webhook
from extras.webhooks import generate_signature, send_webhook
from requests import Session
from rest_framework import status
from utilities.testing import APITestCase
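
# This suite exercises NetBox event rules end to end: creating, updating, and
# deleting Site objects through the REST API should enqueue a background task
# for each matching rule; rule conditions are evaluated against serialized
# object data; and outgoing webhook requests must carry the expected headers,
# signature, and body.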
class EventRuleTest(APITestCase):
def setUp(self):
super().setUp()
# Ensure the queue has been cleared for each test
self.queue = django_rq.get_queue('default')
self.queue.empty()
@classmethod
def setUpTestData(cls):
site_ct = ContentType.objects.get_for_model(Site)
DUMMY_URL = 'http://localhost:9000/'
DUMMY_SECRET = 'LOOKATMEIMASECRETSTRING'
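        # Three webhooks share the same target URL and secret; Webhook 1 also
        # carries a custom header, which test_send_webhook() checks below.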
webhooks = Webhook.objects.bulk_create((
Webhook(name='Webhook 1', payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers='X-Foo: Bar'),
Webhook(name='Webhook 2', payload_url=DUMMY_URL, secret=DUMMY_SECRET),
Webhook(name='Webhook 3', payload_url=DUMMY_URL, secret=DUMMY_SECRET),
))
ct = ContentType.objects.get(app_label='extras', model='webhook')
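        # One event rule per action type (create/update/delete), each pointing
        # at Webhook 1 via the generic action_object_type/action_object_id pair.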
event_rules = EventRule.objects.bulk_create((
EventRule(
name='Webhook Event 1',
type_create=True,
action_type=EventRuleActionChoices.WEBHOOK,
action_object_type=ct,
action_object_id=webhooks[0].id
),
EventRule(
name='Webhook Event 2',
type_update=True,
action_type=EventRuleActionChoices.WEBHOOK,
action_object_type=ct,
action_object_id=webhooks[0].id
),
EventRule(
name='Webhook Event 3',
type_delete=True,
action_type=EventRuleActionChoices.WEBHOOK,
action_object_type=ct,
action_object_id=webhooks[0].id
),
))
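        # Scope every event rule to Site objects; the tests below trigger the
        # rules by creating, updating, and deleting Sites via the REST API.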
for event_rule in event_rules:
event_rule.content_types.set([site_ct])
Tag.objects.bulk_create((
Tag(name='Foo', slug='foo'),
Tag(name='Bar', slug='bar'),
Tag(name='Baz', slug='baz'),
))
def test_eventrule_conditions(self):
"""
Test evaluation of EventRule conditions.
"""
event_rule = EventRule(
name='Event Rule 1',
type_create=True,
type_update=True,
conditions={
'and': [
{
'attr': 'status.value',
'value': 'active',
}
]
}
)
# Create a Site to evaluate
site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_STAGING)
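        # serialize_for_event() produces the dictionary representation that the
        # rule's conditions are evaluated against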
data = serialize_for_event(site)
# Evaluate the conditions (status='staging')
self.assertFalse(event_rule.eval_conditions(data))
# Change the site's status
site.status = SiteStatusChoices.STATUS_ACTIVE
data = serialize_for_event(site)
# Evaluate the conditions (status='active')
self.assertTrue(event_rule.eval_conditions(data))
def test_single_create_process_eventrule(self):
"""
Check that creating an object with an applicable EventRule queues a background task for the rule's action.
"""
# Create an object via the REST API
data = {
'name': 'Site 1',
'slug': 'site-1',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
}
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.add_site')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Site.objects.count(), 1)
self.assertEqual(Site.objects.first().tags.count(), 2)
# Verify that a background task was queued for the new object
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
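        # The postchange snapshot reflects the object as saved; note that tag
        # names in snapshots are sorted alphabetically.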
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
def test_bulk_create_process_eventrule(self):
"""
Check that bulk creating multiple objects with an applicable EventRule queues a background task for each
new object.
"""
# Create multiple objects via the REST API
data = [
{
'name': 'Site 1',
'slug': 'site-1',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
{
'name': 'Site 2',
'slug': 'site-2',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
{
'name': 'Site 3',
'slug': 'site-3',
'tags': [
{'name': 'Foo'},
{'name': 'Bar'},
]
},
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.add_site')
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(Site.objects.count(), 3)
self.assertEqual(Site.objects.first().tags.count(), 2)
# Verify that a background task was queued for each new object
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_create=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], response.data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
def test_single_update_process_eventrule(self):
"""
Check that updating an object with an applicable EventRule queues a background task for the rule's action.
"""
site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
# Update an object via the REST API
data = {
'name': 'Site X',
'comments': 'Updated the site',
'tags': [
{'name': 'Baz'}
]
}
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
self.add_permissions('dcim.change_site')
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# Verify that a background task was queued for the updated object
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X')
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
def test_bulk_update_process_eventrule(self):
"""
Check that bulk updating multiple objects with an applicable EventRule queues a background task for each
updated object.
"""
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
for site in sites:
site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
# Update three objects via the REST API
data = [
{
'id': sites[0].pk,
'name': 'Site X',
'tags': [
{'name': 'Baz'}
]
},
{
'id': sites[1].pk,
'name': 'Site Y',
'tags': [
{'name': 'Baz'}
]
},
{
'id': sites[2].pk,
'name': 'Site Z',
'tags': [
{'name': 'Baz'}
]
},
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.change_site')
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# Verify that a background task was queued for each updated object
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_update=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], data[i]['id'])
self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
def test_single_delete_process_eventrule(self):
"""
Check that deleting an object with an applicable EventRule queues a background task for the rule's action.
"""
site = Site.objects.create(name='Site 1', slug='site-1')
site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
# Delete an object via the REST API
url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
self.add_permissions('dcim.delete_site')
response = self.client.delete(url, **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
# Verify that a task was queued for the deleted object
self.assertEqual(self.queue.count, 1)
job = self.queue.jobs[0]
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], site.pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
def test_bulk_delete_process_eventrule(self):
"""
Check that bulk deleting multiple objects with an applicable EventRule queues a background task for each
deleted object.
"""
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
for site in sites:
site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
# Delete three objects via the REST API
data = [
{'id': site.pk} for site in sites
]
url = reverse('dcim-api:site-list')
self.add_permissions('dcim.delete_site')
response = self.client.delete(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
# Verify that a background task was queued for each deleted object
self.assertEqual(self.queue.count, 3)
for i, job in enumerate(self.queue.jobs):
self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(type_delete=True))
self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
self.assertEqual(job.kwargs['model_name'], 'site')
self.assertEqual(job.kwargs['data']['id'], sites[i].pk)
self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
def test_send_webhook(self):
request_id = uuid.uuid4()
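        # Rather than performing a real HTTP request, Session.send() is patched
        # below with dummy_send(), which inspects the outgoing request's headers
        # and body and returns a canned 200 response.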
def dummy_send(_, request, **kwargs):
"""
A dummy implementation of Session.send() to be used for testing.
Always returns a 200 HTTP response.
"""
event = EventRule.objects.get(type_create=True)
webhook = event.action_object
signature = generate_signature(request.body, webhook.secret)
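            # The expected signature is an HMAC hex digest of the raw request
            # body keyed with the webhook's shared secret (SHA-512 in current
            # NetBox), recomputed here for comparison against the header.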
# Validate the outgoing request headers
self.assertEqual(request.headers['Content-Type'], webhook.http_content_type)
self.assertEqual(request.headers['X-Hook-Signature'], signature)
self.assertEqual(request.headers['X-Foo'], 'Bar')
# Validate the outgoing request body
body = json.loads(request.body)
self.assertEqual(body['event'], 'created')
self.assertEqual(body['timestamp'], job.kwargs['timestamp'])
self.assertEqual(body['model'], 'site')
self.assertEqual(body['username'], 'testuser')
self.assertEqual(body['request_id'], str(request_id))
self.assertEqual(body['data']['name'], 'Site 1')
return HttpResponse()
# Enqueue a webhook for processing
webhooks_queue = []
site = Site.objects.create(name='Site 1', slug='site-1')
enqueue_object(
webhooks_queue,
instance=site,
user=self.user,
request_id=request_id,
action=ObjectChangeActionChoices.ACTION_CREATE
)
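        # flush_events() evaluates the queued change against the configured
        # event rules and enqueues an RQ background job for each applicable
        # webhook action.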
flush_events(webhooks_queue)
# Retrieve the job from queue
job = self.queue.jobs[0]
# Patch the Session object with our dummy_send() method, then process the webhook for sending
with patch.object(Session, 'send', dummy_send) as mock_send:
send_webhook(**job.kwargs)