"""
Define export views used for IX-F export and advanced search file download.
"""
import collections
import csv
import datetime
import json
import os
import urllib.error
import urllib.parse
import urllib.request
from django.conf import settings
from django.http import HttpResponse, JsonResponse
from django.utils.html import escape
from django.utils.translation import gettext_lazy as _
from django.views import View
from rest_framework.test import APIRequestFactory
from simplekml import Kml, Style
from peeringdb_server.models import Campus, InternetExchange, IXLan
from peeringdb_server.renderers import JSONEncoder
from peeringdb_server.rest import REFTAG_MAP as RestViewSets
from peeringdb_server.util import generate_balloonstyle_text
def export_ixf_ix_members(ixlans, pretty=False):
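    """
    Render the IX-F member list export (schema version 0.6) for the given
    IXLan objects.

    Arguments:
        - ixlans: iterable of IXLan objects to include
        - pretty <bool>: if True, indent the JSON output

    Returns:
        - str: JSON document
    """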
member_list = []
ixp_list = []
for ixlan in ixlans:
if ixlan.ix not in ixp_list:
ixp_list.append(ixlan.ix)
    rv = {
        "version": "0.6",
        # use UTC so the trailing "Z" in the timestamp is accurate
        "timestamp": datetime.datetime.now(datetime.timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        ),
"member_list": member_list,
"ixp_list": [{"ixp_id": ixp.id, "shortname": ixp.name} for ixp in ixp_list],
}
for ixlan in ixlans:
asns = []
for netixlan in ixlan.netixlan_set_active.all():
if netixlan.asn in asns:
continue
connection_list = []
member = {
"asnum": netixlan.asn,
"member_type": "peering",
"name": netixlan.network.name,
"url": netixlan.network.website,
"contact_email": [
poc.email
for poc in netixlan.network.poc_set_active.filter(visible="Public")
],
"contact_phone": [
poc.phone
for poc in netixlan.network.poc_set_active.filter(visible="Public")
],
"peering_policy": netixlan.network.policy_general.lower(),
"peering_policy_url": netixlan.network.policy_url,
"connection_list": connection_list,
}
member_list.append(member)
asns.append(netixlan.asn)
for _netixlan in ixlan.netixlan_set_active.filter(asn=netixlan.asn):
vlan_list = [{}]
connection = {
"ixp_id": _netixlan.ixlan.ix_id,
"state": "active",
"if_list": [{"if_speed": _netixlan.speed}],
"vlan_list": vlan_list,
}
connection_list.append(connection)
if _netixlan.ipaddr4:
vlan_list[0]["ipv4"] = {
"address": f"{_netixlan.ipaddr4}",
"routeserver": _netixlan.is_rs_peer,
"max_prefix": _netixlan.network.info_prefixes4,
"as_macro": _netixlan.network.irr_as_set,
}
if _netixlan.ipaddr6:
vlan_list[0]["ipv6"] = {
"address": f"{_netixlan.ipaddr6}",
"routeserver": _netixlan.is_rs_peer,
"max_prefix": _netixlan.network.info_prefixes6,
"as_macro": _netixlan.network.irr_as_set,
}
if pretty:
return json.dumps(rv, indent=2)
else:
return json.dumps(rv)
def view_export_ixf_ix_members(request, ix_id):
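    """
    Return the IX-F member list export for all active ixlans of the
    exchange identified by `ix_id`.
    """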
return HttpResponse(
export_ixf_ix_members(
IXLan.objects.filter(ix_id=ix_id, status="ok"),
pretty="pretty" in request.GET,
),
content_type="application/json",
)
def view_export_ixf_ixlan_members(request, ixlan_id):
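    """
    Return the IX-F member list export for the single active ixlan
    identified by `ixlan_id`.
    """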
return HttpResponse(
export_ixf_ix_members(
IXLan.objects.filter(id=ixlan_id, status="ok"),
pretty="pretty" in request.GET,
),
content_type="application/json",
)
def delete_key(data, keys):
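    """
    Remove the specified keys from every dict in `data` (in place).
    """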
for d in data:
for key in keys:
if key in d:
d.pop(key, "")
class ExportView(View):
"""
Base class for more complex data exports.
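    Subclasses are expected to override `generate` and return a list of
    dict rows. A minimal illustrative sketch (hypothetical names, not part
    of this module):

        class WidgetExportView(ExportView):
            download_name = "widgets.{extension}"

            def generate(self, request, fmt):
                return [{"Name": "Example", "City": "Example City"}]

    The response_* handlers below render such rows as json, pretty json,
    csv or kmz.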
"""
    # supported export formats
formats = ["json", "json_pretty", "csv", "kmz"]
    # when exporting json data, if this is not None the
    # json data will be wrapped in an additional dict and
    # referenced at a key with the specified name
json_root_key = "data"
    # if True, exported data is sent with file attachment headers
download = True
# if download=True this value will be used to specify
# the filename of the downloaded file
download_name = "export.{extension}"
# format to file extension translation table
extensions = {"csv": "csv", "json": "json", "json_pretty": "json", "kmz": "kmz"}
def get(self, request, fmt):
fmt = fmt.replace("-", "_")
if fmt not in self.formats:
raise ValueError(_("Invalid export format"))
response_handler = getattr(self, f"response_{fmt}")
response = response_handler(self.generate(request, fmt))
if self.download is True:
# send attachment header, triggering download on the client side
filename = self.download_name.format(extension=self.extensions.get(fmt))
response["Content-Disposition"] = 'attachment; filename="{}"'.format(
filename
)
return response
    def generate(self, request, fmt):
"""
Function that generates export data from request.
Override this.
"""
return {}
def response_json(self, data):
"""
Return Response object for normal json response.
Arguments:
- data <list|dict>: serializable data, if list is passed you will need
to specify a value in self.json_root_key
Returns:
- JsonResponse
"""
if self.json_root_key:
delete_key(data, ["Latitude", "Longitude", "Notes"])
data = {self.json_root_key: data}
return JsonResponse(data, encoder=JSONEncoder)
def response_json_pretty(self, data):
"""
Return Response object for pretty (indented) json response.
Arguments:
        - data <list|dict>: serializable data, if a list is passed you will need
to specify a value in self.json_root_key
Returns:
- HttpResponse: http response with appropriate json headers, cannot use
JsonResponse here because we need to specify indent level
"""
if self.json_root_key:
delete_key(data, ["Latitude", "Longitude", "Notes"])
data = {self.json_root_key: data}
return HttpResponse(
json.dumps(data, indent=2, cls=JSONEncoder), content_type="application/json"
)
def response_csv(self, data):
"""
Return Response object for CSV response.
Arguments:
- data <list>
Returns:
- HttpResponse
"""
if not data:
            # no rows; return an empty response so download headers can still be set
            return HttpResponse("", content_type="text/csv")
delete_key(data, ["Latitude", "Longitude", "Notes"])
response = HttpResponse(content_type="text/csv")
csv_writer = csv.DictWriter(response, fieldnames=list(data[0].keys()))
csv_writer.writeheader()
for row in data:
for k, v in list(row.items()):
if isinstance(v, str):
row[k] = f"{v}"
csv_writer.writerow(row)
return response
def response_kmz(self, data):
"""
Return Response object for kmz response.
Arguments:
- data <list>
Returns:
- HttpResponse
"""
if not data:
            # no rows; return an empty response so download headers can still be set
            return HttpResponse("", content_type="application/vnd.google-earth.kmz")
kml = Kml()
style = Style()
keys = list(data[0].keys())
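        # Latitude/Longitude/Notes feed the placemark itself (coords and
        # description below), so they are excluded from the balloon fields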
for exclude_key in ["Latitude", "Longitude", "Notes"]:
try:
keys.remove(exclude_key)
except ValueError:
pass
style.balloonstyle.text = generate_balloonstyle_text(keys)
for row in data:
lat = row.pop("Latitude", None)
lon = row.pop("Longitude", None)
notes = row.pop("Notes", "")
if lat is None or lon is None:
continue
point = kml.newpoint(
name=row.get("Name"),
description=notes,
coords=[(lon, lat)],
)
point.style = style
for key, value in row.items():
point.extendeddata.newdata(
name=key, value=escape(value), displayname=key.title()
)
response = HttpResponse(kml.kml())
response["Content-Type"] = "application/vnd.google-earth.kmz"
return response
class AdvancedSearchExportView(ExportView):
"""
Allow exporting of advanced search result data.
"""
tag = None
json_root_key = "results"
download_name = "advanced_search_export.{extension}"
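    # Illustrative request shapes only (URL prefix elided; see `get` below
    # for the <tag>/<fmt> routing). The same GET filters the /api/<tag>
    # endpoint accepts are passed through, e.g.
    #
    #   .../net/csv?name__contains=example
    #   .../fac/kmz?country=US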
def fetch(self, request):
"""
Fetch data from API according to GET parameters.
Note that `limit` and `depth` will be overwritten, other API
parameters will be passed along as-is.
Returns:
- dict: un-rendered dataset returned by API
"""
params = request.GET.dict()
params["limit"] = 250
params["depth"] = 1
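        # note: only the first 250 matching rows are exported, at depth 1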
# prepare api request
request_factory = APIRequestFactory()
viewset = RestViewSets[self.tag].as_view({"get": "list"})
api_request = request_factory.get(
f"/api/{self.tag}/?{urllib.parse.urlencode(params)}"
)
api_request.session = request.session
# we want to use the same user as the original request
# so permissions are applied correctly
api_request.user = request.user
response = viewset(api_request)
if response.data and "results" in response.data:
return response.data.get("results")
return response.data
def get(self, request, tag, fmt): # lgtm[py/inheritance/signature-mismatch]
"""
Handle export.
        LGTM Notes: signature-mismatch: the order of arguments is defined by
        the url routing set up for this view (e.g., /<tag>/<fmt>).
        The `get` method will never be called in a different
        context where a mismatching signature would matter, so
        the lgtm warning can be ignored in this case.
"""
self.tag = tag
return super().get(request, fmt)
def generate(self, request, fmt):
"""
Generate data for the reftag specified in self.tag
This function will call generate_<tag> and return the result.
Arguments:
- request <Request>
Returns:
- list: list containing rendered data rows ready for export
"""
fmt = fmt.replace("-", "_")
if self.tag not in ["net", "ix", "fac", "org", "campus"]:
raise ValueError(_("Invalid tag"))
if fmt == "kmz" and self.tag not in ["org", "fac", "campus", "ix"]:
# export kmz only for org,fac,campus,ix
raise ValueError(_("Invalid export format"))
data_function = getattr(self, f"generate_{self.tag}")
return data_function(request)
def generate_net(self, request):
"""
Fetch network data from the API according to request and then render
it ready for export.
Arguments:
- request <Request>
Returns:
- list: list containing rendered data ready for export
"""
data = self.fetch(request)
download_data = []
for row in data:
download_data.append(
collections.OrderedDict(
[
("Name", row["name"]),
("Also known as", row["aka"]),
("ASN", row["asn"]),
("General Policy", row["policy_general"]),
("Network Type", row["info_type"]),
("Network Scope", row["info_scope"]),
("Traffic Levels", row["info_traffic"]),
("Traffic Ratio", row["info_ratio"]),
("Exchanges", len(row["netixlan_set"])),
("Facilities", len(row["netfac_set"])),
]
)
)
return download_data
def generate_fac(self, request):
"""
Fetch facility data from the API according to request and then render
it ready for export.
Arguments:
- request <Request>
Returns:
- list: list containing rendered data ready for export
"""
data = self.fetch(request)
download_data = []
for row in data:
download_data.append(
collections.OrderedDict(
[
("Name", row["name"]),
("Management", row["org_name"]),
("CLLI", row["clli"]),
("NPA-NXX", row["npanxx"]),
("City", row["city"]),
("Country", row["country"]),
("State", row["state"]),
("Postal Code", row["zipcode"]),
("Networks", row["net_count"]),
("Latitude", row["latitude"]),
("Longitude", row["longitude"]),
("Notes", row["notes"]),
]
)
)
return download_data
def generate_ix(self, request):
"""
Fetch exchange data from the API according to request and then render
it ready for export.
Arguments:
- request <Request>
Returns:
- list: list containing rendered data ready for export
"""
data = self.fetch(request)
download_data = []
for row in data:
latitude = None
longitude = None
ix = InternetExchange.objects.filter(id=row["id"]).first()
if ix:
ixfac = ix.ixfac_set.filter(facility__city=row["city"]).first()
if ixfac:
fac = ixfac.facility
latitude = fac.latitude
longitude = fac.longitude
download_data.append(
collections.OrderedDict(
[
("Name", row["name"]),
("Media Type", row["media"]),
("Country", row["country"]),
("City", row["city"]),
("Networks", row["net_count"]),
("Latitude", latitude),
("Longitude", longitude),
("Notes", row["notes"]),
]
)
)
return download_data
def generate_org(self, request):
"""
Fetch organization data from the API according to request and then render
it ready for export.
Arguments:
- request <Request>
Returns:
- list: list containing rendered data ready for export
"""
data = self.fetch(request)
download_data = []
for row in data:
download_data.append(
collections.OrderedDict(
[
("Name", row["name"]),
("Country", row["country"]),
("City", row["city"]),
("Latitude", row["latitude"]),
("Longitude", row["longitude"]),
("Notes", row["notes"]),
]
)
)
return download_data
def generate_campus(self, request):
"""
Fetch campus data from the API according to request and then render
it ready for export.
Arguments:
- request <Request>
Returns:
- list: list containing rendered data ready for export
"""
data = self.fetch(request)
download_data = []
for row in data:
latitude = None
longitude = None
campus = Campus.objects.filter(id=row["id"]).first()
if campus:
latitude = campus.latitude
longitude = campus.longitude
download_data.append(
collections.OrderedDict(
[
("Name", row["name"]),
("Name Long", row["name_long"]),
("Website", row["website"]),
("Fac_set", row["fac_set"]),
("Latitude", latitude),
("Longitude", longitude),
("Notes", row["notes"]),
]
)
)
return download_data
def kmz_download(request):
"""
Will return a file download of the KMZ file located at
settings.KMZ_EXPORT_FILE if it exists.
    Will also send a Last-Modified header based on the file's modification time.
"""
try:
with open(settings.KMZ_EXPORT_FILE, "rb") as file:
response = HttpResponse(
file.read(), content_type="application/vnd.google-earth.kmz"
)
response["Content-Disposition"] = 'attachment; filename="{}"'.format(
os.path.basename(settings.KMZ_EXPORT_FILE)
)
        response["Last-Modified"] = datetime.datetime.fromtimestamp(
            os.path.getmtime(settings.KMZ_EXPORT_FILE), tz=datetime.timezone.utc
        ).strftime("%a, %d %b %Y %H:%M:%S GMT")
return response
except OSError:
return HttpResponse(status=404)
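# Illustrative only (the URL path and name below are assumptions, not defined
# in this module): kmz_download would typically be wired up along the lines of
#
#   path("export/kmz/", kmz_download, name="kmz-download")
#
# so clients can download the pre-generated file at settings.KMZ_EXPORT_FILE.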