mirror of
https://github.com/peeringdb/peeringdb.git
synced 2024-05-11 05:55:09 +00:00
* Change label from primary ASN to ASN * Raise validation error when trying to update ASN * first steps for dotf importer procotol (#697) * migrations (#697) * Add translation to error meessage * Make ASN readonly in table * Add test now that ASN should not be able to update * Set fac.rencode to '' for all entries and make it readonly in serializer * Add unique constraints to network ixlan ip addresses * Add migration to null out duplicate ipaddresses for deleted netixlans * Add unique constraints to network ixlan ip addresses * Add migration to null out duplicate ipaddresses for deleted netixlans * remove old migrations (#697) * fix netixlan ipaddr dedupe migration (#268) add netixlan ipaddr unique constraint migration (#268) * ixf_member_data migrations (#697) * fix table name (#697) * importer protocol (#697) * fix netixlan ipaddr dedupe migration (#268) add netixlan ipaddr unique constraint migration (#268) * ixf proposed changes notifications (#697) * Delete repeated query * Add a test to show rencode is readonly * Blank out rencode when mocking data * Remove validator now that constraint exists * Add back unique field validator w Check Deleted true * conflict resolving (#697) * UniqueFieldValidator raise error with code "unique" (#268) * conflict resolution (#697) * Add fixme comment to tests * conflict resolution (#697) * Remove now invalid undelete tests * UniqueFieldValidator raise error with code "unique" (#268) * delete admin tools for duplicate ip addresses * Make migration to delete duplicateipnetworkixlan * Add ixlan-ixpfx status matching validation, add corresponding test * delete redundant checking in test * resolve conflict ui (#697) * fix migrations hierarchy * squash migrations for ixf member data * clean up preview and post-mortem tools * remove non-sensical permission check when undeleting soft-deleted objects through unique integrity error handling * only include the ix-f data url in notifications to admincom (#697) * resolve on --skip-import (#697) * ac conflict resolution (#697) * Define more accurately the incompatible statuses for ixlan and ixpfx * Add another status test * Preventing disrupting changes (#697) * fix tests (#697) * Stop allow_ixp_update from being write only and add a global stat for automated networks * Add tests for global stats that appear in footer * Change how timezone is called with datetime, to get test_stats.py/test_generate_for_current_date to pass * test for protected entities (#697) * admincom conflict resolution refine readonly fields (#697) network notifications only if the problem is actually actionable by the network (#697) * ixp / ac notifcation when ix-f source cannot be parsed (#697) fix issue with ixlan prefix protection (#697) * migrations (#697) * code documentation (#697) * ux tweaks (#697) * UX tweaks (#697) * Fix typo * fix netixlan returned in IXFMemberData.apply when adding a new one (#697) * fix import log incosistencies (#697) * Add IXFMemberData to test * Update test data * Add protocol tests * Add tests for views * always persist changes to remote data on set_conflict (#697) * More tests * always persist changes to remote data on set_conflict (#697) * suggest-add test * net_present_at_ix should check status (#697) * Add more protocol tests * Edit language of some tests * django-peeringdb to 2.1.1 relock pipfile, pin django-ratelimit to <3 as it breaks stuff * Add net_count_ixf field to ix object (#683) * Add the IX-F Member Export URL to the ixlan API endpoint (#249) * Lock some objects from being deleted by the owner (#696) * regenerate api docs (#249) * always persist changes to remote data on set_add and set_update (#697) * IXFMemberData: always persist remote data changes during set_add and set_update, also allow for saving without touching the updated field * always persist changes to remote data on set_add and set_update (#697) * Fix suggest-add tests * IXFMemberData: always persist remote data changes during set_add and set_update, also allow for saving without touching the updated field * IXFMemberData: always persist remote data changes during set_add and set_update, also allow for saving without touching the updated field * fix issue with deletion when ixfmemberdata for entry existed previously (#697) * fix test_suggest_delete_local_ixf_no_flag (#697 tests) * fix issue with deletion when ixfmemberdata for entry existed previously (#697) * invalid ips get logged and notified to the ix via notify_error (#697) * Fix more tests * issue with previous_data when running without save (#697) properly track speed errors (#697) * reset errors on ixfmemberdata that go into pending_save (#697) * add remote_data to admin view (#697) * fix error reset inconsistency (#697) * Refine invalid data tests * remove debug output * for notifications to ac include contact points for net and ix in the message (#697) * settings to toggle ix-f tickets / emails (#697) * allow turning off ix-f notifications for net and ix separately (#697) * add jsonschema test * Add idempotent tests to updater * remove old ixf member tests * Invalid data tests when ixp_updates are enabled * fix speed error validation (#697) * fix issue with rollback (#697) * fix migration hierarchy * fix ixfmemberdata _email * django-peeringdb to 2.2 and relock * add ixf rollback tests * ixf email notifications off by default * black formatted * pyupgrade Co-authored-by: egfrank <egfrank@20c.com> Co-authored-by: Stefan Pratter <stefan@20c.com>
327 lines
10 KiB
Python
327 lines
10 KiB
Python
import json
|
|
import re
|
|
|
|
import pytest
|
|
|
|
from django.test import Client, TestCase, RequestFactory
|
|
from django.contrib.auth.models import Group
|
|
|
|
from captcha.models import CaptchaStore
|
|
|
|
import peeringdb_server.models as models
|
|
import peeringdb_server.views as views
|
|
|
|
|
|
class UserTests(TestCase):
|
|
"""
|
|
Test peeringdb_server.models.User functions
|
|
"""
|
|
|
|
@classmethod
|
|
def setUpTestData(cls):
|
|
guest_group = Group.objects.create(name="guest")
|
|
user_group = Group.objects.create(name="user")
|
|
for name in ["user_a", "user_b", "user_c", "user_d"]:
|
|
setattr(
|
|
cls,
|
|
name,
|
|
models.User.objects.create_user(
|
|
name,
|
|
"%s@localhost" % name,
|
|
first_name=name,
|
|
last_name=name,
|
|
password=name,
|
|
),
|
|
)
|
|
|
|
cls.org_a = models.Organization.objects.create(name="org A", status="ok")
|
|
cls.org_b = models.Organization.objects.create(name="org B", status="ok")
|
|
|
|
user_group.user_set.add(cls.user_a)
|
|
user_group.user_set.add(cls.user_d)
|
|
guest_group.user_set.add(cls.user_b)
|
|
|
|
cls.org_a.usergroup.user_set.add(cls.user_a)
|
|
cls.org_b.admin_usergroup.user_set.add(cls.user_b)
|
|
|
|
def setUp(self):
|
|
self.factory = RequestFactory()
|
|
|
|
def test_full_name(self):
|
|
"""
|
|
Test User.full_name
|
|
"""
|
|
self.assertEqual(self.user_a.full_name, "user_a user_a")
|
|
|
|
def test_organizations(self):
|
|
"""
|
|
Test User.organizations
|
|
"""
|
|
|
|
# test that organizations are returned where the user is member
|
|
orgs = self.user_a.organizations
|
|
self.assertEqual(len(orgs), 1)
|
|
self.assertEqual(orgs[0].id, self.org_a.id)
|
|
|
|
# test that organizations are returned where the user is admin
|
|
orgs = self.user_b.organizations
|
|
self.assertEqual(len(orgs), 1)
|
|
self.assertEqual(orgs[0].id, self.org_b.id)
|
|
|
|
orgs = self.user_c.organizations
|
|
self.assertEqual(len(orgs), 0)
|
|
|
|
def test_is_org_member(self):
|
|
"""
|
|
Test User.is_org_member
|
|
"""
|
|
self.assertEqual(self.user_a.is_org_member(self.org_a), True)
|
|
self.assertEqual(self.user_a.is_org_member(self.org_b), False)
|
|
self.assertEqual(self.user_c.is_org_member(self.org_a), False)
|
|
self.assertEqual(self.user_c.is_org_member(self.org_b), False)
|
|
|
|
def test_is_org_admin(self):
|
|
"""
|
|
Test User.is_org_admin
|
|
"""
|
|
self.assertEqual(self.user_b.is_org_member(self.org_b), False)
|
|
self.assertEqual(self.user_b.is_org_admin(self.org_b), True)
|
|
self.assertEqual(self.user_b.is_org_admin(self.org_a), False)
|
|
self.assertEqual(self.user_b.is_org_member(self.org_a), False)
|
|
|
|
def test_is_verified_user(self):
|
|
"""
|
|
Test User.is_verified_user
|
|
"""
|
|
|
|
self.assertEqual(self.user_a.is_verified_user, True)
|
|
self.assertEqual(self.user_b.is_verified_user, False)
|
|
self.assertEqual(self.user_c.is_verified_user, False)
|
|
|
|
def test_set_verified(self):
|
|
"""
|
|
Test user.set_verified
|
|
"""
|
|
|
|
self.user_c.set_verified()
|
|
self.user_c.refresh_from_db()
|
|
|
|
self.assertEqual(self.user_c.status, "ok")
|
|
self.assertEqual(self.user_c.is_verified_user, True)
|
|
|
|
self.assertEqual(self.user_c.groups.filter(name="guest").exists(), False)
|
|
self.assertEqual(self.user_c.groups.filter(name="user").exists(), True)
|
|
|
|
def test_set_unverified(self):
|
|
"""
|
|
Test user.set_unverified
|
|
"""
|
|
|
|
self.user_c.set_unverified()
|
|
self.user_c.refresh_from_db()
|
|
|
|
self.assertEqual(self.user_c.status, "pending")
|
|
self.assertEqual(self.user_c.is_verified_user, False)
|
|
|
|
self.assertEqual(self.user_c.groups.filter(name="guest").exists(), True)
|
|
self.assertEqual(self.user_c.groups.filter(name="user").exists(), False)
|
|
|
|
def test_password_reset(self):
|
|
"""
|
|
Test User.password_reset_initiate
|
|
Test User.password_reset_complete
|
|
Test views.view_password_reset POST
|
|
"""
|
|
|
|
# initiate request
|
|
request = self.factory.post(
|
|
"/reset-password", data={"email": self.user_a.email}
|
|
)
|
|
request._dont_enforce_csrf_checks = True
|
|
resp = views.view_password_reset(request)
|
|
|
|
# check that password-reset instance was created
|
|
pr = models.UserPasswordReset.objects.get(user=self.user_a)
|
|
|
|
self.assertIsNotNone(pr.token)
|
|
self.assertEqual(pr.is_valid(), True)
|
|
|
|
# re-initiate internally so we can get the token
|
|
token, hashed = self.user_a.password_reset_initiate()
|
|
pr = self.user_a.password_reset
|
|
|
|
# password reset request
|
|
pwd = "abcdefghjikl"
|
|
request = self.factory.post(
|
|
"/reset-password",
|
|
data={
|
|
"target": self.user_a.id,
|
|
"token": token,
|
|
"password": pwd,
|
|
"password_v": pwd,
|
|
},
|
|
)
|
|
request._dont_enforce_csrf_checks = True
|
|
resp = views.view_password_reset(request)
|
|
|
|
self.assertEqual(json.loads(resp.content)["status"], "ok")
|
|
|
|
with pytest.raises(models.UserPasswordReset.DoesNotExist):
|
|
models.UserPasswordReset.objects.get(user=self.user_a)
|
|
|
|
# initiate another request so we can test failures
|
|
token, hashed = self.user_a.password_reset_initiate()
|
|
|
|
# failure test: invalid token
|
|
request = self.factory.post(
|
|
"/reset-password",
|
|
data={
|
|
"target": self.user_a.id,
|
|
"token": "wrong",
|
|
"password": pwd,
|
|
"password_v": pwd,
|
|
},
|
|
)
|
|
request._dont_enforce_csrf_checks = True
|
|
resp = views.view_password_reset(request)
|
|
self.assertEqual(resp.status_code, 400)
|
|
|
|
# failure test: invalid password(s): length
|
|
request = self.factory.post(
|
|
"/reset-password",
|
|
data={
|
|
"target": self.user_a.id,
|
|
"token": token,
|
|
"password": "a",
|
|
"password_v": "a",
|
|
},
|
|
)
|
|
request._dont_enforce_csrf_checks = True
|
|
resp = views.view_password_reset(request)
|
|
self.assertEqual(resp.status_code, 400)
|
|
|
|
# failure test: invalid password(s): validation mismatch
|
|
request = self.factory.post(
|
|
"/reset-password",
|
|
data={
|
|
"target": self.user_a.id,
|
|
"token": token,
|
|
"password": pwd,
|
|
"password_v": "a",
|
|
},
|
|
)
|
|
request._dont_enforce_csrf_checks = True
|
|
resp = views.view_password_reset(request)
|
|
self.assertEqual(resp.status_code, 400)
|
|
|
|
# failure test: invalid target
|
|
request = self.factory.post(
|
|
"/reset-password",
|
|
data={
|
|
"target": self.user_b.id,
|
|
"token": token,
|
|
"password": pwd,
|
|
"password_v": pwd,
|
|
},
|
|
)
|
|
request._dont_enforce_csrf_checks = True
|
|
resp = views.view_password_reset(request)
|
|
self.assertEqual(resp.status_code, 400)
|
|
|
|
def test_login_redirect(self):
|
|
data = {
|
|
"next": "/org/1",
|
|
"auth-username": "user_d",
|
|
"auth-password": "user_d",
|
|
"login_view-current_step": "auth",
|
|
}
|
|
C = Client()
|
|
resp = C.post("/account/login/", data, follow=True)
|
|
self.assertEqual(resp.redirect_chain, [("/org/1", 302)])
|
|
|
|
data = {
|
|
"next": "/logout",
|
|
"auth-username": "user_d",
|
|
"auth-password": "user_d",
|
|
"login_view-current_step": "auth",
|
|
}
|
|
|
|
C = Client()
|
|
resp = C.post("/account/login/", data, follow=True)
|
|
self.assertEqual(resp.redirect_chain, [("/", 302)])
|
|
self.assertEqual(resp.context["user"].is_authenticated, True)
|
|
|
|
def test_username_retrieve(self):
|
|
"""
|
|
test the username retrieve process
|
|
"""
|
|
|
|
c = Client()
|
|
|
|
# initiate process
|
|
response = c.post("/username-retrieve/initiate", {"email": self.user_a.email})
|
|
|
|
secret = c.session["username_retrieve_secret"]
|
|
email = c.session["username_retrieve_email"]
|
|
self.assertNotEqual(secret, None)
|
|
self.assertEqual(email, self.user_a.email)
|
|
|
|
# invalid secret
|
|
response = c.get("/username-retrieve/complete?secret=123")
|
|
assert self.user_a.email not in response.content.decode()
|
|
assert (
|
|
f'<p class="username">{self.user_a.username}</p>'
|
|
not in response.content.decode()
|
|
)
|
|
|
|
# complete process
|
|
response = c.get(f"/username-retrieve/complete?secret={secret}")
|
|
|
|
assert self.user_a.email in response.content.decode()
|
|
assert (
|
|
f'<p class="username">{self.user_a.username}</p>'
|
|
in response.content.decode()
|
|
)
|
|
|
|
# process no longer valid
|
|
response = c.get(f"/username-retrieve/complete?secret={secret}")
|
|
|
|
assert self.user_a.email not in response.content.decode()
|
|
assert (
|
|
f'<p class="username">{self.user_a.username}</p>'
|
|
not in response.content.decode()
|
|
)
|
|
|
|
with pytest.raises(KeyError):
|
|
secret = c.session["username_retrieve_secret"]
|
|
|
|
with pytest.raises(KeyError):
|
|
email = c.session["username_retrieve_email"]
|
|
|
|
def test_signup(self):
|
|
"""
|
|
test user signup with captcha fallback
|
|
"""
|
|
|
|
c = Client()
|
|
response = c.get("/register")
|
|
assert 'name="captcha_generator_0"' in response.content.decode()
|
|
m = re.search(
|
|
'name="captcha_generator_0" value="([^"]+)"', response.content.decode()
|
|
)
|
|
|
|
captcha_obj = CaptchaStore.objects.get(hashkey=m.group(1))
|
|
|
|
response = c.post(
|
|
"/register",
|
|
{
|
|
"username": "signuptest",
|
|
"password1": "signuptest_123",
|
|
"password2": "signuptest_123",
|
|
"email": "signuptest@localhost",
|
|
"captcha": f"{captcha_obj.hashkey}:{captcha_obj.response}",
|
|
},
|
|
)
|
|
|
|
self.assertEqual(json.loads(response.content), {"status": "ok"})
|