mirror of
https://github.com/netbox-community/netbox.git
synced 2024-05-10 07:54:54 +00:00
Merge pull request #6730 from MaxRink/remote_groups
Remote groups via HTTP Headers
This commit is contained in:
@ -490,6 +490,14 @@ NetBox can be configured to support remote user authentication by inferring user
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## REMOTE_AUTH_GROUP_SYNC_ENABLED
|
||||||
|
|
||||||
|
Default: `False`
|
||||||
|
|
||||||
|
NetBox can be configured to sync remote user groups by inferring user authentication from an HTTP header set by the HTTP reverse proxy (e.g. nginx or Apache). Set this to `True` to enable this functionality. (Local authentication will still take effect as a fallback.) (Requires `REMOTE_AUTH_ENABLED`.)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## REMOTE_AUTH_HEADER
|
## REMOTE_AUTH_HEADER
|
||||||
|
|
||||||
Default: `'HTTP_REMOTE_USER'`
|
Default: `'HTTP_REMOTE_USER'`
|
||||||
@ -498,6 +506,54 @@ When remote user authentication is in use, this is the name of the HTTP header w
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## REMOTE_AUTH_GROUP_HEADER
|
||||||
|
|
||||||
|
Default: `'HTTP_REMOTE_USER_GROUP'`
|
||||||
|
|
||||||
|
When remote user authentication is in use, this is the name of the HTTP header which informs NetBox of the currently authenticated user. For example, to use the request header `X-Remote-User-Groups` it needs to be set to `HTTP_X_REMOTE_USER_GROUPS`. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## REMOTE_AUTH_SUPERUSER_GROUPS
|
||||||
|
|
||||||
|
Default: `[]` (Empty list)
|
||||||
|
|
||||||
|
The list of groups that promote an remote User to Superuser on Login. If group isn't present on next Login, the Role gets revoked. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## REMOTE_AUTH_SUPERUSERS
|
||||||
|
|
||||||
|
Default: `[]` (Empty list)
|
||||||
|
|
||||||
|
The list of users that get promoted to Superuser on Login. If user isn't present in list on next Login, the Role gets revoked. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## REMOTE_AUTH_STAFF_GROUPS
|
||||||
|
|
||||||
|
Default: `[]` (Empty list)
|
||||||
|
|
||||||
|
The list of groups that promote an remote User to Staff on Login. If group isn't present on next Login, the Role gets revoked. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## REMOTE_AUTH_STAFF_USERS
|
||||||
|
|
||||||
|
Default: `[]` (Empty list)
|
||||||
|
|
||||||
|
The list of users that get promoted to Staff on Login. If user isn't present in list on next Login, the Role gets revoked. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## REMOTE_AUTH_GROUP_SEPARATOR
|
||||||
|
|
||||||
|
Default: `|` (Pipe)
|
||||||
|
|
||||||
|
The Seperator upon which `REMOTE_AUTH_GROUP_HEADER` gets split into individual Groups. This needs to be coordinated with your authentication Proxy. (Requires `REMOTE_AUTH_ENABLED` and `REMOTE_AUTH_GROUP_SYNC_ENABLED` )
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## RELEASE_CHECK_URL
|
## RELEASE_CHECK_URL
|
||||||
|
|
||||||
Default: None (disabled)
|
Default: None (disabled)
|
||||||
|
@ -2,14 +2,17 @@ import logging
|
|||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
from django.contrib.auth import get_user_model
|
||||||
from django.contrib.auth.backends import ModelBackend, RemoteUserBackend as _RemoteUserBackend
|
from django.contrib.auth.backends import ModelBackend, RemoteUserBackend as _RemoteUserBackend
|
||||||
from django.contrib.auth.models import Group
|
from django.contrib.auth.models import Group, AnonymousUser
|
||||||
from django.core.exceptions import ImproperlyConfigured
|
from django.core.exceptions import ImproperlyConfigured
|
||||||
from django.db.models import Q
|
from django.db.models import Q
|
||||||
|
|
||||||
from users.models import ObjectPermission
|
from users.models import ObjectPermission
|
||||||
from utilities.permissions import permission_is_exempt, resolve_permission, resolve_permission_ct
|
from utilities.permissions import permission_is_exempt, resolve_permission, resolve_permission_ct
|
||||||
|
|
||||||
|
UserModel = get_user_model()
|
||||||
|
|
||||||
|
|
||||||
class ObjectPermissionMixin():
|
class ObjectPermissionMixin():
|
||||||
|
|
||||||
@ -101,38 +104,145 @@ class RemoteUserBackend(_RemoteUserBackend):
|
|||||||
def create_unknown_user(self):
|
def create_unknown_user(self):
|
||||||
return settings.REMOTE_AUTH_AUTO_CREATE_USER
|
return settings.REMOTE_AUTH_AUTO_CREATE_USER
|
||||||
|
|
||||||
def configure_user(self, request, user):
|
def configure_groups(self, user, remote_groups):
|
||||||
logger = logging.getLogger('netbox.authentication.RemoteUserBackend')
|
logger = logging.getLogger('netbox.authentication.RemoteUserBackend')
|
||||||
|
|
||||||
# Assign default groups to the user
|
# Assign default groups to the user
|
||||||
group_list = []
|
group_list = []
|
||||||
for name in settings.REMOTE_AUTH_DEFAULT_GROUPS:
|
for name in remote_groups:
|
||||||
try:
|
try:
|
||||||
group_list.append(Group.objects.get(name=name))
|
group_list.append(Group.objects.get(name=name))
|
||||||
except Group.DoesNotExist:
|
except Group.DoesNotExist:
|
||||||
logging.error(f"Could not assign group {name} to remotely-authenticated user {user}: Group not found")
|
|
||||||
if group_list:
|
|
||||||
user.groups.add(*group_list)
|
|
||||||
logger.debug(f"Assigned groups to remotely-authenticated user {user}: {group_list}")
|
|
||||||
|
|
||||||
# Assign default object permissions to the user
|
|
||||||
permissions_list = []
|
|
||||||
for permission_name, constraints in settings.REMOTE_AUTH_DEFAULT_PERMISSIONS.items():
|
|
||||||
try:
|
|
||||||
object_type, action = resolve_permission_ct(permission_name)
|
|
||||||
# TODO: Merge multiple actions into a single ObjectPermission per content type
|
|
||||||
obj_perm = ObjectPermission(actions=[action], constraints=constraints)
|
|
||||||
obj_perm.save()
|
|
||||||
obj_perm.users.add(user)
|
|
||||||
obj_perm.object_types.add(object_type)
|
|
||||||
permissions_list.append(permission_name)
|
|
||||||
except ValueError:
|
|
||||||
logging.error(
|
logging.error(
|
||||||
f"Invalid permission name: '{permission_name}'. Permissions must be in the form "
|
f"Could not assign group {name} to remotely-authenticated user {user}: Group not found")
|
||||||
"<app>.<action>_<model>. (Example: dcim.add_site)"
|
if group_list:
|
||||||
)
|
user.groups.set(group_list)
|
||||||
if permissions_list:
|
logger.debug(
|
||||||
logger.debug(f"Assigned permissions to remotely-authenticated user {user}: {permissions_list}")
|
f"Assigned groups to remotely-authenticated user {user}: {group_list}")
|
||||||
|
else:
|
||||||
|
user.groups.clear()
|
||||||
|
logger.debug(f"Stripping user {user} from Groups")
|
||||||
|
user.is_superuser = self._is_superuser(user)
|
||||||
|
logger.debug(f"User {user} is Superuser: {user.is_superuser}")
|
||||||
|
logger.debug(
|
||||||
|
f"User {user} should be Superuser: {self._is_superuser(user)}")
|
||||||
|
|
||||||
|
user.is_staff = self._is_staff(user)
|
||||||
|
logger.debug(f"User {user} is Staff: {user.is_staff}")
|
||||||
|
logger.debug(f"User {user} should be Staff: {self._is_staff(user)}")
|
||||||
|
user.save()
|
||||||
|
return user
|
||||||
|
|
||||||
|
def authenticate(self, request, remote_user, remote_groups=None):
|
||||||
|
"""
|
||||||
|
The username passed as ``remote_user`` is considered trusted. Return
|
||||||
|
the ``User`` object with the given username. Create a new ``User``
|
||||||
|
object if ``create_unknown_user`` is ``True``.
|
||||||
|
Return None if ``create_unknown_user`` is ``False`` and a ``User``
|
||||||
|
object with the given username is not found in the database.
|
||||||
|
"""
|
||||||
|
logger = logging.getLogger('netbox.authentication.RemoteUserBackend')
|
||||||
|
logger.debug(
|
||||||
|
f"trying to authenticate {remote_user} with groups {remote_groups}")
|
||||||
|
if not remote_user:
|
||||||
|
return
|
||||||
|
user = None
|
||||||
|
username = self.clean_username(remote_user)
|
||||||
|
|
||||||
|
# Note that this could be accomplished in one try-except clause, but
|
||||||
|
# instead we use get_or_create when creating unknown users since it has
|
||||||
|
# built-in safeguards for multiple threads.
|
||||||
|
if self.create_unknown_user:
|
||||||
|
user, created = UserModel._default_manager.get_or_create(**{
|
||||||
|
UserModel.USERNAME_FIELD: username
|
||||||
|
})
|
||||||
|
if created:
|
||||||
|
user = self.configure_user(request, user)
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
user = UserModel._default_manager.get_by_natural_key(username)
|
||||||
|
except UserModel.DoesNotExist:
|
||||||
|
pass
|
||||||
|
if self.user_can_authenticate(user):
|
||||||
|
if settings.REMOTE_AUTH_GROUP_SYNC_ENABLED:
|
||||||
|
if user is not None and not isinstance(user, AnonymousUser):
|
||||||
|
return self.configure_groups(user, remote_groups)
|
||||||
|
else:
|
||||||
|
return user
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _is_superuser(self, user):
|
||||||
|
logger = logging.getLogger('netbox.authentication.RemoteUserBackend')
|
||||||
|
superuser_groups = settings.REMOTE_AUTH_SUPERUSER_GROUPS
|
||||||
|
logger.debug(f"Superuser Groups: {superuser_groups}")
|
||||||
|
superusers = settings.REMOTE_AUTH_SUPERUSERS
|
||||||
|
logger.debug(f"Superuser Users: {superusers}")
|
||||||
|
user_groups = set()
|
||||||
|
for g in user.groups.all():
|
||||||
|
user_groups.add(g.name)
|
||||||
|
logger.debug(f"User {user.username} is in Groups:{user_groups}")
|
||||||
|
|
||||||
|
result = user.username in superusers or (
|
||||||
|
set(user_groups) & set(superuser_groups))
|
||||||
|
logger.debug(f"User {user.username} in Superuser Users :{result}")
|
||||||
|
return bool(result)
|
||||||
|
|
||||||
|
def _is_staff(self, user):
|
||||||
|
logger = logging.getLogger('netbox.authentication.RemoteUserBackend')
|
||||||
|
staff_groups = settings.REMOTE_AUTH_STAFF_GROUPS
|
||||||
|
logger.debug(f"Superuser Groups: {staff_groups}")
|
||||||
|
staff_users = settings.REMOTE_AUTH_STAFF_USERS
|
||||||
|
logger.debug(f"Staff Users :{staff_users}")
|
||||||
|
user_groups = set()
|
||||||
|
for g in user.groups.all():
|
||||||
|
user_groups.add(g.name)
|
||||||
|
logger.debug(f"User {user.username} is in Groups:{user_groups}")
|
||||||
|
result = user.username in staff_users or (
|
||||||
|
set(user_groups) & set(staff_groups))
|
||||||
|
logger.debug(f"User {user.username} in Staff Users :{result}")
|
||||||
|
return bool(result)
|
||||||
|
|
||||||
|
def configure_user(self, request, user):
|
||||||
|
logger = logging.getLogger('netbox.authentication.RemoteUserBackend')
|
||||||
|
if not settings.REMOTE_AUTH_GROUP_SYNC_ENABLED:
|
||||||
|
# Assign default groups to the user
|
||||||
|
group_list = []
|
||||||
|
for name in settings.REMOTE_AUTH_DEFAULT_GROUPS:
|
||||||
|
try:
|
||||||
|
group_list.append(Group.objects.get(name=name))
|
||||||
|
except Group.DoesNotExist:
|
||||||
|
logging.error(
|
||||||
|
f"Could not assign group {name} to remotely-authenticated user {user}: Group not found")
|
||||||
|
if group_list:
|
||||||
|
user.groups.add(*group_list)
|
||||||
|
logger.debug(
|
||||||
|
f"Assigned groups to remotely-authenticated user {user}: {group_list}")
|
||||||
|
|
||||||
|
# Assign default object permissions to the user
|
||||||
|
permissions_list = []
|
||||||
|
for permission_name, constraints in settings.REMOTE_AUTH_DEFAULT_PERMISSIONS.items():
|
||||||
|
try:
|
||||||
|
object_type, action = resolve_permission_ct(
|
||||||
|
permission_name)
|
||||||
|
# TODO: Merge multiple actions into a single ObjectPermission per content type
|
||||||
|
obj_perm = ObjectPermission(
|
||||||
|
actions=[action], constraints=constraints)
|
||||||
|
obj_perm.save()
|
||||||
|
obj_perm.users.add(user)
|
||||||
|
obj_perm.object_types.add(object_type)
|
||||||
|
permissions_list.append(permission_name)
|
||||||
|
except ValueError:
|
||||||
|
logging.error(
|
||||||
|
f"Invalid permission name: '{permission_name}'. Permissions must be in the form "
|
||||||
|
"<app>.<action>_<model>. (Example: dcim.add_site)"
|
||||||
|
)
|
||||||
|
if permissions_list:
|
||||||
|
logger.debug(
|
||||||
|
f"Assigned permissions to remotely-authenticated user {user}: {permissions_list}")
|
||||||
|
else:
|
||||||
|
logger.debug(
|
||||||
|
f"Skipped initial assignment of permissions and groups to remotely-authenticated user {user} as Group sync is enabled")
|
||||||
|
|
||||||
return user
|
return user
|
||||||
|
|
||||||
|
@ -1,8 +1,11 @@
|
|||||||
import uuid
|
import uuid
|
||||||
from urllib import parse
|
from urllib import parse
|
||||||
|
import logging
|
||||||
|
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.contrib.auth.middleware import RemoteUserMiddleware as RemoteUserMiddleware_
|
from django.contrib.auth.middleware import RemoteUserMiddleware as RemoteUserMiddleware_
|
||||||
|
from django.contrib import auth
|
||||||
|
from django.core.exceptions import ImproperlyConfigured
|
||||||
from django.db import ProgrammingError
|
from django.db import ProgrammingError
|
||||||
from django.http import Http404, HttpResponseRedirect
|
from django.http import Http404, HttpResponseRedirect
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
@ -16,6 +19,7 @@ class LoginRequiredMiddleware(object):
|
|||||||
"""
|
"""
|
||||||
If LOGIN_REQUIRED is True, redirect all non-authenticated users to the login page.
|
If LOGIN_REQUIRED is True, redirect all non-authenticated users to the login page.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, get_response):
|
def __init__(self, get_response):
|
||||||
self.get_response = get_response
|
self.get_response = get_response
|
||||||
|
|
||||||
@ -49,12 +53,65 @@ class RemoteUserMiddleware(RemoteUserMiddleware_):
|
|||||||
return settings.REMOTE_AUTH_HEADER
|
return settings.REMOTE_AUTH_HEADER
|
||||||
|
|
||||||
def process_request(self, request):
|
def process_request(self, request):
|
||||||
|
logger = logging.getLogger(
|
||||||
|
'netbox.authentication.RemoteUserMiddleware')
|
||||||
# Bypass middleware if remote authentication is not enabled
|
# Bypass middleware if remote authentication is not enabled
|
||||||
if not settings.REMOTE_AUTH_ENABLED:
|
if not settings.REMOTE_AUTH_ENABLED:
|
||||||
return
|
return
|
||||||
|
# AuthenticationMiddleware is required so that request.user exists.
|
||||||
|
if not hasattr(request, 'user'):
|
||||||
|
raise ImproperlyConfigured(
|
||||||
|
"The Django remote user auth middleware requires the"
|
||||||
|
" authentication middleware to be installed. Edit your"
|
||||||
|
" MIDDLEWARE setting to insert"
|
||||||
|
" 'django.contrib.auth.middleware.AuthenticationMiddleware'"
|
||||||
|
" before the RemoteUserMiddleware class.")
|
||||||
|
try:
|
||||||
|
username = request.META[self.header]
|
||||||
|
except KeyError:
|
||||||
|
# If specified header doesn't exist then remove any existing
|
||||||
|
# authenticated remote-user, or return (leaving request.user set to
|
||||||
|
# AnonymousUser by the AuthenticationMiddleware).
|
||||||
|
if self.force_logout_if_no_header and request.user.is_authenticated:
|
||||||
|
self._remove_invalid_user(request)
|
||||||
|
return
|
||||||
|
# If the user is already authenticated and that user is the user we are
|
||||||
|
# getting passed in the headers, then the correct user is already
|
||||||
|
# persisted in the session and we don't need to continue.
|
||||||
|
if request.user.is_authenticated:
|
||||||
|
if request.user.get_username() == self.clean_username(username, request):
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
# An authenticated user is associated with the request, but
|
||||||
|
# it does not match the authorized user in the header.
|
||||||
|
self._remove_invalid_user(request)
|
||||||
|
|
||||||
return super().process_request(request)
|
# We are seeing this user for the first time in this session, attempt
|
||||||
|
# to authenticate the user.
|
||||||
|
if settings.REMOTE_AUTH_GROUP_SYNC_ENABLED:
|
||||||
|
logger.debug("Trying to sync Groups")
|
||||||
|
user = auth.authenticate(
|
||||||
|
request, remote_user=username, remote_groups=self._get_groups(request))
|
||||||
|
else:
|
||||||
|
user = auth.authenticate(request, remote_user=username)
|
||||||
|
if user:
|
||||||
|
# User is valid. Set request.user and persist user in the session
|
||||||
|
# by logging the user in.
|
||||||
|
request.user = user
|
||||||
|
auth.login(request, user)
|
||||||
|
|
||||||
|
def _get_groups(self, request):
|
||||||
|
logger = logging.getLogger(
|
||||||
|
'netbox.authentication.RemoteUserMiddleware')
|
||||||
|
|
||||||
|
groups_string = request.META.get(
|
||||||
|
settings.REMOTE_AUTH_GROUP_HEADER, None)
|
||||||
|
if groups_string:
|
||||||
|
groups = groups_string.split(settings.REMOTE_AUTH_GROUP_SEPARATOR)
|
||||||
|
else:
|
||||||
|
groups = []
|
||||||
|
logger.debug(f"Groups are {groups}")
|
||||||
|
return groups
|
||||||
|
|
||||||
|
|
||||||
class ObjectChangeMiddleware(object):
|
class ObjectChangeMiddleware(object):
|
||||||
@ -71,6 +128,7 @@ class ObjectChangeMiddleware(object):
|
|||||||
have been created. Conversely, deletions are acted upon immediately, so that the serialized representation of the
|
have been created. Conversely, deletions are acted upon immediately, so that the serialized representation of the
|
||||||
object is recorded before it (and any related objects) are actually deleted from the database.
|
object is recorded before it (and any related objects) are actually deleted from the database.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, get_response):
|
def __init__(self, get_response):
|
||||||
self.get_response = get_response
|
self.get_response = get_response
|
||||||
|
|
||||||
@ -90,6 +148,7 @@ class APIVersionMiddleware(object):
|
|||||||
"""
|
"""
|
||||||
If the request is for an API endpoint, include the API version as a response header.
|
If the request is for an API endpoint, include the API version as a response header.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, get_response):
|
def __init__(self, get_response):
|
||||||
self.get_response = get_response
|
self.get_response = get_response
|
||||||
|
|
||||||
@ -105,6 +164,7 @@ class ExceptionHandlingMiddleware(object):
|
|||||||
Intercept certain exceptions which are likely indicative of installation issues and provide helpful instructions
|
Intercept certain exceptions which are likely indicative of installation issues and provide helpful instructions
|
||||||
to the user.
|
to the user.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, get_response):
|
def __init__(self, get_response):
|
||||||
self.get_response = get_response
|
self.get_response = get_response
|
||||||
|
|
||||||
|
@ -120,6 +120,13 @@ REMOTE_AUTH_DEFAULT_GROUPS = getattr(configuration, 'REMOTE_AUTH_DEFAULT_GROUPS'
|
|||||||
REMOTE_AUTH_DEFAULT_PERMISSIONS = getattr(configuration, 'REMOTE_AUTH_DEFAULT_PERMISSIONS', {})
|
REMOTE_AUTH_DEFAULT_PERMISSIONS = getattr(configuration, 'REMOTE_AUTH_DEFAULT_PERMISSIONS', {})
|
||||||
REMOTE_AUTH_ENABLED = getattr(configuration, 'REMOTE_AUTH_ENABLED', False)
|
REMOTE_AUTH_ENABLED = getattr(configuration, 'REMOTE_AUTH_ENABLED', False)
|
||||||
REMOTE_AUTH_HEADER = getattr(configuration, 'REMOTE_AUTH_HEADER', 'HTTP_REMOTE_USER')
|
REMOTE_AUTH_HEADER = getattr(configuration, 'REMOTE_AUTH_HEADER', 'HTTP_REMOTE_USER')
|
||||||
|
REMOTE_AUTH_GROUP_HEADER = getattr(configuration, 'REMOTE_AUTH_GROUP_HEADER', 'HTTP_REMOTE_USER_GROUP')
|
||||||
|
REMOTE_AUTH_GROUP_SYNC_ENABLED = getattr(configuration, 'REMOTE_AUTH_GROUP_SYNC_ENABLED', False)
|
||||||
|
REMOTE_AUTH_SUPERUSER_GROUPS = getattr(configuration, 'REMOTE_AUTH_SUPERUSER_GROUPS', [])
|
||||||
|
REMOTE_AUTH_SUPERUSERS = getattr(configuration, 'REMOTE_AUTH_SUPERUSERS', [])
|
||||||
|
REMOTE_AUTH_STAFF_GROUPS = getattr(configuration, 'REMOTE_AUTH_STAFF_GROUPS', [])
|
||||||
|
REMOTE_AUTH_STAFF_USERS = getattr(configuration, 'REMOTE_AUTH_STAFF_USERS', [])
|
||||||
|
REMOTE_AUTH_GROUP_SEPARATOR = getattr(configuration, 'REMOTE_AUTH_GROUP_SEPARATOR', '|')
|
||||||
RELEASE_CHECK_URL = getattr(configuration, 'RELEASE_CHECK_URL', None)
|
RELEASE_CHECK_URL = getattr(configuration, 'RELEASE_CHECK_URL', None)
|
||||||
REPORTS_ROOT = getattr(configuration, 'REPORTS_ROOT', os.path.join(BASE_DIR, 'reports')).rstrip('/')
|
REPORTS_ROOT = getattr(configuration, 'REPORTS_ROOT', os.path.join(BASE_DIR, 'reports')).rstrip('/')
|
||||||
RQ_DEFAULT_TIMEOUT = getattr(configuration, 'RQ_DEFAULT_TIMEOUT', 300)
|
RQ_DEFAULT_TIMEOUT = getattr(configuration, 'RQ_DEFAULT_TIMEOUT', 300)
|
||||||
|
@ -58,7 +58,8 @@ class ExternalAuthenticationTestCase(TestCase):
|
|||||||
|
|
||||||
response = self.client.get(reverse('home'), follow=True, **headers)
|
response = self.client.get(reverse('home'), follow=True, **headers)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assertEqual(int(self.client.session.get('_auth_user_id')), self.user.pk, msg='Authentication failed')
|
self.assertEqual(int(self.client.session.get(
|
||||||
|
'_auth_user_id')), self.user.pk, msg='Authentication failed')
|
||||||
|
|
||||||
@override_settings(
|
@override_settings(
|
||||||
REMOTE_AUTH_ENABLED=True,
|
REMOTE_AUTH_ENABLED=True,
|
||||||
@ -78,7 +79,8 @@ class ExternalAuthenticationTestCase(TestCase):
|
|||||||
|
|
||||||
response = self.client.get(reverse('home'), follow=True, **headers)
|
response = self.client.get(reverse('home'), follow=True, **headers)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assertEqual(int(self.client.session.get('_auth_user_id')), self.user.pk, msg='Authentication failed')
|
self.assertEqual(int(self.client.session.get(
|
||||||
|
'_auth_user_id')), self.user.pk, msg='Authentication failed')
|
||||||
|
|
||||||
@override_settings(
|
@override_settings(
|
||||||
REMOTE_AUTH_ENABLED=True,
|
REMOTE_AUTH_ENABLED=True,
|
||||||
@ -102,7 +104,8 @@ class ExternalAuthenticationTestCase(TestCase):
|
|||||||
|
|
||||||
# Local user should have been automatically created
|
# Local user should have been automatically created
|
||||||
new_user = User.objects.get(username='remoteuser2')
|
new_user = User.objects.get(username='remoteuser2')
|
||||||
self.assertEqual(int(self.client.session.get('_auth_user_id')), new_user.pk, msg='Authentication failed')
|
self.assertEqual(int(self.client.session.get(
|
||||||
|
'_auth_user_id')), new_user.pk, msg='Authentication failed')
|
||||||
|
|
||||||
@override_settings(
|
@override_settings(
|
||||||
REMOTE_AUTH_ENABLED=True,
|
REMOTE_AUTH_ENABLED=True,
|
||||||
@ -121,7 +124,8 @@ class ExternalAuthenticationTestCase(TestCase):
|
|||||||
self.assertTrue(settings.REMOTE_AUTH_ENABLED)
|
self.assertTrue(settings.REMOTE_AUTH_ENABLED)
|
||||||
self.assertTrue(settings.REMOTE_AUTH_AUTO_CREATE_USER)
|
self.assertTrue(settings.REMOTE_AUTH_AUTO_CREATE_USER)
|
||||||
self.assertEqual(settings.REMOTE_AUTH_HEADER, 'HTTP_REMOTE_USER')
|
self.assertEqual(settings.REMOTE_AUTH_HEADER, 'HTTP_REMOTE_USER')
|
||||||
self.assertEqual(settings.REMOTE_AUTH_DEFAULT_GROUPS, ['Group 1', 'Group 2'])
|
self.assertEqual(settings.REMOTE_AUTH_DEFAULT_GROUPS,
|
||||||
|
['Group 1', 'Group 2'])
|
||||||
|
|
||||||
# Create required groups
|
# Create required groups
|
||||||
groups = (
|
groups = (
|
||||||
@ -135,7 +139,8 @@ class ExternalAuthenticationTestCase(TestCase):
|
|||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
new_user = User.objects.get(username='remoteuser2')
|
new_user = User.objects.get(username='remoteuser2')
|
||||||
self.assertEqual(int(self.client.session.get('_auth_user_id')), new_user.pk, msg='Authentication failed')
|
self.assertEqual(int(self.client.session.get(
|
||||||
|
'_auth_user_id')), new_user.pk, msg='Authentication failed')
|
||||||
self.assertListEqual(
|
self.assertListEqual(
|
||||||
[groups[0], groups[1]],
|
[groups[0], groups[1]],
|
||||||
list(new_user.groups.all())
|
list(new_user.groups.all())
|
||||||
@ -144,7 +149,8 @@ class ExternalAuthenticationTestCase(TestCase):
|
|||||||
@override_settings(
|
@override_settings(
|
||||||
REMOTE_AUTH_ENABLED=True,
|
REMOTE_AUTH_ENABLED=True,
|
||||||
REMOTE_AUTH_AUTO_CREATE_USER=True,
|
REMOTE_AUTH_AUTO_CREATE_USER=True,
|
||||||
REMOTE_AUTH_DEFAULT_PERMISSIONS={'dcim.add_site': None, 'dcim.change_site': None},
|
REMOTE_AUTH_DEFAULT_PERMISSIONS={
|
||||||
|
'dcim.add_site': None, 'dcim.change_site': None},
|
||||||
LOGIN_REQUIRED=True
|
LOGIN_REQUIRED=True
|
||||||
)
|
)
|
||||||
def test_remote_auth_default_permissions(self):
|
def test_remote_auth_default_permissions(self):
|
||||||
@ -158,14 +164,102 @@ class ExternalAuthenticationTestCase(TestCase):
|
|||||||
self.assertTrue(settings.REMOTE_AUTH_ENABLED)
|
self.assertTrue(settings.REMOTE_AUTH_ENABLED)
|
||||||
self.assertTrue(settings.REMOTE_AUTH_AUTO_CREATE_USER)
|
self.assertTrue(settings.REMOTE_AUTH_AUTO_CREATE_USER)
|
||||||
self.assertEqual(settings.REMOTE_AUTH_HEADER, 'HTTP_REMOTE_USER')
|
self.assertEqual(settings.REMOTE_AUTH_HEADER, 'HTTP_REMOTE_USER')
|
||||||
self.assertEqual(settings.REMOTE_AUTH_DEFAULT_PERMISSIONS, {'dcim.add_site': None, 'dcim.change_site': None})
|
self.assertEqual(settings.REMOTE_AUTH_DEFAULT_PERMISSIONS, {
|
||||||
|
'dcim.add_site': None, 'dcim.change_site': None})
|
||||||
|
|
||||||
response = self.client.get(reverse('home'), follow=True, **headers)
|
response = self.client.get(reverse('home'), follow=True, **headers)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
new_user = User.objects.get(username='remoteuser2')
|
new_user = User.objects.get(username='remoteuser2')
|
||||||
self.assertEqual(int(self.client.session.get('_auth_user_id')), new_user.pk, msg='Authentication failed')
|
self.assertEqual(int(self.client.session.get(
|
||||||
self.assertTrue(new_user.has_perms(['dcim.add_site', 'dcim.change_site']))
|
'_auth_user_id')), new_user.pk, msg='Authentication failed')
|
||||||
|
self.assertTrue(new_user.has_perms(
|
||||||
|
['dcim.add_site', 'dcim.change_site']))
|
||||||
|
|
||||||
|
@override_settings(
|
||||||
|
REMOTE_AUTH_ENABLED=True,
|
||||||
|
REMOTE_AUTH_AUTO_CREATE_USER=True,
|
||||||
|
REMOTE_AUTH_GROUP_SYNC_ENABLED=True,
|
||||||
|
LOGIN_REQUIRED=True
|
||||||
|
)
|
||||||
|
def test_remote_auth_remote_groups_default(self):
|
||||||
|
"""
|
||||||
|
Test enabling remote authentication with group sync enabled with the default configuration.
|
||||||
|
"""
|
||||||
|
headers = {
|
||||||
|
'HTTP_REMOTE_USER': 'remoteuser2',
|
||||||
|
'HTTP_REMOTE_USER_GROUP': 'Group 1|Group 2',
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertTrue(settings.REMOTE_AUTH_ENABLED)
|
||||||
|
self.assertTrue(settings.REMOTE_AUTH_AUTO_CREATE_USER)
|
||||||
|
self.assertTrue(settings.REMOTE_AUTH_GROUP_SYNC_ENABLED)
|
||||||
|
self.assertEqual(settings.REMOTE_AUTH_HEADER, 'HTTP_REMOTE_USER')
|
||||||
|
self.assertEqual(settings.REMOTE_AUTH_GROUP_HEADER,
|
||||||
|
'HTTP_REMOTE_USER_GROUP')
|
||||||
|
self.assertEqual(settings.REMOTE_AUTH_GROUP_SEPARATOR, '|')
|
||||||
|
|
||||||
|
# Create required groups
|
||||||
|
groups = (
|
||||||
|
Group(name='Group 1'),
|
||||||
|
Group(name='Group 2'),
|
||||||
|
Group(name='Group 3'),
|
||||||
|
)
|
||||||
|
Group.objects.bulk_create(groups)
|
||||||
|
|
||||||
|
response = self.client.get(reverse('home'), follow=True, **headers)
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
|
new_user = User.objects.get(username='remoteuser2')
|
||||||
|
self.assertEqual(int(self.client.session.get(
|
||||||
|
'_auth_user_id')), new_user.pk, msg='Authentication failed')
|
||||||
|
self.assertListEqual(
|
||||||
|
[groups[0], groups[1]],
|
||||||
|
list(new_user.groups.all())
|
||||||
|
)
|
||||||
|
|
||||||
|
@override_settings(
|
||||||
|
REMOTE_AUTH_ENABLED=True,
|
||||||
|
REMOTE_AUTH_AUTO_CREATE_USER=True,
|
||||||
|
REMOTE_AUTH_GROUP_SYNC_ENABLED=True,
|
||||||
|
REMOTE_AUTH_HEADER='HTTP_FOO',
|
||||||
|
REMOTE_AUTH_GROUP_HEADER='HTTP_BAR',
|
||||||
|
LOGIN_REQUIRED=True
|
||||||
|
)
|
||||||
|
def test_remote_auth_remote_groups_custom_header(self):
|
||||||
|
"""
|
||||||
|
Test enabling remote authentication with group sync enabled with the default configuration.
|
||||||
|
"""
|
||||||
|
headers = {
|
||||||
|
'HTTP_FOO': 'remoteuser2',
|
||||||
|
'HTTP_BAR': 'Group 1|Group 2',
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertTrue(settings.REMOTE_AUTH_ENABLED)
|
||||||
|
self.assertTrue(settings.REMOTE_AUTH_AUTO_CREATE_USER)
|
||||||
|
self.assertTrue(settings.REMOTE_AUTH_GROUP_SYNC_ENABLED)
|
||||||
|
self.assertEqual(settings.REMOTE_AUTH_HEADER, 'HTTP_FOO')
|
||||||
|
self.assertEqual(settings.REMOTE_AUTH_GROUP_HEADER, 'HTTP_BAR')
|
||||||
|
self.assertEqual(settings.REMOTE_AUTH_GROUP_SEPARATOR, '|')
|
||||||
|
|
||||||
|
# Create required groups
|
||||||
|
groups = (
|
||||||
|
Group(name='Group 1'),
|
||||||
|
Group(name='Group 2'),
|
||||||
|
Group(name='Group 3'),
|
||||||
|
)
|
||||||
|
Group.objects.bulk_create(groups)
|
||||||
|
|
||||||
|
response = self.client.get(reverse('home'), follow=True, **headers)
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
|
new_user = User.objects.get(username='remoteuser2')
|
||||||
|
self.assertEqual(int(self.client.session.get(
|
||||||
|
'_auth_user_id')), new_user.pk, msg='Authentication failed')
|
||||||
|
self.assertListEqual(
|
||||||
|
[groups[0], groups[1]],
|
||||||
|
list(new_user.groups.all())
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class ObjectPermissionAPIViewTestCase(TestCase):
|
class ObjectPermissionAPIViewTestCase(TestCase):
|
||||||
@ -206,7 +300,8 @@ class ObjectPermissionAPIViewTestCase(TestCase):
|
|||||||
def test_get_object(self):
|
def test_get_object(self):
|
||||||
|
|
||||||
# Attempt to retrieve object without permission
|
# Attempt to retrieve object without permission
|
||||||
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
|
url = reverse('ipam-api:prefix-detail',
|
||||||
|
kwargs={'pk': self.prefixes[0].pk})
|
||||||
response = self.client.get(url, **self.header)
|
response = self.client.get(url, **self.header)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
|
|
||||||
@ -221,12 +316,14 @@ class ObjectPermissionAPIViewTestCase(TestCase):
|
|||||||
obj_perm.object_types.add(ContentType.objects.get_for_model(Prefix))
|
obj_perm.object_types.add(ContentType.objects.get_for_model(Prefix))
|
||||||
|
|
||||||
# Retrieve permitted object
|
# Retrieve permitted object
|
||||||
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
|
url = reverse('ipam-api:prefix-detail',
|
||||||
|
kwargs={'pk': self.prefixes[0].pk})
|
||||||
response = self.client.get(url, **self.header)
|
response = self.client.get(url, **self.header)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
# Attempt to retrieve non-permitted object
|
# Attempt to retrieve non-permitted object
|
||||||
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[3].pk})
|
url = reverse('ipam-api:prefix-detail',
|
||||||
|
kwargs={'pk': self.prefixes[3].pk})
|
||||||
response = self.client.get(url, **self.header)
|
response = self.client.get(url, **self.header)
|
||||||
self.assertEqual(response.status_code, 404)
|
self.assertEqual(response.status_code, 404)
|
||||||
|
|
||||||
@ -292,7 +389,8 @@ class ObjectPermissionAPIViewTestCase(TestCase):
|
|||||||
|
|
||||||
# Attempt to edit an object without permission
|
# Attempt to edit an object without permission
|
||||||
data = {'site': self.sites[0].pk}
|
data = {'site': self.sites[0].pk}
|
||||||
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
|
url = reverse('ipam-api:prefix-detail',
|
||||||
|
kwargs={'pk': self.prefixes[0].pk})
|
||||||
response = self.client.patch(url, data, format='json', **self.header)
|
response = self.client.patch(url, data, format='json', **self.header)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
|
|
||||||
@ -308,19 +406,22 @@ class ObjectPermissionAPIViewTestCase(TestCase):
|
|||||||
|
|
||||||
# Attempt to edit a non-permitted object
|
# Attempt to edit a non-permitted object
|
||||||
data = {'site': self.sites[0].pk}
|
data = {'site': self.sites[0].pk}
|
||||||
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[3].pk})
|
url = reverse('ipam-api:prefix-detail',
|
||||||
|
kwargs={'pk': self.prefixes[3].pk})
|
||||||
response = self.client.patch(url, data, format='json', **self.header)
|
response = self.client.patch(url, data, format='json', **self.header)
|
||||||
self.assertEqual(response.status_code, 404)
|
self.assertEqual(response.status_code, 404)
|
||||||
|
|
||||||
# Edit a permitted object
|
# Edit a permitted object
|
||||||
data['status'] = 'reserved'
|
data['status'] = 'reserved'
|
||||||
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
|
url = reverse('ipam-api:prefix-detail',
|
||||||
|
kwargs={'pk': self.prefixes[0].pk})
|
||||||
response = self.client.patch(url, data, format='json', **self.header)
|
response = self.client.patch(url, data, format='json', **self.header)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
|
|
||||||
# Attempt to modify a permitted object to a non-permitted object
|
# Attempt to modify a permitted object to a non-permitted object
|
||||||
data['site'] = self.sites[1].pk
|
data['site'] = self.sites[1].pk
|
||||||
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
|
url = reverse('ipam-api:prefix-detail',
|
||||||
|
kwargs={'pk': self.prefixes[0].pk})
|
||||||
response = self.client.patch(url, data, format='json', **self.header)
|
response = self.client.patch(url, data, format='json', **self.header)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
|
|
||||||
@ -328,7 +429,8 @@ class ObjectPermissionAPIViewTestCase(TestCase):
|
|||||||
def test_delete_object(self):
|
def test_delete_object(self):
|
||||||
|
|
||||||
# Attempt to delete an object without permission
|
# Attempt to delete an object without permission
|
||||||
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
|
url = reverse('ipam-api:prefix-detail',
|
||||||
|
kwargs={'pk': self.prefixes[0].pk})
|
||||||
response = self.client.delete(url, format='json', **self.header)
|
response = self.client.delete(url, format='json', **self.header)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
|
|
||||||
@ -343,11 +445,13 @@ class ObjectPermissionAPIViewTestCase(TestCase):
|
|||||||
obj_perm.object_types.add(ContentType.objects.get_for_model(Prefix))
|
obj_perm.object_types.add(ContentType.objects.get_for_model(Prefix))
|
||||||
|
|
||||||
# Attempt to delete a non-permitted object
|
# Attempt to delete a non-permitted object
|
||||||
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[3].pk})
|
url = reverse('ipam-api:prefix-detail',
|
||||||
|
kwargs={'pk': self.prefixes[3].pk})
|
||||||
response = self.client.delete(url, format='json', **self.header)
|
response = self.client.delete(url, format='json', **self.header)
|
||||||
self.assertEqual(response.status_code, 404)
|
self.assertEqual(response.status_code, 404)
|
||||||
|
|
||||||
# Delete a permitted object
|
# Delete a permitted object
|
||||||
url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
|
url = reverse('ipam-api:prefix-detail',
|
||||||
|
kwargs={'pk': self.prefixes[0].pk})
|
||||||
response = self.client.delete(url, format='json', **self.header)
|
response = self.client.delete(url, format='json', **self.header)
|
||||||
self.assertEqual(response.status_code, 204)
|
self.assertEqual(response.status_code, 204)
|
||||||
|
Reference in New Issue
Block a user