1
0
mirror of https://github.com/peeringdb/peeringdb.git synced 2024-05-11 05:55:09 +00:00
Files
peeringdb-peeringdb/peeringdb_server/mock.py
Matt Griswold abe3e78d50 Dotf fixes (#781)
* fix issue where ix-f import would raise suggestions ipaddresses not that ixlan (#764)

* IX-F Suggestions: Leaving the editor and returning to it via back button issues (#765)

* IX-F importer: Clicking "Preview" (IXP Update Tools) on /net/ page resulted in 170 ticket resolutions (#769)
More robust testing

* black formatting (was lost after pyupgrade)

* Add regex searching to deskpro ticket subjects

* Change operational error

* IX-F suggestions: consolidate delete+add (#770)

* Add reset functions to commandline tool

* Fix commandline tool bugs

* Fix reset commandline tool bugs

* add commandline tool

* Ixlan needs to be set for import commandline tool

* Add email model

* Add admin view to emails

* Allow network and ix to be null

* save emails as part of ixf import

* Add email model

* Add email delete

* add iregex search and better comments

* fix ixlan selection for import

* redefine migration dependencies for this branch

* only enable search w start and end char

* Add caption to regex search

* Remove delete all ixfmemberdata option

* [beta] IX-F importer: don't bother about missing IPv{4,6} address when network is not doing IPv{4,6} (#771)

* Add cmdline tests

* Resolve email conflicts

* Add cmd tool reset tests

* add autocomplete to commandline tool

* Fix email bugs

* Fix email migrations

* Fix typos

* [beta] IX-F importer: prevent Admin Committee overload by initially limiting importer to IXes enabled by AC (#772)

* Finalize regex search for emails and deskprotickets

* Fix keyword bug

* fix typo

* protocol-conflict will now be handled in the notification consolidation

771 changes where if the network indicates neither ipv4 nor ipv6 support, it is handled as supporting both (eg the network didnt configure these at all)

realised that the importer command re instantiates the `Importer` class for each ixlan it processes, so moved the sending of consolidated notifications (#772) out of the `update` function and into the command itself after its done processing all the ixlans. This means for tests you will need to call `importer.notify_proposals` after `importer.update` to test the consolidated notifications.

fixed several MultipleObjectsReturned errors when network switch protocol support in between imports

* should be checking for "ix" in the form data (#773)

* Fix cmd ixf tests

* fix issue in log_peer

* Add commit check for reset tool

* fix importer bugs

* remove dupe IXFImportEmail definition

* ixfimportemail support ix__name and net__name searching

* ticket resolution responses

* Add commit to command ixf import changes

* fix modify entry header

* remove whitespace in notification about remote data changes

* Begin updating tests

* ixf-import command line tool to queue

* refactor conflict inserts

* Update import protocol tests, including tests for 770

* More test edits

* Change cmd tests

* better ixfmemberdata error handling and fix some test data

* dont reset the same ixfmemberdata requirement

* fix many bugs add many tests

* remove debug message

* fix bug during import when consolidating delete+add

* fix perfomance issue in IXFMemberData listing

* dont show reset flags on prod env

* Add regex search tests

* Add 772 tests

* remove debug output

* fix `test_resolve_deskpro_ticket` test

* black formatting

* remove dupe import

* fix issue with unique constraint error handling

* add test for ixp / network ip protocol notification

* add missing test data

Co-authored-by: Stefan Pratter <stefan@20c.com>
Co-authored-by: Elliot Frank <elliot@20c.com>
2020-07-27 04:36:27 +00:00

268 lines
7.9 KiB
Python

import datetime
import uuid
import ipaddress
from django.db import models
from peeringdb_server.models import REFTAG_MAP
class Mock:
"""
Class that allows us to create mock data in the database
NOTE: this actually writes data to the database and should
only be used to populate a dev instance.
"""
def __init__(self):
self._asn = 63311
# Pool of IPv4 Prefixes
# TODO - automatic generation via ipaddress module
self.prefix_pool_v4 = [
"206.126.236.0/22",
"208.115.136.0/23",
"206.223.118.0/24",
"206.223.123.0/24",
"206.223.116.0/23",
]
# Pool of IPv6 Prefixes
# TODO - automatic generation via ipaddrsss module
self.prefix_pool_v6 = [
"2001:504:0:2::/64",
"2001:504:0:4::/64",
"2001:504:0:5::/64",
"2001:504:0:3::/64",
"2001:504:0:1::/64",
]
# helper function that allows us to retrieve n valid
# hosts from an ipaddress space (prefix)
def get_hosts(network, count=100):
n = 0
for host in network.hosts():
if n > count:
break
n += 1
yield host
# Pool of IPv4 addresses (100 per prefix)
self.ipaddr_pool_v4 = {
prefix: list(get_hosts(ipaddress.IPv4Network(prefix)))
for prefix in self.prefix_pool_v4
}
# Pool of IPv6 addresses (100 per prefix)
self.ipaddr_pool_v6 = {
prefix: list(get_hosts(ipaddress.IPv6Network(prefix)))
for prefix in self.prefix_pool_v6
}
def create(self, reftag, **kwargs):
"""
Create a new instance of model specified in `reftag`
Any arguments passed as kwargs will override mock field values
Note: Unless if there are no relationships passed in kwargs, required parent
objects will be automatically created as well.
Returns: The created instance
"""
model = REFTAG_MAP.get(reftag)
data = {}
data.update(**kwargs)
# first we create any required parent relation ships
for field in model._meta.get_fields():
if field.name in data:
continue
if field.is_relation and field.many_to_one:
if hasattr(field.related_model, "ref_tag"):
data[field.name] = self.create(field.related_model.handleref.tag)
# then we set the other fields to mock values provided by this class
for field in model._meta.get_fields():
# field value specified alrady, skip
if field.name in data:
continue
# these we don't care about
if field.name in ["id", "logo", "version", "created", "updated"]:
continue
# if reftag == "ixlan" and field.name != "id":
# continue
# elif reftag != "ixlan":
# continue
# this we dont care about either
if field.name.find("geocode") == 0:
continue
# choice fields should automatically select a value from
# the available choices
#
# PDB choices often have Not Disclosed at index 0 and 1
# so we try index 2 first.
if (
not field.is_relation
and field.choices
and not hasattr(self, field.name)
):
try:
data[field.name] = field.choices[2][0]
except IndexError:
data[field.name] = field.choices[0][0]
# bool fields set to True
elif isinstance(field, models.BooleanField):
data[field.name] = True
# every other field
elif not field.is_relation:
# emails
if field.name.find("email") > -1:
data[field.name] = "test@peeringdb.com"
# phone numbers
elif field.name.find("phone") > -1:
data[field.name] = "+12065550199"
# URLs
elif field.name.find("url") > -1:
data[field.name] = "{}/{}".format(
self.website(data, reftag=reftag), field.name
)
# everything else is routed to the apropriate method
# with the same name as the field name
else:
data[field.name] = getattr(self, field.name)(data, reftag=reftag)
obj = model(**data)
obj.clean()
obj.save()
return obj
def id(self, data, reftag=None):
if reftag == "ixlan":
return data["ix"].id
return None
def status(self, data, reftag=None):
return "ok"
def address1(self, data, reftag=None):
return "Address line 1"
def address2(self, data, reftag=None):
return "Address line 2"
def state(self, data, reftag=None):
return "Illinois"
def zipcode(self, data, reftag=None):
return "12345"
def website(self, data, reftag=None):
return "https://www.peeringdb.com"
def notes(self, data, reftag=None):
return "Some notes"
def city(self, data, reftag=None):
return "Chicago"
def latitude(self, data, reftag=None):
return 0.0
def longitude(self, data, reftag=None):
return 0.0
def country(self, data, reftag=None):
return "US"
def name(self, data, reftag=None):
return "{} {}".format(reftag, str(uuid.uuid4())[:8])
def name_long(self, data, reftag=None):
return self.name(data, reftag=reftag)
def asn(self, data, reftag=None):
if reftag == "netixlan":
return data["network"].asn
self._asn += 1
return self._asn
def aka(self, data, reftag=None):
return self.name(data, reftag=reftag)
def irr_as_set(self, data, reftag=None):
return "AS-{}@RIPE".format(str(uuid.uuid4())[:8].upper())
def looking_glass(self, data, reftag=None):
return "{}/looking-glass".format(self.website(data, reftag=reftag))
def route_server(self, data, reftag=None):
return "{}/route-server".format(self.website(data, reftag=reftag))
def notes_private(self, data, reftag=None):
return "Private notes"
def info_prefixes4(self, data, reftag=None):
return 50000
def info_prefixes6(self, data, reftag=None):
return 5000
def clli(self, data, reftag=None):
return str(uuid.uuid4())[:6].upper()
def rencode(self, data, reftag=None):
return ""
def npanxx(self, data, reftag=None):
return "123-456"
def descr(self, data, reftag=None):
return "Arbitrary description"
def mtu(self, data, reftag=None):
return 1500
def vlan(self, data, reftag=None):
return None
def rs_asn(self, data, reftag=None):
return self.asn(data, reftag=reftag)
def local_asn(self, data, reftag=None):
return data["network"].asn
def arp_sponge(self, data, reftag=None):
return None
def prefix(self, data, reftag=None):
if data.get("protocol") == "IPv4":
return f"{self.prefix_pool_v4.pop()}"
elif data.get("protocol") == "IPv6":
return f"{self.prefix_pool_v6.pop()}"
def ipaddr4(self, data, reftag=None):
prefix = data["ixlan"].ixpfx_set.filter(protocol="IPv4").first().prefix
return "{}".format(self.ipaddr_pool_v4[f"{prefix}"].pop())
def ipaddr6(self, data, reftag=None):
prefix = data["ixlan"].ixpfx_set.filter(protocol="IPv6").first().prefix
return "{}".format(self.ipaddr_pool_v6[f"{prefix}"].pop())
def speed(self, data, reftag=None):
return 1000
def ixf_net_count(self, data, reftag=None):
return 0
def ixf_last_import(self, data, reftag=None):
return None