1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Disable built-in model permissions

This commit is contained in:
Jeremy Stretch
2020-05-27 10:48:56 -04:00
parent 03da9348e5
commit 5dddf6846b
6 changed files with 197 additions and 190 deletions

View File

@ -339,7 +339,7 @@ TEMPLATES = [
# Set up authentication backends # Set up authentication backends
AUTHENTICATION_BACKENDS = [ AUTHENTICATION_BACKENDS = [
'utilities.auth_backends.ObjectPermissionBackend', 'utilities.auth_backends.ObjectPermissionBackend',
REMOTE_AUTH_BACKEND, # REMOTE_AUTH_BACKEND,
] ]
# Internationalization # Internationalization

View File

@ -1,4 +1,4 @@
# Generated by Django 3.0.6 on 2020-05-08 20:18 # Generated by Django 3.0.6 on 2020-05-27 14:17
from django.conf import settings from django.conf import settings
import django.contrib.postgres.fields.jsonb import django.contrib.postgres.fields.jsonb
@ -9,9 +9,9 @@ import django.db.models.deletion
class Migration(migrations.Migration): class Migration(migrations.Migration):
dependencies = [ dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('auth', '0011_update_proxy_permissions'),
('contenttypes', '0002_remove_content_type_name'), ('contenttypes', '0002_remove_content_type_name'),
('auth', '0011_update_proxy_permissions'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('users', '0006_create_userconfigs'), ('users', '0006_create_userconfigs'),
] ]
@ -20,7 +20,7 @@ class Migration(migrations.Migration):
name='ObjectPermission', name='ObjectPermission',
fields=[ fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False)), ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False)),
('attrs', django.contrib.postgres.fields.jsonb.JSONField()), ('attrs', django.contrib.postgres.fields.jsonb.JSONField(blank=True, null=True)),
('can_view', models.BooleanField(default=False)), ('can_view', models.BooleanField(default=False)),
('can_add', models.BooleanField(default=False)), ('can_add', models.BooleanField(default=False)),
('can_change', models.BooleanField(default=False)), ('can_change', models.BooleanField(default=False)),

View File

@ -240,6 +240,8 @@ class ObjectPermission(models.Model):
on_delete=models.CASCADE on_delete=models.CASCADE
) )
attrs = JSONField( attrs = JSONField(
blank=True,
null=True,
verbose_name='Attributes' verbose_name='Attributes'
) )
can_view = models.BooleanField( can_view = models.BooleanField(
@ -264,10 +266,11 @@ class ObjectPermission(models.Model):
# Validate the specified model attributes by attempting to execute a query. We don't care whether the query # Validate the specified model attributes by attempting to execute a query. We don't care whether the query
# returns anything; we just want to make sure the specified attributes are valid. # returns anything; we just want to make sure the specified attributes are valid.
model = self.model.model_class() if self.attrs:
try: model = self.model.model_class()
model.objects.filter(**self.attrs).exists() try:
except FieldError as e: model.objects.filter(**self.attrs).exists()
raise ValidationError({ except FieldError as e:
'attrs': f'Invalid attributes for {model}: {e}' raise ValidationError({
}) 'attrs': f'Invalid attributes for {model}: {e}'
})

View File

@ -12,43 +12,53 @@ class ObjectPermissionBackend(ModelBackend):
def get_object_permissions(self, user_obj): def get_object_permissions(self, user_obj):
""" """
Return all model-level permissions granted to the user by an ObjectPermission. Return all permissions granted to the user by an ObjectPermission.
""" """
if not hasattr(user_obj, '_object_perm_cache'): if not hasattr(user_obj, '_object_perm_cache'):
# Cache all assigned ObjectPermissions on the User instance # Retrieve all assigned ObjectPermissions
perms = set() object_permissions = ObjectPermission.objects.filter(
for obj_perm in ObjectPermission.objects.filter(
Q(users=user_obj) | Q(users=user_obj) |
Q(groups__user=user_obj) Q(groups__user=user_obj)
).prefetch_related('model'): ).prefetch_related('model')
# Create a dictionary mapping permissions to their attributes
perms = dict()
for obj_perm in object_permissions:
for action in ['view', 'add', 'change', 'delete']: for action in ['view', 'add', 'change', 'delete']:
if getattr(obj_perm, f"can_{action}"): if getattr(obj_perm, f"can_{action}"):
perms.add(f"{obj_perm.model.app_label}.{action}_{obj_perm.model.model}") perm_name = f"{obj_perm.model.app_label}.{action}_{obj_perm.model.model}"
if perm_name in perms:
perms[perm_name].append(obj_perm.attrs)
else:
perms[perm_name] = [obj_perm.attrs]
# Cache resolved permissions on the User instance
setattr(user_obj, '_object_perm_cache', perms) setattr(user_obj, '_object_perm_cache', perms)
return user_obj._object_perm_cache return user_obj._object_perm_cache
def get_all_permissions(self, user_obj, obj=None): # def get_all_permissions(self, user_obj, obj=None):
#
# Handle inactive/anonymous users # # Handle inactive/anonymous users
if not user_obj.is_active or user_obj.is_anonymous: # if not user_obj.is_active or user_obj.is_anonymous:
return set() # return set()
#
# Cache model-level permissions on the User instance # # Cache object permissions on the User instance
if not hasattr(user_obj, '_perm_cache'): # if not hasattr(user_obj, '_perm_cache'):
user_obj._perm_cache = { # user_obj._perm_cache = self.get_object_permissions(user_obj)
*self.get_user_permissions(user_obj, obj=obj), #
*self.get_group_permissions(user_obj, obj=obj), # return user_obj._perm_cache
*self.get_object_permissions(user_obj)
}
return user_obj._perm_cache
def has_perm(self, user_obj, perm, obj=None): def has_perm(self, user_obj, perm, obj=None):
# print(f'has_perm({perm})')
app_label, codename = perm.split('.') app_label, codename = perm.split('.')
action, model_name = codename.split('_') action, model_name = codename.split('_')
# Superusers implicitly have all permissions
if user_obj.is_active and user_obj.is_superuser:
return True
# If this is a view permission, check whether the model has been exempted from enforcement # If this is a view permission, check whether the model has been exempted from enforcement
if action == 'view': if action == 'view':
if ( if (
@ -60,29 +70,29 @@ class ObjectPermissionBackend(ModelBackend):
): ):
return True return True
# If no object is specified, evaluate model-level permissions. The presence of a permission in this set tells # Handle inactive/anonymous users
# us that the user has permission for *some* objects, but not necessarily a specific object. if not user_obj.is_active or user_obj.is_anonymous:
return False
# If no applicable ObjectPermissions have been created for this user/permission, deny permission
if perm not in self.get_object_permissions(user_obj):
return False
# If no object has been specified, grant permission. (The presence of a permission in this set tells
# us that the user has permission for *some* objects, but not necessarily a specific object.)
if obj is None: if obj is None:
return perm in self.get_all_permissions(user_obj) return True
# Sanity check: Ensure that the requested permission applies to the specified object # Sanity check: Ensure that the requested permission applies to the specified object
model = obj._meta.model model = obj._meta.model
if model._meta.label_lower != '.'.join((app_label, model_name)): if model._meta.label_lower != '.'.join((app_label, model_name)):
raise ValueError(f"Invalid permission {perm} for model {model}") raise ValueError(f"Invalid permission {perm} for model {model}")
# If the user has been granted model-level permission for the object, return True # Compile a query filter that matches all instances of the specified model
model_perms = { obj_perm_attrs = self.get_object_permissions(user_obj)[perm]
*self.get_user_permissions(user_obj), attrs = Q()
*self.get_group_permissions(user_obj), for perm_attrs in obj_perm_attrs:
} attrs |= Q(**perm_attrs.attrs)
if perm in model_perms:
return True
# Gather all ObjectPermissions pertinent to the requested permission. If none are found, the User has no
# applicable permissions.
attrs = ObjectPermission.objects.get_attr_constraints(user_obj, perm)
if not attrs:
return False
# Permission to perform the requested action on the object depends on whether the specified object matches # Permission to perform the requested action on the object depends on whether the specified object matches
# the specified attributes. Note that this check is made against the *database* record representing the object, # the specified attributes. Note that this check is made against the *database* record representing the object,

View File

@ -159,15 +159,15 @@ class ViewTestCases:
with disable_warnings('django.request'): with disable_warnings('django.request'):
self.assertHttpStatus(self.client.get(instance.get_absolute_url()), 403) self.assertHttpStatus(self.client.get(instance.get_absolute_url()), 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) # @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_get_object_with_model_permission(self): # def test_get_object_with_model_permission(self):
instance = self.model.objects.first() # instance = self.model.objects.first()
#
# Add model-level permission # # Add model-level permission
self.add_permissions(get_permission_for_model(self.model, 'view')) # self.add_permissions(get_permission_for_model(self.model, 'view'))
#
# Try GET with model-level permission # # Try GET with model-level permission
self.assertHttpStatus(self.client.get(instance.get_absolute_url()), 200) # self.assertHttpStatus(self.client.get(instance.get_absolute_url()), 200)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_get_object_with_object_permission(self): def test_get_object_with_object_permission(self):
@ -217,24 +217,24 @@ class ViewTestCases:
with disable_warnings('django.request'): with disable_warnings('django.request'):
self.assertHttpStatus(response, 403) self.assertHttpStatus(response, 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) # @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_create_object_with_model_permission(self): # def test_create_object_with_model_permission(self):
initial_count = self.model.objects.count() # initial_count = self.model.objects.count()
#
# Assign model-level permission # # Assign model-level permission
self.add_permissions(get_permission_for_model(self.model, 'add')) # self.add_permissions(get_permission_for_model(self.model, 'add'))
#
# Try GET with model-level permission # # Try GET with model-level permission
self.assertHttpStatus(self.client.get(self._get_url('add')), 200) # self.assertHttpStatus(self.client.get(self._get_url('add')), 200)
#
# Try POST with model-level permission # # Try POST with model-level permission
request = { # request = {
'path': self._get_url('add'), # 'path': self._get_url('add'),
'data': post_data(self.form_data), # 'data': post_data(self.form_data),
} # }
self.assertHttpStatus(self.client.post(**request), 302) # self.assertHttpStatus(self.client.post(**request), 302)
self.assertEqual(initial_count + 1, self.model.objects.count()) # self.assertEqual(initial_count + 1, self.model.objects.count())
self.assertInstanceEqual(self.model.objects.order_by('pk').last(), self.form_data) # self.assertInstanceEqual(self.model.objects.order_by('pk').last(), self.form_data)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_create_object_with_object_permission(self): def test_create_object_with_object_permission(self):
@ -296,23 +296,23 @@ class ViewTestCases:
with disable_warnings('django.request'): with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(**request), 403) self.assertHttpStatus(self.client.post(**request), 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) # @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_edit_object_with_model_permission(self): # def test_edit_object_with_model_permission(self):
instance = self.model.objects.first() # instance = self.model.objects.first()
#
# Assign model-level permission # # Assign model-level permission
self.add_permissions(get_permission_for_model(self.model, 'change')) # self.add_permissions(get_permission_for_model(self.model, 'change'))
#
# Try GET with model-level permission # # Try GET with model-level permission
self.assertHttpStatus(self.client.get(self._get_url('edit', instance)), 200) # self.assertHttpStatus(self.client.get(self._get_url('edit', instance)), 200)
#
# Try POST with model-level permission # # Try POST with model-level permission
request = { # request = {
'path': self._get_url('edit', instance), # 'path': self._get_url('edit', instance),
'data': post_data(self.form_data), # 'data': post_data(self.form_data),
} # }
self.assertHttpStatus(self.client.post(**request), 302) # self.assertHttpStatus(self.client.post(**request), 302)
self.assertInstanceEqual(self.model.objects.get(pk=instance.pk), self.form_data) # self.assertInstanceEqual(self.model.objects.get(pk=instance.pk), self.form_data)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_edit_object_with_object_permission(self): def test_edit_object_with_object_permission(self):
@ -368,24 +368,24 @@ class ViewTestCases:
with disable_warnings('django.request'): with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(**request), 403) self.assertHttpStatus(self.client.post(**request), 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) # @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_delete_object_with_model_permission(self): # def test_delete_object_with_model_permission(self):
instance = self.model.objects.first() # instance = self.model.objects.first()
#
# Assign model-level permission # # Assign model-level permission
self.add_permissions(get_permission_for_model(self.model, 'delete')) # self.add_permissions(get_permission_for_model(self.model, 'delete'))
#
# Try GET with model-level permission # # Try GET with model-level permission
self.assertHttpStatus(self.client.get(self._get_url('delete', instance)), 200) # self.assertHttpStatus(self.client.get(self._get_url('delete', instance)), 200)
#
# Try POST with model-level permission # # Try POST with model-level permission
request = { # request = {
'path': self._get_url('delete', instance), # 'path': self._get_url('delete', instance),
'data': post_data({'confirm': True}), # 'data': post_data({'confirm': True}),
} # }
self.assertHttpStatus(self.client.post(**request), 302) # self.assertHttpStatus(self.client.post(**request), 302)
with self.assertRaises(ObjectDoesNotExist): # with self.assertRaises(ObjectDoesNotExist):
self.model.objects.get(pk=instance.pk) # self.model.objects.get(pk=instance.pk)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_delete_object_with_object_permission(self): def test_delete_object_with_object_permission(self):
@ -434,20 +434,20 @@ class ViewTestCases:
with disable_warnings('django.request'): with disable_warnings('django.request'):
self.assertHttpStatus(self.client.get(self._get_url('list')), 403) self.assertHttpStatus(self.client.get(self._get_url('list')), 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) # @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_list_objects_with_model_permission(self): # def test_list_objects_with_model_permission(self):
#
# Add model-level permission # # Add model-level permission
self.add_permissions(get_permission_for_model(self.model, 'view')) # self.add_permissions(get_permission_for_model(self.model, 'view'))
#
# Try GET with model-level permission # # Try GET with model-level permission
self.assertHttpStatus(self.client.get(self._get_url('list')), 200) # self.assertHttpStatus(self.client.get(self._get_url('list')), 200)
#
# Built-in CSV export # # Built-in CSV export
if hasattr(self.model, 'csv_headers'): # if hasattr(self.model, 'csv_headers'):
response = self.client.get('{}?export'.format(self._get_url('list'))) # response = self.client.get('{}?export'.format(self._get_url('list')))
self.assertHttpStatus(response, 200) # self.assertHttpStatus(response, 200)
self.assertEqual(response.get('Content-Type'), 'text/csv') # self.assertEqual(response.get('Content-Type'), 'text/csv')
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_list_objects_with_object_permission(self): def test_list_objects_with_object_permission(self):
@ -528,22 +528,22 @@ class ViewTestCases:
with disable_warnings('django.request'): with disable_warnings('django.request'):
self.assertHttpStatus(response, 403) self.assertHttpStatus(response, 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) # @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_import_objects_with_model_permission(self): # def test_bulk_import_objects_with_model_permission(self):
initial_count = self.model.objects.count() # initial_count = self.model.objects.count()
data = { # data = {
'csv': self._get_csv_data(), # 'csv': self._get_csv_data(),
} # }
#
# Assign model-level permission # # Assign model-level permission
self.add_permissions(get_permission_for_model(self.model, 'add')) # self.add_permissions(get_permission_for_model(self.model, 'add'))
#
# Try GET with model-level permission # # Try GET with model-level permission
self.assertHttpStatus(self.client.get(self._get_url('import')), 200) # self.assertHttpStatus(self.client.get(self._get_url('import')), 200)
#
# Test POST with permission # # Test POST with permission
self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200) # self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
self.assertEqual(self.model.objects.count(), initial_count + len(self.csv_data) - 1) # self.assertEqual(self.model.objects.count(), initial_count + len(self.csv_data) - 1)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_import_objects_with_object_permission(self): def test_bulk_import_objects_with_object_permission(self):
@ -589,24 +589,24 @@ class ViewTestCases:
with disable_warnings('django.request'): with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 403) self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) # @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_edit_objects_with_model_permission(self): # def test_bulk_edit_objects_with_model_permission(self):
pk_list = self.model.objects.values_list('pk', flat=True)[:3] # pk_list = self.model.objects.values_list('pk', flat=True)[:3]
data = { # data = {
'pk': pk_list, # 'pk': pk_list,
'_apply': True, # Form button # '_apply': True, # Form button
} # }
#
# Append the form data to the request # # Append the form data to the request
data.update(post_data(self.bulk_edit_data)) # data.update(post_data(self.bulk_edit_data))
#
# Assign model-level permission # # Assign model-level permission
self.add_permissions(get_permission_for_model(self.model, 'change')) # self.add_permissions(get_permission_for_model(self.model, 'change'))
#
# Try POST with model-level permission # # Try POST with model-level permission
self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 302) # self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 302)
for i, instance in enumerate(self.model.objects.filter(pk__in=pk_list)): # for i, instance in enumerate(self.model.objects.filter(pk__in=pk_list)):
self.assertInstanceEqual(instance, self.bulk_edit_data) # self.assertInstanceEqual(instance, self.bulk_edit_data)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_edit_objects_with_object_permission(self): def test_bulk_edit_objects_with_object_permission(self):
@ -656,21 +656,21 @@ class ViewTestCases:
with disable_warnings('django.request'): with disable_warnings('django.request'):
self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 403) self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 403)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) # @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_delete_objects_with_model_permission(self): # def test_bulk_delete_objects_with_model_permission(self):
pk_list = self.model.objects.values_list('pk', flat=True) # pk_list = self.model.objects.values_list('pk', flat=True)
data = { # data = {
'pk': pk_list, # 'pk': pk_list,
'confirm': True, # 'confirm': True,
'_confirm': True, # Form button # '_confirm': True, # Form button
} # }
#
# Assign model-level permission # # Assign model-level permission
self.add_permissions(get_permission_for_model(self.model, 'delete')) # self.add_permissions(get_permission_for_model(self.model, 'delete'))
#
# Try POST with model-level permission # # Try POST with model-level permission
self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 302) # self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 302)
self.assertEqual(self.model.objects.count(), 0) # self.assertEqual(self.model.objects.count(), 0)
@override_settings(EXEMPT_VIEW_PERMISSIONS=[]) @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
def test_bulk_delete_objects_with_object_permission(self): def test_bulk_delete_objects_with_object_permission(self):

View File

@ -8,7 +8,7 @@ from django.contrib.contenttypes.models import ContentType
from django.contrib.auth.mixins import AccessMixin from django.contrib.auth.mixins import AccessMixin
from django.core.exceptions import FieldDoesNotExist, ImproperlyConfigured, ObjectDoesNotExist, ValidationError from django.core.exceptions import FieldDoesNotExist, ImproperlyConfigured, ObjectDoesNotExist, ValidationError
from django.db import transaction, IntegrityError from django.db import transaction, IntegrityError
from django.db.models import ManyToManyField, ProtectedError from django.db.models import ManyToManyField, ProtectedError, Q
from django.forms import Form, ModelMultipleChoiceField, MultipleHiddenInput, Textarea from django.forms import Form, ModelMultipleChoiceField, MultipleHiddenInput, Textarea
from django.http import HttpResponse, HttpResponseServerError from django.http import HttpResponse, HttpResponseServerError
from django.shortcuts import get_object_or_404, redirect, render from django.shortcuts import get_object_or_404, redirect, render
@ -65,22 +65,16 @@ class ObjectPermissionRequiredMixin(AccessMixin):
if not user.has_perms((permission_required, *self.additional_permissions)): if not user.has_perms((permission_required, *self.additional_permissions)):
return False return False
# Superusers implicitly have all permissions # Update the view's QuerySet to filter only the permitted objects
if user.is_superuser: if user.is_authenticated:
return True obj_perm_attrs = user._object_perm_cache[permission_required]
attrs = Q()
# Determine whether the permission is model-level or object-level. Model-level permissions grant the for perm_attrs in obj_perm_attrs:
# specified action to *all* objects, so no further action is needed. if perm_attrs:
if permission_required in {*user._user_perm_cache, *user._group_perm_cache}: attrs |= Q(**perm_attrs)
return True
# If the permission is granted only at the object level, filter the view's queryset to return only objects
# on which the user is permitted to perform the specified action.
attrs = ObjectPermission.objects.get_attr_constraints(user, permission_required)
if attrs:
# Update the view's QuerySet to filter only the permitted objects
self.queryset = self.queryset.filter(attrs) self.queryset = self.queryset.filter(attrs)
return True
return True
def dispatch(self, request, *args, **kwargs): def dispatch(self, request, *args, **kwargs):