diff --git a/peeringdb_server/management/commands/pdb_api_test.py b/peeringdb_server/management/commands/pdb_api_test.py index 3481858a..9a70c56a 100644 --- a/peeringdb_server/management/commands/pdb_api_test.py +++ b/peeringdb_server/management/commands/pdb_api_test.py @@ -100,6 +100,30 @@ EMAIL = "test@20c.com" VERBOSE = False +PREFIXES_V4 = [ + u"206.223.114.0/24", + u"206.223.115.0/24", + u"206.223.116.0/24", + u"206.223.117.0/24", + u"206.223.118.0/24", + u"206.223.119.0/24", + u"206.223.120.0/24", + u"206.223.121.0/24", + u"206.223.122.0/24", +] + +PREFIXES_V6 = [ + u"2001:504:0:1::/64", + u"2001:504:0:2::/64", + u"2001:504:0:3::/64", + u"2001:504:0:4::/64", + u"2001:504:0:5::/64", + u"2001:504:0:6::/64", + u"2001:504:0:7::/64", + u"2001:504:0:8::/64", + u"2001:504:0:9::/64", +] + class TestJSON(unittest.TestCase): @@ -110,17 +134,32 @@ class TestJSON(unittest.TestCase): IP6_COUNT = 1 @classmethod - def get_ip6(cls): - r = u"2001:7f8:4::1154:%d" % cls.IP6_COUNT + def get_ip6(cls, ixlan): + hosts = [] + for host in ixlan.ixpfx_set.filter(status=ixlan.status, protocol=6).first().prefix.hosts(): + if len(hosts) < 100: + hosts.append(host) + else: + break + + r = u"{}".format(hosts[cls.IP6_COUNT]) cls.IP6_COUNT += 1 return r @classmethod - def get_ip4(cls): - r = u"1.1.1.%d" % cls.IP4_COUNT + def get_ip4(cls, ixlan): + hosts = [] + for host in ixlan.ixpfx_set.filter(status=ixlan.status, protocol=4).first().prefix.hosts(): + if len(hosts) < 100: + hosts.append(host) + else: + break + + r = u"{}".format(hosts[cls.IP4_COUNT]) cls.IP4_COUNT += 1 return r + @classmethod def get_prefix4(cls): r = u"206.41.{}.0/24".format(cls.PREFIX_COUNT) @@ -318,14 +357,17 @@ class TestJSON(unittest.TestCase): "notes": NOTE, "speed": 30000, "asn": 12345, - "ipaddr4": self.get_ip4(), - "ipaddr6": self.get_ip6() } data.update(**kwargs) for k, v in rename.items(): data[v] = data[k] del data[k] + + data.update( + ipaddr4=self.get_ip4(IXLan.objects.get(id=data["ixlan_id"])), + ipaddr6=self.get_ip6(IXLan.objects.get(id=data["ixlan_id"])), + ) return data ########################################################################## @@ -615,6 +657,20 @@ class TestJSON(unittest.TestCase): SHARED["%s_r_ok_public" % target].id, SHARED["%s_rw_ok_public" % target].id ] + elif target == "ixpfx": + + valid_s = [ + SHARED["%s_r_ok" % target].id, + SHARED["%s_r_v6_ok" % target].id, + ] + + valid_m = [ + SHARED["%s_r_ok" % target].id, + SHARED["%s_rw_ok" % target].id, + SHARED["%s_r_v6_ok" % target].id, + SHARED["%s_rw_v6_ok" % target].id, + ] + else: valid_s = [SHARED["%s_r_ok" % target].id] @@ -623,7 +679,7 @@ class TestJSON(unittest.TestCase): SHARED["%s_r_ok" % target].id, SHARED["%s_rw_ok" % target].id ] - # exact + # exact data = self.db_guest.all(target, **kwargs_s) self.assertGreater(len(data), 0) for row in data: @@ -1329,12 +1385,13 @@ class TestJSON(unittest.TestCase): }, "perms": { # set network to one the user doesnt have perms to - "ipaddr4": self.get_ip4(), - "ipaddr6": self.get_ip6(), + "ipaddr4": self.get_ip4(SHARED["ixlan_rw_ok"]), + "ipaddr6": self.get_ip6(SHARED["ixlan_rw_ok"]), "net_id": SHARED["net_r_ok"].id } }) + SHARED["netixlan_id"] = r_data.get("id") self.assert_update(self.db_org_admin, "netixlan", @@ -1352,6 +1409,26 @@ class TestJSON(unittest.TestCase): test_success=SHARED["netixlan_id"], test_failure=SHARED["netixlan_r_ok"].id) + + ########################################################################## + + def test_org_admin_002_POST_PUT_netixlan_validation(self): + data = self.make_data_netixlan(net_id=SHARED["net_rw_ok"].id, + ixlan_id=SHARED["ixlan_rw_ok"].id) + + test_failures = [ + # test failure if ip4 not in prefix + {"invalid": { "ipaddr4": self.get_ip4(SHARED["ixlan_r_ok"]) }}, + # test failure if ip6 not in prefix + {"invalid": { "ipaddr6": self.get_ip6(SHARED["ixlan_r_ok"]) }}, + ] + + for test_failure in test_failures: + self.assert_create(self.db_org_admin, "netixlan", data, + test_failures=test_failure, test_success=False) + + + ########################################################################## def test_org_admin_002_POST_PUT_DELETE_ixfac(self): @@ -2719,6 +2796,12 @@ class Command(BaseCommand): if tag in ["ix", "net", "fac", "org"]: data["name"] = name + if tag == "ixpfx": + if kwargs.get("protocol", 4) == 4: + data["prefix"] = PREFIXES_V4[model.objects.all().count()] + elif kwargs.get("protocol") == 6: + data["prefix"] = PREFIXES_V6[model.objects.all().count()] + data.update(**kwargs) try: obj = model.objects.get(**data) @@ -2886,6 +2969,14 @@ class Command(BaseCommand): IXLanPrefix, status=status, prefix=prefix, + protocol=4, + ixlan_id=SHARED["ixlan_%s_%s" % (prefix, status)].id, + ) + cls.create_entity( + IXLanPrefix, + status=status, + prefix="{}_v6".format(prefix), + protocol=6, ixlan_id=SHARED["ixlan_%s_%s" % (prefix, status)].id, ) cls.create_entity( diff --git a/peeringdb_server/models.py b/peeringdb_server/models.py index 6d07d099..90ca632d 100644 --- a/peeringdb_server/models.py +++ b/peeringdb_server/models.py @@ -1435,7 +1435,6 @@ class IXLan(pdb_models.IXLanBase): return False - @reversion.create_revision() def add_netixlan(self, netixlan_info, save=True, save_others=True): """ @@ -2286,11 +2285,40 @@ class NetworkIXLan(pdb_models.NetworkIXLanBase): conflict_v6 = (self.ipaddr6 and ipv6.exists()) return (conflict_v4, conflict_v6) + + def validate_ipaddr4(self): + if self.ipaddr4 and not self.ixlan.test_ipv4_address(self.ipaddr4): + raise ValidationError(_("IPv4 address outside of prefix")) + + def validate_ipaddr6(self): + if self.ipaddr6 and not self.ixlan.test_ipv6_address(self.ipaddr6): + raise ValidationError(_("IPv6 address outside of prefix")) + + def clean(self): """ Custom model validation """ errors = {} + + # check that the ip address can be validated agaisnt + # at least one of the prefix on the parent ixlan + + try: + self.validate_ipaddr4() + except ValidationError as exc: + errors["ipaddr4"] = exc.message + + try: + self.validate_ipaddr6() + except ValidationError as exc: + errors["ipaddr6"] = exc.message + + if errors: + raise ValidationError(errors) + + # make sure this ip address is not claimed anywhere else + conflict_v4, conflict_v6 = self.ipaddress_conflict() if conflict_v4: errors["ipaddr4"] = _("Ip address already exists elsewhere") diff --git a/peeringdb_server/serializers.py b/peeringdb_server/serializers.py index c6710ee4..0796652e 100644 --- a/peeringdb_server/serializers.py +++ b/peeringdb_server/serializers.py @@ -8,7 +8,7 @@ from django.db.models.query import QuerySet from django.db.models import Prefetch, Q, Sum, IntegerField, Case, When from django.db import models, transaction from django.db.models.fields.related import ReverseManyToOneDescriptor, ForwardManyToOneDescriptor -from django.core.exceptions import FieldError +from django.core.exceptions import FieldError, ValidationError from rest_framework import serializers, validators from rest_framework.exceptions import ValidationError as RestValidationError # from drf_toolbox import serializers @@ -1161,6 +1161,20 @@ class NetworkIXLanSerializer(ModelSerializer): def get_ix_id(self, inst): return inst.ix_id + def validate(self, data): + netixlan = NetworkIXLan(**data) + + try: + netixlan.validate_ipaddr4() + except ValidationError as exc: + raise serializers.ValidationError({"ipaddr4":exc.message}) + + try: + netixlan.validate_ipaddr6() + except ValidationError as exc: + raise serializers.ValidationError({"ipaddr6":exc.message}) + return data + class NetworkFacilitySerializer(ModelSerializer): """