mirror of
https://github.com/peeringdb/peeringdb.git
synced 2024-05-11 05:55:09 +00:00
validate netixlan ip against parent prefixes (#8)
This commit is contained in:
@ -100,6 +100,30 @@ EMAIL = "test@20c.com"
|
||||
|
||||
VERBOSE = False
|
||||
|
||||
PREFIXES_V4 = [
|
||||
u"206.223.114.0/24",
|
||||
u"206.223.115.0/24",
|
||||
u"206.223.116.0/24",
|
||||
u"206.223.117.0/24",
|
||||
u"206.223.118.0/24",
|
||||
u"206.223.119.0/24",
|
||||
u"206.223.120.0/24",
|
||||
u"206.223.121.0/24",
|
||||
u"206.223.122.0/24",
|
||||
]
|
||||
|
||||
PREFIXES_V6 = [
|
||||
u"2001:504:0:1::/64",
|
||||
u"2001:504:0:2::/64",
|
||||
u"2001:504:0:3::/64",
|
||||
u"2001:504:0:4::/64",
|
||||
u"2001:504:0:5::/64",
|
||||
u"2001:504:0:6::/64",
|
||||
u"2001:504:0:7::/64",
|
||||
u"2001:504:0:8::/64",
|
||||
u"2001:504:0:9::/64",
|
||||
]
|
||||
|
||||
|
||||
class TestJSON(unittest.TestCase):
|
||||
|
||||
@ -110,17 +134,32 @@ class TestJSON(unittest.TestCase):
|
||||
IP6_COUNT = 1
|
||||
|
||||
@classmethod
|
||||
def get_ip6(cls):
|
||||
r = u"2001:7f8:4::1154:%d" % cls.IP6_COUNT
|
||||
def get_ip6(cls, ixlan):
|
||||
hosts = []
|
||||
for host in ixlan.ixpfx_set.filter(status=ixlan.status, protocol=6).first().prefix.hosts():
|
||||
if len(hosts) < 100:
|
||||
hosts.append(host)
|
||||
else:
|
||||
break
|
||||
|
||||
r = u"{}".format(hosts[cls.IP6_COUNT])
|
||||
cls.IP6_COUNT += 1
|
||||
return r
|
||||
|
||||
@classmethod
|
||||
def get_ip4(cls):
|
||||
r = u"1.1.1.%d" % cls.IP4_COUNT
|
||||
def get_ip4(cls, ixlan):
|
||||
hosts = []
|
||||
for host in ixlan.ixpfx_set.filter(status=ixlan.status, protocol=4).first().prefix.hosts():
|
||||
if len(hosts) < 100:
|
||||
hosts.append(host)
|
||||
else:
|
||||
break
|
||||
|
||||
r = u"{}".format(hosts[cls.IP4_COUNT])
|
||||
cls.IP4_COUNT += 1
|
||||
return r
|
||||
|
||||
|
||||
@classmethod
|
||||
def get_prefix4(cls):
|
||||
r = u"206.41.{}.0/24".format(cls.PREFIX_COUNT)
|
||||
@ -318,14 +357,17 @@ class TestJSON(unittest.TestCase):
|
||||
"notes": NOTE,
|
||||
"speed": 30000,
|
||||
"asn": 12345,
|
||||
"ipaddr4": self.get_ip4(),
|
||||
"ipaddr6": self.get_ip6()
|
||||
}
|
||||
|
||||
data.update(**kwargs)
|
||||
for k, v in rename.items():
|
||||
data[v] = data[k]
|
||||
del data[k]
|
||||
|
||||
data.update(
|
||||
ipaddr4=self.get_ip4(IXLan.objects.get(id=data["ixlan_id"])),
|
||||
ipaddr6=self.get_ip6(IXLan.objects.get(id=data["ixlan_id"])),
|
||||
)
|
||||
return data
|
||||
|
||||
##########################################################################
|
||||
@ -615,6 +657,20 @@ class TestJSON(unittest.TestCase):
|
||||
SHARED["%s_r_ok_public" % target].id,
|
||||
SHARED["%s_rw_ok_public" % target].id
|
||||
]
|
||||
elif target == "ixpfx":
|
||||
|
||||
valid_s = [
|
||||
SHARED["%s_r_ok" % target].id,
|
||||
SHARED["%s_r_v6_ok" % target].id,
|
||||
]
|
||||
|
||||
valid_m = [
|
||||
SHARED["%s_r_ok" % target].id,
|
||||
SHARED["%s_rw_ok" % target].id,
|
||||
SHARED["%s_r_v6_ok" % target].id,
|
||||
SHARED["%s_rw_v6_ok" % target].id,
|
||||
]
|
||||
|
||||
else:
|
||||
|
||||
valid_s = [SHARED["%s_r_ok" % target].id]
|
||||
@ -623,7 +679,7 @@ class TestJSON(unittest.TestCase):
|
||||
SHARED["%s_r_ok" % target].id, SHARED["%s_rw_ok" % target].id
|
||||
]
|
||||
|
||||
# exact
|
||||
# exact
|
||||
data = self.db_guest.all(target, **kwargs_s)
|
||||
self.assertGreater(len(data), 0)
|
||||
for row in data:
|
||||
@ -1329,12 +1385,13 @@ class TestJSON(unittest.TestCase):
|
||||
},
|
||||
"perms": {
|
||||
# set network to one the user doesnt have perms to
|
||||
"ipaddr4": self.get_ip4(),
|
||||
"ipaddr6": self.get_ip6(),
|
||||
"ipaddr4": self.get_ip4(SHARED["ixlan_rw_ok"]),
|
||||
"ipaddr6": self.get_ip6(SHARED["ixlan_rw_ok"]),
|
||||
"net_id": SHARED["net_r_ok"].id
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
SHARED["netixlan_id"] = r_data.get("id")
|
||||
|
||||
self.assert_update(self.db_org_admin, "netixlan",
|
||||
@ -1352,6 +1409,26 @@ class TestJSON(unittest.TestCase):
|
||||
test_success=SHARED["netixlan_id"],
|
||||
test_failure=SHARED["netixlan_r_ok"].id)
|
||||
|
||||
|
||||
##########################################################################
|
||||
|
||||
def test_org_admin_002_POST_PUT_netixlan_validation(self):
|
||||
data = self.make_data_netixlan(net_id=SHARED["net_rw_ok"].id,
|
||||
ixlan_id=SHARED["ixlan_rw_ok"].id)
|
||||
|
||||
test_failures = [
|
||||
# test failure if ip4 not in prefix
|
||||
{"invalid": { "ipaddr4": self.get_ip4(SHARED["ixlan_r_ok"]) }},
|
||||
# test failure if ip6 not in prefix
|
||||
{"invalid": { "ipaddr6": self.get_ip6(SHARED["ixlan_r_ok"]) }},
|
||||
]
|
||||
|
||||
for test_failure in test_failures:
|
||||
self.assert_create(self.db_org_admin, "netixlan", data,
|
||||
test_failures=test_failure, test_success=False)
|
||||
|
||||
|
||||
|
||||
##########################################################################
|
||||
|
||||
def test_org_admin_002_POST_PUT_DELETE_ixfac(self):
|
||||
@ -2719,6 +2796,12 @@ class Command(BaseCommand):
|
||||
if tag in ["ix", "net", "fac", "org"]:
|
||||
data["name"] = name
|
||||
|
||||
if tag == "ixpfx":
|
||||
if kwargs.get("protocol", 4) == 4:
|
||||
data["prefix"] = PREFIXES_V4[model.objects.all().count()]
|
||||
elif kwargs.get("protocol") == 6:
|
||||
data["prefix"] = PREFIXES_V6[model.objects.all().count()]
|
||||
|
||||
data.update(**kwargs)
|
||||
try:
|
||||
obj = model.objects.get(**data)
|
||||
@ -2886,6 +2969,14 @@ class Command(BaseCommand):
|
||||
IXLanPrefix,
|
||||
status=status,
|
||||
prefix=prefix,
|
||||
protocol=4,
|
||||
ixlan_id=SHARED["ixlan_%s_%s" % (prefix, status)].id,
|
||||
)
|
||||
cls.create_entity(
|
||||
IXLanPrefix,
|
||||
status=status,
|
||||
prefix="{}_v6".format(prefix),
|
||||
protocol=6,
|
||||
ixlan_id=SHARED["ixlan_%s_%s" % (prefix, status)].id,
|
||||
)
|
||||
cls.create_entity(
|
||||
|
Reference in New Issue
Block a user