From 31246c6ba69bd6207671ed9da39cc8bf7cae4a08 Mon Sep 17 00:00:00 2001 From: Tony Murray Date: Tue, 10 Aug 2021 15:13:05 -0500 Subject: [PATCH] Revert "Full Python code fusion / refactor and hardening (#13094)" (#13123) This reverts commit 9c534a1a90537d90116bb5dba900c565dcb8d772. --- LibreNMS/__init__.py | 217 ++----------- LibreNMS/command_runner.py | 397 ----------------------- LibreNMS/library.py | 174 ++++++++++ LibreNMS/queuemanager.py | 153 ++++----- LibreNMS/service.py | 159 +++++----- LibreNMS/wrapper.py | 627 ------------------------------------- discovery-wrapper.py | 475 +++++++++++++++++++++++++--- librenms-service.py | 5 +- poller-wrapper.py | 507 +++++++++++++++++++++++++++--- requirements.txt | 1 - services-wrapper.py | 479 +++++++++++++++++++++++++--- 11 files changed, 1669 insertions(+), 1525 deletions(-) delete mode 100644 LibreNMS/command_runner.py create mode 100644 LibreNMS/library.py delete mode 100644 LibreNMS/wrapper.py diff --git a/LibreNMS/__init__.py b/LibreNMS/__init__.py index e3f68d16c7..28701ba344 100644 --- a/LibreNMS/__init__.py +++ b/LibreNMS/__init__.py @@ -1,17 +1,15 @@ -import json -import logging import os -import sys -import tempfile +import subprocess import threading import timeit from collections import deque -from logging.handlers import RotatingFileHandler + +from logging import critical, info, debug, exception from math import ceil from queue import Queue from time import time -from .command_runner import command_runner +from .service import Service, ServiceConfig from .queuemanager import ( QueueManager, TimedQueueManager, @@ -22,158 +20,6 @@ from .queuemanager import ( PollerQueueManager, DiscoveryQueueManager, ) -from .service import Service, ServiceConfig - -# Hard limit script execution time so we don't get to "hang" -DEFAULT_SCRIPT_TIMEOUT = 3600 -MAX_LOGFILE_SIZE = (1024 ** 2) * 10 # 10 Megabytes max log files - -logger = logging.getLogger(__name__) - -# Logging functions ######################################################## -# Original logger functions from ofunctions.logger_utils package - -FORMATTER = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s") - - -def logger_get_console_handler(): - try: - console_handler = logging.StreamHandler(sys.stdout) - except OSError as exc: - print("Cannot log to stdout, trying stderr. Message %s" % exc) - try: - console_handler = logging.StreamHandler(sys.stderr) - console_handler.setFormatter(FORMATTER) - return console_handler - except OSError as exc: - print("Cannot log to stderr neither. Message %s" % exc) - return False - else: - console_handler.setFormatter(FORMATTER) - return console_handler - - -def logger_get_file_handler(log_file): - err_output = None - try: - file_handler = RotatingFileHandler( - log_file, - mode="a", - encoding="utf-8", - maxBytes=MAX_LOGFILE_SIZE, - backupCount=3, - ) - except OSError as exc: - try: - print( - "Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s" - % exc - ) - err_output = str(exc) - temp_log_file = tempfile.gettempdir() + os.sep + __name__ + ".log" - print("Trying temporary log file in " + temp_log_file) - file_handler = RotatingFileHandler( - temp_log_file, - mode="a", - encoding="utf-8", - maxBytes=MAX_LOGFILE_SIZE, - backupCount=1, - ) - file_handler.setFormatter(FORMATTER) - err_output += "\nUsing [%s]" % temp_log_file - return file_handler, err_output - except OSError as exc: - print( - "Cannot create temporary log file either. Will not log to file. Message: %s" - % exc - ) - return False - else: - file_handler.setFormatter(FORMATTER) - return file_handler, err_output - - -def logger_get_logger(log_file=None, temp_log_file=None, debug=False): - # If a name is given to getLogger, than modules can't log to the root logger - _logger = logging.getLogger() - if debug is True: - _logger.setLevel(logging.DEBUG) - else: - _logger.setLevel(logging.INFO) - console_handler = logger_get_console_handler() - if console_handler: - _logger.addHandler(console_handler) - if log_file is not None: - file_handler, err_output = logger_get_file_handler(log_file) - if file_handler: - _logger.addHandler(file_handler) - _logger.propagate = False - if err_output is not None: - print(err_output) - _logger.warning( - "Failed to use log file [%s], %s.", log_file, err_output - ) - if temp_log_file is not None: - if os.path.isfile(temp_log_file): - try: - os.remove(temp_log_file) - except OSError: - logger.warning("Cannot remove temp log file [%s]." % temp_log_file) - file_handler, err_output = logger_get_file_handler(temp_log_file) - if file_handler: - _logger.addHandler(file_handler) - _logger.propagate = False - if err_output is not None: - print(err_output) - _logger.warning( - "Failed to use log file [%s], %s.", log_file, err_output - ) - return _logger - - -# Generic functions ######################################################## - - -def check_for_file(file): - try: - with open(file) as file: - pass - except IOError as exc: - logger.error("File '%s' is not readable" % file) - logger.debug("Traceback:", exc_info=True) - sys.exit(2) - - -# Config functions ######################################################### - - -def get_config_data(base_dir): - check_for_file(os.path.join(base_dir, ".env")) - - try: - import dotenv - - env_path = "{}/.env".format(base_dir) - logger.info("Attempting to load .env from '%s'", env_path) - dotenv.load_dotenv(dotenv_path=env_path, verbose=True) - - if not os.getenv("NODE_ID"): - logger.critical(".env does not contain a valid NODE_ID setting.") - - except ImportError as e: - logger.critical( - "Could not import .env - check that the poller user can read the file, and that composer install has been run recently" - ) - - config_cmd = ["/usr/bin/env", "php", "%s/config_to_json.php" % base_dir] - try: - exit_code, output = command_runner(config_cmd, timeout=300) - if exit_code == 0: - return json.loads(output) - raise EnvironmentError - except Exception as exc: - logger.critical("ERROR: Could not execute command [%s]: %s" % (config_cmd, exc)) - logger.debug("Traceback:", exc_info=True) def normalize_wait(seconds): @@ -182,9 +28,8 @@ def normalize_wait(seconds): def call_script(script, args=()): """ - Run a LibreNMS script. Captures all output returns exit code. - Blocks parent signals (like SIGINT and SIGTERM). - Kills script if it takes too long + Run a LibreNMS script. Captures all output and throws an exception if a non-zero + status is returned. Blocks parent signals (like SIGINT and SIGTERM). :param script: the name of the executable relative to the base directory :param args: a tuple of arguments to send to the command :returns the output of the command @@ -197,10 +42,14 @@ def call_script(script, args=()): base_dir = os.path.realpath(os.path.dirname(__file__) + "/..") cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args)) - logger.debug("Running {}".format(cmd)) + debug("Running {}".format(cmd)) # preexec_fn=os.setsid here keeps process signals from propagating (close_fds=True is default) - return command_runner( - cmd, preexec_fn=os.setsid, close_fds=True, timeout=DEFAULT_SCRIPT_TIMEOUT + return subprocess.check_call( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + preexec_fn=os.setsid, + close_fds=True, ) @@ -221,15 +70,15 @@ class DB: import pymysql pymysql.install_as_MySQLdb() - logger.info("Using pure python SQL client") + info("Using pure python SQL client") except ImportError: - logger.info("Using other SQL client") + info("Using other SQL client") try: import MySQLdb except ImportError: - logger.critical("ERROR: missing a mysql python module") - logger.critical( + critical("ERROR: missing a mysql python module") + critical( "Install either 'PyMySQL' or 'mysqlclient' from your OS software repository or from PyPI" ) raise @@ -250,7 +99,7 @@ class DB: conn.ping(True) self._db[threading.get_ident()] = conn except Exception as e: - logger.critical("ERROR: Could not connect to MySQL database! {}".format(e)) + critical("ERROR: Could not connect to MySQL database! {}".format(e)) raise def db_conn(self): @@ -279,7 +128,7 @@ class DB: cursor.close() return cursor except Exception as e: - logger.critical("DB Connection exception {}".format(e)) + critical("DB Connection exception {}".format(e)) self.close() raise @@ -318,7 +167,7 @@ class RecurringTimer: class Lock: - """Base lock class this is not thread safe""" + """ Base lock class this is not thread safe""" def __init__(self): self._locks = {} # store a tuple (owner, expiration) @@ -361,7 +210,7 @@ class Lock: return False def print_locks(self): - logger.debug(self._locks) + debug(self._locks) class ThreadingLock(Lock): @@ -420,7 +269,7 @@ class RedisLock(Lock): self._redis = redis.Redis(**kwargs) self._redis.ping() self._namespace = namespace - logger.info( + info( "Created redis lock manager with socket_timeout of {}s".format( redis_kwargs["socket_timeout"] ) @@ -447,7 +296,7 @@ class RedisLock(Lock): non_existing = not (allow_owner_relock and self._redis.get(key) == owner) return self._redis.set(key, owner, ex=int(expiration), nx=non_existing) except redis.exceptions.ResponseError as e: - logger.critical( + exception( "Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s", name, owner, @@ -502,7 +351,7 @@ class RedisUniqueQueue(object): self._redis = redis.Redis(**kwargs) self._redis.ping() self.key = "{}:{}".format(namespace, name) - logger.info( + info( "Created redis queue with socket_timeout of {}s".format( redis_kwargs["socket_timeout"] ) @@ -522,20 +371,10 @@ class RedisUniqueQueue(object): self._redis.zadd(self.key, {item: time()}, nx=True) def get(self, block=True, timeout=None): - try: - if block: - item = self._redis.bzpopmin(self.key, timeout=timeout) - else: - item = self._redis.zpopmin(self.key) - # Unfortunately we cannot use _redis.exceptions.ResponseError Exception here - # Since it would trigger another exception in queuemanager - except Exception as e: - logger.critical( - "It seems like BZPOPMIN command is not supported by redis. Redis >= 5.0 required: {}".format( - e - ) - ) - sys.exit(1) + if block: + item = self._redis.bzpopmin(self.key, timeout=timeout) + else: + item = self._redis.zpopmin(self.key) if item: item = item[1] diff --git a/LibreNMS/command_runner.py b/LibreNMS/command_runner.py deleted file mode 100644 index 0d11069a33..0000000000 --- a/LibreNMS/command_runner.py +++ /dev/null @@ -1,397 +0,0 @@ -#! /usr/bin/env python -# -*- coding: utf-8 -*- -# -# This file is part of command_runner module - -""" -command_runner is a quick tool to launch commands from Python, get exit code -and output, and handle most errors that may happen - -Versioning semantics: - Major version: backward compatibility breaking changes - Minor version: New functionality - Patch version: Backwards compatible bug fixes - -""" - -__intname__ = "command_runner" -__author__ = "Orsiris de Jong" -__copyright__ = "Copyright (C) 2015-2021 Orsiris de Jong" -__licence__ = "BSD 3 Clause" -__version__ = "0.7.0" -__build__ = "2021080201" - -import os -import shlex -import subprocess -import sys -from datetime import datetime -from logging import getLogger -from time import sleep - -# Python 2.7 compat fixes (queue was Queue) -try: - import Queue -except ImportError: - import queue as Queue -import threading - - -# Python 2.7 compat fixes (missing typing and FileNotFoundError) -try: - from typing import Union, Optional, List, Tuple, NoReturn, Any -except ImportError: - pass -try: - FileNotFoundError -except NameError: - # pylint: disable=W0622 (redefined-builtin) - FileNotFoundError = IOError -try: - TimeoutExpired = subprocess.TimeoutExpired -except AttributeError: - - class TimeoutExpired(BaseException): - """ - Basic redeclaration when subprocess.TimeoutExpired does not exist, python <= 3.3 - """ - - pass - - -logger = getLogger(__intname__) -PIPE = subprocess.PIPE - - -def thread_with_result_queue(fn): - """ - Python 2.7 compatible implementation of concurrent.futures - Executes decorated function as thread and adds a queue for result communication - """ - - def wrapped_fn(queue, *args, **kwargs): - result = fn(*args, **kwargs) - queue.put(result) - - def wrapper(*args, **kwargs): - queue = Queue.Queue() - - child_thread = threading.Thread( - target=wrapped_fn, args=(queue,) + args, kwargs=kwargs - ) - child_thread.start() - child_thread.result_queue = queue - return child_thread - - return wrapper - - -def command_runner( - command, # type: Union[str, List[str]] - valid_exit_codes=None, # type: Optional[List[int]] - timeout=1800, # type: Optional[int] - shell=False, # type: bool - encoding=None, # type: str - stdout=None, # type: Union[int, str] - stderr=None, # type: Union[int, str] - windows_no_window=False, # type: bool - **kwargs # type: Any -): - # type: (...) -> Tuple[Optional[int], str] - """ - Unix & Windows compatible subprocess wrapper that handles output encoding and timeouts - Newer Python check_output already handles encoding and timeouts, but this one is retro-compatible - It is still recommended to set cp437 for windows and utf-8 for unix - - Also allows a list of various valid exit codes (ie no error when exit code = arbitrary int) - - command should be a list of strings, eg ['ping', '127.0.0.1', '-c 2'] - command can also be a single string, ex 'ping 127.0.0.1 -c 2' if shell=True or if os is Windows - - Accepts all of subprocess.popen arguments - - Whenever we can, we need to avoid shell=True in order to preseve better security - - When no stdout option is given, we'll get output into the returned tuple - When stdout = PIPE or subprocess.PIPE, output is also displayed on the fly on stdout - When stdout = filename or stderr = filename, we'll write output to the given file - - Returns a tuple (exit_code, output) - """ - - # Choose default encoding when none set - # cp437 encoding assures we catch most special characters from cmd.exe - if not encoding: - encoding = "cp437" if os.name == "nt" else "utf-8" - - # Fix when unix command was given as single string - # This is more secure than setting shell=True - if os.name == "posix" and shell is False and isinstance(command, str): - command = shlex.split(command) - - # Set default values for kwargs - errors = kwargs.pop( - "errors", "backslashreplace" - ) # Don't let encoding issues make you mad - universal_newlines = kwargs.pop("universal_newlines", False) - creationflags = kwargs.pop("creationflags", 0) - # subprocess.CREATE_NO_WINDOW was added in Python 3.7 for Windows OS only - if ( - windows_no_window - and sys.version_info[0] >= 3 - and sys.version_info[1] >= 7 - and os.name == "nt" - ): - # Disable the following pylint error since the code also runs on nt platform, but - # triggers an error on Unix - # pylint: disable=E1101 - creationflags = creationflags | subprocess.CREATE_NO_WINDOW - - # Decide whether we write to output variable only (stdout=None), to output variable and stdout (stdout=PIPE) - # or to output variable and to file (stdout='path/to/file') - if stdout is None: - _stdout = subprocess.PIPE - stdout_to_file = False - live_output = False - elif isinstance(stdout, str): - # We will send anything to file - _stdout = open(stdout, "wb") - stdout_to_file = True - live_output = False - else: - # We will send anything to given stdout pipe - _stdout = stdout - stdout_to_file = False - live_output = True - - # The only situation where we don't add stderr to stdout is if a specific target file was given - if isinstance(stderr, str): - _stderr = open(stderr, "wb") - stderr_to_file = True - else: - _stderr = subprocess.STDOUT - stderr_to_file = False - - @thread_with_result_queue - def _pipe_read(pipe, read_timeout, encoding, errors): - # type: (subprocess.PIPE, Union[int, float], str, str) -> str - """ - Read pipe will read from subprocess.PIPE for at most 1 second - """ - pipe_output = "" - begin_time = datetime.now() - try: - while True: - current_output = pipe.readline() - # Compatibility for earlier Python versions where Popen has no 'encoding' nor 'errors' arguments - if isinstance(current_output, bytes): - try: - current_output = current_output.decode(encoding, errors=errors) - except TypeError: - # handle TypeError: don't know how to handle UnicodeDecodeError in error callback - current_output = current_output.decode( - encoding, errors="ignore" - ) - pipe_output += current_output - if live_output: - sys.stdout.write(current_output) - if ( - len(current_output) <= 0 - or (datetime.now() - begin_time).total_seconds() > read_timeout - ): - break - # Pipe may not have readline() anymore when process gets killed - except AttributeError: - pass - return pipe_output - - def _poll_process( - process, # type: Union[subprocess.Popen[str], subprocess.Popen] - timeout, # type: int - encoding, # type: str - errors, # type: str - ): - # type: (...) -> Tuple[Optional[int], str] - """ - Reads from process output pipe until: - - Timeout is reached, in which case we'll terminate the process - - Process ends by itself - - Returns an encoded string of the pipe output - """ - output = "" - begin_time = datetime.now() - timeout_reached = False - while process.poll() is None: - child = _pipe_read( - process.stdout, read_timeout=1, encoding=encoding, errors=errors - ) - try: - output += child.result_queue.get(timeout=1) - except Queue.Empty: - pass - except ValueError: - # What happens when str cannot be concatenated - pass - if timeout: - if (datetime.now() - begin_time).total_seconds() > timeout: - # Try to terminate nicely before killing the process - process.terminate() - # Let the process terminate itself before trying to kill it not nicely - # Under windows, terminate() and kill() are equivalent - sleep(0.5) - if process.poll() is None: - process.kill() - timeout_reached = True - - # Get remaining output from process after a grace period - sleep(0.5) - - exit_code = process.poll() - if timeout: - # Let's wait a second more so we may get the remaining queue content after process is to be ended - final_read_timeout = max( - 1, timeout - (datetime.now() - begin_time).total_seconds(), 1 - ) - else: - final_read_timeout = 1 - child = _pipe_read( - process.stdout, - read_timeout=final_read_timeout, - encoding=encoding, - errors=errors, - ) - try: - output += child.result_queue.get(timeout=final_read_timeout) - except Queue.Empty: - pass - except ValueError: - # What happens when str cannot be concatenated - pass - if timeout_reached: - raise TimeoutExpired(process, timeout) - return exit_code, output - - try: - # Python >= 3.3 has SubProcessError(TimeoutExpired) class - # Python >= 3.6 has encoding & error arguments - # universal_newlines=True makes netstat command fail under windows - # timeout does not work under Python 2.7 with subprocess32 < 3.5 - # decoder may be cp437 or unicode_escape for dos commands or utf-8 for powershell - # Disabling pylint error for the same reason as above - # pylint: disable=E1123 - if sys.version_info >= (3, 6): - process = subprocess.Popen( - command, - stdout=_stdout, - stderr=_stderr, - shell=shell, - universal_newlines=universal_newlines, - encoding=encoding, - errors=errors, - creationflags=creationflags, - **kwargs - ) - else: - process = subprocess.Popen( - command, - stdout=_stdout, - stderr=_stderr, - shell=shell, - universal_newlines=universal_newlines, - creationflags=creationflags, - **kwargs - ) - - try: - exit_code, output = _poll_process(process, timeout, encoding, errors) - except KeyboardInterrupt: - exit_code = -252 - output = "KeyboardInterrupted" - try: - process.kill() - except AttributeError: - pass - - logger.debug( - 'Command "{}" returned with exit code "{}". Command output was:'.format( - command, exit_code - ) - ) - except subprocess.CalledProcessError as exc: - exit_code = exc.returncode - try: - output = exc.output - except AttributeError: - output = "command_runner: Could not obtain output from command." - if exit_code in valid_exit_codes if valid_exit_codes is not None else [0]: - logger.debug( - 'Command "{}" returned with exit code "{}". Command output was:'.format( - command, exit_code - ) - ) - logger.error( - 'Command "{}" failed with exit code "{}". Command output was:'.format( - command, exc.returncode - ) - ) - logger.error(output) - except FileNotFoundError as exc: - logger.error('Command "{}" failed, file not found: {}'.format(command, exc)) - exit_code, output = -253, exc.__str__() - # OSError can also catch FileNotFoundErrors - # pylint: disable=W0705 (duplicate-except) - except (OSError, IOError) as exc: - logger.error('Command "{}" failed because of OS: {}'.format(command, exc)) - exit_code, output = -253, exc.__str__() - except TimeoutExpired: - message = 'Timeout {} seconds expired for command "{}" execution.'.format( - timeout, command - ) - logger.error(message) - if stdout_to_file: - _stdout.write(message.encode(encoding, errors=errors)) - exit_code, output = -254, "Timeout of {} seconds expired.".format(timeout) - # We need to be able to catch a broad exception - # pylint: disable=W0703 - except Exception as exc: - logger.error( - 'Command "{}" failed for unknown reasons: {}'.format(command, exc), - exc_info=True, - ) - logger.debug("Error:", exc_info=True) - exit_code, output = -255, exc.__str__() - finally: - if stdout_to_file: - _stdout.close() - if stderr_to_file: - _stderr.close() - - logger.debug(output) - - return exit_code, output - - -def deferred_command(command, defer_time=300): - # type: (str, int) -> NoReturn - """ - This is basically an ugly hack to launch commands which are detached from parent process - Especially useful to launch an auto update/deletion of a running executable after a given amount of - seconds after it finished - """ - # Use ping as a standard timer in shell since it's present on virtually *any* system - if os.name == "nt": - deferrer = "ping 127.0.0.1 -n {} > NUL & ".format(defer_time) - else: - deferrer = "ping 127.0.0.1 -c {} > /dev/null && ".format(defer_time) - - # We'll create a independent shell process that will not be attached to any stdio interface - # Our command shall be a single string since shell=True - subprocess.Popen( - deferrer + command, - shell=True, - stdin=None, - stdout=None, - stderr=None, - close_fds=True, - ) diff --git a/LibreNMS/library.py b/LibreNMS/library.py new file mode 100644 index 0000000000..e0916201db --- /dev/null +++ b/LibreNMS/library.py @@ -0,0 +1,174 @@ +#! /usr/bin/env python3 +# -*- coding: utf-8 -*- + +import sys +import os +import logging +import tempfile +import subprocess +import threading +import time +from logging.handlers import RotatingFileHandler + +try: + import MySQLdb +except ImportError: + try: + import pymysql + + pymysql.install_as_MySQLdb() + import MySQLdb + except ImportError as exc: + print("ERROR: missing the mysql python module please run:") + print("pip install -r requirements.txt") + print("ERROR: %s" % exc) + sys.exit(2) + +logger = logging.getLogger(__name__) + +# Logging functions ######################################################## + +FORMATTER = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s") + + +def logger_get_console_handler(): + try: + console_handler = logging.StreamHandler(sys.stdout) + except OSError as exc: + print("Cannot log to stdout, trying stderr. Message %s" % exc) + try: + console_handler = logging.StreamHandler(sys.stderr) + console_handler.setFormatter(FORMATTER) + return console_handler + except OSError as exc: + print("Cannot log to stderr neither. Message %s" % exc) + return False + else: + console_handler.setFormatter(FORMATTER) + return console_handler + + +def logger_get_file_handler(log_file): + err_output = None + try: + file_handler = RotatingFileHandler( + log_file, mode="a", encoding="utf-8", maxBytes=1024000, backupCount=3 + ) + except OSError as exc: + try: + print( + "Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s" + % exc + ) + err_output = str(exc) + temp_log_file = tempfile.gettempdir() + os.sep + __name__ + ".log" + print("Trying temporary log file in " + temp_log_file) + file_handler = RotatingFileHandler( + temp_log_file, + mode="a", + encoding="utf-8", + maxBytes=1000000, + backupCount=1, + ) + file_handler.setFormatter(FORMATTER) + err_output += "\nUsing [%s]" % temp_log_file + return file_handler, err_output + except OSError as exc: + print( + "Cannot create temporary log file either. Will not log to file. Message: %s" + % exc + ) + return False + else: + file_handler.setFormatter(FORMATTER) + return file_handler, err_output + + +def logger_get_logger(log_file=None, temp_log_file=None, debug=False): + # If a name is given to getLogger, than modules can't log to the root logger + _logger = logging.getLogger() + if debug is True: + _logger.setLevel(logging.DEBUG) + else: + _logger.setLevel(logging.INFO) + console_handler = logger_get_console_handler() + if console_handler: + _logger.addHandler(console_handler) + if log_file is not None: + file_handler, err_output = logger_get_file_handler(log_file) + if file_handler: + _logger.addHandler(file_handler) + _logger.propagate = False + if err_output is not None: + print(err_output) + _logger.warning( + "Failed to use log file [%s], %s.", log_file, err_output + ) + if temp_log_file is not None: + if os.path.isfile(temp_log_file): + try: + os.remove(temp_log_file) + except OSError: + logger.warning("Cannot remove temp log file [%s]." % temp_log_file) + file_handler, err_output = logger_get_file_handler(temp_log_file) + if file_handler: + _logger.addHandler(file_handler) + _logger.propagate = False + if err_output is not None: + print(err_output) + _logger.warning( + "Failed to use log file [%s], %s.", log_file, err_output + ) + return _logger + + +# Generic functions ######################################################## + + +def check_for_file(file): + try: + with open(file) as f: + pass + except IOError as exc: + logger.error("Oh dear... %s does not seem readable" % file) + logger.debug("ERROR:", exc_info=True) + sys.exit(2) + + +# Config functions ######################################################### + + +def get_config_data(install_dir): + config_cmd = ["/usr/bin/env", "php", "%s/config_to_json.php" % install_dir] + try: + proc = subprocess.Popen( + config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ) + return proc.communicate()[0].decode() + except Exception as e: + print("ERROR: Could not execute: %s" % config_cmd) + print(e) + sys.exit(2) + + +# Database functions ####################################################### + + +def db_open(db_socket, db_server, db_port, db_username, db_password, db_dbname): + try: + options = dict( + host=db_server, + port=int(db_port), + user=db_username, + passwd=db_password, + db=db_dbname, + ) + + if db_socket: + options["unix_socket"] = db_socket + + return MySQLdb.connect(**options) + except Exception as dbexc: + print("ERROR: Could not connect to MySQL database!") + print("ERROR: %s" % dbexc) + sys.exit(2) diff --git a/LibreNMS/queuemanager.py b/LibreNMS/queuemanager.py index 934212ee77..d5baf97ce5 100644 --- a/LibreNMS/queuemanager.py +++ b/LibreNMS/queuemanager.py @@ -1,16 +1,14 @@ -import logging import pymysql +import subprocess import threading import traceback +from logging import debug, info, error, critical, warning from queue import Empty from subprocess import CalledProcessError import LibreNMS -logger = logging.getLogger(__name__) - - class QueueManager: def __init__( self, config, lock_manager, type_desc, uses_groups=False, auto_start=True @@ -41,8 +39,8 @@ class QueueManager: self._stop_event = threading.Event() - logger.info("Groups: {}".format(self.config.group)) - logger.info( + info("Groups: {}".format(self.config.group)) + info( "{} QueueManager created: {} workers, {}s frequency".format( self.type.title(), self.get_poller_config().workers, @@ -54,9 +52,9 @@ class QueueManager: self.start() def _service_worker(self, queue_id): - logger.debug("Worker started {}".format(threading.current_thread().getName())) + debug("Worker started {}".format(threading.current_thread().getName())) while not self._stop_event.is_set(): - logger.debug( + debug( "Worker {} checking queue {} ({}) for work".format( threading.current_thread().getName(), queue_id, @@ -70,13 +68,13 @@ class QueueManager: if ( device_id is not None ): # None returned by redis after timeout when empty - logger.debug( + debug( "Worker {} ({}) got work {} ".format( threading.current_thread().getName(), queue_id, device_id ) ) with LibreNMS.TimeitContext.start() as t: - logger.debug("Queues: {}".format(self._queues)) + debug("Queues: {}".format(self._queues)) target_desc = ( "{} ({})".format(device_id if device_id else "", queue_id) if queue_id @@ -85,7 +83,7 @@ class QueueManager: self.do_work(device_id, queue_id) runtime = t.delta() - logger.info( + info( "Completed {} run for {} in {:.2f}s".format( self.type, target_desc, runtime ) @@ -94,13 +92,13 @@ class QueueManager: except Empty: pass # ignore empty queue exception from subprocess.Queue except CalledProcessError as e: - logger.error( + error( "{} poller script error! {} returned {}: {}".format( self.type.title(), e.cmd, e.returncode, e.output ) ) except Exception as e: - logger.error("{} poller exception! {}".format(self.type.title(), e)) + error("{} poller exception! {}".format(self.type.title(), e)) traceback.print_exc() def post_work(self, payload, queue_id): @@ -110,7 +108,7 @@ class QueueManager: :param queue_id: which queue to post to, 0 is the default """ self.get_queue(queue_id).put(payload) - logger.debug( + debug( "Posted work for {} to {}:{} queue size: {}".format( payload, self.type, queue_id, self.get_queue(queue_id).qsize() ) @@ -133,7 +131,7 @@ class QueueManager: thread_name = "{}_{}-{}".format(self.type.title(), group, i + 1) self.spawn_worker(thread_name, group) - logger.debug( + debug( "Started {} {} threads for group {}".format( group_workers, self.type, group ) @@ -198,7 +196,7 @@ class QueueManager: :param group: :return: """ - logger.info("Creating queue {}".format(self.queue_name(queue_type, group))) + info("Creating queue {}".format(self.queue_name(queue_type, group))) try: return LibreNMS.RedisUniqueQueue( self.queue_name(queue_type, group), @@ -215,19 +213,15 @@ class QueueManager: except ImportError: if self.config.distributed: - logger.critical( - "ERROR: Redis connection required for distributed polling" - ) - logger.critical( + critical("ERROR: Redis connection required for distributed polling") + critical( "Please install redis-py, either through your os software repository or from PyPI" ) exit(2) except Exception as e: if self.config.distributed: - logger.critical( - "ERROR: Redis connection required for distributed polling" - ) - logger.critical("Could not connect to Redis. {}".format(e)) + critical("ERROR: Redis connection required for distributed polling") + critical("Could not connect to Redis. {}".format(e)) exit(2) return LibreNMS.UniqueQueue() @@ -347,19 +341,11 @@ class BillingQueueManager(TimedQueueManager): def do_work(self, run_type, group): if run_type == "poll": - logger.info("Polling billing") - exit_code, output = LibreNMS.call_script("poll-billing.php") - if exit_code != 0: - logger.warning( - "Error {} in Polling billing:\n{}".format(exit_code, output) - ) + info("Polling billing") + LibreNMS.call_script("poll-billing.php") else: # run_type == 'calculate' - logger.info("Calculating billing") - exit_code, output = LibreNMS.call_script("billing-calculate.php") - if exit_code != 0: - logger.warning( - "Error {} in Calculating billing:\n{}".format(exit_code, output) - ) + info("Calculating billing") + LibreNMS.call_script("billing-calculate.php") class PingQueueManager(TimedQueueManager): @@ -379,19 +365,13 @@ class PingQueueManager(TimedQueueManager): for group in groups: self.post_work("", group[0]) except pymysql.err.Error as e: - logger.critical("DB Exception ({})".format(e)) + critical("DB Exception ({})".format(e)) def do_work(self, context, group): if self.lock(group, "group", timeout=self.config.ping.frequency): try: - logger.info("Running fast ping") - exit_code, output = LibreNMS.call_script("ping.php", ("-g", group)) - if exit_code != 0: - logger.warning( - "Running fast ping for {} failed with error code {}: {}".format( - group, exit_code, output - ) - ) + info("Running fast ping") + LibreNMS.call_script("ping.php", ("-g", group)) finally: self.unlock(group, "group") @@ -416,19 +396,16 @@ class ServicesQueueManager(TimedQueueManager): for device in devices: self.post_work(device[0], device[1]) except pymysql.err.Error as e: - logger.critical("DB Exception ({})".format(e)) + critical("DB Exception ({})".format(e)) def do_work(self, device_id, group): if self.lock(device_id, timeout=self.config.services.frequency): - logger.info("Checking services on device {}".format(device_id)) - exit_code, output = LibreNMS.call_script( - "check-services.php", ("-h", device_id) - ) - if exit_code == 0: - self.unlock(device_id) - else: - if exit_code == 5: - logger.info( + try: + info("Checking services on device {}".format(device_id)) + LibreNMS.call_script("check-services.php", ("-h", device_id)) + except subprocess.CalledProcessError as e: + if e.returncode == 5: + info( "Device {} is down, cannot poll service, waiting {}s for retry".format( device_id, self.config.down_retry ) @@ -436,12 +413,8 @@ class ServicesQueueManager(TimedQueueManager): self.lock( device_id, allow_relock=True, timeout=self.config.down_retry ) - else: - logger.warning( - "Unknown error while checking services on device {} with exit code {}: {}".format( - device_id, exit_code, output - ) - ) + else: + self.unlock(device_id) class AlertQueueManager(TimedQueueManager): @@ -459,13 +432,14 @@ class AlertQueueManager(TimedQueueManager): self.post_work("alerts", 0) def do_work(self, device_id, group): - logger.info("Checking alerts") - exit_code, output = LibreNMS.call_script("alerts.php") - if exit_code != 0: - if exit_code == 1: - logger.warning("There was an error issuing alerts: {}".format(output)) + try: + info("Checking alerts") + LibreNMS.call_script("alerts.php") + except subprocess.CalledProcessError as e: + if e.returncode == 1: + warning("There was an error issuing alerts: {}".format(e.output)) else: - raise CalledProcessError + raise class PollerQueueManager(QueueManager): @@ -480,14 +454,13 @@ class PollerQueueManager(QueueManager): def do_work(self, device_id, group): if self.lock(device_id, timeout=self.config.poller.frequency): - logger.info("Polling device {}".format(device_id)) + info("Polling device {}".format(device_id)) - exit_code, output = LibreNMS.call_script("poller.php", ("-h", device_id)) - if exit_code == 0: - self.unlock(device_id) - else: - if exit_code == 6: - logger.warning( + try: + LibreNMS.call_script("poller.php", ("-h", device_id)) + except subprocess.CalledProcessError as e: + if e.returncode == 6: + warning( "Polling device {} unreachable, waiting {}s for retry".format( device_id, self.config.down_retry ) @@ -497,14 +470,12 @@ class PollerQueueManager(QueueManager): device_id, allow_relock=True, timeout=self.config.down_retry ) else: - logger.error( - "Polling device {} failed with exit code {}: {}".format( - device_id, exit_code, output - ) - ) + error("Polling device {} failed! {}".format(device_id, e)) self.unlock(device_id) + else: + self.unlock(device_id) else: - logger.debug("Tried to poll {}, but it is locked".format(device_id)) + debug("Tried to poll {}, but it is locked".format(device_id)) class DiscoveryQueueManager(TimedQueueManager): @@ -526,19 +497,18 @@ class DiscoveryQueueManager(TimedQueueManager): for device in devices: self.post_work(device[0], device[1]) except pymysql.err.Error as e: - logger.critical("DB Exception ({})".format(e)) + critical("DB Exception ({})".format(e)) def do_work(self, device_id, group): if self.lock( device_id, timeout=LibreNMS.normalize_wait(self.config.discovery.frequency) ): - logger.info("Discovering device {}".format(device_id)) - exit_code, output = LibreNMS.call_script("discovery.php", ("-h", device_id)) - if exit_code == 0: - self.unlock(device_id) - else: - if exit_code == 5: - logger.info( + try: + info("Discovering device {}".format(device_id)) + LibreNMS.call_script("discovery.php", ("-h", device_id)) + except subprocess.CalledProcessError as e: + if e.returncode == 5: + info( "Device {} is down, cannot discover, waiting {}s for retry".format( device_id, self.config.down_retry ) @@ -547,9 +517,6 @@ class DiscoveryQueueManager(TimedQueueManager): device_id, allow_relock=True, timeout=self.config.down_retry ) else: - logger.error( - "Discovering device {} failed with exit code {}: {}".format( - device_id, exit_code, output - ) - ) self.unlock(device_id) + else: + self.unlock(device_id) diff --git a/LibreNMS/service.py b/LibreNMS/service.py index 73f02bc21d..215b4cff5a 100644 --- a/LibreNMS/service.py +++ b/LibreNMS/service.py @@ -1,7 +1,10 @@ import LibreNMS + +import json import logging import os import pymysql +import subprocess import threading import sys import time @@ -13,6 +16,7 @@ except ImportError: from datetime import timedelta from datetime import datetime +from logging import debug, info, warning, error, critical, exception from platform import python_version from time import sleep from socket import gethostname @@ -24,8 +28,6 @@ try: except ImportError: pass -logger = logging.getLogger(__name__) - class ServiceConfig: def __init__(self): @@ -97,7 +99,7 @@ class ServiceConfig: watchdog_logfile = "logs/librenms.log" def populate(self): - config = LibreNMS.get_config_data(self.BASE_DIR) + config = self._get_config_data() # populate config variables self.node_id = os.getenv("NODE_ID") @@ -230,7 +232,7 @@ class ServiceConfig: try: logging.getLogger().setLevel(self.log_level) except ValueError: - logger.error( + error( "Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format( self.log_level ) @@ -299,7 +301,39 @@ class ServiceConfig: if settings["watchdog_log"] is not None: self.watchdog_logfile = settings["watchdog_log"] except pymysql.err.Error: - logger.warning("Unable to load poller (%s) config", self.node_id) + warning("Unable to load poller (%s) config", self.node_id) + + def _get_config_data(self): + try: + import dotenv + + env_path = "{}/.env".format(self.BASE_DIR) + info("Attempting to load .env from '%s'", env_path) + dotenv.load_dotenv(dotenv_path=env_path, verbose=True) + + if not os.getenv("NODE_ID"): + raise ImportError(".env does not contain a valid NODE_ID setting.") + + except ImportError as e: + exception( + "Could not import .env - check that the poller user can read the file, and that composer install has been run recently" + ) + sys.exit(3) + + config_cmd = [ + "/usr/bin/env", + "php", + "{}/config_to_json.php".format(self.BASE_DIR), + "2>&1", + ] + try: + return json.loads(subprocess.check_output(config_cmd).decode()) + except subprocess.CalledProcessError as e: + error( + "ERROR: Could not load or parse configuration! {}: {}".format( + subprocess.list2cmdline(e.cmd), e.output.decode() + ) + ) @staticmethod def parse_group(g): @@ -313,7 +347,7 @@ class ServiceConfig: except ValueError: pass - logger.error("Could not parse group string, defaulting to 0") + error("Could not parse group string, defaulting to 0") return [0] @@ -348,7 +382,7 @@ class Service: self.config.poller.frequency, self.log_performance_stats, "performance" ) if self.config.watchdog_enabled: - logger.info( + info( "Starting watchdog timer for log file: {}".format( self.config.watchdog_logfile ) @@ -357,7 +391,7 @@ class Service: self.config.poller.frequency, self.logfile_watchdog, "watchdog" ) else: - logger.info("Watchdog is disabled.") + info("Watchdog is disabled.") self.systemd_watchdog_timer = LibreNMS.RecurringTimer( 10, self.systemd_watchdog, "systemd-watchdog" ) @@ -367,16 +401,14 @@ class Service: return time.time() - self.start_time def attach_signals(self): - logger.info( - "Attaching signal handlers on thread %s", threading.current_thread().name - ) + info("Attaching signal handlers on thread %s", threading.current_thread().name) signal(SIGTERM, self.terminate) # capture sigterm and exit gracefully signal(SIGQUIT, self.terminate) # capture sigquit and exit gracefully signal(SIGINT, self.terminate) # capture sigint and exit gracefully signal(SIGHUP, self.reload) # capture sighup and restart gracefully if "psutil" not in sys.modules: - logger.warning("psutil is not available, polling gap possible") + warning("psutil is not available, polling gap possible") else: signal(SIGCHLD, self.reap) # capture sigchld and reap the process @@ -395,7 +427,7 @@ class Service: if status == psutil.STATUS_ZOMBIE: pid = p.pid r = os.waitpid(p.pid, os.WNOHANG) - logger.warning( + warning( 'Reaped long running job "%s" in state %s with PID %d - job returned %d', cmd, status, @@ -407,7 +439,7 @@ class Service: continue def start(self): - logger.debug("Performing startup checks...") + debug("Performing startup checks...") if self.config.single_instance: self.check_single_instance() # don't allow more than one service at a time @@ -416,7 +448,7 @@ class Service: raise RuntimeWarning("Not allowed to start Poller twice") self._started = True - logger.debug("Starting up queue managers...") + debug("Starting up queue managers...") # initialize and start the worker pools self.poller_manager = LibreNMS.PollerQueueManager(self.config, self._lm) @@ -446,8 +478,8 @@ class Service: if self.config.watchdog_enabled: self.watchdog_timer.start() - logger.info("LibreNMS Service: {} started!".format(self.config.unique_name)) - logger.info( + info("LibreNMS Service: {} started!".format(self.config.unique_name)) + info( "Poller group {}. Using Python {} and {} locks and queues".format( "0 (default)" if self.config.group == [0] else self.config.group, python_version(), @@ -455,19 +487,19 @@ class Service: ) ) if self.config.update_enabled: - logger.info( + info( "Maintenance tasks will be run every {}".format( timedelta(seconds=self.config.update_frequency) ) ) else: - logger.warning("Maintenance tasks are disabled.") + warning("Maintenance tasks are disabled.") # Main dispatcher loop try: while not self.terminate_flag: if self.reload_flag: - logger.info("Picked up reload flag, calling the reload process") + info("Picked up reload flag, calling the reload process") self.restart() if self.reap_flag: @@ -477,9 +509,7 @@ class Service: master_lock = self._acquire_master() if master_lock: if not self.is_master: - logger.info( - "{} is now the master dispatcher".format(self.config.name) - ) + info("{} is now the master dispatcher".format(self.config.name)) self.is_master = True self.start_dispatch_timers() @@ -495,7 +525,7 @@ class Service: self.dispatch_immediate_discovery(device_id, group) else: if self.is_master: - logger.info( + info( "{} is no longer the master dispatcher".format( self.config.name ) @@ -506,7 +536,7 @@ class Service: except KeyboardInterrupt: pass - logger.info("Dispatch loop terminated") + info("Dispatch loop terminated") self.shutdown() def _acquire_master(self): @@ -535,7 +565,7 @@ class Service: if elapsed > ( self.config.poller.frequency - self.config.master_resolution ): - logger.debug( + debug( "Dispatching polling for device {}, time since last poll {:.2f}s".format( device_id, elapsed ) @@ -572,7 +602,7 @@ class Service: except pymysql.err.Error: self.db_failures += 1 if self.db_failures > self.config.max_db_failures: - logger.warning( + warning( "Too many DB failures ({}), attempting to release master".format( self.db_failures ) @@ -592,22 +622,23 @@ class Service: wait = 5 max_runtime = 86100 max_tries = int(max_runtime / wait) - logger.info("Waiting for schema lock") + info("Waiting for schema lock") while not self._lm.lock("schema-update", self.config.unique_name, max_runtime): attempt += 1 if attempt >= max_tries: # don't get stuck indefinitely - logger.warning( - "Reached max wait for other pollers to update, updating now" - ) + warning("Reached max wait for other pollers to update, updating now") break sleep(wait) - logger.info("Running maintenance tasks") - exit_code, output = LibreNMS.call_script("daily.sh") - if exit_code == 0: - logger.info("Maintenance tasks complete\n{}".format(output)) - else: - logger.error("Error {} in daily.sh:\n{}".format(exit_code, output)) + info("Running maintenance tasks") + try: + output = LibreNMS.call_script("daily.sh") + info("Maintenance tasks complete\n{}".format(output)) + except subprocess.CalledProcessError as e: + error( + "Error in daily.sh:\n" + + (e.output.decode() if e.output is not None else "No output") + ) self._lm.unlock("schema-update", self.config.unique_name) @@ -634,19 +665,15 @@ class Service: ) except ImportError: if self.config.distributed: - logger.critical( - "ERROR: Redis connection required for distributed polling" - ) - logger.critical( + critical("ERROR: Redis connection required for distributed polling") + critical( "Please install redis-py, either through your os software repository or from PyPI" ) self.exit(2) except Exception as e: if self.config.distributed: - logger.critical( - "ERROR: Redis connection required for distributed polling" - ) - logger.critical("Could not connect to Redis. {}".format(e)) + critical("ERROR: Redis connection required for distributed polling") + critical("Could not connect to Redis. {}".format(e)) self.exit(2) return LibreNMS.ThreadingLock() @@ -657,16 +684,14 @@ class Service: Has the effect of reloading the python files from disk. """ if sys.version_info < (3, 4, 0): - logger.warning( - "Skipping restart as running under an incompatible interpreter" - ) - logger.warning("Please restart manually") + warning("Skipping restart as running under an incompatible interpreter") + warning("Please restart manually") return - logger.info("Restarting service... ") + info("Restarting service... ") if "psutil" not in sys.modules: - logger.warning("psutil is not available, polling gap possible") + warning("psutil is not available, polling gap possible") self._stop_managers_and_wait() else: self._stop_managers() @@ -690,9 +715,7 @@ class Service: :param signalnum: UNIX signal number :param flag: Flags accompanying signal """ - logger.info( - "Received signal on thread %s, handling", threading.current_thread().name - ) + info("Received signal on thread %s, handling", threading.current_thread().name) self.reload_flag = True def terminate(self, signalnum=None, flag=None): @@ -701,9 +724,7 @@ class Service: :param signalnum: UNIX signal number :param flag: Flags accompanying signal """ - logger.info( - "Received signal on thread %s, handling", threading.current_thread().name - ) + info("Received signal on thread %s, handling", threading.current_thread().name) self.terminate_flag = True def shutdown(self, signalnum=None, flag=None): @@ -712,7 +733,7 @@ class Service: :param signalnum: UNIX signal number :param flag: Flags accompanying signal """ - logger.info("Shutting down, waiting for running jobs to complete...") + info("Shutting down, waiting for running jobs to complete...") self.stop_dispatch_timers() self._release_master() @@ -726,9 +747,7 @@ class Service: self._stop_managers_and_wait() # try to release master lock - logger.info( - "Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name - ) + info("Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name) self.exit(0) def start_dispatch_timers(self): @@ -783,11 +802,11 @@ class Service: try: fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: - logger.warning("Another instance is already running, quitting.") + warning("Another instance is already running, quitting.") self.exit(2) def log_performance_stats(self): - logger.info("Counting up time spent polling") + info("Counting up time spent polling") try: # Report on the poller instance as a whole @@ -832,9 +851,8 @@ class Service: ) ) except pymysql.err.Error: - logger.critical( - "Unable to log performance statistics - is the database still online?", - exc_info=True, + exception( + "Unable to log performance statistics - is the database still online?" ) def systemd_watchdog(self): @@ -849,19 +867,18 @@ class Service: self.config.watchdog_logfile ) except FileNotFoundError as e: - logger.error("Log file not found! {}".format(e)) + error("Log file not found! {}".format(e)) return if logfile_mdiff > self.config.poller.frequency: - logger.critical( + critical( "BARK! Log file older than {}s, restarting service!".format( self.config.poller.frequency - ), - exc_info=True, + ) ) self.restart() else: - logger.info("Log file updated {}s ago".format(int(logfile_mdiff))) + info("Log file updated {}s ago".format(int(logfile_mdiff))) def exit(self, code=0): sys.stdout.flush() diff --git a/LibreNMS/wrapper.py b/LibreNMS/wrapper.py deleted file mode 100644 index 26328bd7b6..0000000000 --- a/LibreNMS/wrapper.py +++ /dev/null @@ -1,627 +0,0 @@ -#! /usr/bin/env python3 -""" - wrapper A small tool which wraps services, discovery and poller php scripts - in order to run them as threads with Queue and workers - - Authors: Orsiris de Jong - Neil Lathwood - Job Snijders - - Distributed poller code (c) 2015, GPLv3, Daniel Preussker - All code parts that belong to Daniel are enclosed in EOC comments - - Date: Jul 2021 - - Usage: This program accepts three command line arguments - - the number of threads (defaults to 1 for discovery / service, and 16 for poller) - - the wrapper type (service, discovery or poller) - - optional debug boolean - - - Ubuntu Linux: apt-get install python-mysqldb - FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean - RHEL 7: yum install MySQL-python - RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient - - Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8 / AlmaLinux 8.4 - - License: This program is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General - Public License for more details. - - You should have received a copy of the GNU General Public License along - with this program. If not, see https://www.gnu.org/licenses/. - - LICENSE.txt contains a copy of the full GPLv3 licensing conditions. -""" - -import logging -import os -import queue -import sys -import threading -import time -import uuid -from argparse import ArgumentParser - -import LibreNMS -from LibreNMS.command_runner import command_runner - - -logger = logging.getLogger(__name__) - -PER_DEVICE_TIMEOUT = ( - 900 # Timeout in seconds for any poller / service / discovery action per device -) - -DISTRIBUTED_POLLING = False # Is overriden by config.php -REAL_DURATION = 0 -DISCOVERED_DEVICES_COUNT = 0 -PER_DEVICE_DURATION = {} - -MEMC = None -IS_NODE = None -STEPPING = None -MASTER_TAG = None -NODES_TAG = None -TIME_TAG = "" - -""" -Per wrapper type configuration -All time related variables are in seconds -""" -wrappers = { - "service": { - "executable": "check-services.php", - "table_name": "services", - "memc_touch_time": 10, - "stepping": 300, - "nodes_stepping": 300, - "total_exec_time": 300, - }, - "discovery": { - "executable": "discovery.php", - "table_name": "devices", - "memc_touch_time": 30, - "stepping": 300, - "nodes_stepping": 3600, - "total_exec_time": 21600, - }, - "poller": { - "executable": "poller.php", - "table_name": "devices", - "memc_touch_time": 10, - "stepping": 300, - "nodes_stepping": 300, - "total_exec_time": 300, - }, -} - -""" - Threading helper functions -""" - - -# << None - """ - Actual code that runs various php scripts, in single node mode or distributed poller mode - """ - - global MEMC - global IS_NODE - global DISTRIBUTED_POLLING - global MASTER_TAG - global NODES_TAG - global TIME_TAG - global STEPPING - - # Setup wrapper dependent variables - STEPPING = wrappers[wrapper_type]["stepping"] - if wrapper_type == "poller": - if "rrd" in config and "step" in config["rrd"]: - STEPPING = config["rrd"]["step"] - TIME_TAG = "." + str(get_time_tag(STEPPING)) - - MASTER_TAG = "{}.master{}".format(wrapper_type, TIME_TAG) - NODES_TAG = "{}.nodes{}".format(wrapper_type, TIME_TAG) - - # << 0: - try: - time.sleep(1) - nodes = MEMC.get(NODES_TAG) - except: - pass - logger.info("Clearing Locks for {}".format(NODES_TAG)) - x = minlocks - while x <= maxlocks: - MEMC.delete("{}.device.{}".format(wrapper_type, x)) - x = x + 1 - logger.info("{} Locks Cleared".format(x)) - logger.info("Clearing Nodes") - MEMC.delete(MASTER_TAG) - MEMC.delete(NODES_TAG) - else: - MEMC.decr(NODES_TAG) - logger.info("Finished {}.".format(time.strftime("%Y-%m-%d %H:%M:%S"))) - # EOC - - # Update poller statistics - if wrapper_type == "poller": - query = "UPDATE pollers SET last_polled=NOW(), devices='{}', time_taken='{}' WHERE poller_name='{}'".format( - DISCOVERED_DEVICES_COUNT, total_time, config["distributed_poller_name"] - ) - cursor = db_connection.query(query) - if cursor.rowcount < 1: - query = "INSERT INTO pollers SET poller_name='{}', last_polled=NOW(), devices='{}', time_taken='{}'".format( - config["distributed_poller_name"], DISCOVERED_DEVICES_COUNT, total_time - ) - db_connection.query(query) - - db_connection.close() - - if total_time > wrappers[wrapper_type]["total_exec_time"]: - logger.warning( - "the process took more than {} seconds to finish, you need faster hardware or more threads".format( - wrappers[wrapper_type]["total_exec_time"] - ) - ) - logger.warning( - "in sequential style service checks the elapsed time would have been: {} seconds".format( - REAL_DURATION - ) - ) - show_stopper = False - for device in PER_DEVICE_DURATION: - if PER_DEVICE_DURATION[device] > wrappers[wrapper_type]["nodes_stepping"]: - logger.warning( - "device {} is taking too long: {} seconds".format( - device, PER_DEVICE_DURATION[device] - ) - ) - show_stopper = True - if show_stopper: - logger.error( - "Some devices are taking more than {} seconds, the script cannot recommend you what to do.".format( - wrappers[wrapper_type]["nodes_stepping"] - ) - ) - else: - recommend = int(total_time / STEPPING * amount_of_workers + 1) - logger.warning( - "Consider setting a minimum of {} threads. (This does not constitute professional advice!)".format( - recommend - ) - ) - sys.exit(2) - - -if __name__ == "__main__": - parser = ArgumentParser( - prog="wrapper.py", - usage="usage: %(prog)s [options] \n" - "wrapper_type = 'service', 'poller' or 'disccovery'" - "workers defaults to 1 for service and discovery, and 16 for poller " - "(Do not set too high, or you will get an OOM)", - description="Spawn multiple librenms php processes in parallel.", - ) - parser.add_argument( - "-d", - "--debug", - action="store_true", - default=False, - help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.", - ) - - parser.add_argument( - dest="wrapper", - default=None, - help="Execute wrapper for 'service', 'poller' or 'discovery'", - ) - parser.add_argument( - dest="threads", action="store_true", default=None, help="Number of workers" - ) - - args = parser.parse_args() - - debug = args.debug - wrapper_type = args.wrapper - amount_of_workers = args.threads - - if wrapper_type not in ["service", "discovery", "poller"]: - parser.error("Invalid wrapper type '{}'".format(wrapper_type)) - sys.exit(4) - - config = LibreNMS.get_config_data( - os.path.dirname(os.path.dirname(os.path.realpath(__file__))) - ) - log_dir = config["log_dir"] - log_file = os.path.join(log_dir, wrapper_type + ".log") - logger = LibreNMS.logger_get_logger(log_file, debug=debug) - - try: - amount_of_workers = int(amount_of_workers) - except (IndexError, ValueError, TypeError): - amount_of_workers = ( - 16 if wrapper_type == "poller" else 1 - ) # Defaults to 1 for service/discovery, 16 for poller - logger.warning( - "Bogus number of workers given. Using default number ({}) of workers.".format( - amount_of_workers - ) - ) - - wrapper(wrapper_type, amount_of_workers, config, log_dir, _debug=debug) diff --git a/discovery-wrapper.py b/discovery-wrapper.py index 8d773bd57b..1ae8c2d725 100755 --- a/discovery-wrapper.py +++ b/discovery-wrapper.py @@ -1,57 +1,436 @@ #! /usr/bin/env python3 """ -This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups + discovery-wrapper A small tool which wraps around discovery and tries to + guide the discovery process with a more modern approach with a + Queue and workers. + + Based on the original version of poller-wrapper.py by Job Snijders + + Author: Neil Lathwood + Orsiris de Jong + Date: Oct 2019 + + Usage: This program accepts one command line argument: the number of threads + that should run simultaneously. If no argument is given it will assume + a default of 1 thread. + + Ubuntu Linux: apt-get install python-mysqldb + FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean + RHEL 7: yum install MySQL-python + RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient + + Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8 + + License: This program is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General + Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program. If not, see https://www.gnu.org/licenses/. + + LICENSE.txt contains a copy of the full GPLv3 licensing conditions. """ -import os -from argparse import ArgumentParser - -import LibreNMS -import LibreNMS.wrapper as wrapper - -WRAPPER_TYPE = "discovery" -DEFAULT_WORKERS = 1 - -""" - Take the amount of threads we want to run in parallel from the commandline - if None are given or the argument was garbage, fall back to default -""" -usage = ( - "usage: %(prog)s [options] (Default: {}" - "(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS) -) -description = "Spawn multiple discovery.php processes in parallel." -parser = ArgumentParser(usage=usage, description=description) -parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS) -parser.add_argument( - "-d", - "--debug", - dest="debug", - action="store_true", - default=False, - help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.", -) -args = parser.parse_args() - -config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__))) -log_dir = config["log_dir"] -log_file = os.path.join(log_dir, WRAPPER_TYPE + ".log") -logger = LibreNMS.logger_get_logger(log_file, debug=args.debug) +import LibreNMS.library as LNMS try: - amount_of_workers = int(args.amount_of_workers) -except (IndexError, ValueError): - amount_of_workers = DEFAULT_WORKERS - logger.warning( - "Bogus number of workers given. Using default number ({}) of workers.".format( - amount_of_workers + + import json + import os + import queue + import subprocess + import sys + import threading + import time + from optparse import OptionParser + +except ImportError as exc: + print("ERROR: missing one or more of the following python modules:") + print("threading, queue, sys, subprocess, time, os, json") + print("ERROR: %s" % exc) + sys.exit(2) + +APP_NAME = "discovery_wrapper" +LOG_FILE = "logs/" + APP_NAME + ".log" +_DEBUG = False +distdisco = False +real_duration = 0 +discovered_devices = 0 + +# (c) 2015, GPLv3, Daniel Preussker << << <<> %s/discover_device_%s.log" % (log_dir, device_id) + if debug + else ">> /dev/null" + ) + command = "/usr/bin/env php %s -h %s %s 2>&1" % ( + discovery_path, + device_id, + output, + ) + # TODO: Replace with command_runner + subprocess.check_call(command, shell=True) + + elapsed_time = int(time.time() - start_time) + print_queue.put( + [threading.current_thread().name, device_id, elapsed_time] + ) + except (KeyboardInterrupt, SystemExit): + raise + except: + pass + poll_queue.task_done() + + +if __name__ == "__main__": + logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG) + + install_dir = os.path.dirname(os.path.realpath(__file__)) + LNMS.check_for_file(install_dir + "/.env") + config = json.loads(LNMS.get_config_data(install_dir)) + + discovery_path = config["install_dir"] + "/discovery.php" + log_dir = config["log_dir"] + + # (c) 2015, GPLv3, Daniel Preussker << << << << 0: + try: + time.sleep(1) + nodes = memc.get("discovery.nodes") + except: + pass + print("Clearing Locks") + x = minlocks + while x <= maxlocks: + memc.delete("discovery.device." + str(x)) + x = x + 1 + print("%s Locks Cleared" % x) + print("Clearing Nodes") + memc.delete("discovery.master") + memc.delete("discovery.nodes") + else: + memc.decr("discovery.nodes") + print("Finished %s." % time.time()) + # EOC6 + + show_stopper = False + + if total_time > 21600: + print( + "WARNING: the process took more than 6 hours to finish, you need faster hardware or more threads" + ) + print( + "INFO: in sequential style discovery the elapsed time would have been: %s seconds" + % real_duration + ) + for device in per_device_duration: + if per_device_duration[device] > 3600: + print( + "WARNING: device %s is taking too long: %s seconds" + % (device, per_device_duration[device]) + ) + show_stopper = True + if show_stopper: + print( + "ERROR: Some devices are taking more than 3600 seconds, the script cannot recommend you what to do." + ) + else: + recommend = int(total_time / 300.0 * amount_of_workers + 1) + print( + "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" + % recommend + ) + + sys.exit(2) diff --git a/librenms-service.py b/librenms-service.py index 83f52dcb7c..0be34c5d60 100755 --- a/librenms-service.py +++ b/librenms-service.py @@ -43,10 +43,9 @@ if __name__ == "__main__": if args.verbose: logging.getLogger().setLevel(logging.INFO) - elif args.debug: + + if args.debug: logging.getLogger().setLevel(logging.DEBUG) - else: - logging.getLogger().setLevel(logging.WARNING) info("Configuring LibreNMS service") try: diff --git a/poller-wrapper.py b/poller-wrapper.py index 26af56c026..78e46c75fc 100755 --- a/poller-wrapper.py +++ b/poller-wrapper.py @@ -1,57 +1,468 @@ #! /usr/bin/env python3 """ -This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups + poller-wrapper A small tool which wraps around the poller and tries to + guide the polling process with a more modern approach with a + Queue and workers + + Authors: Job Snijders + Orsiris de Jong + Date: Oct 2019 + + Usage: This program accepts one command line argument: the number of threads + that should run simultaneously. If no argument is given it will assume + a default of 16 threads. + + Ubuntu Linux: apt-get install python-mysqldb + FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean + RHEL 7: yum install MySQL-python + RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient + + Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8.0 + + License: To the extent possible under law, Job Snijders has waived all + copyright and related or neighboring rights to this script. + This script has been put into the Public Domain. This work is + published from: The Netherlands. """ -import os -from argparse import ArgumentParser - -import LibreNMS -import LibreNMS.wrapper as wrapper - -WRAPPER_TYPE = "poller" -DEFAULT_WORKERS = 16 - -""" - Take the amount of threads we want to run in parallel from the commandline - if None are given or the argument was garbage, fall back to default -""" -usage = ( - "usage: %(prog)s [options] (Default: {}" - "(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS) -) -description = "Spawn multiple poller.php processes in parallel." -parser = ArgumentParser(usage=usage, description=description) -parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS) -parser.add_argument( - "-d", - "--debug", - dest="debug", - action="store_true", - default=False, - help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.", -) -args = parser.parse_args() - -config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__))) -log_dir = config["log_dir"] -log_file = os.path.join(log_dir, WRAPPER_TYPE + ".log") -logger = LibreNMS.logger_get_logger(log_file, debug=args.debug) +import LibreNMS.library as LNMS try: - amount_of_workers = int(args.amount_of_workers) -except (IndexError, ValueError): - amount_of_workers = DEFAULT_WORKERS - logger.warning( - "Bogus number of workers given. Using default number ({}) of workers.".format( - amount_of_workers + + import json + import os + import queue + import subprocess + import sys + import threading + import time + from optparse import OptionParser + +except ImportError as exc: + print("ERROR: missing one or more of the following python modules:") + print("threading, queue, sys, subprocess, time, os, json") + print("ERROR: %s" % exc) + sys.exit(2) + + +APP_NAME = "poller_wrapper" +LOG_FILE = "logs/" + APP_NAME + ".log" +_DEBUG = False +distpoll = False +real_duration = 0 +polled_devices = 0 + +""" + Threading helper functions +""" +# (c) 2015, GPLv3, Daniel Preussker << << <<> %s/poll_device_%s.log" % (log_dir, device_id) + if debug + else ">> /dev/null" + ) + command = "/usr/bin/env php %s -h %s %s 2>&1" % ( + poller_path, + device_id, + output, + ) + # TODO: replace with command_runner + subprocess.check_call(command, shell=True) + + elapsed_time = int(time.time() - start_time) + print_queue.put( + [threading.current_thread().name, device_id, elapsed_time] + ) + except (KeyboardInterrupt, SystemExit): + raise + except: + pass + poll_queue.task_done() + + +if __name__ == "__main__": + logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG) + + install_dir = os.path.dirname(os.path.realpath(__file__)) + LNMS.check_for_file(install_dir + "/.env") + config = json.loads(LNMS.get_config_data(install_dir)) + + poller_path = config["install_dir"] + "/poller.php" + log_dir = config["log_dir"] + + if "rrd" in config and "step" in config["rrd"]: + step = config["rrd"]["step"] + else: + step = 300 + + # (c) 2015, GPLv3, Daniel Preussker << << << << 0: + try: + time.sleep(1) + nodes = memc.get(nodes_tag) + except: + pass + print("Clearing Locks for %s" % time_tag) + x = minlocks + while x <= maxlocks: + res = memc.delete("poller.device.%s.%s" % (x, time_tag)) + x += 1 + print("%s Locks Cleared" % x) + print("Clearing Nodes") + memc.delete(master_tag) + memc.delete(nodes_tag) + else: + memc.decr(nodes_tag) + print("Finished %.3fs after interval start." % (time.time() - int(time_tag))) + # EOC6 + + show_stopper = False + + db = LNMS.db_open( + config["db_socket"], + config["db_host"], + config["db_port"], + config["db_user"], + config["db_pass"], + config["db_name"], + ) + cursor = db.cursor() + query = ( + "update pollers set last_polled=NOW(), devices='%d', time_taken='%d' where poller_name='%s'" + % (polled_devices, total_time, config["distributed_poller_name"]) + ) + response = cursor.execute(query) + if response == 1: + db.commit() + else: + query = ( + "insert into pollers set poller_name='%s', last_polled=NOW(), devices='%d', time_taken='%d'" + % (config["distributed_poller_name"], polled_devices, total_time) + ) + cursor.execute(query) + db.commit() + db.close() + + if total_time > step: + print( + "WARNING: the process took more than %s seconds to finish, you need faster hardware or more threads" + % step + ) + print( + "INFO: in sequential style polling the elapsed time would have been: %s seconds" + % real_duration + ) + for device in per_device_duration: + if per_device_duration[device] > step: + print( + "WARNING: device %s is taking too long: %s seconds" + % (device, per_device_duration[device]) + ) + show_stopper = True + if show_stopper: + print( + "ERROR: Some devices are taking more than %s seconds, the script cannot recommend you what to do." + % step + ) + else: + recommend = int(total_time / step * amount_of_workers + 1) + print( + "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" + % recommend + ) + + sys.exit(2) diff --git a/requirements.txt b/requirements.txt index 84c60d65b6..b5d86ebea7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,3 @@ python-dotenv redis>=3.0 setuptools psutil -command_runner>=0.7.0 diff --git a/services-wrapper.py b/services-wrapper.py index 2f1e944159..5a4f1c08da 100755 --- a/services-wrapper.py +++ b/services-wrapper.py @@ -1,57 +1,440 @@ #! /usr/bin/env python3 """ -This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups + services-wrapper A small tool which wraps around check-services.php and tries to + guide the services process with a more modern approach with a + Queue and workers. + + Based on the original version of poller-wrapper.py by Job Snijders + + Author: Neil Lathwood + Orsiris de Jong + Date: Oct 2019 + + Usage: This program accepts one command line argument: the number of threads + that should run simultaneously. If no argument is given it will assume + a default of 1 thread. + + Ubuntu Linux: apt-get install python-mysqldb + FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean + RHEL 7: yum install MySQL-python + RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient + + Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8 + + License: This program is free software: you can redistribute it and/or modify it + under the terms of the GNU General Public License as published by the + Free Software Foundation, either version 3 of the License, or (at your + option) any later version. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General + Public License for more details. + + You should have received a copy of the GNU General Public License along + with this program. If not, see https://www.gnu.org/licenses/. + + LICENSE.txt contains a copy of the full GPLv3 licensing conditions. """ -import os -from argparse import ArgumentParser - -import LibreNMS -import LibreNMS.wrapper as wrapper - -WRAPPER_TYPE = "service" -DEFAULT_WORKERS = 1 - -""" - Take the amount of threads we want to run in parallel from the commandline - if None are given or the argument was garbage, fall back to default -""" -usage = ( - "usage: %(prog)s [options] (Default: {}" - "(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS) -) -description = "Spawn multiple check-services.php processes in parallel." -parser = ArgumentParser(usage=usage, description=description) -parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS) -parser.add_argument( - "-d", - "--debug", - dest="debug", - action="store_true", - default=False, - help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.", -) -args = parser.parse_args() - -config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__))) -log_dir = config["log_dir"] -log_file = os.path.join(log_dir, WRAPPER_TYPE + ".log") -logger = LibreNMS.logger_get_logger(log_file, debug=args.debug) +import LibreNMS.library as LNMS try: - amount_of_workers = int(args.amount_of_workers) -except (IndexError, ValueError): - amount_of_workers = DEFAULT_WORKERS - logger.warning( - "Bogus number of workers given. Using default number ({}) of workers.".format( - amount_of_workers + + import json + import os + import queue + import subprocess + import sys + import threading + import time + from optparse import OptionParser + +except ImportError as exc: + print("ERROR: missing one or more of the following python modules:") + print("threading, queue, sys, subprocess, time, os, json") + print("ERROR: %s" % exc) + sys.exit(2) + + +APP_NAME = "services_wrapper" +LOG_FILE = "logs/" + APP_NAME + ".log" +_DEBUG = False +servicedisco = False +real_duration = 0 +service_devices = 0 + +""" + Threading helper functions +""" +# (c) 2015, GPLv3, Daniel Preussker << << <<> %s/services_device_%s.log" % (log_dir, device_id) + if debug + else ">> /dev/null" + ) + # TODO replace with command_runner + command = "/usr/bin/env php %s -h %s %s 2>&1" % ( + service_path, + device_id, + output, + ) + subprocess.check_call(command, shell=True) + + elapsed_time = int(time.time() - start_time) + print_queue.put( + [threading.current_thread().name, device_id, elapsed_time] + ) + except (KeyboardInterrupt, SystemExit): + raise + except: + pass + poll_queue.task_done() + + +if __name__ == "__main__": + logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG) + + install_dir = os.path.dirname(os.path.realpath(__file__)) + LNMS.check_for_file(install_dir + "/.env") + config = json.loads(LNMS.get_config_data(install_dir)) + + service_path = config["install_dir"] + "/check-services.php" + log_dir = config["log_dir"] + + # (c) 2015, GPLv3, Daniel Preussker << << << << 0: + try: + time.sleep(1) + nodes = memc.get("service.nodes") + except: + pass + print("Clearing Locks") + x = minlocks + while x <= maxlocks: + memc.delete("service.device." + str(x)) + x = x + 1 + print("%s Locks Cleared" % x) + print("Clearing Nodes") + memc.delete("service.master") + memc.delete("service.nodes") + else: + memc.decr("service.nodes") + print("Finished %s." % time.time()) + # EOC6 + + show_stopper = False + + if total_time > 300: + print( + "WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads" + ) + print( + "INFO: in sequential style service checks the elapsed time would have been: %s seconds" + % real_duration + ) + for device in per_device_duration: + if per_device_duration[device] > 300: + print( + "WARNING: device %s is taking too long: %s seconds" + % (device, per_device_duration[device]) + ) + show_stopper = True + if show_stopper: + print( + "ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do." + ) + else: + recommend = int(total_time / 300.0 * amount_of_workers + 1) + print( + "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" + % recommend + ) + + sys.exit(2)