1
0
mirror of https://github.com/netbox-community/netbox.git synced 2024-05-10 07:54:54 +00:00
2022-04-12 11:42:47 -04:00

152 lines
5.5 KiB
Python

import json
import logging
import sys
import traceback
import uuid
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from extras.api.serializers import ScriptOutputSerializer
from extras.choices import JobResultStatusChoices
from extras.context_managers import change_logging
from extras.models import JobResult
from extras.scripts import get_script
from utilities.exceptions import AbortTransaction
from utilities.utils import NetBoxFakeRequest
class Command(BaseCommand):
help = "Run a script in Netbox"
def add_arguments(self, parser):
parser.add_argument(
'--loglevel',
help="Logging Level (default: info)",
dest='loglevel',
default='info',
choices=['debug', 'info', 'warning', 'error', 'critical'])
parser.add_argument('--commit', help="Commit this script to database", action='store_true')
parser.add_argument('--user', help="User script is running as")
parser.add_argument('--data', help="Data as a string encapsulated JSON blob")
parser.add_argument('script', help="Script to run")
def handle(self, *args, **options):
def _run_script():
"""
Core script execution task. We capture this within a subfunction to allow for conditionally wrapping it with
the change_logging context manager (which is bypassed if commit == False).
"""
try:
with transaction.atomic():
script.output = script.run(data=data, commit=commit)
job_result.set_status(JobResultStatusChoices.STATUS_COMPLETED)
if not commit:
raise AbortTransaction()
except AbortTransaction:
script.log_info("Database changes have been reverted automatically.")
except Exception as e:
stacktrace = traceback.format_exc()
script.log_failure(
f"An exception occurred: `{type(e).__name__}: {e}`\n```\n{stacktrace}\n```"
)
script.log_info("Database changes have been reverted due to error.")
logger.error(f"Exception raised during script execution: {e}")
job_result.set_status(JobResultStatusChoices.STATUS_ERRORED)
finally:
job_result.data = ScriptOutputSerializer(script).data
job_result.save()
logger.info(f"Script completed in {job_result.duration}")
# Params
script = options['script']
loglevel = options['loglevel']
commit = options['commit']
try:
data = json.loads(options['data'])
except TypeError:
data = {}
module, name = script.split('.', 1)
# Take user from command line if provided and exists, other
if options['user']:
try:
user = User.objects.get(username=options['user'])
except User.DoesNotExist:
user = User.objects.filter(is_superuser=True).order_by('pk')[0]
else:
user = User.objects.filter(is_superuser=True).order_by('pk')[0]
# Setup logging to Stdout
formatter = logging.Formatter(f'[%(asctime)s][%(levelname)s] - %(message)s')
stdouthandler = logging.StreamHandler(sys.stdout)
stdouthandler.setLevel(logging.DEBUG)
stdouthandler.setFormatter(formatter)
logger = logging.getLogger(f"netbox.scripts.{module}.{name}")
logger.addHandler(stdouthandler)
try:
logger.setLevel({
'critical': logging.CRITICAL,
'debug': logging.DEBUG,
'error': logging.ERROR,
'fatal': logging.FATAL,
'info': logging.INFO,
'warning': logging.WARNING,
}[loglevel])
except KeyError:
raise CommandError(f"Invalid log level: {loglevel}")
# Get the script
script = get_script(module, name)()
# Parse the parameters
form = script.as_form(data, None)
script_content_type = ContentType.objects.get(app_label='extras', model='script')
# Create the job result
job_result = JobResult.objects.create(
name=script.full_name,
obj_type=script_content_type,
user=User.objects.filter(is_superuser=True).order_by('pk')[0],
job_id=uuid.uuid4()
)
request = NetBoxFakeRequest({
'META': {},
'POST': data,
'GET': {},
'FILES': {},
'user': user,
'path': '',
'id': job_result.job_id
})
if form.is_valid():
job_result.status = JobResultStatusChoices.STATUS_RUNNING
job_result.save()
logger.info(f"Running script (commit={commit})")
script.request = request
# Execute the script. If commit is True, wrap it with the change_logging context manager to ensure we process
# change logging, webhooks, etc.
with change_logging(request):
_run_script()
else:
logger.error('Data is not valid:')
for field, errors in form.errors.get_json_data().items():
for error in errors:
logger.error(f'\t{field}: {error.get("message")}')
job_result.status = JobResultStatusChoices.STATUS_ERRORED
job_result.save()