1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00

#6529 - Add CLI to run scripts

This commit is contained in:
Daniel Sheppard
2021-10-28 15:14:42 -05:00
parent a090955918
commit 0a62f75a40

View File

@ -0,0 +1,160 @@
import json
import logging
import sys
import traceback
import uuid
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.core.management.base import BaseCommand
from django.db import transaction
from extras.api.serializers import ScriptOutputSerializer
from extras.choices import JobResultStatusChoices
from extras.context_managers import change_logging
from extras.models import JobResult
from extras.scripts import get_scripts
from utilities.exceptions import AbortTransaction
from utilities.utils import NetBoxFakeRequest
class Command(BaseCommand):
help = "Run a script in Netbox"
def add_arguments(self, parser):
parser.add_argument(
'--loglevel',
help="Logging Level (default: info)",
dest='loglevel',
default='info',
choices=['debug', 'info', 'warning', 'error'])
parser.add_argument('--script', help="Script to run", dest='script')
parser.add_argument('--user', help="Data as a json blob", dest='user')
parser.add_argument('data', help="Data as a json blob")
@staticmethod
def _get_script(module, name):
scripts = get_scripts()
return scripts[module][name]()
def handle(self, *args, **options):
def _run_script():
"""
Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with
the change_logging context manager (which is bypassed if commit == False).
"""
try:
with transaction.atomic():
script.output = script.run(data=data, commit=commit)
job_result.set_status(JobResultStatusChoices.STATUS_COMPLETED)
if not commit:
raise AbortTransaction()
except AbortTransaction:
script.log_info("Database changes have been reverted automatically.")
except Exception as e:
stacktrace = traceback.format_exc()
script.log_failure(
f"An exception occurred: `{type(e).__name__}: {e}`\n```\n{stacktrace}\n```"
)
script.log_info("Database changes have been reverted due to error.")
logger.error(f"Exception raised during script execution: {e}")
job_result.set_status(JobResultStatusChoices.STATUS_ERRORED)
finally:
job_result.data = ScriptOutputSerializer(script).data
job_result.save()
logger.info(f"Script completed in {job_result.duration}")
# Params
script = options['script']
loglevel = options['loglevel']
data = json.loads(options['data'])
module, name = script.split('.', 1)
# Take user from command line if provided and exists, other
if options['user']:
try:
user = User.objects.get(username=options['user'])
except User.DoesNotExist:
user = User.objects.filter(is_superuser=True).order_by('pk')[0]
else:
user = User.objects.filter(is_superuser=True).order_by('pk')[0]
# Setup logging to Stdout
formatter = logging.Formatter(f'[%(asctime)s][%(levelname)s] - %(message)s')
stdouthandler = logging.StreamHandler(sys.stdout)
stdouthandler.setLevel(logging.DEBUG)
stdouthandler.setFormatter(formatter)
logger = logging.getLogger(f"netbox.scripts.{module}.{name}")
logger.addHandler(stdouthandler)
if loglevel == 'debug':
logger.setLevel(logging.DEBUG)
elif loglevel == 'info':
logger.setLevel(logging.INFO)
elif loglevel == 'warning':
logger.setLevel(logging.WARNING)
elif loglevel == 'error':
logger.setLevel(logging.ERROR)
else:
logger.setLevel(logging.INFO)
# Get the script
script = self._get_script(module, name)
# Parse the parameters
form = script.as_form(data, None)
script_content_type = ContentType.objects.get(app_label='extras', model='script')
# Create the job result
job_result = JobResult.objects.create(
name=script.full_name,
obj_type=script_content_type,
user=User.objects.filter(is_superuser=True).order_by('pk')[0],
job_id=uuid.uuid4()
)
request = NetBoxFakeRequest({
'META': {},
'POST': data,
'GET': {},
'FILES': {},
'user': user,
'path': '',
'id': job_result.job_id
})
if form.is_valid():
job_result.status = JobResultStatusChoices.STATUS_RUNNING
job_result.save()
commit = form.cleaned_data.pop('_commit')
logger.info(f"Running script (commit={commit})")
script.request = request
# Execute the script. If commit is True, wrap it with the change_logging context manager to ensure we process
# change logging, webhooks, etc.
if commit:
with change_logging(request):
_run_script()
else:
_run_script()
# Delete any previous terminal state results
JobResult.objects.filter(
obj_type=job_result.obj_type,
name=job_result.name,
status__in=JobResultStatusChoices.TERMINAL_STATE_CHOICES
).exclude(
pk=job_result.pk
).delete()
else:
job_result.status = JobResultStatusChoices.STATUS_ERRORED
job_result.save()