1
0
mirror of https://github.com/peeringdb/peeringdb.git synced 2024-05-11 05:55:09 +00:00
Files
peeringdb-peeringdb/peeringdb_server/api_schema.py
Matt Griswold abe3e78d50 Dotf fixes (#781)
* fix issue where ix-f import would raise suggestions for IP addresses not on that ixlan (#764)

* IX-F Suggestions: Leaving the editor and returning to it via back button issues (#765)

* IX-F importer: Clicking "Preview" (IXP Update Tools) on /net/ page resulted in 170 ticket resolutions (#769)
More robust testing

* black formatting (was lost after pyupgrade)

* Add regex searching to deskpro ticket subjects

* Change operational error

* IX-F suggestions: consolidate delete+add (#770)

* Add reset functions to commandline tool

* Fix commandline tool bugs

* Fix reset commandline tool bugs

* add commandline tool

* Ixlan needs to be set for import commandline tool

* Add email model

* Add admin view to emails

* Allow network and ix to be null

* save emails as part of ixf import

* Add email model

* Add email delete

* add iregex search and better comments

* fix ixlan selection for import

* redefine migration dependencies for this branch

* only enable search w start and end char

* Add caption to regex search

* Remove delete all ixfmemberdata option

* [beta] IX-F importer: don't bother about missing IPv{4,6} address when network is not doing IPv{4,6} (#771)

* Add cmdline tests

* Resolve email conflicts

* Add cmd tool reset tests

* add autocomplete to commandline tool

* Fix email bugs

* Fix email migrations

* Fix typos

* [beta] IX-F importer: prevent Admin Committee overload by initially limiting importer to IXes enabled by AC (#772)

* Finalize regex search for emails and deskprotickets

* Fix keyword bug

* fix typo

* protocol-conflict will now be handled in the notification consolidation

#771 changes: if the network indicates neither IPv4 nor IPv6 support, it is handled as supporting both (e.g. the network didn't configure these at all)

Realised that the importer command re-instantiates the `Importer` class for each ixlan it processes, so moved the sending of consolidated notifications (#772) out of the `update` function and into the command itself, after it's done processing all the ixlans. This means that in tests you will need to call `importer.notify_proposals` after `importer.update` to test the consolidated notifications.

Fixed several MultipleObjectsReturned errors when a network switches protocol support in between imports

* should be checking for "ix" in the form data (#773)

* Fix cmd ixf tests

* fix issue in log_peer

* Add commit check for reset tool

* fix importer bugs

* remove dupe IXFImportEmail definition

* ixfimportemail support ix__name and net__name searching

* ticket resolution responses

* Add commit to command ixf import changes

* fix modify entry header

* remove whitespace in notification about remote data changes

* Begin updating tests

* ixf-import command line tool to queue

* refactor conflict inserts

* Update import protocol tests, including tests for 770

* More test edits

* Change cmd tests

* better ixfmemberdata error handling and fix some test data

* dont reset the same ixfmemberdata requirement

* fix many bugs add many tests

* remove debug message

* fix bug during import when consolidating delete+add

* fix performance issue in IXFMemberData listing

* dont show reset flags on prod env

* Add regex search tests

* Add 772 tests

* remove debug output

* fix `test_resolve_deskpro_ticket` test

* black formatting

* remove dupe import

* fix issue with unique constraint error handling

* add test for ixp / network ip protocol notification

* add missing test data

Co-authored-by: Stefan Pratter <stefan@20c.com>
Co-authored-by: Elliot Frank <elliot@20c.com>
2020-07-27 04:36:27 +00:00

578 lines
18 KiB
Python

import re
from django.utils.translation import ugettext as _
from django.conf import settings
from rest_framework.schemas.openapi import AutoSchema
class CustomField:
    """
    Minimal field description for query filters that are not backed
    by a model field.

    Exposes the two things the filter documentation reads from a
    field: `get_internal_type()` and `help_text`.
    """

    def __init__(self, typ, help_text=""):
        self.typ, self.help_text = typ, help_text

    def get_internal_type(self):
        """
        Returns the field type name this instance was created with
        """
        return self.typ
class BaseSchema(AutoSchema):
"""
Augments the openapi schema generation for
the peeringdb API docs

Operations are enriched with descriptions per operation type and
object type, and list operations are documented with the query
filters available for the object.
"""
# django internal field types that are documented with the
# comparison filter suffixes (`__lt`, `__gt`, `__lte`, `__gte`, `__in`)
numeric_fields = [
"IntegerField",
"PositiveIntegerField",
"DecimalField",
"DateField",
"DateTimeField",
"ForeignKey",
"AutoField",
]
# field types that are documented with the text filter
# suffixes (`__startswith`, `__contains`, `__in`)
text_fields = [
"TextField",
"CharField",
]
# field types that are documented as filters without any suffixes
other_fields = [
"BooleanField",
"TextSearchField",
"NumberSearchField",
]
# map django internal types to the type name used in the filter's
# openapi schema, types missing from this map are documented
# as "string"
#
# NOTE(review): "date-time" and "date" are openapi string formats
# rather than openapi types - confirm that consumers of this map
# handle them accordingly
type_map = {
"ForeignKey": "integer",
"PositiveIntegerField": "integer",
"IntegerField": "integer",
"AutoField": "integer",
"BooleanField": "integer",
"DecimalField": "number",
"DateTimeField": "date-time",
"DateField": "date",
"NumberSearchField": "number",
}
def get_operation_type(self, *args):
    """
    Resolve the operation type name for an openapi operation

    Arguments:
        - args: operation arguments, `args[0]` is the path and
          `args[1]` is the HTTP method

    Returns:
        - str: "list" for a GET on a path without `{id}`, "retrieve"
          for any other GET, "create" for POST, "update" for PUT,
          "delete" for DELETE, "patch" for PATCH and the lower-cased
          method name for anything else
    """
    method = args[1]

    # GET is the only method where the path decides the operation type
    if method == "GET":
        return "retrieve" if "{id}" in args[0] else "list"

    named_operations = {
        "POST": "create",
        "PUT": "update",
        "DELETE": "delete",
        "PATCH": "patch",
    }
    if method in named_operations:
        return named_operations[method]

    return method.lower()
def _get_operation_id(self, path, method):
    """
    Operation ids take the form "{op} {reftag}" whenever a model can
    be related to the operation, otherwise the parent class decides
    the operation id
    """
    _serializer, model = self.get_classes(path, method)
    op_type = self.get_operation_type(path, method)

    if not model:
        return super()._get_operation_id(path, method)

    return "{} {}".format(op_type, model.HandleRef.tag)
def get_operation(self, *args, **kwargs):
    """
    Build the operation dict through the parent class and augment it
    for the openapi schema

    Augmentation happens in this order:

    1. `augment_{op_type}_operation` method, if it exists
    2. markdown documentation for the object type (`obj_{reftag}`)
    3. `augment_{op_type}_{reftag}` method, if it exists
    4. markdown documentation for the operation type (`op_{op_type}`)

    Steps 2 and 3 only run if a model could be related to the operation.

    Returns:
        - dict: the augmented operation
    """
    op_dict = super().get_operation(*args, **kwargs)
    op_type = self.get_operation_type(*args)

    def append_doc_include(include_key):
        # append the contents of the file registered under `include_key`
        # in settings.API_DOC_INCLUDES (if any) to the description
        doc_path = settings.API_DOC_INCLUDES.get(include_key, "")
        if not doc_path:
            return
        with open(doc_path) as fh:
            op_dict["description"] += "\n\n" + fh.read()

    # augmentation for the operation type
    operation_augment = getattr(self, f"augment_{op_type}_operation", None)
    if operation_augment:
        operation_augment(op_dict, args)

    # attempt to relate a serializer and a model class to the operation
    serializer, model = self.get_classes(*args)

    if model:
        tag = model.HandleRef.tag

        # markdown documentation for the object type (docs/api/obj_*.md)
        append_doc_include(f"obj_{tag}")

        # augmentation for the operation type and object type combination
        object_augment = getattr(self, f"augment_{op_type}_{tag}", None)
        if object_augment:
            object_augment(serializer, model, op_dict)

    # markdown documentation for the operation type (docs/api/op_*.md)
    append_doc_include(f"op_{op_type}")

    return op_dict
def get_classes(self, *op_args):
    """
    Relate a serializer and a model class to the openapi operation

    The model is taken from the serializer's `Meta.model` and will be
    `None` if the serializer does not define it.

    Returns:
        - tuple(serializers.Serializer, models.Model)
    """
    serializer = self._get_serializer(*op_args)
    meta = getattr(serializer, "Meta", None)
    return (serializer, getattr(meta, "model", None))
def augment_retrieve_operation(self, op_dict, op_args):
    """
    Augment openapi schema for single object retrieval

    Sets the operation description and documents the `depth` query
    parameter. Does nothing if no model can be related to the operation.

    Arguments:
        - op_dict (dict): openapi operation, modified in place
        - op_args (tuple): operation arguments (path, method)
    """
    parameters = op_dict.get("parameters")
    _serializer, model = self.get_classes(*op_args)

    if model:
        description = _("Retrieves a single `{obj_type}` type object by id")
        op_dict["description"] = description.format(obj_type=model.HandleRef.tag)

        depth_param = {
            "name": "depth",
            "in": "query",
            "required": False,
            "description": _("Expand nested sets according to depth."),
            "schema": {"type": "integer", "default": 1, "minimum": 0, "maximum": 2},
        }
        parameters.append(depth_param)
def augment_list_operation(self, op_dict, op_args):
    """
    Augment openapi schema for object listings

    Sets the operation description, documents the `depth`, `limit`,
    `skip` and `since` query parameters as well as the query filters
    of the object, and finally sorts all parameters by name. Does
    nothing if no model can be related to the operation.

    Arguments:
        - op_dict (dict): openapi operation, modified in place
        - op_args (tuple): operation arguments (path, method)
    """
    parameters = op_dict.get("parameters")
    serializer, model = self.get_classes(*op_args)

    if not model:
        return

    def query_param(name, description, schema):
        # all listing parameters are optional query string arguments
        return {
            "name": name,
            "in": "query",
            "required": False,
            "description": description,
            "schema": schema,
        }

    description = _("Retrieves a list of `{obj_type}` type objects")
    op_dict["description"] = description.format(obj_type=model.HandleRef.tag)

    depth = query_param(
        "depth",
        _("Expand nested sets according to depth."),
        {"type": "integer", "default": 0, "minimum": 0, "maximum": 2},
    )
    limit = query_param(
        "limit",
        _(
            "Limit the number of rows returned in the result set (use for pagination in combination with `skip`)"
        ),
        {"type": "integer"},
    )
    skip = query_param(
        "skip",
        _(
            "Skip n results in the result set (use for pagination in combination with `limit`)"
        ),
        {"type": "integer"},
    )
    since = query_param(
        "since",
        _(
            "Unix epoch time stamp (seconds). Only return objects that have been updated since then"
        ),
        {"type": "integer"},
    )
    parameters.extend([depth, limit, skip, since])

    # add the query filters available for the object
    self.augment_list_filters(model, serializer, parameters)

    op_dict["parameters"] = sorted(parameters, key=lambda param: param["name"])
def augment_list_filters(self, model, serializer, parameters):
    """
    Further augment openapi schema for object listing by filling
    the query parameter list with all the possible query filters
    for the object

    Filters are collected from the model fields, the queryable
    relations of the serializer and - if the schema defines a
    `{reftag}_list_filter_fields` method - the custom filters
    returned by it.

    Arguments:
        - model: model class related to the operation
        - serializer: serializer related to the operation, needs to
          provide `queryable_relations()`
        - parameters (list): openapi parameters, filters are appended
          to it in place
    """

    # openapi only knows primitive names for `type`, date and date-time
    # are `format` values of the string type
    string_formats = ("date", "date-time")

    field_names = [
        (fld.name, fld) for fld in model._meta.get_fields()
    ] + serializer.queryable_relations()

    custom_filter_fields = getattr(
        self, f"{model.HandleRef.tag}_list_filter_fields", None
    )

    if custom_filter_fields:
        field_names.extend(custom_filter_fields())

    blocked_prefixes = []

    for field, fld in field_names:
        typ = fld.get_internal_type()

        # while sending queries to nested set fields sort of works
        # it's currently not officially supported and results may not
        # be what is expected.
        #
        # hide these query possibilities from the documentation
        if field.startswith(tuple(blocked_prefixes)):
            continue

        if typ == "ForeignKey" and (fld.one_to_many or hasattr(fld, "multiple")):
            # mark prefix of nested object as blocked so we
            # dont expose it's fields to the documentation
            blocked_prefixes.append(f"{field}__")
            continue

        if typ in self.numeric_fields:
            supported_filters = ["`__lt`", "`__gt`", "`__lte`", "`__gte`", "`__in`"]
        elif typ in self.text_fields:
            supported_filters = ["`__startswith`", "`__contains`", "`__in`"]
        elif typ in self.other_fields:
            supported_filters = []
        else:
            # field type is not documented as a filter
            continue

        description = [
            _("Filter results by matching a value against this field."),
        ]

        # if field has a help_text set, it leads the description
        help_text = getattr(fld, "help_text", None)
        if help_text:
            description.insert(0, f"{help_text}")

        # if field has choices defined, append them to the description
        choices = getattr(fld, "choices", None)
        if choices:
            description.append(", ".join(f"`{_id}`" for _id, _label in choices))

        if supported_filters:
            description.append(
                _("Supported filter suffixes: {}").format(
                    ", ".join(supported_filters)
                )
            )

        openapi_type = self.type_map.get(typ, "string")
        if openapi_type in string_formats:
            schema = {"type": "string", "format": openapi_type}
        else:
            schema = {"type": openapi_type}

        parameters.append(
            {
                # filters are exposed using the short reftag prefixes
                "name": re.sub(
                    r"^facility__", "fac__", re.sub(r"^network__", "net__", field)
                ),
                "in": "query",
                "description": "\n\n".join(description),
                "required": False,
                "schema": schema,
            }
        )
def fac_list_filter_fields(self):
    """
    Custom query filters to document for `fac` object listings

    Returns:
        - list: `(filter name, CustomField)` tuples
    """
    net_count = CustomField(
        "IntegerField", _("Number of networks present at this facility")
    )
    return [("net_count", net_count)]
def net_list_filter_fields(self):
    """
    Custom query filters to document for `net` object listings

    Returns:
        - list: `(filter name, CustomField)` tuples
    """

    def number_search(help_text):
        # filter that matches against a numeric value (object id)
        return CustomField("NumberSearchField", help_text)

    name_search = CustomField(
        "TextSearchField", _("Targets both AKA and name fields for filtering")
    )

    return [
        ("name_search", name_search),
        (
            "not_ix",
            number_search(_("Find networks not present at an exchange (exchange id)")),
        ),
        ("ix", number_search(_("Find networks present at exchange (exchange id)"))),
        (
            "not_fac",
            number_search(_("Find networks not present at a facility (facility id)")),
        ),
        (
            "fac",
            number_search(_("Find networks present at a facility (facility id)")),
        ),
        ("ixlan", number_search(_("Find networks connected at ixlan (ixlan id)"))),
        (
            "netixlan",
            number_search(
                _("Find the network that contains this netixlan (netixlan id)")
            ),
        ),
        (
            "netfac",
            number_search(
                _("Find the network that this netfac belongs to (netfac id)")
            ),
        ),
    ]
def ix_list_filter_fields(self):
    """
    Custom query filters to document for `ix` object listings

    Returns:
        - list: `(filter name, CustomField)` tuples
    """

    def number_search(help_text):
        # filter that matches against a numeric value (object id)
        return CustomField("NumberSearchField", help_text)

    fields = []
    fields.append(
        ("fac", number_search(_("Find exchanges present at a facility (facility id)")))
    )
    fields.append(
        (
            "ixlan",
            number_search(_("Find the exchange that contains this ixlan (ixlan id)")),
        )
    )
    fields.append(
        (
            "ixfac",
            number_search(_("Find the exchange that contains this ixfac (ixfac id)")),
        )
    )
    fields.append(
        (
            "net_count",
            CustomField(
                "IntegerField", _("Number of networks present at this exchange")
            ),
        )
    )
    fields.append(
        (
            "net",
            number_search(
                _("Find exchanges where this network has a presence at (net id)")
            ),
        )
    )
    return fields
def org_list_filter_fields(self):
    """
    Custom query filters to document for `org` object listings

    Returns:
        - list: `(filter name, CustomField)` tuples
    """
    asn = CustomField(
        "NumberSearchField",
        _("Find organization that contains the network (network asn)"),
    )
    return [("asn", asn)]
def ixpfx_list_filter_fields(self):
    """
    Custom query filters to document for `ixpfx` object listings

    Returns:
        - list: `(filter name, CustomField)` tuples
    """
    by_exchange = CustomField(
        "NumberSearchField", _("Find prefixes by exchange (exchange id)")
    )
    by_address = CustomField("TextSearchField", _("Find prefixes by ip address"))

    return [("ix", by_exchange), ("whereis", by_address)]
def netixlan_list_filter_fields(self):
    """
    Custom query filters to document for `netixlan` object listings

    Returns:
        - list: `(filter name, CustomField)` tuples
    """
    exchange_name = CustomField("CharField", _("Exchange name"))
    return [("name", exchange_name)]
def netfac_list_filter_fields(self):
    """
    Custom query filters to document for `netfac` object listings

    Returns:
        - list: `(filter name, CustomField)` tuples
    """
    fields = []
    fields.append(("name", CustomField("CharField", _("Facility name"))))
    fields.append(("country", CustomField("CharField", _("Facility country"))))
    fields.append(("city", CustomField("CharField", _("Facility city"))))
    return fields
def augment_create_operation(self, op_dict, op_args):
    """
    Augment openapi schema for object creation

    Sets the operation description. Does nothing if no model can be
    related to the operation.

    Arguments:
        - op_dict (dict): openapi operation, modified in place
        - op_args (tuple): operation arguments (path, method)
    """
    _serializer, model = self.get_classes(*op_args)

    if not model:
        return

    op_dict["description"] = _("Creates a new `{obj_type}` type object.").format(
        obj_type=model.HandleRef.tag
    )
def request_body_schema(self, op_dict, content="application/json"):
    """
    Look up the request body schema of an operation for the
    specified content type

    Arguments:
        - op_dict (dict): openapi operation
        - content (str): content type of the request body

    Returns:
        - dict: the schema, or an empty dict if the operation does
          not define one for the content type
    """
    node = op_dict
    # walk down requestBody -> content -> {content type} -> schema
    for key in ("requestBody", "content", content, "schema"):
        node = node.get(key, {})
    return node
def augment_create_ix(self, serializer, model, op_dict):
    """
    Augment openapi schema for create ix operation

    Marks the `prefix` property of the request body as required.

    Arguments:
        - serializer: serializer related to the operation (unused)
        - model: model class related to the operation (unused)
        - op_dict (dict): openapi operation, its request body schema
          is modified in place
    """
    request_body_schema = self.request_body_schema(op_dict)

    # during creation the `prefix` field should be marked as required
    #
    # the schema may come without a `required` list, so create it on
    # demand, and never list `prefix` more than once
    required = request_body_schema.setdefault("required", [])
    if "prefix" not in required:
        required.append("prefix")
def augment_update_ix(self, serializer, model, op_dict):
    """
    Augment openapi schema for update ix operation

    Removes the `prefix` and `suggest` properties from the request
    body, properties that are not part of the schema are skipped.

    Arguments:
        - serializer: serializer related to the operation (unused)
        - model: model class related to the operation (unused)
        - op_dict (dict): openapi operation, its request body schema
          is modified in place
    """
    request_body_schema = self.request_body_schema(op_dict)
    properties = request_body_schema.get("properties", {})

    # prefix and suggest on `update` will be ignored
    for ignored in ("prefix", "suggest"):
        properties.pop(ignored, None)
def augment_update_fac(self, serializer, model, op_dict):
    """
    Augment openapi schema for update fac operation

    Removes the `suggest` property from the request body, nothing
    happens if the property is not part of the schema.

    Arguments:
        - serializer: serializer related to the operation (unused)
        - model: model class related to the operation (unused)
        - op_dict (dict): openapi operation, its request body schema
          is modified in place
    """
    request_body_schema = self.request_body_schema(op_dict)
    properties = request_body_schema.get("properties", {})

    # suggest on `update` will be ignored
    properties.pop("suggest", None)
def augment_update_net(self, serializer, model, op_dict):
    """
    Augment openapi schema for update net operation

    Removes the `suggest` property from the request body, nothing
    happens if the property is not part of the schema.

    Arguments:
        - serializer: serializer related to the operation (unused)
        - model: model class related to the operation (unused)
        - op_dict (dict): openapi operation, its request body schema
          is modified in place
    """
    request_body_schema = self.request_body_schema(op_dict)
    properties = request_body_schema.get("properties", {})

    # suggest on `update` will be ignored
    properties.pop("suggest", None)
def augment_update_operation(self, op_dict, op_args):
    """
    Augment openapi schema for update operation

    Sets the operation description. Does nothing if no model can be
    related to the operation.

    Arguments:
        - op_dict (dict): openapi operation, modified in place
        - op_args (tuple): operation arguments (path, method)
    """
    _serializer, model = self.get_classes(*op_args)

    if not model:
        return

    op_dict["description"] = _(
        "Updates an existing `{obj_type}` type object."
    ).format(obj_type=model.HandleRef.tag)
def augment_delete_operation(self, op_dict, op_args):
    """
    Augment openapi schema for delete operation

    Sets the operation description. Does nothing if no model can be
    related to the operation.

    Arguments:
        - op_dict (dict): openapi operation, modified in place
        - op_args (tuple): operation arguments (path, method)
    """
    _serializer, model = self.get_classes(*op_args)

    if not model:
        return

    op_dict["description"] = _(
        "Marks an `{obj_type}` type object as `deleted`."
    ).format(obj_type=model.HandleRef.tag)