diff --git a/netbox/dcim/views.py b/netbox/dcim/views.py
index b3f97995c..03e375d35 100644
--- a/netbox/dcim/views.py
+++ b/netbox/dcim/views.py
@@ -194,12 +194,13 @@ class SiteListView(ObjectPermissionRequiredMixin, ObjectListView):
table = tables.SiteTable
-class SiteView(PermissionRequiredMixin, View):
+class SiteView(ObjectPermissionRequiredMixin, View):
permission_required = 'dcim.view_site'
+ queryset = Site.objects.prefetch_related('region', 'tenant__group')
def get(self, request, slug):
- site = get_object_or_404(Site.objects.prefetch_related('region', 'tenant__group'), slug=slug)
+ site = get_object_or_404(self.queryset, slug=slug)
stats = {
'rack_count': Rack.objects.filter(site=site).count(),
'device_count': Device.objects.filter(site=site).count(),
@@ -219,7 +220,7 @@ class SiteView(PermissionRequiredMixin, View):
})
-class SiteCreateView(PermissionRequiredMixin, ObjectEditView):
+class SiteCreateView(ObjectPermissionRequiredMixin, ObjectEditView):
permission_required = 'dcim.add_site'
queryset = Site.objects.all()
model_form = forms.SiteForm
@@ -231,7 +232,7 @@ class SiteEditView(SiteCreateView):
permission_required = 'dcim.change_site'
-class SiteDeleteView(PermissionRequiredMixin, ObjectDeleteView):
+class SiteDeleteView(ObjectPermissionRequiredMixin, ObjectDeleteView):
permission_required = 'dcim.delete_site'
queryset = Site.objects.all()
default_return_url = 'dcim:site_list'
diff --git a/netbox/netbox/authentication.py b/netbox/netbox/authentication.py
index 58fd4380a..850189a83 100644
--- a/netbox/netbox/authentication.py
+++ b/netbox/netbox/authentication.py
@@ -14,22 +14,14 @@ class ObjectPermissionRequiredMixin(AccessMixin):
if self.request.user.has_perm(self.permission_required):
return True
- # If not, check for an object-level permission
+ # If not, check for object-level permissions
app, codename = self.permission_required.split('.')
action, model_name = codename.split('_')
model = self.queryset.model
- obj_permissions = ObjectPermission.objects.filter(
- Q(users=self.request.user) | Q(groups__user=self.request.user),
- model=ContentType.objects.get_for_model(model),
- **{f'can_{action}': True}
- )
- if obj_permissions:
-
+ attrs = ObjectPermission.objects.get_attr_constraints(self.request.user, model, action)
+ if attrs:
# Update the view's QuerySet to filter only the permitted objects
- # TODO: Do this more efficiently
- for perm in obj_permissions:
- self.queryset = self.queryset.filter(**perm.attrs)
-
+ self.queryset = self.queryset.filter(**attrs)
return True
return False
diff --git a/netbox/users/models.py b/netbox/users/models.py
index f2002ae95..bb2093f05 100644
--- a/netbox/users/models.py
+++ b/netbox/users/models.py
@@ -7,6 +7,7 @@ from django.contrib.postgres.fields import JSONField
from django.core.exceptions import FieldError, ValidationError
from django.core.validators import MinLengthValidator
from django.db import models
+from django.db.models import Q
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.utils import timezone
@@ -194,6 +195,38 @@ class Token(models.Model):
return True
+class ObjectPermissionManager(models.Manager):
+
+ def get_attr_constraints(self, user, model, action):
+ """
+ Compile all ObjectPermission attributes applicable to a specific combination of user, model, and action. Returns
+ a dictionary that can be passed directly to .filter() on a QuerySet.
+ """
+ assert action in ['view', 'add', 'change', 'delete'], f"Invalid action: {action}"
+
+ qs = self.get_queryset().filter(
+ Q(users=user) | Q(groups__user=user),
+ model=ContentType.objects.get_for_model(model),
+ **{f'can_{action}': True}
+ )
+
+ attrs = {}
+ for perm in qs:
+ attrs.update(perm.attrs)
+
+ return attrs
+
+ def validate_queryset(self, queryset, user, action):
+ """
+ Check that the specified user has permission to perform the specified action on all objects in the QuerySet.
+ """
+ assert action in ['view', 'add', 'change', 'delete'], f"Invalid action: {action}"
+
+ model = queryset.model
+ attrs = self.get_attr_constraints(user, model, action)
+ return queryset.count() == model.objects.filter(**attrs).count()
+
+
class ObjectPermission(models.Model):
"""
A mapping of view, add, change, and/or delete permission for users and/or groups to an arbitrary set of objects
@@ -229,6 +262,8 @@ class ObjectPermission(models.Model):
default=False
)
+ objects = ObjectPermissionManager()
+
class Meta:
unique_together = ('model', 'attrs')
diff --git a/netbox/utilities/auth_backends.py b/netbox/utilities/auth_backends.py
index 0d20fe02f..7deb9b0de 100644
--- a/netbox/utilities/auth_backends.py
+++ b/netbox/utilities/auth_backends.py
@@ -56,21 +56,12 @@ class ObjectPermissionBackend(ModelBackend):
if model._meta.model_name != model_name:
raise ValueError(f"Invalid permission {perm} for model {model}")
- # Retrieve user's permissions for this model
- # This can probably be cached
- obj_permissions = ObjectPermission.objects.filter(
- Q(users=user_obj) | Q(groups__user=user_obj),
- model=ContentType.objects.get_for_model(obj),
- **{f'can_{action}': True}
- )
-
- for perm in obj_permissions:
-
- # Attempt to retrieve the model from the database using the
- # attributes defined in the ObjectPermission. If we have a
- # match, assert that the user has permission.
- if model.objects.filter(pk=obj.pk, **perm.attrs).exists():
- return True
+ # Attempt to retrieve the model from the database using the
+ # attributes defined in the ObjectPermission. If we have a
+ # match, assert that the user has permission.
+ attrs = ObjectPermission.objects.get_attr_constraints(user_obj, obj, action)
+ if model.objects.filter(pk=obj.pk, **attrs).exists():
+ return True
class RemoteUserBackend(ViewExemptModelBackend, RemoteUserBackend_):
diff --git a/netbox/utilities/views.py b/netbox/utilities/views.py
index 076f2ad14..d9eace90b 100644
--- a/netbox/utilities/views.py
+++ b/netbox/utilities/views.py
@@ -4,7 +4,7 @@ from copy import deepcopy
from django.contrib import messages
from django.contrib.contenttypes.models import ContentType
-from django.core.exceptions import FieldDoesNotExist, ValidationError
+from django.core.exceptions import FieldDoesNotExist, ObjectDoesNotExist, ValidationError
from django.db import transaction, IntegrityError
from django.db.models import ManyToManyField, ProtectedError
from django.forms import Form, ModelMultipleChoiceField, MultipleHiddenInput, Textarea
@@ -23,6 +23,7 @@ from django_tables2 import RequestConfig
from extras.models import CustomField, CustomFieldValue, ExportTemplate
from extras.querysets import CustomFieldQueryset
+from users.models import ObjectPermission
from utilities.exceptions import AbortTransaction
from utilities.forms import BootstrapMixin, CSVDataField, TableConfigForm
from utilities.utils import csv_format, prepare_cloned_fields
@@ -262,32 +263,43 @@ class ObjectEditView(GetReturnURLMixin, View):
if form.is_valid():
logger.debug("Form validation was successful")
- obj = form.save()
- msg = '{} {}'.format(
- 'Created' if not form.instance.pk else 'Modified',
- self.queryset.model._meta.verbose_name
- )
- logger.info(f"{msg} {obj} (PK: {obj.pk})")
- if hasattr(obj, 'get_absolute_url'):
- msg = '{} {}'.format(msg, obj.get_absolute_url(), escape(obj))
- else:
- msg = '{} {}'.format(msg, escape(obj))
- messages.success(request, mark_safe(msg))
+ try:
+ with transaction.atomic():
+ obj = form.save()
- if '_addanother' in request.POST:
+ # Check that the new object conforms with any assigned object-level permissions
+ self.queryset.get(pk=obj.pk)
- # If the object has clone_fields, pre-populate a new instance of the form
- if hasattr(obj, 'clone_fields'):
- url = '{}?{}'.format(request.path, prepare_cloned_fields(obj))
- return redirect(url)
+ msg = '{} {}'.format(
+ 'Created' if not form.instance.pk else 'Modified',
+ self.queryset.model._meta.verbose_name
+ )
+ logger.info(f"{msg} {obj} (PK: {obj.pk})")
+ if hasattr(obj, 'get_absolute_url'):
+ msg = '{} {}'.format(msg, obj.get_absolute_url(), escape(obj))
+ else:
+ msg = '{} {}'.format(msg, escape(obj))
+ messages.success(request, mark_safe(msg))
- return redirect(request.get_full_path())
+ if '_addanother' in request.POST:
- return_url = form.cleaned_data.get('return_url')
- if return_url is not None and is_safe_url(url=return_url, allowed_hosts=request.get_host()):
- return redirect(return_url)
- else:
- return redirect(self.get_return_url(request, obj))
+ # If the object has clone_fields, pre-populate a new instance of the form
+ if hasattr(obj, 'clone_fields'):
+ url = '{}?{}'.format(request.path, prepare_cloned_fields(obj))
+ return redirect(url)
+
+ return redirect(request.get_full_path())
+
+ return_url = form.cleaned_data.get('return_url')
+ if return_url is not None and is_safe_url(url=return_url, allowed_hosts=request.get_host()):
+ return redirect(return_url)
+ else:
+ return redirect(self.get_return_url(request, obj))
+
+ except ObjectDoesNotExist:
+ logger.debug("Object save failed due to object-level permissions violation")
+ # TODO: Link user to personal permissions view
+ form.add_error(None, "Object save failed due to object-level permissions violation")
else:
logger.debug("Form validation failed")