1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Closes #15464: Move permission assignments to user & group models (#15554)

* Move user & group M2M assignments for ObjectPermission

* Restore users & groups fields on ObjectPermission serializer
This commit is contained in:
Jeremy Stretch
2024-03-29 14:57:16 -04:00
committed by GitHub
parent 8767577ecd
commit c8d9d9358e
8 changed files with 255 additions and 87 deletions

View File

@ -85,8 +85,9 @@ class ValidatedModelSerializer(BaseModelSerializer):
attrs.pop('custom_fields', None)
# Skip ManyToManyFields
opts = self.Meta.model._meta
m2m_values = {}
for field in self.Meta.model._meta.local_many_to_many:
for field in [*opts.local_many_to_many, *opts.related_objects]:
if field.name in attrs:
m2m_values[field.name] = attrs.pop(field.name)

View File

@ -1,11 +1,10 @@
from django.contrib.auth import get_user_model
from rest_framework import serializers
from core.models import ObjectType
from netbox.api.fields import ContentTypeField, SerializedPKRelatedField
from netbox.api.serializers import ValidatedModelSerializer
from users.models import Group, ObjectPermission
from .users import GroupSerializer, UserSerializer
from users.api.nested_serializers import NestedGroupSerializer, NestedUserSerializer
from users.models import Group, ObjectPermission, User
__all__ = (
'ObjectPermissionSerializer',
@ -20,14 +19,14 @@ class ObjectPermissionSerializer(ValidatedModelSerializer):
)
groups = SerializedPKRelatedField(
queryset=Group.objects.all(),
serializer=GroupSerializer,
serializer=NestedGroupSerializer,
nested=True,
required=False,
many=True
)
users = SerializedPKRelatedField(
queryset=get_user_model().objects.all(),
serializer=UserSerializer,
queryset=User.objects.all(),
serializer=NestedUserSerializer,
nested=True,
required=False,
many=True
@ -36,9 +35,9 @@ class ObjectPermissionSerializer(ValidatedModelSerializer):
class Meta:
model = ObjectPermission
fields = (
'id', 'url', 'display', 'name', 'description', 'enabled', 'object_types', 'groups', 'users', 'actions',
'constraints',
'id', 'url', 'display', 'name', 'description', 'enabled', 'object_types', 'actions', 'constraints',
'groups', 'users',
)
brief_fields = (
'id', 'url', 'display', 'name', 'description', 'enabled', 'object_types', 'groups', 'users', 'actions',
'id', 'url', 'display', 'name', 'description', 'enabled', 'object_types', 'actions',
)

View File

@ -5,7 +5,8 @@ from rest_framework import serializers
from netbox.api.fields import SerializedPKRelatedField
from netbox.api.serializers import ValidatedModelSerializer
from users.models import Group
from users.models import Group, ObjectPermission
from .permissions import ObjectPermissionSerializer
__all__ = (
'GroupSerializer',
@ -16,10 +17,18 @@ __all__ = (
class GroupSerializer(ValidatedModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='users-api:group-detail')
user_count = serializers.IntegerField(read_only=True)
permissions = SerializedPKRelatedField(
source='object_permissions',
queryset=ObjectPermission.objects.all(),
serializer=ObjectPermissionSerializer,
nested=True,
required=False,
many=True
)
class Meta:
model = Group
fields = ('id', 'url', 'display', 'name', 'user_count')
fields = ('id', 'url', 'display', 'name', 'permissions', 'user_count')
brief_fields = ('id', 'url', 'display', 'name')
@ -32,12 +41,20 @@ class UserSerializer(ValidatedModelSerializer):
required=False,
many=True
)
permissions = SerializedPKRelatedField(
source='object_permissions',
queryset=ObjectPermission.objects.all(),
serializer=ObjectPermissionSerializer,
nested=True,
required=False,
many=True
)
class Meta:
model = get_user_model()
fields = (
'id', 'url', 'display', 'username', 'password', 'first_name', 'last_name', 'email', 'is_staff', 'is_active',
'date_joined', 'last_login', 'groups',
'date_joined', 'last_login', 'groups', 'permissions',
)
brief_fields = ('id', 'url', 'display', 'username')
extra_kwargs = {

View File

@ -203,9 +203,6 @@ class UserForm(forms.ModelForm):
super().__init__(*args, **kwargs)
if self.instance.pk:
# Populate assigned permissions
self.fields['object_permissions'].initial = self.instance.object_permissions.values_list('id', flat=True)
# Password fields are optional for existing Users
self.fields['password'].required = False
self.fields['confirm_password'].required = False
@ -213,9 +210,6 @@ class UserForm(forms.ModelForm):
def save(self, *args, **kwargs):
instance = super().save(*args, **kwargs)
# Update assigned permissions
instance.object_permissions.set(self.cleaned_data['object_permissions'])
# On edit, check if we have to save the password
if self.cleaned_data.get('password'):
instance.set_password(self.cleaned_data.get('password'))
@ -260,14 +254,12 @@ class GroupForm(forms.ModelForm):
# Populate assigned users and permissions
if self.instance.pk:
self.fields['users'].initial = self.instance.users.values_list('id', flat=True)
self.fields['object_permissions'].initial = self.instance.object_permissions.values_list('id', flat=True)
def save(self, *args, **kwargs):
instance = super().save(*args, **kwargs)
# Update assigned users and permissions
# Update assigned users
instance.users.set(self.cleaned_data['users'])
instance.object_permissions.set(self.cleaned_data['object_permissions'])
return instance
@ -335,9 +327,10 @@ class ObjectPermissionForm(forms.ModelForm):
# Make the actions field optional since the form uses it only for non-CRUD actions
self.fields['actions'].required = False
# Order group and user fields
self.fields['groups'].queryset = self.fields['groups'].queryset.order_by('name')
self.fields['users'].queryset = self.fields['users'].queryset.order_by('username')
# Populate assigned users and groups
if self.instance.pk:
self.fields['groups'].initial = self.instance.groups.values_list('id', flat=True)
self.fields['users'].initial = self.instance.users.values_list('id', flat=True)
# Check the appropriate checkboxes when editing an existing ObjectPermission
if self.instance.pk:
@ -381,3 +374,12 @@ class ObjectPermissionForm(forms.ModelForm):
raise forms.ValidationError({
'constraints': _('Invalid filter for {model}: {error}').format(model=model, error=e)
})
def save(self, *args, **kwargs):
instance = super().save(*args, **kwargs)
# Update assigned users and groups
instance.users.set(self.cleaned_data['users'])
instance.groups.set(self.cleaned_data['groups'])
return instance

View File

@ -0,0 +1,122 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('users', '0007_objectpermission_update_object_types'),
]
operations = [
# Flip M2M assignments for ObjectPermission to Groups
migrations.SeparateDatabaseAndState(
state_operations=[
migrations.RemoveField(
model_name='objectpermission',
name='groups',
),
migrations.AddField(
model_name='group',
name='object_permissions',
field=models.ManyToManyField(blank=True, related_name='groups', to='users.objectpermission'),
),
],
database_operations=[
# Rename table
migrations.RunSQL(
"ALTER TABLE users_objectpermission_groups"
" RENAME TO users_group_object_permissions"
),
migrations.RunSQL(
"ALTER TABLE users_objectpermission_groups_id_seq"
" RENAME TO users_group_object_permissions_id_seq"
),
# Rename constraints
migrations.RunSQL(
"ALTER TABLE users_group_object_permissions RENAME CONSTRAINT "
"users_objectpermissi_group_id_fb7ba6e0_fk_users_gro TO "
"users_group_object_p_group_id_90dd183a_fk_users_gro"
),
migrations.RunSQL(
"ALTER TABLE users_group_object_permissions RENAME CONSTRAINT "
"users_objectpermissi_objectpermission_id_2f7cc117_fk_users_obj TO "
"users_group_object_p_objectpermission_id_dd489dc4_fk_users_obj"
),
# Rename indexes
migrations.RunSQL(
"ALTER INDEX users_objectpermission_groups_pkey "
" RENAME TO users_group_object_permissions_pkey"
),
migrations.RunSQL(
"ALTER INDEX users_objectpermission_g_objectpermission_id_grou_3b62a39c_uniq "
" RENAME TO users_group_object_permi_group_id_objectpermissio_db1f8cbe_uniq"
),
migrations.RunSQL(
"ALTER INDEX users_objectpermission_groups_group_id_fb7ba6e0"
" RENAME TO users_group_object_permissions_group_id_90dd183a"
),
migrations.RunSQL(
"ALTER INDEX users_objectpermission_groups_objectpermission_id_2f7cc117"
" RENAME TO users_group_object_permissions_objectpermission_id_dd489dc4"
),
]
),
# Flip M2M assignments for ObjectPermission to Users
migrations.SeparateDatabaseAndState(
state_operations=[
migrations.RemoveField(
model_name='objectpermission',
name='users',
),
migrations.AddField(
model_name='user',
name='object_permissions',
field=models.ManyToManyField(blank=True, related_name='users', to='users.objectpermission'),
),
],
database_operations=[
# Rename table
migrations.RunSQL(
"ALTER TABLE users_objectpermission_users"
" RENAME TO users_user_object_permissions"
),
migrations.RunSQL(
"ALTER TABLE users_objectpermission_users_id_seq"
" RENAME TO users_user_object_permissions_id_seq"
),
# Rename constraints
migrations.RunSQL(
"ALTER TABLE users_user_object_permissions RENAME CONSTRAINT "
"users_objectpermissi_objectpermission_id_78a9c2e6_fk_users_obj TO "
"users_user_object_pe_objectpermission_id_29b431b4_fk_users_obj"
),
migrations.RunSQL(
"ALTER TABLE users_user_object_permissions RENAME CONSTRAINT "
"users_objectpermission_users_user_id_16c0905d_fk_auth_user_id TO "
"users_user_object_permissions_user_id_9d647aac_fk_users_user_id"
),
# Rename indexes
migrations.RunSQL(
"ALTER INDEX users_objectpermission_users_pkey "
" RENAME TO users_user_object_permissions_pkey"
),
migrations.RunSQL(
"ALTER INDEX users_objectpermission_u_objectpermission_id_user_3a7db108_uniq "
" RENAME TO users_user_object_permis_user_id_objectpermission_0a98550e_uniq"
),
migrations.RunSQL(
"ALTER INDEX users_objectpermission_users_user_id_16c0905d"
" RENAME TO users_user_object_permissions_user_id_9d647aac"
),
migrations.RunSQL(
"ALTER INDEX users_objectpermission_users_objectpermission_id_78a9c2e6"
" RENAME TO users_user_object_permissions_objectpermission_id_29b431b4"
),
]
),
]

View File

@ -53,6 +53,11 @@ class Group(models.Model):
max_length=200,
blank=True
)
object_permissions = models.ManyToManyField(
to='users.ObjectPermission',
blank=True,
related_name='groups'
)
# Replicate legacy Django permissions support from stock Group model
# to ensure authentication backend compatibility
@ -92,6 +97,11 @@ class User(AbstractUser):
related_name='users',
related_query_name='user'
)
object_permissions = models.ManyToManyField(
to='users.ObjectPermission',
blank=True,
related_name='users'
)
objects = UserManager()
@ -387,16 +397,6 @@ class ObjectPermission(models.Model):
limit_choices_to=OBJECTPERMISSION_OBJECT_TYPES,
related_name='object_permissions'
)
groups = models.ManyToManyField(
to='users.Group',
blank=True,
related_name='object_permissions'
)
users = models.ManyToManyField(
to=get_user_model(),
blank=True,
related_name='object_permissions'
)
actions = ArrayField(
base_field=models.CharField(max_length=30),
help_text=_("The list of actions granted by this permission")

View File

@ -16,29 +16,13 @@ class AppTest(APITestCase):
url = reverse('users-api:api-root')
response = self.client.get(f'{url}?format=api', **self.header)
self.assertEqual(response.status_code, 200)
class UserTest(APIViewTestCases.APIViewTestCase):
model = User
view_namespace = 'users'
brief_fields = ['display', 'id', 'url', 'username']
validation_excluded_fields = ['password']
create_data = [
{
'username': 'User_4',
'password': 'password4',
},
{
'username': 'User_5',
'password': 'password5',
},
{
'username': 'User_6',
'password': 'password6',
},
]
bulk_update_data = {
'email': 'test@example.com',
}
@ -46,13 +30,41 @@ class UserTest(APIViewTestCases.APIViewTestCase):
@classmethod
def setUpTestData(cls):
permissions = (
ObjectPermission(name='Permission 1', actions=['view']),
ObjectPermission(name='Permission 2', actions=['view']),
ObjectPermission(name='Permission 3', actions=['view']),
)
ObjectPermission.objects.bulk_create(permissions)
permissions[0].object_types.add(ObjectType.objects.get_by_natural_key('dcim', 'site'))
permissions[1].object_types.add(ObjectType.objects.get_by_natural_key('dcim', 'location'))
permissions[2].object_types.add(ObjectType.objects.get_by_natural_key('dcim', 'rack'))
users = (
User(username='User_1', password='password1'),
User(username='User_2', password='password2'),
User(username='User_3', password='password3'),
User(username='User1', password='password1'),
User(username='User2', password='password2'),
User(username='User3', password='password3'),
)
User.objects.bulk_create(users)
cls.create_data = [
{
'username': 'User4',
'password': 'password4',
'permissions': [permissions[0].pk],
},
{
'username': 'User5',
'password': 'password5',
'permissions': [permissions[1].pk],
},
{
'username': 'User6',
'password': 'password6',
'permissions': [permissions[2].pk],
},
]
def test_that_password_is_changed(self):
"""
Test that password is changed
@ -67,7 +79,7 @@ class UserTest(APIViewTestCases.APIViewTestCase):
obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
user_credentials = {
'username': 'user1',
'username': 'newuser',
'password': 'abc123',
}
user = User.objects.create_user(**user_credentials)
@ -76,41 +88,56 @@ class UserTest(APIViewTestCases.APIViewTestCase):
'password': 'newpassword'
}
url = reverse('users-api:user-detail', kwargs={'pk': user.id})
response = self.client.patch(url, data, format='json', **self.header)
self.assertEqual(response.status_code, 200)
updated_user = User.objects.get(id=user.id)
self.assertTrue(updated_user.check_password(data['password']))
user.refresh_from_db()
self.assertTrue(user.check_password(data['password']))
class GroupTest(APIViewTestCases.APIViewTestCase):
model = Group
view_namespace = 'users'
brief_fields = ['display', 'id', 'name', 'url']
create_data = [
{
'name': 'Group 4',
},
{
'name': 'Group 5',
},
{
'name': 'Group 6',
},
]
@classmethod
def setUpTestData(cls):
users = (
permissions = (
ObjectPermission(name='Permission 1', actions=['view']),
ObjectPermission(name='Permission 2', actions=['view']),
ObjectPermission(name='Permission 3', actions=['view']),
)
ObjectPermission.objects.bulk_create(permissions)
permissions[0].object_types.add(ObjectType.objects.get_by_natural_key('dcim', 'site'))
permissions[1].object_types.add(ObjectType.objects.get_by_natural_key('dcim', 'location'))
permissions[2].object_types.add(ObjectType.objects.get_by_natural_key('dcim', 'rack'))
groups = (
Group(name='Group 1'),
Group(name='Group 2'),
Group(name='Group 3'),
)
Group.objects.bulk_create(users)
Group.objects.bulk_create(groups)
cls.create_data = [
{
'name': 'Group 4',
'permissions': [permissions[0].pk],
},
{
'name': 'Group 5',
'permissions': [permissions[1].pk],
},
{
'name': 'Group 6',
'permissions': [permissions[2].pk],
},
]
def model_to_dict(self, instance, *args, **kwargs):
# Overwrite permissions attr to work around the serializer field having a different name
data = super().model_to_dict(instance, *args, **kwargs)
data['permissions'] = list(instance.object_permissions.values_list('id', flat=True))
return data
def test_bulk_update_objects(self):
"""
@ -142,9 +169,9 @@ class TokenTest(
@classmethod
def setUpTestData(cls):
users = (
create_test_user('User 1'),
create_test_user('User 2'),
create_test_user('User 3'),
create_test_user('User1'),
create_test_user('User2'),
create_test_user('User3'),
)
tokens = (
@ -240,9 +267,7 @@ class ObjectPermissionTest(
APIViewTestCases.DeleteObjectViewTestCase
):
model = ObjectPermission
brief_fields = [
'actions', 'description', 'display', 'enabled', 'groups', 'id', 'name', 'object_types', 'url', 'users',
]
brief_fields = ['actions', 'description', 'display', 'enabled', 'id', 'name', 'object_types', 'url']
@classmethod
def setUpTestData(cls):
@ -255,9 +280,9 @@ class ObjectPermissionTest(
Group.objects.bulk_create(groups)
users = (
User(username='User 1', is_active=True),
User(username='User 2', is_active=True),
User(username='User 3', is_active=True),
User(username='User1', is_active=True),
User(username='User2', is_active=True),
User(username='User3', is_active=True),
)
User.objects.bulk_create(users)

View File

@ -4,7 +4,7 @@ from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import FieldDoesNotExist
from django.db.models import ManyToManyField, JSONField
from django.db.models import ManyToManyField, ManyToManyRel, JSONField
from django.forms.models import model_to_dict
from django.test import Client, TestCase as _TestCase
from netaddr import IPNetwork
@ -111,8 +111,10 @@ class ModelTestCase(TestCase):
continue
# Handle ManyToManyFields
if value and type(field) in (ManyToManyField, TaggableManager):
if value and type(field) in (ManyToManyField, ManyToManyRel, TaggableManager):
# Resolve reverse M2M relationships
if isinstance(field, ManyToManyRel):
value = getattr(instance, field.related_name).all()
if field.related_model in (ContentType, ObjectType) and api:
model_dict[key] = sorted([object_type_identifier(ot) for ot in value])
else: