1
0
mirror of https://github.com/peeringdb/peeringdb.git synced 2024-05-11 05:55:09 +00:00

Expose authentication methods on outbound federation (#1565)

* Expose authentication methods on outbound federation

* relock

* docs

* linting

* docs

* webauthn instead of u2f

* use swk

* docs

* remove cruft

* remove unused import

* add amr claim for JWT ID token as well

fix tests

add test key

* fix oidc validator tests

* fix merge cruft

---------

Co-authored-by: 20C <code@20c.com>
This commit is contained in:
Stefan Pratter
2024-03-13 03:59:15 +02:00
committed by GitHub
parent 91149f0e29
commit c3b70ce09f
12 changed files with 704 additions and 2 deletions

View File

@ -37,6 +37,7 @@ case "$1" in
export DATABASE_PASSWORD=""
export RELEASE_ENV=run_tests
export PEERINGDB_SYNC_CACHE_URL=""
export OIDC_RSA_PRIVATE_KEY_ACTIVE_PATH=/srv/www.peeringdb.com/tests/data/oidc/oidc.key
unset BASE_URL
unset OAUTH2_PROVIDER_APPLICATION_MODEL
unset SESSION_COOKIE_DOMAIN

60
docs/dev/oauth.md Normal file
View File

@ -0,0 +1,60 @@
## Example backend for python social core
```python
from django.conf import settings
from social_core.backends.oauth import BaseOAuth2
from social_core.exceptions import AuthFailed
class PeeringDBOAuth2(BaseOAuth2):
name = "peeringdb"
AUTHORIZATION_URL = settings.PDB_OAUTH_AUTHORIZE_URL
ACCESS_TOKEN_URL = settings.PDB_OAUTH_ACCESS_TOKEN_URL
PROFILE_URL = settings.PDB_OAUTH_PROFILE_URL
ACCESS_TOKEN_METHOD = "POST"
DEFAULT_SCOPE = ["email", "profile", "networks", "amr"]
EXTRA_DATA = ["networks", "amr"]
def get_user_details(self, response):
"""Return user details."""
if response.get("verified_user") is not True:
raise AuthFailed(
self,
"PeeringDB user is not verified. Please affiliate yourself with an organization in PeeringDB and try again.",
)
return {
"username": response.get("given_name"),
"email": response.get("email") or "",
"first_name": response.get("given_name"),
"last_name": response.get("family_name"),
}
def user_data(self, access_token, *args, **kwargs):
"""Load user data from service."""
headers = {"Authorization": "Bearer %s" % access_token}
data = self.get_json(self.PROFILE_URL, headers=headers)
return data
```
## AMR values
PeeringDB currently is not collecting device attestation, thus has no way to identify the authentication method according to RFC 8176 (https://datatracker.ietf.org/doc/html/rfc8176). However, we are collecting the following AMR values:
- `pwd` - Password
- `mfa` - Multi-factor authentication
- `otp` - One-time password
- `swk` - Proof-of-Possession (PoP) of a software-secured key - usage of webauthn security key will append this
```python
"amr": ["pwd", "mfa", "otp"] # password entered + OTP
"amr": ["pwd", "mfa", "swk"] # password entered + Security Key
"amr": ["pwd"] # password entered
"amr": ["mfa", "swk", "otp"] # passwordless with security key + OTP
"amr": ["mfa", "swk", "swk"] # passwordless with security key + plus 2fa with another security key
"amr": ["swk"] # password less without mfa
```

View File

@ -1,6 +1,9 @@
from abc import ABC, abstractmethod
from django_grainy.util import Permissions
from oauth2_provider.models import Grant
from peeringdb_server.models import OAuthAccessTokenInfo, OAuthGrantInfo
class ScopedClaim(ABC):
@ -87,3 +90,29 @@ class Networks(ScopedClaim):
asn=network.asn,
perms=perms,
)
class AMR(ScopedClaim):
def enact(self, request):
try:
# get the grant instance by the code passed in the
# request body
body = dict(request.decoded_body)
code = body.get("code")
try:
# get the grant info instance by the grant instance
grant = Grant.objects.get(code=code, application=request.client)
grant_info = grant.grant_info
except (Grant.DoesNotExist, OAuthGrantInfo.DoesNotExist):
# if the grant info instance does not exist, set it to None
grant_info = None
amr = grant_info.amr if grant_info else ""
if amr:
amr = amr.split(",")
else:
amr = []
except OAuthAccessTokenInfo.DoesNotExist:
amr = []
return amr

View File

@ -11,3 +11,4 @@ class SupportedScopes(StrEnum):
PROFILE = "profile"
EMAIL = "email"
NETWORKS = "networks"
AMR = "amr"

View File

@ -1,7 +1,11 @@
from oauth2_provider.oauth2_validators import OAuth2Validator
from oauth2_provider.oauth2_validators import Grant, OAuth2Validator
from mainsite.oauth2 import claims
from mainsite.oauth2.scopes import SupportedScopes
from peeringdb_server.context import current_request
from peeringdb_server.models import OAuthAccessTokenInfo, OAuthGrantInfo
# import SessionStore
class OIDCValidator(OAuth2Validator):
@ -29,4 +33,74 @@ class OIDCValidator(OAuth2Validator):
("id", claims.UserId([SupportedScopes.PROFILE])),
("verified_user", claims.UserVerified([SupportedScopes.PROFILE])),
("networks", claims.Networks([SupportedScopes.NETWORKS])),
("amr", claims.AMR([SupportedScopes.OPENID])),
]
def _create_access_token(self, expires, request, token, source_refresh_token=None):
"""
This is the method that creates the AccessToken instance
We override it so we can attach the OAuthAccessTokenInfo instance to the token.
The info holds the AMR values as passed on the Grant instance.
"""
# create the token as usual
token = super()._create_access_token(
expires, request, token, source_refresh_token
)
# get the grant instance by the code passed in the
# request body
body = dict(request.decoded_body)
code = body.get("code")
try:
# get the grant info instance by the grant instance
grant = Grant.objects.get(code=code, application=request.client)
grant_info = grant.grant_info
except (Grant.DoesNotExist, OAuthGrantInfo.DoesNotExist):
# if the grant info instance does not exist, set it to None
grant_info = None
# create the access token info instance
OAuthAccessTokenInfo.objects.create(
access_token=token,
amr=grant_info.amr if grant_info else "",
)
return token
def _create_authorization_code(self, request, code, expires=None):
"""
This is the method that creates the AuthorizationGrant instance
We override it so we can attach the OAuthGrantInfo instance to the grant.
The info holds the AMR values as set in the user's authenticated session.
"""
# `request` is a sanitized oauthlib.common.Request instance
# and hasn't parsed cookies and/or loaded the django session.
#
# to get the amr values, we need to access the user's session
# and we can use the current_request context manager to do that
with current_request() as django_request:
if django_request:
session = django_request.session
amr = session.get("amr", [])
else:
amr = []
# create the grant as usual
grant = super()._create_authorization_code(request, code, expires=expires)
# create the grant info instance
OAuthGrantInfo.objects.create(
grant=grant,
amr=",".join(amr),
)
return grant

View File

@ -963,6 +963,7 @@ OAUTH2_PROVIDER = {
SupportedScopes.PROFILE: "user profile",
SupportedScopes.EMAIL: "email address",
SupportedScopes.NETWORKS: "list of user networks and permissions",
SupportedScopes.AMR: "authentication method reference",
},
"ALLOWED_REDIRECT_URI_SCHEMES": ["https"],
"REQUEST_APPROVAL_PROMPT": "auto",
@ -973,6 +974,8 @@ OAUTH2_PROVIDER = {
# migration 0085 has been applied.
set_option("OAUTH2_PROVIDER_APPLICATION_MODEL", "oauth2_provider.Application")
set_option("OAUTH2_PROVIDER_GRANT_MODEL", "oauth2_provider.Grant")
set_option("OAUTH2_PROVIDER_ACCESS_TOKEN_MODEL", "oauth2_provider.AccessToken")
# This is setting is for cookie timeout for oauth sessions.
# After the timeout, the ongoing oauth session would expire.

View File

@ -0,0 +1,91 @@
# Generated by Django 4.2.10 on 2024-03-11 16:20
import django.db.models.deletion
import django_peeringdb.fields
import django_peeringdb.models.abstract
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.OAUTH2_PROVIDER_GRANT_MODEL),
migrations.swappable_dependency(settings.OAUTH2_PROVIDER_ACCESS_TOKEN_MODEL),
("peeringdb_server", "0124_ixfmemberdata_bfd_support_and_more"),
]
operations = [
migrations.CreateModel(
name="OAuthGrantInfo",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"amr",
models.CharField(
blank=True,
help_text="Authentication method reference",
max_length=255,
null=True,
),
),
(
"grant",
models.OneToOneField(
on_delete=django.db.models.deletion.CASCADE,
related_name="grant_info",
to=settings.OAUTH2_PROVIDER_GRANT_MODEL,
),
),
],
options={
"verbose_name": "OAuth Grant Info",
"verbose_name_plural": "OAuth Grant Info",
"db_table": "peeringdb_oauth_grant_info",
},
),
migrations.CreateModel(
name="OAuthAccessTokenInfo",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"amr",
models.CharField(
blank=True,
help_text="Authentication method reference",
max_length=255,
null=True,
),
),
(
"access_token",
models.OneToOneField(
on_delete=django.db.models.deletion.CASCADE,
related_name="access_token_info",
to=settings.OAUTH2_PROVIDER_ACCESS_TOKEN_MODEL,
),
),
],
options={
"verbose_name": "OAuth Access Token Info",
"verbose_name_plural": "OAuth Access Token Info",
"db_table": "peeringdb_oauth_access_token_info",
},
),
]

View File

@ -6655,6 +6655,65 @@ class OAuthApplication(oauth2.AbstractApplication):
self.user = None
class OAuthGrantInfo(models.Model):
"""
OAuth grant info
Used to store additional information about a grant
- amr: Authentication method reference set on the session that
created the grant
"""
grant = models.OneToOneField(
oauth2.Grant,
on_delete=models.CASCADE,
related_name="grant_info",
)
amr = models.CharField(
max_length=255,
help_text=_("Authentication method reference"),
null=True,
blank=True,
)
class Meta:
db_table = "peeringdb_oauth_grant_info"
verbose_name = _("OAuth Grant Info")
verbose_name_plural = _("OAuth Grant Info")
class OAuthAccessTokenInfo(models.Model):
"""
OAuth access token info
Used to store additional information about an access token
- amr: Authentication method reference set on the session that
created the grant that resulted in this access token
"""
access_token = models.OneToOneField(
oauth2.AccessToken,
on_delete=models.CASCADE,
related_name="access_token_info",
)
amr = models.CharField(
max_length=255,
help_text=_("Authentication method reference"),
null=True,
blank=True,
)
class Meta:
db_table = "peeringdb_oauth_access_token_info"
verbose_name = _("OAuth Access Token Info")
verbose_name_plural = _("OAuth Access Token Info")
WATCHABLE_OBJECTS = [
("net", _("Network")),
]

View File

@ -17,6 +17,7 @@ import json
import os
import re
import uuid
from typing import Any
import googlemaps.exceptions
import oauth2_provider.views as oauth2_views
@ -35,6 +36,7 @@ from django.core.exceptions import (
ObjectDoesNotExist,
ValidationError,
)
from django.core.handlers.wsgi import WSGIRequest
from django.db import connection, transaction
from django.db.models import Q
from django.forms.models import modelform_factory
@ -58,6 +60,7 @@ from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_protect, ensure_csrf_cookie
from django.views.decorators.http import require_http_methods
from django_grainy.util import Permissions
from django_otp import user_has_device
from django_otp.plugins.otp_email.models import EmailDevice
from django_peeringdb.const import (
CAMPUS_HELP_TEXT,
@ -69,10 +72,12 @@ from django_security_keys.ext.two_factor.views import ( # noqa
DisableView as TwoFactorDisableView,
)
from django_security_keys.ext.two_factor.views import LoginView as TwoFactorLoginView
from django_security_keys.models import SecurityKey
from grainy.const import PERM_CREATE, PERM_CRUD, PERM_DELETE, PERM_UPDATE
from oauth2_provider.decorators import protected_resource
from oauth2_provider.models import get_application_model
from oauth2_provider.oauth2_backends import get_oauthlib_core
from two_factor.utils import default_device
import peeringdb_server.geo
from peeringdb_server import settings
@ -113,6 +118,7 @@ from peeringdb_server.models import (
NetworkContact,
NetworkFacility,
NetworkIXLan,
OAuthAccessTokenInfo,
Organization,
Partnership,
Sponsorship,
@ -759,12 +765,14 @@ def view_profile_v1(request):
oauth = get_oauthlib_core()
scope_email, _request = oauth.verify_request(request, scopes=["email"])
scope_networks, _request = oauth.verify_request(request, scopes=["networks"])
scope_amr, amr_request = oauth.verify_request(request, scopes=["amr"])
json_params = {}
if "pretty" in request.GET:
json_params["indent"] = 2
user = request.user
data = dict(
id=request.user.id,
given_name=request.user.first_name,
@ -795,6 +803,18 @@ def view_profile_v1(request):
data["networks"] = networks
# only add amr if amr scope is present
if scope_amr:
try:
data["amr"] = amr_request.access_token.access_token_info.amr
if not data["amr"]:
data["amr"] = []
else:
data["amr"] = data["amr"].split(",")
except OAuthAccessTokenInfo.DoesNotExist:
data["amr"] = []
return JsonResponse(data, json_dumps_params=json_params)
@ -3328,6 +3348,28 @@ class LoginView(TwoFactorLoginView):
return kwargs
def attempt_passwordless_auth(
self, request: WSGIRequest, **kwargs: Any
) -> HttpResponseRedirect | None:
"""
Wrap attempt_passwordless_auth so we can set a session
property to indicate that the passwordless auth was
used
"""
response = super().attempt_passwordless_auth(request, **kwargs)
# if `credential` in request POST is set AND
# we got a response from `attempt_passwordless_auth`, passwordless
# authentication was used and succeeded
credential = request.POST.get("credential", None)
username = request.POST.get("auth-username", None)
if credential and response and username:
request.used_passwordless_auth = True
return response
@transaction.atomic
@method_decorator(
ratelimit(key="ip", rate=RATELIMITS["request_login_POST"], method="POST")
@ -3467,6 +3509,42 @@ class LoginView(TwoFactorLoginView):
return redir
def set_amr(self):
amr = []
done_forms = self.get_done_form_list()
passwordless = getattr(self, "used_passwordless_auth", False)
if not passwordless:
amr.append("pwd")
else:
amr.append("swk")
if "token" in done_forms:
# user used OTP to login
amr.append("otp")
if "security-key" in done_forms:
# user used a security key to login
# TODO: we cannot currently differentiate between
# sub types of security keys, so we just add "swk" for now
#
# We'd want to differentiate fingerprint readers, iris scanners etc.
# but for that webauthn attestation is needed, which we currently
# do not collect.
#
# NOTE: by design if passwordless authentication was used
# it is required to be a different security key, so there may
# actually be cases of amr being "swk swk" which is accurate
# and RFC 8176 does not seem to disallow this (multiples of the same type).
amr.append("swk")
# if user used more than one method to authenticate
# we add "mfa" to the amr list
if len(amr) > 1:
amr.append("mfa")
return amr
def done(self, form_list, **kwargs):
"""
User authenticated successfully, set language options.
@ -3476,6 +3554,8 @@ class LoginView(TwoFactorLoginView):
# TODO: do this via signal instead?
self.request.session["amr"] = self.set_amr()
user_language = self.get_user().get_locale()
translation.activate(user_language)
success_url = self.get_success_url()

51
tests/data/oidc/oidc.key Normal file
View File

@ -0,0 +1,51 @@
-----BEGIN RSA PRIVATE KEY-----
MIIJKQIBAAKCAgEAygxqqHPMZ+wq128kLR9bd41RJA/Zx+gSU8fIMlb+hfc23etO
NvcDoBqTAg9AOlND+nQBI393veIXcOGjhEwS+AsSnFWq1Yyk1Vg2niO8iu9h+rxZ
/FHwzYZ8jBFlIOdDRdzr2jTqPj8MdXM0e35yv9KD+5fRW5e1Fuyp0+PFgDz6qCnr
11mWk/ikmy0IZYWXjh6uXpyUT4xzo+pgN5w3UcKQ7ITnStc08L3yvnJvb62C85bQ
ZqSxzzk3Dre5PZkyXb9TVey99Qcz3E8grUEEbrgbMB3Jpz6kyQQnfeQJJ00KBV8s
Qk2CP32iOloph4mh4wFG9VfmsNzs90pVvS4YwEmNDPhlMaoi7ezxaZFT/EeOaLVK
9uwSQh7xC++chL7CWIXBaDqJ09nc52iterNway2SxncqBPLjH3uY/1d5YiveHjFY
7NIky6hy3iRWQHnoJ6cjmBl3eadErppNa83CZKEIDnGqc1WUsz6WfGQiz+8Bmgx4
4ias+ZjLcGgtKiBd3+R27Bk50LjomfRoSLgexcI785o+IC5KgGPldmPygUnugCks
ddzVZSwBY+zmIAkHEVtpuOr09XWmlC4FhxZGX/FuWUJQU8Gz3GbIV4sSD1xQv31i
Kh7mppKw18IMMvekXtjpc8dWurC2W+TNzaLrwYphglcckC+7hBYY4G2AMm8CAwEA
AQKCAgEApVCffBCzMFyFeRuQp/K+LgVMXIszj2MMi8wsuzN+sTHijJTYvBRDvR3B
JrA/xvkV57g+rkK/QHfa4htcZQSxiI3Wvl9BldSrkXvJxH5M6AiRTEwL/G0275KY
GSqqac+OtXza5QDin+eKVBbc3CZL0TN3lcVUIPLx0j31fk8g65n5JNqYAu1kTNHm
LKkd86Loq/nihbrrhmLOw1EjRJzmjlj5puvfKtcGk+t2z+hl0SRQvumovjMG4RxZ
NguQ3xuXoCjJEMbRbchk4F9FxzwpWkX8yNO2iMayoV5qyZBpBOT9zANg/SbZ6QTs
VX4r+J/d+xSycE9pi1BX+sWxfUZYLJfkucRcw5eLdzF88f/mNEhq5SPFRqcOxP2I
hHknpb68xixe/HBLctYOYbgUdBUklFQ3Z+Lsb90VEP0hoaDYY5JiR520x5Bc8yfx
j834T70E5XSayFYHfrqdHvdZmU9hZWKnMBVhEoZ+tuDWazLw31Pb9Q8ugMUajJ0U
Flsq7B9GGaJt9HN1oCH/0avcXZdWknbnpEmy99Vs0CSfOVHJvHmVvs9xpM3S7mOF
1Z/yaxPTl9ETlKEPvV4lJ/tRYR1GPgd1y295ApopHX1vsofYZEN+Rq3wu7tIlWkc
hVW4rE8L4gRf/9+VpclNSUYI3KBl55SwH0bj36d3e2+BZmS+0uECggEBAO7L52S3
MxHpLmvDElM+KhS2nLcaTZK9XTiW+kN9+uS+Wjyazm7pEu0tC0/89uEWoG6JrEGD
iqG2CcpFQlq8wby8ewez/nkPKTjhg49K2efxsQEjWIJRjJES0Bmg5CsOM/nsRoSa
7CYvltsE60c4WUax9WZ2KVaBv+1YdFdKjYJUv+P3bLlHQs62eBlW8cLreocaiOaV
3pEwuGx7goexfP1fz6DjX7ajvw1S41/iNQH7UvYqyXJYI7LCVfSqrhdFBogPTTNP
Z6X+HbIAqgN7V8mNBvNsZS93SitSbynzqiZ0Vq0eBkK1Qjb+0wZC7W+1HAEi3IDV
MjCc4WkqgY/c1rkCggEBANiaxhHDP9FWrQsMbXeLyipeM3vmuhN1s7Ik2zDyBRl4
pAoDAqOwF6UPJy6mWtBGjfyJnNuPc/qzvBwB7LKroPIuy4Efw50iB9DLfqhyN+az
eF6Y9bEtfBnVW4Q7ltJ/165pBfpTyhoRUV166wXijmQe0+g/2bpkcx+tcTBvsQy/
Ult1EQv4Ol5rD5dhKB175AQht3bAh8XZj6mnxzoIU8D9EPb3qrRZwDaW7nGI+BHB
gdsfavdOtkqUiun0P7a8sz5d0hWvb8Ox45uYbGI2U4bMRAnwYERsT80sXRs5Dk3S
KEVzbebp9HSy7frOC4irG4p8x2UAZsZNPvyAukpNPmcCggEALrVdnnqjF6bHuLZg
WD35lzGc98fCUqiNmmsVqqvyrll5Zw9Uv6cBV7kO1GeDlsWJPfXZ5rqnHDhyM8lW
UE9JifRIEFqptN49Tu31gMrAUrkYyUMzpdHzr9DBObIZzz4o9oK7zGZNOK7oUBDn
QHgKJmTavl7weQZKF0/M+eIQSeTv39RexOLmc26HkVVLVrgesFyFWK0J6tZfzIBg
KNCTxS+/ehXfe+cNkLqc1P13JtsVACegXahV2zpd+8yC3vRWcbikfhvTB0nsXwv4
NVQj+L2IFYcnR6CKtmyaZ9dMedcZuST9ToEL4Zv4hOoEbWw8fRBnlx+cgJu/D4YL
76rfqQKCAQAawFLCEtko44mPmcNTRmJ7hRLatI46pQjCcabYjqtuj+XCjsieqrrg
2dQ/ALCqlGwoyl4jB76NMfGDjAFNt8Cy6HCdy+x+1ZsdgGRuLV0WEwhEtqNxdGql
TiJ1XF6uNLfQhIwGdJ67RXRFFU89dNOdLemB/5DGYm2NdgBKuSCmsGJdhUHLtqaO
zq4oRgmMSqbIk9lywyiBmPMTQvNqCWk+poKX3dKpX4RLuaxNPVtV5g5Bn8m4LXJ6
F4poho/v47JHe5LctsQY6TlxGAwavRbnyQaGK7fX+4CHmHkPSaaomdH7cdwa7r4Q
Q6D8nsHx6SAHpI2Dvd7zksq6qLNZRdz1AoIBAQCj2lcj+TBNtxbNszshRowdBX0M
1Mtv4fAkqOX4IkX8prUxNu2xrZE7LyepplB2cU1j4SsaONhieE0AjRC1HZJZI6dM
pYw5u4fShdekbC4/bO+rbs46hWEfhm4xIKSr2OyVPw70PlJn4DUjc+poRt3wHw6n
nA92HkrjX2XukioZ3t665SmnaLN8t0B0q6bjb3Aw/boTv/Mv9G5ka8nF17ch+WvL
JLlT0qM9VydAdRDcQx8+XArg6oCyJt3GvwrGSPgs7DYkOUZqnh8XbZlJc9XV8RaU
WwW9B21JQ4I+zF8gnqycfr0V+v0A4g27MwXa10jto7op6SBsiP1NPoKAo5Ra
-----END RSA PRIVATE KEY-----

View File

@ -10,6 +10,7 @@ from .util import reset_group_ids
@pytest.fixture
def organization():
reset_group_ids()
return models.Organization.objects.create(name="test org", status="ok")
@ -22,7 +23,6 @@ def network(organization):
@pytest.fixture
def verified_user(organization):
reset_group_ids()
user_group = Group.objects.get(name="user")
user = models.User.objects.create_user(
@ -41,6 +41,8 @@ def oauth_request(verified_user):
request = Request("/")
request.user = verified_user
request.scopes = []
request.client = None
request.decoded_body = [("code", "testcode")]
return request
@ -60,6 +62,7 @@ def test_oidc_validator_produces_profile_claims(oauth_request):
"email": None,
"email_verified": None,
"networks": None,
"amr": [],
}
@ -79,6 +82,7 @@ def test_oidc_validator_produces_email_claims(oauth_request):
"email": "testuser@example.net",
"email_verified": False,
"networks": None,
"amr": [],
}
@ -105,4 +109,5 @@ def test_oidc_validator_produces_network_claims(oauth_request, network):
"perms": 1,
},
],
"amr": [],
}

View File

@ -1,6 +1,7 @@
import json
from urllib.parse import parse_qs, urlparse
import jwt
import pytest
from django.conf import settings
from django.core.signing import get_cookie_signer
@ -17,6 +18,7 @@ from peeringdb_server.views import (
ApplicationRegistration,
ApplicationUpdate,
)
from tests.util import reset_group_ids
def override_app_model():
@ -29,6 +31,7 @@ def restore_app_model():
@pytest.fixture
def oauth2_org_admin_user():
reset_group_ids()
user = User.objects.create_user("test", "test", "test@localhost")
org = Organization.objects.create(name="Test Org 001", status="ok")
org.admin_usergroup.user_set.add(user)
@ -469,3 +472,248 @@ def test_oauth_authorization_process(oauth2_org_admin_user):
# Clean up
AccessToken.objects.all().delete()
RefreshToken.objects.all().delete()
@pytest.mark.django_db
def test_oauth_authorization_process_amr(oauth2_org_admin_user):
user, _, _ = oauth2_org_admin_user
CLIENT_ID = "client_id_user"
CLIENT_SECRET = "client_secret_user"
REDIRECT_URIS = "https://example.com"
SCOPES = "email profile networks"
# Due to us swapping to our own OAuthApplication model, in order for the tests
# to work we need to create the application in both the oauth2_provider and
# peeringdb_server tables
#
# This is odd, and there is probably some way to avoid this, but from looking
# at the oauth2_provider code, its not immediately clear how.
user_app = Application.objects.create(
user=user,
name="User app",
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
redirect_uris=REDIRECT_URIS,
skip_authorization=False,
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
client_type=Application.CLIENT_CONFIDENTIAL,
)
user_app = OAuthApplication.objects.create(
id=user_app.id,
user=user,
name="User app",
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
redirect_uris=REDIRECT_URIS,
skip_authorization=False,
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
client_type=Application.CLIENT_CONFIDENTIAL,
)
user = user_app.user
client = Client()
# Log in the user
client.force_login(user)
# Set the oauth_session cookie
session = client.session
session["amr"] = ["pwd"]
session["oauth_session"] = True
session.save()
cookie_signer = get_cookie_signer(salt="oauth_session")
signed_session_cookie = cookie_signer.sign(session.session_key)
client.cookies["oauth_session"] = signed_session_cookie
# Test the authorization request
auth_url = reverse("oauth2_provider:authorize")
auth_params = {
"client_id": CLIENT_ID,
"response_type": "code",
"redirect_uri": REDIRECT_URIS,
"scope": SCOPES,
"state": "random_state",
}
resp = client.get(auth_url, auth_params)
assert resp.status_code == 200
assert "Application requires the following permissions" in resp.content.decode(
"utf-8"
)
# Test the authorization grant
auth_data = {
"client_id": CLIENT_ID,
"response_type": "code",
"redirect_uri": REDIRECT_URIS,
"scope": SCOPES,
"allow": "Authorize",
}
resp = client.post(auth_url, auth_data)
assert resp.status_code == 302
assert resp.url.startswith(REDIRECT_URIS)
auth_code = resp.url.split("code=")[1]
# Test the token exchange
token_url = reverse("oauth2_provider:token")
token_data = {
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": REDIRECT_URIS,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}
resp = client.post(token_url, token_data)
assert resp.status_code == 200
token_data = resp.json()
access_token = token_data["access_token"]
refresh_token = token_data["refresh_token"]
access_token_object = AccessToken.objects.get(token=access_token)
assert access_token_object.access_token_info.amr == "pwd"
# Test the token refresh
refresh_data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}
resp = client.post(token_url, refresh_data)
assert resp.status_code == 200
refresh_data = resp.json()
new_access_token = refresh_data["access_token"]
assert new_access_token != access_token
# Test error scenarios
# Invalid client_id
invalid_client_data = token_data.copy()
invalid_client_data["client_id"] = "invalid_client_id"
resp = client.post(token_url, invalid_client_data)
assert resp.status_code == 400
# Invalid client_secret
invalid_secret_data = token_data.copy()
invalid_secret_data["client_secret"] = "invalid_client_secret"
resp = client.post(token_url, invalid_secret_data)
assert resp.status_code == 400
# Clean up
AccessToken.objects.all().delete()
RefreshToken.objects.all().delete()
@pytest.mark.django_db
def test_oauth_authorization_process_oidc_amr(oauth2_org_admin_user):
user, _, _ = oauth2_org_admin_user
CLIENT_ID = "client_id_user"
CLIENT_SECRET = "client_secret_user"
REDIRECT_URIS = "https://example.com"
SCOPES = "openid email profile networks"
# Due to us swapping to our own OAuthApplication model, in order for the tests
# to work we need to create the application in both the oauth2_provider and
# peeringdb_server tables
#
# This is odd, and there is probably some way to avoid this, but from looking
# at the oauth2_provider code, its not immediately clear how.
user_app = Application.objects.create(
user=user,
name="User app",
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
redirect_uris=REDIRECT_URIS,
skip_authorization=False,
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
client_type=Application.CLIENT_CONFIDENTIAL,
algorithm="RS256",
)
user_app = OAuthApplication.objects.create(
id=user_app.id,
user=user,
name="User app",
client_id=CLIENT_ID,
client_secret=CLIENT_SECRET,
redirect_uris=REDIRECT_URIS,
skip_authorization=False,
authorization_grant_type=Application.GRANT_AUTHORIZATION_CODE,
client_type=Application.CLIENT_CONFIDENTIAL,
algorithm="RS256",
)
user = user_app.user
client = Client()
# Log in the user
client.force_login(user)
# Set the oauth_session cookie
session = client.session
session["amr"] = ["pwd"]
session["oauth_session"] = True
session.save()
cookie_signer = get_cookie_signer(salt="oauth_session")
signed_session_cookie = cookie_signer.sign(session.session_key)
client.cookies["oauth_session"] = signed_session_cookie
# Test the authorization request
auth_url = reverse("oauth2_provider:authorize")
auth_params = {
"client_id": CLIENT_ID,
"response_type": "code",
"redirect_uri": REDIRECT_URIS,
"scope": SCOPES,
"state": "random_state",
}
resp = client.get(auth_url, auth_params)
assert resp.status_code == 200
assert "Application requires the following permissions" in resp.content.decode(
"utf-8"
)
# Test the authorization grant
auth_data = {
"client_id": CLIENT_ID,
"response_type": "code",
"redirect_uri": REDIRECT_URIS,
"scope": SCOPES,
"allow": "Authorize",
}
resp = client.post(auth_url, auth_data)
assert resp.status_code == 302
assert resp.url.startswith(REDIRECT_URIS)
auth_code = resp.url.split("code=")[1]
# Test the token exchange
token_url = reverse("oauth2_provider:token")
token_data = {
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": REDIRECT_URIS,
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
}
resp = client.post(token_url, token_data)
assert resp.status_code == 200
token_data = resp.json()
access_token = token_data["access_token"]
id_token = token_data["id_token"]
assert id_token is not None
id_token_payload = expand_id_token(id_token)
assert id_token_payload["amr"] == ["pwd"]
access_token_object = AccessToken.objects.get(token=access_token)
assert access_token_object.access_token_info.amr == "pwd"
def expand_id_token(id_token):
parts = id_token.split(".")
payload = jwt.utils.base64url_decode(parts[1]).decode("utf-8")
payload_json = json.loads(payload)
return payload_json