mirror of
https://github.com/netbox-community/netbox.git
synced 2024-05-10 07:54:54 +00:00
9856 merge feature
This commit is contained in:
@ -1,11 +1,12 @@
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import authenticate
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.models import Group
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from drf_spectacular.utils import extend_schema_field
|
||||
from drf_spectacular.types import OpenApiTypes
|
||||
from rest_framework import serializers
|
||||
from rest_framework.exceptions import PermissionDenied
|
||||
from rest_framework.exceptions import AuthenticationFailed, PermissionDenied
|
||||
|
||||
from netbox.api.fields import ContentTypeField, IPNetworkSerializer, SerializedPKRelatedField
|
||||
from netbox.api.serializers import ValidatedModelSerializer
|
||||
@ -51,6 +52,16 @@ class UserSerializer(ValidatedModelSerializer):
|
||||
|
||||
return user
|
||||
|
||||
def update(self, instance, validated_data):
|
||||
"""
|
||||
Ensure proper updated password hash generation.
|
||||
"""
|
||||
password = validated_data.pop('password', None)
|
||||
if password is not None:
|
||||
instance.set_password(password)
|
||||
|
||||
return super().update(instance, validated_data)
|
||||
|
||||
@extend_schema_field(OpenApiTypes.STR)
|
||||
def get_display(self, obj):
|
||||
if full_name := obj.get_full_name():
|
||||
@ -107,9 +118,42 @@ class TokenSerializer(ValidatedModelSerializer):
|
||||
return super().validate(data)
|
||||
|
||||
|
||||
class TokenProvisionSerializer(serializers.Serializer):
|
||||
username = serializers.CharField()
|
||||
password = serializers.CharField()
|
||||
class TokenProvisionSerializer(TokenSerializer):
|
||||
user = NestedUserSerializer(
|
||||
read_only=True
|
||||
)
|
||||
username = serializers.CharField(
|
||||
write_only=True
|
||||
)
|
||||
password = serializers.CharField(
|
||||
write_only=True
|
||||
)
|
||||
last_used = serializers.DateTimeField(
|
||||
read_only=True
|
||||
)
|
||||
key = serializers.CharField(
|
||||
read_only=True
|
||||
)
|
||||
|
||||
class Meta:
|
||||
model = Token
|
||||
fields = (
|
||||
'id', 'url', 'display', 'user', 'created', 'expires', 'last_used', 'key', 'write_enabled', 'description',
|
||||
'allowed_ips', 'username', 'password',
|
||||
)
|
||||
|
||||
def validate(self, data):
|
||||
# Validate the username and password
|
||||
username = data.pop('username')
|
||||
password = data.pop('password')
|
||||
user = authenticate(request=self.context.get('request'), username=username, password=password)
|
||||
if user is None:
|
||||
raise AuthenticationFailed("Invalid username/password")
|
||||
|
||||
# Inject the user into the validated data
|
||||
data['user'] = user
|
||||
|
||||
return data
|
||||
|
||||
|
||||
class ObjectPermissionSerializer(ValidatedModelSerializer):
|
||||
|
@ -1,3 +1,4 @@
|
||||
import logging
|
||||
from django.contrib.auth import authenticate
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.models import Group
|
||||
@ -60,31 +61,24 @@ class TokenProvisionView(APIView):
|
||||
"""
|
||||
permission_classes = []
|
||||
|
||||
# @extend_schema(methods=["post"], responses={201: serializers.TokenSerializer})
|
||||
@extend_schema(
|
||||
request=serializers.TokenProvisionSerializer,
|
||||
responses={
|
||||
201: serializers.TokenProvisionSerializer,
|
||||
401: OpenApiTypes.OBJECT,
|
||||
}
|
||||
)
|
||||
def post(self, request):
|
||||
serializer = serializers.TokenProvisionSerializer(data=request.data)
|
||||
serializer.is_valid()
|
||||
serializer = serializers.TokenProvisionSerializer(data=request.data, context={'request': request})
|
||||
serializer.is_valid(raise_exception=True)
|
||||
self.perform_create(serializer)
|
||||
return Response(serializer.data, status=HTTP_201_CREATED)
|
||||
|
||||
# Authenticate the user account based on the provided credentials
|
||||
username = serializer.data.get('username')
|
||||
password = serializer.data.get('password')
|
||||
if not username or not password:
|
||||
raise AuthenticationFailed("Username and password must be provided to provision a token.")
|
||||
user = authenticate(request=request, username=username, password=password)
|
||||
if user is None:
|
||||
raise AuthenticationFailed("Invalid username/password")
|
||||
|
||||
# Create a new Token for the User
|
||||
token = Token(user=user)
|
||||
token.save()
|
||||
data = serializers.TokenSerializer(token, context={'request': request}).data
|
||||
# Manually append the token key, which is normally write-only
|
||||
data['key'] = token.key
|
||||
|
||||
return Response(data, status=HTTP_201_CREATED)
|
||||
|
||||
def get_serializer_class(self):
|
||||
return serializers.TokenSerializer
|
||||
def perform_create(self, serializer):
|
||||
model = serializer.Meta.model
|
||||
logger = logging.getLogger(f'netbox.api.views.TokenProvisionView')
|
||||
logger.info(f"Creating new {model._meta.verbose_name}")
|
||||
serializer.save()
|
||||
|
||||
|
||||
#
|
||||
|
@ -2,7 +2,7 @@ from django.db.models import Q
|
||||
|
||||
|
||||
OBJECTPERMISSION_OBJECT_TYPES = Q(
|
||||
~Q(app_label__in=['admin', 'auth', 'contenttypes', 'sessions', 'taggit', 'users']) |
|
||||
~Q(app_label__in=['account', 'admin', 'auth', 'contenttypes', 'sessions', 'taggit', 'users']) |
|
||||
Q(app_label='auth', model__in=['group', 'user']) |
|
||||
Q(app_label='users', model__in=['objectpermission', 'token'])
|
||||
)
|
||||
|
@ -1,14 +1,12 @@
|
||||
from django import forms
|
||||
from extras.forms.mixins import SavedFiltersMixin
|
||||
from utilities.forms import FilterForm
|
||||
from users.models import Token
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.models import Group
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from netbox.forms import NetBoxModelFilterSetForm
|
||||
from users.models import NetBoxGroup, NetBoxUser, ObjectPermission
|
||||
from utilities.forms import BOOLEAN_WITH_BLANK_CHOICES
|
||||
from netbox.forms.mixins import SavedFiltersMixin
|
||||
from users.models import NetBoxGroup, NetBoxUser, ObjectPermission, Token
|
||||
from utilities.forms import BOOLEAN_WITH_BLANK_CHOICES, FilterForm
|
||||
from utilities.forms.fields import DynamicModelMultipleChoiceField
|
||||
from utilities.forms.widgets import DateTimePicker
|
||||
|
||||
|
@ -56,6 +56,7 @@ class UserConfigFormMetaclass(forms.models.ModelFormMetaclass):
|
||||
class UserConfigForm(BootstrapMixin, forms.ModelForm, metaclass=UserConfigFormMetaclass):
|
||||
fieldsets = (
|
||||
(_('User Interface'), (
|
||||
'locale.language',
|
||||
'pagination.per_page',
|
||||
'pagination.placement',
|
||||
'ui.colormode',
|
||||
@ -114,6 +115,9 @@ class UserTokenForm(BootstrapMixin, forms.ModelForm):
|
||||
help_text=_(
|
||||
'Keys must be at least 40 characters in length. <strong>Be sure to record your key</strong> prior to '
|
||||
'submitting this form, as it may no longer be accessible once the token has been created.'
|
||||
),
|
||||
widget=forms.TextInput(
|
||||
attrs={'data-clipboard': 'true'}
|
||||
)
|
||||
)
|
||||
allowed_ips = SimpleArrayField(
|
||||
@ -383,5 +387,5 @@ class ObjectPermissionForm(BootstrapMixin, forms.ModelForm):
|
||||
model.objects.filter(qs_filter_from_constraints(constraints, tokens)).exists()
|
||||
except FieldError as e:
|
||||
raise forms.ValidationError({
|
||||
'constraints': _('Invalid filter for {model}: {e}').format(model=model, e=e)
|
||||
'constraints': _('Invalid filter for {model}: {error}').format(model=model, error=e)
|
||||
})
|
||||
|
@ -66,7 +66,7 @@ class Migration(migrations.Migration):
|
||||
('actions', django.contrib.postgres.fields.ArrayField(base_field=models.CharField(max_length=30), size=None)),
|
||||
('constraints', models.JSONField(blank=True, null=True)),
|
||||
('groups', models.ManyToManyField(blank=True, related_name='object_permissions', to='auth.Group')),
|
||||
('object_types', models.ManyToManyField(limit_choices_to=models.Q(models.Q(models.Q(('app_label__in', ['admin', 'auth', 'contenttypes', 'sessions', 'taggit', 'users']), _negated=True), models.Q(('app_label', 'auth'), ('model__in', ['group', 'user'])), models.Q(('app_label', 'users'), ('model__in', ['objectpermission', 'token'])), _connector='OR')), related_name='object_permissions', to='contenttypes.ContentType')),
|
||||
('object_types', models.ManyToManyField(limit_choices_to=models.Q(models.Q(models.Q(('app_label__in', ['account', 'admin', 'auth', 'contenttypes', 'sessions', 'taggit', 'users']), _negated=True), models.Q(('app_label', 'auth'), ('model__in', ['group', 'user'])), models.Q(('app_label', 'users'), ('model__in', ['objectpermission', 'token'])), _connector='OR')), related_name='object_permissions', to='contenttypes.ContentType')),
|
||||
('users', models.ManyToManyField(blank=True, related_name='object_permissions', to=settings.AUTH_USER_MODEL)),
|
||||
],
|
||||
options={
|
||||
|
@ -3,7 +3,6 @@ import os
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import Group, GroupManager, User, UserManager
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.contrib.postgres.fields import ArrayField
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.validators import MinLengthValidator
|
||||
@ -15,6 +14,7 @@ from django.utils import timezone
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from netaddr import IPNetwork
|
||||
|
||||
from core.models import ContentType
|
||||
from ipam.fields import IPNetworkField
|
||||
from netbox.config import get_config
|
||||
from utilities.querysets import RestrictedQuerySet
|
||||
@ -99,6 +99,8 @@ class UserConfig(models.Model):
|
||||
default=dict
|
||||
)
|
||||
|
||||
_netbox_private = True
|
||||
|
||||
class Meta:
|
||||
ordering = ['user']
|
||||
verbose_name = _('user preferences')
|
||||
@ -169,7 +171,7 @@ class UserConfig(models.Model):
|
||||
elif key in d:
|
||||
err_path = '.'.join(path.split('.')[:i + 1])
|
||||
raise TypeError(
|
||||
_("Key '{err_path}' is a leaf node; cannot assign new keys").format(err_path=err_path)
|
||||
_("Key '{path}' is a leaf node; cannot assign new keys").format(path=err_path)
|
||||
)
|
||||
else:
|
||||
d = d.setdefault(key, {})
|
||||
@ -218,6 +220,7 @@ class UserConfig(models.Model):
|
||||
|
||||
|
||||
@receiver(post_save, sender=User)
|
||||
@receiver(post_save, sender=NetBoxUser)
|
||||
def create_userconfig(instance, created, raw=False, **kwargs):
|
||||
"""
|
||||
Automatically create a new UserConfig when a new User is created. Skip this if importing a user from a fixture.
|
||||
@ -351,7 +354,7 @@ class ObjectPermission(models.Model):
|
||||
default=True
|
||||
)
|
||||
object_types = models.ManyToManyField(
|
||||
to=ContentType,
|
||||
to='contenttypes.ContentType',
|
||||
limit_choices_to=OBJECTPERMISSION_OBJECT_TYPES,
|
||||
related_name='object_permissions'
|
||||
)
|
||||
|
@ -52,7 +52,7 @@ class UserTable(NetBoxTable):
|
||||
model = NetBoxUser
|
||||
fields = (
|
||||
'pk', 'id', 'username', 'first_name', 'last_name', 'email', 'groups', 'is_active', 'is_staff',
|
||||
'is_superuser',
|
||||
'is_superuser', 'last_login',
|
||||
)
|
||||
default_columns = ('pk', 'username', 'first_name', 'last_name', 'email', 'is_active')
|
||||
|
||||
|
@ -54,6 +54,38 @@ class UserTest(APIViewTestCases.APIViewTestCase):
|
||||
)
|
||||
User.objects.bulk_create(users)
|
||||
|
||||
def test_that_password_is_changed(self):
|
||||
"""
|
||||
Test that password is changed
|
||||
"""
|
||||
|
||||
obj_perm = ObjectPermission(
|
||||
name='Test permission',
|
||||
actions=['change']
|
||||
)
|
||||
obj_perm.save()
|
||||
obj_perm.users.add(self.user)
|
||||
obj_perm.object_types.add(ContentType.objects.get_for_model(self.model))
|
||||
|
||||
user_credentials = {
|
||||
'username': 'user1',
|
||||
'password': 'abc123',
|
||||
}
|
||||
user = User.objects.create_user(**user_credentials)
|
||||
|
||||
data = {
|
||||
'password': 'newpassword'
|
||||
}
|
||||
url = reverse('users-api:user-detail', kwargs={'pk': user.id})
|
||||
|
||||
response = self.client.patch(url, data, format='json', **self.header)
|
||||
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
updated_user = User.objects.get(id=user.id)
|
||||
|
||||
self.assertTrue(updated_user.check_password(data['password']))
|
||||
|
||||
|
||||
class GroupTest(APIViewTestCases.APIViewTestCase):
|
||||
model = Group
|
||||
@ -141,17 +173,25 @@ class TokenTest(
|
||||
"""
|
||||
Test the provisioning of a new REST API token given a valid username and password.
|
||||
"""
|
||||
data = {
|
||||
user_credentials = {
|
||||
'username': 'user1',
|
||||
'password': 'abc123',
|
||||
}
|
||||
user = User.objects.create_user(**data)
|
||||
user = User.objects.create_user(**user_credentials)
|
||||
|
||||
data = {
|
||||
**user_credentials,
|
||||
'description': 'My API token',
|
||||
'expires': '2099-12-31T23:59:59Z',
|
||||
}
|
||||
url = reverse('users-api:token_provision')
|
||||
|
||||
response = self.client.post(url, data, format='json', **self.header)
|
||||
self.assertEqual(response.status_code, 201)
|
||||
self.assertIn('key', response.data)
|
||||
self.assertEqual(len(response.data['key']), 40)
|
||||
self.assertEqual(response.data['description'], data['description'])
|
||||
self.assertEqual(response.data['expires'], data['expires'])
|
||||
token = Token.objects.get(user=user)
|
||||
self.assertEqual(token.key, response.data['key'])
|
||||
|
||||
|
@ -67,6 +67,10 @@ class UserTestCase(TestCase, BaseFilterSetTests):
|
||||
users[1].groups.set([groups[1]])
|
||||
users[2].groups.set([groups[2]])
|
||||
|
||||
def test_q(self):
|
||||
params = {'q': 'user1'}
|
||||
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||
|
||||
def test_username(self):
|
||||
params = {'username': ['User1', 'User2']}
|
||||
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||
@ -117,6 +121,10 @@ class GroupTestCase(TestCase, BaseFilterSetTests):
|
||||
)
|
||||
Group.objects.bulk_create(groups)
|
||||
|
||||
def test_q(self):
|
||||
params = {'q': 'group 1'}
|
||||
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||
|
||||
def test_name(self):
|
||||
params = {'name': ['Group 1', 'Group 2']}
|
||||
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||
@ -164,6 +172,10 @@ class ObjectPermissionTestCase(TestCase, BaseFilterSetTests):
|
||||
permissions[i].users.set([users[i]])
|
||||
permissions[i].object_types.set([object_types[i]])
|
||||
|
||||
def test_q(self):
|
||||
params = {'q': 'foobar1'}
|
||||
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||
|
||||
def test_name(self):
|
||||
params = {'name': ['Permission 1', 'Permission 2']}
|
||||
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
|
||||
@ -235,6 +247,10 @@ class TokenTestCase(TestCase, BaseFilterSetTests):
|
||||
)
|
||||
Token.objects.bulk_create(tokens)
|
||||
|
||||
def test_q(self):
|
||||
params = {'q': 'foobar1'}
|
||||
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
|
||||
|
||||
def test_user(self):
|
||||
users = User.objects.order_by('id')[:2]
|
||||
params = {'user_id': [users[0].pk, users[1].pk]}
|
||||
|
@ -68,7 +68,7 @@ class UserView(generic.ObjectView):
|
||||
template_name = 'users/user.html'
|
||||
|
||||
def get_extra_context(self, request, instance):
|
||||
changelog = ObjectChange.objects.restrict(request.user, 'view').filter(user=request.user)[:20]
|
||||
changelog = ObjectChange.objects.restrict(request.user, 'view').filter(user=instance)[:20]
|
||||
changelog_table = ObjectChangeTable(changelog)
|
||||
|
||||
return {
|
||||
|
Reference in New Issue
Block a user