1
0
mirror of https://github.com/peeringdb/peeringdb.git synced 2024-05-11 05:55:09 +00:00
Files
peeringdb-peeringdb/peeringdb_server/mock.py
Matt Griswold fbc72ea682 Support 202104 (#980)
* Add migration for service level and terms

* Add service level and terms to UI and serializer, as well as data/enum

* Wire up data/enum endpoint and loader

* remove proto_ from ix UI

* derive fields for proto_unicast and proto_ipv6

* update tests for readonly fields

* Fix query for protocols

* Fix api bug with protocol

* add readonly fields to django admin

* rename readonly fields

* Add translation to names

* Add pdb api test for suggested facility re-add

* Add printing debugging test

* add printing debugging serializer

* Update _undelete with _reapprove to handle pending cases

* Update tests (one is still failing)

* adjust suggest test

* Add ix_count to fac (834)

* Add test for ix_count on fac (834)

* Add fac_count to IX (836)

* add ix_count and fac_count to Network

* Refactor ix net_count filtering

* Add filtering for 834, 835, 836

* Remove duplicates from the Network's ix_count

* Setup Network for ix_count and fac_count (835)

* initial obj_counts for Facilities and Exchanges

* Add signals for updates to all counts

* add migration

* Add print statements to test

* introduce reversion to tests

* rename network count to net count across codebase

* fix network_count typo

* add migration to set default vals

* fix filter tests for obj_counts

* speed up migration

* fix failing tests

* fix final test

* sort out migration tree and add fac offered fields

* update frontend for facility dropdown offered_resilience

* First pass at advanced api search for user story 1

* melissa geo lookup first steps

* fix migration hierarchy

* working melissa integration

* begin ending filters for api endpoints

* add more org_present endpoints

* add search for IXs that match multiple networks

* extend logic to facility

* Add service level and terms to advanced search

* use address2 field for lookup

* melissa tests

* cleanup and docs

* uncomment offered_power

* developed offered_power component

* fix geo normalize existing cmd
normalize state

* change migration to match django-peeringdb

* add offered_space field

* Fill out remaining api filter fields

* Add org_not_present endpoint filter

* fix unit input ux

* more ux fixes

* remove merge cruft

* google for geocoding
various melissa improvements (consider result quality)

* fix tests

* refactor org_present and org_not_present queries

* ix capacity api filters

* ix capacity filters for #802
advanced search ux for #802

* finalize advanced search UX for #802

* css fixes

* remove cruft

* fix net_count fac_count queries

* add new fields to create facility (#800)
tests for #802 and #800

* fix tests

* remove #800 changes

* fix capacity search

* more #800 changes to remove

* django-peeringdb 2.7.0 and pipenv relock

* black format

* pin black version

Co-authored-by: Elliot Frank <elliot@20c.com>
Co-authored-by: Stefan Pratter <stefan@20c.com>
2021-05-19 08:11:30 -05:00

292 lines
8.4 KiB
Python

import datetime
import uuid
import ipaddress
from django.db import models
from peeringdb_server.models import REFTAG_MAP
class Mock:
    """
    Class that allows us to create mock data in the database.

    NOTE: this actually writes data to the database and should
    only be used to populate a dev instance.

    Apart from `create`, the public methods are value providers: `create`
    looks up a method named after each model field and calls it as
    `method(data, reftag=reftag)`, where `data` is the dict of field values
    collected so far and `reftag` is the tag of the model being created.
    """

    def __init__(self):
        # `asn()` increments this before returning, so the first mock ASN
        # handed out is 63312.
        self._asn = 63311

        # Pool of IPv4 Prefixes
        # TODO - automatic generation via ipaddress module
        self.prefix_pool_v4 = [
            "206.126.236.0/22",
            "208.115.136.0/23",
            "206.223.118.0/24",
            "206.223.123.0/24",
            "206.223.116.0/23",
        ]

        # Pool of IPv6 Prefixes
        # TODO - automatic generation via ipaddress module
        self.prefix_pool_v6 = [
            "2001:504:0:2::/64",
            "2001:504:0:4::/64",
            "2001:504:0:5::/64",
            "2001:504:0:3::/64",
            "2001:504:0:1::/64",
        ]

        # Pool of IPv4 addresses (100 per prefix), keyed by prefix string
        self.ipaddr_pool_v4 = {
            prefix: self._first_hosts(ipaddress.IPv4Network(prefix))
            for prefix in self.prefix_pool_v4
        }

        # Pool of IPv6 addresses (100 per prefix), keyed by prefix string
        self.ipaddr_pool_v6 = {
            prefix: self._first_hosts(ipaddress.IPv6Network(prefix))
            for prefix in self.prefix_pool_v6
        }

    @staticmethod
    def _first_hosts(network, count=100):
        """
        Return a list with the first `count` hosts of `network`.

        `network` is an `ipaddress` network object. Fewer than `count`
        addresses are returned when the network has fewer hosts. Iteration
        is lazy, so large networks (e.g. an IPv6 /64) are never fully
        enumerated.
        """
        # zip() stops as soon as range(count) is exhausted, so exactly
        # `count` hosts are taken (the previous counter-based loop yielded
        # count + 1).
        return [host for _, host in zip(range(count), network.hosts())]

    def create(self, reftag, **kwargs):
        """
        Create, validate and save a new instance of the model specified
        by `reftag`.

        Any arguments passed as kwargs will override mock field values.

        Note: every many-to-one relation that is not passed in kwargs and
        whose related model has a `ref_tag` attribute is created
        automatically (recursively, via this method) before the object
        itself.

        Returns: The created instance
        """
        # NOTE(review): an unknown reftag presumably yields None here and
        # then fails on the `_meta` access below - confirm whether a more
        # explicit error is wanted.
        model = REFTAG_MAP.get(reftag)
        data = dict(kwargs)

        # first we create any required parent relationships
        for field in model._meta.get_fields():
            if field.name in data:
                continue
            if (
                field.is_relation
                and field.many_to_one
                and hasattr(field.related_model, "ref_tag")
            ):
                data[field.name] = self.create(field.related_model.handleref.tag)

        # then we set the other fields to mock values provided by this class
        for field in model._meta.get_fields():
            # field value specified already, skip
            if field.name in data:
                continue

            # these we don't care about
            if field.name in ["id", "logo", "version", "created", "updated"]:
                continue

            # this we don't care about either
            if field.name.startswith("geocode"):
                continue

            # choice fields should automatically select a value from
            # the available choices, unless this class has a dedicated
            # method for the field
            #
            # PDB choices often have Not Disclosed at index 0 and 1
            # so we try index 2 first.
            if (
                not field.is_relation
                and field.choices
                and not hasattr(self, field.name)
            ):
                try:
                    data[field.name] = field.choices[2][0]
                except IndexError:
                    data[field.name] = field.choices[0][0]

            # bool fields set to True
            elif isinstance(field, models.BooleanField):
                data[field.name] = True

            # every other non-relation field
            elif not field.is_relation:
                # emails
                if "email" in field.name:
                    data[field.name] = "test@peeringdb.com"

                # phone numbers
                elif "phone" in field.name:
                    data[field.name] = "+12065550199"

                # URLs
                elif "url" in field.name:
                    website = self.website(data, reftag=reftag)
                    data[field.name] = f"{website}/{field.name}"

                # everything else is routed to the appropriate method
                # with the same name as the field name
                else:
                    data[field.name] = getattr(self, field.name)(data, reftag=reftag)

        obj = model(**data)
        obj.clean()
        obj.save()
        return obj

    def id(self, data, reftag=None):
        """Return the id of `data["ix"]` for an ixlan, None otherwise."""
        if reftag == "ixlan":
            return data["ix"].id
        return None

    def status(self, data, reftag=None):
        """Return the mock status value."""
        return "ok"

    def address1(self, data, reftag=None):
        """Return the mock first address line."""
        return "Address line 1"

    def address2(self, data, reftag=None):
        """Return the mock second address line."""
        return "Address line 2"

    def state(self, data, reftag=None):
        """Return the mock state."""
        return "Illinois"

    def zipcode(self, data, reftag=None):
        """Return the mock zip code."""
        return "12345"

    def website(self, data, reftag=None):
        """Return the mock website URL (also the base for other mock URLs)."""
        return "https://www.peeringdb.com"

    def notes(self, data, reftag=None):
        """Return the mock notes text."""
        return "Some notes"

    def city(self, data, reftag=None):
        """Return the mock city."""
        return "Chicago"

    def suite(self, data, reftag=None):
        """Return an empty suite value."""
        return ""

    def floor(self, data, reftag=None):
        """Return an empty floor value."""
        return ""

    def netixlan_updated(self, data, reftag=None):
        """Return None (no mock value for this field)."""
        return None

    def poc_updated(self, data, reftag=None):
        """Return None (no mock value for this field)."""
        return None

    def netfac_updated(self, data, reftag=None):
        """Return None (no mock value for this field)."""
        return None

    def latitude(self, data, reftag=None):
        """Return the mock latitude."""
        return 0.0

    def longitude(self, data, reftag=None):
        """Return the mock longitude."""
        return 0.0

    def country(self, data, reftag=None):
        """Return the mock country code."""
        return "US"

    def name(self, data, reftag=None):
        """Return `reftag` followed by the first 8 characters of a random UUID."""
        return f"{reftag} {str(uuid.uuid4())[:8]}"

    def name_long(self, data, reftag=None):
        """Return a freshly generated name (see `name`)."""
        return self.name(data, reftag=reftag)

    def asn(self, data, reftag=None):
        """
        Return the ASN to use.

        For a netixlan this is the ASN of `data["network"]`; for anything
        else the internal counter is incremented and its new value returned.
        """
        if reftag == "netixlan":
            return data["network"].asn
        self._asn += 1
        return self._asn

    def aka(self, data, reftag=None):
        """Return a freshly generated name (see `name`)."""
        return self.name(data, reftag=reftag)

    def irr_as_set(self, data, reftag=None):
        """Return a random value of the form ``AS-XXXXXXXX@RIPE``."""
        return f"AS-{str(uuid.uuid4())[:8].upper()}@RIPE"

    def looking_glass(self, data, reftag=None):
        """Return the mock looking glass URL below `website`."""
        return f"{self.website(data, reftag=reftag)}/looking-glass"

    def route_server(self, data, reftag=None):
        """Return the mock route server URL below `website`."""
        return f"{self.website(data, reftag=reftag)}/route-server"

    def notes_private(self, data, reftag=None):
        """Return the mock private notes text."""
        return "Private notes"

    def info_prefixes4(self, data, reftag=None):
        """Return the mock value for info_prefixes4."""
        return 50000

    def info_prefixes6(self, data, reftag=None):
        """Return the mock value for info_prefixes6."""
        return 5000

    def clli(self, data, reftag=None):
        """Return the first 6 characters of a random UUID, upper-cased."""
        return str(uuid.uuid4())[:6].upper()

    def rencode(self, data, reftag=None):
        """Return an empty rencode value."""
        return ""

    def npanxx(self, data, reftag=None):
        """Return the mock npanxx value."""
        return "123-456"

    def descr(self, data, reftag=None):
        """Return the mock description."""
        return "Arbitrary description"

    def mtu(self, data, reftag=None):
        """Return the mock MTU."""
        return 1500

    def vlan(self, data, reftag=None):
        """Return None (no mock value for this field)."""
        return None

    def rs_asn(self, data, reftag=None):
        """Return an ASN obtained from `asn`."""
        return self.asn(data, reftag=reftag)

    def local_asn(self, data, reftag=None):
        """Return the ASN of `data["network"]`."""
        return data["network"].asn

    def arp_sponge(self, data, reftag=None):
        """Return None (no mock value for this field)."""
        return None

    def prefix(self, data, reftag=None):
        """
        Take a prefix out of the pool matching `data["protocol"]`.

        The prefix is removed from the pool, so each one is handed out
        once; an exhausted pool raises IndexError. Returns None when the
        protocol is neither "IPv4" nor "IPv6".
        """
        protocol = data.get("protocol")
        if protocol == "IPv4":
            return self.prefix_pool_v4.pop()
        if protocol == "IPv6":
            return self.prefix_pool_v6.pop()
        return None

    def ipaddr4(self, data, reftag=None):
        """
        Take an address out of the pool belonging to the first IPv4
        prefix of `data["ixlan"]` and return it as a string.
        """
        prefix = data["ixlan"].ixpfx_set.filter(protocol="IPv4").first().prefix
        return str(self.ipaddr_pool_v4[str(prefix)].pop())

    def ipaddr6(self, data, reftag=None):
        """
        Take an address out of the pool belonging to the first IPv6
        prefix of `data["ixlan"]` and return it as a string.
        """
        prefix = data["ixlan"].ixpfx_set.filter(protocol="IPv6").first().prefix
        return str(self.ipaddr_pool_v6[str(prefix)].pop())

    def speed(self, data, reftag=None):
        """Return the mock speed."""
        return 1000

    def ixf_net_count(self, data, reftag=None):
        """Return the mock value for ixf_net_count."""
        return 0

    def ixf_last_import(self, data, reftag=None):
        """Return None (no mock value for this field)."""
        return None

    def ix_count(self, data, reftag=None):
        """Return the mock value for ix_count."""
        return 0

    def fac_count(self, data, reftag=None):
        """Return the mock value for fac_count."""
        return 0

    def net_count(self, data, reftag=None):
        """Return the mock value for net_count."""
        return 0