mirror of
				https://github.com/netbox-community/netbox.git
				synced 2024-05-10 07:54:54 +00:00 
			
		
		
		
	Closes #1246: Added ability to auto-create the next available IP address within a prefix
This commit is contained in:
		@@ -1,6 +1,8 @@
 | 
			
		||||
from __future__ import unicode_literals
 | 
			
		||||
 | 
			
		||||
from rest_framework import status
 | 
			
		||||
from rest_framework.decorators import detail_route
 | 
			
		||||
from rest_framework.exceptions import PermissionDenied
 | 
			
		||||
from rest_framework.response import Response
 | 
			
		||||
from rest_framework.viewsets import ModelViewSet
 | 
			
		||||
 | 
			
		||||
@@ -66,7 +68,7 @@ class PrefixViewSet(WritableSerializerMixin, CustomFieldModelViewSet):
 | 
			
		||||
    write_serializer_class = serializers.WritablePrefixSerializer
 | 
			
		||||
    filter_class = filters.PrefixFilter
 | 
			
		||||
 | 
			
		||||
    @detail_route(url_path='available-ips')
 | 
			
		||||
    @detail_route(url_path='available-ips', methods=['get', 'post'])
 | 
			
		||||
    def available_ips(self, request, pk=None):
 | 
			
		||||
        """
 | 
			
		||||
        A convenience method for returning available IP addresses within a prefix. By default, the number of IPs
 | 
			
		||||
@@ -75,23 +77,52 @@ class PrefixViewSet(WritableSerializerMixin, CustomFieldModelViewSet):
 | 
			
		||||
        """
 | 
			
		||||
        prefix = get_object_or_404(Prefix, pk=pk)
 | 
			
		||||
 | 
			
		||||
        # Create the next available IP within the prefix
 | 
			
		||||
        if request.method == 'POST':
 | 
			
		||||
 | 
			
		||||
            # Permissions check
 | 
			
		||||
            if not request.user.has_perm('ipam.add_ipaddress'):
 | 
			
		||||
                raise PermissionDenied()
 | 
			
		||||
 | 
			
		||||
            # Find the first available IP address in the prefix
 | 
			
		||||
            try:
 | 
			
		||||
                ipaddress = list(prefix.get_available_ips())[0]
 | 
			
		||||
            except IndexError:
 | 
			
		||||
                return Response(
 | 
			
		||||
                    {
 | 
			
		||||
                        "detail": "There are no available IPs within this prefix ({})".format(prefix)
 | 
			
		||||
                    },
 | 
			
		||||
                    status=status.HTTP_400_BAD_REQUEST
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            # Create the new IP address
 | 
			
		||||
            data = request.data.copy()
 | 
			
		||||
            data['address'] = '{}/{}'.format(ipaddress, prefix.prefix.prefixlen)
 | 
			
		||||
            data['vrf'] = prefix.vrf
 | 
			
		||||
            serializer = serializers.WritableIPAddressSerializer(data=data)
 | 
			
		||||
            if serializer.is_valid():
 | 
			
		||||
                serializer.save()
 | 
			
		||||
                return Response(serializer.data, status=status.HTTP_201_CREATED)
 | 
			
		||||
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
 | 
			
		||||
 | 
			
		||||
        # Determine the maximum amount of IPs to return
 | 
			
		||||
        try:
 | 
			
		||||
            limit = int(request.query_params.get('limit', settings.PAGINATE_COUNT))
 | 
			
		||||
        except ValueError:
 | 
			
		||||
            limit = settings.PAGINATE_COUNT
 | 
			
		||||
        if settings.MAX_PAGE_SIZE:
 | 
			
		||||
            limit = min(limit, settings.MAX_PAGE_SIZE)
 | 
			
		||||
        else:
 | 
			
		||||
            try:
 | 
			
		||||
                limit = int(request.query_params.get('limit', settings.PAGINATE_COUNT))
 | 
			
		||||
            except ValueError:
 | 
			
		||||
                limit = settings.PAGINATE_COUNT
 | 
			
		||||
            if settings.MAX_PAGE_SIZE:
 | 
			
		||||
                limit = min(limit, settings.MAX_PAGE_SIZE)
 | 
			
		||||
 | 
			
		||||
        # Calculate available IPs within the prefix
 | 
			
		||||
        ip_list = list(prefix.get_available_ips())[:limit]
 | 
			
		||||
        serializer = serializers.AvailableIPSerializer(ip_list, many=True, context={
 | 
			
		||||
            'request': request,
 | 
			
		||||
            'prefix': prefix.prefix,
 | 
			
		||||
            'vrf': prefix.vrf,
 | 
			
		||||
        })
 | 
			
		||||
            # Calculate available IPs within the prefix
 | 
			
		||||
            ip_list = list(prefix.get_available_ips())[:limit]
 | 
			
		||||
            serializer = serializers.AvailableIPSerializer(ip_list, many=True, context={
 | 
			
		||||
                'request': request,
 | 
			
		||||
                'prefix': prefix.prefix,
 | 
			
		||||
                'vrf': prefix.vrf,
 | 
			
		||||
            })
 | 
			
		||||
 | 
			
		||||
        return Response(serializer.data)
 | 
			
		||||
            return Response(serializer.data)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
#
 | 
			
		||||
 
 | 
			
		||||
@@ -331,7 +331,7 @@ class Prefix(CreatedUpdatedModel, CustomFieldModel):
 | 
			
		||||
        Return all available IPs within this prefix as an IPSet.
 | 
			
		||||
        """
 | 
			
		||||
        prefix = netaddr.IPSet(self.prefix)
 | 
			
		||||
        child_ips = netaddr.IPSet([ip.address for ip in self.get_child_ips()])
 | 
			
		||||
        child_ips = netaddr.IPSet([ip.address.ip for ip in self.get_child_ips()])
 | 
			
		||||
        available_ips = prefix - child_ips
 | 
			
		||||
 | 
			
		||||
        # Remove unusable IPs from non-pool prefixes
 | 
			
		||||
 
 | 
			
		||||
@@ -367,6 +367,35 @@ class PrefixTest(HttpStatusMixin, APITestCase):
 | 
			
		||||
        self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
 | 
			
		||||
        self.assertEqual(Prefix.objects.count(), 2)
 | 
			
		||||
 | 
			
		||||
    def test_available_ips(self):
 | 
			
		||||
 | 
			
		||||
        prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/29'), is_pool=True)
 | 
			
		||||
        url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
 | 
			
		||||
 | 
			
		||||
        # Retrieve all available IPs
 | 
			
		||||
        response = self.client.get(url, **self.header)
 | 
			
		||||
        self.assertEqual(len(response.data), 8)  # 8 because prefix.is_pool = True
 | 
			
		||||
 | 
			
		||||
        # Change the prefix to not be a pool and try again
 | 
			
		||||
        prefix.is_pool = False
 | 
			
		||||
        prefix.save()
 | 
			
		||||
        response = self.client.get(url, **self.header)
 | 
			
		||||
        self.assertEqual(len(response.data), 6)  # 8 - 2 because prefix.is_pool = False
 | 
			
		||||
 | 
			
		||||
        # Create all six available IPs
 | 
			
		||||
        for i in range(6):
 | 
			
		||||
            data = {
 | 
			
		||||
                'description': 'Test IP {}'.format(i)
 | 
			
		||||
            }
 | 
			
		||||
            response = self.client.post(url, data, **self.header)
 | 
			
		||||
            self.assertHttpStatus(response, status.HTTP_201_CREATED)
 | 
			
		||||
            self.assertEqual(response.data['description'], data['description'])
 | 
			
		||||
 | 
			
		||||
        # Try to create one more IP
 | 
			
		||||
        response = self.client.post(url, {}, **self.header)
 | 
			
		||||
        self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
 | 
			
		||||
        self.assertIn('detail', response.data)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IPAddressTest(HttpStatusMixin, APITestCase):
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user