mirror of
https://github.com/netbox-community/netbox.git
synced 2024-05-10 07:54:54 +00:00
Proof of concept for showing containing prefixes when searching for ip-addresses.
This commit is contained in:
committed by
Jeremy Stretch
parent
315371bf7c
commit
ce166b12ce
@ -1,4 +1,4 @@
|
|||||||
from django.db.models import CharField, Lookup
|
from django.db.models import CharField, TextField, Lookup
|
||||||
|
|
||||||
|
|
||||||
class Empty(Lookup):
|
class Empty(Lookup):
|
||||||
@ -14,4 +14,17 @@ class Empty(Lookup):
|
|||||||
return 'CAST(LENGTH(%s) AS BOOLEAN) != %s' % (lhs, rhs), params
|
return 'CAST(LENGTH(%s) AS BOOLEAN) != %s' % (lhs, rhs), params
|
||||||
|
|
||||||
|
|
||||||
|
class NetContainsOrEquals(Lookup):
|
||||||
|
"""
|
||||||
|
This lookup has the same functionality as the one from the ipam app except lhs is cast to inet
|
||||||
|
"""
|
||||||
|
lookup_name = 'net_contains_or_equals'
|
||||||
|
|
||||||
|
def as_sql(self, qn, connection):
|
||||||
|
lhs, lhs_params = self.process_lhs(qn, connection)
|
||||||
|
rhs, rhs_params = self.process_rhs(qn, connection)
|
||||||
|
params = lhs_params + rhs_params
|
||||||
|
return 'CAST(%s as inet) >>= %s' % (lhs, rhs), params
|
||||||
|
|
||||||
CharField.register_lookup(Empty)
|
CharField.register_lookup(Empty)
|
||||||
|
TextField.register_lookup(NetContainsOrEquals)
|
||||||
|
@ -2,6 +2,7 @@ from collections import namedtuple
|
|||||||
|
|
||||||
from django.db import models
|
from django.db import models
|
||||||
|
|
||||||
|
from ipam.fields import IPAddressField, IPNetworkField
|
||||||
from netbox.registry import registry
|
from netbox.registry import registry
|
||||||
|
|
||||||
ObjectFieldValue = namedtuple('ObjectFieldValue', ('name', 'type', 'weight', 'value'))
|
ObjectFieldValue = namedtuple('ObjectFieldValue', ('name', 'type', 'weight', 'value'))
|
||||||
@ -11,6 +12,8 @@ class FieldTypes:
|
|||||||
FLOAT = 'float'
|
FLOAT = 'float'
|
||||||
INTEGER = 'int'
|
INTEGER = 'int'
|
||||||
STRING = 'str'
|
STRING = 'str'
|
||||||
|
INET = 'inet'
|
||||||
|
CIDR = 'cidr'
|
||||||
|
|
||||||
|
|
||||||
class LookupTypes:
|
class LookupTypes:
|
||||||
@ -43,6 +46,10 @@ class SearchIndex:
|
|||||||
field_cls = instance._meta.get_field(field_name).__class__
|
field_cls = instance._meta.get_field(field_name).__class__
|
||||||
if issubclass(field_cls, (models.FloatField, models.DecimalField)):
|
if issubclass(field_cls, (models.FloatField, models.DecimalField)):
|
||||||
return FieldTypes.FLOAT
|
return FieldTypes.FLOAT
|
||||||
|
if issubclass(field_cls, IPAddressField):
|
||||||
|
return FieldTypes.INET
|
||||||
|
if issubclass(field_cls, (IPNetworkField)):
|
||||||
|
return FieldTypes.CIDR
|
||||||
if issubclass(field_cls, models.IntegerField):
|
if issubclass(field_cls, models.IntegerField):
|
||||||
return FieldTypes.INTEGER
|
return FieldTypes.INTEGER
|
||||||
return FieldTypes.STRING
|
return FieldTypes.STRING
|
||||||
|
@ -3,10 +3,12 @@ from collections import defaultdict
|
|||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.contrib.contenttypes.models import ContentType
|
from django.contrib.contenttypes.models import ContentType
|
||||||
from django.core.exceptions import ImproperlyConfigured
|
from django.core.exceptions import ImproperlyConfigured
|
||||||
from django.db.models import F, Window
|
from django.db.models import F, Window, Q
|
||||||
from django.db.models.functions import window
|
from django.db.models.functions import window
|
||||||
from django.db.models.signals import post_delete, post_save
|
from django.db.models.signals import post_delete, post_save
|
||||||
from django.utils.module_loading import import_string
|
from django.utils.module_loading import import_string
|
||||||
|
import netaddr
|
||||||
|
from netaddr.core import AddrFormatError
|
||||||
|
|
||||||
from extras.models import CachedValue, CustomField
|
from extras.models import CachedValue, CustomField
|
||||||
from netbox.registry import registry
|
from netbox.registry import registry
|
||||||
@ -95,18 +97,23 @@ class CachedValueSearchBackend(SearchBackend):
|
|||||||
|
|
||||||
def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE):
|
def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE):
|
||||||
|
|
||||||
# Define the search parameters
|
query_filter = Q(**{f'value__{lookup}': value})
|
||||||
params = {
|
|
||||||
f'value__{lookup}': value
|
|
||||||
}
|
|
||||||
if lookup in (LookupTypes.STARTSWITH, LookupTypes.ENDSWITH):
|
if lookup in (LookupTypes.STARTSWITH, LookupTypes.ENDSWITH):
|
||||||
# Partial string matches are valid only on string values
|
# Partial string matches are valid only on string values
|
||||||
params['type'] = FieldTypes.STRING
|
query_filter &= Q(type=FieldTypes.STRING)
|
||||||
if object_types:
|
if object_types:
|
||||||
params['object_type__in'] = object_types
|
query_filter &= Q(object_typeo__in=object_types)
|
||||||
|
|
||||||
|
if lookup == LookupTypes.PARTIAL:
|
||||||
|
try:
|
||||||
|
address = str(netaddr.IPNetwork(value.strip()).cidr)
|
||||||
|
query_filter |= Q(type=FieldTypes.CIDR) & Q(value__net_contains_or_equals=address)
|
||||||
|
except (AddrFormatError, ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
# Construct the base queryset to retrieve matching results
|
# Construct the base queryset to retrieve matching results
|
||||||
queryset = CachedValue.objects.filter(**params).annotate(
|
queryset = CachedValue.objects.filter(query_filter).annotate(
|
||||||
# Annotate the rank of each result for its object according to its weight
|
# Annotate the rank of each result for its object according to its weight
|
||||||
row_number=Window(
|
row_number=Window(
|
||||||
expression=window.RowNumber(),
|
expression=window.RowNumber(),
|
||||||
|
Reference in New Issue
Block a user