1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

Closes #8550: Implement ASN ranges (#11835)

* Move ASN to a separate module

* Move ASNField from dcim to ipam

* Introduce ASNRange model

* Add relationship from ASN to ASNRange

* Add an available-asns API endpoint

* Add RIR assignment for ASNRange

* Add standard tests

* Move child ASNs to a tabbed view

* Remove FK on ASN to ASNRange

* Add tests for provisioning available ASNs

* Add docs for ASNRange
This commit is contained in:
Jeremy Stretch
2023-02-27 16:36:05 -05:00
committed by GitHub
parent e4e4d0c0ec
commit 7994073687
41 changed files with 1096 additions and 246 deletions

View File

@ -1,8 +1,8 @@
# ASN
# ASNs
An Autonomous System Number (ASN) is a numeric identifier used in the BGP protocol to identify which [autonomous system](https://en.wikipedia.org/wiki/Autonomous_system_%28Internet%29) a particular prefix is originating and transiting through. NetBox support both 32- and 64- ASNs.
ASNs must be globally unique within NetBox, must each may be assigned to multiple [sites](../dcim/site.md).
ASNs must be globally unique within NetBox, and may be allocated from within a [defined range](./asnrange.md). Each ASN may be assigned to multiple [sites](../dcim/site.md).
## Fields

View File

@ -0,0 +1,21 @@
# ASN Ranges
Ranges can be defined to group [AS numbers](./asn.md) numerically and to facilitate their automatic provisioning. Each range must be assigned to a [RIR](./rir.md).
## Fields
### Name
A unique human-friendly name for the range.
### Slug
A unique URL-friendly identifier. (This value can be used for filtering.)
### RIR
The [Regional Internet Registry](./rir.md) or similar authority responsible for the allocation of AS numbers within this range.
### Start & End
The starting and ending numeric boundaries of the range (inclusive).

View File

@ -20,6 +20,10 @@ This release introduces the ability to render device configurations from Jinja2
The NAPALM integration feature found in previous NetBox releases has been moved from the core application to a dedicated plugin. This allows greater control over the feature's configuration and will unlock additional potential as a separate project.
#### ASN Ranges ([#8550](https://github.com/netbox-community/netbox/issues/8550))
A new ASN range model has been introduced to facilitate the provisioning of new autonomous system numbers from within a prescribed range. For example, an administrator might define an ASN range of 65000-65099 to be used for internal site identification. This includes a REST API endpoint suitable for automatic provisioning, very similar to the allocation of available prefixes and IP addresses.
### Enhancements
* [#9073](https://github.com/netbox-community/netbox/issues/9073) - Enable syncing config context data from remote sources

View File

@ -215,6 +215,7 @@ nav:
- Webhook: 'models/extras/webhook.md'
- IPAM:
- ASN: 'models/ipam/asn.md'
- ASNRange: 'models/ipam/asnrange.md'
- Aggregate: 'models/ipam/aggregate.md'
- FHRPGroup: 'models/ipam/fhrpgroup.md'
- FHRPGroupAssignment: 'models/ipam/fhrpgroupassignment.md'

View File

@ -1,4 +1,4 @@
import dcim.fields
import ipam.fields
from utilities.json import CustomFieldJSONEncoder
from django.db import migrations, models
import django.db.models.deletion
@ -77,7 +77,7 @@ class Migration(migrations.Migration):
('id', models.BigAutoField(primary_key=True, serialize=False)),
('name', models.CharField(max_length=100, unique=True)),
('slug', models.SlugField(max_length=100, unique=True)),
('asn', dcim.fields.ASNField(blank=True, null=True)),
('asn', ipam.fields.ASNField(blank=True, null=True)),
('account', models.CharField(blank=True, max_length=30)),
('portal_url', models.URLField(blank=True)),
('noc_contact', models.TextField(blank=True)),

View File

@ -1,10 +1,8 @@
from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import ValidationError
from django.core.validators import MinValueValidator, MaxValueValidator
from django.db import models
from netaddr import AddrFormatError, EUI, eui64_unix_expanded, mac_unix_expanded
from ipam.constants import BGP_ASN_MAX, BGP_ASN_MIN
from .lookups import PathContains
__all__ = (
@ -27,22 +25,6 @@ class eui64_unix_expanded_uppercase(eui64_unix_expanded):
# Fields
#
class ASNField(models.BigIntegerField):
description = "32-bit ASN field"
default_validators = [
MinValueValidator(BGP_ASN_MIN),
MaxValueValidator(BGP_ASN_MAX),
]
def formfield(self, **kwargs):
defaults = {
'min_value': BGP_ASN_MIN,
'max_value': BGP_ASN_MAX,
}
defaults.update(**kwargs)
return super().formfield(**defaults)
class MACAddressField(models.Field):
description = "PostgreSQL MAC Address field"

View File

@ -1,4 +1,5 @@
import dcim.fields
import ipam.fields
import django.contrib.postgres.fields
from utilities.json import CustomFieldJSONEncoder
import django.core.validators
@ -609,7 +610,7 @@ class Migration(migrations.Migration):
('slug', models.SlugField(max_length=100, unique=True)),
('status', models.CharField(default='active', max_length=50)),
('facility', models.CharField(blank=True, max_length=50)),
('asn', dcim.fields.ASNField(blank=True, null=True)),
('asn', ipam.fields.ASNField(blank=True, null=True)),
('time_zone', timezone_field.fields.TimeZoneField(blank=True)),
('description', models.CharField(blank=True, max_length=200)),
('physical_address', models.CharField(blank=True, max_length=200)),

View File

@ -7,6 +7,7 @@ from netbox.api.serializers import WritableNestedSerializer
__all__ = [
'NestedAggregateSerializer',
'NestedASNSerializer',
'NestedASNRangeSerializer',
'NestedFHRPGroupSerializer',
'NestedFHRPGroupAssignmentSerializer',
'NestedIPAddressSerializer',
@ -25,6 +26,18 @@ __all__ = [
]
#
# ASN ranges
#
class NestedASNRangeSerializer(WritableNestedSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:asnrange-detail')
class Meta:
model = models.ASNRange
fields = ['id', 'url', 'display', 'name']
#
# ASNs
#

View File

@ -15,15 +15,31 @@ from virtualization.api.nested_serializers import NestedVirtualMachineSerializer
from .nested_serializers import *
#
# ASN ranges
#
class ASNRangeSerializer(NetBoxModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:asnrange-detail')
rir = NestedRIRSerializer()
tenant = NestedTenantSerializer(required=False, allow_null=True)
asn_count = serializers.IntegerField(read_only=True)
class Meta:
model = ASNRange
fields = [
'id', 'url', 'display', 'name', 'slug', 'rir', 'start', 'end', 'tenant', 'description', 'tags',
'custom_fields', 'created', 'last_updated', 'asn_count',
]
#
# ASNs
#
from .nested_serializers import NestedL2VPNSerializer
from ..models.l2vpn import L2VPNTermination, L2VPN
class ASNSerializer(NetBoxModelSerializer):
url = serializers.HyperlinkedIdentityField(view_name='ipam-api:asn-detail')
rir = NestedRIRSerializer(required=False, allow_null=True)
tenant = NestedTenantSerializer(required=False, allow_null=True)
site_count = serializers.IntegerField(read_only=True)
provider_count = serializers.IntegerField(read_only=True)
@ -36,6 +52,22 @@ class ASNSerializer(NetBoxModelSerializer):
]
class AvailableASNSerializer(serializers.Serializer):
"""
Representation of an ASN which does not exist in the database.
"""
asn = serializers.IntegerField(read_only=True)
def to_representation(self, asn):
rir = NestedRIRSerializer(self.context['range'].rir, context={
'request': self.context['request']
}).data
return {
'rir': rir,
'asn': asn,
}
#
# VRFs
#

View File

@ -7,50 +7,33 @@ from . import views
router = NetBoxRouter()
router.APIRootView = views.IPAMRootView
# ASNs
router.register('asns', views.ASNViewSet)
# VRFs
router.register('asn-ranges', views.ASNRangeViewSet)
router.register('vrfs', views.VRFViewSet)
# Route targets
router.register('route-targets', views.RouteTargetViewSet)
# RIRs
router.register('rirs', views.RIRViewSet)
# Aggregates
router.register('aggregates', views.AggregateViewSet)
# Prefixes
router.register('roles', views.RoleViewSet)
router.register('prefixes', views.PrefixViewSet)
# IP ranges
router.register('ip-ranges', views.IPRangeViewSet)
# IP addresses
router.register('ip-addresses', views.IPAddressViewSet)
# FHRP groups
router.register('fhrp-groups', views.FHRPGroupViewSet)
router.register('fhrp-group-assignments', views.FHRPGroupAssignmentViewSet)
# VLANs
router.register('vlan-groups', views.VLANGroupViewSet)
router.register('vlans', views.VLANViewSet)
# Services
router.register('service-templates', views.ServiceTemplateViewSet)
router.register('services', views.ServiceViewSet)
# L2VPN
router.register('l2vpns', views.L2VPNViewSet)
router.register('l2vpn-terminations', views.L2VPNTerminationViewSet)
app_name = 'ipam-api'
urlpatterns = [
path(
'asn-ranges/<int:pk>/available-asns/',
views.AvailableASNsView.as_view(),
name='asnrange-available-asns'
),
path(
'ip-ranges/<int:pk>/available-ips/',
views.IPRangeAvailableIPAddressesView.as_view(),

View File

@ -33,6 +33,12 @@ class IPAMRootView(APIRootView):
# Viewsets
#
class ASNRangeViewSet(NetBoxModelViewSet):
queryset = ASNRange.objects.prefetch_related('tenant', 'rir').all()
serializer_class = serializers.ASNRangeSerializer
filterset_class = filtersets.ASNRangeFilterSet
class ASNViewSet(NetBoxModelViewSet):
queryset = ASN.objects.prefetch_related('tenant', 'rir').annotate(
site_count=count_related(Site, 'asns'),
@ -201,6 +207,74 @@ def get_results_limit(request):
return limit
class AvailableASNsView(ObjectValidationMixin, APIView):
queryset = ASN.objects.all()
@swagger_auto_schema(responses={200: serializers.AvailableASNSerializer(many=True)})
def get(self, request, pk):
asnrange = get_object_or_404(ASNRange.objects.restrict(request.user), pk=pk)
limit = get_results_limit(request)
available_asns = asnrange.get_available_asns()[:limit]
serializer = serializers.AvailableASNSerializer(available_asns, many=True, context={
'request': request,
'range': asnrange,
})
return Response(serializer.data)
@swagger_auto_schema(
request_body=serializers.AvailableASNSerializer,
responses={201: serializers.ASNSerializer(many=True)}
)
@advisory_lock(ADVISORY_LOCK_KEYS['available-asns'])
def post(self, request, pk):
self.queryset = self.queryset.restrict(request.user, 'add')
asnrange = get_object_or_404(ASNRange.objects.restrict(request.user), pk=pk)
# Normalize to a list of objects
requested_asns = request.data if isinstance(request.data, list) else [request.data]
# Determine if the requested number of IPs is available
available_asns = asnrange.get_available_asns()
if len(available_asns) < len(requested_asns):
return Response(
{
"detail": f"An insufficient number of ASNs are available within {asnrange} "
f"({len(requested_asns)} requested, {len(available_asns)} available)"
},
status=status.HTTP_409_CONFLICT
)
# Assign ASNs from the list of available IPs and copy VRF assignment from the parent
for i, requested_asn in enumerate(requested_asns):
requested_asn.update({
'rir': asnrange.rir.pk,
'range': asnrange.pk,
'asn': available_asns[i],
})
# Initialize the serializer with a list or a single object depending on what was requested
context = {'request': request}
if isinstance(request.data, list):
serializer = serializers.ASNSerializer(data=requested_asns, many=True, context=context)
else:
serializer = serializers.ASNSerializer(data=requested_asns[0], context=context)
# Create the new IP address(es)
if serializer.is_valid():
try:
with transaction.atomic():
created = serializer.save()
self._validate_objects(created)
except ObjectDoesNotExist:
raise PermissionDenied()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class AvailablePrefixesView(ObjectValidationMixin, APIView):
queryset = Prefix.objects.all()

View File

@ -2,10 +2,6 @@ from django.db.models import Q
from .choices import FHRPGroupProtocolChoices, IPAddressRoleChoices
# BGP ASN bounds
BGP_ASN_MIN = 1
BGP_ASN_MAX = 2**32 - 1
#
# VRFs

View File

@ -1,10 +1,21 @@
from django.core.exceptions import ValidationError
from django.core.validators import MinValueValidator, MaxValueValidator
from django.db import models
from netaddr import AddrFormatError, IPNetwork
from . import lookups, validators
from .formfields import IPNetworkFormField
__all__ = (
'ASNField',
'IPAddressField',
'IPNetworkField',
)
# BGP ASN bounds
BGP_ASN_MIN = 1
BGP_ASN_MAX = 2**32 - 1
class BaseIPField(models.Field):
@ -93,3 +104,19 @@ IPAddressField.register_lookup(lookups.NetIn)
IPAddressField.register_lookup(lookups.NetHostContained)
IPAddressField.register_lookup(lookups.NetFamily)
IPAddressField.register_lookup(lookups.NetMaskLength)
class ASNField(models.BigIntegerField):
description = "32-bit ASN field"
default_validators = [
MinValueValidator(BGP_ASN_MIN),
MaxValueValidator(BGP_ASN_MAX),
]
def formfield(self, **kwargs):
defaults = {
'min_value': BGP_ASN_MIN,
'max_value': BGP_ASN_MAX,
}
defaults.update(**kwargs)
return super().formfield(**defaults)

View File

@ -20,6 +20,7 @@ from .models import *
__all__ = (
'AggregateFilterSet',
'ASNFilterSet',
'ASNRangeFilterSet',
'FHRPGroupAssignmentFilterSet',
'FHRPGroupFilterSet',
'IPAddressFilterSet',
@ -167,6 +168,29 @@ class AggregateFilterSet(NetBoxModelFilterSet, TenancyFilterSet):
return queryset.none()
class ASNRangeFilterSet(OrganizationalModelFilterSet, TenancyFilterSet):
rir_id = django_filters.ModelMultipleChoiceFilter(
queryset=RIR.objects.all(),
label=_('RIR (ID)'),
)
rir = django_filters.ModelMultipleChoiceFilter(
field_name='rir__slug',
queryset=RIR.objects.all(),
to_field_name='slug',
label=_('RIR (slug)'),
)
class Meta:
model = ASNRange
fields = ['id', 'name', 'start', 'end', 'description']
def search(self, queryset, name, value):
if not value.strip():
return queryset
qs_filter = Q(description__icontains=value)
return queryset.filter(qs_filter)
class ASNFilterSet(OrganizationalModelFilterSet, TenancyFilterSet):
rir_id = django_filters.ModelMultipleChoiceFilter(
queryset=RIR.objects.all(),

View File

@ -16,6 +16,7 @@ from utilities.forms import (
__all__ = (
'AggregateBulkEditForm',
'ASNBulkEditForm',
'ASNRangeBulkEditForm',
'FHRPGroupBulkEditForm',
'IPAddressBulkEditForm',
'IPRangeBulkEditForm',
@ -97,6 +98,28 @@ class RIRBulkEditForm(NetBoxModelBulkEditForm):
nullable_fields = ('is_private', 'description')
class ASNRangeBulkEditForm(NetBoxModelBulkEditForm):
rir = DynamicModelChoiceField(
queryset=RIR.objects.all(),
required=False,
label=_('RIR')
)
tenant = DynamicModelChoiceField(
queryset=Tenant.objects.all(),
required=False
)
description = forms.CharField(
max_length=200,
required=False
)
model = ASNRange
fieldsets = (
(None, ('rir', 'tenant', 'description')),
)
nullable_fields = ('description',)
class ASNBulkEditForm(NetBoxModelBulkEditForm):
sites = DynamicModelMultipleChoiceField(
queryset=Site.objects.all(),
@ -124,7 +147,7 @@ class ASNBulkEditForm(NetBoxModelBulkEditForm):
fieldsets = (
(None, ('sites', 'rir', 'tenant', 'description')),
)
nullable_fields = ('date_added', 'description', 'comments')
nullable_fields = ('tenant', 'description', 'comments')
class AggregateBulkEditForm(NetBoxModelBulkEditForm):

View File

@ -15,6 +15,7 @@ from virtualization.models import VirtualMachine, VMInterface
__all__ = (
'AggregateImportForm',
'ASNImportForm',
'ASNRangeImportForm',
'FHRPGroupImportForm',
'IPAddressImportForm',
'IPRangeImportForm',
@ -87,6 +88,24 @@ class AggregateImportForm(NetBoxModelImportForm):
fields = ('prefix', 'rir', 'tenant', 'date_added', 'description', 'comments', 'tags')
class ASNRangeImportForm(NetBoxModelImportForm):
rir = CSVModelChoiceField(
queryset=RIR.objects.all(),
to_field_name='name',
help_text=_('Assigned RIR')
)
tenant = CSVModelChoiceField(
queryset=Tenant.objects.all(),
required=False,
to_field_name='name',
help_text=_('Assigned tenant')
)
class Meta:
model = ASNRange
fields = ('name', 'slug', 'rir', 'start', 'end', 'tenant', 'description', 'tags')
class ASNImportForm(NetBoxModelImportForm):
rir = CSVModelChoiceField(
queryset=RIR.objects.all(),

View File

@ -17,6 +17,7 @@ from virtualization.models import VirtualMachine
__all__ = (
'AggregateFilterForm',
'ASNFilterForm',
'ASNRangeFilterForm',
'FHRPGroupFilterForm',
'IPAddressFilterForm',
'IPRangeFilterForm',
@ -114,6 +115,27 @@ class AggregateFilterForm(TenancyFilterForm, NetBoxModelFilterSetForm):
tag = TagFilterField(model)
class ASNRangeFilterForm(TenancyFilterForm, NetBoxModelFilterSetForm):
model = ASNRange
fieldsets = (
(None, ('q', 'filter_id', 'tag')),
('Range', ('rir_id', 'start', 'end')),
('Tenant', ('tenant_group_id', 'tenant_id')),
)
rir_id = DynamicModelMultipleChoiceField(
queryset=RIR.objects.all(),
required=False,
label=_('RIR')
)
start = forms.IntegerField(
required=False
)
end = forms.IntegerField(
required=False
)
tag = TagFilterField(model)
class ASNFilterForm(TenancyFilterForm, NetBoxModelFilterSetForm):
model = ASN
fieldsets = (

View File

@ -20,6 +20,7 @@ from virtualization.models import Cluster, ClusterGroup, VirtualMachine, VMInter
__all__ = (
'AggregateForm',
'ASNForm',
'ASNRangeForm',
'FHRPGroupForm',
'FHRPGroupAssignmentForm',
'IPAddressAssignForm',
@ -128,6 +129,24 @@ class AggregateForm(TenancyForm, NetBoxModelForm):
}
class ASNRangeForm(TenancyForm, NetBoxModelForm):
rir = DynamicModelChoiceField(
queryset=RIR.objects.all(),
label=_('RIR'),
)
slug = SlugField()
fieldsets = (
('ASN Range', ('name', 'slug', 'rir', 'start', 'end', 'description', 'tags')),
('Tenancy', ('tenant_group', 'tenant')),
)
class Meta:
model = ASNRange
fields = [
'name', 'slug', 'rir', 'start', 'end', 'tenant_group', 'tenant', 'description', 'tags'
]
class ASNForm(TenancyForm, NetBoxModelForm):
rir = DynamicModelChoiceField(
queryset=RIR.objects.all(),

View File

@ -8,6 +8,9 @@ class IPAMQuery(graphene.ObjectType):
asn = ObjectField(ASNType)
asn_list = ObjectListField(ASNType)
asn_range = ObjectField(ASNRangeType)
asn_range_list = ObjectListField(ASNRangeType)
aggregate = ObjectField(AggregateType)
aggregate_list = ObjectListField(AggregateType)

View File

@ -1,6 +1,5 @@
import graphene
from graphene_django import DjangoObjectType
from extras.graphql.mixins import ContactsMixin
from ipam import filtersets, models
from netbox.graphql.scalars import BigInt
@ -8,6 +7,7 @@ from netbox.graphql.types import BaseObjectType, OrganizationalObjectType, NetBo
__all__ = (
'ASNType',
'ASNRangeType',
'AggregateType',
'FHRPGroupType',
'FHRPGroupAssignmentType',
@ -36,6 +36,14 @@ class ASNType(NetBoxObjectType):
filterset_class = filtersets.ASNFilterSet
class ASNRangeType(NetBoxObjectType):
class Meta:
model = models.ASNRange
fields = '__all__'
filterset_class = filtersets.ASNRangeFilterSet
class AggregateType(NetBoxObjectType):
class Meta:

View File

@ -1,6 +1,4 @@
# Generated by Django 3.2.8 on 2021-11-02 16:16
import dcim.fields
import ipam.fields
from utilities.json import CustomFieldJSONEncoder
from django.db import migrations, models
import django.db.models.deletion
@ -23,7 +21,7 @@ class Migration(migrations.Migration):
('last_updated', models.DateTimeField(auto_now=True, null=True)),
('custom_field_data', models.JSONField(blank=True, default=dict, encoder=CustomFieldJSONEncoder)),
('id', models.BigAutoField(primary_key=True, serialize=False)),
('asn', dcim.fields.ASNField(unique=True)),
('asn', ipam.fields.ASNField(unique=True)),
('description', models.CharField(blank=True, max_length=200)),
('rir', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='asns', to='ipam.rir')),
('tags', taggit.managers.TaggableManager(through='extras.TaggedItem', to='extras.Tag')),

View File

@ -0,0 +1,41 @@
# Generated by Django 4.1.7 on 2023-02-26 19:33
from django.db import migrations, models
import django.db.models.deletion
import ipam.fields
import taggit.managers
import utilities.json
class Migration(migrations.Migration):
dependencies = [
('tenancy', '0009_standardize_description_comments'),
('extras', '0087_dashboard'),
('ipam', '0063_standardize_description_comments'),
]
operations = [
migrations.CreateModel(
name='ASNRange',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False)),
('created', models.DateTimeField(auto_now_add=True, null=True)),
('last_updated', models.DateTimeField(auto_now=True, null=True)),
('custom_field_data', models.JSONField(blank=True, default=dict, encoder=utilities.json.CustomFieldJSONEncoder)),
('description', models.CharField(blank=True, max_length=200)),
('name', models.CharField(max_length=100, unique=True)),
('slug', models.SlugField(max_length=100, unique=True)),
('start', ipam.fields.ASNField()),
('end', ipam.fields.ASNField()),
('rir', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='asn_ranges', to='ipam.rir')),
('tags', taggit.managers.TaggableManager(through='extras.TaggedItem', to='extras.Tag')),
('tenant', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.PROTECT, related_name='asn_ranges', to='tenancy.tenant')),
],
options={
'verbose_name': 'ASN range',
'verbose_name_plural': 'ASN ranges',
'ordering': ('name',),
},
),
]

View File

@ -1,4 +1,5 @@
# Ensure that VRFs are imported before IPs/prefixes so dumpdata & loaddata work correctly
from .asns import *
from .fhrp import *
from .vrfs import *
from .ip import *
@ -8,6 +9,7 @@ from .vlans import *
__all__ = (
'ASN',
'ASNRange',
'Aggregate',
'IPAddress',
'IPRange',

137
netbox/ipam/models/asns.py Normal file
View File

@ -0,0 +1,137 @@
from django.core.exceptions import ValidationError
from django.db import models
from django.urls import reverse
from django.utils.translation import gettext as _
from ipam.fields import ASNField
from netbox.models import OrganizationalModel, PrimaryModel
__all__ = (
'ASN',
'ASNRange',
)
class ASNRange(OrganizationalModel):
name = models.CharField(
max_length=100,
unique=True
)
slug = models.SlugField(
max_length=100,
unique=True
)
rir = models.ForeignKey(
to='ipam.RIR',
on_delete=models.PROTECT,
related_name='asn_ranges',
verbose_name='RIR'
)
start = ASNField()
end = ASNField()
tenant = models.ForeignKey(
to='tenancy.Tenant',
on_delete=models.PROTECT,
related_name='asn_ranges',
blank=True,
null=True
)
class Meta:
ordering = ('name',)
verbose_name = 'ASN range'
verbose_name_plural = 'ASN ranges'
def __str__(self):
return f'{self.name} ({self.range_as_string()})'
def get_absolute_url(self):
return reverse('ipam:asnrange', args=[self.pk])
@property
def range(self):
return range(self.start, self.end + 1)
def range_as_string(self):
return f'{self.start}-{self.end}'
def clean(self):
super().clean()
if self.end <= self.start:
raise ValidationError(f"Starting ASN ({self.start}) must be lower than ending ASN ({self.end}).")
def get_child_asns(self):
return ASN.objects.filter(
asn__gte=self.start,
asn__lte=self.end
)
def get_available_asns(self):
"""
Return all available ASNs within this range.
"""
range = set(self.range)
existing_asns = set(self.get_child_asns().values_list('asn', flat=True))
available_asns = sorted(range - existing_asns)
return available_asns
class ASN(PrimaryModel):
"""
An autonomous system (AS) number is typically used to represent an independent routing domain. A site can have
one or more ASNs assigned to it.
"""
rir = models.ForeignKey(
to='ipam.RIR',
on_delete=models.PROTECT,
related_name='asns',
verbose_name='RIR'
)
asn = ASNField(
unique=True,
verbose_name='ASN',
help_text=_('32-bit autonomous system number')
)
tenant = models.ForeignKey(
to='tenancy.Tenant',
on_delete=models.PROTECT,
related_name='asns',
blank=True,
null=True
)
prerequisite_models = (
'ipam.RIR',
)
class Meta:
ordering = ['asn']
verbose_name = 'ASN'
verbose_name_plural = 'ASNs'
def __str__(self):
return f'AS{self.asn_with_asdot}'
def get_absolute_url(self):
return reverse('ipam:asn', args=[self.pk])
@property
def asn_asdot(self):
"""
Return ASDOT notation for AS numbers greater than 16 bits.
"""
if self.asn > 65535:
return f'{self.asn // 65536}.{self.asn % 65536}'
return self.asn
@property
def asn_with_asdot(self):
"""
Return both plain and ASDOT notation, where applicable.
"""
if self.asn > 65535:
return f'{self.asn} ({self.asn // 65536}.{self.asn % 65536})'
else:
return self.asn

View File

@ -8,7 +8,6 @@ from django.urls import reverse
from django.utils.functional import cached_property
from django.utils.translation import gettext as _
from dcim.fields import ASNField
from ipam.choices import *
from ipam.constants import *
from ipam.fields import IPNetworkField, IPAddressField
@ -20,7 +19,6 @@ from netbox.models import OrganizationalModel, PrimaryModel
__all__ = (
'Aggregate',
'ASN',
'IPAddress',
'IPRange',
'Prefix',
@ -74,65 +72,6 @@ class RIR(OrganizationalModel):
return reverse('ipam:rir', args=[self.pk])
class ASN(PrimaryModel):
"""
An autonomous system (AS) number is typically used to represent an independent routing domain. A site can have
one or more ASNs assigned to it.
"""
asn = ASNField(
unique=True,
verbose_name='ASN',
help_text=_('32-bit autonomous system number')
)
rir = models.ForeignKey(
to='ipam.RIR',
on_delete=models.PROTECT,
related_name='asns',
verbose_name='RIR'
)
tenant = models.ForeignKey(
to='tenancy.Tenant',
on_delete=models.PROTECT,
related_name='asns',
blank=True,
null=True
)
prerequisite_models = (
'ipam.RIR',
)
class Meta:
ordering = ['asn']
verbose_name = 'ASN'
verbose_name_plural = 'ASNs'
def __str__(self):
return f'AS{self.asn_with_asdot}'
def get_absolute_url(self):
return reverse('ipam:asn', args=[self.pk])
@property
def asn_asdot(self):
"""
Return ASDOT notation for AS numbers greater than 16 bits.
"""
if self.asn > 65535:
return f'{self.asn // 65536}.{self.asn % 65536}'
return self.asn
@property
def asn_with_asdot(self):
"""
Return both plain and ASDOT notation, where applicable.
"""
if self.asn > 65535:
return f'{self.asn} ({self.asn // 65536}.{self.asn % 65536})'
else:
return self.asn
class Aggregate(GetAvailablePrefixesMixin, PrimaryModel):
"""
An aggregate exists at the root level of the IP address space hierarchy in NetBox. Aggregates are used to organize

View File

@ -22,6 +22,14 @@ class ASNIndex(SearchIndex):
)
@register_search
class ASNRangeIndex(SearchIndex):
model = models.ASNRange
fields = (
('description', 500),
)
@register_search
class FHRPGroupIndex(SearchIndex):
model = models.FHRPGroup

View File

@ -1,3 +1,4 @@
from .asn import *
from .fhrp import *
from .ip import *
from .l2vpn import *

77
netbox/ipam/tables/asn.py Normal file
View File

@ -0,0 +1,77 @@
import django_tables2 as tables
from django.utils.translation import gettext as _
from ipam.models import *
from netbox.tables import NetBoxTable, columns
from tenancy.tables import TenancyColumnsMixin
__all__ = (
'ASNTable',
'ASNRangeTable',
)
class ASNRangeTable(TenancyColumnsMixin, NetBoxTable):
name = tables.Column(
linkify=True
)
rir = tables.Column(
linkify=True
)
tags = columns.TagColumn(
url_name='ipam:asnrange_list'
)
asn_count = columns.LinkedCountColumn(
viewname='ipam:asn_list',
url_params={'asn_id': 'pk'},
verbose_name=_('ASN Count')
)
class Meta(NetBoxTable.Meta):
model = ASNRange
fields = (
'pk', 'name', 'slug', 'rir', 'start', 'end', 'asn_count', 'tenant', 'tenant_group', 'description', 'tags',
'created', 'last_updated', 'actions',
)
default_columns = ('pk', 'name', 'rir', 'start', 'end', 'tenant', 'asn_count', 'description')
class ASNTable(TenancyColumnsMixin, NetBoxTable):
asn = tables.Column(
linkify=True
)
rir = tables.Column(
linkify=True
)
asn_asdot = tables.Column(
accessor=tables.A('asn_asdot'),
linkify=True,
verbose_name=_('ASDOT')
)
site_count = columns.LinkedCountColumn(
viewname='dcim:site_list',
url_params={'asn_id': 'pk'},
verbose_name=_('Site Count')
)
provider_count = columns.LinkedCountColumn(
viewname='circuits:provider_list',
url_params={'asn_id': 'pk'},
verbose_name=_('Provider Count')
)
sites = columns.ManyToManyColumn(
linkify_item=True
)
comments = columns.MarkdownColumn()
tags = columns.TagColumn(
url_name='ipam:asn_list'
)
class Meta(NetBoxTable.Meta):
model = ASN
fields = (
'pk', 'asn', 'asn_asdot', 'rir', 'site_count', 'provider_count', 'tenant', 'tenant_group', 'description',
'comments', 'sites', 'tags', 'created', 'last_updated', 'actions',
)
default_columns = (
'pk', 'asn', 'rir', 'site_count', 'provider_count', 'sites', 'description', 'tenant',
)

View File

@ -8,7 +8,6 @@ from tenancy.tables import TenancyColumnsMixin, TenantColumn
__all__ = (
'AggregateTable',
'ASNTable',
'AssignedIPAddressesTable',
'IPAddressAssignTable',
'IPAddressTable',
@ -93,47 +92,6 @@ class RIRTable(NetBoxTable):
default_columns = ('pk', 'name', 'is_private', 'aggregate_count', 'description')
#
# ASNs
#
class ASNTable(TenancyColumnsMixin, NetBoxTable):
asn = tables.Column(
linkify=True
)
asn_asdot = tables.Column(
accessor=tables.A('asn_asdot'),
linkify=True,
verbose_name='ASDOT'
)
site_count = columns.LinkedCountColumn(
viewname='dcim:site_list',
url_params={'asn_id': 'pk'},
verbose_name='Site Count'
)
provider_count = columns.LinkedCountColumn(
viewname='circuits:provider_list',
url_params={'asn_id': 'pk'},
verbose_name='Provider Count'
)
sites = columns.ManyToManyColumn(
linkify_item=True,
verbose_name='Sites'
)
comments = columns.MarkdownColumn()
tags = columns.TagColumn(
url_name='ipam:asn_list'
)
class Meta(NetBoxTable.Meta):
model = ASN
fields = (
'pk', 'asn', 'asn_asdot', 'rir', 'site_count', 'provider_count', 'tenant', 'tenant_group', 'description',
'comments', 'sites', 'tags', 'created', 'last_updated', 'actions',
)
default_columns = ('pk', 'asn', 'rir', 'site_count', 'provider_count', 'sites', 'description', 'tenant')
#
# Aggregates
#

View File

@ -21,6 +21,118 @@ class AppTest(APITestCase):
self.assertEqual(response.status_code, 200)
class ASNRangeTest(APIViewTestCases.APIViewTestCase):
model = ASNRange
brief_fields = ['display', 'id', 'name', 'url']
bulk_update_data = {
'description': 'New description',
}
@classmethod
def setUpTestData(cls):
rirs = (
RIR(name='RIR 1', slug='rir-1', is_private=True),
RIR(name='RIR 2', slug='rir-2', is_private=True),
)
RIR.objects.bulk_create(rirs)
tenants = (
Tenant(name='Tenant 1', slug='tenant-1'),
Tenant(name='Tenant 2', slug='tenant-2'),
)
Tenant.objects.bulk_create(tenants)
asn_ranges = (
ASNRange(name='ASN Range 1', slug='asn-range-1', rir=rirs[0], tenant=tenants[0], start=100, end=199),
ASNRange(name='ASN Range 2', slug='asn-range-2', rir=rirs[0], tenant=tenants[0], start=200, end=299),
ASNRange(name='ASN Range 3', slug='asn-range-3', rir=rirs[0], tenant=tenants[0], start=300, end=399),
)
ASNRange.objects.bulk_create(asn_ranges)
cls.create_data = [
{
'name': 'ASN Range 4',
'slug': 'asn-range-4',
'rir': rirs[1].pk,
'start': 400,
'end': 499,
'tenant': tenants[1].pk,
},
{
'name': 'ASN Range 5',
'slug': 'asn-range-5',
'rir': rirs[1].pk,
'start': 500,
'end': 599,
'tenant': tenants[1].pk,
},
{
'name': 'ASN Range 6',
'slug': 'asn-range-6',
'rir': rirs[1].pk,
'start': 600,
'end': 699,
'tenant': tenants[1].pk,
},
]
def test_list_available_asns(self):
"""
Test retrieval of all available ASNs within a parent range.
"""
rir = RIR.objects.first()
asnrange = ASNRange.objects.create(name='Range 1', slug='range-1', rir=rir, start=101, end=110)
url = reverse('ipam-api:asnrange-available-asns', kwargs={'pk': asnrange.pk})
self.add_permissions('ipam.view_asnrange', 'ipam.view_asn')
response = self.client.get(url, **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(len(response.data), 10)
def test_create_single_available_asn(self):
"""
Test creation of the first available ASN within a range.
"""
rir = RIR.objects.first()
asnrange = ASNRange.objects.create(name='Range 1', slug='range-1', rir=rir, start=101, end=110)
url = reverse('ipam-api:asnrange-available-asns', kwargs={'pk': asnrange.pk})
self.add_permissions('ipam.view_asnrange', 'ipam.add_asn')
data = {
'description': 'New ASN'
}
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(response.data['rir']['id'], asnrange.rir.pk)
self.assertEqual(response.data['description'], data['description'])
def test_create_multiple_available_asns(self):
"""
Test the creation of several available ASNs within a parent range.
"""
rir = RIR.objects.first()
asnrange = ASNRange.objects.create(name='Range 1', slug='range-1', rir=rir, start=101, end=110)
url = reverse('ipam-api:asnrange-available-asns', kwargs={'pk': asnrange.pk})
self.add_permissions('ipam.view_asnrange', 'ipam.add_asn')
# Try to create eleven ASNs (only ten are available)
data = [
{'description': f'New ASN {i}'}
for i in range(1, 12)
]
assert len(data) == 11
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_409_CONFLICT)
self.assertIn('detail', response.data)
# Create all ten available ASNs in a single request
data.pop()
assert len(data) == 10
response = self.client.post(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_201_CREATED)
self.assertEqual(len(response.data), 10)
class ASNTest(APIViewTestCases.APIViewTestCase):
model = ASN
brief_fields = ['asn', 'display', 'id', 'url']
@ -30,25 +142,29 @@ class ASNTest(APIViewTestCases.APIViewTestCase):
@classmethod
def setUpTestData(cls):
rirs = (
RIR(name='RIR 1', slug='rir-1', is_private=True),
RIR(name='RIR 2', slug='rir-2', is_private=True),
)
RIR.objects.bulk_create(rirs)
rirs = [
RIR.objects.create(name='RFC 6996', slug='rfc-6996', description='Private Use', is_private=True),
RIR.objects.create(name='RFC 7300', slug='rfc-7300', description='IANA Use', is_private=True),
]
sites = [
Site.objects.create(name='Site 1', slug='site-1'),
Site.objects.create(name='Site 2', slug='site-2')
]
tenants = [
Tenant.objects.create(name='Tenant 1', slug='tenant-1'),
Tenant.objects.create(name='Tenant 2', slug='tenant-2'),
]
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2')
)
Site.objects.bulk_create(sites)
tenants = (
Tenant(name='Tenant 1', slug='tenant-1'),
Tenant(name='Tenant 2', slug='tenant-2'),
)
Tenant.objects.bulk_create(tenants)
asns = (
ASN(asn=64513, rir=rirs[0], tenant=tenants[0]),
ASN(asn=65534, rir=rirs[0], tenant=tenants[1]),
ASN(asn=4200000000, rir=rirs[0], tenant=tenants[0]),
ASN(asn=4200002301, rir=rirs[1], tenant=tenants[1]),
ASN(asn=65000, rir=rirs[0], tenant=tenants[0]),
ASN(asn=65001, rir=rirs[0], tenant=tenants[1]),
ASN(asn=4200000000, rir=rirs[1], tenant=tenants[0]),
ASN(asn=4200000001, rir=rirs[1], tenant=tenants[1]),
)
ASN.objects.bulk_create(asns)
@ -63,12 +179,12 @@ class ASNTest(APIViewTestCases.APIViewTestCase):
'rir': rirs[0].pk,
},
{
'asn': 65543,
'asn': 65002,
'rir': rirs[0].pk,
},
{
'asn': 4294967294,
'rir': rirs[0].pk,
'asn': 4200000002,
'rir': rirs[1].pk,
},
]

View File

@ -12,84 +12,160 @@ from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMac
from tenancy.models import Tenant, TenantGroup
class ASNRangeTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = ASNRange.objects.all()
filterset = ASNRangeFilterSet
@classmethod
def setUpTestData(cls):
rirs = [
RIR(name='RIR 1', slug='rir-1'),
RIR(name='RIR 2', slug='rir-2'),
RIR(name='RIR 3', slug='rir-3'),
]
RIR.objects.bulk_create(rirs)
tenants = [
Tenant(name='Tenant 1', slug='tenant-1'),
Tenant(name='Tenant 2', slug='tenant-2'),
]
Tenant.objects.bulk_create(tenants)
asn_ranges = (
ASNRange(
name='ASN Range 1',
slug='asn-range-1',
rir=rirs[0],
tenant=None,
start=65000,
end=65009,
description='aaa'
),
ASNRange(
name='ASN Range 2',
slug='asn-range-2',
rir=rirs[1],
tenant=tenants[0],
start=65010,
end=65019,
description='bbb'
),
ASNRange(
name='ASN Range 3',
slug='asn-range-3',
rir=rirs[2],
tenant=tenants[1],
start=65020,
end=65029,
description='ccc'
),
)
ASNRange.objects.bulk_create(asn_ranges)
def test_name(self):
params = {'name': ['ASN Range 1', 'ASN Range 2']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_rir(self):
rirs = RIR.objects.all()[:2]
params = {'rir_id': [rirs[0].pk, rirs[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'rir': [rirs[0].slug, rirs[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_tenant(self):
tenants = Tenant.objects.all()[:2]
params = {'tenant_id': [tenants[0].pk, tenants[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
params = {'tenant': [tenants[0].slug, tenants[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_start(self):
params = {'start': [65000, 65010]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_end(self):
params = {'end': [65009, 65019]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_description(self):
params = {'description': ['aaa', 'bbb']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class ASNTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = ASN.objects.all()
filterset = ASNFilterSet
@classmethod
def setUpTestData(cls):
rirs = [
RIR.objects.create(name='RFC 6996', slug='rfc-6996', description='Private Use', is_private=True),
RIR.objects.create(name='RFC 7300', slug='rfc-7300', description='IANA Use', is_private=True),
RIR(name='RIR 1', slug='rir-1', is_private=True),
RIR(name='RIR 2', slug='rir-2', is_private=True),
RIR(name='RIR 3', slug='rir-3', is_private=True),
]
RIR.objects.bulk_create(rirs)
sites = [
Site.objects.create(name='Site 1', slug='site-1'),
Site.objects.create(name='Site 2', slug='site-2'),
Site.objects.create(name='Site 3', slug='site-3')
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3')
]
Site.objects.bulk_create(sites)
tenants = [
Tenant.objects.create(name='Tenant 1', slug='tenant-1'),
Tenant.objects.create(name='Tenant 2', slug='tenant-2'),
Tenant.objects.create(name='Tenant 3', slug='tenant-3'),
Tenant.objects.create(name='Tenant 4', slug='tenant-4'),
Tenant.objects.create(name='Tenant 5', slug='tenant-5'),
Tenant(name='Tenant 1', slug='tenant-1'),
Tenant(name='Tenant 2', slug='tenant-2'),
Tenant(name='Tenant 3', slug='tenant-3'),
Tenant(name='Tenant 4', slug='tenant-4'),
Tenant(name='Tenant 5', slug='tenant-5'),
]
Tenant.objects.bulk_create(tenants)
asns = (
ASN(asn=64512, rir=rirs[0], tenant=tenants[0], description='foobar1'),
ASN(asn=64513, rir=rirs[0], tenant=tenants[0], description='foobar2'),
ASN(asn=64514, rir=rirs[0], tenant=tenants[1]),
ASN(asn=64515, rir=rirs[0], tenant=tenants[2]),
ASN(asn=64516, rir=rirs[0], tenant=tenants[3]),
ASN(asn=65535, rir=rirs[1], tenant=tenants[4]),
ASN(asn=65001, rir=rirs[0], tenant=tenants[0], description='aaa'),
ASN(asn=65002, rir=rirs[1], tenant=tenants[1], description='bbb'),
ASN(asn=65003, rir=rirs[2], tenant=tenants[2], description='ccc'),
ASN(asn=4200000000, rir=rirs[0], tenant=tenants[0]),
ASN(asn=4200000001, rir=rirs[0], tenant=tenants[1]),
ASN(asn=4200000002, rir=rirs[0], tenant=tenants[2]),
ASN(asn=4200000003, rir=rirs[0], tenant=tenants[3]),
ASN(asn=4200002301, rir=rirs[1], tenant=tenants[4]),
ASN(asn=4200000001, rir=rirs[1], tenant=tenants[1]),
ASN(asn=4200000002, rir=rirs[2], tenant=tenants[2]),
)
ASN.objects.bulk_create(asns)
asns[0].sites.set([sites[0]])
asns[1].sites.set([sites[0]])
asns[2].sites.set([sites[1]])
asns[3].sites.set([sites[2]])
asns[4].sites.set([sites[0]])
asns[5].sites.set([sites[1]])
asns[6].sites.set([sites[0]])
asns[7].sites.set([sites[1]])
asns[8].sites.set([sites[2]])
asns[9].sites.set([sites[0]])
asns[10].sites.set([sites[1]])
asns[1].sites.set([sites[1]])
asns[2].sites.set([sites[2]])
asns[3].sites.set([sites[0]])
asns[4].sites.set([sites[1]])
asns[5].sites.set([sites[2]])
def test_asn(self):
params = {'asn': ['64512', '65535']}
params = {'asn': [65001, 4200000000]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_tenant(self):
tenants = Tenant.objects.all()[:2]
params = {'tenant_id': [tenants[0].pk, tenants[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 5)
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'tenant': [tenants[0].slug, tenants[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 5)
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_rir(self):
rirs = RIR.objects.all()[:1]
params = {'rir_id': [rirs[0].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 9)
params = {'rir': [rirs[0].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 9)
rirs = RIR.objects.all()[:2]
params = {'rir_id': [rirs[0].pk, rirs[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'rir': [rirs[0].slug, rirs[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_site(self):
sites = Site.objects.all()[:2]
params = {'site_id': [sites[0].pk, sites[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 9)
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'site': [sites[0].slug, sites[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 9)
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_description(self):
params = {'description': ['foobar1', 'foobar2']}
params = {'description': ['aaa', 'bbb']}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)

View File

@ -11,30 +11,91 @@ from tenancy.models import Tenant
from utilities.testing import ViewTestCases, create_test_device, create_tags
class ASNRangeTestCase(ViewTestCases.PrimaryObjectViewTestCase):
model = ASNRange
@classmethod
def setUpTestData(cls):
rirs = [
RIR(name='RIR 1', slug='rir-1', is_private=True),
RIR(name='RIR 2', slug='rir-2', is_private=True),
]
RIR.objects.bulk_create(rirs)
tenants = [
Tenant(name='Tenant 1', slug='tenant-1'),
Tenant(name='Tenant 2', slug='tenant-2'),
]
Tenant.objects.bulk_create(tenants)
asn_ranges = (
ASNRange(name='ASN Range 1', slug='asn-range-1', rir=rirs[0], tenant=tenants[0], start=100, end=199),
ASNRange(name='ASN Range 2', slug='asn-range-2', rir=rirs[0], tenant=tenants[0], start=200, end=299),
ASNRange(name='ASN Range 3', slug='asn-range-3', rir=rirs[0], tenant=tenants[0], start=300, end=399),
)
ASNRange.objects.bulk_create(asn_ranges)
tags = create_tags('Alpha', 'Bravo', 'Charlie')
cls.form_data = {
'name': 'ASN Range X',
'slug': 'asn-range-x',
'rir': rirs[1].pk,
'tenant': tenants[1].pk,
'start': 1000,
'end': 1099,
'description': 'A new ASN range',
'tags': [t.pk for t in tags],
}
cls.csv_data = (
f"name,slug,rir,tenant,start,end,description",
f"ASN Range 4,asn-range-4,{rirs[1].name},{tenants[1].name},400,499,Fourth range",
f"ASN Range 5,asn-range-5,{rirs[1].name},{tenants[1].name},500,599,Fifth range",
f"ASN Range 6,asn-range-6,{rirs[1].name},{tenants[1].name},600,699,Sixth range",
)
cls.csv_update_data = (
"id,description",
f"{asn_ranges[0].pk},New description 1",
f"{asn_ranges[1].pk},New description 2",
f"{asn_ranges[2].pk},New description 3",
)
cls.bulk_edit_data = {
'rir': rirs[1].pk,
'description': 'Next description',
}
class ASNTestCase(ViewTestCases.PrimaryObjectViewTestCase):
model = ASN
@classmethod
def setUpTestData(cls):
rirs = [
RIR.objects.create(name='RFC 6996', slug='rfc-6996', description='Private Use', is_private=True),
RIR.objects.create(name='RFC 7300', slug='rfc-7300', description='IANA Use', is_private=True),
]
sites = [
Site.objects.create(name='Site 1', slug='site-1'),
Site.objects.create(name='Site 2', slug='site-2')
]
tenants = [
Tenant.objects.create(name='Tenant 1', slug='tenant-1'),
Tenant.objects.create(name='Tenant 2', slug='tenant-2'),
RIR(name='RIR 1', slug='rir-1', is_private=True),
RIR(name='RIR 2', slug='rir-2', is_private=True),
]
RIR.objects.bulk_create(rirs)
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2')
)
Site.objects.bulk_create(sites)
tenants = (
Tenant(name='Tenant 1', slug='tenant-1'),
Tenant(name='Tenant 2', slug='tenant-2'),
)
Tenant.objects.bulk_create(tenants)
asns = (
ASN(asn=64513, rir=rirs[0], tenant=tenants[0]),
ASN(asn=65535, rir=rirs[1], tenant=tenants[1]),
ASN(asn=4200000000, rir=rirs[0], tenant=tenants[0]),
ASN(asn=4200002301, rir=rirs[1], tenant=tenants[1]),
ASN(asn=65001, rir=rirs[0], tenant=tenants[0]),
ASN(asn=65002, rir=rirs[1], tenant=tenants[1]),
ASN(asn=4200000001, rir=rirs[0], tenant=tenants[0]),
ASN(asn=4200000002, rir=rirs[1], tenant=tenants[1]),
)
ASN.objects.bulk_create(asns)
@ -46,18 +107,20 @@ class ASNTestCase(ViewTestCases.PrimaryObjectViewTestCase):
tags = create_tags('Alpha', 'Bravo', 'Charlie')
cls.form_data = {
'asn': 64512,
'asn': 65000,
'rir': rirs[0].pk,
'tenant': tenants[0].pk,
'site': sites[0].pk,
'description': 'A new ASN',
'tags': [t.pk for t in tags],
}
cls.csv_data = (
"asn,rir",
"64533,RFC 6996",
"64523,RFC 6996",
"4200000002,RFC 6996",
"65003,RIR 1",
"65004,RIR 2",
"4200000003,RIR 1",
"4200000004,RIR 2",
)
cls.csv_update_data = (

View File

@ -6,6 +6,14 @@ from . import views
app_name = 'ipam'
urlpatterns = [
# ASN ranges
path('asn-ranges/', views.ASNRangeListView.as_view(), name='asnrange_list'),
path('asn-ranges/add/', views.ASNRangeEditView.as_view(), name='asnrange_add'),
path('asn-ranges/import/', views.ASNRangeBulkImportView.as_view(), name='asnrange_import'),
path('asn-ranges/edit/', views.ASNRangeBulkEditView.as_view(), name='asnrange_bulk_edit'),
path('asn-ranges/delete/', views.ASNRangeBulkDeleteView.as_view(), name='asnrange_bulk_delete'),
path('asn-ranges/<int:pk>/', include(get_model_urls('ipam', 'asnrange'))),
# ASNs
path('asns/', views.ASNListView.as_view(), name='asn_list'),
path('asns/add/', views.ASNEditView.as_view(), name='asn_add'),

View File

@ -1,7 +1,7 @@
import netaddr
from .constants import *
from .models import Prefix, VLAN
from .models import ASN, Prefix, VLAN
def add_requested_prefixes(parent, prefix_list, show_available=True, show_assigned=True):

View File

@ -7,16 +7,15 @@ from django.utils.translation import gettext as _
from circuits.models import Provider
from dcim.filtersets import InterfaceFilterSet
from dcim.models import Interface, Site, Device
from dcim.models import Interface, Site
from netbox.views import generic
from utilities.utils import count_related
from utilities.views import ViewTab, register_model_view
from virtualization.filtersets import VMInterfaceFilterSet
from virtualization.models import VMInterface, VirtualMachine
from virtualization.models import VMInterface
from . import filtersets, forms, tables
from .constants import *
from .models import *
from .models import ASN
from .tables.l2vpn import L2VPNTable, L2VPNTerminationTable
from .utils import add_requested_prefixes, add_available_ipaddresses, add_available_vlans
@ -195,6 +194,77 @@ class RIRBulkDeleteView(generic.BulkDeleteView):
table = tables.RIRTable
#
# ASN ranges
#
class ASNRangeListView(generic.ObjectListView):
queryset = ASNRange.objects.all()
filterset = filtersets.ASNRangeFilterSet
filterset_form = forms.ASNRangeFilterForm
table = tables.ASNRangeTable
@register_model_view(ASNRange)
class ASNRangeView(generic.ObjectView):
queryset = ASNRange.objects.all()
@register_model_view(ASNRange, 'asns')
class ASNRangeASNsView(generic.ObjectChildrenView):
queryset = ASNRange.objects.all()
child_model = ASN
table = tables.ASNTable
filterset = filtersets.ASNFilterSet
template_name = 'ipam/asnrange/asns.html'
tab = ViewTab(
label=_('ASNs'),
badge=lambda x: x.get_child_asns().count(),
permission='ipam.view_asns',
weight=500
)
def get_children(self, request, parent):
return parent.get_child_asns().restrict(request.user, 'view').annotate(
site_count=count_related(Site, 'asns'),
provider_count=count_related(Provider, 'asns')
)
@register_model_view(ASNRange, 'edit')
class ASNRangeEditView(generic.ObjectEditView):
queryset = ASNRange.objects.all()
form = forms.ASNRangeForm
@register_model_view(ASNRange, 'delete')
class ASNRangeDeleteView(generic.ObjectDeleteView):
queryset = ASNRange.objects.all()
class ASNRangeBulkImportView(generic.BulkImportView):
queryset = ASNRange.objects.all()
model_form = forms.ASNRangeImportForm
table = tables.ASNRangeTable
class ASNRangeBulkEditView(generic.BulkEditView):
queryset = ASNRange.objects.annotate(
site_count=count_related(Site, 'asns')
)
filterset = filtersets.ASNRangeFilterSet
table = tables.ASNRangeTable
form = forms.ASNRangeBulkEditForm
class ASNRangeBulkDeleteView(generic.BulkDeleteView):
queryset = ASNRange.objects.annotate(
site_count=count_related(Site, 'asns')
)
filterset = filtersets.ASNRangeFilterSet
table = tables.ASNRangeTable
#
# ASNs
#

View File

@ -158,6 +158,7 @@ IPAM_MENU = Menu(
MenuGroup(
label=_('ASNs'),
items=(
get_model_item('ipam', 'asnrange', _('ASN Ranges')),
get_model_item('ipam', 'asn', _('ASNs')),
),
),

View File

@ -7,6 +7,9 @@
{% block breadcrumbs %}
{{ block.super }}
<li class="breadcrumb-item"><a href="{% url 'ipam:asn_list' %}?rir_id={{ object.rir.pk }}">{{ object.rir }}</a></li>
{% if object.range %}
<li class="breadcrumb-item"><a href="{% url 'ipam:asn_list' %}?range_id={{ object.range.pk }}">{{ object.range }}</a></li>
{% endif %}
{% endblock breadcrumbs %}
{% block content %}

View File

@ -0,0 +1,57 @@
{% extends 'ipam/asnrange/base.html' %}
{% load buttons %}
{% load helpers %}
{% load plugins %}
{% load render_table from django_tables2 %}
{% block content %}
<div class="row">
<div class="col col-md-6">
<div class="card">
<h5 class="card-header">ASN Range</h5>
<div class="card-body">
<table class="table table-hover attr-table">
<tr>
<td>Name</td>
<td>{{ object.name }}</td>
</tr>
<tr>
<td>RIR</td>
<td>
<a href="{% url 'ipam:asnrange_list' %}?rir={{ object.rir.slug }}">{{ object.rir }}</a>
</td>
</tr>
<tr>
<td>Range</td>
<td>{{ object.range_as_string }}</td>
</tr>
<tr>
<td>Tenant</td>
<td>
{% if object.tenant.group %}
{{ object.tenant.group|linkify }} /
{% endif %}
{{ object.tenant|linkify|placeholder }}
</td>
</tr>
<tr>
<td>Description</td>
<td>{{ object.description|placeholder }}</td>
</tr>
</table>
</div>
</div>
{% plugin_left_page object %}
{% include 'inc/panels/tags.html' %}
</div>
<div class="col col-md-6">
{% include 'inc/panels/custom_fields.html' %}
{% plugin_right_page object %}
</div>
</div>
<div class="row">
<div class="col col-md-12">
{% plugin_full_width_page object %}
</div>
</div>
{% endblock content %}

View File

@ -0,0 +1,36 @@
{% extends 'ipam/asnrange/base.html' %}
{% load helpers %}
{% block content %}
{% include 'inc/table_controls_htmx.html' with table_modal="ASNTable_config" %}
<form method="post">
{% csrf_token %}
<div class="card">
<div class="card-body htmx-container table-responsive" id="object_list">
{% include 'htmx/table.html' %}
</div>
</div>
<div class="noprint bulk-buttons">
<div class="bulk-button-group">
{% if 'bulk_edit' in actions %}
<button type="submit" name="_edit" formaction="{% url 'ipam:asn_bulk_edit' %}?return_url={% url 'ipam:asnrange_asns' pk=object.pk %}" class="btn btn-warning btn-sm">
<i class="mdi mdi-pencil" aria-hidden="true"></i> Edit
</button>
{% endif %}
{% if 'bulk_delete' in actions %}
<button type="submit" name="_delete" formaction="{% url 'ipam:asn_bulk_delete' %}?return_url={% url 'ipam:asnrange_asns' pk=object.pk %}" class="btn btn-danger btn-sm">
<i class="mdi mdi-trash-can-outline" aria-hidden="true"></i> Delete
</button>
{% endif %}
</div>
</div>
</form>
{% endblock %}
{% block modals %}
{{ block.super }}
{% table_config_form table %}
{% endblock modals %}

View File

@ -0,0 +1,6 @@
{% extends 'generic/object.html' %}
{% block breadcrumbs %}
{{ block.super }}
<li class="breadcrumb-item"><a href="{% url 'ipam:asnrange_list' %}?rir_id={{ object.rir.pk }}">{{ object.rir }}</a></li>
{% endblock breadcrumbs %}

View File

@ -43,6 +43,7 @@ ADVISORY_LOCK_KEYS = {
'available-prefixes': 100100,
'available-ips': 100200,
'available-vlans': 100300,
'available-asns': 100400,
}
#