mirror of
				https://github.com/peeringdb/peeringdb.git
				synced 2024-05-11 05:55:09 +00:00 
			
		
		
		
	* install django-grainy * nsp to grainy first iteration * Fix validation error message overflow * Add migration, update views.py and template to add help_text to UI * nsp to grainy second iteration * grainy and django-grainy pinned to latest releases * deskpro ticket cc (#875) * black formatting * move ac link to bottom for ticket body * Fix typo * Update djangorestframework, peeringdb, django-ratelimit * Rewrite login view ratelimit decorator * Relock pipfile * add list() to make copy of dictionaries before iterating * respect ix-f url visibilty in ix-f conflict emails * Add type coercion to settings taken from environment variables * Add bool handling * relock pipfile with python3.9 change docker to use python3.9 * Check bool via isinstance * add ordering to admin search queryset for deskproticket and email * update settings with envvar_type option * Add tooltips to add ix and add exchange views (in org) * Add tooltip to suggest fac view * get phone information in view * add missing migration * add migration and make org a geo model * Wire normalization to put/create requests for Facility * Update admin with new address fields * Refactor serializer using mixin * Add floor and suite to address API * Write command to geonormalize existing entries * Remove unnecessary method from model * Add floor and suite to views * Add ignore geo status * Force refresh for fac and org updates * adjust frontend typo * add checking if update needs geosync * redo error handling for geosync * remove save keyword from geonormalize command script * change raw_id_fields * alternate autocomplete lookup field depending on where inline is called * remove unnecessary error handling * Add csv option * Fix bug with None vs empty string * add regex parsing for suite and floor conversion * Add migration that removes geo error as a field * add geostatus update to command * Ignore suite floor and address2 changes for api normalization * update geomodel by removing geo_error * Black models.py * Black serializers.py * remove geocode error from admin * Add function for reversing pretty speed * add conversion to export method * fix typo * fix speed value feedback after submit * remove conditional * Add error handling to create endpoint * Refine floor and suite parsing regex * Add geocoding tests * Add json for tests * IX-F Importer: Bogus output of "Preview" tool #896 * remove cruft * black formatting * IX-F Importer: history of changes per ixlan & netixlan #893 * 6 add geocode to org view * 4 update geocode without refresh * Update error display * Fix bug with formatting translated string * Add DateTimeFields to model * Add update signals * add last updated fields to views and serializers * Add last updated model migration * Add the data migration for last updated fields * add test that tests a normal org user with create org permissions * grainy to 1.7 django grainy to 1.9.1 * Fix formatting issues * Adjust var names * Refactor signals * Temporary: save override from network model * Empty vlan lists no longer cause error * typo in ixf.py * typo in admin * Typos in model verbose names * Add serializer IXLAN validation for ixf_ixp_import_enabled * Add model validation to IXLan * relock pipfile * relock pipfile * begin signal test file * Remove full clean from save in ixlan * use post_reversion_commit signal instead * remove redundant save override * remove cruft / debug code * Add signal tests * exclude organizations with city missing from commandline geosync * Skip geosync if the only address information we have is a country * initial commit for vlan matcher in importer * Add more tests and remove unused imports * update tests * Actually add vlan matching to importer * Add type checking for speed list and state * Change how we register connection.state * add bootstrap options * add rdap cache command * remove outdated perm docs * rdap from master and relock * propagate rdap settings to peeringdb.settings * add loaddata for initial fixtures * user friendly error message on RdapNotFound errors (#497) * update rdap errors * django-peeringdb to 2.5.0 and relock * rdap to 1.2.0 and relock * fix migration hierarchy * add ignore_recurse_errors option * add missing fields to mock remove cruft missed during merge * rdap to 1.2.1 * dont geo validate during api tests * fix tests * Add test file * fix merge * RDAP_SELF_BOOTSTRAP to False while running tests * black formatted * run black * add github actions * add runs on Co-authored-by: Stefan Pratter <stefan@20c.com> Co-authored-by: Elliot Frank <elliot@20c.com>
		
			
				
	
	
		
			332 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			332 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import json
 | |
| import re
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from django.test import Client, TestCase, RequestFactory
 | |
| from django.contrib.auth.models import Group
 | |
| from django.conf import settings
 | |
| 
 | |
| from captcha.models import CaptchaStore
 | |
| 
 | |
| import peeringdb_server.models as models
 | |
| import peeringdb_server.views as views
 | |
| 
 | |
| 
 | |
| class UserTests(TestCase):
 | |
|     """
 | |
|     Test peeringdb_server.models.User functions
 | |
|     """
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpTestData(cls):
 | |
|         cls.guest_group = Group.objects.create(name="guest", id=settings.GUEST_GROUP_ID)
 | |
|         cls.user_group = Group.objects.create(name="user", id=settings.USER_GROUP_ID)
 | |
| 
 | |
|         settings.USER_GROUP_ID = cls.user_group.id
 | |
|         settings.GUEST_GROUP_ID = cls.guest_group.id
 | |
| 
 | |
|         for name in ["user_a", "user_b", "user_c", "user_d"]:
 | |
|             setattr(
 | |
|                 cls,
 | |
|                 name,
 | |
|                 models.User.objects.create_user(
 | |
|                     name,
 | |
|                     "%s@localhost" % name,
 | |
|                     first_name=name,
 | |
|                     last_name=name,
 | |
|                     password=name,
 | |
|                 ),
 | |
|             )
 | |
| 
 | |
|         cls.org_a = models.Organization.objects.create(name="org A", status="ok")
 | |
|         cls.org_b = models.Organization.objects.create(name="org B", status="ok")
 | |
| 
 | |
|         cls.user_group.user_set.add(cls.user_a)
 | |
|         cls.user_group.user_set.add(cls.user_d)
 | |
|         cls.guest_group.user_set.add(cls.user_b)
 | |
| 
 | |
|         cls.org_a.usergroup.user_set.add(cls.user_a)
 | |
|         cls.org_b.admin_usergroup.user_set.add(cls.user_b)
 | |
| 
 | |
|     def setUp(self):
 | |
|         self.factory = RequestFactory()
 | |
| 
 | |
|     def test_full_name(self):
 | |
|         """
 | |
|         Test User.full_name
 | |
|         """
 | |
|         self.assertEqual(self.user_a.full_name, "user_a user_a")
 | |
| 
 | |
|     def test_organizations(self):
 | |
|         """
 | |
|         Test User.organizations
 | |
|         """
 | |
| 
 | |
|         # test that organizations are returned where the user is member
 | |
|         orgs = self.user_a.organizations
 | |
|         self.assertEqual(len(orgs), 1)
 | |
|         self.assertEqual(orgs[0].id, self.org_a.id)
 | |
| 
 | |
|         # test that organizations are returned where the user is admin
 | |
|         orgs = self.user_b.organizations
 | |
|         self.assertEqual(len(orgs), 1)
 | |
|         self.assertEqual(orgs[0].id, self.org_b.id)
 | |
| 
 | |
|         orgs = self.user_c.organizations
 | |
|         self.assertEqual(len(orgs), 0)
 | |
| 
 | |
|     def test_is_org_member(self):
 | |
|         """
 | |
|         Test User.is_org_member
 | |
|         """
 | |
|         self.assertEqual(self.user_a.is_org_member(self.org_a), True)
 | |
|         self.assertEqual(self.user_a.is_org_member(self.org_b), False)
 | |
|         self.assertEqual(self.user_c.is_org_member(self.org_a), False)
 | |
|         self.assertEqual(self.user_c.is_org_member(self.org_b), False)
 | |
| 
 | |
|     def test_is_org_admin(self):
 | |
|         """
 | |
|         Test User.is_org_admin
 | |
|         """
 | |
|         self.assertEqual(self.user_b.is_org_member(self.org_b), False)
 | |
|         self.assertEqual(self.user_b.is_org_admin(self.org_b), True)
 | |
|         self.assertEqual(self.user_b.is_org_admin(self.org_a), False)
 | |
|         self.assertEqual(self.user_b.is_org_member(self.org_a), False)
 | |
| 
 | |
|     def test_is_verified_user(self):
 | |
|         """
 | |
|         Test User.is_verified_user
 | |
|         """
 | |
| 
 | |
|         self.assertEqual(self.user_a.is_verified_user, True)
 | |
|         self.assertEqual(self.user_b.is_verified_user, False)
 | |
|         self.assertEqual(self.user_c.is_verified_user, False)
 | |
| 
 | |
|     def test_set_verified(self):
 | |
|         """
 | |
|         Test user.set_verified
 | |
|         """
 | |
| 
 | |
|         self.user_c.set_verified()
 | |
|         self.user_c.refresh_from_db()
 | |
| 
 | |
|         self.assertEqual(self.user_c.status, "ok")
 | |
|         self.assertEqual(self.user_c.is_verified_user, True)
 | |
| 
 | |
|         self.assertEqual(self.user_c.groups.filter(name="guest").exists(), False)
 | |
|         self.assertEqual(self.user_c.groups.filter(name="user").exists(), True)
 | |
| 
 | |
|     def test_set_unverified(self):
 | |
|         """
 | |
|         Test user.set_unverified
 | |
|         """
 | |
| 
 | |
|         self.user_c.set_unverified()
 | |
|         self.user_c.refresh_from_db()
 | |
| 
 | |
|         self.assertEqual(self.user_c.status, "pending")
 | |
|         self.assertEqual(self.user_c.is_verified_user, False)
 | |
| 
 | |
|         self.assertEqual(self.user_c.groups.filter(name="guest").exists(), True)
 | |
|         self.assertEqual(self.user_c.groups.filter(name="user").exists(), False)
 | |
| 
 | |
|     def test_password_reset(self):
 | |
|         """
 | |
|         Test User.password_reset_initiate
 | |
|         Test User.password_reset_complete
 | |
|         Test views.view_password_reset POST
 | |
|         """
 | |
| 
 | |
|         # initiate request
 | |
|         request = self.factory.post(
 | |
|             "/reset-password", data={"email": self.user_a.email}
 | |
|         )
 | |
|         request._dont_enforce_csrf_checks = True
 | |
|         resp = views.view_password_reset(request)
 | |
| 
 | |
|         # check that password-reset instance was created
 | |
|         pr = models.UserPasswordReset.objects.get(user=self.user_a)
 | |
| 
 | |
|         self.assertIsNotNone(pr.token)
 | |
|         self.assertEqual(pr.is_valid(), True)
 | |
| 
 | |
|         # re-initiate internally so we can get the token
 | |
|         token, hashed = self.user_a.password_reset_initiate()
 | |
|         pr = self.user_a.password_reset
 | |
| 
 | |
|         # password reset request
 | |
|         pwd = "abcdefghjikl"
 | |
|         request = self.factory.post(
 | |
|             "/reset-password",
 | |
|             data={
 | |
|                 "target": self.user_a.id,
 | |
|                 "token": token,
 | |
|                 "password": pwd,
 | |
|                 "password_v": pwd,
 | |
|             },
 | |
|         )
 | |
|         request._dont_enforce_csrf_checks = True
 | |
|         resp = views.view_password_reset(request)
 | |
| 
 | |
|         self.assertEqual(json.loads(resp.content)["status"], "ok")
 | |
| 
 | |
|         with pytest.raises(models.UserPasswordReset.DoesNotExist):
 | |
|             models.UserPasswordReset.objects.get(user=self.user_a)
 | |
| 
 | |
|         # initiate another request so we can test failures
 | |
|         token, hashed = self.user_a.password_reset_initiate()
 | |
| 
 | |
|         # failure test: invalid token
 | |
|         request = self.factory.post(
 | |
|             "/reset-password",
 | |
|             data={
 | |
|                 "target": self.user_a.id,
 | |
|                 "token": "wrong",
 | |
|                 "password": pwd,
 | |
|                 "password_v": pwd,
 | |
|             },
 | |
|         )
 | |
|         request._dont_enforce_csrf_checks = True
 | |
|         resp = views.view_password_reset(request)
 | |
|         self.assertEqual(resp.status_code, 400)
 | |
| 
 | |
|         # failure test: invalid password(s): length
 | |
|         request = self.factory.post(
 | |
|             "/reset-password",
 | |
|             data={
 | |
|                 "target": self.user_a.id,
 | |
|                 "token": token,
 | |
|                 "password": "a",
 | |
|                 "password_v": "a",
 | |
|             },
 | |
|         )
 | |
|         request._dont_enforce_csrf_checks = True
 | |
|         resp = views.view_password_reset(request)
 | |
|         self.assertEqual(resp.status_code, 400)
 | |
| 
 | |
|         # failure test: invalid password(s): validation mismatch
 | |
|         request = self.factory.post(
 | |
|             "/reset-password",
 | |
|             data={
 | |
|                 "target": self.user_a.id,
 | |
|                 "token": token,
 | |
|                 "password": pwd,
 | |
|                 "password_v": "a",
 | |
|             },
 | |
|         )
 | |
|         request._dont_enforce_csrf_checks = True
 | |
|         resp = views.view_password_reset(request)
 | |
|         self.assertEqual(resp.status_code, 400)
 | |
| 
 | |
|         # failure test: invalid target
 | |
|         request = self.factory.post(
 | |
|             "/reset-password",
 | |
|             data={
 | |
|                 "target": self.user_b.id,
 | |
|                 "token": token,
 | |
|                 "password": pwd,
 | |
|                 "password_v": pwd,
 | |
|             },
 | |
|         )
 | |
|         request._dont_enforce_csrf_checks = True
 | |
|         resp = views.view_password_reset(request)
 | |
|         self.assertEqual(resp.status_code, 400)
 | |
| 
 | |
|     def test_login_redirect(self):
 | |
|         data = {
 | |
|             "next": "/org/{}".format(self.org_a.id),
 | |
|             "auth-username": "user_d",
 | |
|             "auth-password": "user_d",
 | |
|             "login_view-current_step": "auth",
 | |
|         }
 | |
|         C = Client()
 | |
|         resp = C.post("/account/login/", data, follow=True)
 | |
|         self.assertEqual(resp.redirect_chain, [("/org/{}".format(self.org_a.id), 302)])
 | |
| 
 | |
|         data = {
 | |
|             "next": "/logout",
 | |
|             "auth-username": "user_d",
 | |
|             "auth-password": "user_d",
 | |
|             "login_view-current_step": "auth",
 | |
|         }
 | |
| 
 | |
|         C = Client()
 | |
|         resp = C.post("/account/login/", data, follow=True)
 | |
|         self.assertEqual(resp.redirect_chain, [("/", 302)])
 | |
|         self.assertEqual(resp.context["user"].is_authenticated, True)
 | |
| 
 | |
|     def test_username_retrieve(self):
 | |
|         """
 | |
|         test the username retrieve process
 | |
|         """
 | |
| 
 | |
|         c = Client()
 | |
| 
 | |
|         # initiate process
 | |
|         response = c.post("/username-retrieve/initiate", {"email": self.user_a.email})
 | |
| 
 | |
|         secret = c.session["username_retrieve_secret"]
 | |
|         email = c.session["username_retrieve_email"]
 | |
|         self.assertNotEqual(secret, None)
 | |
|         self.assertEqual(email, self.user_a.email)
 | |
| 
 | |
|         # invalid secret
 | |
|         response = c.get("/username-retrieve/complete?secret=123")
 | |
|         assert self.user_a.email not in response.content.decode()
 | |
|         assert (
 | |
|             f'<p class="username">{self.user_a.username}</p>'
 | |
|             not in response.content.decode()
 | |
|         )
 | |
| 
 | |
|         # complete process
 | |
|         response = c.get(f"/username-retrieve/complete?secret={secret}")
 | |
| 
 | |
|         assert self.user_a.email in response.content.decode()
 | |
|         assert (
 | |
|             f'<p class="username">{self.user_a.username}</p>'
 | |
|             in response.content.decode()
 | |
|         )
 | |
| 
 | |
|         # process no longer valid
 | |
|         response = c.get(f"/username-retrieve/complete?secret={secret}")
 | |
| 
 | |
|         assert self.user_a.email not in response.content.decode()
 | |
|         assert (
 | |
|             f'<p class="username">{self.user_a.username}</p>'
 | |
|             not in response.content.decode()
 | |
|         )
 | |
| 
 | |
|         with pytest.raises(KeyError):
 | |
|             secret = c.session["username_retrieve_secret"]
 | |
| 
 | |
|         with pytest.raises(KeyError):
 | |
|             email = c.session["username_retrieve_email"]
 | |
| 
 | |
|     def test_signup(self):
 | |
|         """
 | |
|         test user signup with captcha fallback
 | |
|         """
 | |
| 
 | |
|         c = Client()
 | |
|         response = c.get("/register")
 | |
|         assert 'name="captcha_generator_0"' in response.content.decode()
 | |
|         m = re.search(
 | |
|             'name="captcha_generator_0" value="([^"]+)"', response.content.decode()
 | |
|         )
 | |
| 
 | |
|         captcha_obj = CaptchaStore.objects.get(hashkey=m.group(1))
 | |
| 
 | |
|         response = c.post(
 | |
|             "/register",
 | |
|             {
 | |
|                 "username": "signuptest",
 | |
|                 "password1": "signuptest_123",
 | |
|                 "password2": "signuptest_123",
 | |
|                 "email": "signuptest@localhost",
 | |
|                 "captcha": f"{captcha_obj.hashkey}:{captcha_obj.response}",
 | |
|             },
 | |
|         )
 | |
| 
 | |
|         self.assertEqual(json.loads(response.content), {"status": "ok"})
 |