#! /usr/bin/env python3
"""
wrapper        A small tool which wraps services, discovery and poller php scripts
               in order to run them as threads with Queue and workers

Authors:       Orsiris de Jong
               Neil Lathwood
               Job Snijders

               Distributed poller code (c) 2015, GPLv3, Daniel Preussker
               All code parts that belong to Daniel are enclosed in EOC comments

Date:          Sep 2021

Usage:         This program accepts three command line arguments
               - the number of threads (defaults to 1 for discovery / service, and 16 for poller)
               - the wrapper type (service, discovery or poller)
               - optional debug boolean

Ubuntu Linux:  apt-get install python-mysqldb
FreeBSD:       cd /usr/ports/*/py-MySQLdb && make install clean
RHEL 7:        yum install MySQL-python
RHEL 8:        dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient

Tested on:     Python 3.6.8 / PHP 7.2.11 / CentOS 8 / AlmaLinux 8.4

License:       This program is free software: you can redistribute it and/or modify it
               under the terms of the GNU General Public License as published by the
               Free Software Foundation, either version 3 of the License, or (at your
               option) any later version.

               This program is distributed in the hope that it will be useful, but
               WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
               or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
               for more details.

               You should have received a copy of the GNU General Public License along
               with this program. If not, see https://www.gnu.org/licenses/.

               LICENSE.txt contains a copy of the full GPLv3 licensing conditions.
""" import logging import os import queue import sys import threading import time import uuid from argparse import ArgumentParser import LibreNMS from LibreNMS.command_runner import command_runner logger = logging.getLogger(__name__) # Timeout in seconds for any poller / service / discovery action per device # Should be higher than stepping which defaults to 300 PER_DEVICE_TIMEOUT = 900 # 5 = no new discovered devices, 6 = unreachable device VALID_EXIT_CODES = [0, 5, 6] DISTRIBUTED_POLLING = False # Is overriden by config.php REAL_DURATION = 0 DISCOVERED_DEVICES_COUNT = 0 PER_DEVICE_DURATION = {} ERRORS = 0 MEMC = None IS_NODE = None STEPPING = None MASTER_TAG = None NODES_TAG = None TIME_TAG = "" """ Per wrapper type configuration All time related variables are in seconds """ wrappers = { "service": { "executable": "check-services.php", "table_name": "services", "memc_touch_time": 10, "stepping": 300, "nodes_stepping": 300, "total_exec_time": 300, }, "discovery": { "executable": "discovery.php", "table_name": "devices", "memc_touch_time": 30, "stepping": 300, "nodes_stepping": 3600, "total_exec_time": 21600, }, "poller": { "executable": "poller.php", "table_name": "devices", "memc_touch_time": 10, "stepping": 300, "nodes_stepping": 300, "total_exec_time": 300, }, } """ Threading helper functions """ # << None """ Actual code that runs various php scripts, in single node mode or distributed poller mode """ global MEMC global IS_NODE global DISTRIBUTED_POLLING global MASTER_TAG global NODES_TAG global TIME_TAG global STEPPING # Setup wrapper dependent variables STEPPING = wrappers[wrapper_type]["stepping"] if wrapper_type == "poller": if "rrd" in config and "step" in config["rrd"]: STEPPING = config["rrd"]["step"] TIME_TAG = "." 
+ str(get_time_tag(STEPPING)) MASTER_TAG = "{}.master{}".format(wrapper_type, TIME_TAG) NODES_TAG = "{}.nodes{}".format(wrapper_type, TIME_TAG) # << amount_of_devices: amount_of_workers = amount_of_devices logger.info( "starting the {} check at {} with {} threads for {} devices".format( wrapper_type, time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers, amount_of_devices, ) ) for device_id in devices_list: poll_queue.put(device_id) for _ in range(amount_of_workers): worker = threading.Thread( target=poll_worker, kwargs={ "poll_queue": poll_queue, "print_queue": print_queue, "config": config, "log_dir": log_dir, "wrapper_type": wrapper_type, "debug": _debug, }, ) worker.setDaemon(True) worker.start() pworker = threading.Thread( target=print_worker, kwargs={"print_queue": print_queue, "wrapper_type": wrapper_type}, ) pworker.setDaemon(True) pworker.start() try: poll_queue.join() print_queue.join() except (KeyboardInterrupt, SystemExit): raise total_time = int(time.time() - s_time) end_msg = "{}-wrapper checked {} devices in {} seconds with {} workers with {} errors".format( wrapper_type, DISCOVERED_DEVICES_COUNT, total_time, amount_of_workers, ERRORS ) if ERRORS == 0: logger.info(end_msg) else: logger.error(end_msg) # << 0: try: time.sleep(1) nodes = MEMC.get(NODES_TAG) except: pass logger.info("Clearing Locks for {}".format(NODES_TAG)) x = minlocks while x <= maxlocks: MEMC.delete("{}.device.{}".format(wrapper_type, x)) x = x + 1 logger.info("{} Locks Cleared".format(x)) logger.info("Clearing Nodes") MEMC.delete(MASTER_TAG) MEMC.delete(NODES_TAG) else: MEMC.decr(NODES_TAG) logger.info("Finished {}.".format(time.strftime("%Y-%m-%d %H:%M:%S"))) # EOC # Update poller statistics if wrapper_type == "poller": query = "UPDATE pollers SET last_polled=NOW(), devices='{}', time_taken='{}' WHERE poller_name='{}'".format( DISCOVERED_DEVICES_COUNT, total_time, config["distributed_poller_name"] ) cursor = db_connection.query(query) if cursor.rowcount < 1: query = "INSERT INTO 
pollers SET poller_name='{}', last_polled=NOW(), devices='{}', time_taken='{}'".format( config["distributed_poller_name"], DISCOVERED_DEVICES_COUNT, total_time ) db_connection.query(query) db_connection.close() if total_time > wrappers[wrapper_type]["total_exec_time"]: logger.warning( "the process took more than {} seconds to finish, you need faster hardware or more threads".format( wrappers[wrapper_type]["total_exec_time"] ) ) logger.warning( "in sequential style service checks the elapsed time would have been: {} seconds".format( REAL_DURATION ) ) show_stopper = False for device in PER_DEVICE_DURATION: if PER_DEVICE_DURATION[device] > wrappers[wrapper_type]["nodes_stepping"]: logger.warning( "device {} is taking too long: {} seconds".format( device, PER_DEVICE_DURATION[device] ) ) show_stopper = True if show_stopper: logger.error( "Some devices are taking more than {} seconds, the script cannot recommend you what to do.".format( wrappers[wrapper_type]["nodes_stepping"] ) ) else: recommend = int(total_time / STEPPING * amount_of_workers + 1) logger.warning( "Consider setting a minimum of {} threads. (This does not constitute professional advice!)".format( recommend ) ) sys.exit(2) if __name__ == "__main__": parser = ArgumentParser( prog="wrapper.py", usage="usage: %(prog)s [options] \n" "wrapper_type = 'service', 'poller' or 'disccovery'" "workers defaults to 1 for service and discovery, and 16 for poller " "(Do not set too high, or you will get an OOM)", description="Spawn multiple librenms php processes in parallel.", ) parser.add_argument( "-d", "--debug", action="store_true", default=False, help="Enable debug output. 
WARNING: Leaving this enabled will consume a lot of disk space.", ) parser.add_argument( dest="wrapper", default=None, help="Execute wrapper for 'service', 'poller' or 'discovery'", ) parser.add_argument( dest="threads", action="store_true", default=None, help="Number of workers" ) args = parser.parse_args() debug = args.debug wrapper_type = args.wrapper amount_of_workers = args.threads if wrapper_type not in ["service", "discovery", "poller"]: parser.error("Invalid wrapper type '{}'".format(wrapper_type)) sys.exit(4) config = LibreNMS.get_config_data( os.path.dirname(os.path.dirname(os.path.realpath(__file__))) ) log_dir = config["log_dir"] log_file = os.path.join(log_dir, wrapper_type + ".log") logger = LibreNMS.logger_get_logger(log_file, debug=debug) try: amount_of_workers = int(amount_of_workers) except (IndexError, ValueError, TypeError): amount_of_workers = ( 16 if wrapper_type == "poller" else 1 ) # Defaults to 1 for service/discovery, 16 for poller logger.warning( "Bogus number of workers given. Using default number ({}) of workers.".format( amount_of_workers ) ) wrapper(wrapper_type, amount_of_workers, config, log_dir, _debug=debug)