1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

507 lines
19 KiB
Python
Raw Normal View History

from collections import OrderedDict
from datetime import date
2017-03-08 16:12:14 -05:00
import graphviz
from django.contrib.auth.models import User
2016-08-12 17:20:01 -04:00
from django.contrib.contenttypes.fields import GenericForeignKey
2016-03-01 11:23:03 -05:00
from django.contrib.contenttypes.models import ContentType
2016-08-12 17:20:01 -04:00
from django.core.validators import ValidationError
2016-03-01 11:23:03 -05:00
from django.db import models
2017-03-08 16:12:14 -05:00
from django.db.models import Q
2016-03-01 11:23:03 -05:00
from django.http import HttpResponse
from django.template import Template, Context
from django.utils.encoding import python_2_unicode_compatible
2016-05-24 09:45:40 -04:00
from django.utils.safestring import mark_safe
2016-03-01 11:23:03 -05:00
2016-04-08 14:57:54 -04:00
2016-08-12 17:20:01 -04:00
CUSTOMFIELD_MODELS = (
'site', 'rack', 'devicetype', 'device', # DCIM
2016-08-12 17:20:01 -04:00
'aggregate', 'prefix', 'ipaddress', 'vlan', 'vrf', # IPAM
'provider', 'circuit', # Circuits
'tenant', # Tenants
)
CF_TYPE_TEXT = 100
CF_TYPE_INTEGER = 200
CF_TYPE_BOOLEAN = 300
CF_TYPE_DATE = 400
CF_TYPE_URL = 500
CF_TYPE_SELECT = 600
2016-08-12 17:20:01 -04:00
CUSTOMFIELD_TYPE_CHOICES = (
(CF_TYPE_TEXT, 'Text'),
(CF_TYPE_INTEGER, 'Integer'),
(CF_TYPE_BOOLEAN, 'Boolean (true/false)'),
(CF_TYPE_DATE, 'Date'),
(CF_TYPE_URL, 'URL'),
2016-08-12 17:20:01 -04:00
(CF_TYPE_SELECT, 'Selection'),
)
2016-03-01 11:23:03 -05:00
GRAPH_TYPE_INTERFACE = 100
GRAPH_TYPE_PROVIDER = 200
GRAPH_TYPE_SITE = 300
GRAPH_TYPE_CHOICES = (
(GRAPH_TYPE_INTERFACE, 'Interface'),
(GRAPH_TYPE_PROVIDER, 'Provider'),
(GRAPH_TYPE_SITE, 'Site'),
)
EXPORTTEMPLATE_MODELS = [
'site', 'rack', 'device', 'consoleport', 'powerport', 'interfaceconnection', # DCIM
'aggregate', 'prefix', 'ipaddress', 'vlan', # IPAM
'provider', 'circuit', # Circuits
'tenant', # Tenants
2016-03-01 11:23:03 -05:00
]
ACTION_CREATE = 1
ACTION_IMPORT = 2
ACTION_EDIT = 3
ACTION_BULK_EDIT = 4
ACTION_DELETE = 5
ACTION_BULK_DELETE = 6
ACTION_BULK_CREATE = 7
ACTION_CHOICES = (
(ACTION_CREATE, 'created'),
(ACTION_BULK_CREATE, 'bulk created'),
(ACTION_IMPORT, 'imported'),
(ACTION_EDIT, 'modified'),
(ACTION_BULK_EDIT, 'bulk edited'),
(ACTION_DELETE, 'deleted'),
(ACTION_BULK_DELETE, 'bulk deleted'),
)
2016-03-01 11:23:03 -05:00
2017-03-08 16:12:14 -05:00
#
# Custom fields
#
2016-08-15 15:24:23 -04:00
class CustomFieldModel(object):
def cf(self):
"""
Name-based CustomFieldValue accessor for use in templates
"""
2016-09-21 11:43:22 -04:00
if not hasattr(self, 'get_custom_fields'):
return dict()
2016-09-21 11:43:22 -04:00
return {field.name: value for field, value in self.get_custom_fields().items()}
def get_custom_fields(self):
"""
Return a dictionary of custom fields for a single object in the form {<field>: value}.
"""
2016-08-15 15:24:23 -04:00
# Find all custom fields applicable to this type of object
content_type = ContentType.objects.get_for_model(self)
fields = CustomField.objects.filter(obj_type=content_type)
# If the object exists, populate its custom fields with values
if hasattr(self, 'pk'):
values = CustomFieldValue.objects.filter(obj_type=content_type, obj_id=self.pk).select_related('field')
values_dict = {cfv.field_id: cfv.value for cfv in values}
return OrderedDict([(field, values_dict.get(field.pk)) for field in fields])
2016-08-15 15:24:23 -04:00
else:
return OrderedDict([(field, None) for field in fields])
2016-08-15 15:24:23 -04:00
@python_2_unicode_compatible
2016-08-12 17:20:01 -04:00
class CustomField(models.Model):
2016-08-15 15:24:23 -04:00
obj_type = models.ManyToManyField(ContentType, related_name='custom_fields', verbose_name='Object(s)',
limit_choices_to={'model__in': CUSTOMFIELD_MODELS},
help_text="The object(s) to which this field applies.")
2016-08-12 17:20:01 -04:00
type = models.PositiveSmallIntegerField(choices=CUSTOMFIELD_TYPE_CHOICES, default=CF_TYPE_TEXT)
name = models.CharField(max_length=50, unique=True)
2016-08-15 15:24:23 -04:00
label = models.CharField(max_length=50, blank=True, help_text="Name of the field as displayed to users (if not "
"provided, the field's name will be used)")
2016-08-12 17:20:01 -04:00
description = models.CharField(max_length=100, blank=True)
2016-08-15 15:24:23 -04:00
required = models.BooleanField(default=False, help_text="Determines whether this field is required when creating "
"new objects or editing an existing object.")
is_filterable = models.BooleanField(default=True, help_text="This field can be used to filter objects.")
2016-08-18 11:44:40 -04:00
default = models.CharField(max_length=100, blank=True, help_text="Default value for the field. Use \"true\" or "
"\"false\" for booleans. N/A for selection "
2016-08-15 15:24:23 -04:00
"fields.")
weight = models.PositiveSmallIntegerField(default=100, help_text="Fields with higher weights appear lower in a "
"form")
2016-08-12 17:20:01 -04:00
class Meta:
2016-08-23 12:25:23 -04:00
ordering = ['weight', 'name']
2016-08-12 17:20:01 -04:00
def __str__(self):
2016-08-18 14:23:28 -04:00
return self.label or self.name.replace('_', ' ').capitalize()
2016-08-12 17:20:01 -04:00
def serialize_value(self, value):
"""
Serialize the given value to a string suitable for storage as a CustomFieldValue
"""
if value is None:
return ''
if self.type == CF_TYPE_BOOLEAN:
return str(int(bool(value)))
if self.type == CF_TYPE_DATE:
return value.strftime('%Y-%m-%d')
if self.type == CF_TYPE_SELECT:
# Could be ModelChoiceField or TypedChoiceField
return str(value.id) if hasattr(value, 'id') else str(value)
return value
def deserialize_value(self, serialized_value):
"""
Convert a string into the object it represents depending on the type of field
"""
if serialized_value is '':
return None
if self.type == CF_TYPE_INTEGER:
return int(serialized_value)
if self.type == CF_TYPE_BOOLEAN:
return bool(int(serialized_value))
if self.type == CF_TYPE_DATE:
# Read date as YYYY-MM-DD
return date(*[int(n) for n in serialized_value.split('-')])
if self.type == CF_TYPE_SELECT:
try:
return self.choices.get(pk=int(serialized_value))
except CustomFieldChoice.DoesNotExist:
return None
return serialized_value
2016-08-12 17:20:01 -04:00
@python_2_unicode_compatible
2016-08-12 17:20:01 -04:00
class CustomFieldValue(models.Model):
field = models.ForeignKey('CustomField', related_name='values', on_delete=models.CASCADE)
2016-08-12 17:20:01 -04:00
obj_type = models.ForeignKey(ContentType, related_name='+', on_delete=models.PROTECT)
obj_id = models.PositiveIntegerField()
obj = GenericForeignKey('obj_type', 'obj_id')
serialized_value = models.CharField(max_length=255)
2016-08-12 17:20:01 -04:00
class Meta:
ordering = ['obj_type', 'obj_id']
2016-08-15 15:24:23 -04:00
unique_together = ['field', 'obj_type', 'obj_id']
2016-08-12 17:20:01 -04:00
def __str__(self):
return u'{} {}'.format(self.obj, self.field)
2016-08-12 17:20:01 -04:00
@property
def value(self):
return self.field.deserialize_value(self.serialized_value)
2016-08-12 17:20:01 -04:00
@value.setter
def value(self, value):
self.serialized_value = self.field.serialize_value(value)
2016-08-12 17:20:01 -04:00
def save(self, *args, **kwargs):
# Delete this object if it no longer has a value to store
if self.pk and self.value is None:
self.delete()
else:
super(CustomFieldValue, self).save(*args, **kwargs)
2016-08-12 17:20:01 -04:00
@python_2_unicode_compatible
2016-08-12 17:20:01 -04:00
class CustomFieldChoice(models.Model):
field = models.ForeignKey('CustomField', related_name='choices', limit_choices_to={'type': CF_TYPE_SELECT},
on_delete=models.CASCADE)
value = models.CharField(max_length=100)
weight = models.PositiveSmallIntegerField(default=100, help_text="Higher weights appear lower in the list")
2016-08-12 17:20:01 -04:00
class Meta:
ordering = ['field', 'weight', 'value']
unique_together = ['field', 'value']
def __str__(self):
2016-08-12 17:20:01 -04:00
return self.value
def clean(self):
if self.field.type != CF_TYPE_SELECT:
raise ValidationError("Custom field choices can only be assigned to selection fields.")
def delete(self, using=None, keep_parents=False):
# When deleting a CustomFieldChoice, delete all CustomFieldValues which point to it
pk = self.pk
super(CustomFieldChoice, self).delete(using, keep_parents)
CustomFieldValue.objects.filter(field__type=CF_TYPE_SELECT, serialized_value=str(pk)).delete()
2016-08-12 17:20:01 -04:00
2017-03-08 16:12:14 -05:00
#
# Graphs
#
@python_2_unicode_compatible
2016-03-01 11:23:03 -05:00
class Graph(models.Model):
type = models.PositiveSmallIntegerField(choices=GRAPH_TYPE_CHOICES)
weight = models.PositiveSmallIntegerField(default=1000)
name = models.CharField(max_length=100, verbose_name='Name')
source = models.CharField(max_length=500, verbose_name='Source URL')
link = models.URLField(verbose_name='Link URL', blank=True)
class Meta:
ordering = ['type', 'weight', 'name']
def __str__(self):
2016-03-01 11:23:03 -05:00
return self.name
def embed_url(self, obj):
template = Template(self.source)
return template.render(Context({'obj': obj}))
def embed_link(self, obj):
if self.link is None:
return ''
template = Template(self.link)
return template.render(Context({'obj': obj}))
2016-03-01 11:23:03 -05:00
2017-03-08 16:12:14 -05:00
#
# Export templates
#
@python_2_unicode_compatible
2016-03-01 11:23:03 -05:00
class ExportTemplate(models.Model):
content_type = models.ForeignKey(
ContentType, limit_choices_to={'model__in': EXPORTTEMPLATE_MODELS}, on_delete=models.CASCADE
)
name = models.CharField(max_length=100)
description = models.CharField(max_length=200, blank=True)
2016-03-01 11:23:03 -05:00
template_code = models.TextField()
mime_type = models.CharField(max_length=15, blank=True)
file_extension = models.CharField(max_length=15, blank=True)
class Meta:
ordering = ['content_type', 'name']
unique_together = [
['content_type', 'name']
]
def __str__(self):
return u'{}: {}'.format(self.content_type, self.name)
2016-03-01 11:23:03 -05:00
def to_response(self, context_dict, filename):
"""
Render the template to an HTTP response, delivered as a named file attachment
"""
template = Template(self.template_code)
mime_type = 'text/plain' if not self.mime_type else self.mime_type
output = template.render(Context(context_dict))
# Replace CRLF-style line terminators
output = output.replace('\r\n', '\n')
response = HttpResponse(output, content_type=mime_type)
2016-03-01 11:23:03 -05:00
if self.file_extension:
filename += '.{}'.format(self.file_extension)
response['Content-Disposition'] = 'attachment; filename="{}"'.format(filename)
return response
2016-04-08 14:57:54 -04:00
2017-03-08 16:12:14 -05:00
#
# Topology maps
#
@python_2_unicode_compatible
2016-04-08 14:57:54 -04:00
class TopologyMap(models.Model):
name = models.CharField(max_length=50, unique=True)
slug = models.SlugField(unique=True)
site = models.ForeignKey('dcim.Site', related_name='topology_maps', blank=True, null=True, on_delete=models.CASCADE)
device_patterns = models.TextField(
help_text="Identify devices to include in the diagram using regular expressions, one per line. Each line will "
"result in a new tier of the drawing. Separate multiple regexes within a line using semicolons. "
"Devices will be rendered in the order they are defined."
)
2016-04-08 14:57:54 -04:00
description = models.CharField(max_length=100, blank=True)
class Meta:
ordering = ['name']
def __str__(self):
2016-04-08 14:57:54 -04:00
return self.name
@property
def device_sets(self):
if not self.device_patterns:
return None
return [line.strip() for line in self.device_patterns.split('\n')]
2017-03-08 16:30:32 -05:00
def render(self, img_format='png'):
2017-03-08 16:12:14 -05:00
from circuits.models import CircuitTermination
2017-03-08 16:12:14 -05:00
from dcim.models import Device, InterfaceConnection
# Construct the graph
graph = graphviz.Graph()
graph.graph_attr['ranksep'] = '1'
for i, device_set in enumerate(self.device_sets):
subgraph = graphviz.Graph(name='sg{}'.format(i))
subgraph.graph_attr['rank'] = 'same'
# Add a pseudonode for each device_set to enforce hierarchical layout
subgraph.node('set{}'.format(i), label='', shape='none', width='0')
if i:
graph.edge('set{}'.format(i - 1), 'set{}'.format(i), style='invis')
# Add each device to the graph
devices = []
for query in device_set.split(';'): # Split regexes on semicolons
2017-04-06 14:12:30 -04:00
devices += Device.objects.filter(name__regex=query).select_related('device_role')
2017-03-08 16:12:14 -05:00
for d in devices:
2017-04-06 14:12:30 -04:00
fillcolor = '#{}'.format(d.device_role.color)
subgraph.node(d.name, style='filled', fillcolor=fillcolor)
2017-03-08 16:12:14 -05:00
# Add an invisible connection to each successive device in a set to enforce horizontal order
for j in range(0, len(devices) - 1):
subgraph.edge(devices[j].name, devices[j + 1].name, style='invis')
graph.subgraph(subgraph)
# Compile list of all devices
device_superset = Q()
for device_set in self.device_sets:
for query in device_set.split(';'): # Split regexes on semicolons
device_superset = device_superset | Q(name__regex=query)
# Add all interface connections to the graph
2017-03-08 16:12:14 -05:00
devices = Device.objects.filter(*(device_superset,))
connections = InterfaceConnection.objects.filter(
interface_a__device__in=devices, interface_b__device__in=devices
)
for c in connections:
graph.edge(c.interface_a.device.name, c.interface_b.device.name)
# Add all circuits to the graph
for termination in CircuitTermination.objects.filter(term_side='A', interface__device__in=devices):
peer_termination = termination.get_peer_termination()
if peer_termination is not None and peer_termination.interface.device in devices:
graph.edge(termination.interface.device.name, peer_termination.interface.device.name, color='blue')
2017-03-08 16:30:32 -05:00
return graph.pipe(format=img_format)
2017-03-08 16:12:14 -05:00
#
# Image attachments
#
def image_upload(instance, filename):
path = 'image-attachments/'
# Rename the file to the provided name, if any. Attempt to preserve the file extension.
extension = filename.rsplit('.')[-1]
if instance.name and extension in ['bmp', 'gif', 'jpeg', 'jpg', 'png']:
filename = '.'.join([instance.name, extension])
elif instance.name:
filename = instance.name
return '{}{}_{}_{}'.format(path, instance.content_type.name, instance.object_id, filename)
@python_2_unicode_compatible
class ImageAttachment(models.Model):
"""
An uploaded image which is associated with an object.
"""
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
object_id = models.PositiveIntegerField()
parent = GenericForeignKey('content_type', 'object_id')
image = models.ImageField(upload_to=image_upload, height_field='image_height', width_field='image_width')
image_height = models.PositiveSmallIntegerField()
image_width = models.PositiveSmallIntegerField()
name = models.CharField(max_length=50, blank=True)
created = models.DateTimeField(auto_now_add=True)
class Meta:
ordering = ['name']
def __str__(self):
if self.name:
return self.name
filename = self.image.name.rsplit('/', 1)[-1]
return filename.split('_', 2)[2]
def delete(self, *args, **kwargs):
_name = self.image.name
super(ImageAttachment, self).delete(*args, **kwargs)
# Delete file from disk
self.image.delete(save=False)
# Deleting the file erases its name. We restore the image's filename here in case we still need to reference it
# before the request finishes. (For example, to display a message indicating the ImageAttachment was deleted.)
self.image.name = _name
2017-03-08 16:12:14 -05:00
#
# User actions
#
class UserActionManager(models.Manager):
# Actions affecting a single object
def log_action(self, user, obj, action, message):
self.model.objects.create(
2016-06-16 15:02:01 -04:00
content_type=ContentType.objects.get_for_model(obj),
object_id=obj.pk,
user=user,
action=action,
message=message,
)
def log_create(self, user, obj, message=''):
self.log_action(user, obj, ACTION_CREATE, message)
def log_edit(self, user, obj, message=''):
self.log_action(user, obj, ACTION_EDIT, message)
def log_delete(self, user, obj, message=''):
self.log_action(user, obj, ACTION_DELETE, message)
# Actions affecting multiple objects
def log_bulk_action(self, user, content_type, action, message):
self.model.objects.create(
content_type=content_type,
user=user,
action=action,
message=message,
)
def log_import(self, user, content_type, message=''):
self.log_bulk_action(user, content_type, ACTION_IMPORT, message)
def log_bulk_create(self, user, content_type, message=''):
self.log_bulk_action(user, content_type, ACTION_BULK_CREATE, message)
def log_bulk_edit(self, user, content_type, message=''):
self.log_bulk_action(user, content_type, ACTION_BULK_EDIT, message)
def log_bulk_delete(self, user, content_type, message=''):
self.log_bulk_action(user, content_type, ACTION_BULK_DELETE, message)
@python_2_unicode_compatible
class UserAction(models.Model):
"""
A record of an action (add, edit, or delete) performed on an object by a User.
"""
time = models.DateTimeField(auto_now_add=True, editable=False)
2016-05-24 09:45:40 -04:00
user = models.ForeignKey(User, related_name='actions', on_delete=models.CASCADE)
content_type = models.ForeignKey(ContentType, on_delete=models.CASCADE)
object_id = models.PositiveIntegerField(blank=True, null=True)
action = models.PositiveSmallIntegerField(choices=ACTION_CHOICES)
message = models.TextField(blank=True)
objects = UserActionManager()
class Meta:
ordering = ['-time']
2016-05-24 09:45:40 -04:00
def __str__(self):
2016-05-24 09:45:40 -04:00
if self.message:
return u'{} {}'.format(self.user, self.message)
return u'{} {} {}'.format(self.user, self.get_action_display(), self.content_type)
2016-05-24 09:45:40 -04:00
def icon(self):
if self.action in [ACTION_CREATE, ACTION_BULK_CREATE, ACTION_IMPORT]:
2016-05-24 09:45:40 -04:00
return mark_safe('<i class="glyphicon glyphicon-plus text-success"></i>')
elif self.action in [ACTION_EDIT, ACTION_BULK_EDIT]:
return mark_safe('<i class="glyphicon glyphicon-pencil text-warning"></i>')
elif self.action in [ACTION_DELETE, ACTION_BULK_DELETE]:
return mark_safe('<i class="glyphicon glyphicon-remove text-danger"></i>')
else:
return ''