1
0
mirror of https://github.com/peeringdb/peeringdb.git synced 2024-05-11 05:55:09 +00:00
Files
peeringdb-peeringdb/peeringdb_server/rest_throttles.py
Matt Griswold 77282c4b66 Prep 2.29 (#1024)
* django3, py39, lgtm, linting (#715)

* IX-F Importer: ticket status change when posting re-occurring conflict to existing resolved ticket (#920)

* fix recaptcha requirement for user creation in django-admin (#715)

* IX-F Importer: fix command output buffering #967

* Drop dot1q_support field #903

* fix test (#967)

* Add name, city, country to ixfac (GET operation) #166

* additional tests for #166

* Allow IXP to trigger ix-f importer for their exchange #779

* add docker compose for dev

* add selinux labels for mountpoints

* fixes #1013: The process to permanently remove old soft-deleted network contacts pdb_delete_pocs raises a false ProtectedAction

* fix api test

* relock poetry

* remove django_namespace_perms from installed apps

* fix user permissions ui

* remove remaining references to django namespace perms

* linting

* copy tox.ini

* comment flake8 check until we figure out why it ignores configs from tox.ini

* black format

* poetry lock

Co-authored-by: Stefan Pratter <stefan@20c.com>
2021-08-18 08:21:22 -05:00

109 lines
3.1 KiB
Python

from django.conf import settings
from rest_framework import throttling
from rest_framework.exceptions import PermissionDenied
from peeringdb_server.permissions import get_org_key_from_request, get_user_from_request
class IXFImportThrottle(throttling.UserRateThrottle):
    """
    User rate throttle for the `ixf_import_request` scope.

    The cache key is extended with the id of the object the view
    operates on, so the rate is tracked per user and per object.
    """

    scope = "ixf_import_request"

    def get_cache_key(self, request, view):
        """
        Return the parent class cache key suffixed with the id of
        the object returned by `view.get_object()`.
        """
        base_key = super().get_cache_key(request, view)
        # presumably an exchange instance (original local name was `ix`);
        # verify against the view using this throttle
        target = view.get_object()
        return f"{base_key}.{target.id}"
class FilterThrottle(throttling.SimpleRateThrottle):
    """
    Base class for API throttling targeted at specific query filters.

    Subclasses set `filter_name` to the name of the query parameter
    they rate limit. Scope name will be 'filter_{self.filter_name}'.
    """

    filter_name = None

    def __init__(self):
        # NOTE(review): presumably skips the parent initializer because
        # scope and rate are only assigned once `allow_request` knows
        # that the filter is in use - confirm against the parent class.
        pass

    def allow_request(self, request, view):
        """
        Decide whether the request may proceed.

        Returns True right away when the request does not use the
        filter named by `filter_name`. Otherwise the requester is
        resolved, the optional authentication / verification settings
        are enforced and the parent class rate check is returned.

        Raises PermissionDenied when the filter requires an
        authenticated or verified user and the requester is neither.
        """

        # Requests that do not use the filter are not subject to
        # this throttle at all
        if self.filter_name not in request.query_params:
            return True

        self.scope = f"filter_{self.filter_name}"

        # The user is taken from request.user or a user api key and
        # will be None for organization keys and anonymous requests
        self.user = get_user_from_request(request)
        self.org_key = get_org_key_from_request(request)

        # Neither a user nor an organization key could be identified:
        # fall back to request.user, which will likely be an anonymous
        # user instance
        if not (self.org_key or self.user):
            self.user = request.user

        requester = self.user
        setting_key = self.filter_name.upper()

        # Does this filter require an authenticated user?
        auth_required = getattr(
            settings, f"API_{setting_key}_FILTER_REQUIRE_AUTH", False
        )

        # Does this filter require a verified user?
        verification_required = getattr(
            settings, f"API_{setting_key}_FILTER_REQUIRE_VERIFIED", False
        )

        # The requirements are only enforced when there is a user
        # object to check; requests identified by an organization key
        # carry no user and skip both checks
        if requester:
            if (
                auth_required or verification_required
            ) and not requester.is_authenticated:
                raise PermissionDenied(
                    f"Please authenticate to use the `{self.filter_name}` filter"
                )

            if verification_required and not requester.is_verified_user:
                raise PermissionDenied(
                    f"Please verify your account to use the `{self.filter_name}` filter"
                )

        # The rate can only be resolved now that the scope is known
        self.rate = self.get_rate()
        self.num_requests, self.duration = self.parse_rate(self.rate)

        return super().allow_request(request, view)

    def get_cache_key(self, request, view):
        """
        Build the cache key from the throttle scope and an identifier
        for whoever is making the request.
        """
        ident = self._get_requester_ident(request)
        return self.cache_format % {"scope": self.scope, "ident": ident}

    def _get_requester_ident(self, request):
        """
        Return the identifier used in the cache key: the organization
        key prefix, the authenticated user's primary key, or the
        value of `get_ident` for everything else.
        """
        if self.org_key:
            return f"org-key:{self.org_key.prefix}"
        if self.user and self.user.is_authenticated:
            return self.user.pk
        return self.get_ident(request)
class FilterDistanceThrottle(FilterThrottle):
    """
    Throttle applied to requests that use the `distance` query
    filter (?distance=).
    """

    filter_name = "distance"