"""
IX-F member export import (``Importer``) and post-mortem reporting for
past imports (``PostMortem``).
"""
import ipaddress
import json
import re

import requests
import reversion
from django.core.cache import cache
from django.db import transaction
from django.utils.translation import ugettext_lazy as _

from peeringdb_server.models import (
    IXLanIXFMemberImportAttempt,
    IXLanIXFMemberImportLog,
    IXLanIXFMemberImportLogEntry,
    Network,
    NetworkIXLan,
)


class Importer(object):
    """
    Syncs the netixlans of an ixlan from IX-F member export json data.

    Instance flags:

    - cache_only : when True `update()` reads the ix-f data from the local
      django cache instead of requesting it from the remote url
    - skip_import : when True `update()` only fetches / validates the data
      and returns before parsing it
    """

    # member types (compared lower-cased) that are processed
    allowed_member_types = [
        "peering",
        "ixp",
        "routeserver",
        "probono",
    ]

    # connection states (compared lower-cased) that are processed;
    # an empty or null state is accepted as well
    allowed_states = [
        "",
        None,
        "active",
        "inactive",
        "connected",
        "operational",
    ]

    def __init__(self):
        self.cache_only = False
        self.skip_import = False
        self.reset()

    def reset(self, ixlan=None, save=False, asn=None):
        """
        Reset all per-run state of the importer

        Keyword arguments:
            - ixlan : ixlan the next run will target
            - save (bool): commit changes to db
            - asn (int): only process changes for this ASN
        """
        self.reset_log()
        self.netixlans = []
        self.netixlans_deleted = []
        # "<asn>-<ip address>" strings collected during parsing, checked
        # again in `process_deletions`
        self.ipaddresses = []
        self.asns = []
        # netixlan id -> {"action", "reason"}, consumed by `archive`
        self.archive_info = {}
        self.ixlan = ixlan
        self.save = save
        self.asn = asn

    def fetch(self, url, timeout=5):
        """
        Retrieves ixf member export data from the url

        Will do a quick sanity check on the data

        Returns dict containing the parsed data. On failure the dict
        contains a `pdb_error` key describing the problem.

        Arguments:
            - url

        Keyword arguments:
            - timeout : max time to spend on request
        """

        if not url:
            return {"pdb_error": _("IXF import url not specified")}

        try:
            result = requests.get(url, timeout=timeout)
        except Exception as exc:
            return {"pdb_error": exc}

        if result.status_code != 200:
            return {"pdb_error": "Got HTTP status {}".format(result.status_code)}

        try:
            data = result.json()
        except Exception:
            return {"pdb_error": _("No JSON could be parsed")}

        # everything downstream (sanitize, update) relies on dict access,
        # so any other valid json type (list, string, ...) is rejected here
        if not isinstance(data, dict):
            return {"pdb_error": _("Invalid IX-F data: expected a JSON object")}

        data = self.sanitize(data)

        # locally cache result

        if data and not data.get("pdb_error"):
            cache.set(self.cache_key(url), data, timeout=None)

        return data

    def cache_key(self, url):
        """
        returns the django cache key to use for caching ix-f data

        Argument:

            url
        """

        return "IXF-CACHE-{}".format(url)

    def fetch_cached(self, url):
        """
        Returns locally cached IX-F data

        Arguments:

            url
        """

        if not url:
            return {"pdb_error": _("IXF import url not specified")}

        data = cache.get(self.cache_key(url))

        if data is None:
            return {
                "pdb_error": _("IX-F data not locally cached for this resource yet.")
            }

        return data

    def sanitize(self, data):
        """
        Takes ixf data dict and runs some sanitization on it

        The dict is modified in place and returned. `data["pdb_error"]` is
        always set: to an error message if no connection had a non-empty
        `vlan_list`, to `None` otherwise.
        """

        invalid = None
        vlan_list_found = False

        # This fixes instances where ixps provide two separate entries for
        # vlans in vlan_list for ipv4 and ipv6 (AMS-IX for example)
        for member in data.get("member_list", []):
            for conn in member.get("connection_list", []):
                vlans = conn.get("vlan_list", [])
                if not vlans:
                    continue
                vlan_list_found = True
                if len(vlans) == 2:
                    # merge only if exactly one of the two entries carries
                    # ipv4 and exactly one carries ipv6
                    keys = list(vlans[0].keys()) + list(vlans[1].keys())
                    if keys.count("ipv4") == 1 and keys.count("ipv6") == 1:
                        vlans[0].update(vlans[1])
                        conn["vlan_list"] = [vlans[0]]

        if not vlan_list_found:
            invalid = _("No entries in any of the vlan_list lists, aborting.")

        data["pdb_error"] = invalid

        return data

    def update(self, ixlan, save=True, data=None, timeout=5, asn=None):
        """
        Sync netixlans under this ixlan from ixf member export json data (specs
        can be found at https://github.com/euro-ix/json-schemas)

        Arguments:
            - ixlan (IXLan): ixlan object to update from ixf

        Keyword Arguments:
            - save (bool): commit changes to db
            - data (dict): already retrieved ix-f data, if not provided it
              is retrieved from the cache or the remote resource
            - timeout : max time to spend on the remote request
            - asn (int): only process changes for this ASN

        Returns:
            - Tuple(success, netixlans, netixlans_deleted, log)
        """

        self.reset(ixlan=ixlan, save=save, asn=asn)

        # if data is not provided, retrieve it either from cache or
        # from the remote resource
        if data is None:
            if self.cache_only:
                data = self.fetch_cached(ixlan.ixf_ixp_member_list_url)
            else:
                data = self.fetch(ixlan.ixf_ixp_member_list_url, timeout=timeout)

        # bail if there has been any errors during sanitize() or fetch()
        if data.get("pdb_error"):
            self.log_error(data.get("pdb_error"), save=save)
            return (False, [], [], self.log)

        # bail if there are no active prefixes on the ixlan
        if ixlan.ixpfx_set_active.count() == 0:
            self.log_error(_("No prefixes defined on ixlan"), save=save)
            return (False, [], [], self.log)

        if self.skip_import:
            return (True, [], [], self.log)

        try:
            # parse the ixf data
            self.parse(data)
        except KeyError as exc:
            # any key errors mean that the data is invalid, log the error and
            # bail (transactions are atomic and will be rolled back)
            self.log_error("Internal Error 'KeyError': {}".format(exc), save=save)
            return (False, self.netixlans, [], self.log)

        # process any netixlans that need to be deleted
        self.process_deletions()

        # archive the import so we can roll it back later if needed
        self.archive()

        if save:
            self.save_log()

        return (True, self.netixlans, self.netixlans_deleted, self.log)

    @reversion.create_revision()
    def process_deletions(self):
        """
        Cycles all netixlans on the ixlan targeted by the importer and
        will remove any that are no longer found in the ixf data by
        their ip addresses

        In order for a netixlan to be removed both its ipv4 and ipv6 address
        or its asn need to be gone from the ixf data after validation

        A netixlan with only one mismatching address is removed only if
        its network has disabled ixp updates.
        """

        netixlan_qset = self.ixlan.netixlan_set_active

        # if we are only processing a specific asn ignore
        # all that dont match

        if self.asn:
            netixlan_qset = netixlan_qset.filter(asn=self.asn)

        # membership is tested for every netixlan, so use sets for lookup
        known_asns = set(self.asns)
        known_ips = set(self.ipaddresses)

        for netixlan in netixlan_qset:
            ipv4 = "{}-{}".format(netixlan.asn, netixlan.ipaddr4)
            ipv6 = "{}-{}".format(netixlan.asn, netixlan.ipaddr6)

            if netixlan.asn not in known_asns:
                reason = _("ASN no longer in data")
            elif ipv4 not in known_ips and ipv6 not in known_ips:
                reason = _(
                    "Ip addresses no longer exist in validated data or are "
                    "no longer with this asn"
                )
            elif (
                (netixlan.ipaddr4 and ipv4 not in known_ips)
                or (netixlan.ipaddr6 and ipv6 not in known_ips)
            ) and not netixlan.network.allow_ixp_update:
                reason = _(
                    "At least one ipaddress mismatched and "
                    "network has disabled updates"
                )
            else:
                # still present in the ix-f data (or updatable in place)
                continue

            self.log_peer(netixlan.asn, "delete", reason, netixlan)
            self.netixlans_deleted.append(netixlan)
            if self.save:
                netixlan.delete()

    @transaction.atomic()
    def archive(self):
        """
        Create the IXLanIXFMemberImportLog for this import

        Only done when saving and when at least one netixlan was
        added, modified or deleted.
        """

        if not self.save or not (self.netixlans or self.netixlans_deleted):
            return

        persist_log = IXLanIXFMemberImportLog.objects.create(ixlan=self.ixlan)
        for netixlan in self.netixlans + self.netixlans_deleted:
            versions = reversion.models.Version.objects.get_for_object(netixlan)

            # versions[0] is used as the state after this import and
            # versions[1] (if there is one) as the state before it
            version_before = versions[1] if len(versions) > 1 else None
            version_after = versions[0]

            info = self.archive_info.get(netixlan.id, {})
            persist_log.entries.create(
                netixlan=netixlan,
                version_before=version_before,
                action=info.get("action"),
                reason=info.get("reason"),
                version_after=version_after,
            )

    def parse(self, data):
        """
        Parse ixf data

        Arguments:
            - data <dict>: result from fetch()
        """
        with transaction.atomic():
            self.parse_members(data.get("member_list", []))

    def parse_members(self, member_list):
        """
        Parse the `member_list` section of the ixf schema

        Arguments:
            - member_list <list>

        Raises:
            - KeyError: if a member of an allowed type has no `asnum`
        """
        for member in member_list:

            # we only process members of certain types
            member_type = member.get("member_type", "peering")
            if isinstance(member_type, str):
                member_type = member_type.lower()

            if member_type not in self.allowed_member_types:
                # `asnum` may be missing on such rows, so don't require it
                self.log_peer(
                    member.get("asnum"),
                    "ignore",
                    _("Invalid member type: {}").format(member_type),
                )
                continue

            asn = member["asnum"]

            # if we are only processing a specific asn, ignore all
            # that dont match
            if self.asn and asn != self.asn:
                continue

            # keep track of asns we find in the ix-f data
            if asn not in self.asns:
                self.asns.append(asn)

            # check that the as exists in pdb
            network = Network.objects.filter(asn=asn).first()
            if network is None:
                self.log_peer(asn, "ignore", _("Network does not exist in peeringdb"))
                continue

            if network.status != "ok":
                self.log_peer(
                    asn, "ignore", _("Network status is '{}'").format(network.status),
                )
                continue

            self.parse_connections(member.get("connection_list", []), network, member)

    def parse_connections(self, connection_list, network, member):
        """
        Parse the 'connection_list' section of the ixf schema

        Arguments:
            - connection_list <list>
            - network <Network>: pdb network instance
            - member <dict>: row from ixf member_list
        """

        asn = member["asnum"]
        for connection in connection_list:
            # a null state is explicitly allowed (see `allowed_states`),
            # so only lower-case actual strings
            state = connection.get("state", "active")
            if isinstance(state, str):
                state = state.lower()

            if state not in self.allowed_states:
                self.log_peer(
                    asn, "ignore", _("Invalid connection state: {}").format(state)
                )
                continue

            speed = self.parse_speed(connection.get("if_list", []))

            self.parse_vlans(
                connection.get("vlan_list", []), network, member, connection, speed
            )

    def parse_vlans(self, vlan_list, network, member, connection, speed):
        """
        Parse the 'vlan_list' section of the ixf_schema

        Arguments:
            - vlan_list <list>
            - network <Network>: pdb network instance
            - member <dict>: row from ixf member_list
            - connection <dict>: row from ixf connection_list
            - speed <int>: interface speed
        """

        asn = member["asnum"]

        # the state is compared lower-cased, same as in `parse_connections`
        state = connection.get("state", "active")
        inactive = isinstance(state, str) and state.lower() == "inactive"

        for lan in vlan_list:
            # `or {}` also covers entries that are explicitly set to null
            ipv4 = lan.get("ipv4") or {}
            ipv6 = lan.get("ipv6") or {}

            # vlan entry has no ipaddresses set, log and ignore
            if not ipv4 and not ipv6:
                self.log_error(
                    _(
                        "Could not find ipv4 or 6 address in "
                        "vlan_list entry for vlan_id {} (AS{})"
                    ).format(lan.get("vlan_id"), asn)
                )
                continue

            ipv4_addr = ipv4.get("address")
            ipv6_addr = ipv6.get("address")

            # parse and validate the ipaddresses attached to the vlan
            # append the ipaddresses to self.ipaddresses so we can
            # later check them to see which netixlans need to be
            # dropped during `process_deletions`
            try:
                for addr in (ipv4_addr, ipv6_addr):
                    if addr:
                        self.ipaddresses.append(
                            "{}-{}".format(
                                asn, ipaddress.ip_address(u"{}".format(addr))
                            )
                        )
            except ValueError as exc:
                # ipaddress.AddressValueError is a ValueError subclass
                self.log_error(
                    _("Ip address error '{}' in vlan_list entry for vlan_id {}").format(
                        exc, lan.get("vlan_id")
                    )
                )
                continue

            netixlan_info = NetworkIXLan(
                ixlan=self.ixlan,
                network=network,
                ipaddr4=ipv4_addr,
                ipaddr6=ipv6_addr,
                speed=speed,
                asn=asn,
                is_rs_peer=(
                    ipv4.get("routeserver", False) or ipv6.get("routeserver", False)
                ),
            )

            if not self.save and (
                not self.ixlan.test_ipv4_address(ipv4_addr)
                and not self.ixlan.test_ipv6_address(ipv6_addr)
            ):
                # for the preview we don't care at all about new ip addresses
                # not at the ixlan if they dont match the prefix
                continue

            # if connection state is inactive we won't create or update
            if inactive:
                self.log_peer(
                    asn,
                    "noop",
                    _("Connection is currently marked as inactive"),
                    netixlan_info,
                )
                continue

            # after this point we either add or modify the netixlan, so
            # now is a good time to check if the related network allows
            # such updates, bail if not
            if not network.allow_ixp_update:
                self.log_peer(
                    asn, "noop", _("Network has disabled ixp updates"), netixlan_info
                )
                continue

            # add / modify the netixlan
            result = self.ixlan.add_netixlan(
                netixlan_info, save=self.save, save_others=self.save
            )

            if result["netixlan"] and result["changed"]:
                self.netixlans.append(result["netixlan"])
                if result["created"]:
                    action = "add"
                    reason = _("New ip-address")
                else:
                    action = "modify"
                    reason = _("Fields changed: {}").format(
                        ", ".join(result.get("changed"))
                    )

                self.log_peer(asn, action, reason, result["netixlan"])
            elif result["netixlan"]:
                self.log_peer(asn, "noop", _("No changes"), result["netixlan"])
            elif result["log"]:
                self.log_peer(asn, "ignore", "\n".join(result["log"]), netixlan_info)

    def parse_speed(self, if_list):
        """
        Parse speed from the 'if_list' section in the ixf data

        Interfaces with an invalid (non-numeric or null) `if_speed` are
        logged as an error and do not count towards the total.

        Arguments:
            - if_list <list>

        Returns:
            - speed <int>: sum of the speeds of all interfaces
        """
        speed = 0
        for iface in if_list:
            try:
                speed += int(iface.get("if_speed", 0))
            except (TypeError, ValueError):
                # TypeError covers an explicit null / non-scalar value
                self.log_error(
                    _("Invalid speed value: {}").format(iface.get("if_speed"))
                )
        return speed

    def save_log(self):
        """
        Save the attempt log

        The log is stored json encoded in the `info` field of the
        IXLanIXFMemberImportAttempt of the ixlan.
        """
        # json.dumps returns a single string - joining it with "\n" would
        # put a newline between every character and corrupt the json
        IXLanIXFMemberImportAttempt.objects.update_or_create(
            ixlan=self.ixlan, defaults={"info": json.dumps(self.log)}
        )

    def reset_log(self):
        """
        Reset the attempt log
        """
        self.log = {"data": [], "errors": []}

    def log_peer(self, asn, action, reason, netixlan=None):
        """
        log peer action in attempt log

        Arguments:
            - asn <int>
            - action <str>: add | modify | delete | noop | ignore
            - reason <str>

        Keyword Arguments:
            - netixlan <NetworkIXLan>: if set, extra data will be added
              to the log.
        """
        peer = {
            "ixlan_id": self.ixlan.id,
            "ix_id": self.ixlan.ix.id,
            "ix_name": self.ixlan.ix.name,
            "asn": asn,
        }

        if netixlan:
            peer.update(
                {
                    "net_id": netixlan.network_id,
                    "ipaddr4": "{}".format(netixlan.ipaddr4 or ""),
                    "ipaddr6": "{}".format(netixlan.ipaddr6 or ""),
                    "speed": netixlan.speed,
                    "is_rs_peer": netixlan.is_rs_peer,
                }
            )

            # only netixlans that exist in the database can be archived
            if netixlan.id:
                self.archive_info[netixlan.id] = {
                    "action": action,
                    "reason": "{}".format(reason),
                }

        self.log["data"].append(
            {"peer": peer, "action": action, "reason": "{}".format(reason)}
        )

    def log_error(self, error, save=False):
        """
        Append error to the attempt log

        Keyword Arguments:
            - save (bool): also persist the attempt log right away
        """
        self.log["errors"].append("{}".format(error))
        if save:
            self.save_log()


class PostMortem(object):

    """
    Generate postmortem report for ix-f import
    """

    def reset(self, asn, **kwargs):

        """
        Reset for a fresh run

        Argument(s):

            - asn <int>: asn of the network to run postmortem
              report for

        Keyword Argument(s):

            - limit <int=100>: limit amount of import logs to process
              (NOTE: a server side maximum `IXF_POSTMORTEM_LIMIT` is
              not enforced here)
        """

        self.asn = asn
        self.limit = kwargs.get("limit", 100)
        self.post_mortem = []

    def generate(self, asn, **kwargs):
        """
        Generate and return a new postmortem report

        Argument(s):

            - asn <int>: asn of the network to run postmortem
              report for

        Keyword Argument(s):

            - limit <int=100>: limit amount of import logs to process
              (NOTE: a server side maximum `IXF_POSTMORTEM_LIMIT` is
              not enforced here)

        Returns:

            - list: postmortem report, one dict per processed log entry
        """

        self.reset(asn, **kwargs)
        self._process_logs(limit=self.limit)
        return self.post_mortem

    def _process_logs(self, limit=100):

        """
        Process IX-F import logs

        KeywordArgument(s):

             - limit <int=100>: limit amount of import logs to process
        """

        # we only want import logs that actually touched the specified
        # asn

        qset = IXLanIXFMemberImportLogEntry.objects.filter(netixlan__asn=self.asn)
        qset = qset.exclude(action__isnull=True)
        qset = qset.order_by("-log__created")
        qset = qset.select_related("log", "netixlan", "log__ixlan", "log__ixlan__ix")

        for entry in qset[:limit]:
            self._process_log_entry(entry.log, entry)

    def _process_log_entry(self, log, entry):

        """
        Process a single IX-F import log entry

        Entries whose netixlan or archived version does not belong to
        the asn of this report are skipped.

        Argument(s):

            - log <IXLanIXFMemberImportLog>
            - entry <IXLanIXFMemberImportLogEntry>
        """

        if entry.netixlan.asn != self.asn:
            return

        data = entry.version_after.field_dict

        if data.get("asn") != self.asn:
            return

        # ip addresses are reported as strings, or None if not set
        ipaddr4 = "{}".format(data.get("ipaddr4")) if data.get("ipaddr4") else None
        ipaddr6 = "{}".format(data.get("ipaddr6")) if data.get("ipaddr6") else None

        self.post_mortem.append(
            {
                "ix_id": log.ixlan.ix.id,
                "ix_name": log.ixlan.ix.name,
                "ixlan_id": log.ixlan.id,
                "changes": entry.changes,
                "reason": entry.reason,
                "action": entry.action,
                "asn": data.get("asn"),
                "ipaddr4": ipaddr4,
                "ipaddr6": ipaddr6,
                "speed": data.get("speed"),
                "is_rs_peer": data.get("is_rs_peer"),
                "created": log.created.strftime("%Y-%m-%d %H:%M:%S"),
            }
        )