mirror of
https://github.com/netbox-community/netbox.git
synced 2024-05-10 07:54:54 +00:00
Initial work on IP ranges
This commit is contained in:
@ -4,8 +4,9 @@ from django.contrib.contenttypes.fields import GenericForeignKey
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.db import models
|
||||
from django.db.models import F
|
||||
from django.db.models import F, Q
|
||||
from django.urls import reverse
|
||||
from django.utils.functional import cached_property
|
||||
|
||||
from dcim.models import Device
|
||||
from extras.utils import extras_features
|
||||
@ -23,6 +24,7 @@ from virtualization.models import VirtualMachine
|
||||
__all__ = (
|
||||
'Aggregate',
|
||||
'IPAddress',
|
||||
'IPRange',
|
||||
'Prefix',
|
||||
'RIR',
|
||||
'Role',
|
||||
@ -475,6 +477,193 @@ class Prefix(PrimaryModel):
|
||||
return int(float(child_count) / prefix_size * 100)
|
||||
|
||||
|
||||
@extras_features('custom_fields', 'custom_links', 'export_templates', 'tags', 'webhooks')
|
||||
class IPRange(PrimaryModel):
|
||||
"""
|
||||
A range of IP addresses, defined by start and end addresses.
|
||||
"""
|
||||
start_address = IPAddressField(
|
||||
help_text='IPv4 or IPv6 address (with mask)'
|
||||
)
|
||||
end_address = IPAddressField(
|
||||
help_text='IPv4 or IPv6 address (with mask)'
|
||||
)
|
||||
size = models.PositiveIntegerField(
|
||||
editable=False
|
||||
)
|
||||
vrf = models.ForeignKey(
|
||||
to='ipam.VRF',
|
||||
on_delete=models.PROTECT,
|
||||
related_name='ip_ranges',
|
||||
blank=True,
|
||||
null=True,
|
||||
verbose_name='VRF'
|
||||
)
|
||||
tenant = models.ForeignKey(
|
||||
to='tenancy.Tenant',
|
||||
on_delete=models.PROTECT,
|
||||
related_name='ip_ranges',
|
||||
blank=True,
|
||||
null=True
|
||||
)
|
||||
status = models.CharField(
|
||||
max_length=50,
|
||||
choices=IPRangeStatusChoices,
|
||||
default=IPRangeStatusChoices.STATUS_ACTIVE,
|
||||
help_text='Operational status of this range'
|
||||
)
|
||||
role = models.ForeignKey(
|
||||
to='ipam.Role',
|
||||
on_delete=models.SET_NULL,
|
||||
related_name='ip_ranges',
|
||||
blank=True,
|
||||
null=True,
|
||||
help_text='The primary function of this range'
|
||||
)
|
||||
description = models.CharField(
|
||||
max_length=200,
|
||||
blank=True
|
||||
)
|
||||
|
||||
objects = RestrictedQuerySet.as_manager()
|
||||
|
||||
clone_fields = [
|
||||
'vrf', 'tenant', 'status', 'role', 'description',
|
||||
]
|
||||
|
||||
class Meta:
|
||||
ordering = (F('vrf').asc(nulls_first=True), 'start_address', 'pk') # (vrf, start_address) may be non-unique
|
||||
verbose_name = 'IP range'
|
||||
verbose_name_plural = 'IP ranges'
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse('ipam:iprange', args=[self.pk])
|
||||
|
||||
def clean(self):
|
||||
super().clean()
|
||||
|
||||
if self.start_address and self.end_address:
|
||||
|
||||
# Check that start & end IP versions match
|
||||
if self.start_address.version != self.end_address.version:
|
||||
raise ValidationError({
|
||||
'end_address': f"Ending address version (IPv{self.end_address.version}) does not match starting "
|
||||
f"address (IPv{self.start_address.version})"
|
||||
})
|
||||
|
||||
# Check that the start & end IP prefix lengths match
|
||||
if self.start_address.prefixlen != self.end_address.prefixlen:
|
||||
raise ValidationError({
|
||||
'end_address': f"Ending address mask (/{self.end_address.prefixlen}) does not match starting "
|
||||
f"address mask (/{self.start_address.prefixlen})"
|
||||
})
|
||||
|
||||
# Check that the ending address is greater than the starting address
|
||||
if not self.end_address > self.start_address:
|
||||
raise ValidationError({
|
||||
'end_address': f"Ending address must be lower than the starting address ({self.start_address})"
|
||||
})
|
||||
|
||||
# Check for overlapping ranges
|
||||
overlapping_range = IPRange.objects.exclude(pk=self.pk).filter(vrf=self.vrf).filter(
|
||||
Q(start_address__gte=self.start_address, start_address__lte=self.end_address) | # Starts inside
|
||||
Q(end_address__gte=self.start_address, end_address__lte=self.end_address) | # Ends inside
|
||||
Q(start_address__lte=self.start_address, end_address__gte=self.end_address) # Starts & ends outside
|
||||
).first()
|
||||
if overlapping_range:
|
||||
raise ValidationError(f"Defined addresses overlap with range {overlapping_range} in VRF {self.vrf}")
|
||||
|
||||
def save(self, *args, **kwargs):
|
||||
|
||||
# Record the range's size (number of IP addresses)
|
||||
self.size = int(self.end_address.ip - self.start_address.ip) + 1
|
||||
|
||||
super().save(*args, **kwargs)
|
||||
|
||||
@property
|
||||
def family(self):
|
||||
if self.start_address:
|
||||
return self.start_address.version
|
||||
return None
|
||||
|
||||
@cached_property
|
||||
def name(self):
|
||||
"""
|
||||
Return an efficient string representation of the IP range.
|
||||
"""
|
||||
separator = ':' if self.family == 6 else '.'
|
||||
start_chunks = str(self.start_address.ip).split(separator)
|
||||
end_chunks = str(self.end_address.ip).split(separator)
|
||||
|
||||
base_chunks = []
|
||||
for a, b in zip(start_chunks, end_chunks):
|
||||
if a == b:
|
||||
base_chunks.append(a)
|
||||
|
||||
base_str = separator.join(base_chunks)
|
||||
start_str = separator.join(start_chunks[len(base_chunks):])
|
||||
end_str = separator.join(end_chunks[len(base_chunks):])
|
||||
|
||||
return f'{base_str}{separator}{start_str}-{end_str}/{self.start_address.prefixlen}'
|
||||
|
||||
def _set_prefix_length(self, value):
|
||||
"""
|
||||
Expose the IPRange object's prefixlen attribute on the parent model so that it can be manipulated directly,
|
||||
e.g. for bulk editing.
|
||||
"""
|
||||
self.start_address.prefixlen = value
|
||||
self.end_address.prefixlen = value
|
||||
prefix_length = property(fset=_set_prefix_length)
|
||||
|
||||
def get_status_class(self):
|
||||
return IPRangeStatusChoices.CSS_CLASSES.get(self.status)
|
||||
|
||||
def get_child_ips(self):
|
||||
"""
|
||||
Return all IPAddresses within this IPRange and VRF.
|
||||
"""
|
||||
return IPAddress.objects.filter(
|
||||
address__gte=self.start_address,
|
||||
address__lte=self.end_address,
|
||||
vrf=self.vrf
|
||||
)
|
||||
|
||||
def get_available_ips(self):
|
||||
"""
|
||||
Return all available IPs within this range as an IPSet.
|
||||
"""
|
||||
range = netaddr.IPRange(self.start_address.ip, self.end_address.ip)
|
||||
child_ips = netaddr.IPSet([ip.address.ip for ip in self.get_child_ips()])
|
||||
|
||||
return netaddr.IPSet(range) - child_ips
|
||||
|
||||
@cached_property
|
||||
def first_available_ip(self):
|
||||
"""
|
||||
Return the first available IP within the range (or None).
|
||||
"""
|
||||
available_ips = self.get_available_ips()
|
||||
if not available_ips:
|
||||
return None
|
||||
|
||||
return '{}/{}'.format(next(available_ips.__iter__()), self.start_address.prefixlen)
|
||||
|
||||
@cached_property
|
||||
def utilization(self):
|
||||
"""
|
||||
Determine the utilization of the range and return it as a percentage.
|
||||
"""
|
||||
# Compile an IPSet to avoid counting duplicate IPs
|
||||
child_count = netaddr.IPSet([
|
||||
ip.address.ip for ip in self.get_child_ips()
|
||||
]).size
|
||||
|
||||
return int(float(child_count) / self.size * 100)
|
||||
|
||||
|
||||
@extras_features('custom_fields', 'custom_links', 'export_templates', 'tags', 'webhooks')
|
||||
class IPAddress(PrimaryModel):
|
||||
"""
|
||||
|
Reference in New Issue
Block a user