1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00
Files
netbox-community-netbox/netbox/netbox/search/backends.py

236 lines
7.7 KiB
Python

from collections import defaultdict
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ImproperlyConfigured
from django.db.models import F, Window
from django.db.models.functions import window
from django.db.models.signals import post_delete, post_save
from django.utils.module_loading import import_string
from extras.models import CachedValue, CustomField
from netbox.registry import registry
from utilities.querysets import RestrictedPrefetch
from utilities.utils import title
from . import FieldTypes, LookupTypes, get_indexer
DEFAULT_LOOKUP_TYPE = LookupTypes.PARTIAL
MAX_RESULTS = 1000
class SearchBackend:
"""
Base class for search backends. Subclasses must extend the `cache()`, `remove()`, and `clear()` methods below.
"""
_object_types = None
def get_object_types(self):
"""
Return a list of all registered object types, organized by category, suitable for populating a form's
ChoiceField.
"""
if not self._object_types:
# Organize choices by category
categories = defaultdict(dict)
for label, idx in registry['search'].items():
categories[idx.get_category()][label] = title(idx.model._meta.verbose_name)
# Compile a nested tuple of choices for form rendering
results = (
('', 'All Objects'),
*[(category, list(choices.items())) for category, choices in categories.items()]
)
self._object_types = results
return self._object_types
def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE):
"""
Search cached object representations for the given value.
"""
raise NotImplementedError
def caching_handler(self, sender, instance, **kwargs):
"""
Receiver for the post_save signal, responsible for caching object creation/changes.
"""
self.cache(instance)
def removal_handler(self, sender, instance, **kwargs):
"""
Receiver for the post_delete signal, responsible for caching object deletion.
"""
self.remove(instance)
def cache(self, instances, indexer=None, remove_existing=True):
"""
Create or update the cached representation of an instance.
"""
raise NotImplementedError
def remove(self, instance):
"""
Delete any cached representation of an instance.
"""
raise NotImplementedError
def clear(self, object_types=None):
"""
Delete *all* cached data.
"""
raise NotImplementedError
@property
def size(self):
"""
Return a total number of cached entries. The meaning of this value will be
backend-dependent.
"""
return None
class CachedValueSearchBackend(SearchBackend):
def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE):
# Define the search parameters
params = {
f'value__{lookup}': value
}
if lookup != LookupTypes.EXACT:
# Partial matches are valid only on string values
params['type'] = FieldTypes.STRING
if object_types:
params['object_type__in'] = object_types
# Construct the base queryset to retrieve matching results
queryset = CachedValue.objects.filter(**params).annotate(
# Annotate the rank of each result for its object according to its weight
row_number=Window(
expression=window.RowNumber(),
partition_by=[F('object_type'), F('object_id')],
order_by=[F('weight').asc()],
)
)[:MAX_RESULTS]
# Construct a Prefetch to pre-fetch only those related objects for which the
# user has permission to view.
if user:
prefetch = (RestrictedPrefetch('object', user, 'view'), 'object_type')
else:
prefetch = ('object', 'object_type')
# Wrap the base query to return only the lowest-weight result for each object
# Hat-tip to https://blog.oyam.dev/django-filter-by-window-function/ for the solution
sql, params = queryset.query.sql_with_params()
results = CachedValue.objects.prefetch_related(*prefetch).raw(
f"SELECT * FROM ({sql}) t WHERE row_number = 1",
params
)
# Omit any results pertaining to an object the user does not have permission to view
return [
r for r in results if r.object is not None
]
def cache(self, instances, indexer=None, remove_existing=True):
content_type = None
custom_fields = None
# Convert a single instance to an iterable
if not hasattr(instances, '__iter__'):
instances = [instances]
buffer = []
counter = 0
for instance in instances:
# First item
if not counter:
# Determine the indexer
if indexer is None:
try:
indexer = get_indexer(instance)
except KeyError:
break
# Prefetch any associated custom fields
content_type = ContentType.objects.get_for_model(indexer.model)
custom_fields = CustomField.objects.filter(content_types=content_type).exclude(search_weight=0)
# Wipe out any previously cached values for the object
if remove_existing:
self.remove(instance)
# Generate cache data
for field in indexer.to_cache(instance, custom_fields=custom_fields):
buffer.append(
CachedValue(
object_type=content_type,
object_id=instance.pk,
field=field.name,
type=field.type,
weight=field.weight,
value=field.value
)
)
# Check whether the buffer needs to be flushed
if len(buffer) >= 2000:
counter += len(CachedValue.objects.bulk_create(buffer))
buffer = []
# Final buffer flush
if buffer:
counter += len(CachedValue.objects.bulk_create(buffer))
return counter
def remove(self, instance):
# Avoid attempting to query for non-cacheable objects
try:
get_indexer(instance)
except KeyError:
return
ct = ContentType.objects.get_for_model(instance)
qs = CachedValue.objects.filter(object_type=ct, object_id=instance.pk)
# Call _raw_delete() on the queryset to avoid first loading instances into memory
return qs._raw_delete(using=qs.db)
def clear(self, object_types=None):
qs = CachedValue.objects.all()
if object_types:
qs = qs.filter(object_type__in=object_types)
# Call _raw_delete() on the queryset to avoid first loading instances into memory
return qs._raw_delete(using=qs.db)
@property
def size(self):
return CachedValue.objects.count()
def get_backend():
"""
Initializes and returns the configured search backend.
"""
try:
backend_cls = import_string(settings.SEARCH_BACKEND)
except AttributeError:
raise ImproperlyConfigured(f"Failed to import configured SEARCH_BACKEND: {settings.SEARCH_BACKEND}")
# Initialize and return the backend instance
return backend_cls()
search_backend = get_backend()
# Connect handlers to the appropriate model signals
post_save.connect(search_backend.caching_handler)
post_delete.connect(search_backend.removal_handler)