import json
import urllib.parse
from django.conf import settings
from django.contrib import admin
from django.contrib.auth.models import User
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.cache import cache
from django.core.validators import ValidationError
from django.db import models
from django.http import HttpResponse, QueryDict
from django.urls import reverse
from django.utils import timezone
from django.utils.formats import date_format
from django.utils.translation import gettext as _
from rest_framework.utils.encoders import JSONEncoder
from extras.choices import *
from extras.conditions import ConditionSet
from extras.constants import *
from extras.utils import FeatureQuery, image_upload
from netbox.config import get_config
from netbox.models import ChangeLoggedModel
from netbox.models.features import (
CloningMixin, CustomFieldsMixin, CustomLinksMixin, ExportTemplatesMixin, SyncedDataMixin, TagsMixin,
)
from utilities.querysets import RestrictedQuerySet
from utilities.utils import clean_html, render_jinja2
# Names exported by `from <this module> import *`; each entry is a model class defined in this module.
__all__ = (
    'ConfigRevision',
    'CustomLink',
    'ExportTemplate',
    'ImageAttachment',
    'JournalEntry',
    'SavedFilter',
    'Webhook',
)
class Webhook(ExportTemplatesMixin, ChangeLoggedModel):
    """
    A Webhook defines a request that will be sent to a remote application when an object is created, updated, and/or
    deleted in NetBox. The request will contain a representation of the object, which the remote application can act
    on. Each Webhook can be limited to firing only on certain actions or certain object types.
    """
    content_types = models.ManyToManyField(
        to=ContentType,
        related_name='webhooks',
        verbose_name='Object types',
        limit_choices_to=FeatureQuery('webhooks'),
        help_text=_("The object(s) to which this Webhook applies.")
    )
    name = models.CharField(
        max_length=150,
        unique=True
    )

    # Event triggers; clean() requires at least one of these to be enabled
    type_create = models.BooleanField(
        default=False,
        help_text=_("Triggers when a matching object is created.")
    )
    type_update = models.BooleanField(
        default=False,
        help_text=_("Triggers when a matching object is updated.")
    )
    type_delete = models.BooleanField(
        default=False,
        help_text=_("Triggers when a matching object is deleted.")
    )
    type_job_start = models.BooleanField(
        default=False,
        help_text=_("Triggers when a job for a matching object is started.")
    )
    type_job_end = models.BooleanField(
        default=False,
        help_text=_("Triggers when a job for a matching object terminates.")
    )

    # Request definition; payload_url, additional_headers and body_template are rendered as Jinja2 templates
    payload_url = models.CharField(
        max_length=500,
        verbose_name='URL',
        help_text=_('This URL will be called using the HTTP method defined when the webhook is called. '
                    'Jinja2 template processing is supported with the same context as the request body.')
    )
    enabled = models.BooleanField(
        default=True
    )
    http_method = models.CharField(
        max_length=30,
        choices=WebhookHttpMethodChoices,
        default=WebhookHttpMethodChoices.METHOD_POST,
        verbose_name='HTTP method'
    )
    http_content_type = models.CharField(
        max_length=100,
        default=HTTP_CONTENT_TYPE_JSON,
        verbose_name='HTTP content type',
        help_text=_('The complete list of official content types is available '
                    'here.')
    )
    additional_headers = models.TextField(
        blank=True,
        help_text=_("User-supplied HTTP headers to be sent with the request in addition to the HTTP content type. "
                    "Headers should be defined in the format Name: Value. Jinja2 template processing is "
                    "supported with the same context as the request body (below).")
    )
    body_template = models.TextField(
        blank=True,
        help_text=_('Jinja2 template for a custom request body. If blank, a JSON object representing the change will be '
                    'included. Available context data includes: event, model, '
                    'timestamp, username, request_id, and data.')
    )
    secret = models.CharField(
        max_length=255,
        blank=True,
        help_text=_("When provided, the request will include a 'X-Hook-Signature' "
                    "header containing a HMAC hex digest of the payload body using "
                    "the secret as the key. The secret is not transmitted in "
                    "the request.")
    )
    conditions = models.JSONField(
        blank=True,
        null=True,
        help_text=_("A set of conditions which determine whether the webhook will be generated.")
    )
    ssl_verification = models.BooleanField(
        default=True,
        verbose_name='SSL verification',
        help_text=_("Enable SSL certificate verification. Disable with caution!")
    )
    ca_file_path = models.CharField(
        max_length=4096,
        null=True,
        blank=True,
        verbose_name='CA File Path',
        help_text=_('The specific CA certificate file to use for SSL verification. '
                    'Leave blank to use the system defaults.')
    )

    class Meta:
        ordering = ('name',)
        constraints = (
            models.UniqueConstraint(
                fields=('payload_url', 'type_create', 'type_update', 'type_delete'),
                name='%(app_label)s_%(class)s_unique_payload_url_types'
            ),
        )

    def __str__(self):
        return self.name

    def get_absolute_url(self):
        """Return the URL of this webhook's detail view."""
        return reverse('extras:webhook', args=[self.pk])

    @property
    def docs_url(self):
        """URL of the documentation page for this model, rooted at STATIC_URL."""
        return f'{settings.STATIC_URL}docs/models/extras/webhook/'

    def clean(self):
        """
        Validate the webhook's configuration.

        Raises ValidationError if no event type is selected, if `conditions` cannot be loaded into a ConditionSet,
        or if a CA file path is given while SSL verification is disabled.
        """
        super().clean()

        # At least one action type must be selected
        if not any([
            self.type_create, self.type_update, self.type_delete, self.type_job_start, self.type_job_end
        ]):
            raise ValidationError(
                "At least one event type must be selected: create, update, delete, job_start, and/or job_end."
            )

        # Conditions (if any) must form a valid ConditionSet
        if self.conditions:
            try:
                ConditionSet(self.conditions)
            except ValueError as e:
                raise ValidationError({'conditions': e})

        # CA file path requires SSL verification enabled
        if not self.ssl_verification and self.ca_file_path:
            raise ValidationError({
                'ca_file_path': 'Do not specify a CA certificate file if SSL verification is disabled.'
            })

    def render_headers(self, context):
        """
        Render additional_headers and return a dict of Header: Value pairs.

        :param context: The context passed to Jinja2
        :raises ValueError: If a non-blank rendered line does not contain a colon
        """
        if not self.additional_headers:
            return {}

        headers = {}
        rendered = render_jinja2(self.additional_headers, context)
        for line in rendered.splitlines():
            # Blank lines (e.g. left behind by template control blocks) carry no header; skip them rather than
            # failing to unpack below.
            if not line.strip():
                continue
            # Split on the first colon only, so that values may themselves contain colons
            header, value = line.split(':', 1)
            headers[header.strip()] = value.strip()

        return headers

    def render_body(self, context):
        """
        Render the body template, if defined. Otherwise, dump the context as a JSON object.

        :param context: The context passed to Jinja2 (or serialized as JSON)
        """
        if self.body_template:
            return render_jinja2(self.body_template, context)
        return json.dumps(context, cls=JSONEncoder)

    def render_payload_url(self, context):
        """
        Render the payload URL.

        :param context: The context passed to Jinja2
        """
        return render_jinja2(self.payload_url, context)
class CustomLink(CloningMixin, ExportTemplatesMixin, ChangeLoggedModel):
    """
    A link from a NetBox object to some external representation of it. Both the link text and the link URL are
    Jinja2 templates, rendered with the context supplied to render().
    """
    content_types = models.ManyToManyField(
        to=ContentType,
        related_name='custom_links',
        help_text=_('The object type(s) to which this link applies.'),
    )
    name = models.CharField(max_length=100, unique=True)
    enabled = models.BooleanField(default=True)
    link_text = models.TextField(help_text=_("Jinja2 template code for link text"))
    link_url = models.TextField(
        verbose_name='Link URL',
        help_text=_("Jinja2 template code for link URL"),
    )
    weight = models.PositiveSmallIntegerField(default=100)
    group_name = models.CharField(
        max_length=50,
        blank=True,
        help_text=_("Links with the same group will appear as a dropdown menu"),
    )
    button_class = models.CharField(
        max_length=30,
        choices=CustomLinkButtonClassChoices,
        default=CustomLinkButtonClassChoices.DEFAULT,
        help_text=_("The class of the first link in a group will be used for the dropdown button"),
    )
    new_window = models.BooleanField(
        default=False,
        help_text=_("Force link to open in a new window"),
    )

    clone_fields = (
        'content_types', 'enabled', 'weight', 'group_name', 'button_class', 'new_window',
    )

    class Meta:
        ordering = ['group_name', 'weight', 'name']

    def __str__(self):
        return self.name

    def get_absolute_url(self):
        """Return the URL of this custom link's detail view."""
        return reverse('extras:customlink', args=[self.pk])

    @property
    def docs_url(self):
        """URL of the documentation page for this model, rooted at STATIC_URL."""
        return f'{settings.STATIC_URL}docs/models/extras/customlink/'

    def render(self, context):
        """
        Render this link and return a dict with the keys 'text', 'link' and 'link_target'. An empty dict is
        returned when the link text renders to nothing.

        :param context: The context passed to Jinja2
        """
        rendered_text = render_jinja2(self.link_text, context).strip()
        if not rendered_text:
            # Nothing to display, so don't bother rendering the URL
            return {}

        url = render_jinja2(self.link_url, context).strip()

        if self.new_window:
            target_attr = ' target="_blank"'
        else:
            target_attr = ''

        # Sanitize the link text
        schemes = get_config().ALLOWED_URL_SCHEMES
        rendered_text = clean_html(rendered_text, schemes)

        # Percent-encode the URL, leaving its structural characters intact
        url = urllib.parse.quote_plus(url, safe='/:?&=%+[]@#')

        # Blank out the URL if it specifies a scheme that isn't allowed
        scheme = urllib.parse.urlparse(url).scheme
        if scheme and scheme not in schemes:
            url = ""

        return {
            'text': rendered_text,
            'link': url,
            'link_target': target_attr,
        }
class ExportTemplate(SyncedDataMixin, CloningMixin, ExportTemplatesMixin, ChangeLoggedModel):
    """
    A Jinja2 template used to export a queryset of objects, either as text or as an HTTP response.
    """
    content_types = models.ManyToManyField(
        to=ContentType,
        related_name='export_templates',
        help_text=_('The object type(s) to which this template applies.')
    )
    name = models.CharField(
        max_length=100
    )
    description = models.CharField(
        max_length=200,
        blank=True
    )
    template_code = models.TextField(
        help_text=_('Jinja2 template code. The list of objects being exported is passed as a context variable named '
                    'queryset.')
    )
    mime_type = models.CharField(
        max_length=50,
        blank=True,
        verbose_name='MIME type',
        help_text=_('Defaults to text/plain; charset=utf-8')
    )
    file_extension = models.CharField(
        max_length=15,
        blank=True,
        help_text=_('Extension to append to the rendered filename')
    )
    as_attachment = models.BooleanField(
        default=True,
        help_text=_("Download file as attachment")
    )

    clone_fields = (
        'content_types', 'template_code', 'mime_type', 'file_extension', 'as_attachment',
    )

    class Meta:
        ordering = ('name',)

    def __str__(self):
        return self.name

    def get_absolute_url(self):
        """Return the URL of this export template's detail view."""
        return reverse('extras:exporttemplate', args=[self.pk])

    @property
    def docs_url(self):
        """URL of the documentation page for this model, rooted at STATIC_URL."""
        return f'{settings.STATIC_URL}docs/models/extras/exporttemplate/'

    def clean(self):
        """
        Validate the template.

        Raises ValidationError if the name is "table" (compared case-insensitively), which is reserved.
        """
        super().clean()

        if self.name.lower() == 'table':
            raise ValidationError({
                'name': f'"{self.name}" is a reserved name. Please choose a different name.'
            })

    def sync_data(self):
        """
        Synchronize template content from the designated DataFile (if any).
        """
        # NOTE(review): `data_file` is not defined in this class; presumably provided by SyncedDataMixin - confirm
        self.template_code = self.data_file.data_as_string

    def render(self, queryset):
        """
        Render the contents of the template.

        :param queryset: The objects being exported; exposed to the template as `queryset`
        :return: The rendered text, with CRLF line terminators replaced by LF
        """
        context = {
            'queryset': queryset
        }
        output = render_jinja2(self.template_code, context)

        # Replace CRLF-style line terminators
        output = output.replace('\r\n', '\n')

        return output

    def render_to_response(self, queryset):
        """
        Render the template to an HTTP response. When `as_attachment` is set, the response is delivered as a named
        file attachment.

        :param queryset: The objects being exported; its model's plural verbose name forms the filename
        """
        output = self.render(queryset)
        # Fall back to plain text when no MIME type has been configured
        mime_type = self.mime_type or 'text/plain; charset=utf-8'

        # Build the response
        response = HttpResponse(output, content_type=mime_type)

        if self.as_attachment:
            basename = queryset.model._meta.verbose_name_plural.replace(' ', '_')
            extension = f'.{self.file_extension}' if self.file_extension else ''
            filename = f'netbox_{basename}{extension}'
            response['Content-Disposition'] = f'attachment; filename="{filename}"'

        return response
class SavedFilter(CloningMixin, ExportTemplatesMixin, ChangeLoggedModel):
    """
    A set of predefined keyword parameters that can be reused to filter for specific objects.
    """
    content_types = models.ManyToManyField(
        to=ContentType,
        related_name='saved_filters',
        help_text=_('The object type(s) to which this filter applies.')
    )
    name = models.CharField(
        max_length=100,
        unique=True
    )
    slug = models.SlugField(
        max_length=100,
        unique=True
    )
    description = models.CharField(
        max_length=200,
        blank=True
    )
    user = models.ForeignKey(
        to=User,
        on_delete=models.SET_NULL,
        blank=True,
        null=True
    )
    weight = models.PositiveSmallIntegerField(
        default=100
    )
    enabled = models.BooleanField(
        default=True
    )
    shared = models.BooleanField(
        default=True
    )
    # Mapping of filter parameter names to values; clean() enforces that this is a dictionary
    parameters = models.JSONField()

    clone_fields = (
        'content_types', 'weight', 'enabled', 'parameters',
    )

    class Meta:
        ordering = ('weight', 'name')

    def __str__(self):
        return self.name

    def get_absolute_url(self):
        """Return the URL of this saved filter's detail view."""
        return reverse('extras:savedfilter', args=[self.pk])

    @property
    def docs_url(self):
        """URL of the documentation page for this model, rooted at STATIC_URL."""
        return f'{settings.STATIC_URL}docs/models/extras/savedfilter/'

    def clean(self):
        """
        Validate the filter.

        Raises ValidationError if `parameters` is not a dictionary.
        """
        super().clean()

        # Verify that `parameters` is a JSON object
        if not isinstance(self.parameters, dict):
            raise ValidationError(
                {'parameters': 'Filter parameters must be stored as a dictionary of keyword arguments.'}
            )

    @property
    def url_params(self):
        """
        Return the saved parameters encoded as a URL query string.

        A parameter whose value is a list (or tuple) is emitted once per element (e.g. `status=a&status=b`).
        """
        qd = QueryDict(mutable=True)
        for key, value in self.parameters.items():
            if isinstance(value, (list, tuple)):
                # Passing a plain dict to QueryDict.update() would store the whole list as one value, which then
                # gets encoded as its Python repr. Register each element as a separate value instead.
                qd.setlist(key, list(value))
            else:
                qd[key] = value
        return qd.urlencode()
class ImageAttachment(ChangeLoggedModel):
    """
    An uploaded image which is associated with an object.
    """
    content_type = models.ForeignKey(
        to=ContentType,
        on_delete=models.CASCADE
    )
    object_id = models.PositiveBigIntegerField()
    # The object to which this image is attached, resolved from content_type + object_id
    parent = GenericForeignKey(
        ct_field='content_type',
        fk_field='object_id'
    )
    image = models.ImageField(
        upload_to=image_upload,
        height_field='image_height',
        width_field='image_width'
    )
    image_height = models.PositiveSmallIntegerField()
    image_width = models.PositiveSmallIntegerField()
    name = models.CharField(
        max_length=50,
        blank=True
    )

    objects = RestrictedQuerySet.as_manager()

    clone_fields = ('content_type', 'object_id')

    class Meta:
        ordering = ('name', 'pk')  # name may be non-unique

    def __str__(self):
        """
        Return the attachment's name if set; otherwise a display name derived from the image's filename.
        """
        if self.name:
            return self.name
        # Guard against an image with no file name set
        filename = (self.image.name or '').rsplit('/', 1)[-1]
        # NOTE(review): presumably image_upload() prefixes the stored filename with two underscore-delimited
        # components - confirm against extras.utils. Strip them when present; otherwise show the whole filename
        # rather than raising IndexError.
        parts = filename.split('_', 2)
        return parts[2] if len(parts) == 3 else filename

    def delete(self, *args, **kwargs):
        """
        Delete the database record and then the image file itself.

        :return: Whatever the parent class's delete() returns
        """
        _name = self.image.name

        result = super().delete(*args, **kwargs)

        # Delete file from disk
        self.image.delete(save=False)

        # Deleting the file erases its name. We restore the image's filename here in case we still need to reference it
        # before the request finishes. (For example, to display a message indicating the ImageAttachment was deleted.)
        self.image.name = _name

        return result

    @property
    def size(self):
        """
        Wrapper around `image.size` to suppress an OSError in case the file is inaccessible. Also opportunistically
        catch other exceptions that we know other storage back-ends to throw.

        :return: The value of `image.size`, or None if one of the expected exceptions was raised
        """
        expected_exceptions = [OSError]

        # botocore is optional; only catch its ClientError if the package is installed
        try:
            from botocore.exceptions import ClientError
            expected_exceptions.append(ClientError)
        except ImportError:
            pass

        try:
            return self.image.size
        except tuple(expected_exceptions):
            return None

    def to_objectchange(self, action):
        """
        Build the change record via the parent class, and set its related object to this image's parent.
        """
        objectchange = super().to_objectchange(action)
        objectchange.related_object = self.parent
        return objectchange
class JournalEntry(CustomFieldsMixin, CustomLinksMixin, TagsMixin, ExportTemplatesMixin, ChangeLoggedModel):
    """
    A single remark recorded against an object. Taken together, an object's entries make up its journal, which
    keeps historical context alongside NetBox's built-in change logging (for instance, noting that a device went
    through maintenance, or that a prefix was expanded).
    """
    assigned_object_type = models.ForeignKey(to=ContentType, on_delete=models.CASCADE)
    assigned_object_id = models.PositiveBigIntegerField()
    assigned_object = GenericForeignKey(
        ct_field='assigned_object_type',
        fk_field='assigned_object_id',
    )
    created_by = models.ForeignKey(
        to=User,
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
    )
    kind = models.CharField(
        max_length=30,
        choices=JournalEntryKindChoices,
        default=JournalEntryKindChoices.KIND_INFO,
    )
    comments = models.TextField()

    class Meta:
        ordering = ('-created',)
        verbose_name_plural = 'journal entries'

    def __str__(self):
        # Show the creation time in the local timezone, followed by the kind's display label
        timestamp = date_format(timezone.localtime(self.created), format='SHORT_DATETIME_FORMAT')
        return f"{timestamp} ({self.get_kind_display()})"

    def get_absolute_url(self):
        """Return the URL of this journal entry's detail view."""
        return reverse('extras:journalentry', args=[self.pk])

    def clean(self):
        """
        Validate the entry, raising ValidationError if the assigned object type is not among the content types
        matched by the 'journaling' feature query.
        """
        super().clean()

        # Only models which support journaling may have entries created for them
        journaling_types = ContentType.objects.filter(FeatureQuery('journaling').get_query())
        if self.assigned_object_type in journaling_types:
            return
        raise ValidationError(f"Journaling is not supported for this object type ({self.assigned_object_type}).")

    def get_kind_color(self):
        """Look up the color mapped to this entry's kind (None if there is no mapping)."""
        color_map = JournalEntryKindChoices.colors
        return color_map.get(self.kind)
class ConfigRevision(models.Model):
    """
    An atomic revision of NetBox's configuration.
    """
    created = models.DateTimeField(
        auto_now_add=True
    )
    comment = models.CharField(
        max_length=200,
        blank=True
    )
    data = models.JSONField(
        blank=True,
        null=True,
        verbose_name='Configuration data'
    )

    def __str__(self):
        return f'Config revision #{self.pk} ({self.created})'

    def __getattr__(self, item):
        """
        Expose the keys of `data` as attributes. Called only when normal attribute lookup fails.

        :raises AttributeError: If `item` is not a key of `data`
        """
        data = self.data
        # `data` is nullable (and JSON need not be an object). Without this guard, every failed attribute lookup
        # would raise TypeError instead of AttributeError, breaking hasattr() and getattr() with a default.
        if isinstance(data, dict) and item in data:
            return data[item]
        return super().__getattribute__(item)

    def activate(self):
        """
        Cache the configuration data.
        """
        # Both entries are stored with a timeout of None
        cache.set('config', self.data, None)
        cache.set('config_version', self.pk, None)

    @admin.display(boolean=True)
    def is_active(self):
        """Return True if the cached 'config_version' equals this revision's primary key."""
        return cache.get('config_version') == self.pk