1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Catch and log evaluation of RestrictedQuerySet without calling restrict()

This commit is contained in:
Jeremy Stretch
2020-06-16 10:25:37 -04:00
parent e917535380
commit ce5fd7955f

View File

@ -1,3 +1,5 @@
import logging
from django.db.models import Q, QuerySet
from utilities.permissions import permission_is_exempt
@ -16,6 +18,34 @@ class DummyQuerySet:
class RestrictedQuerySet(QuerySet):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Initialize the is_restricted flag to False. This indicates that the QuerySet has not yet been restricted.
self.is_restricted = False
def _check_restriction(self):
# Raise a warning if the QuerySet is evaluated without first calling restrict().
if not getattr(self, 'is_restricted', False):
logger = logging.getLogger('netbox.RestrictedQuerySet')
logger.warning(f'Evaluation of RestrictedQuerySet prior to calling restrict(): {self.model}')
def _clone(self):
# Persist the is_restricted flag when cloning the QuerySet.
c = super()._clone()
c.is_restricted = self.is_restricted
return c
def _fetch_all(self):
self._check_restriction()
return super()._fetch_all()
def count(self):
self._check_restriction()
return super().count()
def restrict(self, user, action):
"""
Filter the QuerySet to return only objects on which the specified user has been granted the specified
@ -31,16 +61,21 @@ class RestrictedQuerySet(QuerySet):
# Bypass restriction for superusers and exempt views
if user.is_superuser or permission_is_exempt(permission_required):
return self
qs = self
# User is anonymous or has not been granted the requisite permission
if not user.is_authenticated or permission_required not in user.get_all_permissions():
return self.none()
elif not user.is_authenticated or permission_required not in user.get_all_permissions():
qs = self.none()
# Filter the queryset to include only objects with allowed attributes
attrs = Q()
for perm_attrs in user._object_perm_cache[permission_required]:
if perm_attrs:
attrs |= Q(**perm_attrs)
else:
attrs = Q()
for perm_attrs in user._object_perm_cache[permission_required]:
if perm_attrs:
attrs |= Q(**perm_attrs)
qs = self.filter(attrs)
return self.filter(attrs)
# Mark the QuerySet as having been restricted
qs.is_restricted = True
return qs