1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

296 lines
7.5 KiB
Python
Raw Normal View History

2019-08-09 12:33:33 -04:00
from collections import OrderedDict
import inspect
import json
import os
2019-08-09 12:33:33 -04:00
import pkgutil
2019-08-14 10:12:30 -04:00
import time
import yaml
2019-08-09 12:33:33 -04:00
from django import forms
from django.conf import settings
from django.core.validators import RegexValidator
2019-08-12 13:51:25 -04:00
from django.db import transaction
2019-08-09 12:33:33 -04:00
2019-08-13 09:48:51 -04:00
from ipam.formfields import IPFormField
2019-08-12 14:28:06 -04:00
from utilities.exceptions import AbortTransaction
2019-08-09 12:33:33 -04:00
from .constants import LOG_DEFAULT, LOG_FAILURE, LOG_INFO, LOG_SUCCESS, LOG_WARNING
from .forms import ScriptForm
2019-08-09 16:34:01 -04:00
# Public names exposed by a star-import of this module
__all__ = [
    'Script',
    'StringVar',
    'IntegerVar',
    'BooleanVar',
    'ObjectVar',
    'IPNetworkVar',
]
2019-08-09 12:33:33 -04:00
#
# Script variables
#
class ScriptVariable:
    """
    Base model for script variables.

    Collects the keyword arguments used to instantiate a Django form field. Subclasses select the field type by
    overriding ``form_field`` and may add further entries to ``field_attrs``.

    :param label: Field label; when empty, no label is passed to the form field
    :param description: Passed to the form field as ``help_text``
    :param default: Initial value for the field; ``None`` means no initial value is set
    :param required: Passed to the form field as ``required``
    """
    form_field = forms.CharField

    def __init__(self, label='', description='', default=None, required=True):

        # Default field attributes
        self.field_attrs = {
            'help_text': description,
            'required': required
        }
        if label:
            self.field_attrs['label'] = label
        # Compare against None rather than relying on truthiness so that falsy defaults (0, False, '') are kept
        if default is not None:
            self.field_attrs['initial'] = default

    def as_field(self):
        """
        Render the variable as a Django form field.
        """
        form_field = self.form_field(**self.field_attrs)
        form_field.widget.attrs['class'] = 'form-control'

        return form_field
2019-08-09 12:33:33 -04:00
class StringVar(ScriptVariable):
    """
    Free-text variable with optional minimum/maximum length limits and regular expression validation.
    """
    def __init__(self, min_length=None, max_length=None, regex=None, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Length limits are only passed on when a truthy value was supplied
        for attr_name, limit in (('min_length', min_length), ('max_length', max_length)):
            if limit:
                self.field_attrs[attr_name] = limit

        # Without a pattern there is no validator to attach
        if not regex:
            return

        pattern_validator = RegexValidator(
            regex=regex,
            message='Invalid value. Must match regex: {}'.format(regex),
            code='invalid'
        )
        self.field_attrs['validators'] = [pattern_validator]
2019-08-09 12:33:33 -04:00
class IntegerVar(ScriptVariable):
    """
    Integer representation. Can enforce minimum/maximum values.

    :param min_value: Smallest accepted value; ``None`` means no lower bound
    :param max_value: Largest accepted value; ``None`` means no upper bound

    Remaining arguments are passed through to ScriptVariable.
    """
    form_field = forms.IntegerField

    def __init__(self, min_value=None, max_value=None, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Optional minimum/maximum values. Test against None explicitly: zero is a legitimate bound and must not
        # be discarded as "not provided".
        if min_value is not None:
            self.field_attrs['min_value'] = min_value
        if max_value is not None:
            self.field_attrs['max_value'] = max_value
2019-08-09 12:33:33 -04:00
class BooleanVar(ScriptVariable):
    """
    True/false variable. Renders as a checkbox.
    """
    form_field = forms.BooleanField

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Always optional, whatever ``required`` value the caller passed: boolean fields cannot be required
        self.field_attrs.update(required=False)
2019-08-09 12:33:33 -04:00
class ObjectVar(ScriptVariable):
    """
    Variable whose value is a single NetBox object, chosen from the QuerySet supplied by the script author.
    """
    form_field = forms.ModelChoiceField

    def __init__(self, queryset, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # The QuerySet is handed to the form field, which derives its choices from it
        self.field_attrs.update(queryset=queryset)
2019-08-13 09:48:51 -04:00
class IPNetworkVar(ScriptVariable):
    """
    Variable holding an IPv4 or IPv6 prefix, rendered with IPFormField.
    """
    form_field = IPFormField
#
# Scripts
#
2019-08-09 12:33:33 -04:00
class Script:
    """
    Custom scripts inherit this object.

    A script declares its inputs as ScriptVariable class attributes and implements run(). Optional settings are read
    from the nested Meta class: ``name`` (used by __str__) and ``fields`` (explicit variable ordering).
    """
    class Meta:
        pass

    def __init__(self):

        # Initiate the log; each entry is a (level, message) tuple
        self.log = []

        # Grab some info about the script
        self.filename = inspect.getfile(self.__class__)
        self.source = inspect.getsource(self.__class__)

    def __str__(self):
        # Prefer the name given in Meta, falling back to the class name
        return getattr(self.Meta, 'name', self.__class__.__name__)

    def _get_vars(self):
        """
        Return an OrderedDict mapping variable names to the ScriptVariable instances declared on this script.
        """
        script_vars = OrderedDict()

        # Infer order from Meta.fields (Python 3.5 and lower)
        for name in getattr(self.Meta, 'fields', []):
            script_vars[name] = getattr(self, name)

        # Default to order of declaration on class
        for name, attr in self.__class__.__dict__.items():
            if name not in script_vars and isinstance(attr, ScriptVariable):
                script_vars[name] = attr

        return script_vars

    def run(self, data):
        """
        Execute the script. Must be overridden by the script author.

        :param data: Input passed through by the caller (see run_script())
        :raises NotImplementedError: Always, unless overridden
        """
        raise NotImplementedError("The script must define a run() method.")

    def as_form(self, data=None):
        """
        Return a Django form suitable for populating the context data required to run this Script.
        """
        return ScriptForm(self._get_vars(), data)

    # Logging

    def log_debug(self, message):
        self.log.append((LOG_DEFAULT, message))

    def log_success(self, message):
        self.log.append((LOG_SUCCESS, message))

    def log_info(self, message):
        self.log.append((LOG_INFO, message))

    def log_warning(self, message):
        self.log.append((LOG_WARNING, message))

    def log_failure(self, message):
        self.log.append((LOG_FAILURE, message))

    # Convenience functions

    def load_yaml(self, filename):
        """
        Return data from a YAML file

        :param filename: Path of the file, relative to settings.SCRIPTS_ROOT
        """
        file_path = os.path.join(settings.SCRIPTS_ROOT, filename)
        with open(file_path, 'r') as datafile:
            # Name the loader explicitly: calling yaml.load() without one is unsafe on untrusted input and is
            # rejected outright by PyYAML 6+. SafeLoader builds plain Python data types only.
            data = yaml.load(datafile, Loader=yaml.SafeLoader)

        return data

    def load_json(self, filename):
        """
        Return data from a JSON file

        :param filename: Path of the file, relative to settings.SCRIPTS_ROOT
        """
        file_path = os.path.join(settings.SCRIPTS_ROOT, filename)
        with open(file_path, 'r') as datafile:
            data = json.load(datafile)

        return data
2019-08-09 12:33:33 -04:00
#
# Functions
#
def is_script(obj):
    """
    Returns True if the object is a Script.

    Any class derived from Script qualifies, whether it inherits directly or through intermediate classes. The
    Script base class itself and non-class objects do not.
    """
    try:
        return issubclass(obj, Script) and obj is not Script
    except TypeError:
        # issubclass() raises TypeError when obj is not a class
        return False
def is_variable(obj):
    """
    Report whether the given object is an instance of ScriptVariable (or of one of its subclasses).
    """
    return isinstance(obj, ScriptVariable)
2019-08-12 14:28:06 -04:00
def run_script(script, data, commit=True):
    """
    A wrapper for calling Script.run(). This performs error handling and provides a hook for committing changes. It
    exists outside of the Script class to ensure it cannot be overridden by a script author.

    :param script: The Script instance to execute
    :param data: Passed unchanged to script.run()
    :param commit: If False, AbortTransaction is raised inside the atomic block once the script has finished so
        that its database changes are not kept
    :return: A tuple of (output, execution_time). output is the return value of script.run(), or None if it raised.
        execution_time is the duration of script.run() in seconds, or None if it did not complete.
    """
    output = None
    start_time = None
    end_time = None

    try:
        with transaction.atomic():
            # Use the monotonic clock for timing: unlike time.time() it is unaffected by system clock adjustments,
            # so the measured duration can never come out negative or skewed.
            start_time = time.monotonic()
            output = script.run(data)
            end_time = time.monotonic()
            if not commit:
                raise AbortTransaction()
    except AbortTransaction:
        # Deliberate abort of a dry run; nothing to report
        pass
    except Exception as e:
        script.log_failure(
            "An exception occurred. {}: {}".format(type(e).__name__, e)
        )
        commit = False
    finally:
        if not commit:
            script.log_info(
                "Database changes have been reverted automatically."
            )

    # Calculate execution time (only available when run() completed)
    if end_time is not None:
        execution_time = end_time - start_time
    else:
        execution_time = None

    return output, execution_time
2019-08-12 13:51:25 -04:00
2019-08-09 12:33:33 -04:00
def get_scripts():
    """
    Load every module found in settings.SCRIPTS_ROOT and collect the scripts each one defines.

    :return: An OrderedDict mapping each module's label to an OrderedDict of {class name: script class}. The label
        is the module's ``name`` attribute if it defines one, otherwise the module's file name.
    """
    import importlib.util
    import sys

    scripts = OrderedDict()

    # Iterate through all modules within the scripts path. These are the user-created files in which scripts are
    # defined.
    for finder, module_name, _ in pkgutil.iter_modules([settings.SCRIPTS_ROOT]):

        # Load the module through its spec. The find_module()/load_module() API is deprecated and has been removed
        # from the path-based finders in Python 3.12.
        spec = finder.find_spec(module_name)
        module = importlib.util.module_from_spec(spec)

        # Register the module before executing it, as load_module() did: inspect.getfile() resolves a class's
        # source file through sys.modules. Put things back the way they were if execution fails.
        previous = sys.modules.get(module_name)
        sys.modules[module_name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            if previous is None:
                sys.modules.pop(module_name, None)
            else:
                sys.modules[module_name] = previous
            raise

        # A module may override its label by defining a ``name`` attribute
        label = module.name if hasattr(module, 'name') else module_name

        module_scripts = OrderedDict()
        for name, cls in inspect.getmembers(module, is_script):
            module_scripts[name] = cls
        scripts[label] = module_scripts

    return scripts