import LibreNMS import json import logging import os import pymysql import subprocess import threading import sys import time try: import psutil except ImportError: pass from datetime import timedelta from datetime import datetime from logging import debug, info, warning, error, critical, exception from platform import python_version from time import sleep from socket import gethostname from signal import signal, SIGTERM, SIGQUIT, SIGINT, SIGHUP, SIGCHLD, SIG_DFL from uuid import uuid1 class ServiceConfig: def __init__(self): """ Stores all of the configuration variables for the LibreNMS service in a common object Starts with defaults, but can be populated with variables from config.php by calling populate() """ self._uuid = str(uuid1()) self.set_name(gethostname()) def set_name(self, name): if name: self.name = name.strip() self.unique_name = "{}-{}".format(self.name, self._uuid) class PollerConfig: def __init__(self, workers, frequency, calculate=None): self.enabled = True self.workers = workers self.frequency = frequency self.calculate = calculate # config variables with defaults BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)) node_id = None name = None unique_name = None single_instance = True distributed = False group = 0 debug = False log_level = 20 max_db_failures = 5 alerting = PollerConfig(1, 60) poller = PollerConfig(24, 300) services = PollerConfig(8, 300) discovery = PollerConfig(16, 21600) billing = PollerConfig(2, 300, 60) ping = PollerConfig(1, 60) down_retry = 60 update_enabled = True update_frequency = 86400 master_resolution = 1 master_timeout = 10 redis_host = 'localhost' redis_port = 6379 redis_db = 0 redis_pass = None redis_socket = None redis_sentinel = None redis_sentinel_service = None redis_timeout = 60 db_host = 'localhost' db_port = 0 db_socket = None db_user = 'librenms' db_pass = '' db_name = 'librenms' watchdog_enabled = False watchdog_logfile = 'logs/librenms.log' def populate(self): config = self._get_config_data() # populate config variables self.node_id = os.getenv('NODE_ID') self.set_name(config.get('distributed_poller_name', None)) self.distributed = config.get('distributed_poller', ServiceConfig.distributed) self.group = ServiceConfig.parse_group(config.get('distributed_poller_group', ServiceConfig.group)) # backward compatible options self.poller.workers = config.get('poller_service_workers', ServiceConfig.poller.workers) self.poller.frequency = config.get('poller_service_poll_frequency', ServiceConfig.poller.frequency) self.discovery.frequency = config.get('poller_service_discover_frequency', ServiceConfig.discovery.frequency) self.down_retry = config.get('poller_service_down_retry', ServiceConfig.down_retry) self.log_level = config.get('poller_service_loglevel', ServiceConfig.log_level) # new options self.poller.enabled = config.get('service_poller_enabled', True) # unused self.poller.workers = config.get('service_poller_workers', ServiceConfig.poller.workers) self.poller.frequency = config.get('service_poller_frequency', ServiceConfig.poller.frequency) self.discovery.enabled = config.get('service_discovery_enabled', True) # unused self.discovery.workers = config.get('service_discovery_workers', ServiceConfig.discovery.workers) self.discovery.frequency = config.get('service_discovery_frequency', ServiceConfig.discovery.frequency) self.services.enabled = config.get('service_services_enabled', True) self.services.workers = config.get('service_services_workers', ServiceConfig.services.workers) self.services.frequency = config.get('service_services_frequency', ServiceConfig.services.frequency) self.billing.enabled = config.get('service_billing_enabled', True) self.billing.frequency = config.get('service_billing_frequency', ServiceConfig.billing.frequency) self.billing.calculate = config.get('service_billing_calculate_frequency', ServiceConfig.billing.calculate) self.alerting.enabled = config.get('service_alerting_enabled', True) self.alerting.frequency = config.get('service_alerting_frequency', ServiceConfig.alerting.frequency) self.ping.enabled = config.get('service_ping_enabled', False) self.ping.frequency = config.get('ping_rrd_step', ServiceConfig.ping.frequency) self.down_retry = config.get('service_poller_down_retry', ServiceConfig.down_retry) self.log_level = config.get('service_loglevel', ServiceConfig.log_level) self.update_enabled = config.get('service_update_enabled', ServiceConfig.update_enabled) self.update_frequency = config.get('service_update_frequency', ServiceConfig.update_frequency) self.redis_host = os.getenv('REDIS_HOST', config.get('redis_host', ServiceConfig.redis_host)) self.redis_db = os.getenv('REDIS_DB', config.get('redis_db', ServiceConfig.redis_db)) self.redis_pass = os.getenv('REDIS_PASSWORD', config.get('redis_pass', ServiceConfig.redis_pass)) self.redis_port = int(os.getenv('REDIS_PORT', config.get('redis_port', ServiceConfig.redis_port))) self.redis_socket = os.getenv('REDIS_SOCKET', config.get('redis_socket', ServiceConfig.redis_socket)) self.redis_sentinel = os.getenv('REDIS_SENTINEL', config.get('redis_sentinel', ServiceConfig.redis_sentinel)) self.redis_sentinel_service = os.getenv('REDIS_SENTINEL_SERVICE', config.get('redis_sentinel_service', ServiceConfig.redis_sentinel_service)) self.redis_timeout = os.getenv('REDIS_TIMEOUT', self.alerting.frequency if self.alerting.frequency != 0 else self.redis_timeout) self.db_host = os.getenv('DB_HOST', config.get('db_host', ServiceConfig.db_host)) self.db_name = os.getenv('DB_DATABASE', config.get('db_name', ServiceConfig.db_name)) self.db_pass = os.getenv('DB_PASSWORD', config.get('db_pass', ServiceConfig.db_pass)) self.db_port = int(os.getenv('DB_PORT', config.get('db_port', ServiceConfig.db_port))) self.db_socket = os.getenv('DB_SOCKET', config.get('db_socket', ServiceConfig.db_socket)) self.db_user = os.getenv('DB_USERNAME', config.get('db_user', ServiceConfig.db_user)) self.watchdog_enabled = config.get('service_watchdog_enabled', ServiceConfig.watchdog_enabled) self.watchdog_logfile = config.get('log_file', ServiceConfig.watchdog_logfile) # set convenient debug variable self.debug = logging.getLogger().isEnabledFor(logging.DEBUG) if not self.debug and self.log_level: try: logging.getLogger().setLevel(self.log_level) except ValueError: error("Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format(self.log_level)) logging.getLogger().setLevel(logging.INFO) def load_poller_config(self, db): try: settings = {} cursor = db.query('SELECT * FROM `poller_cluster` WHERE `node_id`=%s', self.node_id) if cursor.rowcount == 0: return for index, setting in enumerate(cursor.fetchone()): name = cursor.description[index][0] settings[name] = setting if settings['poller_name'] is not None: self.set_name(settings['poller_name']) if settings['poller_groups'] is not None: self.group = ServiceConfig.parse_group(settings['poller_groups']) if settings['poller_enabled'] is not None: self.poller.enabled = settings['poller_enabled'] if settings['poller_frequency'] is not None: self.poller.frequency = settings['poller_frequency'] if settings['poller_workers'] is not None: self.poller.workers = settings['poller_workers'] if settings['poller_down_retry'] is not None: self.down_retry = settings['poller_down_retry'] if settings['discovery_enabled'] is not None: self.discovery.enabled = settings['discovery_enabled'] if settings['discovery_frequency'] is not None: self.discovery.frequency = settings['discovery_frequency'] if settings['discovery_workers'] is not None: self.discovery.workers = settings['discovery_workers'] if settings['services_enabled'] is not None: self.services.enabled = settings['services_enabled'] if settings['services_frequency'] is not None: self.services.frequency = settings['services_frequency'] if settings['services_workers'] is not None: self.services.workers = settings['services_workers'] if settings['billing_enabled'] is not None: self.billing.enabled = settings['billing_enabled'] if settings['billing_frequency'] is not None: self.billing.frequency = settings['billing_frequency'] if settings['billing_calculate_frequency'] is not None: self.billing.calculate = settings['billing_calculate_frequency'] if settings['alerting_enabled'] is not None: self.alerting.enabled = settings['alerting_enabled'] if settings['alerting_frequency'] is not None: self.alerting.frequency = settings['alerting_frequency'] if settings['ping_enabled'] is not None: self.ping.enabled = settings['ping_enabled'] if settings['ping_frequency'] is not None: self.ping.frequency = settings['ping_frequency'] if settings['update_enabled'] is not None: self.update_enabled = settings['update_enabled'] if settings['update_frequency'] is not None: self.update_frequency = settings['update_frequency'] if settings['loglevel'] is not None: self.log_level = settings['loglevel'] if settings['watchdog_enabled'] is not None: self.watchdog_enabled = settings['watchdog_enabled'] if settings['watchdog_log'] is not None: self.watchdog_logfile = settings['watchdog_log'] except pymysql.err.Error: warning('Unable to load poller (%s) config', self.node_id) def _get_config_data(self): try: import dotenv env_path = "{}/.env".format(self.BASE_DIR) info("Attempting to load .env from '%s'", env_path) dotenv.load_dotenv(dotenv_path=env_path, verbose=True) if not os.getenv('NODE_ID'): raise ImportError(".env does not contain a valid NODE_ID setting.") except ImportError as e: exception("Could not import .env - check that the poller user can read the file, and that composer install has been run recently") sys.exit(3) config_cmd = ['/usr/bin/env', 'php', '{}/config_to_json.php'.format(self.BASE_DIR), '2>&1'] try: return json.loads(subprocess.check_output(config_cmd).decode()) except subprocess.CalledProcessError as e: error("ERROR: Could not load or parse configuration! {}: {}" .format(subprocess.list2cmdline(e.cmd), e.output.decode())) @staticmethod def parse_group(g): if g is None: return [0] elif type(g) is int: return [g] elif type(g) is str: try: return [int(x) for x in set(g.split(','))] except ValueError: pass error("Could not parse group string, defaulting to 0") return [0] class Service: config = ServiceConfig() _fp = False _started = False start_time = 0 queue_managers = {} poller_manager = None discovery_manager = None last_poll = {} reap_flag = False terminate_flag = False reload_flag = False db_failures = 0 def __init__(self): self.start_time = time.time() self.config.populate() self._db = LibreNMS.DB(self.config) self.config.load_poller_config(self._db) threading.current_thread().name = self.config.name # rename main thread self.attach_signals() self._lm = self.create_lock_manager() self.daily_timer = LibreNMS.RecurringTimer(self.config.update_frequency, self.run_maintenance, 'maintenance') self.stats_timer = LibreNMS.RecurringTimer(self.config.poller.frequency, self.log_performance_stats, 'performance') if self.config.watchdog_enabled: info("Starting watchdog timer for log file: {}".format(self.config.watchdog_logfile)) self.watchdog_timer = LibreNMS.RecurringTimer(self.config.poller.frequency, self.logfile_watchdog, 'watchdog') else: info("Watchdog is disabled.") self.is_master = False def service_age(self): return time.time() - self.start_time def attach_signals(self): info("Attaching signal handlers on thread %s", threading.current_thread().name) signal(SIGTERM, self.terminate) # capture sigterm and exit gracefully signal(SIGQUIT, self.terminate) # capture sigquit and exit gracefully signal(SIGINT, self.terminate) # capture sigint and exit gracefully signal(SIGHUP, self.reload) # capture sighup and restart gracefully if 'psutil' not in sys.modules: warning("psutil is not available, polling gap possible") else: signal(SIGCHLD, self.reap) # capture sigchld and reap the process def reap_psutil(self): """ A process from a previous invocation is trying to report its status """ # Speed things up by only looking at direct zombie children for p in psutil.Process().children(recursive=False): try: cmd = p.cmdline() # cmdline is uncached, so needs to go here to avoid NoSuchProcess status = p.status() if status == psutil.STATUS_ZOMBIE: pid = p.pid r = os.waitpid(p.pid, os.WNOHANG) warning('Reaped long running job "%s" in state %s with PID %d - job returned %d', cmd, status, r[0], r[1]) except (OSError, psutil.NoSuchProcess): # process was already reaped continue def start(self): debug("Performing startup checks...") if self.config.single_instance: self.check_single_instance() # don't allow more than one service at a time if self._started: raise RuntimeWarning("Not allowed to start Poller twice") self._started = True debug("Starting up queue managers...") # initialize and start the worker pools self.poller_manager = LibreNMS.PollerQueueManager(self.config, self._lm) self.queue_managers['poller'] = self.poller_manager self.discovery_manager = LibreNMS.DiscoveryQueueManager(self.config, self._lm) self.queue_managers['discovery'] = self.discovery_manager if self.config.alerting.enabled: self.queue_managers['alerting'] = LibreNMS.AlertQueueManager(self.config, self._lm) if self.config.services.enabled: self.queue_managers['services'] = LibreNMS.ServicesQueueManager(self.config, self._lm) if self.config.billing.enabled: self.queue_managers['billing'] = LibreNMS.BillingQueueManager(self.config, self._lm) if self.config.ping.enabled: self.queue_managers['ping'] = LibreNMS.PingQueueManager(self.config, self._lm) if self.config.update_enabled: self.daily_timer.start() self.stats_timer.start() if self.config.watchdog_enabled: self.watchdog_timer.start() info("LibreNMS Service: {} started!".format(self.config.unique_name)) info("Poller group {}. Using Python {} and {} locks and queues" .format('0 (default)' if self.config.group == [0] else self.config.group, python_version(), 'redis' if isinstance(self._lm, LibreNMS.RedisLock) else 'internal')) if self.config.update_enabled: info("Maintenance tasks will be run every {}".format(timedelta(seconds=self.config.update_frequency))) else: warning("Maintenance tasks are disabled.") # Main dispatcher loop try: while not self.terminate_flag: if self.reload_flag: info("Picked up reload flag, calling the reload process") self.restart() if self.reap_flag: self.reap_psutil() # Re-arm the signal handler signal(SIGCHLD, self.reap) self.reap_flag = False master_lock = self._acquire_master() if master_lock: if not self.is_master: info("{} is now the master dispatcher".format(self.config.name)) self.is_master = True self.start_dispatch_timers() devices = self.fetch_immediate_device_list() for device in devices: device_id = device[0] group = device[1] if device[2]: # polling self.dispatch_immediate_polling(device_id, group) if device[3]: # discovery self.dispatch_immediate_discovery(device_id, group) else: if self.is_master: info("{} is no longer the master dispatcher".format(self.config.name)) self.stop_dispatch_timers() self.is_master = False # no longer master sleep(self.config.master_resolution) except KeyboardInterrupt: pass info("Dispatch loop terminated") self.shutdown() def _acquire_master(self): return self._lm.lock('dispatch.master', self.config.unique_name, self.config.master_timeout, True) def _release_master(self): self._lm.unlock('dispatch.master', self.config.unique_name) # ------------ Discovery ------------ def dispatch_immediate_discovery(self, device_id, group): if not self.discovery_manager.is_locked(device_id): self.discovery_manager.post_work(device_id, group) # ------------ Polling ------------ def dispatch_immediate_polling(self, device_id, group): if not self.poller_manager.is_locked(device_id): self.poller_manager.post_work(device_id, group) if self.config.debug: cur_time = time.time() elapsed = cur_time - self.last_poll.get(device_id, cur_time) self.last_poll[device_id] = cur_time # arbitrary limit to reduce spam if elapsed > (self.config.poller.frequency - self.config.master_resolution): debug("Dispatching polling for device {}, time since last poll {:.2f}s" .format(device_id, elapsed)) def fetch_immediate_device_list(self): try: poller_find_time = self.config.poller.frequency - 1 discovery_find_time = self.config.discovery.frequency - 1 result = self._db.query('''SELECT `device_id`, `poller_group`, COALESCE(`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND), 1) AS `poll`, IF(snmp_disable=1 OR status=0, 0, IF (%s < `last_discovered_timetaken` * 1.25, 0, COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND), 1))) AS `discover` FROM `devices` WHERE `disabled` = 0 AND ( `last_polled` IS NULL OR `last_discovered` IS NULL OR `last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND) OR `last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND) ) ORDER BY `last_polled_timetaken` DESC''', (poller_find_time, self.service_age(), discovery_find_time, poller_find_time, discovery_find_time)) self.db_failures = 0 return result except pymysql.err.Error: self.db_failures += 1 if self.db_failures > self.config.max_db_failures: warning("Too many DB failures ({}), attempting to release master".format(self.db_failures)) self._release_master() sleep(self.config.master_resolution) # sleep to give another node a chance to acquire return [] def run_maintenance(self): """ Runs update and cleanup tasks by calling daily.sh. Reloads the python script after the update. Sets a schema-update lock so no distributed pollers will update until the schema has been updated. """ attempt = 0 wait = 5 max_runtime = 86100 max_tries = int(max_runtime / wait) info("Waiting for schema lock") while not self._lm.lock('schema-update', self.config.unique_name, max_runtime): attempt += 1 if attempt >= max_tries: # don't get stuck indefinitely warning('Reached max wait for other pollers to update, updating now') break sleep(wait) info("Running maintenance tasks") try: output = LibreNMS.call_script('daily.sh') info("Maintenance tasks complete\n{}".format(output)) except subprocess.CalledProcessError as e: error("Error in daily.sh:\n" + (e.output.decode() if e.output is not None else 'No output')) self._lm.unlock('schema-update', self.config.unique_name) self.restart() def create_lock_manager(self): """ Create a new LockManager. Tries to create a Redis LockManager, but falls back to python's internal threading lock implementation. Exits if distributing poller is enabled and a Redis LockManager cannot be created. :return: Instance of LockManager """ try: return LibreNMS.RedisLock(namespace='librenms.lock', host=self.config.redis_host, port=self.config.redis_port, db=self.config.redis_db, password=self.config.redis_pass, unix_socket_path=self.config.redis_socket, sentinel=self.config.redis_sentinel, sentinel_service=self.config.redis_sentinel_service, socket_timeout=self.config.redis_timeout) except ImportError: if self.config.distributed: critical("ERROR: Redis connection required for distributed polling") critical("Please install redis-py, either through your os software repository or from PyPI") self.exit(2) except Exception as e: if self.config.distributed: critical("ERROR: Redis connection required for distributed polling") critical("Could not connect to Redis. {}".format(e)) self.exit(2) return LibreNMS.ThreadingLock() def restart(self): """ Stop then recreate this entire process by re-calling the original script. Has the effect of reloading the python files from disk. """ if sys.version_info < (3, 4, 0): warning("Skipping restart as running under an incompatible interpreter") warning("Please restart manually") return info('Restarting service... ') if 'psutil' not in sys.modules: warning("psutil is not available, polling gap possible") self._stop_managers_and_wait() else: self._stop_managers() self._release_master() python = sys.executable sys.stdout.flush() os.execl(python, python, *sys.argv) def reap(self, signalnum=None, flag=None): """ Handle a set the reload flag to begin a clean restart :param signalnum: UNIX signal number :param flag: Flags accompanying signal """ handler = signal(SIGCHLD, SIG_DFL) if handler == SIG_DFL: # signal is already being handled, bail out as this handler is not reentrant - the kernel will re-raise the signal later return self.reap_flag = True def reload(self, signalnum=None, flag=None): """ Handle a set the reload flag to begin a clean restart :param signalnum: UNIX signal number :param flag: Flags accompanying signal """ info("Received signal on thread %s, handling", threading.current_thread().name) self.reload_flag = True def terminate(self, signalnum=None, flag=None): """ Handle a set the terminate flag to begin a clean shutdown :param signalnum: UNIX signal number :param flag: Flags accompanying signal """ info("Received signal on thread %s, handling", threading.current_thread().name) self.terminate_flag = True def shutdown(self, signalnum=None, flag=None): """ Stop and exit, waiting for all child processes to exit. :param signalnum: UNIX signal number :param flag: Flags accompanying signal """ info('Shutting down, waiting for running jobs to complete...') self.stop_dispatch_timers() self._release_master() self.daily_timer.stop() self.stats_timer.stop() if self.config.watchdog_enabled: self.watchdog_timer.stop() self._stop_managers_and_wait() # try to release master lock info('Shutdown of %s/%s complete', os.getpid(), threading.current_thread().name) self.exit(0) def start_dispatch_timers(self): """ Start all dispatch timers and begin pushing events into queues. This should only be started when we are the master dispatcher. """ for manager in self.queue_managers.values(): try: manager.start_dispatch() except AttributeError: pass def stop_dispatch_timers(self): """ Stop all dispatch timers, this should be called when we are no longer the master dispatcher. """ for manager in self.queue_managers.values(): try: manager.stop_dispatch() except AttributeError: pass def _stop_managers(self): for manager in self.queue_managers.values(): manager.stop() def _stop_managers_and_wait(self): """ Stop all QueueManagers, and wait for their processing threads to complete. We send the stop signal to all QueueManagers first, then wait for them to finish. """ self._stop_managers() for manager in self.queue_managers.values(): manager.stop_and_wait() def check_single_instance(self): """ Check that there is only one instance of the service running on this computer. We do this be creating a file in the base directory (.lock.service) if it doesn't exist and obtaining an exclusive lock on that file. """ lock_file = "{}/{}".format(self.config.BASE_DIR, '.lock.service') import fcntl self._fp = open(lock_file, 'w') # keep a reference so the file handle isn't garbage collected self._fp.flush() try: fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB) except IOError: warning("Another instance is already running, quitting.") self.exit(2) def log_performance_stats(self): info("Counting up time spent polling") try: # Report on the poller instance as a whole self._db.query('INSERT INTO poller_cluster(node_id, poller_name, poller_version, poller_groups, last_report, master) ' 'values("{0}", "{1}", "{2}", "{3}", NOW(), {4}) ' 'ON DUPLICATE KEY UPDATE poller_version="{2}", poller_groups="{3}", last_report=NOW(), master={4}; ' .format(self.config.node_id, self.config.name, "librenms-service", ','.join(str(g) for g in self.config.group), 1 if self.is_master else 0)) # Find our ID self._db.query('SELECT id INTO @parent_poller_id FROM poller_cluster WHERE node_id="{0}"; '.format(self.config.node_id)) for worker_type, manager in self.queue_managers.items(): worker_seconds, devices = manager.performance.reset() # Record the queue state self._db.query('INSERT INTO poller_cluster_stats(parent_poller, poller_type, depth, devices, worker_seconds, workers, frequency) ' 'values(@parent_poller_id, "{0}", {1}, {2}, {3}, {4}, {5}) ' 'ON DUPLICATE KEY UPDATE depth={1}, devices={2}, worker_seconds={3}, workers={4}, frequency={5}; ' .format(worker_type, sum([manager.get_queue(group).qsize() for group in self.config.group]), devices, worker_seconds, getattr(self.config, worker_type).workers, getattr(self.config, worker_type).frequency) ) except pymysql.err.Error: exception("Unable to log performance statistics - is the database still online?") def logfile_watchdog(self): try: # check that lofgile has been written to within last poll period logfile_mdiff = datetime.now().timestamp() - os.path.getmtime(self.config.watchdog_logfile) except FileNotFoundError as e: error("Log file not found! {}".format(e)) return if logfile_mdiff > self.config.poller.frequency: critical("BARK! Log file older than {}s, restarting service!".format(self.config.poller.frequency)) self.restart() else: info("Log file updated {}s ago".format(int(logfile_mdiff))) def exit(self, code=0): sys.stdout.flush() sys.exit(code)