1
0
mirror of https://github.com/peeringdb/peeringdb.git synced 2024-05-11 05:55:09 +00:00
Files
peeringdb-peeringdb/tests/test_ixf_member_import.py
2018-11-08 19:45:21 +00:00

357 lines
14 KiB
Python

import os
import json
import reversion
import requests
import jsonschema
import time
from django.db import transaction
from django.test import TestCase, Client, RequestFactory
from peeringdb_server.models import (
Organization, Network, NetworkIXLan, IXLan, IXLanPrefix, InternetExchange,
IXLanIXFMemberImportAttempt, IXLanIXFMemberImportLog,
IXLanIXFMemberImportLogEntry)
class JsonMembersListTestCase(TestCase):
    """
    Tests the IX-F member list import on ixlans, the import log and its
    rollback, and the ixp-member-list export views, using a json fixture
    loaded from data/json_members_list/.

    Subclasses override `version` to run the same tests against the
    fixture file of another schema version.
    """

    # test this version of the json schema; requires the file
    # to exist at data/json_members_list/members.<VERSION>.json
    version = "0.6"
    schema_url = "https://raw.githubusercontent.com/euro-ix/json-schemas/v0.6/ixp-member-list.schema.json"

    # will be loaded with the json data to be tested against during
    # setUpTestData
    json_data = {}

    # maps an entity tag ("org", "ix", "ixlan", "ixpfx", "net", "netixlan")
    # to the list of objects created for it during setUpTestData
    entities = {}

    @classmethod
    def setUpTestData(cls):
        """
        Load the json fixture for `cls.version` and create the
        organization, exchange, ixlans, prefixes, networks and netixlans
        the tests operate on. Everything is created inside a single
        reversion revision.
        """

        # give every test class its own dict; mutating the inherited
        # class-level dict in place would share state between this class
        # and its version-specific subclasses
        cls.entities = {}

        # load json members list data to test against
        with open(
                os.path.join(
                    os.path.dirname(__file__), "data", "json_members_list",
                    "members.{}.json".format(cls.version)), "r") as fh:
            cls.json_data = json.load(fh)

        with reversion.create_revision():
            # create organization(s)
            cls.entities["org"] = [
                Organization.objects.create(name="Netflix", status="ok")
            ]

            # create exchange(s)
            cls.entities["ix"] = [
                InternetExchange.objects.create(name="Test Exchange",
                                                org=cls.entities["org"][0],
                                                status="ok")
            ]

            # create ixlan(s)
            # the third ixlan intentionally gets no prefixes below
            cls.entities["ixlan"] = [
                IXLan.objects.create(ix=cls.entities["ix"][0], status="ok"),
                IXLan.objects.create(ix=cls.entities["ix"][0], status="ok"),
                IXLan.objects.create(ix=cls.entities["ix"][0], status="ok")
            ]

            # create ixlan prefix(s)
            cls.entities["ixpfx"] = [
                IXLanPrefix.objects.create(
                    ixlan=cls.entities["ixlan"][0], status="ok",
                    prefix="195.69.144.0/22", protocol="IPv4"),
                IXLanPrefix.objects.create(
                    ixlan=cls.entities["ixlan"][0], status="ok",
                    prefix="2001:7f8:1::/64", protocol="IPv6"),
                IXLanPrefix.objects.create(
                    ixlan=cls.entities["ixlan"][1], status="ok",
                    prefix="195.66.224.0/22", protocol="IPv4"),
                IXLanPrefix.objects.create(
                    ixlan=cls.entities["ixlan"][1], status="ok",
                    prefix="2001:7f8:4::/64", protocol="IPv6")
            ]

            # create network(s)
            cls.entities["net"] = [
                Network.objects.create(
                    name="Netflix", org=cls.entities["org"][0], asn=2906,
                    info_prefixes4=42, info_prefixes6=42,
                    website="http://netflix.com/", policy_general="Open",
                    policy_url="https://www.netflix.com/openconnect/",
                    allow_ixp_update=True, status="ok", irr_as_set="AS-NFLX"),
                Network.objects.create(name="Network with deleted netixlans",
                                       org=cls.entities["org"][0], asn=1001,
                                       allow_ixp_update=True, status="ok"),
                Network.objects.create(
                    name="Network with allow ixp update off",
                    org=cls.entities["org"][0], asn=1002, status="ok")
            ]

            # create netixlans
            cls.entities["netixlan"] = [
                NetworkIXLan.objects.create(
                    network=cls.entities["net"][1],
                    ixlan=cls.entities["ixlan"][1], asn=1001, speed=10000,
                    ipaddr4="195.69.146.250", ipaddr6=None, status="deleted"),
                NetworkIXLan.objects.create(
                    network=cls.entities["net"][1],
                    ixlan=cls.entities["ixlan"][1], asn=1001, speed=10000,
                    ipaddr4=None, ipaddr6="2001:7f8:1::a500:2906:1",
                    status="deleted"),
                NetworkIXLan.objects.create(
                    network=cls.entities["net"][0],
                    ixlan=cls.entities["ixlan"][0], asn=2906, speed=10000,
                    ipaddr4="195.69.146.249", ipaddr6=None, status="ok"),
            ]

    def test_update_from_ixf_ixp_member_list(self):
        """
        Here we test that importing the fixture into the first ixlan
        returns two netixlans with the expected addresses, speed, status,
        ixlan and asn, one deleted netixlan and one log message
        """

        ixlan = self.entities["ixlan"][0]
        n_deleted = self.entities["netixlan"][0]
        n_deleted2 = self.entities["netixlan"][1]

        # sanity check the fixture objects before running the import
        self.assertEqual(unicode(n_deleted.ipaddr4), u'195.69.146.250')
        self.assertEqual(
            unicode(n_deleted2.ipaddr6), u'2001:7f8:1::a500:2906:1')
        self.assertEqual(ixlan.netixlan_set_active.count(), 1)

        r, netixlans, netixlans_deleted, log = ixlan.update_from_ixf_ixp_member_list(
            self.json_data)

        self.assertEqual(len(log), 1)
        self.assertEqual(len(netixlans), 2)
        self.assertEqual(len(netixlans_deleted), 1)

        n = netixlans[0]
        self.assertEqual(unicode(n.ipaddr4), u"195.69.146.250")
        self.assertEqual(unicode(n.ipaddr6), u"2001:7f8:1::a500:2906:2")
        self.assertEqual(n.speed, 10000)
        self.assertEqual(n.status, "ok")
        self.assertEqual(n.ixlan, ixlan)
        self.assertEqual(n.asn, 2906)

        n2 = netixlans[1]
        self.assertEqual(unicode(n2.ipaddr4), u"195.69.147.250")
        self.assertEqual(unicode(n2.ipaddr6), u"2001:7f8:1::a500:2906:1")
        self.assertEqual(n2.speed, 10000)
        self.assertEqual(n2.status, "ok")
        self.assertEqual(n2.ixlan, ixlan)
        # check the second netixlan here (this used to re-check `n`)
        self.assertEqual(n2.asn, 2906)

        #self.assertEqual(IXLan.objects.get(id=ixlan.id).netixlan_set_active.count(), 2)

        #FIXME: this is not practical until
        #https://github.com/peeringdb/peeringdb/issues/90 is resolved
        #so skipping those tests right now
        #n_deleted.refresh_from_db()
        #n_deleted2.refresh_from_db()
        #self.assertEqual(n_deleted.ipaddr4, None)
        #self.assertEqual(n_deleted2.ipaddr6, None)

    def test_update_from_ixf_ixp_member_list_skip_prefix_mismatch(self):
        """
        Here we test that entries with ipaddresses that cannot be validated
        against any of the prefixes that exist on the ixlan get skipped
        """

        ixlan = self.entities["ixlan"][1]
        r, netixlans, netixlans_deleted, log = ixlan.update_from_ixf_ixp_member_list(
            self.json_data)

        self.assertEqual(len(log), 2)
        self.assertEqual(len(netixlans), 0)

    def test_update_from_ixf_ixp_member_list_skip_missing_prefixes(self):
        """
        Here we test that nothing is done at all if the importer is run on an
        ixlan that does not have any prefixes
        """

        ixlan = self.entities["ixlan"][2]
        r, netixlans, netixlans_deleted, log = ixlan.update_from_ixf_ixp_member_list(
            self.json_data)

        self.assertEqual(len(netixlans), 0)
        self.assertEqual(len(netixlans_deleted), 0)
        self.assertEqual(log, [u'No prefixes defined on ixlan, skipping.'])

    def test_update_from_ixf_ixp_member_list_skip_disabled_networks(self):
        """
        Here we test that networks with allow_ixp_update set to False
        will not be processed
        """

        ixlan = self.entities["ixlan"][0]
        network = self.entities["net"][0]
        network.allow_ixp_update = False
        network.save()

        r, netixlans, netixlans_deleted, log = ixlan.update_from_ixf_ixp_member_list(
            self.json_data)

        self.assertEqual(len(log), 3)
        self.assertEqual(len(netixlans), 0)

        # the network's existing active netixlans must be left untouched
        for netixlan in network.netixlan_set_active.all():
            netixlan.refresh_from_db()
            self.assertEqual(netixlan.status, "ok")

    def test_update_from_ixf_ixp_member_list_logs(self):
        """
        Here we test that each import writes log entries that reference
        the reversion versions of the affected netixlans before and after
        the import, and that a second import updates the import attempt
        timestamp and adds a second import log
        """

        ixlan = self.entities["ixlan"][0]
        r, netixlans, netixlans_deleted, log = ixlan.update_from_ixf_ixp_member_list(
            self.json_data)

        attempt_dt_1 = ixlan.ixf_import_attempt.updated

        # netixlans returned by the first import have no previous version
        for netixlan in netixlans:
            log_entry = ixlan.ixf_import_log_set.last().entries.get(
                netixlan=netixlan)
            self.assertEqual(log_entry.version_before, None)
            self.assertEqual(
                log_entry.version_after,
                reversion.models.Version.objects.get_for_object(netixlan)[0])

        # deleted netixlans existed before, so both versions are expected
        for netixlan in netixlans_deleted:
            log_entry = ixlan.ixf_import_log_set.last().entries.get(
                netixlan=netixlan)
            self.assertEqual(
                log_entry.version_before,
                reversion.models.Version.objects.get_for_object(netixlan)[1])
            self.assertEqual(
                log_entry.version_after,
                reversion.models.Version.objects.get_for_object(netixlan)[0])

        # change one imported netixlan so that the second import has
        # something to update
        with reversion.create_revision():
            netixlans[0].speed = 10
            netixlans[0].save()

        # make sure the second attempt gets a different timestamp
        time.sleep(0.1)

        r, netixlans, netixlans_deleted, log = ixlan.update_from_ixf_ixp_member_list(
            self.json_data)

        ixlan.ixf_import_attempt.refresh_from_db()
        attempt_dt_2 = ixlan.ixf_import_attempt.updated
        self.assertNotEqual(attempt_dt_1, attempt_dt_2)

        self.assertEqual(ixlan.ixf_import_log_set.count(), 2)
        self.assertEqual(len(netixlans), 1)

        for netixlan in netixlans:
            log_entry = ixlan.ixf_import_log_set.last().entries.get(
                netixlan=netixlan)
            self.assertEqual(
                log_entry.version_before,
                reversion.models.Version.objects.get_for_object(netixlan)[1])
            self.assertEqual(
                log_entry.version_after,
                reversion.models.Version.objects.get_for_object(netixlan)[0])

    def test_rollback(self):
        """
        Here we test that rolling back the last import log sets the
        imported netixlans to "deleted" and moves the rollback status of
        every log entry from 0 to 1
        """

        ixlan = self.entities["ixlan"][0]
        r, netixlans, netixlans_deleted, log = ixlan.update_from_ixf_ixp_member_list(
            self.json_data)

        for entry in ixlan.ixf_import_log_set.last().entries.all():
            self.assertEqual(entry.rollback_status(), 0)

        ixlan.ixf_import_log_set.last().rollback()

        netixlans[0].refresh_from_db()
        netixlans[1].refresh_from_db()
        self.assertEqual(netixlans[0].status, "deleted")
        self.assertEqual(netixlans[1].status, "deleted")

        ixlan.ixf_import_log_set.last().refresh_from_db()
        for entry in ixlan.ixf_import_log_set.last().entries.all():
            self.assertEqual(entry.rollback_status(), 1)

    def test_rollback_avoid_ipaddress_conflict(self):
        """
        Here we test that a netixlan deleted by the import stays deleted
        on rollback when another active netixlan has since been created
        with the same ipaddr4; its log entry is expected to report a
        rollback status of 2
        """

        ixlan = self.entities["ixlan"][0]
        r, netixlans, netixlans_deleted, log = ixlan.update_from_ixf_ixp_member_list(
            self.json_data)

        self.assertEqual(len(netixlans_deleted), 1)
        netixlan = netixlans_deleted[0]

        # occupy the ip address of the deleted netixlan
        other = NetworkIXLan.objects.create(
            network=netixlan.network, ixlan=netixlan.ixlan, speed=1000,
            status="ok", asn=netixlan.asn + 1, ipaddr4=netixlan.ipaddr4)

        for entry in ixlan.ixf_import_log_set.last().entries.all():
            if entry.netixlan == netixlan:
                self.assertEqual(entry.rollback_status(), 2)

        ixlan.ixf_import_log_set.last().rollback()

        netixlan.refresh_from_db()
        self.assertEqual(netixlan.status, "deleted")

        other.delete(hard=True)

    def test_export_view_ixlan(self):
        """
        Test that the /export/ixlan/<ixlan_id>/ixp-member-list endpoint
        generates the expected result after importing a test data set
        """

        # we only export 0.6 version of the schema, so can skip this test
        # for the other versions
        if self.version != "0.6":
            return

        # import the data
        ixlan = self.entities["ixlan"][0]
        r, netixlans, netixlans_deleted, log = ixlan.update_from_ixf_ixp_member_list(
            self.json_data)

        # request the view and compare it against expected data
        c = Client()
        resp = c.get("/export/ixlan/{}/ixp-member-list".format(ixlan.id))
        self.assertEqual(resp.status_code, 200)
        data = json.loads(resp.content)

        with open(
                os.path.join(
                    os.path.dirname(__file__), "data", "json_members_list",
                    "export.json"), "r") as fh:
            expected = json.load(fh)

        # the timestamp differs on every export, so take it from the
        # expected data before comparing
        data["timestamp"] = expected["timestamp"]
        self.assertEqual(data, expected)

        # the schema is fetched from schema_url over the network
        schema = requests.get(self.schema_url).json()
        jsonschema.validate(data, schema)

    def test_export_view_ix(self):
        """
        Test that the /export/ix/<ix_id>/ixp-member-list endpoint
        generates the expected result after importing a test data set
        """

        # we only export 0.6 version of the schema, so can skip this test
        # for the other versions
        if self.version != "0.6":
            return

        # import the data
        ixlan = self.entities["ixlan"][0]
        r, netixlans, netixlans_deleted, log = ixlan.update_from_ixf_ixp_member_list(
            self.json_data)

        # request the view and compare it against expected data
        c = Client()
        resp = c.get("/export/ix/{}/ixp-member-list".format(ixlan.ix.id))
        self.assertEqual(resp.status_code, 200)
        data = json.loads(resp.content)

        with open(
                os.path.join(
                    os.path.dirname(__file__), "data", "json_members_list",
                    "export.json"), "r") as fh:
            other = json.load(fh)

        # the timestamp differs on every export, so take it from the
        # expected data before comparing
        data["timestamp"] = other["timestamp"]
        self.assertEqual(data, other)

        # the schema is fetched from schema_url over the network
        schema = requests.get(self.schema_url).json()
        jsonschema.validate(data, schema)

    def test_ixp_allow_update_default(self):
        """
        Here we test that a network created without specifying
        allow_ixp_update has it set to False
        """

        self.assertEqual(self.entities["net"][2].allow_ixp_update, False)
class JsonMembersListTestCase_V05(JsonMembersListTestCase):
    """
    Runs the tests inherited from JsonMembersListTestCase with the
    schema version set to 0.5
    """

    version = "0.5"
class JsonMembersListTestCase_V04(JsonMembersListTestCase):
    """
    Runs the tests inherited from JsonMembersListTestCase with the
    schema version set to 0.4
    """

    version = "0.4"