mirror of
https://github.com/netbox-community/netbox.git
synced 2024-05-10 07:54:54 +00:00
Fixes #1136: Enforce model validation during bulk update
This commit is contained in:
@ -9,10 +9,10 @@ from django.contrib.contenttypes.models import ContentType
|
|||||||
from django.core.exceptions import ValidationError
|
from django.core.exceptions import ValidationError
|
||||||
from django.db import transaction, IntegrityError
|
from django.db import transaction, IntegrityError
|
||||||
from django.db.models import ProtectedError
|
from django.db.models import ProtectedError
|
||||||
from django.forms import CharField, Form, ModelMultipleChoiceField, MultipleHiddenInput, Textarea, TypedChoiceField
|
from django.forms import CharField, Form, ModelMultipleChoiceField, MultipleHiddenInput, Textarea
|
||||||
from django.http import HttpResponse
|
from django.http import HttpResponse
|
||||||
from django.shortcuts import get_object_or_404, redirect, render
|
from django.shortcuts import get_object_or_404, redirect, render
|
||||||
from django.template import TemplateSyntaxError
|
from django.template.exceptions import TemplateSyntaxError
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
from django.utils.html import escape
|
from django.utils.html import escape
|
||||||
from django.utils.http import is_safe_url
|
from django.utils.http import is_safe_url
|
||||||
@ -514,32 +514,56 @@ class BulkEditView(View):
|
|||||||
|
|
||||||
custom_fields = form.custom_fields if hasattr(form, 'custom_fields') else []
|
custom_fields = form.custom_fields if hasattr(form, 'custom_fields') else []
|
||||||
standard_fields = [field for field in form.fields if field not in custom_fields and field != 'pk']
|
standard_fields = [field for field in form.fields if field not in custom_fields and field != 'pk']
|
||||||
|
nullified_fields = request.POST.getlist('_nullify')
|
||||||
|
|
||||||
|
try:
|
||||||
|
|
||||||
|
with transaction.atomic():
|
||||||
|
|
||||||
|
updated_count = 0
|
||||||
|
for obj in self.cls.objects.filter(pk__in=pk_list):
|
||||||
|
|
||||||
# Update standard fields. If a field is listed in _nullify, delete its value.
|
# Update standard fields. If a field is listed in _nullify, delete its value.
|
||||||
nullified_fields = request.POST.getlist('_nullify')
|
for name in standard_fields:
|
||||||
fields_to_update = {}
|
if name in form.nullable_fields and name in nullified_fields:
|
||||||
for field in standard_fields:
|
setattr(obj, name, '' if isinstance(form.fields[name], CharField) else None)
|
||||||
if field in form.nullable_fields and field in nullified_fields:
|
elif form.cleaned_data[name] not in (None, ''):
|
||||||
if isinstance(form.fields[field], CharField):
|
setattr(obj, name, form.cleaned_data[name])
|
||||||
fields_to_update[field] = ''
|
obj.full_clean()
|
||||||
else:
|
obj.save()
|
||||||
fields_to_update[field] = None
|
|
||||||
elif form.cleaned_data[field] not in (None, ''):
|
|
||||||
fields_to_update[field] = form.cleaned_data[field]
|
|
||||||
updated_count = self.cls.objects.filter(pk__in=pk_list).update(**fields_to_update)
|
|
||||||
|
|
||||||
# Update custom fields for objects
|
# Update custom fields
|
||||||
if custom_fields:
|
obj_type = ContentType.objects.get_for_model(self.cls)
|
||||||
objs_updated = self.update_custom_fields(pk_list, form, custom_fields, nullified_fields)
|
for name in custom_fields:
|
||||||
if objs_updated and not updated_count:
|
field = form.fields[name].model
|
||||||
updated_count = objs_updated
|
if name in form.nullable_fields and name in nullified_fields:
|
||||||
|
CustomFieldValue.objects.filter(
|
||||||
|
field=field, obj_type=obj_type, obj_id=obj.pk
|
||||||
|
).delete()
|
||||||
|
elif form.cleaned_data[name] not in [None, '']:
|
||||||
|
try:
|
||||||
|
cfv = CustomFieldValue.objects.get(
|
||||||
|
field=field, obj_type=obj_type, obj_id=obj.pk
|
||||||
|
)
|
||||||
|
except CustomFieldValue.DoesNotExist:
|
||||||
|
cfv = CustomFieldValue(
|
||||||
|
field=field, obj_type=obj_type, obj_id=obj.pk
|
||||||
|
)
|
||||||
|
cfv.value = form.cleaned_data[name]
|
||||||
|
cfv.save()
|
||||||
|
|
||||||
|
updated_count += 1
|
||||||
|
|
||||||
if updated_count:
|
if updated_count:
|
||||||
msg = 'Updated {} {}'.format(updated_count, self.cls._meta.verbose_name_plural)
|
msg = 'Updated {} {}'.format(updated_count, self.cls._meta.verbose_name_plural)
|
||||||
messages.success(self.request, msg)
|
messages.success(self.request, msg)
|
||||||
UserAction.objects.log_bulk_edit(request.user, ContentType.objects.get_for_model(self.cls), msg)
|
UserAction.objects.log_bulk_edit(request.user, ContentType.objects.get_for_model(self.cls), msg)
|
||||||
|
|
||||||
return redirect(return_url)
|
return redirect(return_url)
|
||||||
|
|
||||||
|
except ValidationError as e:
|
||||||
|
messages.error(self.request, "{} failed validation: {}".format(obj, e))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
initial_data = request.POST.copy()
|
initial_data = request.POST.copy()
|
||||||
initial_data['pk'] = pk_list
|
initial_data['pk'] = pk_list
|
||||||
@ -559,53 +583,6 @@ class BulkEditView(View):
|
|||||||
'return_url': return_url,
|
'return_url': return_url,
|
||||||
})
|
})
|
||||||
|
|
||||||
def update_custom_fields(self, pk_list, form, fields, nullified_fields):
|
|
||||||
obj_type = ContentType.objects.get_for_model(self.cls)
|
|
||||||
objs_updated = False
|
|
||||||
|
|
||||||
for name in fields:
|
|
||||||
|
|
||||||
field = form.fields[name].model
|
|
||||||
|
|
||||||
# Setting the field to null
|
|
||||||
if name in form.nullable_fields and name in nullified_fields:
|
|
||||||
|
|
||||||
# Delete all CustomFieldValues for instances of this field belonging to the selected objects.
|
|
||||||
CustomFieldValue.objects.filter(field=field, obj_type=obj_type, obj_id__in=pk_list).delete()
|
|
||||||
objs_updated = True
|
|
||||||
|
|
||||||
# Updating the value of the field
|
|
||||||
elif form.cleaned_data[name] not in [None, '']:
|
|
||||||
|
|
||||||
# Check for zero value (bulk editing)
|
|
||||||
if isinstance(form.fields[name], TypedChoiceField) and form.cleaned_data[name] == 0:
|
|
||||||
serialized_value = field.serialize_value(None)
|
|
||||||
else:
|
|
||||||
serialized_value = field.serialize_value(form.cleaned_data[name])
|
|
||||||
|
|
||||||
# Gather any pre-existing CustomFieldValues for the objects being edited.
|
|
||||||
existing_cfvs = CustomFieldValue.objects.filter(field=field, obj_type=obj_type, obj_id__in=pk_list)
|
|
||||||
|
|
||||||
# Determine which objects have an existing CFV to update and which need a new CFV created.
|
|
||||||
update_list = [cfv['obj_id'] for cfv in existing_cfvs.values()]
|
|
||||||
create_list = list(set(pk_list) - set(update_list))
|
|
||||||
|
|
||||||
# Creating/updating CFVs
|
|
||||||
if serialized_value:
|
|
||||||
existing_cfvs.update(serialized_value=serialized_value)
|
|
||||||
CustomFieldValue.objects.bulk_create([
|
|
||||||
CustomFieldValue(field=field, obj_type=obj_type, obj_id=pk, serialized_value=serialized_value)
|
|
||||||
for pk in create_list
|
|
||||||
])
|
|
||||||
|
|
||||||
# Deleting CFVs
|
|
||||||
else:
|
|
||||||
existing_cfvs.delete()
|
|
||||||
|
|
||||||
objs_updated = True
|
|
||||||
|
|
||||||
return len(pk_list) if objs_updated else 0
|
|
||||||
|
|
||||||
|
|
||||||
class BulkDeleteView(View):
|
class BulkDeleteView(View):
|
||||||
"""
|
"""
|
||||||
|
Reference in New Issue
Block a user