1
0
mirror of https://github.com/peeringdb/peeringdb.git synced 2024-05-11 05:55:09 +00:00
Files
peeringdb-peeringdb/peeringdb_server/inet.py
Matt Griswold ea55c4dc38 July updates (#762)
* Change label from primary ASN to ASN

* Raise validation error when trying to update ASN

* first steps for dotf importer procotol (#697)

* migrations (#697)

* Add translation to error meessage

* Make ASN readonly in table

* Add test now that ASN should not be able to update

* Set fac.rencode to '' for all entries and make it readonly in serializer

* Add unique constraints to network ixlan ip addresses

* Add migration to null out duplicate ipaddresses for deleted netixlans

* Add unique constraints to network ixlan ip addresses

* Add migration to null out duplicate ipaddresses for deleted netixlans

* remove old migrations (#697)

* fix netixlan ipaddr dedupe migration (#268)
add netixlan ipaddr unique constraint migration (#268)

* ixf_member_data migrations (#697)

* fix table name (#697)

* importer protocol (#697)

* fix netixlan ipaddr dedupe migration (#268)
add netixlan ipaddr unique constraint migration (#268)

* ixf proposed changes notifications (#697)

* Delete repeated query

* Add a test to show rencode is readonly

* Blank out rencode when mocking data

* Remove validator now that constraint exists

* Add back unique field validator w Check Deleted true

* conflict resolving (#697)

* UniqueFieldValidator raise error with code "unique" (#268)

* conflict resolution (#697)

* Add fixme comment to tests

* conflict resolution (#697)

* Remove now invalid undelete tests

* UniqueFieldValidator raise error with code "unique" (#268)

* delete admin tools for duplicate ip addresses

* Make migration to delete duplicateipnetworkixlan

* Add ixlan-ixpfx status matching validation, add corresponding test

* delete redundant checking in test

* resolve conflict ui (#697)

* fix migrations hierarchy

* squash migrations for ixf member data

* clean up preview and post-mortem tools

* remove non-sensical permission check when undeleting soft-deleted objects through unique integrity error handling

* only include the ix-f data url in notifications to admincom (#697)

* resolve on --skip-import (#697)

* ac conflict resolution (#697)

* Define more accurately the incompatible statuses for ixlan and ixpfx

* Add another status test

* Preventing disrupting changes (#697)

* fix tests (#697)

* Stop allow_ixp_update from being write only and add a global stat for automated networks

* Add tests for global stats that appear in footer

* Change how timezone is called with datetime, to get test_stats.py/test_generate_for_current_date to pass

* test for protected entities (#697)

* admincom conflict resolution refine readonly fields (#697)
network notifications only if the problem is actually actionable by the network (#697)

* ixp / ac notifcation when ix-f source cannot be parsed (#697)
fix issue with ixlan prefix protection (#697)

* migrations (#697)

* code documentation (#697)

* ux tweaks (#697)

* UX tweaks (#697)

* Fix typo

* fix netixlan returned in IXFMemberData.apply when adding a new one (#697)

* fix import log incosistencies (#697)

* Add IXFMemberData to test

* Update test data

* Add protocol tests

* Add tests for views

* always persist changes to remote data on set_conflict (#697)

* More tests

* always persist changes to remote data on set_conflict (#697)

* suggest-add test

* net_present_at_ix should check status (#697)

* Add more protocol tests

* Edit language of some tests

* django-peeringdb to 2.1.1
relock pipfile, pin django-ratelimit to <3 as it breaks stuff

* Add net_count_ixf field to ix object (#683)

* Add the IX-F Member Export URL to the ixlan API endpoint (#249)

* Lock some objects from being deleted by the owner (#696)

* regenerate api docs (#249)

* always persist changes to remote data on set_add and set_update (#697)

* IXFMemberData: always persist remote data changes during set_add and set_update, also allow for saving without touching the updated field

* always persist changes to remote data on set_add and set_update (#697)

* Fix suggest-add tests

* IXFMemberData: always persist remote data changes during set_add and set_update, also allow for saving without touching the updated field

* IXFMemberData: always persist remote data changes during set_add and set_update, also allow for saving without touching the updated field

* fix issue with deletion when ixfmemberdata for entry existed previously (#697)

* fix test_suggest_delete_local_ixf_no_flag (#697 tests)

* fix issue with deletion when ixfmemberdata for entry existed previously (#697)

* invalid ips get logged and notified to the ix via notify_error (#697)

* Fix more tests

* issue with previous_data when running without save (#697)
properly track speed errors (#697)

* reset errors on ixfmemberdata that go into pending_save (#697)

* add remote_data to admin view (#697)

* fix error reset inconsistency (#697)

* Refine invalid data tests

* remove debug output

* for notifications to ac include contact points for net and ix in the message (#697)

* settings to toggle ix-f tickets / emails (#697)

* allow turning off ix-f notifications for net and ix separately (#697)

* add jsonschema test

* Add idempotent tests to updater

* remove old ixf member tests

* Invalid data tests when ixp_updates are enabled

* fix speed error validation (#697)

* fix issue with rollback (#697)

* fix migration hierarchy

* fix ixfmemberdata _email

* django-peeringdb to 2.2 and relock

* add ixf rollback tests

* ixf email notifications off by default

* black formatted

* pyupgrade

Co-authored-by: egfrank <egfrank@20c.com>
Co-authored-by: Stefan Pratter <stefan@20c.com>
2020-07-15 07:07:01 +00:00

322 lines
7.5 KiB
Python

import ipaddress
import re
import rdap
from rdap import RdapAsn
from rdap.exceptions import RdapException, RdapHTTPError, RdapNotFoundError
import requests
from django.utils.translation import ugettext_lazy as _
from peeringdb_server import settings
# Valid IRR Source values
# reference: http://www.irr.net/docs/list.html
IRR_SOURCE = (
"AFRINIC",
"ALTDB",
"AOLTW",
"APNIC",
"ARIN",
"ARIN-NONAUTH",
"BELL",
"BBOI",
"CANARIE",
"EASYNET",
"EPOCH",
"HOST",
"JPIRR",
"LACNIC",
"LEVEL3",
"NESTEGG",
"NTTCOM",
"OPENFACE",
"OTTIX",
"PANIX",
"RADB",
"REACH",
"RGNET",
"RIPE",
"RISQ",
"ROGERS",
"TC",
)
# RFC 5398 documentation asn range
ASN_RFC_5398_16BIT = (64496, 64511)
ASN_RFC_5398_32BIT = (65536, 65551)
# RFC 6996 private asn range
ASN_RFC_6996_16BIT = (64512, 65534)
ASN_RFC_6996_32BIT = (4200000000, 4294967294)
# RFC 7003 last asn
ASN_LAST_16BIT = (65535, 65535)
ASN_LAST_32BIT = (4294967295, 4294967295)
# IANA reserved ASNs
# https://www.mail-archive.com/uknof@lists.uknof.org.uk/msg03395.html
ASN_IANA_RESERVED = (65552, 131071)
ASN_TRANS = (23456, 23456)
BOGON_ASN_RANGES = [
# RFC 5398 - documentation 16-bit
ASN_RFC_5398_16BIT,
# RFC 5398 - documentation 32-bit
ASN_RFC_5398_32BIT,
# IANA Reserved
ASN_IANA_RESERVED,
# RFC 6996 - private 16-bit
ASN_RFC_6996_16BIT,
# RFC 6996 - private 32-bit
ASN_RFC_6996_32BIT,
# RFC 7003 - last asn 16-bit
ASN_LAST_16BIT,
# RFC 7003 - last asn 32-bit
ASN_LAST_32BIT,
# trans
ASN_TRANS,
]
# the following bogon asn ranges are allowed on envionments
# where TUTORIAL_MODE is set to True
TUTORIAL_ASN_RANGES = [
# RFC 5398 - documentation 16-bit
ASN_RFC_5398_16BIT,
# RFC 5398 - documentation 32-bit
ASN_RFC_5398_32BIT,
# RFC 6996 - private 16-bit
ASN_RFC_6996_16BIT,
# RFC 6996 - private 32-bit
ASN_RFC_6996_32BIT,
]
class BogonAsn(rdap.RdapAsn):
"""
On tutorial mode environments we will return an instance
of this to provide an rdapasn result for asns in the
private and documentation ranges
"""
def __init__(self, asn):
name = f"AS{asn}"
self._parsed = {
"name": name,
"org_name": name,
"org_address": None,
"emails": [],
}
class RdapLookup(rdap.RdapClient):
"""
Does RDAP lookups against defined URL.
"""
def __init__(self):
# create rdap config
config = dict(
bootstrap_url=settings.RDAP_URL.rstrip("/"),
lacnic_apikey=settings.RDAP_LACNIC_APIKEY,
timeout=2.5,
)
super().__init__(config)
def get_asn(self, asn):
"""
We handle asns that fall into the private/documentation ranges
manually - others are processed normally through rdap lookup
"""
if asn_is_bogon(asn):
if settings.TUTORIAL_MODE and asn_is_in_ranges(asn, TUTORIAL_ASN_RANGES):
return BogonAsn(asn)
else:
raise RdapException(
_("ASNs in this range " "are not allowed in this environment")
)
return super().get_asn(asn)
def asn_is_bogon(asn):
"""
Test if an asn is bogon by being either in the documentation
or private asn ranges
Arguments:
- asn<int>
Return:
- bool: True if in bogon range
"""
return asn_is_in_ranges(asn, BOGON_ASN_RANGES)
def asn_is_in_ranges(asn, ranges):
"""
Test if an asn falls within any of the ranges provided
Arguments:
- asn<int>
- ranges<list[tuple(min,max)]>
Return:
- bool
"""
asn = int(asn)
for as_range in ranges:
if asn >= as_range[0] and asn <= as_range[1]:
return True
return False
def network_is_bogon(network):
"""
Returns if the passed ipaddress network is a bogon
Arguments:
- network <ipaddress.IPv4Network|ipaddress.IPv6Network>
Return:
- bool
"""
return not network.is_global or network.is_reserved
def network_is_pdb_valid(network):
"""
Return if the passed ipaddress network is in pdb valid
address space
Arguments:
- network <ipaddress.IPv4Network|ipaddress.IPv6Network>
Return:
- bool
"""
if network.is_multicast or network_is_bogon(network):
return False
if network.version == 4:
return True
# not allowed v6 blocks
v6_invalid = [
# 2002::/16 - RFC 3068 - 6to4 prefix
0x2002,
# 3ffe::/16 - RFC 5156 - used for the 6bone but was returned
0x3FFE,
# fec0::/10 - RFC 4291 - Reserved by IETF
0xFEC0,
# ff00::/8 - RFC 4291 - Multicast
0xFF00,
]
if int(network.network_address) >> 112 in v6_invalid:
return False
return True
def get_prefix_protocol(prefix):
"""
Takes a network address space prefix string and returns
a string describing the protocol
Will raise a ValueError if it cannot determine protocol
Returns:
str: IPv4 or IPv6
"""
try:
ipaddress.IPv4Network(prefix)
return "IPv4"
except ipaddress.AddressValueError:
try:
ipaddress.IPv6Network(prefix)
return "IPv6"
except ipaddress.AddressValueError:
raise ValueError("Prefix invalid")
def renumber_ipaddress(ipaddr, old_prefix, new_prefix):
"""
Renumber an ipaddress from old prefix to new prefix
Arguments:
- ipaddr (ipaddress.ip_address)
- old_prefix (ipaddress.ip_network)
- new_prefix (ipaddress.ip_network)
Returns:
- ipaddress.ip_address: renumbered ip address
"""
# validate that old and new prefix are compatible
if old_prefix == new_prefix:
raise ValueError("New and old prefix are identical")
if old_prefix.version != new_prefix.version:
raise ValueError("New prefix needs to be the same version as old prefix")
if new_prefix.version != ipaddr.version:
raise ValueError("Prefix version needs to be same version as ip address")
if old_prefix.prefixlen != new_prefix.prefixlen:
raise ValueError("New prefix needs to be of same length")
if ipaddr not in old_prefix:
raise ValueError("Ip address not within old prefix")
if ipaddr.version == 4:
delimiter = "."
else:
delimiter = ":"
# split prefixes and ipaddress into their octets
old_octets, old_mask = old_prefix.exploded.split("/")
new_octets, new_mask = new_prefix.exploded.split("/")
old_octets = old_octets.split(delimiter)
new_octets = new_octets.split(delimiter)
ip_octets = ipaddr.exploded.split(delimiter)
# get netmask octets so we can see which are to be replace
netmask_octets = new_prefix.netmask.exploded.split(delimiter)
i = 0
for octet in netmask_octets:
# replace any octet that is not a zero in the netmask
if (ipaddr.version == 4 and int(octet) > 0) or (
ipaddr.version == 6 and octet != "0000"
):
ip_octets[i] = new_octets[i]
i += 1
# return renumbered ipaddress
return ipaddress.ip_address(
"{}".format(delimiter.join([str(o) for o in ip_octets]))
)
def get_client_ip(request):
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
else:
ip = request.META.get("REMOTE_ADDR")
return ip