1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00
Files
netbox-community-netbox/netbox/utilities/views.py

483 lines
19 KiB
Python
Raw Normal View History

from collections import OrderedDict
2016-05-18 16:35:35 -04:00
from django_tables2 import RequestConfig
2016-03-01 11:23:03 -05:00
from django.contrib import messages
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ImproperlyConfigured
2016-05-18 16:35:35 -04:00
from django.core.urlresolvers import reverse
2016-03-01 11:23:03 -05:00
from django.db import transaction, IntegrityError
from django.db.models import ProtectedError
from django.forms import ModelMultipleChoiceField, MultipleHiddenInput, TypedChoiceField
from django.http import HttpResponse, HttpResponseRedirect
2016-03-01 11:23:03 -05:00
from django.shortcuts import get_object_or_404, redirect, render
from django.template import TemplateSyntaxError
from django.utils.http import is_safe_url
2016-03-01 11:23:03 -05:00
from django.views.generic import View
from extras.forms import CustomFieldForm
from extras.models import CustomField, CustomFieldValue, ExportTemplate, UserAction
2016-03-01 11:23:03 -05:00
from .error_handlers import handle_protectederror
from .forms import ConfirmationForm
2016-03-01 11:23:03 -05:00
from .paginator import EnhancedPaginator
class annotate_custom_fields:
def __init__(self, queryset, custom_fields):
self.queryset = queryset
self.custom_fields = custom_fields
def __iter__(self):
for obj in self.queryset:
values_dict = {cfv.field_id: cfv.value for cfv in obj.custom_field_values.all()}
obj.custom_fields = OrderedDict([(field, values_dict.get(field.pk)) for field in self.custom_fields])
yield obj
2016-03-01 11:23:03 -05:00
class ObjectListView(View):
queryset = None
filter = None
filter_form = None
table = None
edit_permissions = []
2016-03-01 11:23:03 -05:00
template_name = None
redirect_on_single_result = True
2016-03-01 11:23:03 -05:00
def get(self, request, *args, **kwargs):
model = self.queryset.model
object_ct = ContentType.objects.get_for_model(model)
2016-03-01 11:23:03 -05:00
if self.filter:
self.queryset = self.filter(request.GET, self.queryset).qs
# If this type of object has one or more custom fields, prefetch any relevant custom field values
custom_fields = CustomField.objects.filter(obj_type=ContentType.objects.get_for_model(model))\
.prefetch_related('choices')
if custom_fields:
self.queryset = self.queryset.prefetch_related('custom_field_values')
# Check for export template rendering
2016-03-01 11:23:03 -05:00
if request.GET.get('export'):
et = get_object_or_404(ExportTemplate, content_type=object_ct, name=request.GET.get('export'))
queryset = annotate_custom_fields(self.queryset, custom_fields) if custom_fields else self.queryset
try:
response = et.to_response(context_dict={'queryset': queryset},
filename='netbox_{}'.format(model._meta.verbose_name_plural))
return response
except TemplateSyntaxError:
2016-05-18 16:35:35 -04:00
messages.error(request, "There was an error rendering the selected export template ({})."
.format(et.name))
# Fall back to built-in CSV export
elif 'export' in request.GET and hasattr(model, 'to_csv'):
output = '\n'.join([obj.to_csv() for obj in self.queryset])
response = HttpResponse(
output,
content_type='text/csv'
)
response['Content-Disposition'] = 'attachment; filename="netbox_{}.csv"'\
.format(self.queryset.model._meta.verbose_name_plural)
return response
2016-03-01 11:23:03 -05:00
# Attempt to redirect automatically if the search query returns a single result
if self.redirect_on_single_result and self.queryset.count() == 1 and request.GET:
try:
return HttpResponseRedirect(self.queryset[0].get_absolute_url())
except AttributeError:
pass
2016-03-07 12:56:37 -05:00
# Provide a hook to tweak the queryset based on the request immediately prior to rendering the object list
self.queryset = self.alter_queryset(request)
# Construct the table based on the user's permissions
table = self.table(self.queryset)
table.model = model
if 'pk' in table.base_columns and any([request.user.has_perm(perm) for perm in self.edit_permissions]):
table.base_columns['pk'].visible = True
RequestConfig(request, paginate={'klass': EnhancedPaginator}).configure(table)
2016-03-01 11:23:03 -05:00
context = {
2016-03-01 11:23:03 -05:00
'table': table,
'filter_form': self.filter_form(request.GET, label_suffix='') if self.filter_form else None,
'export_templates': ExportTemplate.objects.filter(content_type=object_ct),
}
context.update(self.extra_context())
return render(request, self.template_name, context)
2016-03-01 11:23:03 -05:00
2016-03-07 12:56:37 -05:00
def alter_queryset(self, request):
# .all() is necessary to avoid caching queries
return self.queryset.all()
2016-03-07 12:56:37 -05:00
def extra_context(self):
return {}
2016-03-01 11:23:03 -05:00
class ObjectEditView(View):
model = None
form_class = None
fields_initial = []
template_name = 'utilities/obj_edit.html'
success_url = None
cancel_url = None
def get_object(self, kwargs):
# Look up object by slug if one has been provided. Otherwise, use PK.
if 'slug' in kwargs:
return get_object_or_404(self.model, slug=kwargs['slug'])
else:
return get_object_or_404(self.model, pk=kwargs['pk'])
def get(self, request, *args, **kwargs):
if kwargs:
obj = self.get_object(kwargs)
form = self.form_class(instance=obj)
else:
obj = None
form = self.form_class(initial={k: request.GET.get(k) for k in self.fields_initial})
return render(request, self.template_name, {
'obj': obj,
'obj_type': self.model._meta.verbose_name,
'form': form,
'cancel_url': obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else reverse(self.cancel_url),
})
def post(self, request, *args, **kwargs):
# Validate object if editing an existing object
obj = self.get_object(kwargs) if kwargs else None
form = self.form_class(request.POST, instance=obj)
if form.is_valid():
obj = form.save(commit=False)
obj_created = not obj.pk
obj.save()
2016-08-15 15:24:23 -04:00
if isinstance(form, CustomFieldForm):
form.save_custom_fields()
2016-07-18 14:48:51 -04:00
msg = u'Created ' if obj_created else u'Modified '
2016-05-17 15:04:16 -04:00
msg += self.model._meta.verbose_name
if hasattr(obj, 'get_absolute_url'):
2016-07-18 14:48:51 -04:00
msg = u'{} <a href="{}">{}</a>'.format(msg, obj.get_absolute_url(), obj)
2016-05-17 15:04:16 -04:00
else:
2016-07-18 14:48:51 -04:00
msg = u'{} {}'.format(msg, obj)
2016-05-17 15:04:16 -04:00
messages.success(request, msg)
if obj_created:
UserAction.objects.log_create(request.user, obj, msg)
else:
UserAction.objects.log_edit(request.user, obj, msg)
if '_addanother' in request.POST:
return redirect(request.path)
elif self.success_url:
return redirect(self.success_url)
else:
return redirect(obj.get_absolute_url())
return render(request, self.template_name, {
'obj': obj,
'obj_type': self.model._meta.verbose_name,
'form': form,
'cancel_url': obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else reverse(self.cancel_url),
})
class ObjectDeleteView(View):
model = None
template_name = 'utilities/obj_delete.html'
redirect_url = None
def get_object(self, kwargs):
# Look up object by slug if one has been provided. Otherwise, use PK.
if 'slug' in kwargs:
return get_object_or_404(self.model, slug=kwargs['slug'])
else:
return get_object_or_404(self.model, pk=kwargs['pk'])
def get(self, request, *args, **kwargs):
obj = self.get_object(kwargs)
form = ConfirmationForm()
return render(request, self.template_name, {
'obj': obj,
'form': form,
'obj_type': self.model._meta.verbose_name,
'cancel_url': obj.get_absolute_url(),
})
def post(self, request, *args, **kwargs):
obj = self.get_object(kwargs)
form = ConfirmationForm(request.POST)
if form.is_valid():
try:
obj.delete()
2016-07-18 14:48:51 -04:00
msg = u'Deleted {} {}'.format(self.model._meta.verbose_name, obj)
messages.success(request, msg)
UserAction.objects.log_delete(request.user, obj, msg)
return redirect(self.redirect_url)
except ProtectedError, e:
handle_protectederror(obj, request, e)
return redirect(obj.get_absolute_url())
return render(request, self.template_name, {
'obj': obj,
'form': form,
'obj_type': self.model._meta.verbose_name,
'cancel_url': obj.get_absolute_url(),
})
2016-03-01 11:23:03 -05:00
class BulkImportView(View):
form = None
table = None
template_name = None
obj_list_url = None
def get(self, request, *args, **kwargs):
return render(request, self.template_name, {
'form': self.form(),
'obj_list_url': self.obj_list_url,
})
def post(self, request, *args, **kwargs):
form = self.form(request.POST)
if form.is_valid():
new_objs = []
try:
with transaction.atomic():
for obj in form.cleaned_data['csv']:
self.save_obj(obj)
new_objs.append(obj)
obj_table = self.table(new_objs)
if new_objs:
2016-07-18 14:48:51 -04:00
msg = u'Imported {} {}'.format(len(new_objs), new_objs[0]._meta.verbose_name_plural)
messages.success(request, msg)
UserAction.objects.log_import(request.user, ContentType.objects.get_for_model(new_objs[0]), msg)
2016-03-01 11:23:03 -05:00
return render(request, "import_success.html", {
'table': obj_table,
})
except IntegrityError as e:
form.add_error('csv', "Record {}: {}".format(len(new_objs) + 1, e.__cause__))
return render(request, self.template_name, {
'form': form,
'obj_list_url': self.obj_list_url,
})
def save_obj(self, obj):
obj.save()
class BulkEditView(View):
cls = None
2016-10-14 16:38:46 -04:00
parent_cls = None
2016-03-01 11:23:03 -05:00
form = None
template_name = None
default_redirect_url = None
2016-03-01 11:23:03 -05:00
2016-10-14 16:38:46 -04:00
def get(self):
return redirect(self.default_redirect_url)
2016-03-01 11:23:03 -05:00
2016-10-14 16:38:46 -04:00
def post(self, request, **kwargs):
# Attempt to derive parent object if a parent class has been given
if self.parent_cls:
parent_obj = get_object_or_404(self.parent_cls, **kwargs)
else:
parent_obj = None
2016-03-01 11:23:03 -05:00
2016-10-14 16:38:46 -04:00
# Determine URL to redirect users upon modification of objects
posted_redirect_url = request.POST.get('redirect_url')
if posted_redirect_url and is_safe_url(url=posted_redirect_url, host=request.get_host()):
redirect_url = posted_redirect_url
2016-10-14 16:38:46 -04:00
elif parent_obj:
redirect_url = parent_obj.get_absolute_url()
elif self.default_redirect_url:
redirect_url = reverse(self.default_redirect_url)
2016-10-14 16:38:46 -04:00
else:
raise ImproperlyConfigured('No redirect URL has been provided.')
2016-10-14 16:38:46 -04:00
# Are we editing *all* objects in the queryset or just a selected subset?
if request.POST.get('_all'):
pk_list = [int(pk) for pk in request.POST.get('pk_all').split(',') if pk]
else:
pk_list = [int(pk) for pk in request.POST.getlist('pk')]
2016-03-01 11:23:03 -05:00
if '_apply' in request.POST:
form = self.form(self.cls, request.POST)
2016-03-01 11:23:03 -05:00
if form.is_valid():
custom_fields = form.custom_fields if hasattr(form, 'custom_fields') else []
standard_fields = [field for field in form.fields if field not in custom_fields and field != 'pk']
# Update standard fields. If a field is listed in _nullify, delete its value.
nullified_fields = request.POST.getlist('_nullify')
fields_to_update = {}
for field in standard_fields:
if field in form.nullable_fields and field in nullified_fields:
fields_to_update[field] = ''
elif form.cleaned_data[field]:
fields_to_update[field] = form.cleaned_data[field]
updated_count = self.cls.objects.filter(pk__in=pk_list).update(**fields_to_update)
# Update custom fields for objects
if custom_fields:
objs_updated = self.update_custom_fields(pk_list, form, custom_fields, nullified_fields)
if objs_updated and not updated_count:
updated_count = objs_updated
if updated_count:
2016-07-18 14:48:51 -04:00
msg = u'Updated {} {}'.format(updated_count, self.cls._meta.verbose_name_plural)
messages.success(self.request, msg)
UserAction.objects.log_bulk_edit(request.user, ContentType.objects.get_for_model(self.cls), msg)
return redirect(redirect_url)
2016-03-01 11:23:03 -05:00
else:
form = self.form(self.cls, initial={'pk': pk_list})
2016-03-01 11:23:03 -05:00
selected_objects = self.cls.objects.filter(pk__in=pk_list)
2016-03-01 11:23:03 -05:00
if not selected_objects:
messages.warning(request, "No {} were selected.".format(self.cls._meta.verbose_name_plural))
return redirect(redirect_url)
2016-03-01 11:23:03 -05:00
return render(request, self.template_name, {
'form': form,
'selected_objects': selected_objects,
'cancel_url': redirect_url,
2016-03-01 11:23:03 -05:00
})
def update_custom_fields(self, pk_list, form, fields, nullified_fields):
obj_type = ContentType.objects.get_for_model(self.cls)
objs_updated = False
for name in fields:
field = form.fields[name].model
# Setting the field to null
if name in form.nullable_fields and name in nullified_fields:
# Delete all CustomFieldValues for instances of this field belonging to the selected objects.
CustomFieldValue.objects.filter(field=field, obj_type=obj_type, obj_id__in=pk_list).delete()
objs_updated = True
# Updating the value of the field
elif form.cleaned_data[name] not in [None, u'']:
# Check for zero value (bulk editing)
if isinstance(form.fields[name], TypedChoiceField) and form.cleaned_data[name] == 0:
serialized_value = field.serialize_value(None)
else:
serialized_value = field.serialize_value(form.cleaned_data[name])
# Gather any pre-existing CustomFieldValues for the objects being edited.
existing_cfvs = CustomFieldValue.objects.filter(field=field, obj_type=obj_type, obj_id__in=pk_list)
# Determine which objects have an existing CFV to update and which need a new CFV created.
update_list = [cfv['obj_id'] for cfv in existing_cfvs.values()]
create_list = list(set(pk_list) - set(update_list))
# Creating/updating CFVs
if serialized_value:
existing_cfvs.update(serialized_value=serialized_value)
CustomFieldValue.objects.bulk_create([
CustomFieldValue(field=field, obj_type=obj_type, obj_id=pk, serialized_value=serialized_value)
for pk in create_list
])
# Deleting CFVs
else:
existing_cfvs.delete()
2016-03-01 11:23:03 -05:00
objs_updated = True
return len(pk_list) if objs_updated else 0
2016-03-01 11:23:03 -05:00
class BulkDeleteView(View):
cls = None
parent_cls = None
2016-03-01 11:23:03 -05:00
form = None
template_name = 'utilities/confirm_bulk_delete.html'
default_redirect_url = None
2016-03-01 11:23:03 -05:00
2016-10-14 16:38:46 -04:00
def post(self, request, **kwargs):
# Attempt to derive parent object if a parent class has been given
if self.parent_cls:
parent_obj = get_object_or_404(self.parent_cls, **kwargs)
else:
parent_obj = None
# Determine URL to redirect users upon deletion of objects
posted_redirect_url = request.POST.get('redirect_url')
if posted_redirect_url and is_safe_url(url=posted_redirect_url, host=request.get_host()):
redirect_url = posted_redirect_url
elif parent_obj:
redirect_url = parent_obj.get_absolute_url()
elif self.default_redirect_url:
redirect_url = reverse(self.default_redirect_url)
else:
raise ImproperlyConfigured('No redirect URL has been provided.')
# Are we deleting *all* objects in the queryset or just a selected subset?
if request.POST.get('_all'):
2016-10-14 16:38:46 -04:00
pk_list = [int(pk) for pk in request.POST.get('pk_all').split(',') if pk]
else:
2016-10-14 16:38:46 -04:00
pk_list = [int(pk) for pk in request.POST.getlist('pk')]
form_cls = self.get_form()
2016-03-01 11:23:03 -05:00
if '_confirm' in request.POST:
form = form_cls(request.POST)
2016-03-01 11:23:03 -05:00
if form.is_valid():
# Delete objects
queryset = self.cls.objects.filter(pk__in=pk_list)
2016-03-01 11:23:03 -05:00
try:
deleted_count = queryset.delete()[1][self.cls._meta.label]
2016-03-01 11:23:03 -05:00
except ProtectedError, e:
handle_protectederror(list(queryset), request, e)
return redirect(redirect_url)
2016-03-01 11:23:03 -05:00
2016-07-18 14:48:51 -04:00
msg = u'Deleted {} {}'.format(deleted_count, self.cls._meta.verbose_name_plural)
messages.success(request, msg)
UserAction.objects.log_bulk_delete(request.user, ContentType.objects.get_for_model(self.cls), msg)
return redirect(redirect_url)
2016-03-01 11:23:03 -05:00
else:
form = form_cls(initial={'pk': pk_list})
2016-03-01 11:23:03 -05:00
selected_objects = self.cls.objects.filter(pk__in=pk_list)
2016-03-01 11:23:03 -05:00
if not selected_objects:
messages.warning(request, "No {} were selected for deletion.".format(self.cls._meta.verbose_name_plural))
return redirect(redirect_url)
2016-03-01 11:23:03 -05:00
return render(request, self.template_name, {
'form': form,
'parent_obj': parent_obj,
'obj_type_plural': self.cls._meta.verbose_name_plural,
2016-03-01 11:23:03 -05:00
'selected_objects': selected_objects,
'cancel_url': redirect_url,
2016-03-01 11:23:03 -05:00
})
def get_form(self):
"""Provide a standard bulk delete form if none has been specified for the view"""
class BulkDeleteForm(ConfirmationForm):
pk = ModelMultipleChoiceField(queryset=self.cls.objects.all(), widget=MultipleHiddenInput)
if self.form:
return self.form
return BulkDeleteForm