1
0
mirror of https://github.com/peeringdb/peeringdb.git synced 2024-05-11 05:55:09 +00:00
Files
peeringdb-peeringdb/peeringdb_server/middleware.py
Stefan Pratter be9deaf2f8 Support 202309 (#1458)
* fixes #1260 - playwright tests
fixes #1394 - v2 search failing to find some names
fixes #1374 - Search to include new objects: Campus & Carrier
fixes #1164 - better rdap error reporting
fixes #1368 - Facility data export into Google Earth KMZ
fixes #1328 - Support web updates from a source of truth
fixes #1257 - Help text covers non-compliant email addresses
fixes #1313 - Improve email confirmation control - add 3 month option & maybe set new default value
fixes #1380 - Reset 'Social Media' to '[]' if field has no value

* linting

* remove target=_blank

* bump ES version to 8.10

* Cache and ES updates (#1459)

* elasticsearch major version pin and relock

* set decimal fields to python value on client save for load_data

* force use of redis password

* add default_meta to render

* add generated, clean up var names

* run pre-commit

* update ES for https and password

* rm cruft

* isort

---------

Co-authored-by: 20C <code@20c.com>
Co-authored-by: Matt Griswold <grizz@20c.com>
2023-10-24 12:17:03 -05:00

588 lines
20 KiB
Python

"""
Custom django middleware.
"""
import base64
import json
from django.conf import settings
from django.contrib.auth import authenticate, get_user_model
from django.contrib.sessions.middleware import SessionMiddleware
from django.core.cache import caches
from django.http import HttpResponse, JsonResponse
from django.middleware.common import CommonMiddleware
from django.urls import reverse
from django.utils.deprecation import MiddlewareMixin
from peeringdb_server.context import current_request
from peeringdb_server.models import OrganizationAPIKey, UserAPIKey
from peeringdb_server.permissions import get_key_from_request
ERR_MULTI_AUTH = "Cannot authenticate through Authorization header while logged in. Please log out and try again."
def get_auth_identity(request):
"""
Returns a string that uniquely identifies the authentication
method used for the request.
This is used to cache negative authentication responses
"""
auth_value = request.META.get("HTTP_AUTHORIZATION", "")
session_id = request.COOKIES.get("sessionid", "")[:10]
if session_id:
return f"session__{session_id}"
auth_type = None
if auth_value:
method, auth_value = auth_value.split(" ", 1)
if method.lower() == "basic":
# base 64 auth value is decoded and split to get the username
auth_value = base64.b64decode(auth_value).decode("utf-8").split(":", 1)[0]
auth_type = "user"
elif method.lower() == "api-key":
# api key auth value is truncated to prefix
auth_value = auth_value.split(".", 1)[0]
auth_type = "key"
return f"{auth_type}__{auth_value}"
return "anonymous__guest"
class PDBSessionMiddleware(SessionMiddleware):
"""
As PeeringDB gets a lot of repeated anonymous requests that do not
store and re-use session cookies this lead to substantial amount of junk
django session objects.
It was decided in #1205 that new django sessions are only to be established
On the login and registration processes.
"""
def process_response(self, request, response):
try:
request.session.is_empty()
except AttributeError:
return response
session_key = request.session.session_key
if session_key and not request.session.is_empty():
# request specifies session and session is not empty, proceed normally
return super().process_response(request, response)
elif not request.COOKIES.get(settings.SESSION_COOKIE_NAME):
# request specifies no session, check if the request.path falls into the
# set of valid paths for new session creation
NEW_SESSION_VALID_PATHS = [
reverse("login"),
reverse("register"),
reverse("username-retrieve"),
reverse("username-retrieve-initiate"),
reverse("reset-password"),
]
if request.resolver_match and "key" in request.resolver_match.kwargs:
NEW_SESSION_VALID_PATHS.append(
reverse(
"account_confirm_email",
kwargs={"key": request.resolver_match.kwargs.get("key")},
),
)
if request.path in NEW_SESSION_VALID_PATHS:
# path is valid for a new session, proceed normally
return super().process_response(request, response)
else:
# path is NOT valid for a new session, abort session
# creation
return response
# proceed normally
return super().process_response(request, response)
class CurrentRequestContext:
"""
Middleware that sets the current request context.
This allows access to the current request from anywhere.
"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
with current_request(request):
return self.get_response(request)
class HttpResponseUnauthorized(HttpResponse):
status_code = 401
class PDBCommonMiddleware(CommonMiddleware):
def has_subdomain(self, request):
# Check if the request has a subdomain and does not start with www
host = request.get_host()
if host.startswith("www.") or (len(host.split(".")) > 2):
return True
return False
def process_request(self, request):
must_prepend = settings.PDB_PREPEND_WWW and not self.has_subdomain(request)
redirect_url = (
(f"{request.scheme}://www.{request.get_host()}") if must_prepend else ""
)
# Check if a slash should be appended
if self.should_redirect_with_slash(request):
path = self.get_full_path_with_slash(request)
else:
path = request.get_full_path()
# Return a redirect if necessary
if redirect_url or path != request.get_full_path():
redirect_url += path
return self.response_redirect_class(redirect_url)
class PDBPermissionMiddleware(MiddlewareMixin):
"""
Middleware that checks if the current user has the correct permissions
to access the requested resource.
"""
def get_username_and_password(self, http_auth):
"""
Get the username and password from the HTTP auth header.
"""
# Check if the HTTP auth header is valid.
if http_auth.startswith("Basic "):
# Get the HTTP auth header without the "Basic " prefix.
http_auth = http_auth[6:]
else:
# Return an empty tuple.
return tuple()
# Decode the HTTP auth header.
http_auth = base64.b64decode(http_auth).decode("utf-8")
# If username or password is empty return an empty tuple.
# Split the username and password from the HTTP auth header.
userpw = http_auth.split(":", 1)
return userpw
def response_unauthorized(self, request, status=None, message=None):
"""
Return a Unauthorized response.
"""
return JsonResponse({"meta": {"error": message}}, status=status)
def process_request(self, request):
http_auth = request.META.get("HTTP_AUTHORIZATION", None)
req_key = get_key_from_request(request)
api_key = None
# session auth already exists, set x-auth-id value and return
if request.user.is_authenticated:
request.auth_id = f"u{request.user.id}"
# request attempting to provide separate authentication while
# already authenticated through session cookie, fail with
# bad request
if req_key or http_auth:
return self.response_unauthorized(
request,
message=ERR_MULTI_AUTH,
status=400,
)
return
# Check if HTTP auth is valid and if the request is made with basic auth.
if http_auth and http_auth.startswith("Basic "):
# Get the username and password from the HTTP auth header.
username, password = self.get_username_and_password(http_auth)
user = None
try:
user_object = get_user_model().objects.get(username=username)
if not user_object.is_active:
# If user is inactive, cache the inactive auth
# and return unauthorized
identifier = get_auth_identity(request)
caches["negative"].set(
f"inactive__{identifier}",
True,
timeout=settings.NEGATIVE_CACHE_EXPIRY_INACTIVE_AUTH,
)
return self.response_unauthorized(
request, message="Inactive account", status=401
)
else:
# Check if the username and password are valid.
user = authenticate(username=username, password=password)
except get_user_model().DoesNotExist:
user_object = None
# return username input in x-auth-id header
if user:
request.auth_id = f"u{user.id}"
# if user is not authenticated return 401 Unauthorized
else:
# truncate the username if needed.
if len(username) > 255:
request.auth_id = username[:255]
else:
request.auth_id = username
return self.response_unauthorized(
request, message="Invalid username or password", status=401
)
# Check API keys
if req_key:
try:
api_key = OrganizationAPIKey.objects.get_from_key(req_key)
except OrganizationAPIKey.DoesNotExist:
pass
try:
api_key = UserAPIKey.objects.get_from_key(req_key)
except UserAPIKey.DoesNotExist:
pass
if not api_key:
# If api key is not valid return 401 Unauthorized
if len(req_key) > 16:
req_key = req_key[:16]
request.auth_id = f"apikey_{req_key}"
return self.response_unauthorized(
request, message="Invalid API key", status=401
)
elif api_key.revoked or api_key.status != "active":
# If api key is revoked or inactive, cache as inactive
# and return 401 Unauthorized
identifier = get_auth_identity(request)
caches["negative"].set(
f"inactive__{identifier}",
True,
timeout=settings.NEGATIVE_CACHE_EXPIRY_INACTIVE_AUTH,
)
return self.response_unauthorized(
request, message="Inactive API key", status=401
)
# If API key is provided, check if the user has an active session
else:
if isinstance(api_key, OrganizationAPIKey):
prefix = f"o{api_key.org_id}"
else:
prefix = f"u{api_key.user_id}"
request.auth_id = f"{prefix}_apikey_{api_key.prefix}"
if request.session.get("_auth_user_id") and request.user.id:
if int(request.user.id) == int(
request.session.get("_auth_user_id")
):
return self.response_unauthorized(
request,
message=ERR_MULTI_AUTH,
status=400,
)
def process_response(self, request, response):
if hasattr(request, "auth_id"):
# Sanitizes the auth_id
request.auth_id = request.auth_id.replace(" ", "_")
# If auth_id ends with a 401 make sure is it limited to 16 bytes
if response.status_code == 401 and len(request.auth_id) > 16:
if not request.auth_id.startswith("apikey_"):
request.auth_id = request.auth_id[:16]
response["X-Auth-ID"] = request.auth_id
return response
class RedisNegativeCacheMiddleware(MiddlewareMixin):
"""
Middleware that uses Django's cache framework with Redis backend to cache error responses.
"""
def process_response(self, request, response):
"""
Process the response before it's sent to the client.
"""
if not settings.NEGATIVE_CACHE_ENABLED:
return response
# Check if the response is an error response
if response.status_code in [401, 403, 429]:
# Generate the cache key
cache_key = self.generate_cache_key(request)
# Cache the response content and status code with specific expiry time
cache_data = {
"content": response.content.decode(),
"status": response.status_code,
"content-type": response.get("content-type"),
}
expiry_setting = f"NEGATIVE_CACHE_EXPIRY_{response.status_code}"
caches["negative"].set(
cache_key, cache_data, timeout=getattr(settings, expiry_setting)
)
# If the response is 401 or 403, increment the count for this IP address
# so we can throttle it if it exceeds the limit
if response.status_code in [401, 403]:
throttle_key = self.generate_ratelimit_key(request)
throttle_count = caches["negative"].get(throttle_key, 0)
throttle_count += 1
# cache for 1 minute
caches["negative"].set(throttle_key, throttle_count, timeout=60)
return response
def process_request(self, request):
"""
Process the request before it's passed to the view.
"""
if not settings.NEGATIVE_CACHE_ENABLED:
return
# Check if inactive auth cache
identifier = get_auth_identity(request)
# Check if the IP address has been throttled for too many 401 or 403 responses
throttle_key = self.generate_ratelimit_key(request)
throttle_count = caches["negative"].get(throttle_key, 0)
# If the count exceeds the limit, return a throttled error response
if throttle_count > settings.NEGATIVE_CACHE_REPEATED_RATE_LIMIT:
response = JsonResponse(
{
"meta": {
"error": "Too many requests that resulted in 401 or 403 responses"
}
},
status=429,
)
response["X-Throttled-Response"] = "True"
return response
if identifier:
inactive_cache = caches["negative"].get(f"inactive__{identifier}")
if inactive_cache:
if identifier.startswith("key__"):
response = JsonResponse(
{"meta": {"error": "Inactive API key"}}, status=401
)
else:
response = JsonResponse(
{"meta": {"error": "Inactive account"}}, status=401
)
response["X-Cached-Response"] = "True"
return response
# Generate the cache key
cache_key = self.generate_cache_key(request)
# Check if the response is cached
cached_response = caches["negative"].get(cache_key)
if (
cached_response
and cached_response.get("content-type") == "application/json"
):
# Deserialize the cached response content and return it with the cached status code
response = JsonResponse(
json.loads(cached_response["content"]),
safe=False,
status=cached_response["status"],
)
elif cached_response:
# Return the cached response as is
response = HttpResponse(
cached_response["content"],
content_type=cached_response["content-type"],
status=cached_response["status"],
)
else:
# No cached response found, return
return
# Add a custom header to indicate that this is a cached response
response["X-Cached-Response"] = "True"
return response
def get_ident(self, request):
"""
Get the IP address of the client, taking both X-Forwarded-For and REMOTE_ADDR into account.
"""
xff = request.META.get("HTTP_X_FORWARDED_FOR")
remote_addr = request.META.get("REMOTE_ADDR")
return "".join(xff.split()) if xff else remote_addr
def generate_ratelimit_key(self, request):
return f"negcache__throttle__{self.get_ident(request)}"
def generate_cache_key(self, request):
"""
Generate the cache key using the IP address, HTTP_AUTHORIZATION value or session ID, request path, and URL parameters.
"""
ip_address = self.get_ident(request)
request_path = request.path
url_parameters = request.GET.urlencode()
identifier = get_auth_identity(request)
method = request.method
# Generate the key and sanitize it by replacing spaces and colons with underscores
cache_key = f"negcache_{ip_address}__{identifier}__{method}__{request_path}__{url_parameters}"
sanitized_cache_key = cache_key.replace(" ", "_").replace(":", "_")
return sanitized_cache_key
class CacheControlMiddleware(MiddlewareMixin):
"""
Sets the Cache-Control s-maxage header on responses
"""
# views using CACHE_CONTROL_DYNAMIC_PAGE as value for s-maxage
# views that will receive frequent update and require a shorter
# TTL
dynamic_views = [
"sponsors",
"home",
# entity views (net, ix, fac, org, carrier)
"net-view",
"net-view-asn",
"ix-view",
"org-view",
"fac-view",
"carrier-view",
# data views that fill select elements when editing
# fac, ix, net, org or carrier
"data-facilities",
"data-asns",
]
# views using CACHE_CONTROL_STATIC_PAGE as a value for s-maxage
# views that generally dont change outside of deploys and can
# support a longer TTL
static_views = [
"about",
"aup",
# data views that fill select elements when editing
# fac, ix, net, org or carrier
"data-countries",
"data-enum",
"data-locales",
]
# views that support caching while authenticated
# currently this NEEDS TO EXCLUDE anything that references
# the logged in user and or data only available to the logged
# in user
authenticated_views = [
# data views that fill select elements when editing
# fac, ix, net, org or carrier
"data-countries",
"data-enum",
"data-locales",
"data-facilities",
"data-asns",
]
# REST api views are handled automatically and will use
# the `CACHE_CONTROL_API_CACHE` setting for api-cache responses
# and the `CACHE_CONTROL_API` setting for normal responses
def process_response(self, request, response):
# only on GET requests
if request.method != "GET":
return response
# generally, requests that dont have a resolver match
# are ignored as we are being specific which views
# get the Cache-Control header at this point, rather
# than applying some broad caching policy
match = request.resolver_match
if not match or not match.url_name:
return response
if (
request.user.is_authenticated
and match.url_name not in self.authenticated_views
):
# request is authenticated, dont set cache-control
# headers for authenticated responses.
return response
if match.namespace == "api":
# REST API
if getattr(response, "context_data", None):
# API CACHE
if (
response.context_data.get("apicache") is True
and settings.CACHE_CONTROL_API_CACHE
):
response[
"Cache-Control"
] = f"s-maxage={settings.CACHE_CONTROL_API_CACHE}"
elif settings.CACHE_CONTROL_API:
# NO API CACHE
response["Cache-Control"] = f"s-maxage={settings.CACHE_CONTROL_API}"
elif match.url_name in self.dynamic_views:
# DYNAMIC CONTENT VIEW
if settings.CACHE_CONTROL_DYNAMIC_PAGE:
response[
"Cache-Control"
] = f"s-maxage={settings.CACHE_CONTROL_DYNAMIC_PAGE}"
elif match.url_name in self.static_views:
# STATIC CONTENT VIEW
if settings.CACHE_CONTROL_STATIC_PAGE:
response[
"Cache-Control"
] = f"s-maxage={settings.CACHE_CONTROL_STATIC_PAGE}"
return response