mirror of
https://github.com/peeringdb/peeringdb.git
synced 2024-05-11 05:55:09 +00:00
Gh 399 Review
This commit is contained in:
@@ -1,10 +1,13 @@
|
||||
import StringIO
|
||||
import json
|
||||
|
||||
import reversion
|
||||
from reversion.models import Version
|
||||
|
||||
from dal import autocomplete
|
||||
from django import forms
|
||||
from django.core.management import call_command
|
||||
from peeringdb_server.models import (COMMANDLINE_TOOLS, CommandLineTool,
|
||||
from peeringdb_server.models import (REFTAG_MAP, COMMANDLINE_TOOLS, CommandLineTool,
|
||||
InternetExchange, Facility)
|
||||
|
||||
from peeringdb_server import maintenance
|
||||
@@ -99,6 +102,9 @@ class CommandLineToolWrapper(object):
|
||||
def set_arguments(self, form_data):
|
||||
pass
|
||||
|
||||
def validate(self):
|
||||
pass
|
||||
|
||||
def _run(self, user, commit=False):
|
||||
r = StringIO.StringIO()
|
||||
|
||||
@@ -106,6 +112,7 @@ class CommandLineToolWrapper(object):
|
||||
maintenance.on()
|
||||
|
||||
try:
|
||||
self.validate()
|
||||
if commit:
|
||||
call_command(self.tool, *self.args, commit=True, stdout=r,
|
||||
**self.kwargs)
|
||||
@@ -274,3 +281,44 @@ class ToolReset(CommandLineToolWrapper):
|
||||
|
||||
def set_arguments(self, form_data):
|
||||
self.kwargs = form_data
|
||||
|
||||
|
||||
@register_tool
|
||||
class ToolUndelete(CommandLineToolWrapper):
|
||||
"""
|
||||
Allows restoration of an object object and it's child objects
|
||||
"""
|
||||
tool = "pdb_undelete"
|
||||
|
||||
# These are the reftags that are currently supported by this
|
||||
# tool.
|
||||
supported_reftags = ["ixlan","fac"]
|
||||
|
||||
class Form(forms.Form):
|
||||
version = forms.ModelChoiceField(
|
||||
queryset=Version.objects.all().order_by("-revision_id"),
|
||||
widget=autocomplete.ModelSelect2(
|
||||
url="/autocomplete/admin/deletedversions"),
|
||||
help_text=_("Restore this object - search by [reftag] [id]"))
|
||||
|
||||
@property
|
||||
def description(self):
|
||||
return "{reftag} {id}".format(**self.kwargs)
|
||||
|
||||
def set_arguments(self, form_data):
|
||||
version = form_data.get("version")
|
||||
if not version:
|
||||
return
|
||||
reftag = version.content_type.model_class().HandleRef.tag
|
||||
self.kwargs = {"reftag":reftag, "id":version.object_id, "version_id":version.id}
|
||||
|
||||
def validate(self):
|
||||
if self.kwargs.get("reftag") not in self.supported_reftags:
|
||||
raise ValueError(_("Only {} type objects may be restored " \
|
||||
"through this interface at this point").format(",".join(self.supported_reftags)))
|
||||
|
||||
obj = REFTAG_MAP[self.kwargs.get("reftag")].objects.get(id=self.kwargs.get("id"))
|
||||
if obj.status != "deleted":
|
||||
raise ValueError("{} is not currently marked as deleted".format(obj))
|
||||
|
||||
|
||||
|
@@ -1,10 +1,14 @@
|
||||
import json
|
||||
|
||||
from django.db.models import Q
|
||||
from django import http
|
||||
from django.utils import html
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
from reversion.models import Version
|
||||
from dal import autocomplete
|
||||
from peeringdb_server.models import (InternetExchange, Facility,
|
||||
NetworkFacility, InternetExchangeFacility,
|
||||
Organization, IXLan, CommandLineTool)
|
||||
Organization, IXLan, CommandLineTool, REFTAG_MAP)
|
||||
|
||||
from peeringdb_server.admin_commandline_tools import TOOL_MAP
|
||||
|
||||
@@ -121,6 +125,61 @@ class IXLanAutocomplete(AutocompleteHTMLResponse):
|
||||
html.escape(item.name))
|
||||
|
||||
|
||||
class DeletedVersionAutocomplete(autocomplete.Select2QuerySetView):
|
||||
"""
|
||||
Autocomplete that will show reversion versions where an object
|
||||
was set to deleted
|
||||
"""
|
||||
|
||||
def get_queryset(self):
|
||||
# Only staff needs to be able to see these
|
||||
if not self.request.user.is_staff:
|
||||
return []
|
||||
|
||||
|
||||
# no query supplied, return empty result
|
||||
if not self.q:
|
||||
return []
|
||||
|
||||
try:
|
||||
# query is expected to be of format "<reftag> <id>"
|
||||
# return empty result on parsing failure
|
||||
reftag, _id = tuple(self.q.split(" "))
|
||||
except ValueError:
|
||||
return []
|
||||
|
||||
try:
|
||||
# make sure target object exists, return
|
||||
# empty result if not
|
||||
obj = REFTAG_MAP[reftag].objects.get(id=_id)
|
||||
except (KeyError, ObjectDoesNotExist):
|
||||
return []
|
||||
|
||||
versions = Version.objects.get_for_object(obj).order_by("revision_id").select_related("revision")
|
||||
rv = []
|
||||
previous = {}
|
||||
|
||||
# cycle through all versions of the object and collect the ones where
|
||||
# status was changed from 'ok' to 'deleted'
|
||||
#
|
||||
# order them by most recent first
|
||||
for version in versions:
|
||||
data = json.loads(version.serialized_data)[0].get("fields")
|
||||
|
||||
if previous.get("status", "ok") == "ok" and data.get("status") == "deleted":
|
||||
rv.insert(0, version)
|
||||
|
||||
previous = data
|
||||
|
||||
return rv
|
||||
|
||||
def get_result_label(self, item):
|
||||
# label should be obj representation as well as date of deletion
|
||||
# we split the date string to remove the ms and tz parts
|
||||
return "{} - {}".format(item, str(item.revision.date_created).split(".")[0])
|
||||
|
||||
|
||||
|
||||
class CommandLineToolHistoryAutocomplete(autocomplete.Select2QuerySetView):
|
||||
"""
|
||||
Autocomplete for command line tools that were ran via the admin ui
|
||||
|
@@ -4,6 +4,7 @@ from peeringdb_server.models import REFTAG_MAP
|
||||
|
||||
import reversion
|
||||
import json
|
||||
import re
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
@@ -24,17 +25,85 @@ class Command(BaseCommand):
|
||||
else:
|
||||
self.stdout.write("[pretend] {}".format(msg))
|
||||
|
||||
def log_err(self, msg):
|
||||
self.log("[error] {}".format(msg))
|
||||
|
||||
def log_warn(self, msg):
|
||||
self.log("[warning] {}".format(msg))
|
||||
|
||||
def handle(self, *args, **options):
|
||||
self.commit = options.get("commit", False)
|
||||
self.version_id = options.get("version_id")
|
||||
version = reversion.models.Version.objects.get(id=self.version_id)
|
||||
self.suppress_warning = None
|
||||
self.version = version = reversion.models.Version.objects.get(
|
||||
id=self.version_id)
|
||||
self.date = version.revision.date_created
|
||||
self.log("UNDELETING FROM DATE: {}".format(self.date))
|
||||
self.undelete(options.get("reftag"), options.get("id"))
|
||||
|
||||
def handle_netixlan(self, netixlan):
|
||||
model = REFTAG_MAP["netixlan"]
|
||||
conflict_ip4, conflict_ip6 = netixlan.ipaddress_conflict()
|
||||
|
||||
if conflict_ip4:
|
||||
# ipv4 exists in another netixlan now
|
||||
others = model.objects.filter(ipaddr4=netixlan.ipaddr4,
|
||||
status="ok")
|
||||
for other in [
|
||||
o for o in others if o.ixlan.ix_id == netixlan.ixlan.ix_id
|
||||
]:
|
||||
# netixlan is at same ix as the one being undeleted, delete the other
|
||||
# one so we can proceed with undeletion
|
||||
self.log("Found duplicate netixlan at same ix: {} - deleting".
|
||||
format(other.ipaddr4))
|
||||
if self.commit:
|
||||
other.delete()
|
||||
else:
|
||||
# when in pretend mode we need suppress the next warning as we
|
||||
# are not deleting the conflict
|
||||
self.suppress_warning = True
|
||||
|
||||
for other in [
|
||||
o for o in others if o.ixlan.ix_id != netixlan.ixlan.ix_id
|
||||
]:
|
||||
# unless ipv4 also exists in a netixlan that is NOT at the same ix
|
||||
# then we need the warning again
|
||||
self.suppress_warning = False
|
||||
|
||||
if conflict_ip6:
|
||||
# ipv6 exists in another netixlan now
|
||||
others = model.objects.filter(ipaddr6=netixlan.ipaddr6,
|
||||
status="ok")
|
||||
for other in [
|
||||
o for o in others if o.ixlan.ix_id == netixlan.ixlan.ix_id
|
||||
]:
|
||||
# netixlan is at same ix as the one being undeleted, delete the other
|
||||
# one so we can proceed with undeletion
|
||||
self.log("Found duplicate netixlan at same ix: {} - deleting".
|
||||
format(other.ipaddr6))
|
||||
if self.commit:
|
||||
other.delete()
|
||||
else:
|
||||
# when in pretend mode we need suppress the next warning as we
|
||||
# are not deleting the conflict
|
||||
self.suppress_warning = True
|
||||
|
||||
for other in [
|
||||
o for o in others if o.ixlan.ix_id != netixlan.ixlan.ix_id
|
||||
]:
|
||||
# unless ipv6 also exists in a netixlan that is NOT at the same ix
|
||||
# then we need the warning again
|
||||
self.suppress_warning = False
|
||||
|
||||
def undelete(self, reftag, _id, parent=None, date=None):
|
||||
cls = REFTAG_MAP.get(reftag)
|
||||
obj = cls.objects.get(id=_id)
|
||||
self.suppress_warning = False
|
||||
|
||||
def _label(obj):
|
||||
if hasattr(obj, "descriptive_name"):
|
||||
return obj.descriptive_name
|
||||
return obj
|
||||
|
||||
if date:
|
||||
version = reversion.models.Version.objects.get_for_object(
|
||||
@@ -46,9 +115,9 @@ class Command(BaseCommand):
|
||||
except:
|
||||
status = None
|
||||
if status == "deleted":
|
||||
self.log(
|
||||
self.log_warn(
|
||||
"{} was already deleted at snapshot, skipping ..".format(
|
||||
obj))
|
||||
_label(obj)))
|
||||
return
|
||||
|
||||
can_undelete_obj = True
|
||||
@@ -63,18 +132,29 @@ class Command(BaseCommand):
|
||||
continue
|
||||
if relation.status == "deleted" and relation != parent:
|
||||
can_undelete_obj = False
|
||||
self.log(
|
||||
"Cannot undelete {}, dependent relation marked as deleted: {}".
|
||||
format(obj, relation))
|
||||
self.log_warn(
|
||||
"Cannot undelete {}, dependent relation marked as deleted: {}"
|
||||
.format(_label(obj), relation))
|
||||
|
||||
if not can_undelete_obj:
|
||||
return
|
||||
|
||||
if obj.status == "deleted":
|
||||
obj.status = "ok"
|
||||
self.log("Undeleting {}".format(obj))
|
||||
self.log("Undeleting {}".format(_label(obj)))
|
||||
|
||||
handler = getattr(self, "handle_{}".format(reftag), None)
|
||||
if handler:
|
||||
handler(obj)
|
||||
|
||||
try:
|
||||
obj.clean()
|
||||
if self.commit:
|
||||
obj.save()
|
||||
except Exception as exc:
|
||||
if not self.suppress_warning:
|
||||
self.log_warn("Cannot undelete {}: {}".format(
|
||||
_label(obj), exc))
|
||||
|
||||
for field in cls._meta.get_fields():
|
||||
if field.is_relation:
|
||||
|
@@ -47,7 +47,8 @@ PARTNERSHIP_LEVELS = ((1, _("Data Validation")), (2, _("RIR")))
|
||||
COMMANDLINE_TOOLS = (("pdb_renumber_lans",
|
||||
_("Renumber IP Space")), ("pdb_fac_merge",
|
||||
_("Merge Facilities")),
|
||||
("pdb_fac_merge_undo", _("Merge Facilities: UNDO")))
|
||||
("pdb_fac_merge_undo", _("Merge Facilities: UNDO")),
|
||||
("pdb_undelete", _("Restore Object(s)")))
|
||||
|
||||
|
||||
if settings.TUTORIAL_MODE:
|
||||
@@ -987,6 +988,14 @@ class Facility(pdb_models.FacilityBase, GeocodeBaseMixin):
|
||||
"""
|
||||
return self.netfac_set.filter(status="ok")
|
||||
|
||||
@property
|
||||
def ixfac_set_active(self):
|
||||
"""
|
||||
Returns queryset of active InternetExchangeFacility objects connected
|
||||
to this facility
|
||||
"""
|
||||
return self.ixfac_set.filter(status="ok")
|
||||
|
||||
@property
|
||||
def net_count(self):
|
||||
"""
|
||||
@@ -1262,6 +1271,16 @@ class InternetExchangeFacility(pdb_models.InternetExchangeFacilityBase):
|
||||
ix = models.ForeignKey(InternetExchange, related_name="ixfac_set")
|
||||
facility = models.ForeignKey(Facility, default=0, related_name="ixfac_set")
|
||||
|
||||
@property
|
||||
def descriptive_name(self):
|
||||
"""
|
||||
Returns a descriptive label of the ixfac for logging purposes
|
||||
"""
|
||||
return "ixfac{} {} <-> {}".format(
|
||||
self.id, self.ix.name, self.facility.name)
|
||||
|
||||
|
||||
|
||||
@classmethod
|
||||
def nsp_namespace_from_id(cls, org_id, ix_id, id):
|
||||
"""
|
||||
@@ -1305,6 +1324,15 @@ class IXLan(pdb_models.IXLanBase):
|
||||
class Meta:
|
||||
db_table = u'peeringdb_ixlan'
|
||||
|
||||
@property
|
||||
def descriptive_name(self):
|
||||
"""
|
||||
Returns a descriptive label of the ixlan for logging purposes
|
||||
"""
|
||||
return "ixlan{} {} {}".format(
|
||||
self.id, self.name, self.ix.name)
|
||||
|
||||
|
||||
@classmethod
|
||||
def nsp_namespace_from_id(cls, org_id, ix_id, id):
|
||||
"""
|
||||
@@ -1837,6 +1865,15 @@ class IXLanPrefix(pdb_models.IXLanPrefixBase):
|
||||
|
||||
ixlan = models.ForeignKey(IXLan, default=0, related_name="ixpfx_set")
|
||||
|
||||
@property
|
||||
def descriptive_name(self):
|
||||
"""
|
||||
Returns a descriptive label of the ixpfx for logging purposes
|
||||
"""
|
||||
return "ixpfx{} {}".format(
|
||||
self.id, self.prefix)
|
||||
|
||||
|
||||
@classmethod
|
||||
def nsp_namespace_from_id(cls, org_id, ix_id, ixlan_id, id):
|
||||
"""
|
||||
@@ -2210,6 +2247,8 @@ class NetworkFacility(pdb_models.NetworkFacilityBase):
|
||||
db_table = u'peeringdb_network_facility'
|
||||
unique_together = ('network', 'facility', 'local_asn')
|
||||
|
||||
|
||||
|
||||
@classmethod
|
||||
def nsp_namespace_from_id(cls, org_id, net_id, fac_id):
|
||||
"""
|
||||
@@ -2265,6 +2304,15 @@ class NetworkFacility(pdb_models.NetworkFacilityBase):
|
||||
qset = cls.handleref.undeleted()
|
||||
return qset.filter(**make_relation_filter(field, filt, value))
|
||||
|
||||
@property
|
||||
def descriptive_name(self):
|
||||
"""
|
||||
Returns a descriptive label of the netfac for logging purposes
|
||||
"""
|
||||
return "netfac{} AS{} {} <-> {}".format(
|
||||
self.id, self.network.asn, self.network.name, self.facility.name)
|
||||
|
||||
|
||||
def nsp_has_perms_PUT(self, user, request):
|
||||
return validate_PUT_ownership(user, self, request.data, ["net"])
|
||||
|
||||
@@ -2290,6 +2338,14 @@ class NetworkIXLan(pdb_models.NetworkIXLanBase):
|
||||
def name(self):
|
||||
return ""
|
||||
|
||||
@property
|
||||
def descriptive_name(self):
|
||||
"""
|
||||
Returns a descriptive label of the netixlan for logging purposes
|
||||
"""
|
||||
return "netixlan{} AS{} {} {}".format(
|
||||
self.id, self.asn, self.ipaddr4, self.ipaddr6)
|
||||
|
||||
@property
|
||||
def ix_name(self):
|
||||
"""
|
||||
|
@@ -11,7 +11,7 @@ from peeringdb_server.autocomplete_views import (
|
||||
FacilityAutocompleteForNetwork, FacilityAutocompleteForExchange,
|
||||
OrganizationAutocomplete, ExchangeAutocomplete, ExchangeAutocompleteJSON,
|
||||
IXLanAutocomplete, FacilityAutocomplete, FacilityAutocompleteJSON,
|
||||
clt_history)
|
||||
DeletedVersionAutocomplete, clt_history)
|
||||
|
||||
from peeringdb_server.export_views import (
|
||||
view_export_ixf_ix_members,
|
||||
@@ -156,6 +156,8 @@ urlpatterns += [
|
||||
name="autocomplete-fac"),
|
||||
url(r'^autocomplete/ixlan/$', IXLanAutocomplete.as_view(),
|
||||
name="autocomplete-ixlan"),
|
||||
url(r'^autocomplete/admin/deletedversions$', DeletedVersionAutocomplete.as_view(),
|
||||
name="autocomplete-admin-deleted-versions"),
|
||||
]
|
||||
|
||||
# Admin autocomplete for commandlinetool history
|
||||
|
43
tests/test_autocomplete.py
Normal file
43
tests/test_autocomplete.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import json
|
||||
|
||||
from django.urls import reverse
|
||||
from django.test import Client, RequestFactory
|
||||
|
||||
import reversion
|
||||
|
||||
from peeringdb_server.models import User, Organization
|
||||
from peeringdb_server import autocomplete_views
|
||||
from util import ClientCase
|
||||
|
||||
|
||||
class TestAutocomplete(ClientCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
super(TestAutocomplete, cls).setUpTestData()
|
||||
cls.staff_user = User.objects.create_user("staff", "staff@localhost",
|
||||
"staff", is_staff=True)
|
||||
|
||||
def setUp(self):
|
||||
self.factory = RequestFactory()
|
||||
|
||||
def test_deleted_versions(self):
|
||||
with reversion.create_revision():
|
||||
org = Organization.objects.create(name="Test Org", status="ok")
|
||||
with reversion.create_revision():
|
||||
org.delete()
|
||||
with reversion.create_revision():
|
||||
org.status = "ok"
|
||||
org.save()
|
||||
with reversion.create_revision():
|
||||
org.delete()
|
||||
|
||||
url = reverse("autocomplete-admin-deleted-versions")
|
||||
|
||||
r = self.factory.get("{}?q=org {}".format(url, org.id))
|
||||
r.user = self.staff_user
|
||||
r = autocomplete_views.DeletedVersionAutocomplete.as_view()(r)
|
||||
|
||||
content = json.loads(r.content)
|
||||
|
||||
assert reversion.models.Version.objects.all().count() == 4
|
||||
assert len(content.get("results")) == 2
|
127
tests/test_undelete.py
Normal file
127
tests/test_undelete.py
Normal file
@@ -0,0 +1,127 @@
|
||||
import datetime
|
||||
|
||||
from util import ClientCase, Group
|
||||
|
||||
from django.core.management import call_command
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.conf import settings
|
||||
|
||||
from peeringdb_server.models import REFTAG_MAP
|
||||
from peeringdb_server.management.commands.pdb_undelete import Command
|
||||
|
||||
|
||||
class TestUndelete(ClientCase):
|
||||
@classmethod
|
||||
def setUpTestData(cls):
|
||||
super(TestUndelete, cls).setUpTestData()
|
||||
call_command("pdb_generate_test_data", limit=2, commit=True)
|
||||
|
||||
cls.org_a = REFTAG_MAP["org"].objects.get(id=1)
|
||||
cls.org_b = REFTAG_MAP["org"].objects.get(id=2)
|
||||
|
||||
cls.net_a = cls.org_a.net_set.first()
|
||||
|
||||
cls.ix_a = cls.org_a.ix_set.first()
|
||||
cls.ix_b = cls.org_b.ix_set.first()
|
||||
|
||||
cls.fac_a = cls.org_a.fac_set.first()
|
||||
cls.fac_b = cls.org_b.fac_set.first()
|
||||
|
||||
cls.ixlan_a = cls.ix_a.ixlan_set.first()
|
||||
cls.ixlan_b = cls.ix_b.ixlan_set.first()
|
||||
|
||||
cls.date = datetime.date
|
||||
|
||||
@property
|
||||
def _command(self):
|
||||
command = Command()
|
||||
command.commit = True
|
||||
return command
|
||||
|
||||
def _undelete(self, obj):
|
||||
command = self._command
|
||||
command.date = obj.updated
|
||||
command.undelete(obj.HandleRef.tag, obj.id)
|
||||
obj.refresh_from_db()
|
||||
|
||||
def test_undelete_ixlan(self):
|
||||
assert self.ixlan_a.netixlan_set_active.count() == 1
|
||||
assert self.ixlan_a.ixpfx_set_active.count() == 2
|
||||
|
||||
self.ixlan_a.delete()
|
||||
|
||||
assert self.ixlan_a.status == "deleted"
|
||||
|
||||
self._undelete(self.ixlan_a)
|
||||
|
||||
assert self.ixlan_a.status == "ok"
|
||||
assert self.ixlan_a.netixlan_set_active.count() == 1
|
||||
assert self.ixlan_a.ixpfx_set_active.count() == 2
|
||||
|
||||
def test_undelete_fac(self):
|
||||
assert self.fac_a.netfac_set_active.count() == 1
|
||||
assert self.fac_a.ixfac_set_active.count() == 1
|
||||
|
||||
self.fac_a.delete()
|
||||
|
||||
assert self.fac_a.status == "deleted"
|
||||
|
||||
self._undelete(self.fac_a)
|
||||
|
||||
assert self.fac_a.status == "ok"
|
||||
assert self.fac_a.netfac_set_active.count() == 1
|
||||
assert self.fac_a.ixfac_set_active.count() == 1
|
||||
|
||||
def test_undelete_ixlan_netixlan_dupe_other(self):
|
||||
netixlan_a = self.ixlan_a.netixlan_set.first()
|
||||
self.ixlan_a.delete()
|
||||
netixlan_c = REFTAG_MAP["netixlan"].objects.create(
|
||||
asn=self.net_a.asn, ixlan=self.ixlan_b, status="ok",
|
||||
ipaddr4=netixlan_a.ipaddr4, network=self.net_a, speed=100)
|
||||
self._undelete(self.ixlan_a)
|
||||
|
||||
assert self.ixlan_a.status == "ok"
|
||||
assert self.ixlan_a.netixlan_set_active.count() == 0
|
||||
|
||||
def test_undelete_ixlan_netixlan_dupe_other_ipv6(self):
|
||||
netixlan_a = self.ixlan_a.netixlan_set.first()
|
||||
self.ixlan_a.delete()
|
||||
netixlan_c = REFTAG_MAP["netixlan"].objects.create(
|
||||
asn=self.net_a.asn, ixlan=self.ixlan_b, status="ok",
|
||||
ipaddr6=netixlan_a.ipaddr6, network=self.net_a, speed=100)
|
||||
self._undelete(self.ixlan_a)
|
||||
|
||||
assert self.ixlan_a.status == "ok"
|
||||
assert self.ixlan_a.netixlan_set_active.count() == 0
|
||||
|
||||
def test_undelete_ixlan_netixlan_dupe_same_ix(self):
|
||||
ixlan_c = REFTAG_MAP["ixlan"].objects.create(ix=self.ix_a, status="ok")
|
||||
netixlan_a = self.ixlan_a.netixlan_set.first()
|
||||
self.ixlan_a.delete()
|
||||
netixlan_c = REFTAG_MAP["netixlan"].objects.create(
|
||||
asn=self.net_a.asn, ixlan=ixlan_c, status="ok",
|
||||
ipaddr4=netixlan_a.ipaddr4, network=self.net_a, speed=100)
|
||||
|
||||
assert ixlan_c.netixlan_set_active.count() == 1
|
||||
|
||||
self._undelete(self.ixlan_a)
|
||||
|
||||
assert self.ixlan_a.status == "ok"
|
||||
assert self.ixlan_a.netixlan_set_active.count() == 1
|
||||
assert ixlan_c.netixlan_set_active.count() == 0
|
||||
|
||||
def test_undelete_ixlan_netixlan_dupe_same_ix_ipv6(self):
|
||||
ixlan_c = REFTAG_MAP["ixlan"].objects.create(ix=self.ix_a, status="ok")
|
||||
netixlan_a = self.ixlan_a.netixlan_set.first()
|
||||
self.ixlan_a.delete()
|
||||
netixlan_c = REFTAG_MAP["netixlan"].objects.create(
|
||||
asn=self.net_a.asn, ixlan=ixlan_c, status="ok",
|
||||
ipaddr6=netixlan_a.ipaddr6, network=self.net_a, speed=100)
|
||||
|
||||
assert ixlan_c.netixlan_set_active.count() == 1
|
||||
|
||||
self._undelete(self.ixlan_a)
|
||||
|
||||
assert self.ixlan_a.status == "ok"
|
||||
assert self.ixlan_a.netixlan_set_active.count() == 1
|
||||
assert ixlan_c.netixlan_set_active.count() == 0
|
Reference in New Issue
Block a user