1
0
mirror of https://github.com/peeringdb/peeringdb.git synced 2024-05-11 05:55:09 +00:00
Files
peeringdb-peeringdb/tests/test_admin.py
Matt Griswold b07baf3092 Support 202011 (#917)
* install django-grainy

* nsp to grainy first iteration

* Fix validation error message overflow

* Add migration, update views.py and template to add help_text to UI

* nsp to grainy second iteration

* grainy and django-grainy pinned to latest releases

* deskpro ticket cc (#875)

* black formatting

* move ac link to bottom for ticket body

* Fix typo

* Update djangorestframework, peeringdb, django-ratelimit

* Rewrite login view ratelimit decorator

* Relock pipfile

* add list() to make copy of dictionaries before iterating

* respect ix-f url visibilty in ix-f conflict emails

* Add type coercion to settings taken from environment variables

* Add bool handling

* relock pipfile with python3.9
change docker to use python3.9

* Check bool via isinstance

* add ordering to admin search queryset for deskproticket and email

* update settings with envvar_type option

* Add tooltips to add ix and add exchange views (in org)

* Add tooltip to suggest fac view

* get phone information in view

* add missing migration

* add migration and make org a geo model

* Wire normalization to put/create requests for Facility

* Update admin with new address fields

* Refactor serializer using mixin

* Add floor and suite to address API

* Write command to geonormalize existing entries

* Remove unnecessary method from model

* Add floor and suite to views

* Add ignore geo status

* Force refresh for fac and org updates

* adjust frontend typo

* add checking if update needs geosync

* redo error handling for geosync

* remove save keyword from geonormalize command script

* change raw_id_fields

* alternate autocomplete lookup field depending on where inline is called

* remove unnecessary error handling

* Add  csv option

* Fix bug
 with None vs empty string

* add regex parsing for suite and floor conversion

* Add migration that removes geo error as a field

* add geostatus update to command

* Ignore suite floor and address2 changes for api normalization

* update geomodel by removing geo_error

* Black models.py

* Black serializers.py

* remove geocode error from admin

* Add function for reversing pretty speed

* add conversion to export method

* fix typo

* fix speed value feedback after submit

* remove conditional

* Add error handling to create endpoint

* Refine floor and suite parsing regex

* Add geocoding tests

* Add json for tests

* IX-F Importer: Bogus output of "Preview" tool #896

* remove cruft

* black formatting

* IX-F Importer: history of changes per ixlan & netixlan #893

* 6 add geocode to org view

* 4 update geocode without refresh

* Update error display

* Fix bug with formatting translated string

* Add DateTimeFields to model

* Add update signals

* add last updated fields to views and serializers

* Add last updated model migration

* Add the data migration for last updated fields

* add test that tests a normal org user with create org permissions

* grainy to 1.7
django grainy to 1.9.1

* Fix formatting issues

* Adjust var names

* Refactor signals

* Temporary: save override from network model

* Empty vlan lists no longer cause error

* typo in ixf.py

* typo in admin

* Typos in model verbose names

* Add serializer IXLAN validation for ixf_ixp_import_enabled

* Add model validation to IXLan

* relock pipfile

* relock pipfile

* begin signal test file

* Remove full clean from save in ixlan

* use post_reversion_commit signal instead

* remove redundant save override

* remove cruft / debug code

* Add signal tests

* exclude organizations with city missing from commandline geosync

* Skip geosync if the only address information we have is a country

* initial commit for vlan matcher in importer

* Add more tests and remove unused imports

* update tests

* Actually add vlan matching to importer

* Add type checking for speed list and state

* Change how we register connection.state

* add bootstrap options

* add rdap cache command

* remove outdated perm docs

* rdap from master and relock

* propagate rdap settings to peeringdb.settings

* add loaddata for initial fixtures

* user friendly error message on RdapNotFound errors (#497)

* update rdap errors

* django-peeringdb to 2.5.0 and relock

* rdap to 1.2.0 and relock

* fix migration hierarchy

* add ignore_recurse_errors option

* add missing fields to mock
remove cruft missed during merge

* rdap to 1.2.1

* dont geo validate during api tests

* fix tests

* Add test file

* fix merge

* RDAP_SELF_BOOTSTRAP to False while running tests

* black formatted

* run black

* add github actions

* add runs on

Co-authored-by: Stefan Pratter <stefan@20c.com>
Co-authored-by: Elliot Frank <elliot@20c.com>
2021-01-13 14:35:07 -06:00

745 lines
25 KiB
Python

import os
import json
import pytest
import urllib
from django.test import Client, TestCase, RequestFactory
from django.contrib.auth.models import Group
from django.urls import reverse, resolve
from django.core.management import call_command
from django.contrib.messages import get_messages
from django_grainy.models import UserPermission, GroupPermission
import peeringdb_server.models as models
import peeringdb_server.admin as admin
class AdminTests(TestCase):
"""
Test peeringdb django admin functionality
"""
asn_count = 0
@classmethod
def entity_data(cls, org, tag):
kwargs = {"name": f"{org.name} {tag}", "status": "ok", "org": org}
if tag == "net":
cls.asn_count += 1
kwargs.update(asn=cls.asn_count)
return kwargs
@classmethod
def setUpTestData(cls):
cls.entities = {}
# set up organizations
cls.entities["org"] = [
org
for org in [
models.Organization.objects.create(name="Org %d" % i, status="ok")
for i in range(0, 9)
]
]
# set up a network,facility and ix under each org
for tag in ["ix", "net", "fac"]:
cls.entities[tag] = [
models.REFTAG_MAP[tag].objects.create(**cls.entity_data(org, tag))
for org in cls.entities["org"]
]
# create a user under each org
cls.entities["user"] = [
models.User.objects.create_user(
"user " + org.name,
"%s@localhost" % org.name,
first_name="First",
last_name="Last",
)
for org in cls.entities["org"]
]
i = 0
for user in cls.entities["user"]:
cls.entities["org"][i].usergroup.user_set.add(user)
i += 1
cls.admin_user = models.User.objects.create_user(
"admin", "admin@localhost", first_name="admin", last_name="admin"
)
cls.admin_user.is_superuser = True
cls.admin_user.is_staff = True
cls.admin_user.save()
cls.admin_user.set_password("admin")
cls.admin_user.save()
# user and group for read-only access to /cp
cls.readonly_admin = models.User.objects.create_user(
"ro_admin", "ro_admin@localhost", password="admin", is_staff=True
)
readonly_group = Group.objects.create(name="readonly")
for app_label in admin.PERMISSION_APP_LABELS:
GroupPermission.objects.create(
group=readonly_group, namespace=app_label, permission=0x01
)
readonly_group.user_set.add(cls.readonly_admin)
# set up some ixlans
cls.entities["ixlan"] = [ix.ixlan for ix in cls.entities["ix"]]
# set up a prefix
cls.entities["ixpfx"] = [
models.IXLanPrefix.objects.create(
ixlan=cls.entities["ixlan"][0],
protocol="IPv4",
prefix="207.41.110.0/24",
status="ok",
)
]
# set up some netixlans
cls.entities["netixlan"] = [
models.NetworkIXLan.objects.create(
network=cls.entities["net"][0],
ixlan=cls.entities["ixlan"][0],
ipaddr4=addr,
status="ok",
asn=cls.entities["net"][0].asn,
speed=1000,
)
for addr in ["207.41.110.37", "207.41.110.38", "207.41.110.39"]
]
def setUp(self):
self.factory = RequestFactory()
def test_views(self):
"""
Test that all views are still functional
Note: this only tests for HTTP status and is a quick and dirty
way that none of the views got broken for GET requests. This will
need to be replaced by something more extensive
"""
m = [
models.Facility,
models.InternetExchange,
models.Network,
models.Organization,
models.User,
]
c = Client()
c.login(username="admin", password="admin")
for model in m:
url = f"/cp/{model._meta.app_label}/{model._meta.model_name}/"
response = c.get(url, follow=True)
self.assertEqual(response.status_code, 200)
url_add = "%sadd" % url
response = c.get(url_add, follow=True)
self.assertEqual(response.status_code, 200)
url_id = f"{url}{model.objects.first().id}"
response = c.get(url_id, follow=True)
self.assertEqual(response.status_code, 200)
def test_org_merge(self):
"""
Test the org merge functionality, which should merge 1 or more
organizations into a target organization, moving all entities
to the target organization
"""
request = self.factory.post("/cp")
request.user = None
# TEST 1
# merge orgs 1 and 2 into org 0
t_org = self.entities["org"][0]
admin.merge_organizations(self.entities["org"][1:3], t_org, request)
# check that all entities moved
for tag in ["ix", "net", "fac"]:
for ent in self.entities[tag][0:3]:
ent.refresh_from_db()
self.assertEqual(ent.org, t_org)
# check that all users moved
i = 1
for user in self.entities["user"][1:3]:
org = self.entities["org"][i]
self.assertEqual(user.is_org_member(t_org), True)
self.assertEqual(user.is_org_admin(t_org), False)
self.assertEqual(user.is_org_member(org), False)
self.assertEqual(user.is_org_admin(org), False)
i += 1
# check that all merged orgs are deleted
for org in self.entities["org"][1:3]:
org.refresh_from_db()
self.assertEqual(org.status, "deleted")
# check that target org is still in tact
t_org.refresh_from_db()
self.assertEqual(t_org.status, "ok")
# TEST 2 - Don't allow merging of target org into target org
with pytest.raises(ValueError):
admin.merge_organizations([t_org], t_org, request)
def test_org_unmerge(self):
"""
Test undoing an organization merge
"""
request = self.factory.post("/cp")
request.user = None
# merge orgs 4 and 5 into org 3
t_org = self.entities["org"][3]
admin.merge_organizations(self.entities["org"][4:6], t_org, request)
print(t_org)
# check that merge log exists
merges = models.OrganizationMerge.objects.filter(to_org=t_org)
self.assertEqual(merges.count(), 2)
# undo merges
i = 4
for merge in [m for m in merges]:
self.assertEqual(merge.from_org, self.entities["org"][i])
merge.undo()
i += 1
# check that all entities moved back
for tag in ["ix", "net", "fac"]:
i = 4
for ent in self.entities[tag][4:6]:
ent.refresh_from_db()
self.assertEqual(ent.org, self.entities["org"][i])
i += 1
# check that all users moved back
i = 4
for user in self.entities["user"][4:6]:
org = self.entities["org"][i]
self.assertEqual(user.is_org_member(t_org), False)
self.assertEqual(user.is_org_admin(t_org), False)
self.assertEqual(user.is_org_member(org), True)
self.assertEqual(user.is_org_admin(org), False)
i += 1
# check that all merged orgs are deleted
for org in self.entities["org"][4:6]:
org.refresh_from_db()
self.assertEqual(org.status, "ok")
# check that target org is still in tact
t_org.refresh_from_db()
self.assertEqual(t_org.status, "ok")
def test_commandline_tool(self):
c = Client()
c.login(username="admin", password="admin")
# test form that lets user select which command run
url = "/cp/peeringdb_server/commandlinetool/prepare"
response = c.get(url, follow=True)
self.assertEqual(response.status_code, 200)
for i, n in models.COMMANDLINE_TOOLS:
assert f'<option value="{i}">{n}</option>' in response.content.decode()
def test_commandline_tool_renumber_lans(self):
# test the form that runs the renumer ip space tool
c = Client()
c.login(username="admin", password="admin")
# test renumber lans command form
data = {"tool": "pdb_renumber_lans"}
url = "/cp/peeringdb_server/commandlinetool/prepare/"
response = c.post(url, data, follow=True)
cont = response.content.decode("utf-8")
assert response.status_code == 200
assert "Old prefix" in cont
assert "Exchange" in cont
# test post to renumber lans command form (preview)
data = {
"tool": "pdb_renumber_lans",
"exchange": self.entities["ix"][0].id,
"old_prefix": "207.41.110.0/24",
"new_prefix": "207.41.111.0/24",
}
url = "/cp/peeringdb_server/commandlinetool/preview/"
response = c.post(url, data, follow=True)
cont = response.content.decode("utf-8")
assert response.status_code == 200
assert "[pretend]" in cont
assert "207.41.110.0/24 -> 207.41.111.0/24" in cont
assert "AS1 207.41.110.37 -> 207.41.111.37" in cont
assert "AS1 207.41.110.38 -> 207.41.111.38" in cont
assert "AS1 207.41.110.39 -> 207.41.111.39" in cont
# test post to renumber lans command form
data = {
"tool": "pdb_renumber_lans",
"exchange": self.entities["ix"][0].id,
"old_prefix": "207.41.110.0/24",
"new_prefix": "207.41.111.0/24",
}
url = "/cp/peeringdb_server/commandlinetool/run/"
response = c.post(url, data, follow=True)
cont = response.content.decode("utf-8")
assert response.status_code == 200
assert "[pretend]" not in cont
assert "207.41.110.0/24 -> 207.41.111.0/24" in cont
assert "AS1 207.41.110.37 -> 207.41.111.37" in cont
assert "AS1 207.41.110.38 -> 207.41.111.38" in cont
assert "AS1 207.41.110.39 -> 207.41.111.39" in cont
for netixlan in self.entities["netixlan"]:
netixlan.refresh_from_db()
self.assertEqual(str(self.entities["netixlan"][0].ipaddr4), "207.41.111.37")
self.assertEqual(str(self.entities["netixlan"][1].ipaddr4), "207.41.111.38")
self.assertEqual(str(self.entities["netixlan"][2].ipaddr4), "207.41.111.39")
def test_netixlan_inline(self):
"""
test that inline netixlan admin forms can handle blank
values in ipaddress fields (#644)
also tests that duplicate ipaddr values are blocked
"""
ixlan = self.entities["ixlan"][0]
netixlan = ixlan.netixlan_set.all()[0]
netixlan_b = ixlan.netixlan_set.all()[1]
url = reverse(
"admin:{}_{}_change".format(
ixlan._meta.app_label,
ixlan._meta.object_name,
).lower(),
args=(ixlan.id,),
)
client = Client()
client.force_login(self.admin_user)
def post_data(ipaddr4, ipaddr6):
"""
helper function that builds data to send to
the ixlan django admin form with inline
netixlan data
"""
return {
# required ixlan form data
"arp_sponge": "00:0a:95:9d:68:16",
"ixf_ixp_member_list_url_visible": "Private",
"ix": ixlan.ix.id,
"status": ixlan.status,
# required management form data
"ixpfx_set-TOTAL_FORMS": 0,
"ixpfx_set-INITIAL_FORMS": 0,
"ixpfx_set-MIN_NUM_FORMS": 0,
"ixpfx_set-MAX_NUM_FORMS": 1000,
"netixlan_set-TOTAL_FORMS": 1,
"netixlan_set-INITIAL_FORMS": 1,
"netixlan_set-MIN_NUM_FORMS": 0,
"netixlan_set-MAX_NUM_FORMS": 1000,
# inline netixlan data
"netixlan_set-0-ipaddr4": ipaddr4 or "",
"netixlan_set-0-ipaddr6": ipaddr6 or "",
"netixlan_set-0-speed": netixlan.speed,
"netixlan_set-0-network": netixlan.network.id,
"netixlan_set-0-ixlan": ixlan.id,
"netixlan_set-0-id": netixlan.id,
"netixlan_set-0-status": netixlan.status,
"netixlan_set-0-asn": netixlan.network.asn,
}
# test #1: post request to ixlan change operation passing
# blank values to ipaddress fields
response = client.post(url, post_data("", ""))
netixlan.refresh_from_db()
assert netixlan.ipaddr6 is None
assert netixlan.ipaddr4 is None
# test #2: block dupe ipv4
response = client.post(url, post_data(netixlan_b.ipaddr4, netixlan_b.ipaddr6))
assert netixlan.ipaddr6 is None
assert netixlan.ipaddr4 is None
assert "Ip address already exists elsewhere" in response.content.decode("utf-8")
def test_validate_netixlan_speed(self):
ixlan = self.entities["ixlan"][0]
netixlan = ixlan.netixlan_set.first()
url = reverse(
"admin:{}_{}_change".format(
netixlan._meta.app_label,
netixlan._meta.object_name,
).lower(),
args=(netixlan.id,),
)
original_speed = netixlan.speed
data = {
"status": netixlan.status,
"asn": netixlan.asn,
"ipaddr4": netixlan.ipaddr4,
"ipaddr6": "",
"notes": netixlan.notes,
"speed": 1200000,
"operational": netixlan.operational,
"network": netixlan.network_id,
"ixlan": netixlan.ixlan_id,
"_save": "Save",
}
client = Client()
client.force_login(self.admin_user)
response = client.post(url, data)
netixlan.refresh_from_db()
assert "Maximum speed: 1T" in response.content.decode("utf-8")
assert netixlan.speed == original_speed
data["speed"] = 10
response = client.post(url, data)
netixlan.refresh_from_db()
assert "Minimum speed: 100M" in response.content.decode("utf-8")
assert netixlan.speed == original_speed
def _run_regex_search(self, model, search_term):
c = Client()
c.login(username="admin", password="admin")
url = reverse(f"admin:peeringdb_server_{model}_changelist")
search = url + "?q=" + urllib.parse.quote_plus(search_term)
response = c.get(search)
content = response.content.decode("utf-8")
return content
def test_search_deskprotickets(self):
# Set up data
ixf_importer, _ = models.User.objects.get_or_create(username="ixf_importer")
for i in range(10):
models.DeskProTicket.objects.create(
subject=f"test number {i}", body="test", user=ixf_importer
)
search_term = "^.*[0-5]$"
content = self._run_regex_search("deskproticket", search_term)
print(content)
expected = [f"test number {i}" for i in range(5)]
expected_not = [f"test number {i}" for i in range(6, 10)]
for e in expected:
assert e in content
for e in expected_not:
assert e not in content
def test_search_ixfimportemails(self):
for i in range(10):
models.IXFImportEmail.objects.create(
subject=f"test number {i}", message="test", recipients="test"
)
search_term = "^.*[2-4]$"
content = self._run_regex_search("ixfimportemail", search_term)
print(content)
expected = [f"test number {i}" for i in range(2, 5)]
expected_not = ["test number 1"] + [f"test number {i}" for i in range(6, 10)]
for e in expected:
assert e in content
for e in expected_not:
assert e not in content
def test_all_views_readonly(self):
self._test_all_views(
self.readonly_admin,
status_add=403,
status_get_orgmerge=403,
status_get_orgmerge_undo=403,
status_get_vq_approve=403,
status_get_vq_deny=403,
)
def test_all_views_superuser(self):
self._test_all_views(self.admin_user)
def _test_all_views(self, user, **kwargs):
call_command("pdb_generate_test_data", limit=2, commit=True)
# create a verification queue item we can check
org = models.Organization.objects.all().first()
net = models.Network.objects.create(
name="Unverified network", org=org, asn=33333, status="pending"
)
vqitem = models.VerificationQueueItem.objects.all().first()
assert vqitem
# create sponsorhship we can check
sponsorship = models.Sponsorship.objects.create()
models.SponsorshipOrganization.objects.create(sponsorship=sponsorship, org=org)
# create partnership we can check
partnership = models.Partnership.objects.create(org=org)
# create ixlan ix-f import log we can check
ixfmemberdata = models.IXFMemberData.instantiate(
ixlan=models.NetworkIXLan.objects.first().ixlan,
ipaddr4=models.NetworkIXLan.objects.first().ipaddr4,
ipaddr6=models.NetworkIXLan.objects.first().ipaddr6,
asn=models.NetworkIXLan.objects.first().network.asn,
)
ixfmemberdata.save()
# create ixlan ix-f import log we can check
importlog = models.IXLanIXFMemberImportLog.objects.create(
ixlan=models.IXLan.objects.all().first()
)
# create user to organization affiliation request
affil = models.UserOrgAffiliationRequest.objects.create(
org=org, user=self.readonly_admin
)
# create command line tool instance
cmdtool = models.CommandLineTool.objects.create(
user=self.readonly_admin, arguments="{}", tool="pdb_renumber_lans"
)
# create organization merge
orgmerge = models.OrganizationMerge.objects.create(
from_org=org, to_org=models.Organization.objects.all()[1]
)
# set up testing for all pdb models that have
# admin views registered
ops = ["changelist", "change", "add"]
classes = [
models.Organization,
models.Facility,
models.InternetExchange,
models.InternetExchangeFacility,
models.Network,
models.NetworkFacility,
models.NetworkIXLan,
models.NetworkContact,
models.IXLan,
models.IXLanPrefix,
models.User,
models.UserOrgAffiliationRequest,
models.Sponsorship,
models.Partnership,
models.IXLanIXFMemberImportLog,
models.VerificationQueueItem,
models.CommandLineTool,
admin.UserPermission,
models.OrganizationMerge,
models.IXFMemberData,
]
ignore_add = [
admin.UserPermission,
models.OrganizationMerge,
models.IXFMemberData,
]
ignore_change = []
# any other urls we want to test
extra_urls = [
(
"/cp/peeringdb_server/organization/org-merge-tool/",
"get",
kwargs.get("status_get_orgmerge", 200),
),
(
reverse(
"admin:peeringdb_server_commandlinetool_prepare",
),
"get",
kwargs.get("status_add", 200),
),
(
reverse(
"admin:peeringdb_server_organizationmerge_actions",
args=(orgmerge.id, "undo"),
),
"get",
kwargs.get("status_get_orgmerge_undo", 200),
),
(
reverse(
"admin:peeringdb_server_verificationqueueitem_actions",
args=(vqitem.id, "vq_approve"),
),
"get",
kwargs.get("status_get_vq_approve", 200),
),
(
reverse(
"admin:peeringdb_server_verificationqueueitem_actions",
args=(vqitem.id, "vq_deny"),
),
"get",
kwargs.get("status_get_vq_deny", 200),
),
]
client = Client()
client.force_login(user)
assert user.is_staff
search_str = '<a href="/cp/logout/"'
for op in ops:
for cls in classes:
args = None
if op == "change":
# change op required object id
args = (cls.objects.all().first().id,)
if cls in ignore_change:
continue
elif op == "add":
if cls in ignore_add:
continue
url = reverse(
"admin:{}_{}_{}".format(
cls._meta.app_label, cls._meta.object_name, op
).lower(),
args=args,
)
response = client.get(url)
cont = response.content.decode("utf-8")
assert response.status_code == kwargs.get(f"status_{op}", 200)
if response.status_code == 200:
assert search_str in cont
for url, method, status in extra_urls:
fn = getattr(client, method)
response = fn(url, follow=True)
assert response.status_code == status
if response.status_code == 200:
assert search_str in cont
response = client.post(
reverse("admin:peeringdb_server_commandlinetool_preview"),
data={"tool": "pdb_fac_merge"},
)
assert response.status_code == kwargs.get("status_add", 200)
if response.status_code == 200:
assert search_str in cont
def test_grappelli_autocomplete(self):
"""
test that grappelli autocomplete works correctly
as we are overriding it with our own handler that
respects soft-deleted objects (#664)
"""
client = Client()
client.force_login(self.admin_user)
# these are the handle models we currently have auto-complete
# fields setup for in admin
tags = [
"fac",
"org",
"ix",
"net",
"ixlan",
]
# we also do auto complete on user relationships
check_models = [models.User]
for reftag in tags:
check_models.append(models.REFTAG_MAP[reftag])
for model in check_models:
instance = model.objects.first()
# make sure we have at least once instance
# available
assert instance
# determine partial search term (min. 3 chars)
if model == models.User:
term = instance.username
elif hasattr(instance, "name"):
term = instance.name
elif model == models.IXLan:
term = instance.ix.name
else:
raise ValueError(f"could not get search term for {model}")
term = term[:3]
app_label = model._meta.app_label
model_name = model._meta.object_name
# grappelli autocomplete request
response = client.get(
"/grappelli/lookup/autocomplete/?"
f"term={term}&app_label={app_label}&"
f"model_name={model_name}&query_string="
"_to_field=id&to_field=id"
)
assert response.status_code == 200
data = json.loads(response.content.decode("utf8"))
assert len(data)
def test_protected_entity_errors(self):
"""
Test that attempting to delete a protected
entity shows an error message and doesnt raise a 500
"""
client = Client()
client.force_login(self.admin_user)
org = models.Organization.objects.first()
url = reverse(
"admin:peeringdb_server_organization_changelist",
)
response = client.post(
url, {"_selected_action": org.id, "action": "soft_delete"}, follow=True
)
assert response.status_code == 200
messages = list(get_messages(response.wsgi_request))
assert len(messages) == 1
assert "Protected object" in str(messages[0])