1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Implements #2006 - run reports and scripts in the background

This commit is contained in:
John Anderson
2020-06-29 03:50:05 -04:00
parent eb8c0539c5
commit 3777fbccc3
29 changed files with 953 additions and 192 deletions

View File

@ -2,7 +2,7 @@ from django import forms
from django.contrib import admin
from utilities.forms import LaxURLField
from .models import CustomField, CustomFieldChoice, CustomLink, Graph, ExportTemplate, ReportResult, Webhook
from .models import CustomField, CustomFieldChoice, CustomLink, Graph, ExportTemplate, JobResult, Webhook
from .reports import get_report
@ -228,27 +228,18 @@ class ExportTemplateAdmin(admin.ModelAdmin):
# Reports
#
@admin.register(ReportResult)
class ReportResultAdmin(admin.ModelAdmin):
@admin.register(JobResult)
class JobResultAdmin(admin.ModelAdmin):
list_display = [
'report', 'active', 'created', 'user', 'passing',
'obj_type', 'name', 'created', 'completed', 'user', 'status',
]
fields = [
'report', 'user', 'passing', 'data',
'obj_type', 'name', 'created', 'completed', 'user', 'status', 'data', 'job_id'
]
list_filter = [
'failed',
'status',
]
readonly_fields = fields
def has_add_permission(self, request):
return False
def active(self, obj):
module, report_name = obj.report.split('.')
return True if get_report(module, report_name) else False
active.boolean = True
def passing(self, obj):
return not obj.failed
passing.boolean = True

View File

@ -1,13 +1,14 @@
from rest_framework import serializers
from extras import models
from utilities.api import WritableNestedSerializer
from extras import choices, models
from users.api.nested_serializers import NestedUserSerializer
from utilities.api import ChoiceField, WritableNestedSerializer
__all__ = [
'NestedConfigContextSerializer',
'NestedExportTemplateSerializer',
'NestedGraphSerializer',
'NestedReportResultSerializer',
'NestedJobResultSerializer',
'NestedTagSerializer',
]
@ -44,13 +45,13 @@ class NestedTagSerializer(WritableNestedSerializer):
fields = ['id', 'url', 'name', 'slug', 'color']
class NestedReportResultSerializer(serializers.ModelSerializer):
url = serializers.HyperlinkedIdentityField(
view_name='extras-api:report-detail',
lookup_field='report',
lookup_url_kwarg='pk'
class NestedJobResultSerializer(serializers.ModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='extras-api:jobresult-detail')
status = ChoiceField(choices=choices.JobResultStatusChoices)
user = NestedUserSerializer(
read_only=True
)
class Meta:
model = models.ReportResult
fields = ['url', 'created', 'user', 'failed']
model = models.JobResult
fields = ['url', 'created', 'completed', 'user', 'status']

View File

@ -11,7 +11,7 @@ from dcim.models import Device, DeviceRole, Platform, Rack, Region, Site
from extras.choices import *
from extras.constants import *
from extras.models import (
ConfigContext, ExportTemplate, Graph, ImageAttachment, ObjectChange, ReportResult, Tag,
ConfigContext, ExportTemplate, Graph, ImageAttachment, ObjectChange, JobResult, Tag,
)
from extras.utils import FeatureQuery
from tenancy.api.nested_serializers import NestedTenantSerializer, NestedTenantGroupSerializer
@ -232,27 +232,46 @@ class ConfigContextSerializer(ValidatedModelSerializer):
]
#
# Job Results
#
class JobResultSerializer(serializers.ModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='extras-api:jobresult-detail')
user = NestedUserSerializer(
read_only=True
)
status = ChoiceField(choices=JobResultStatusChoices, read_only=True)
obj_type = ContentTypeField(
read_only=True
)
class Meta:
model = JobResult
fields = [
'id', 'url', 'created', 'completed', 'name', 'obj_type', 'status', 'user', 'data', 'job_id',
]
#
# Reports
#
class ReportResultSerializer(serializers.ModelSerializer):
class Meta:
model = ReportResult
fields = ['created', 'user', 'failed', 'data']
class ReportSerializer(serializers.Serializer):
url = serializers.HyperlinkedIdentityField(
view_name='extras-api:report-detail',
lookup_field='full_name',
lookup_url_kwarg='pk'
)
module = serializers.CharField(max_length=255)
name = serializers.CharField(max_length=255)
description = serializers.CharField(max_length=255, required=False)
test_methods = serializers.ListField(child=serializers.CharField(max_length=255))
result = NestedReportResultSerializer()
result = NestedJobResultSerializer()
class ReportDetailSerializer(ReportSerializer):
result = ReportResultSerializer()
result = JobResultSerializer()
#
@ -260,10 +279,16 @@ class ReportDetailSerializer(ReportSerializer):
#
class ScriptSerializer(serializers.Serializer):
url = serializers.HyperlinkedIdentityField(
view_name='extras-api:script-detail',
lookup_field='full_name',
lookup_url_kwarg='pk'
)
id = serializers.SerializerMethodField(read_only=True)
name = serializers.SerializerMethodField(read_only=True)
description = serializers.SerializerMethodField(read_only=True)
vars = serializers.SerializerMethodField(read_only=True)
result = NestedJobResultSerializer()
def get_id(self, instance):
return '{}.{}'.format(instance.__module__, instance.__name__)
@ -280,6 +305,10 @@ class ScriptSerializer(serializers.Serializer):
}
class ScriptDetailSerializer(ScriptSerializer):
result = JobResultSerializer()
class ScriptInputSerializer(serializers.Serializer):
data = serializers.JSONField()
commit = serializers.BooleanField()

View File

@ -41,5 +41,8 @@ router.register('scripts', views.ScriptViewSet, basename='script')
# Change logging
router.register('object-changes', views.ObjectChangeViewSet)
# Job Results
router.register('job-results', views.JobResultViewSet)
app_name = 'extras-api'
urlpatterns = router.urls

View File

@ -10,8 +10,9 @@ from rest_framework.response import Response
from rest_framework.viewsets import ReadOnlyModelViewSet, ViewSet
from extras import filters
from extras.choices import JobResultStatusChoices
from extras.models import (
ConfigContext, CustomFieldChoice, ExportTemplate, Graph, ImageAttachment, ObjectChange, ReportResult, Tag,
ConfigContext, CustomFieldChoice, ExportTemplate, Graph, ImageAttachment, ObjectChange, JobResult, Tag,
)
from extras.reports import get_report, get_reports
from extras.scripts import get_script, get_scripts, run_script
@ -165,13 +166,21 @@ class ReportViewSet(ViewSet):
Compile all reports and their related results (if any). Result data is deferred in the list view.
"""
report_list = []
report_content_type = ContentType.objects.get(app_label='extras', model='report')
results = {
r.name: r
for r in JobResult.objects.filter(
obj_type=report_content_type,
status__in=JobResultStatusChoices.TERMINAL_STATE_CHOICES
).defer('data')
}
# Iterate through all available Reports.
for module_name, reports in get_reports():
for report in reports:
# Attach the relevant ReportResult (if any) to each Report.
report.result = ReportResult.objects.filter(report=report.full_name).defer('data').first()
# Attach the relevant JobResult (if any) to each Report.
report.result = results.get(report.full_name, None)
report_list.append(report)
serializer = serializers.ReportSerializer(report_list, many=True, context={
@ -185,27 +194,41 @@ class ReportViewSet(ViewSet):
Retrieve a single Report identified as "<module>.<report>".
"""
# Retrieve the Report and ReportResult, if any.
# Retrieve the Report and JobResult, if any.
report = self._retrieve_report(pk)
report.result = ReportResult.objects.filter(report=report.full_name).first()
report_content_type = ContentType.objects.get(app_label='extras', model='report')
report.result = JobResult.objects.filter(
obj_type=report_content_type,
name=report.full_name,
status__in=JobResultStatusChoices.TERMINAL_STATE_CHOICES
).first()
serializer = serializers.ReportDetailSerializer(report)
serializer = serializers.ReportDetailSerializer(report, context={
'request': request
})
return Response(serializer.data)
@action(detail=True, methods=['post'])
def run(self, request, pk):
"""
Run a Report and create a new ReportResult, overwriting any previous result for the Report.
Run a Report identified as "<module>.<script>" and return the pending JobResult as the result
"""
# Check that the user has permission to run reports.
if not request.user.has_perm('extras.add_reportresult'):
raise PermissionDenied("This user does not have permission to run reports.")
# Retrieve and run the Report. This will create a new ReportResult.
# Retrieve and run the Report. This will create a new JobResult.
report = self._retrieve_report(pk)
report.run()
report_content_type = ContentType.objects.get(app_label='extras', model='report')
job_result = JobResult.enqueue_job(
run_report,
report.full_name,
report_content_type,
request.user
)
report.result = job_result
serializer = serializers.ReportDetailSerializer(report)
@ -231,23 +254,42 @@ class ScriptViewSet(ViewSet):
def list(self, request):
script_content_type = ContentType.objects.get(app_label='extras', model='script')
results = {
r.name: r
for r in JobResult.objects.filter(
obj_type=script_content_type,
status__in=JobResultStatusChoices.TERMINAL_STATE_CHOICES
).defer('data').order_by('created')
}
flat_list = []
for script_list in get_scripts().values():
flat_list.extend(script_list.values())
# Attach JobResult objects to each script (if any)
for script in flat_list:
script.result = results.get(script.full_name, None)
serializer = serializers.ScriptSerializer(flat_list, many=True, context={'request': request})
return Response(serializer.data)
def retrieve(self, request, pk):
script = self._get_script(pk)
serializer = serializers.ScriptSerializer(script, context={'request': request})
script_content_type = ContentType.objects.get(app_label='extras', model='script')
script.result = JobResult.objects.filter(
obj_type=script_content_type,
name=script.full_name,
status__in=JobResultStatusChoices.TERMINAL_STATE_CHOICES
).first()
serializer = serializers.ScriptDetailSerializer(script, context={'request': request})
return Response(serializer.data)
def post(self, request, pk):
"""
Run a Script identified as "<module>.<script>".
Run a Script identified as "<module>.<script>" and return the pending JobResult as the result
"""
script = self._get_script(pk)()
input_serializer = serializers.ScriptInputSerializer(data=request.data)
@ -255,10 +297,21 @@ class ScriptViewSet(ViewSet):
if input_serializer.is_valid():
data = input_serializer.data['data']
commit = input_serializer.data['commit']
script.output, execution_time = run_script(script, data, request, commit)
output_serializer = serializers.ScriptOutputSerializer(script)
return Response(output_serializer.data)
script_content_type = ContentType.objects.get(app_label='extras', model='script')
job_result = JobResult.enqueue_job(
run_script,
script.full_name,
script_content_type,
request.user,
data=form.cleaned_data,
request=copy_safe_request(request),
commit=commit
)
script.result = job_result
serializer = serializers.ScriptDetailSerializer(script)
return Response(serializer.data)
return Response(input_serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@ -274,3 +327,16 @@ class ObjectChangeViewSet(ReadOnlyModelViewSet):
queryset = ObjectChange.objects.prefetch_related('user')
serializer_class = serializers.ObjectChangeSerializer
filterset_class = filters.ObjectChangeFilterSet
#
# Job Results
#
class JobResultViewSet(ReadOnlyModelViewSet):
"""
Retrieve a list of job results
"""
queryset = JobResult.objects.prefetch_related('user')
serializer_class = serializers.JobResultSerializer
filterset_class = filters.JobResultFilterSet

View File

@ -120,6 +120,35 @@ class TemplateLanguageChoices(ChoiceSet):
}
#
# Job results
#
class JobResultStatusChoices(ChoiceSet):
STATUS_PENDING = 'pending'
STATUS_RUNNING = 'running'
STATUS_COMPLETED = 'completed'
STATUS_FAILED = 'failed'
CHOICES = (
(STATUS_PENDING, 'Pending'),
(STATUS_RUNNING, 'Running'),
(STATUS_COMPLETED, 'Completed'),
(STATUS_FAILED, 'Failed'),
)
TERMINAL_STATE_CHOICES = (
STATUS_COMPLETED,
STATUS_FAILED,
)
NON_TERMINAL_STATE_CHOICES = (
STATUS_PENDING,
STATUS_RUNNING,
)
#
# Webhooks
#

View File

@ -19,7 +19,8 @@ HTTP_CONTENT_TYPE_JSON = 'application/json'
EXTRAS_FEATURES = [
'custom_fields',
'custom_links',
'graphs',
'export_templates',
'graphs',
'job_results',
'webhooks'
]

View File

@ -7,7 +7,7 @@ from tenancy.models import Tenant, TenantGroup
from utilities.filters import BaseFilterSet
from virtualization.models import Cluster, ClusterGroup
from .choices import *
from .models import ConfigContext, CustomField, Graph, ExportTemplate, ObjectChange, Tag
from .models import ConfigContext, CustomField, Graph, ExportTemplate, ObjectChange, JobResult, Tag
__all__ = (
@ -287,3 +287,33 @@ class CreatedUpdatedFilterSet(django_filters.FilterSet):
field_name='last_updated',
lookup_expr='lte'
)
#
# Job Results
#
class JobResultFilterSet(BaseFilterSet):
q = django_filters.CharFilter(
method='search',
label='Search',
)
created = django_filters.DateTimeFilter()
completed = django_filters.DateTimeFilter()
status = django_filters.MultipleChoiceFilter(
choices=JobResultStatusChoices,
null_value=None
)
class Meta:
model = JobResult
fields = [
'id', 'created', 'completed', 'status', 'user', 'obj_type', 'name'
]
def search(self, queryset, name, value):
if not value.strip():
return queryset
return queryset.filter(
Q(user__username__icontains=value)
)

View File

@ -0,0 +1,22 @@
# Generated by Django 3.0.7 on 2020-06-22 20:11
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('extras', '0042_customfield_manager'),
]
operations = [
migrations.CreateModel(
name='Report',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False)),
],
options={
'managed': False,
},
),
]

View File

@ -0,0 +1,76 @@
# Generated by Django 3.0.7 on 2020-06-23 02:28
import uuid
from django.conf import settings
import django.contrib.postgres.fields.jsonb
from django.db import migrations, models
import django.db.models.deletion
import extras.utils
def convert_job_results(apps, schema_editor):
"""
Convert ReportResult objects to JobResult objects
"""
from extras.choices import JobResultStatusChoices
ReportResult = apps.get_model('extras', 'ReportResult')
JobResult = apps.get_model('extras', 'JobResult')
ContentType = apps.get_model('contenttypes', 'ContentType')
report_content_type = ContentType.objects.get(app_label='extras', model='report')
job_results = []
for report_result in ReportResult.objects.all():
if report_result.failed:
status = JobResultStatusChoices.STATUS_FAILED
else:
status = JobResultStatusChoices.STATUS_COMPLETED
job_results.append(
JobResult(
name=report_result.report,
obj_type=report_content_type,
created=report_result.created,
completed=report_result.created,
user=report_result.user,
status=status,
data=report_result.data,
job_id=uuid.uuid4()
)
)
JobResult.objects.bulk_create(job_results)
class Migration(migrations.Migration):
dependencies = [
('contenttypes', '0002_remove_content_type_name'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('extras', '0043_reports'),
]
operations = [
migrations.CreateModel(
name='JobResult',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False)),
('name', models.CharField(max_length=255)),
('created', models.DateTimeField(auto_now_add=True)),
('completed', models.DateTimeField(blank=True, null=True)),
('status', models.CharField(default='pending', max_length=30)),
('data', django.contrib.postgres.fields.jsonb.JSONField(blank=True, null=True)),
('job_id', models.UUIDField(unique=True)),
('obj_type', models.ForeignKey(limit_choices_to=extras.utils.FeatureQuery('job_results'), on_delete=django.db.models.deletion.CASCADE, related_name='job_results', to='contenttypes.ContentType')),
('user', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='+', to=settings.AUTH_USER_MODEL)),
],
options={
'ordering': ['obj_type', 'name', '-created'],
},
),
migrations.RunPython(
code=convert_job_results
),
migrations.DeleteModel(
name='ReportResult'
)
]

View File

@ -1,7 +1,7 @@
from .customfields import CustomField, CustomFieldChoice, CustomFieldModel, CustomFieldValue
from .models import (
ConfigContext, ConfigContextModel, CustomLink, ExportTemplate, Graph, ImageAttachment, ObjectChange, ReportResult,
Script, Webhook,
ConfigContext, ConfigContextModel, CustomLink, ExportTemplate, Graph, ImageAttachment, JobResult, ObjectChange,
Report, Script, Webhook,
)
from .tags import Tag, TaggedItem
@ -16,8 +16,9 @@ __all__ = (
'ExportTemplate',
'Graph',
'ImageAttachment',
'JobResult',
'ObjectChange',
'ReportResult',
'Report',
'Script',
'Tag',
'TaggedItem',

View File

@ -1,6 +1,8 @@
import json
import uuid
from collections import OrderedDict
import django_rq
from django.contrib.auth.models import User
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
@ -17,7 +19,7 @@ from utilities.utils import deepmerge, render_jinja2
from extras.choices import *
from extras.constants import *
from extras.querysets import ConfigContextQuerySet
from extras.utils import FeatureQuery, image_upload
from extras.utils import extras_features, FeatureQuery, image_upload
#
@ -562,29 +564,92 @@ class ConfigContextModel(models.Model):
# Custom scripts
#
@extras_features('job_results')
class Script(models.Model):
"""
Dummy model used to generate permissions for custom scripts. Does not exist in the database.
"""
class Meta:
managed = False
@classmethod
def get_absolute_url_from_job_result(cls, job_result):
"""
Given a JobResult that links to this content type, return URL to an instance which corresponds to that job
result, i.e. for historical records
"""
if job_result.obj_type.model_class() != cls:
return None
module, script_name = job_result.name.split('.')
return reverse(
'extras:script_history_detail',
kwargs={
'module': module,
'script_name': script_name,
'job_id': job_result.job_id
}
)
#
# Report results
# Reports
#
class ReportResult(models.Model):
@extras_features('job_results')
class Report(models.Model):
"""
Dummy model used to generate permissions for reports. Does not exist in the database.
"""
class Meta:
managed = False
@classmethod
def get_absolute_url_from_job_result(cls, job_result):
"""
Given a JobResult that links to this content type, return URL to an instance which corresponds to that job
result, i.e. for historical records
"""
if job_result.obj_type.model_class() != cls:
return None
return reverse(
'extras:report_history_detail',
kwargs={
'name': job_result.name,
'job_id': job_result.job_id
}
)
#
# Job results
#
class JobResult(models.Model):
"""
This model stores the results from running a user-defined report.
"""
report = models.CharField(
max_length=255,
unique=True
name = models.CharField(
max_length=255
)
obj_type = models.ForeignKey(
to=ContentType,
related_name='job_results',
verbose_name='Object types',
limit_choices_to=FeatureQuery('job_results'),
help_text="The object type to which this job result applies.",
on_delete=models.CASCADE,
)
created = models.DateTimeField(
auto_now_add=True
)
completed = models.DateTimeField(
null=True,
blank=True
)
user = models.ForeignKey(
to=User,
on_delete=models.SET_NULL,
@ -592,19 +657,65 @@ class ReportResult(models.Model):
blank=True,
null=True
)
failed = models.BooleanField()
data = JSONField()
status = models.CharField(
max_length=30,
choices=JobResultStatusChoices,
default=JobResultStatusChoices.STATUS_PENDING
)
data = JSONField(
null=True,
blank=True
)
job_id = models.UUIDField(
unique=True
)
class Meta:
ordering = ['report']
ordering = ['obj_type', 'name', '-created']
def __str__(self):
return "{} {} at {}".format(
self.report,
"passed" if not self.failed else "failed",
self.created
return str(self.job_id)
def get_absolute_url(self):
"""
Job results are accessed only under the context of the content type they link to
"""
return self.obj_type.model_class().get_absolute_url_from_job_result(self)
@property
def duration(self):
if not self.completed:
return None
duration = self.completed - self.created
minutes, seconds = divmod(duration.total_seconds(), 60)
return f"{int(minutes)} minutes, {seconds:.2f} seconds"
@classmethod
def enqueue_job(cls, func, name, obj_type, user, *args, **kwargs):
"""
Create a JobResult instance and enqueue a job using the given callable
func: The callable object to be enqueued for execution
name: Name for the JobResult instance
obj_type: ContentType to link to the JobResult instance obj_type
user: User object to link to the JobResult instance
args: additional args passed to the callable
kwargs: additional kargs passed to the callable
"""
job_result = cls.objects.create(
name=name,
obj_type=obj_type,
user=user,
job_id=uuid.uuid4()
)
func.delay(*args, job_id=str(job_result.job_id), job_result=job_result, **kwargs)
return job_result
#
# Change logging

View File

@ -5,10 +5,13 @@ import pkgutil
from collections import OrderedDict
from django.conf import settings
from django.db.models import Q
from django.utils import timezone
from django_rq import job
from .choices import JobResultStatusChoices
from .constants import *
from .models import ReportResult
from .models import JobResult
def is_report(obj):
@ -60,6 +63,25 @@ def get_reports():
return module_list
@job('default')
def run_report(job_result, *args, **kwargs):
"""
Helper function to call the run method on a report. This is needed to get around the inability to pickle an instance
method for queueing into the background processor.
"""
module_name, report_name = job_result.name.split('.', 1)
report = get_report(module_name, report_name)
report.run(job_result)
# Delete any previous terminal state results
JobResult.objects.filter(
obj_type=job_result.obj_type,
status=JobResultStatusChoices.TERMINAL_STATE_CHOICES
).exclude(
pk=job_result.pk
).delete()
class Report(object):
"""
NetBox users can extend this object to write custom reports to be used for validating data within NetBox. Each
@ -177,27 +199,29 @@ class Report(object):
self.logger.info(f"Failure | {obj}: {message}")
self.failed = True
def run(self):
def run(self, job_result):
"""
Run the report and return its results. Each test method will be executed in order.
Run the report and save its results. Each test method will be executed in order.
"""
self.logger.info(f"Running report")
job_result.status = JobResultStatusChoices.STATUS_RUNNING
job_result.save()
for method_name in self.test_methods:
self.active_test = method_name
test_method = getattr(self, method_name)
test_method()
# Delete any previous ReportResult and create a new one to record the result.
ReportResult.objects.filter(report=self.full_name).delete()
result = ReportResult(report=self.full_name, failed=self.failed, data=self._results)
result.save()
self.result = result
if self.failed:
self.logger.warning("Report failed")
job_result.status = JobResultStatusChoices.STATUS_FAILED
else:
self.logger.info("Report completed successfully")
job_result.status = JobResultStatusChoices.STATUS_COMPLETED
job_result.data = self._results
job_result.completed = timezone.now()
job_result.save()
# Perform any post-run tasks
self.post_run()

View File

@ -12,9 +12,15 @@ from django import forms
from django.conf import settings
from django.core.validators import RegexValidator
from django.db import transaction
from django.utils import timezone
from django.utils.decorators import classproperty
from django_rq import job
from mptt.forms import TreeNodeChoiceField, TreeNodeMultipleChoiceField
from mptt.models import MPTTModel
from extras.api.serializers import ScriptOutputSerializer
from extras.choices import JobResultStatusChoices
from extras.models import JobResult
from ipam.formfields import IPAddressFormField, IPNetworkFormField
from ipam.validators import MaxPrefixLengthValidator, MinPrefixLengthValidator, prefix_validator
from .constants import LOG_DEFAULT, LOG_FAILURE, LOG_INFO, LOG_SUCCESS, LOG_WARNING
@ -269,6 +275,10 @@ class BaseScript:
def __str__(self):
return getattr(self.Meta, 'name', self.__class__.__name__)
@classproperty
def full_name(self):
return '.'.join([self.__module__, self.__name__])
@classmethod
def module(cls):
return cls.__module__
@ -375,7 +385,8 @@ def is_variable(obj):
return isinstance(obj, ScriptVariable)
def run_script(script, data, request, commit=True):
@job('default')
def run_script(data, request, commit=True, *args, **kwargs):
"""
A wrapper for calling Script.run(). This performs error handling and provides a hook for committing changes. It
exists outside of the Script class to ensure it cannot be overridden by a script author.
@ -384,8 +395,15 @@ def run_script(script, data, request, commit=True):
start_time = None
end_time = None
script_name = script.__class__.__name__
logger = logging.getLogger(f"netbox.scripts.{script.module()}.{script_name}")
job_result = kwargs.pop('job_result')
module, script_name = job_result.name.split('.', 1)
script = get_script(module, script_name)()
job_result.status = JobResultStatusChoices.STATUS_RUNNING
job_result.save()
logger = logging.getLogger(f"netbox.scripts.{module}.{script_name}")
logger.info(f"Running script (commit={commit})")
# Add files to form data
@ -405,9 +423,8 @@ def run_script(script, data, request, commit=True):
try:
with transaction.atomic():
start_time = time.time()
output = script.run(**kwargs)
end_time = time.time()
script.output = script.run(**kwargs)
job_result.status = JobResultStatusChoices.STATUS_COMPLETED
if not commit:
raise AbortTransaction()
except AbortTransaction:
@ -419,6 +436,7 @@ def run_script(script, data, request, commit=True):
)
logger.error(f"Exception raised during script execution: {e}")
commit = False
job_result.status = JobResultStatusChoices.STATUS_FAILED
finally:
if not commit:
# Delete all pending changelog entries
@ -427,14 +445,19 @@ def run_script(script, data, request, commit=True):
"Database changes have been reverted automatically."
)
# Calculate execution time
if end_time is not None:
execution_time = end_time - start_time
logger.info(f"Script completed in {execution_time:.4f} seconds")
else:
execution_time = None
job_result.data = ScriptOutputSerializer(script).data
job_result.completed = timezone.now()
job_result.save()
return output, execution_time
logger.info(f"Script completed in {job_result.duration}")
# Delete any previous terminal state results
JobResult.objects.filter(
obj_type=job_result.obj_type,
status=JobResultStatusChoices.TERMINAL_STATE_CHOICES
).exclude(
pk=job_result.pk
).delete()
def get_scripts(use_names=False):

View File

@ -2,7 +2,7 @@ import django_tables2 as tables
from django_tables2.utils import Accessor
from utilities.tables import BaseTable, BooleanColumn, ColorColumn, ToggleColumn
from .models import ConfigContext, ObjectChange, Tag, TaggedItem
from .models import ConfigContext, ObjectChange, JobResult, Tag, TaggedItem
TAG_ACTIONS = """
<a href="{% url 'extras:tag_changelog' slug=record.slug %}" class="btn btn-default btn-xs" title="Change log">
@ -61,6 +61,28 @@ OBJECTCHANGE_REQUEST_ID = """
<a href="{% url 'extras:objectchange_list' %}?request_id={{ value }}">{{ value }}</a>
"""
JOB_RESULT_CREATED = """
<a href="{{ record.get_absolute_url }}">{{ value|date:"SHORT_DATETIME_FORMAT" }}</a>
"""
JOB_RESULT_COMPLETED = """
<span>{% if value %}{{ value|date:"SHORT_DATETIME_FORMAT" }}{% else %}—{% endif %}</span>
"""
JOB_RESULT_STATUS = """
{% if record.status == 'failed' %}
<label class="label label-danger">Failed</label>
{% elif record.status == 'pending' %}
<label class="label label-default">Pending</label>
{% elif record.status == 'running' %}
<label class="label label-warning">Running</label>
{% elif record.status == 'completed' %}
<label class="label label-success">Passed</label>
{% else %}
<label class="label label-default">N/A</label>
{% endif %}
"""
class TagTable(BaseTable):
pk = ToggleColumn()
@ -133,3 +155,21 @@ class ObjectChangeTable(BaseTable):
class Meta(BaseTable.Meta):
model = ObjectChange
fields = ('time', 'user_name', 'action', 'changed_object_type', 'object_repr', 'request_id')
class JobResultHistoryTable(BaseTable):
created = tables.TemplateColumn(
template_code=JOB_RESULT_CREATED,
verbose_name='Run'
)
completed = tables.TemplateColumn(
template_code=JOB_RESULT_COMPLETED
)
status = tables.TemplateColumn(
template_code=JOB_RESULT_STATUS
)
class Meta(BaseTable.Meta):
model = JobResult
fields = ('created', 'completed', 'status')

View File

@ -11,24 +11,25 @@ def log_level(level):
"""
Display a label indicating a syslog severity (e.g. info, warning, etc.).
"""
# TODO: we should convert this to a choices class
levels = {
LOG_DEFAULT: {
'default': {
'name': 'Default',
'class': 'default'
},
LOG_SUCCESS: {
'success': {
'name': 'Success',
'class': 'success',
},
LOG_INFO: {
'info': {
'name': 'Info',
'class': 'info'
},
LOG_WARNING: {
'warning': {
'name': 'Warning',
'class': 'warning'
},
LOG_FAILURE: {
'failure': {
'name': 'Failure',
'class': 'danger'
}

View File

@ -43,5 +43,6 @@ urlpatterns = [
# Scripts
path('scripts/', views.ScriptListView.as_view(), name='script_list'),
path('scripts/<str:module>/<str:name>/', views.ScriptView.as_view(), name='script'),
path('scripts/<str:module>/<str:name>/result/<int:job_result_pk>/', views.ScriptResultView.as_view(), name='script_result'),
]

View File

@ -1,3 +1,5 @@
import time
from django import template
from django.conf import settings
from django.contrib import messages
@ -11,14 +13,15 @@ from django_tables2 import RequestConfig
from utilities.forms import ConfirmationForm
from utilities.paginator import EnhancedPaginator
from utilities.utils import shallow_compare_dict
from utilities.utils import copy_safe_request, shallow_compare_dict
from utilities.views import (
BulkDeleteView, BulkEditView, BulkImportView, ObjectView, ObjectDeleteView, ObjectEditView, ObjectListView,
ObjectPermissionRequiredMixin,
ContentTypePermissionRequiredMixin,
)
from . import filters, forms, tables
from .models import ConfigContext, ImageAttachment, ObjectChange, ReportResult, Tag, TaggedItem
from .reports import get_report, get_reports
from .choices import JobResultStatusChoices
from .models import ConfigContext, ImageAttachment, ObjectChange, Report, JobResult, Script, Tag, TaggedItem
from .reports import get_report, get_reports, run_report
from .scripts import get_scripts, run_script
@ -332,9 +335,9 @@ class ImageAttachmentDeleteView(ObjectDeleteView):
# Reports
#
class ReportListView(ObjectPermissionRequiredMixin, View):
class ReportListView(ContentTypePermissionRequiredMixin, View):
"""
Retrieve all of the available reports from disk and the recorded ReportResult (if any) for each.
Retrieve all of the available reports from disk and the recorded JobResult (if any) for each.
"""
def get_required_permission(self):
return 'extras.view_reportresult'
@ -342,7 +345,14 @@ class ReportListView(ObjectPermissionRequiredMixin, View):
def get(self, request):
reports = get_reports()
results = {r.report: r for r in ReportResult.objects.all()}
report_content_type = ContentType.objects.get(app_label='extras', model='report')
results = {
r.name: r
for r in JobResult.objects.filter(
obj_type=report_content_type,
status__in=JobResultStatusChoices.TERMINAL_STATE_CHOICES
).defer('data')
}
ret = []
for module, report_list in reports:
@ -357,23 +367,46 @@ class ReportListView(ObjectPermissionRequiredMixin, View):
})
class ReportView(ObjectPermissionRequiredMixin, View):
class GetReportMixin:
def get_report(self, name):
if '.' not in name:
raise Http404
# Retrieve the Report by "<module>.<report>"
module_name, report_name = name.split('.', 1)
report = get_report(module_name, report_name)
if report is None:
raise Http404
return report
class ReportView(GetReportMixin, ContentTypePermissionRequiredMixin, View):
"""
Display a single Report and its associated ReportResult (if any).
Display a single Report and its associated JobResult (if any).
"""
def get_required_permission(self):
return 'extras.view_reportresult'
def get(self, request, name):
# Retrieve the Report by "<module>.<report>"
module_name, report_name = name.split('.')
report = get_report(module_name, report_name)
if report is None:
raise Http404
report = self.get_report(name)
# Attach the ReportResult (if any)
report.result = ReportResult.objects.filter(report=report.full_name).first()
report_content_type = ContentType.objects.get(app_label='extras', model='report')
latest_run_result = JobResult.objects.filter(
obj_type=report_content_type,
name=report.full_name,
status__in=JobResultStatusChoices.TERMINAL_STATE_CHOICES
).first()
pending_run_result = JobResult.objects.filter(
obj_type=report_content_type,
name=report.full_name,
status__in=JobResultStatusChoices.NON_TERMINAL_STATE_CHOICES
).order_by(
'created'
).first()
report.result = latest_run_result
report.pending_result = pending_run_result
return render(request, 'extras/report.html', {
'report': report,
@ -381,7 +414,7 @@ class ReportView(ObjectPermissionRequiredMixin, View):
})
class ReportRunView(ObjectPermissionRequiredMixin, View):
class ReportRunView(GetReportMixin, ContentTypePermissionRequiredMixin, View):
"""
Run a Report and record a new ReportResult.
"""
@ -390,20 +423,19 @@ class ReportRunView(ObjectPermissionRequiredMixin, View):
def post(self, request, name):
# Retrieve the Report by "<module>.<report>"
module_name, report_name = name.split('.')
report = get_report(module_name, report_name)
if report is None:
raise Http404
report = self.get_report(name)
form = ConfirmationForm(request.POST)
if form.is_valid():
# Run the Report. A new ReportResult is created.
report.run()
result = 'failed' if report.failed else 'passed'
msg = "Ran report {} ({})".format(report.full_name, result)
messages.success(request, mark_safe(msg))
# Run the Report. A new JobResult is created.
report_content_type = ContentType.objects.get(app_label='extras', model='report')
job_result = JobResult.enqueue_job(
run_report,
report.full_name,
report_content_type,
request.user
)
return redirect('extras:report', name=report.full_name)
@ -412,7 +444,16 @@ class ReportRunView(ObjectPermissionRequiredMixin, View):
# Scripts
#
class ScriptListView(ObjectPermissionRequiredMixin, View):
class GetScriptMixin:
def _get_script(self, module, name):
scripts = get_scripts()
try:
return scripts[module][name]()
except KeyError:
raise Http404
class ScriptListView(ContentTypePermissionRequiredMixin, View):
def get_required_permission(self):
return 'extras.view_script'
@ -424,22 +465,23 @@ class ScriptListView(ObjectPermissionRequiredMixin, View):
})
class ScriptView(ObjectPermissionRequiredMixin, View):
class ScriptView(ContentTypePermissionRequiredMixin, GetScriptMixin, View):
def get_required_permission(self):
return 'extras.view_script'
def _get_script(self, module, name):
scripts = get_scripts()
try:
return scripts[module][name]()
except KeyError:
raise Http404
def get(self, request, module, name):
script = self._get_script(module, name)
form = script.as_form(initial=request.GET)
# Look for a pending JobResult (use the latest one by creation timestamp)
script_content_type = ContentType.objects.get(app_label='extras', model='script')
script.result = JobResult.objects.filter(
obj_type=script_content_type,
name=script.full_name,
status__in=JobResultStatusChoices.NON_TERMINAL_STATE_CHOICES
).first()
return render(request, 'extras/script.html', {
'module': module,
'script': script,
@ -459,12 +501,40 @@ class ScriptView(ObjectPermissionRequiredMixin, View):
if form.is_valid():
commit = form.cleaned_data.pop('_commit')
output, execution_time = run_script(script, form.cleaned_data, request, commit)
#output, execution_time = run_script(script, form.cleaned_data, request, commit)
return render(request, 'extras/script.html', {
script_content_type = ContentType.objects.get(app_label='extras', model='script')
job_result = JobResult.enqueue_job(
run_script,
script.full_name,
script_content_type,
request.user,
data=form.cleaned_data,
request=copy_safe_request(request),
commit=commit
)
return redirect('extras:script_result', module=module, name=name, job_result_pk=job_result.pk)
class ScriptResultView(ContentTypePermissionRequiredMixin, GetScriptMixin, View):
def get_required_permission(self):
return 'extras.view_script'
def get(self, request, module, name, job_result_pk):
script = self._get_script(module, name)
form = script.as_form(initial=request.GET)
# Look for a pending JobResult (use the latest one by creation timestamp)
script_content_type = ContentType.objects.get(app_label='extras', model='script')
result = get_object_or_404(JobResult.objects.all(), pk=job_result_pk)
if result.obj_type != script_content_type:
raise Http404
return render(request, 'extras/script_result.html', {
'module': module,
'script': script,
'form': form,
'output': output,
'execution_time': execution_time,
'result': result,
'class_name': name
})

View File

@ -24,7 +24,7 @@ from dcim.tables import (
CableTable, DeviceTable, DeviceTypeTable, PowerFeedTable, RackTable, RackGroupTable, SiteTable,
VirtualChassisTable,
)
from extras.models import ObjectChange, ReportResult
from extras.models import ObjectChange, JobResult
from ipam.filters import AggregateFilterSet, IPAddressFilterSet, PrefixFilterSet, VLANFilterSet, VRFFilterSet
from ipam.models import Aggregate, IPAddress, Prefix, VLAN, VRF
from ipam.tables import AggregateTable, IPAddressTable, PrefixTable, VLANTable, VRFTable
@ -241,7 +241,7 @@ class HomeView(View):
return render(request, self.template_name, {
'search_form': SearchForm(),
'stats': stats,
'report_results': ReportResult.objects.order_by('-created')[:10],
'report_results': [],#ReportResult.objects.order_by('-created')[:10],
'changelog': changelog[:15],
'new_release': new_release,
})

View File

@ -0,0 +1,52 @@
var url = netbox_api_path + "extras/job-results/";
var timeout = 1000;
function updatePendingStatusLabel(status){
var labelClass;
if (status.value === 'failed'){
labelClass = 'danger';
} else if (status.value === 'pending'){
labelClass = 'default';
} else if (status.value === 'running'){
labelClass = 'warning';
} else if (status.value === 'completed'){
labelClass = 'success';
} else {
labelClass = 'default';
}
var elem = $('#pending-result-label > label');
elem.attr('class', 'label label-' + labelClass);
elem.text(status.label);
}
function refreshWindow(){
window.location.reload();
}
$(document).ready(function(){
if (pending_result_id !== null){
(function checkPendingResult(){
$.ajax({
url: url + pending_result_id + '/',
method: 'GET',
dataType: 'json',
beforeSend: function(xhr, settings) {
xhr.setRequestHeader("X-CSRFToken", "{{ csrf_token }}");
},
context: this,
success: function(data) {
updatePendingStatusLabel(data.status);
if (data.status.value === 'completed' || data.status.value === 'failed'){
jobTerminatedAction()
} else {
setTimeout(checkPendingResult, timeout);
if (timeout < 10000) {
// back off each iteration, until we reach a 10s interval
timeout += 1000
}
}
}
});
})();
}
})

View File

@ -0,0 +1,11 @@
{% if result.status == 'failed' %}
<label class="label label-danger">Failed</label>
{% elif result.status == 'pending' %}
<label class="label label-default">Pending</label>
{% elif result.status == 'running' %}
<label class="label label-warning">Running</label>
{% elif result.status == 'completed' %}
<label class="label label-success">Passed</label>
{% else %}
<label class="label label-default">N/A</label>
{% endif %}

View File

@ -1,7 +0,0 @@
{% if result.failed %}
<label class="label label-danger">Failed</label>
{% elif result %}
<label class="label label-success">Passed</label>
{% else %}
<label class="label label-default">N/A</label>
{% endif %}

View File

@ -1,5 +1,6 @@
{% extends 'base.html' %}
{% load helpers %}
{% load static %}
{% block title %}{{ report.name }}{% endblock %}
@ -22,14 +23,21 @@
</form>
</div>
{% endif %}
<h1>{{ report.name }}{% include 'extras/inc/report_label.html' with result=report.result %}</h1>
<h1>{{ report.name }}{% include 'extras/inc/job_label.html' with result=report.result %}</h1>
<div class="row">
<div class="col-md-12">
{% if report.description %}
<p class="lead">{{ report.description }}</p>
{% endif %}
{% if report.result %}
<p>Last run: <strong>{{ report.result.created }}</strong></p>
<p>Last run: <strong>{{ report.result.created }}</strong> {% if report.result.completed %} Duration: <strong>{{ report.result.duration }}</strong>{% endif %}</p>
{% endif %}
{% if report.pending_result %}
<p>
Pending run: <strong>{{ report.pending_result.created }}</strong>
<span id="pending-result-label">{% include 'extras/inc/job_label.html' with result=report.pending_result %}</span>
<img id="pending-result-loader" src="{% static 'img/ajax-loader.gif' %}" />
</p>
{% endif %}
{% if report.result %}
<div class="panel panel-default">
@ -100,3 +108,25 @@
</div>
</div>
{% endblock %}
{% block javascript %}
<script type="text/javascript">
{% if report.pending_result %}
var pending_result_id = {{ report.pending_result.pk }};
{% else %}
var pending_result_id = null;
{% endif %}
function jobTerminatedAction(){
$('#pending-result-loader').hide();
var refreshButton = document.createElement('button');
refreshButton.className = 'btn btn-xs btn-primary';
refreshButton.onclick = refreshWindow;
refreshButton.innerHTML = '<i class="fa fa-refresh"></i> Refresh';
$('#pending-result-loader').parents('p').append(refreshButton)
}
</script>
<script src="{% static 'js/job_result.js' %}?v{{ settings.VERSION }}"
onerror="window.location='{% url 'media_failure' %}?filename=js/job_result.js'"></script>
{% endblock %}

View File

@ -24,7 +24,7 @@
<a href="{% url 'extras:report' name=report.full_name %}" name="report.{{ report.name }}"><strong>{{ report.name }}</strong></a>
</td>
<td>
{% include 'extras/inc/report_label.html' with result=report.result %}
{% include 'extras/inc/job_label.html' with result=report.result %}
</td>
<td>{{ report.description|default:"" }}</td>
{% if report.result %}
@ -69,7 +69,7 @@
<a href="#report.{{ report.name }}" class="list-group-item">
<i class="fa fa-list-alt"></i> {{ report.name }}
<div class="pull-right">
{% include 'extras/inc/report_label.html' with result=report.result %}
{% include 'extras/inc/job_label.html' with result=report.result %}
</div>
</a>
{% endfor %}

View File

@ -21,51 +21,12 @@
<li role="presentation" class="active">
<a href="#run" role="tab" data-toggle="tab" class="active">Run</a>
</li>
<li role="presentation"{% if not output %} class="disabled"{% endif %}>
<a href="#output" role="tab" data-toggle="tab">Output</a>
</li>
<li role="presentation">
<a href="#source" role="tab" data-toggle="tab">Source</a>
</li>
</ul>
<div class="tab-content">
<div role="tabpanel" class="tab-pane active" id="run">
{% if execution_time or script.log %}
<div class="row">
<div class="col-md-12">
<div class="panel panel-default">
<div class="panel-heading">
<strong>Script Log</strong>
</div>
<table class="table table-hover panel-body">
<tr>
<th>Line</th>
<th>Level</th>
<th>Message</th>
</tr>
{% for level, message in script.log %}
<tr>
<td>{{ forloop.counter }}</td>
<td>{% log_level level %}</td>
<td class="rendered-markdown">{{ message|render_markdown }}</td>
</tr>
{% empty %}
<tr>
<td colspan="3" class="text-center text-muted">
No log output
</td>
</tr>
{% endfor %}
</table>
{% if execution_time %}
<div class="panel-footer text-right text-muted">
<small>Exec time: {{ execution_time|floatformat:3 }}s</small>
</div>
{% endif %}
</div>
</div>
</div>
{% endif %}
<div class="row">
<div class="col-md-6 col-md-offset-3">
{% if not perms.extras.run_script %}
@ -100,9 +61,6 @@
</div>
</div>
</div>
<div role="tabpanel" class="tab-pane" id="output">
<pre>{{ output }}</pre>
</div>
<div role="tabpanel" class="tab-pane" id="source">
<p><code>{{ script.filename }}</code></p>
<pre>{{ script.source }}</pre>

View File

@ -0,0 +1,107 @@
{% extends 'base.html' %}
{% load helpers %}
{% load form_helpers %}
{% load log_levels %}
{% load static %}
{% block title %}{{ script }}{% endblock %}
{% block content %}
<div class="row noprint">
<div class="col-md-12">
<ol class="breadcrumb">
<li><a href="{% url 'extras:script_list' %}">Scripts</a></li>
<li><a href="{% url 'extras:script_list' %}#module.{{ module }}">{{ module|bettertitle }}</a></li>
<li><a href="{% url 'extras:script' module=script.module name=class_name %}">{{ script }}</a></li>
<li>{{ result.created }}</li>
</ol>
</div>
</div>
<h1>{{ script }}</h1>
<p>{{ script.Meta.description }}</p>
<ul class="nav nav-tabs" role="tablist">
<li role="presentation" class="active">
<a href="#log" role="tab" data-toggle="tab" class="active">Log</a>
</li>
<li role="presentation">
<a href="#output" role="tab" data-toggle="tab">Output</a>
</li>
<li role="presentation">
<a href="#source" role="tab" data-toggle="tab">Source</a>
</li>
</ul>
<div class="tab-content">
<p>
Run: <strong>{{ result.created }}</strong>
{% if result.completed %}
Duration: <strong>{{ result.duration }}</strong>
{% else %}
<span id="pending-result-label">{% include 'extras/inc/job_label.html' with result=result %}</span>
<img id="pending-result-loader" src="{% static 'img/ajax-loader.gif' %}" />
{% endif %}
</p>
<div role="tabpanel" class="tab-pane active" id="log">
{% if result.completed %}
<div class="row">
<div class="col-md-12">
<div class="panel panel-default">
<div class="panel-heading">
<strong>Script Log</strong>
</div>
<table class="table table-hover panel-body">
<tr>
<th>Line</th>
<th>Level</th>
<th>Message</th>
</tr>
{% for log in result.data.log %}
<tr>
<td>{{ forloop.counter }}</td>
<td>{% log_level log.status %}</td>
<td class="rendered-markdown">{{ log.message|render_markdown }}</td>
</tr>
{% empty %}
<tr>
<td colspan="3" class="text-center text-muted">
No log output
</td>
</tr>
{% endfor %}
</table>
{% if execution_time %}
<div class="panel-footer text-right text-muted">
<small>Exec time: {{ execution_time|floatformat:3 }}s</small>
</div>
{% endif %}
</div>
</div>
</div>
{% endif %}
</div>
<div role="tabpanel" class="tab-pane" id="output">
<pre>{{ result.data.output }}</pre>
</div>
<div role="tabpanel" class="tab-pane" id="source">
<p><code>{{ script.filename }}</code></p>
<pre>{{ script.source }}</pre>
</div>
</div>
{% endblock %}
{% block javascript %}
<script type="text/javascript">
{% if not result.completed %}
var pending_result_id = {{ result.pk }};
{% else %}
var pending_result_id = null;
{% endif %}
function jobTerminatedAction(){
refreshWindow()
}
</script>
<script src="{% static 'js/job_result.js' %}?v{{ settings.VERSION }}"
onerror="window.location='{% url 'media_failure' %}?filename=js/job_result.js'"></script>
{% endblock %}

View File

@ -42,3 +42,25 @@ ADVISORY_LOCK_KEYS = {
'available-prefixes': 100100,
'available-ips': 100200,
}
#
# HTTP Request META safe copy
#
HTTP_REQUEST_META_SAFE_COPY = [
'CONTENT_LENGTH',
'CONTENT_TYPE',
'HTTP_ACCEPT',
'HTTP_ACCEPT_ENCODING',
'HTTP_ACCEPT_LANGUAGE',
'HTTP_HOST',
'HTTP_REFERER',
'HTTP_USER_AGENT',
'QUERY_STRING',
'REMOTE_ADDR',
'REMOTE_HOST',
'REMOTE_USER',
'REQUEST_METHOD',
'SERVER_NAME',
'SERVER_PORT',
]

View File

@ -5,11 +5,12 @@ from collections import OrderedDict
from django.core.serializers import serialize
from django.db.models import Count, OuterRef, Subquery
from django.http import QueryDict
from django.http.request import HttpRequest
from jinja2 import Environment
from dcim.choices import CableLengthUnitChoices
from extras.utils import is_taggable
from utilities.constants import HTTP_REQUEST_META_SAFE_COPY
def csv_format(data):
"""
@ -257,3 +258,37 @@ def flatten_dict(d, prefix='', separator='.'):
else:
ret[key] = v
return ret
#
# Fake request object
#
class NetBoxFakeRequest:
"""
A fake request object which is explicitly defined at the module level so it is able to be pickled. It simply
takes what is passed to it as kwargs on init and sets them as instance variables.
"""
def __init__(self, *args, **kwargs):
for k, v in kwargs.items():
setattr(self, k, v)
def copy_safe_request(request):
"""
Copy selected attributes from a request object into a new fake request object. This is needed in places where
thread safe pickling of the useful request data is needed.
"""
meta = {
k: request.META[k]
for k in HTTP_REQUEST_META_SAFE_COPY
if k in request.META and isinstance(request.META[k], str)
}
return NetBoxFakeRequest(**{
'META': meta,
'POST': request.POST,
'GET': request.GET,
'FILES': request.FILES,
'user': request.user,
'path': request.path
})

View File

@ -39,6 +39,40 @@ from .paginator import EnhancedPaginator, get_paginate_count
# Mixins
#
class ContentTypePermissionRequiredMixin(AccessMixin):
"""
Similar to Django's built-in PermissionRequiredMixin, but extended to check model-level permission assignments.
This is related to ObjectPermissionRequiredMixin, except that is does not enforce object-level permissions,
and fits within NetBox's custom permission enforcement system.
additional_permissions: An optional iterable of statically declared permissions to evaluate in addition to those
derived from the object type
"""
additional_permissions = list()
def get_required_permission(self):
"""
Return the specific permission necessary to perform the requested action on an object.
"""
raise NotImplementedError(f"{self.__class__.__name__} must implement get_required_permission()")
def has_permission(self):
user = self.request.user
permission_required = self.get_required_permission()
# Check that the user has been granted the required permission(s).
if user.has_perms((permission_required, *self.additional_permissions)):
return True
return False
def dispatch(self, request, *args, **kwargs):
if not self.has_permission():
return self.handle_no_permission()
return super().dispatch(request, *args, **kwargs)
class ObjectPermissionRequiredMixin(AccessMixin):
"""
Similar to Django's built-in PermissionRequiredMixin, but extended to check for both model-level and object-level