1
0
mirror of https://github.com/peeringdb/peeringdb.git synced 2024-05-11 05:55:09 +00:00
Files
peeringdb-peeringdb/tests/test_ixf_member_import.py

805 lines
27 KiB
Python
Raw Normal View History

2018-11-08 19:45:21 +00:00
import os
import json
import reversion
import requests
import jsonschema
import time
import io
2018-11-08 19:45:21 +00:00
from django.db import transaction
from django.core.cache import cache
2018-11-08 19:45:21 +00:00
from django.test import TestCase, Client, RequestFactory
from django.core.management import call_command
2018-11-08 19:45:21 +00:00
from peeringdb_server.models import (
2019-12-05 16:57:52 +00:00
Organization,
Network,
NetworkIXLan,
IXLan,
IXLanPrefix,
InternetExchange,
IXLanIXFMemberImportAttempt,
IXLanIXFMemberImportLog,
IXLanIXFMemberImportLogEntry,
User,
)
2019-02-15 17:46:40 +00:00
from peeringdb_server.import_views import (
view_import_ixlan_ixf_preview,
view_import_net_ixf_preview,
view_import_net_ixf_postmortem,
2019-12-05 16:57:52 +00:00
)
2019-02-15 17:46:40 +00:00
from peeringdb_server import ixf
from .util import ClientCase
2018-11-08 19:45:21 +00:00
class JsonMembersListTestCase(ClientCase):
    """
    Tests the IX-F member list importer (``ixf.Importer``), the
    ixp-member-list export views and the network preview / postmortem
    views against a json members list fixture.

    Subclasses override ``version`` to run the same tests against a
    different version of the fixture file.
    """

    # test this version of the json schema; requires the file
    # to exist at data/json_members_list/members.<VERSION>.json
    version = "0.6"
    schema_url = "https://raw.githubusercontent.com/euro-ix/json-schemas/v0.6/ixp-member-list.schema.json"

    # seconds to wait for the schema download before failing the test
    schema_timeout = 30

    # will be loaded with the json data to be tested against during
    # setUpTestData
    json_data = {}

    # replaced with a fresh dict in setUpTestData; maps a type tag
    # ("org", "ix", "ixlan", "ixpfx", "net", "netixlan") to the list of
    # objects created for that tag
    entities = {}

    @classmethod
    def setUpTestData(cls):
        """
        Load the members list fixture for `cls.version` and create the
        organization, exchanges, prefixes, networks, netixlans and admin
        user that the tests operate on.
        """
        super(JsonMembersListTestCase, cls).setUpTestData()

        # give every test class its own dict - the class level default
        # would otherwise be shared with (and mutated by) all subclasses
        cls.entities = {}

        # load json members list data to test against
        with open(
            os.path.join(
                os.path.dirname(__file__),
                "data",
                "json_members_list",
                "members.{}.json".format(cls.version),
            ),
            "r",
        ) as fh:
            cls.json_data = json.load(fh)

        with reversion.create_revision():
            # create organization(s)
            cls.entities["org"] = [
                Organization.objects.create(name="Netflix", status="ok")
            ]

            # create exchange(s)
            cls.entities["ix"] = [
                InternetExchange.objects.create(
                    name="Test Exchange", org=cls.entities["org"][0], status="ok"
                ),
                InternetExchange.objects.create(
                    name="Test Exchange 2", org=cls.entities["org"][0], status="ok"
                ),
                InternetExchange.objects.create(
                    name="Test Exchange 3", org=cls.entities["org"][0], status="ok"
                ),
            ]

            # create ixlan(s)
            cls.entities["ixlan"] = [ix.ixlan for ix in cls.entities["ix"]]

            # create ixlan prefix(s) - the third ixlan intentionally
            # gets none (see the skip_missing_prefixes test)
            cls.entities["ixpfx"] = [
                IXLanPrefix.objects.create(
                    ixlan=cls.entities["ixlan"][0],
                    status="ok",
                    prefix="195.69.144.0/22",
                    protocol="IPv4",
                ),
                IXLanPrefix.objects.create(
                    ixlan=cls.entities["ixlan"][0],
                    status="ok",
                    prefix="2001:7f8:1::/64",
                    protocol="IPv6",
                ),
                IXLanPrefix.objects.create(
                    ixlan=cls.entities["ixlan"][1],
                    status="ok",
                    prefix="195.66.224.0/22",
                    protocol="IPv4",
                ),
                IXLanPrefix.objects.create(
                    ixlan=cls.entities["ixlan"][1],
                    status="ok",
                    prefix="2001:7f8:4::/64",
                    protocol="IPv6",
                ),
            ]

            # create network(s)
            cls.entities["net"] = [
                Network.objects.create(
                    name="Netflix",
                    org=cls.entities["org"][0],
                    asn=2906,
                    info_prefixes4=42,
                    info_prefixes6=42,
                    website="http://netflix.com/",
                    policy_general="Open",
                    policy_url="https://www.netflix.com/openconnect/",
                    allow_ixp_update=True,
                    status="ok",
                    irr_as_set="AS-NFLX",
                ),
                Network.objects.create(
                    name="Network with deleted netixlans",
                    org=cls.entities["org"][0],
                    asn=1001,
                    allow_ixp_update=True,
                    status="ok",
                ),
                Network.objects.create(
                    name="Network with allow ixp update off",
                    org=cls.entities["org"][0],
                    asn=1002,
                    status="ok",
                ),
            ]

            # create netixlans
            cls.entities["netixlan"] = [
                NetworkIXLan.objects.create(
                    network=cls.entities["net"][1],
                    ixlan=cls.entities["ixlan"][1],
                    asn=1001,
                    speed=10000,
                    ipaddr4="195.69.146.250",
                    ipaddr6=None,
                    status="deleted",
                ),
                NetworkIXLan.objects.create(
                    network=cls.entities["net"][1],
                    ixlan=cls.entities["ixlan"][1],
                    asn=1001,
                    speed=10000,
                    ipaddr4=None,
                    ipaddr6="2001:7f8:1::a500:2906:1",
                    status="deleted",
                ),
                NetworkIXLan.objects.create(
                    network=cls.entities["net"][0],
                    ixlan=cls.entities["ixlan"][0],
                    asn=2906,
                    speed=10000,
                    ipaddr4="195.69.146.249",
                    ipaddr6=None,
                    status="ok",
                ),
                NetworkIXLan.objects.create(
                    network=cls.entities["net"][0],
                    ixlan=cls.entities["ixlan"][0],
                    asn=2906,
                    speed=10000,
                    ipaddr4="195.69.146.251",
                    ipaddr6=None,
                    status="ok",
                ),
                NetworkIXLan.objects.create(
                    network=cls.entities["net"][0],
                    ixlan=cls.entities["ixlan"][0],
                    asn=2906,
                    speed=20000,
                    is_rs_peer=False,
                    ipaddr4="195.69.147.251",
                    ipaddr6=None,
                    status="ok",
                ),
                NetworkIXLan.objects.create(
                    network=cls.entities["net"][0],
                    ixlan=cls.entities["ixlan"][0],
                    asn=1002,
                    speed=10000,
                    ipaddr4="195.69.147.252",
                    ipaddr6=None,
                    status="ok",
                ),
            ]

        cls.admin_user = User.objects.create_user("admin", "admin@localhost", "admin")
        cls.entities["org"][0].admin_usergroup.user_set.add(cls.admin_user)

    def setUp(self):
        """
        Give every test its own importer instance.
        """
        self.ixf_importer = ixf.Importer()

    def assertLog(self, log, expected):
        """
        Assert that `log` equals the json document stored at
        data/ixf/logs/<expected>.json

        Arguments:
            - log: data structure to compare
            - expected: file name (without extension) of the expected log
        """
        path = os.path.join(
            os.path.dirname(__file__), "data", "ixf", "logs", "{}.json".format(expected)
        )

        # always show the complete diff on mismatch
        self.maxDiff = None

        with open(path, "r") as fh:
            self.assertEqual(log, json.load(fh))

    def _assert_export_matches_fixture(self, ixlan, url):
        """
        Import the fixture into `ixlan`, request `url` and check that the
        response equals data/json_members_list/export.json and validates
        against the schema at `schema_url`.

        The calling test is skipped for fixture versions other than 0.6.
        """
        # we only export 0.6 version of the schema, so can skip this test
        # for the other versions
        if self.version != "0.6":
            self.skipTest("only version 0.6 of the schema is exported")

        # import the data
        self.ixf_importer.update(ixlan, data=self.json_data)

        # request the view and compare it against expected data
        resp = Client().get(url)
        self.assertEqual(resp.status_code, 200)
        data = json.loads(resp.content)

        path = os.path.join(
            os.path.dirname(__file__), "data", "json_members_list", "export.json"
        )
        with open(path, "r") as fh:
            expected = json.load(fh)

        # the exported timestamp is not part of the comparison, so it is
        # overwritten with the value from the expected data
        data["timestamp"] = expected["timestamp"]
        self.assertEqual(data, expected)

        # fail fast on network problems rather than hanging the test run
        # or trying to parse an error page as a schema
        schema_response = requests.get(self.schema_url, timeout=self.schema_timeout)
        schema_response.raise_for_status()
        jsonschema.validate(data, schema_response.json())

    def test_update_from_ixf_ixp_member_list(self):
        """
        Run the importer against the first ixlan and check the resulting
        log as well as the first two netixlans it returns.
        """
        ixlan = self.entities["ixlan"][0]
        n_deleted = self.entities["netixlan"][0]
        n_deleted2 = self.entities["netixlan"][1]

        self.assertEqual(str(n_deleted.ipaddr4), "195.69.146.250")
        self.assertEqual(str(n_deleted2.ipaddr6), "2001:7f8:1::a500:2906:1")
        self.assertEqual(ixlan.netixlan_set_active.count(), 4)

        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data
        )

        self.assertLog(log, "update_01")
        self.assertEqual(len(netixlans), 5)
        self.assertEqual(len(netixlans_deleted), 2)

        n = netixlans[0]
        self.assertEqual(str(n.ipaddr4), "195.69.146.250")
        self.assertEqual(str(n.ipaddr6), "2001:7f8:1::a500:2906:2")
        self.assertEqual(n.speed, 10000)
        self.assertEqual(n.status, "ok")
        self.assertEqual(n.ixlan, ixlan)
        self.assertEqual(n.asn, 2906)

        n2 = netixlans[1]
        self.assertEqual(str(n2.ipaddr4), "195.69.147.250")
        self.assertEqual(str(n2.ipaddr6), "2001:7f8:1::a500:2906:1")
        self.assertEqual(n2.speed, 10000)
        self.assertEqual(n2.status, "ok")
        self.assertEqual(n2.ixlan, ixlan)
        # check the second entry here (this used to re-check `n`)
        self.assertEqual(n2.asn, 2906)

        # test that inactive connections had no effect
        self.assertEqual(
            NetworkIXLan.objects.filter(
                ipaddr4="195.69.146.251", speed=10000, status="ok"
            ).count(),
            1,
        )
        self.assertEqual(
            NetworkIXLan.objects.filter(ipaddr4="195.69.146.252").count(), 0
        )

        # self.assertEqual(IXLan.objects.get(id=ixlan.id).netixlan_set_active.count(), 2)

        # FIXME: this is not practical until
        # https://github.com/peeringdb/peeringdb/issues/90 is resolved
        # so skipping those tests right now
        # n_deleted.refresh_from_db()
        # n_deleted2.refresh_from_db()
        # self.assertEqual(n_deleted.ipaddr4, None)
        # self.assertEqual(n_deleted2.ipaddr6, None)

    def test_preview_from_ixf_ixp_member_list(self):
        """
        Run the importer with save=False and check the resulting log
        """
        ixlan = self.entities["ixlan"][0]
        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data, save=False
        )
        self.assertLog(log, "preview_01")

    def test_update_from_ixf_ixp_member_list_skip_prefix_mismatch(self):
        """
        Here we test that entries with ipaddresses that cannot be validated
        against any of the prefixes that exist on the ixlan get skipped
        """
        ixlan = self.entities["ixlan"][1]
        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data
        )

        # print(json.dumps(log,indent=2))
        self.assertLog(log, "skip_prefix_mismatch")
        self.assertEqual(len(netixlans), 0)

    def test_update_from_ixf_ixp_member_list_skip_missing_prefixes(self):
        """
        Here we test that nothing is done at all if the importer is run on an
        ixlan that does not have any prefixes
        """
        ixlan = self.entities["ixlan"][2]
        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data
        )

        self.assertEqual(len(netixlans), 0)
        self.assertEqual(len(netixlans_deleted), 0)
        self.assertEqual(log["errors"], ["No prefixes defined on ixlan"])

    def test_update_from_ixf_ixp_member_list_skip_disabled_networks(self):
        """
        Here we test that networks with allow_ixp_update set to False
        will not be processed
        """
        ixlan = self.entities["ixlan"][0]
        network = self.entities["net"][0]
        network.allow_ixp_update = False
        network.save()

        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data
        )

        self.assertLog(log, "skip_disabled_networks")
        self.assertEqual(len(netixlans), 0)

        # the network's active netixlans need to be untouched
        for netixlan in network.netixlan_set_active.all():
            netixlan.refresh_from_db()
            self.assertEqual(netixlan.status, "ok")

    def test_update_from_ixf_ixp_member_list_logs(self):
        """
        Check that each import run writes an import log whose entries
        reference the reversion versions before and after the change,
        and that the import attempt timestamp is updated by a second run
        """
        ixlan = self.entities["ixlan"][0]
        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data
        )

        attempt_dt_1 = ixlan.ixf_import_attempt.updated

        for netixlan in netixlans:
            log_entry = ixlan.ixf_import_log_set.last().entries.get(netixlan=netixlan)

            if netixlan.id in (
                self.entities["netixlan"][4].id,
                self.entities["netixlan"][5].id,
            ):
                # netixlan was modified
                self.assertEqual(
                    log_entry.version_before,
                    reversion.models.Version.objects.get_for_object(netixlan)[1],
                )
            else:
                # netixlan was added
                self.assertEqual(log_entry.version_before, None)

            self.assertEqual(
                log_entry.version_after,
                reversion.models.Version.objects.get_for_object(netixlan)[0],
            )

        for netixlan in netixlans_deleted:
            log_entry = ixlan.ixf_import_log_set.last().entries.get(netixlan=netixlan)

            self.assertEqual(
                log_entry.version_before,
                reversion.models.Version.objects.get_for_object(netixlan)[1],
            )
            self.assertEqual(
                log_entry.version_after,
                reversion.models.Version.objects.get_for_object(netixlan)[0],
            )

        # change one of the imported netixlans, so the second import
        # run has something to update
        with reversion.create_revision():
            netixlans[0].speed = 10
            netixlans[0].save()

        # presumably here to make sure the second attempt gets a
        # different `updated` timestamp than the first
        time.sleep(0.1)

        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data
        )

        ixlan.ixf_import_attempt.refresh_from_db()
        attempt_dt_2 = ixlan.ixf_import_attempt.updated

        self.assertNotEqual(attempt_dt_1, attempt_dt_2)
        self.assertEqual(ixlan.ixf_import_log_set.count(), 2)
        self.assertEqual(len(netixlans), 1)

        for netixlan in netixlans:
            log_entry = ixlan.ixf_import_log_set.last().entries.get(netixlan=netixlan)

            self.assertEqual(
                log_entry.version_before,
                reversion.models.Version.objects.get_for_object(netixlan)[1],
            )
            self.assertEqual(
                log_entry.version_after,
                reversion.models.Version.objects.get_for_object(netixlan)[0],
            )

    def test_rollback(self):
        """
        Roll back the import log of an import run and check that the
        first two imported netixlans end up deleted and that all log
        entries change their rollback status from 0 to 1
        """
        ixlan = self.entities["ixlan"][0]
        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data
        )

        for entry in ixlan.ixf_import_log_set.last().entries.all():
            self.assertEqual(entry.rollback_status(), 0)

        ixlan.ixf_import_log_set.last().rollback()

        netixlans[0].refresh_from_db()
        netixlans[1].refresh_from_db()
        self.assertEqual(netixlans[0].status, "deleted")
        self.assertEqual(netixlans[1].status, "deleted")

        ixlan.ixf_import_log_set.last().refresh_from_db()
        for entry in ixlan.ixf_import_log_set.last().entries.all():
            self.assertEqual(entry.rollback_status(), 1)

    def test_rollback_avoid_ipaddress_conflict(self):
        """
        Check that a netixlan deleted by the import stays deleted on
        rollback when its ipv4 address has since been claimed by another
        active netixlan (the log entry reports rollback status 2)
        """
        ixlan = self.entities["ixlan"][0]
        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data
        )

        self.assertEqual(len(netixlans_deleted), 2)

        netixlan = netixlans_deleted[0]

        # claim the ip address of the deleted netixlan
        other = NetworkIXLan.objects.create(
            network=netixlan.network,
            ixlan=netixlan.ixlan,
            speed=1000,
            status="ok",
            asn=netixlan.asn + 1,
            ipaddr4=netixlan.ipaddr4,
        )

        for entry in ixlan.ixf_import_log_set.last().entries.all():
            if entry.netixlan == netixlan:
                self.assertEqual(entry.rollback_status(), 2)

        ixlan.ixf_import_log_set.last().rollback()

        netixlan.refresh_from_db()
        self.assertEqual(netixlan.status, "deleted")

        other.delete(hard=True)

    def test_export_view_ixlan(self):
        """
        Test that the /export/ixlan/<ixlan_id>/ixp-member-list endpoint
        generates the expected result after importing a test data set
        """
        ixlan = self.entities["ixlan"][0]
        self._assert_export_matches_fixture(
            ixlan, "/export/ixlan/{}/ixp-member-list".format(ixlan.id)
        )

    def test_export_view_ix(self):
        """
        Test that the /export/ix/<ix_id>/ixp-member-list endpoint
        generates the expected result after importing a test data set
        """
        ixlan = self.entities["ixlan"][0]
        self._assert_export_matches_fixture(
            ixlan, "/export/ix/{}/ixp-member-list".format(ixlan.ix.id)
        )

    def test_ixp_allow_update_default(self):
        """
        Test that allow_ixp_update is False on a network that was
        created without specifying it
        """
        self.assertEqual(self.entities["net"][2].allow_ixp_update, False)

    def test_command(self):
        """
        Test the ixf_ixp_member_import command
        """
        ixlan = self.entities["ixlan"][0]
        ixlan.ixf_ixp_member_list_url = "http://localhost:12345/ixf/member/import"
        ixlan.ixf_ixp_import_enabled = True
        ixlan.save()

        stdout = io.StringIO()
        stderr = io.StringIO()
        call_command(
            "pdb_ixf_ixp_member_import",
            ixlan=[ixlan.id],
            commit=True,
            stdout=stdout,
            stderr=stderr,
        )
        assert "Fetching data for -ixlan1 from" in stdout.getvalue()

        # importer should skip ixlans where ixf_ixp_import_enabled is
        # turned off
        ixlan.ixf_ixp_import_enabled = False
        ixlan.save()

        stdout = io.StringIO()
        stderr = io.StringIO()
        call_command(
            "pdb_ixf_ixp_member_import",
            ixlan=[ixlan.id],
            commit=True,
            stdout=stdout,
            stderr=stderr,
        )
        assert "Fetching data for -ixlan1 from" not in stdout.getvalue()

    def test_postmortem(self):
        """
        Test the content of the network postmortem view after an import
        """
        ixlan = self.entities["ixlan"][0]
        net = self.entities["net"][0]

        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data
        )

        # the view is called directly, so the url is informational only;
        # it is kept in line with the other postmortem tests
        request = RequestFactory().get("/import/net/{}/ixf/postmortem/".format(net.id))
        request.user = self.admin_user
        response = view_import_net_ixf_postmortem(request, net.id)

        assert response.status_code == 200

        content = json.loads(response.content)

        # creation time differs between runs, so it cannot be compared
        for entry in content["data"]:
            del entry["created"]

        self.assertLog(content, "postmortem_01")

    def test_postmortem_limit(self):
        """
        Test that the `limit` parameter limits the number of postmortem
        entries returned
        """
        ixlan = self.entities["ixlan"][0]
        net = self.entities["net"][0]

        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data
        )

        request = RequestFactory().get(
            "/import/net/{}/ixf/postmortem/".format(net.id), {"limit": 1}
        )
        request.user = self.admin_user
        response = view_import_net_ixf_postmortem(request, net.id)

        content = json.loads(response.content)
        assert len(content["data"]) == 1

    def test_postmortem_limit_max(self):
        """
        Test that a `limit` of 1000 is reported as an error while the
        available entries are still returned
        """
        ixlan = self.entities["ixlan"][0]
        net = self.entities["net"][0]

        r, netixlans, netixlans_deleted, log = self.ixf_importer.update(
            ixlan, data=self.json_data
        )

        request = RequestFactory().get(
            "/import/net/{}/ixf/postmortem/".format(net.id), {"limit": 1000}
        )
        request.user = self.admin_user
        response = view_import_net_ixf_postmortem(request, net.id)

        content = json.loads(response.content)
        assert len(content["data"]) == 6
        assert content["non_field_errors"] == [
            "Postmortem length cannot exceed 250 entries"
        ]

    def test_import_postmortem_fail_ratelimit(self):
        """
        Test that a second postmortem request for the same user is
        answered with a 400
        """
        net = self.entities["net"][0]
        request = RequestFactory().get("/import/net/{}/ixf/postmortem/".format(net.id))
        request.user = self.admin_user

        response = view_import_net_ixf_postmortem(request, net.id)
        assert response.status_code == 200

        response = view_import_net_ixf_postmortem(request, net.id)
        assert response.status_code == 400

    def test_import_postmortem_fail_permission(self):
        """
        Test that the postmortem view answers the guest user with a 403
        """
        net = self.entities["net"][0]
        request = RequestFactory().get("/import/net/{}/ixf/postmortem/".format(net.id))
        # guest_user is not created in this class - presumably provided
        # by ClientCase
        request.user = self.guest_user

        response = view_import_net_ixf_postmortem(request, net.id)
        assert response.status_code == 403

    def test_net_preview(self):
        """
        Test the content of the network preview view when the IX-F data
        of the ixlan is available from the cache
        """
        ixlan = self.entities["ixlan"][0]
        net = self.entities["net"][0]

        # simulate cached IX-F data
        ixlan.ixf_ixp_import_enabled = True
        ixlan.ixf_ixp_member_list_url = "test"
        ixlan.save()
        cache.set("IXF-CACHE-test", self.json_data)

        request = RequestFactory().get("/import/net/{}/ixf/preview/".format(net.id))
        request.user = self.admin_user
        response = view_import_net_ixf_preview(request, net.id)

        assert response.status_code == 200

        content = json.loads(response.content)
        self.assertLog(content, "preview_02")

    def test_net_preview_fail_ratelimit(self):
        """
        Test that a second preview request for the same user is
        answered with a 400
        """
        net = self.entities["net"][0]
        request = RequestFactory().get("/import/net/{}/ixf/preview/".format(net.id))
        request.user = self.admin_user

        response = view_import_net_ixf_preview(request, net.id)
        assert response.status_code == 200

        response = view_import_net_ixf_preview(request, net.id)
        assert response.status_code == 400

    def test_net_preview_fail_permission(self):
        """
        Test that the preview view answers the guest user with a 403
        """
        net = self.entities["net"][0]
        request = RequestFactory().get("/import/net/{}/ixf/preview/".format(net.id))
        # guest_user is not created in this class - presumably provided
        # by ClientCase
        request.user = self.guest_user

        response = view_import_net_ixf_preview(request, net.id)
        assert response.status_code == 403
2018-11-08 19:45:21 +00:00
class JsonMembersListTestCase_V05(JsonMembersListTestCase):
    """
    Same tests as JsonMembersListTestCase with `version` set to 0.5
    """

    version = "0.5"
class JsonMembersListTestCase_V04(JsonMembersListTestCase):
    """
    Same tests as JsonMembersListTestCase with `version` set to 0.4
    """

    version = "0.4"
2019-02-15 17:46:40 +00:00
class TestImportPreview(ClientCase):
    """
    Test the ixf import preview views for ixlans and networks, as well
    as the list of changed fields reported by `IXLan.add_netixlan`
    """

    @classmethod
    def setUpTestData(cls):
        """
        Create one organization with an exchange (two prefixes on its
        ixlan), two networks and an admin user for that organization
        """
        super(TestImportPreview, cls).setUpTestData()

        organization = Organization.objects.create(name="Test Org", status="ok")
        cls.org = organization

        exchange = InternetExchange.objects.create(
            name="Test IX", status="ok", org=organization
        )
        cls.ix = exchange
        cls.ixlan = exchange.ixlan

        # one prefix per protocol on the ixlan
        for prefix, protocol in (
            ("195.69.144.0/22", "IPv4"),
            ("2001:7f8:1::/64", "IPv6"),
        ):
            IXLanPrefix.objects.create(
                ixlan=cls.ixlan, status="ok", prefix=prefix, protocol=protocol
            )

        cls.net = Network.objects.create(
            org=organization, status="ok", asn=1000, name="net01"
        )
        cls.net_2 = Network.objects.create(
            org=organization, status="ok", asn=1001, name="net02"
        )

        admin = User.objects.create_user("admin", "admin@localhost", "admin")
        cls.admin_user = admin
        organization.admin_usergroup.user_set.add(admin)

    def _ixlan_preview_request(self, user):
        """
        Build a GET request for the ixlan preview on behalf of `user`
        """
        request = RequestFactory().get(
            "/import/ixlan/{}/ixf/preview/".format(self.ixlan.id)
        )
        request.user = user
        return request

    def _net_preview_request(self, user):
        """
        Build a GET request for the network preview on behalf of `user`
        """
        request = RequestFactory().get(
            "/import/net/{}/ixf/preview/".format(self.net.id)
        )
        request.user = user
        return request

    def test_import_preview(self):
        """
        The ixlan preview of an ixlan without import url succeeds with
        an error message in its content
        """
        request = self._ixlan_preview_request(self.admin_user)
        response = view_import_ixlan_ixf_preview(request, self.ixlan.id)

        assert response.status_code == 200

        errors = json.loads(response.content)["errors"]
        assert errors == ["IXF import url not specified"]

    def test_import_preview_fail_ratelimit(self):
        """
        Sending the same ixlan preview request twice yields 200, then 400
        """
        request = self._ixlan_preview_request(self.admin_user)

        first = view_import_ixlan_ixf_preview(request, self.ixlan.id)
        assert first.status_code == 200

        second = view_import_ixlan_ixf_preview(request, self.ixlan.id)
        assert second.status_code == 400

    def test_import_preview_fail_permission(self):
        """
        The ixlan preview answers the guest user with a 403
        """
        request = self._ixlan_preview_request(self.guest_user)
        response = view_import_ixlan_ixf_preview(request, self.ixlan.id)
        assert response.status_code == 403

    def test_import_net_preview(self):
        """
        The network preview answers the admin user with a 200
        """
        request = self._net_preview_request(self.admin_user)
        response = view_import_net_ixf_preview(request, self.net.id)
        assert response.status_code == 200

    def test_import_net_preview_fail_ratelimit(self):
        """
        Sending the same network preview request twice yields 200,
        then 400
        """
        request = self._net_preview_request(self.admin_user)

        first = view_import_net_ixf_preview(request, self.net.id)
        assert first.status_code == 200

        second = view_import_net_ixf_preview(request, self.net.id)
        assert second.status_code == 400

    def test_import_net_preview_fail_permission(self):
        """
        The network preview answers the guest user with a 403
        """
        request = self._net_preview_request(self.guest_user)
        response = view_import_net_ixf_preview(request, self.net.id)
        assert response.status_code == 403

    def test_netixlan_diff(self):
        """
        Adding (without saving) a netixlan that shares one ip address
        with an existing netixlan reports every other differing field
        as changed
        """
        existing = NetworkIXLan.objects.create(
            network=self.net,
            ixlan=self.ixlan,
            status="ok",
            ipaddr4="195.69.146.250",
            ipaddr6="2001:7f8:1::a500:2906:1",
            asn=self.net.asn,
            speed=1000,
            is_rs_peer=True,
        )

        # never saved - only handed to add_netixlan for comparison
        candidate = NetworkIXLan(
            network=self.net_2,
            status="ok",
            ipaddr4="195.69.146.250",
            ipaddr6="2001:7f8:1::a500:2906:2",
            asn=self.net_2.asn,
            speed=10000,
            is_rs_peer=False,
        )

        # same ipv4 address, different ipv6 address
        outcome = self.ixlan.add_netixlan(candidate, save=False, save_others=False)
        changed = sorted(outcome["changed"])
        self.assertEqual(
            changed, ["asn", "ipaddr6", "is_rs_peer", "network_id", "speed"]
        )

        # same ipv6 address, different ipv4 address
        candidate.ipaddr4 = "195.69.146.251"
        candidate.ipaddr6 = existing.ipaddr6

        outcome = self.ixlan.add_netixlan(candidate, save=False, save_others=False)
        changed = sorted(outcome["changed"])
        self.assertEqual(
            changed, ["asn", "ipaddr4", "is_rs_peer", "network_id", "speed"]
        )