diff --git a/netbox/netbox/settings.py b/netbox/netbox/settings.py index 266f1afd7..f4ee6fff2 100644 --- a/netbox/netbox/settings.py +++ b/netbox/netbox/settings.py @@ -339,7 +339,7 @@ TEMPLATES = [ # Set up authentication backends AUTHENTICATION_BACKENDS = [ 'utilities.auth_backends.ObjectPermissionBackend', - REMOTE_AUTH_BACKEND, + # REMOTE_AUTH_BACKEND, ] # Internationalization diff --git a/netbox/users/migrations/0007_objectpermission.py b/netbox/users/migrations/0007_objectpermission.py index d805c3379..1fadcc9a5 100644 --- a/netbox/users/migrations/0007_objectpermission.py +++ b/netbox/users/migrations/0007_objectpermission.py @@ -1,4 +1,4 @@ -# Generated by Django 3.0.6 on 2020-05-08 20:18 +# Generated by Django 3.0.6 on 2020-05-27 14:17 from django.conf import settings import django.contrib.postgres.fields.jsonb @@ -9,9 +9,9 @@ import django.db.models.deletion class Migration(migrations.Migration): dependencies = [ - migrations.swappable_dependency(settings.AUTH_USER_MODEL), - ('auth', '0011_update_proxy_permissions'), ('contenttypes', '0002_remove_content_type_name'), + ('auth', '0011_update_proxy_permissions'), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), ('users', '0006_create_userconfigs'), ] @@ -20,7 +20,7 @@ class Migration(migrations.Migration): name='ObjectPermission', fields=[ ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False)), - ('attrs', django.contrib.postgres.fields.jsonb.JSONField()), + ('attrs', django.contrib.postgres.fields.jsonb.JSONField(blank=True, null=True)), ('can_view', models.BooleanField(default=False)), ('can_add', models.BooleanField(default=False)), ('can_change', models.BooleanField(default=False)), diff --git a/netbox/users/models.py b/netbox/users/models.py index 70e7254e6..b9ab6cbb5 100644 --- a/netbox/users/models.py +++ b/netbox/users/models.py @@ -240,6 +240,8 @@ class ObjectPermission(models.Model): on_delete=models.CASCADE ) attrs = JSONField( + blank=True, + null=True, verbose_name='Attributes' ) can_view = models.BooleanField( @@ -264,10 +266,11 @@ class ObjectPermission(models.Model): # Validate the specified model attributes by attempting to execute a query. We don't care whether the query # returns anything; we just want to make sure the specified attributes are valid. - model = self.model.model_class() - try: - model.objects.filter(**self.attrs).exists() - except FieldError as e: - raise ValidationError({ - 'attrs': f'Invalid attributes for {model}: {e}' - }) + if self.attrs: + model = self.model.model_class() + try: + model.objects.filter(**self.attrs).exists() + except FieldError as e: + raise ValidationError({ + 'attrs': f'Invalid attributes for {model}: {e}' + }) diff --git a/netbox/utilities/auth_backends.py b/netbox/utilities/auth_backends.py index e540a04e0..8cf8b621c 100644 --- a/netbox/utilities/auth_backends.py +++ b/netbox/utilities/auth_backends.py @@ -12,43 +12,53 @@ class ObjectPermissionBackend(ModelBackend): def get_object_permissions(self, user_obj): """ - Return all model-level permissions granted to the user by an ObjectPermission. + Return all permissions granted to the user by an ObjectPermission. """ if not hasattr(user_obj, '_object_perm_cache'): - # Cache all assigned ObjectPermissions on the User instance - perms = set() - for obj_perm in ObjectPermission.objects.filter( + # Retrieve all assigned ObjectPermissions + object_permissions = ObjectPermission.objects.filter( Q(users=user_obj) | Q(groups__user=user_obj) - ).prefetch_related('model'): + ).prefetch_related('model') + + # Create a dictionary mapping permissions to their attributes + perms = dict() + for obj_perm in object_permissions: for action in ['view', 'add', 'change', 'delete']: if getattr(obj_perm, f"can_{action}"): - perms.add(f"{obj_perm.model.app_label}.{action}_{obj_perm.model.model}") + perm_name = f"{obj_perm.model.app_label}.{action}_{obj_perm.model.model}" + if perm_name in perms: + perms[perm_name].append(obj_perm.attrs) + else: + perms[perm_name] = [obj_perm.attrs] + + # Cache resolved permissions on the User instance setattr(user_obj, '_object_perm_cache', perms) return user_obj._object_perm_cache - def get_all_permissions(self, user_obj, obj=None): - - # Handle inactive/anonymous users - if not user_obj.is_active or user_obj.is_anonymous: - return set() - - # Cache model-level permissions on the User instance - if not hasattr(user_obj, '_perm_cache'): - user_obj._perm_cache = { - *self.get_user_permissions(user_obj, obj=obj), - *self.get_group_permissions(user_obj, obj=obj), - *self.get_object_permissions(user_obj) - } - - return user_obj._perm_cache + # def get_all_permissions(self, user_obj, obj=None): + # + # # Handle inactive/anonymous users + # if not user_obj.is_active or user_obj.is_anonymous: + # return set() + # + # # Cache object permissions on the User instance + # if not hasattr(user_obj, '_perm_cache'): + # user_obj._perm_cache = self.get_object_permissions(user_obj) + # + # return user_obj._perm_cache def has_perm(self, user_obj, perm, obj=None): + # print(f'has_perm({perm})') app_label, codename = perm.split('.') action, model_name = codename.split('_') + # Superusers implicitly have all permissions + if user_obj.is_active and user_obj.is_superuser: + return True + # If this is a view permission, check whether the model has been exempted from enforcement if action == 'view': if ( @@ -60,29 +70,29 @@ class ObjectPermissionBackend(ModelBackend): ): return True - # If no object is specified, evaluate model-level permissions. The presence of a permission in this set tells - # us that the user has permission for *some* objects, but not necessarily a specific object. + # Handle inactive/anonymous users + if not user_obj.is_active or user_obj.is_anonymous: + return False + + # If no applicable ObjectPermissions have been created for this user/permission, deny permission + if perm not in self.get_object_permissions(user_obj): + return False + + # If no object has been specified, grant permission. (The presence of a permission in this set tells + # us that the user has permission for *some* objects, but not necessarily a specific object.) if obj is None: - return perm in self.get_all_permissions(user_obj) + return True # Sanity check: Ensure that the requested permission applies to the specified object model = obj._meta.model if model._meta.label_lower != '.'.join((app_label, model_name)): raise ValueError(f"Invalid permission {perm} for model {model}") - # If the user has been granted model-level permission for the object, return True - model_perms = { - *self.get_user_permissions(user_obj), - *self.get_group_permissions(user_obj), - } - if perm in model_perms: - return True - - # Gather all ObjectPermissions pertinent to the requested permission. If none are found, the User has no - # applicable permissions. - attrs = ObjectPermission.objects.get_attr_constraints(user_obj, perm) - if not attrs: - return False + # Compile a query filter that matches all instances of the specified model + obj_perm_attrs = self.get_object_permissions(user_obj)[perm] + attrs = Q() + for perm_attrs in obj_perm_attrs: + attrs |= Q(**perm_attrs.attrs) # Permission to perform the requested action on the object depends on whether the specified object matches # the specified attributes. Note that this check is made against the *database* record representing the object, diff --git a/netbox/utilities/testing/testcases.py b/netbox/utilities/testing/testcases.py index 6f878986b..3d0ad1ef3 100644 --- a/netbox/utilities/testing/testcases.py +++ b/netbox/utilities/testing/testcases.py @@ -159,15 +159,15 @@ class ViewTestCases: with disable_warnings('django.request'): self.assertHttpStatus(self.client.get(instance.get_absolute_url()), 403) - @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) - def test_get_object_with_model_permission(self): - instance = self.model.objects.first() - - # Add model-level permission - self.add_permissions(get_permission_for_model(self.model, 'view')) - - # Try GET with model-level permission - self.assertHttpStatus(self.client.get(instance.get_absolute_url()), 200) + # @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + # def test_get_object_with_model_permission(self): + # instance = self.model.objects.first() + # + # # Add model-level permission + # self.add_permissions(get_permission_for_model(self.model, 'view')) + # + # # Try GET with model-level permission + # self.assertHttpStatus(self.client.get(instance.get_absolute_url()), 200) @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) def test_get_object_with_object_permission(self): @@ -217,24 +217,24 @@ class ViewTestCases: with disable_warnings('django.request'): self.assertHttpStatus(response, 403) - @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) - def test_create_object_with_model_permission(self): - initial_count = self.model.objects.count() - - # Assign model-level permission - self.add_permissions(get_permission_for_model(self.model, 'add')) - - # Try GET with model-level permission - self.assertHttpStatus(self.client.get(self._get_url('add')), 200) - - # Try POST with model-level permission - request = { - 'path': self._get_url('add'), - 'data': post_data(self.form_data), - } - self.assertHttpStatus(self.client.post(**request), 302) - self.assertEqual(initial_count + 1, self.model.objects.count()) - self.assertInstanceEqual(self.model.objects.order_by('pk').last(), self.form_data) + # @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + # def test_create_object_with_model_permission(self): + # initial_count = self.model.objects.count() + # + # # Assign model-level permission + # self.add_permissions(get_permission_for_model(self.model, 'add')) + # + # # Try GET with model-level permission + # self.assertHttpStatus(self.client.get(self._get_url('add')), 200) + # + # # Try POST with model-level permission + # request = { + # 'path': self._get_url('add'), + # 'data': post_data(self.form_data), + # } + # self.assertHttpStatus(self.client.post(**request), 302) + # self.assertEqual(initial_count + 1, self.model.objects.count()) + # self.assertInstanceEqual(self.model.objects.order_by('pk').last(), self.form_data) @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) def test_create_object_with_object_permission(self): @@ -296,23 +296,23 @@ class ViewTestCases: with disable_warnings('django.request'): self.assertHttpStatus(self.client.post(**request), 403) - @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) - def test_edit_object_with_model_permission(self): - instance = self.model.objects.first() - - # Assign model-level permission - self.add_permissions(get_permission_for_model(self.model, 'change')) - - # Try GET with model-level permission - self.assertHttpStatus(self.client.get(self._get_url('edit', instance)), 200) - - # Try POST with model-level permission - request = { - 'path': self._get_url('edit', instance), - 'data': post_data(self.form_data), - } - self.assertHttpStatus(self.client.post(**request), 302) - self.assertInstanceEqual(self.model.objects.get(pk=instance.pk), self.form_data) + # @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + # def test_edit_object_with_model_permission(self): + # instance = self.model.objects.first() + # + # # Assign model-level permission + # self.add_permissions(get_permission_for_model(self.model, 'change')) + # + # # Try GET with model-level permission + # self.assertHttpStatus(self.client.get(self._get_url('edit', instance)), 200) + # + # # Try POST with model-level permission + # request = { + # 'path': self._get_url('edit', instance), + # 'data': post_data(self.form_data), + # } + # self.assertHttpStatus(self.client.post(**request), 302) + # self.assertInstanceEqual(self.model.objects.get(pk=instance.pk), self.form_data) @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) def test_edit_object_with_object_permission(self): @@ -368,24 +368,24 @@ class ViewTestCases: with disable_warnings('django.request'): self.assertHttpStatus(self.client.post(**request), 403) - @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) - def test_delete_object_with_model_permission(self): - instance = self.model.objects.first() - - # Assign model-level permission - self.add_permissions(get_permission_for_model(self.model, 'delete')) - - # Try GET with model-level permission - self.assertHttpStatus(self.client.get(self._get_url('delete', instance)), 200) - - # Try POST with model-level permission - request = { - 'path': self._get_url('delete', instance), - 'data': post_data({'confirm': True}), - } - self.assertHttpStatus(self.client.post(**request), 302) - with self.assertRaises(ObjectDoesNotExist): - self.model.objects.get(pk=instance.pk) + # @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + # def test_delete_object_with_model_permission(self): + # instance = self.model.objects.first() + # + # # Assign model-level permission + # self.add_permissions(get_permission_for_model(self.model, 'delete')) + # + # # Try GET with model-level permission + # self.assertHttpStatus(self.client.get(self._get_url('delete', instance)), 200) + # + # # Try POST with model-level permission + # request = { + # 'path': self._get_url('delete', instance), + # 'data': post_data({'confirm': True}), + # } + # self.assertHttpStatus(self.client.post(**request), 302) + # with self.assertRaises(ObjectDoesNotExist): + # self.model.objects.get(pk=instance.pk) @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) def test_delete_object_with_object_permission(self): @@ -434,20 +434,20 @@ class ViewTestCases: with disable_warnings('django.request'): self.assertHttpStatus(self.client.get(self._get_url('list')), 403) - @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) - def test_list_objects_with_model_permission(self): - - # Add model-level permission - self.add_permissions(get_permission_for_model(self.model, 'view')) - - # Try GET with model-level permission - self.assertHttpStatus(self.client.get(self._get_url('list')), 200) - - # Built-in CSV export - if hasattr(self.model, 'csv_headers'): - response = self.client.get('{}?export'.format(self._get_url('list'))) - self.assertHttpStatus(response, 200) - self.assertEqual(response.get('Content-Type'), 'text/csv') + # @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + # def test_list_objects_with_model_permission(self): + # + # # Add model-level permission + # self.add_permissions(get_permission_for_model(self.model, 'view')) + # + # # Try GET with model-level permission + # self.assertHttpStatus(self.client.get(self._get_url('list')), 200) + # + # # Built-in CSV export + # if hasattr(self.model, 'csv_headers'): + # response = self.client.get('{}?export'.format(self._get_url('list'))) + # self.assertHttpStatus(response, 200) + # self.assertEqual(response.get('Content-Type'), 'text/csv') @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) def test_list_objects_with_object_permission(self): @@ -528,22 +528,22 @@ class ViewTestCases: with disable_warnings('django.request'): self.assertHttpStatus(response, 403) - @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) - def test_bulk_import_objects_with_model_permission(self): - initial_count = self.model.objects.count() - data = { - 'csv': self._get_csv_data(), - } - - # Assign model-level permission - self.add_permissions(get_permission_for_model(self.model, 'add')) - - # Try GET with model-level permission - self.assertHttpStatus(self.client.get(self._get_url('import')), 200) - - # Test POST with permission - self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) - self.assertEqual(self.model.objects.count(), initial_count + len(self.csv_data) - 1) + # @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + # def test_bulk_import_objects_with_model_permission(self): + # initial_count = self.model.objects.count() + # data = { + # 'csv': self._get_csv_data(), + # } + # + # # Assign model-level permission + # self.add_permissions(get_permission_for_model(self.model, 'add')) + # + # # Try GET with model-level permission + # self.assertHttpStatus(self.client.get(self._get_url('import')), 200) + # + # # Test POST with permission + # self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) + # self.assertEqual(self.model.objects.count(), initial_count + len(self.csv_data) - 1) @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) def test_bulk_import_objects_with_object_permission(self): @@ -589,24 +589,24 @@ class ViewTestCases: with disable_warnings('django.request'): self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 403) - @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) - def test_bulk_edit_objects_with_model_permission(self): - pk_list = self.model.objects.values_list('pk', flat=True)[:3] - data = { - 'pk': pk_list, - '_apply': True, # Form button - } - - # Append the form data to the request - data.update(post_data(self.bulk_edit_data)) - - # Assign model-level permission - self.add_permissions(get_permission_for_model(self.model, 'change')) - - # Try POST with model-level permission - self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 302) - for i, instance in enumerate(self.model.objects.filter(pk__in=pk_list)): - self.assertInstanceEqual(instance, self.bulk_edit_data) + # @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + # def test_bulk_edit_objects_with_model_permission(self): + # pk_list = self.model.objects.values_list('pk', flat=True)[:3] + # data = { + # 'pk': pk_list, + # '_apply': True, # Form button + # } + # + # # Append the form data to the request + # data.update(post_data(self.bulk_edit_data)) + # + # # Assign model-level permission + # self.add_permissions(get_permission_for_model(self.model, 'change')) + # + # # Try POST with model-level permission + # self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 302) + # for i, instance in enumerate(self.model.objects.filter(pk__in=pk_list)): + # self.assertInstanceEqual(instance, self.bulk_edit_data) @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) def test_bulk_edit_objects_with_object_permission(self): @@ -656,21 +656,21 @@ class ViewTestCases: with disable_warnings('django.request'): self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 403) - @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) - def test_bulk_delete_objects_with_model_permission(self): - pk_list = self.model.objects.values_list('pk', flat=True) - data = { - 'pk': pk_list, - 'confirm': True, - '_confirm': True, # Form button - } - - # Assign model-level permission - self.add_permissions(get_permission_for_model(self.model, 'delete')) - - # Try POST with model-level permission - self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 302) - self.assertEqual(self.model.objects.count(), 0) + # @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) + # def test_bulk_delete_objects_with_model_permission(self): + # pk_list = self.model.objects.values_list('pk', flat=True) + # data = { + # 'pk': pk_list, + # 'confirm': True, + # '_confirm': True, # Form button + # } + # + # # Assign model-level permission + # self.add_permissions(get_permission_for_model(self.model, 'delete')) + # + # # Try POST with model-level permission + # self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 302) + # self.assertEqual(self.model.objects.count(), 0) @override_settings(EXEMPT_VIEW_PERMISSIONS=[]) def test_bulk_delete_objects_with_object_permission(self): diff --git a/netbox/utilities/views.py b/netbox/utilities/views.py index cbedecd4d..6e93c2369 100644 --- a/netbox/utilities/views.py +++ b/netbox/utilities/views.py @@ -8,7 +8,7 @@ from django.contrib.contenttypes.models import ContentType from django.contrib.auth.mixins import AccessMixin from django.core.exceptions import FieldDoesNotExist, ImproperlyConfigured, ObjectDoesNotExist, ValidationError from django.db import transaction, IntegrityError -from django.db.models import ManyToManyField, ProtectedError +from django.db.models import ManyToManyField, ProtectedError, Q from django.forms import Form, ModelMultipleChoiceField, MultipleHiddenInput, Textarea from django.http import HttpResponse, HttpResponseServerError from django.shortcuts import get_object_or_404, redirect, render @@ -65,22 +65,16 @@ class ObjectPermissionRequiredMixin(AccessMixin): if not user.has_perms((permission_required, *self.additional_permissions)): return False - # Superusers implicitly have all permissions - if user.is_superuser: - return True - - # Determine whether the permission is model-level or object-level. Model-level permissions grant the - # specified action to *all* objects, so no further action is needed. - if permission_required in {*user._user_perm_cache, *user._group_perm_cache}: - return True - - # If the permission is granted only at the object level, filter the view's queryset to return only objects - # on which the user is permitted to perform the specified action. - attrs = ObjectPermission.objects.get_attr_constraints(user, permission_required) - if attrs: - # Update the view's QuerySet to filter only the permitted objects + # Update the view's QuerySet to filter only the permitted objects + if user.is_authenticated: + obj_perm_attrs = user._object_perm_cache[permission_required] + attrs = Q() + for perm_attrs in obj_perm_attrs: + if perm_attrs: + attrs |= Q(**perm_attrs) self.queryset = self.queryset.filter(attrs) - return True + + return True def dispatch(self, request, *args, **kwargs):