From 9946fe8b150667ebf172c4e67c0b0ba08736ba25 Mon Sep 17 00:00:00 2001 From: Jellyfrog Date: Sun, 28 Mar 2021 18:02:33 +0200 Subject: [PATCH] Format python code with Black (#12663) --- .github/workflows/lint.yml | 2 + LibreNMS/__init__.py | 120 +++++--- LibreNMS/library.py | 79 +++-- LibreNMS/queuemanager.py | 238 ++++++++++----- LibreNMS/service.py | 544 +++++++++++++++++++++++----------- discovery-wrapper.py | 173 +++++++---- librenms-service.py | 42 ++- poller-wrapper.py | 199 +++++++++---- scripts/check_requirements.py | 6 +- services-wrapper.py | 178 +++++++---- snmp-scan.py | 187 ++++++++---- tests/tests.py | 113 ++++--- 12 files changed, 1295 insertions(+), 586 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 42c460186f..495cac4291 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -21,6 +21,8 @@ jobs: VALIDATE_BASH: true VALIDATE_PHP_BUILTIN: true VALIDATE_PYTHON_PYLINT: true + VALIDATE_PYTHON_BLACK: true + VALIDATE_ALL_CODEBASE: false DEFAULT_BRANCH: master GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/LibreNMS/__init__.py b/LibreNMS/__init__.py index fe9610a8bf..28701ba344 100644 --- a/LibreNMS/__init__.py +++ b/LibreNMS/__init__.py @@ -10,8 +10,16 @@ from queue import Queue from time import time from .service import Service, ServiceConfig -from .queuemanager import QueueManager, TimedQueueManager, BillingQueueManager, PingQueueManager, ServicesQueueManager, \ - AlertQueueManager, PollerQueueManager, DiscoveryQueueManager +from .queuemanager import ( + QueueManager, + TimedQueueManager, + BillingQueueManager, + PingQueueManager, + ServicesQueueManager, + AlertQueueManager, + PollerQueueManager, + DiscoveryQueueManager, +) def normalize_wait(seconds): @@ -26,9 +34,9 @@ def call_script(script, args=()): :param args: a tuple of arguments to send to the command :returns the output of the command """ - if script.endswith('.php'): + if script.endswith(".php"): # save calling the sh process - base = ('/usr/bin/env', 'php') + base = ("/usr/bin/env", "php") else: base = () @@ -36,7 +44,13 @@ def call_script(script, args=()): cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args)) debug("Running {}".format(cmd)) # preexec_fn=os.setsid here keeps process signals from propagating (close_fds=True is default) - return subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT, preexec_fn=os.setsid, close_fds=True) + return subprocess.check_call( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + preexec_fn=os.setsid, + close_fds=True, + ) class DB: @@ -54,6 +68,7 @@ class DB: def connect(self): try: import pymysql + pymysql.install_as_MySQLdb() info("Using pure python SQL client") except ImportError: @@ -63,19 +78,21 @@ class DB: import MySQLdb except ImportError: critical("ERROR: missing a mysql python module") - critical("Install either 'PyMySQL' or 'mysqlclient' from your OS software repository or from PyPI") + critical( + "Install either 'PyMySQL' or 'mysqlclient' from your OS software repository or from PyPI" + ) raise try: args = { - 'host': self.config.db_host, - 'port': self.config.db_port, - 'user': self.config.db_user, - 'passwd': self.config.db_pass, - 'db': self.config.db_name + "host": self.config.db_host, + "port": self.config.db_port, + "user": self.config.db_user, + "passwd": self.config.db_pass, + "db": self.config.db_name, } if self.config.db_socket: - args['unix_socket'] = self.config.db_socket + args["unix_socket"] = self.config.db_socket conn = MySQLdb.connect(**args) conn.autocommit(True) @@ -164,9 +181,11 @@ class Lock: :param expiration: int in seconds """ if ( - (name not in self._locks) or # lock doesn't exist - (allow_owner_relock and self._locks.get(name, [None])[0] == owner) or # owner has permission - time() > self._locks[name][1] # lock has expired + (name not in self._locks) + or ( # lock doesn't exist + allow_owner_relock and self._locks.get(name, [None])[0] == owner + ) + or time() > self._locks[name][1] # owner has permission # lock has expired ): self._locks[name] = (owner, expiration + time()) return self._locks[name][0] == owner @@ -229,21 +248,32 @@ class ThreadingLock(Lock): class RedisLock(Lock): - def __init__(self, namespace='lock', **redis_kwargs): + def __init__(self, namespace="lock", **redis_kwargs): import redis from redis.sentinel import Sentinel - redis_kwargs['decode_responses'] = True - if redis_kwargs.get('sentinel') and redis_kwargs.get('sentinel_service'): - sentinels = [tuple(l.split(':')) for l in redis_kwargs.pop('sentinel').split(',')] - sentinel_service = redis_kwargs.pop('sentinel_service') - kwargs = {k: v for k, v in redis_kwargs.items() if k in ["decode_responses", "password", "db", "socket_timeout"]} + + redis_kwargs["decode_responses"] = True + if redis_kwargs.get("sentinel") and redis_kwargs.get("sentinel_service"): + sentinels = [ + tuple(l.split(":")) for l in redis_kwargs.pop("sentinel").split(",") + ] + sentinel_service = redis_kwargs.pop("sentinel_service") + kwargs = { + k: v + for k, v in redis_kwargs.items() + if k in ["decode_responses", "password", "db", "socket_timeout"] + } self._redis = Sentinel(sentinels, **kwargs).master_for(sentinel_service) else: kwargs = {k: v for k, v in redis_kwargs.items() if "sentinel" not in k} self._redis = redis.Redis(**kwargs) self._redis.ping() self._namespace = namespace - info("Created redis lock manager with socket_timeout of {}s".format(redis_kwargs['socket_timeout'])) + info( + "Created redis lock manager with socket_timeout of {}s".format( + redis_kwargs["socket_timeout"] + ) + ) def __key(self, name): return "{}:{}".format(self._namespace, name) @@ -266,8 +296,13 @@ class RedisLock(Lock): non_existing = not (allow_owner_relock and self._redis.get(key) == owner) return self._redis.set(key, owner, ex=int(expiration), nx=non_existing) except redis.exceptions.ResponseError as e: - exception("Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s", - name, owner, expiration, allow_owner_relock) + exception( + "Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s", + name, + owner, + expiration, + allow_owner_relock, + ) def unlock(self, name, owner): """ @@ -285,34 +320,49 @@ class RedisLock(Lock): return self._redis.get(self.__key(name)) is not None def print_locks(self): - keys = self._redis.keys(self.__key('*')) + keys = self._redis.keys(self.__key("*")) for key in keys: - print("{} locked by {}, expires in {} seconds".format(key, self._redis.get(key), self._redis.ttl(key))) + print( + "{} locked by {}, expires in {} seconds".format( + key, self._redis.get(key), self._redis.ttl(key) + ) + ) class RedisUniqueQueue(object): - def __init__(self, name, namespace='queue', **redis_kwargs): + def __init__(self, name, namespace="queue", **redis_kwargs): import redis from redis.sentinel import Sentinel - redis_kwargs['decode_responses'] = True - if redis_kwargs.get('sentinel') and redis_kwargs.get('sentinel_service'): - sentinels = [tuple(l.split(':')) for l in redis_kwargs.pop('sentinel').split(',')] - sentinel_service = redis_kwargs.pop('sentinel_service') - kwargs = {k: v for k, v in redis_kwargs.items() if k in ["decode_responses", "password", "db", "socket_timeout"]} + + redis_kwargs["decode_responses"] = True + if redis_kwargs.get("sentinel") and redis_kwargs.get("sentinel_service"): + sentinels = [ + tuple(l.split(":")) for l in redis_kwargs.pop("sentinel").split(",") + ] + sentinel_service = redis_kwargs.pop("sentinel_service") + kwargs = { + k: v + for k, v in redis_kwargs.items() + if k in ["decode_responses", "password", "db", "socket_timeout"] + } self._redis = Sentinel(sentinels, **kwargs).master_for(sentinel_service) else: kwargs = {k: v for k, v in redis_kwargs.items() if "sentinel" not in k} self._redis = redis.Redis(**kwargs) self._redis.ping() self.key = "{}:{}".format(namespace, name) - info("Created redis queue with socket_timeout of {}s".format(redis_kwargs['socket_timeout'])) + info( + "Created redis queue with socket_timeout of {}s".format( + redis_kwargs["socket_timeout"] + ) + ) # clean up from previous implementations - if self._redis.type(self.key) != 'zset': + if self._redis.type(self.key) != "zset": self._redis.delete(self.key) def qsize(self): - return self._redis.zcount(self.key, '-inf', '+inf') + return self._redis.zcount(self.key, "-inf", "+inf") def empty(self): return self.qsize() == 0 diff --git a/LibreNMS/library.py b/LibreNMS/library.py index 6daffd40f9..e0916201db 100644 --- a/LibreNMS/library.py +++ b/LibreNMS/library.py @@ -15,32 +15,33 @@ try: except ImportError: try: import pymysql + pymysql.install_as_MySQLdb() import MySQLdb except ImportError as exc: - print('ERROR: missing the mysql python module please run:') - print('pip install -r requirements.txt') - print('ERROR: %s' % exc) + print("ERROR: missing the mysql python module please run:") + print("pip install -r requirements.txt") + print("ERROR: %s" % exc) sys.exit(2) logger = logging.getLogger(__name__) # Logging functions ######################################################## -FORMATTER = logging.Formatter('%(asctime)s :: %(levelname)s :: %(message)s') +FORMATTER = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s") def logger_get_console_handler(): try: console_handler = logging.StreamHandler(sys.stdout) except OSError as exc: - print('Cannot log to stdout, trying stderr. Message %s' % exc) + print("Cannot log to stdout, trying stderr. Message %s" % exc) try: console_handler = logging.StreamHandler(sys.stderr) console_handler.setFormatter(FORMATTER) return console_handler except OSError as exc: - print('Cannot log to stderr neither. Message %s' % exc) + print("Cannot log to stderr neither. Message %s" % exc) return False else: console_handler.setFormatter(FORMATTER) @@ -50,20 +51,33 @@ def logger_get_console_handler(): def logger_get_file_handler(log_file): err_output = None try: - file_handler = RotatingFileHandler(log_file, mode='a', encoding='utf-8', maxBytes=1024000, backupCount=3) + file_handler = RotatingFileHandler( + log_file, mode="a", encoding="utf-8", maxBytes=1024000, backupCount=3 + ) except OSError as exc: try: - print('Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s' % exc) + print( + "Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s" + % exc + ) err_output = str(exc) - temp_log_file = tempfile.gettempdir() + os.sep + __name__ + '.log' - print('Trying temporary log file in ' + temp_log_file) - file_handler = RotatingFileHandler(temp_log_file, mode='a', encoding='utf-8', maxBytes=1000000, - backupCount=1) + temp_log_file = tempfile.gettempdir() + os.sep + __name__ + ".log" + print("Trying temporary log file in " + temp_log_file) + file_handler = RotatingFileHandler( + temp_log_file, + mode="a", + encoding="utf-8", + maxBytes=1000000, + backupCount=1, + ) file_handler.setFormatter(FORMATTER) - err_output += '\nUsing [%s]' % temp_log_file + err_output += "\nUsing [%s]" % temp_log_file return file_handler, err_output except OSError as exc: - print('Cannot create temporary log file either. Will not log to file. Message: %s' % exc) + print( + "Cannot create temporary log file either. Will not log to file. Message: %s" + % exc + ) return False else: file_handler.setFormatter(FORMATTER) @@ -87,59 +101,74 @@ def logger_get_logger(log_file=None, temp_log_file=None, debug=False): _logger.propagate = False if err_output is not None: print(err_output) - _logger.warning('Failed to use log file [%s], %s.', log_file, err_output) + _logger.warning( + "Failed to use log file [%s], %s.", log_file, err_output + ) if temp_log_file is not None: if os.path.isfile(temp_log_file): try: os.remove(temp_log_file) except OSError: - logger.warning('Cannot remove temp log file [%s].' % temp_log_file) + logger.warning("Cannot remove temp log file [%s]." % temp_log_file) file_handler, err_output = logger_get_file_handler(temp_log_file) if file_handler: _logger.addHandler(file_handler) _logger.propagate = False if err_output is not None: print(err_output) - _logger.warning('Failed to use log file [%s], %s.', log_file, err_output) + _logger.warning( + "Failed to use log file [%s], %s.", log_file, err_output + ) return _logger # Generic functions ######################################################## + def check_for_file(file): try: with open(file) as f: pass except IOError as exc: - logger.error('Oh dear... %s does not seem readable' % file) - logger.debug('ERROR:', exc_info=True) + logger.error("Oh dear... %s does not seem readable" % file) + logger.debug("ERROR:", exc_info=True) sys.exit(2) # Config functions ######################################################### + def get_config_data(install_dir): - config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir] + config_cmd = ["/usr/bin/env", "php", "%s/config_to_json.php" % install_dir] try: - proc = subprocess.Popen(config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE) + proc = subprocess.Popen( + config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE + ) return proc.communicate()[0].decode() except Exception as e: print("ERROR: Could not execute: %s" % config_cmd) print(e) sys.exit(2) + # Database functions ####################################################### def db_open(db_socket, db_server, db_port, db_username, db_password, db_dbname): try: - options = dict(host=db_server, port=int(db_port), user=db_username, passwd=db_password, db=db_dbname) + options = dict( + host=db_server, + port=int(db_port), + user=db_username, + passwd=db_password, + db=db_dbname, + ) if db_socket: - options['unix_socket'] = db_socket + options["unix_socket"] = db_socket return MySQLdb.connect(**options) except Exception as dbexc: - print('ERROR: Could not connect to MySQL database!') - print('ERROR: %s' % dbexc) + print("ERROR: Could not connect to MySQL database!") + print("ERROR: %s" % dbexc) sys.exit(2) diff --git a/LibreNMS/queuemanager.py b/LibreNMS/queuemanager.py index 32a5c12c10..d5baf97ce5 100644 --- a/LibreNMS/queuemanager.py +++ b/LibreNMS/queuemanager.py @@ -10,7 +10,9 @@ import LibreNMS class QueueManager: - def __init__(self, config, lock_manager, type_desc, uses_groups=False, auto_start=True): + def __init__( + self, config, lock_manager, type_desc, uses_groups=False, auto_start=True + ): """ This class manages a queue of jobs and can be used to submit jobs to the queue with post_work() and process jobs in that queue in worker threads using the work_function @@ -38,8 +40,13 @@ class QueueManager: self._stop_event = threading.Event() info("Groups: {}".format(self.config.group)) - info("{} QueueManager created: {} workers, {}s frequency" - .format(self.type.title(), self.get_poller_config().workers, self.get_poller_config().frequency)) + info( + "{} QueueManager created: {} workers, {}s frequency".format( + self.type.title(), + self.get_poller_config().workers, + self.get_poller_config().frequency, + ) + ) if auto_start: self.start() @@ -47,31 +54,51 @@ class QueueManager: def _service_worker(self, queue_id): debug("Worker started {}".format(threading.current_thread().getName())) while not self._stop_event.is_set(): - debug("Worker {} checking queue {} ({}) for work".format(threading.current_thread().getName(), queue_id, - self.get_queue(queue_id).qsize())) + debug( + "Worker {} checking queue {} ({}) for work".format( + threading.current_thread().getName(), + queue_id, + self.get_queue(queue_id).qsize(), + ) + ) try: # cannot break blocking request with redis-py, so timeout :( device_id = self.get_queue(queue_id).get(True, 10) - if device_id is not None: # None returned by redis after timeout when empty + if ( + device_id is not None + ): # None returned by redis after timeout when empty debug( - "Worker {} ({}) got work {} ".format(threading.current_thread().getName(), queue_id, device_id)) + "Worker {} ({}) got work {} ".format( + threading.current_thread().getName(), queue_id, device_id + ) + ) with LibreNMS.TimeitContext.start() as t: debug("Queues: {}".format(self._queues)) - target_desc = "{} ({})".format(device_id if device_id else '', - queue_id) if queue_id else device_id + target_desc = ( + "{} ({})".format(device_id if device_id else "", queue_id) + if queue_id + else device_id + ) self.do_work(device_id, queue_id) runtime = t.delta() - info("Completed {} run for {} in {:.2f}s".format(self.type, target_desc, runtime)) + info( + "Completed {} run for {} in {:.2f}s".format( + self.type, target_desc, runtime + ) + ) self.performance.add(runtime) except Empty: pass # ignore empty queue exception from subprocess.Queue except CalledProcessError as e: - error('{} poller script error! {} returned {}: {}' - .format(self.type.title(), e.cmd, e.returncode, e.output)) + error( + "{} poller script error! {} returned {}: {}".format( + self.type.title(), e.cmd, e.returncode, e.output + ) + ) except Exception as e: - error('{} poller exception! {}'.format(self.type.title(), e)) + error("{} poller exception! {}".format(self.type.title(), e)) traceback.print_exc() def post_work(self, payload, queue_id): @@ -81,15 +108,22 @@ class QueueManager: :param queue_id: which queue to post to, 0 is the default """ self.get_queue(queue_id).put(payload) - debug("Posted work for {} to {}:{} queue size: {}" - .format(payload, self.type, queue_id, self.get_queue(queue_id).qsize())) + debug( + "Posted work for {} to {}:{} queue size: {}".format( + payload, self.type, queue_id, self.get_queue(queue_id).qsize() + ) + ) def start(self): """ Start worker threads """ workers = self.get_poller_config().workers - groups = self.config.group if hasattr(self.config.group, "__iter__") else [self.config.group] + groups = ( + self.config.group + if hasattr(self.config.group, "__iter__") + else [self.config.group] + ) if self.uses_groups: for group in groups: group_workers = max(int(workers / len(groups)), 1) @@ -97,7 +131,11 @@ class QueueManager: thread_name = "{}_{}-{}".format(self.type.title(), group, i + 1) self.spawn_worker(thread_name, group) - debug("Started {} {} threads for group {}".format(group_workers, self.type, group)) + debug( + "Started {} {} threads for group {}".format( + group_workers, self.type, group + ) + ) else: self.spawn_worker(self.type.title(), 0) @@ -105,8 +143,9 @@ class QueueManager: pass def spawn_worker(self, thread_name, group): - pt = threading.Thread(target=self._service_worker, name=thread_name, - args=(group,)) + pt = threading.Thread( + target=self._service_worker, name=thread_name, args=(group,) + ) pt.daemon = True self._threads.append(pt) pt.start() @@ -159,21 +198,25 @@ class QueueManager: """ info("Creating queue {}".format(self.queue_name(queue_type, group))) try: - return LibreNMS.RedisUniqueQueue(self.queue_name(queue_type, group), - namespace='librenms.queue', - host=self.config.redis_host, - port=self.config.redis_port, - db=self.config.redis_db, - password=self.config.redis_pass, - unix_socket_path=self.config.redis_socket, - sentinel=self.config.redis_sentinel, - sentinel_service=self.config.redis_sentinel_service, - socket_timeout=self.config.redis_timeout) + return LibreNMS.RedisUniqueQueue( + self.queue_name(queue_type, group), + namespace="librenms.queue", + host=self.config.redis_host, + port=self.config.redis_port, + db=self.config.redis_db, + password=self.config.redis_pass, + unix_socket_path=self.config.redis_socket, + sentinel=self.config.redis_sentinel, + sentinel_service=self.config.redis_sentinel_service, + socket_timeout=self.config.redis_timeout, + ) except ImportError: if self.config.distributed: critical("ERROR: Redis connection required for distributed polling") - critical("Please install redis-py, either through your os software repository or from PyPI") + critical( + "Please install redis-py, either through your os software repository or from PyPI" + ) exit(2) except Exception as e: if self.config.distributed: @@ -188,30 +231,41 @@ class QueueManager: if queue_type and type(group) == int: return "{}:{}".format(queue_type, group) else: - raise ValueError("Refusing to create improperly scoped queue - parameters were invalid or not set") + raise ValueError( + "Refusing to create improperly scoped queue - parameters were invalid or not set" + ) def record_runtime(self, duration): self.performance.add(duration) # ------ Locking Helpers ------ - def lock(self, context, context_name='device', allow_relock=False, timeout=0): - return self._lm.lock(self._gen_lock_name(context, context_name), self._gen_lock_owner(), timeout, allow_relock) + def lock(self, context, context_name="device", allow_relock=False, timeout=0): + return self._lm.lock( + self._gen_lock_name(context, context_name), + self._gen_lock_owner(), + timeout, + allow_relock, + ) - def unlock(self, context, context_name='device'): - return self._lm.unlock(self._gen_lock_name(context, context_name), self._gen_lock_owner()) + def unlock(self, context, context_name="device"): + return self._lm.unlock( + self._gen_lock_name(context, context_name), self._gen_lock_owner() + ) - def is_locked(self, context, context_name='device'): + def is_locked(self, context, context_name="device"): return self._lm.check_lock(self._gen_lock_name(context, context_name)) def _gen_lock_name(self, context, context_name): - return '{}.{}.{}'.format(self.type, context_name, context) + return "{}.{}.{}".format(self.type, context_name, context) def _gen_lock_owner(self): return "{}-{}".format(self.config.unique_name, threading.current_thread().name) class TimedQueueManager(QueueManager): - def __init__(self, config, lock_manager, type_desc, uses_groups=False, auto_start=True): + def __init__( + self, config, lock_manager, type_desc, uses_groups=False, auto_start=True + ): """ A queue manager that periodically dispatches work to the queue The times are normalized like they started at 0:00 @@ -220,8 +274,12 @@ class TimedQueueManager(QueueManager): :param uses_groups: If this queue respects assigned groups or there is only one group :param auto_start: automatically start worker threads """ - QueueManager.__init__(self, config, lock_manager, type_desc, uses_groups, auto_start) - self.timer = LibreNMS.RecurringTimer(self.get_poller_config().frequency, self.do_dispatch) + QueueManager.__init__( + self, config, lock_manager, type_desc, uses_groups, auto_start + ) + self.timer = LibreNMS.RecurringTimer( + self.get_poller_config().frequency, self.do_dispatch + ) def start_dispatch(self): """ @@ -254,9 +312,12 @@ class BillingQueueManager(TimedQueueManager): :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager """ - TimedQueueManager.__init__(self, config, lock_manager, 'billing') - self.calculate_timer = LibreNMS.RecurringTimer(self.get_poller_config().calculate, - self.dispatch_calculate_billing, 'calculate_billing_timer') + TimedQueueManager.__init__(self, config, lock_manager, "billing") + self.calculate_timer = LibreNMS.RecurringTimer( + self.get_poller_config().calculate, + self.dispatch_calculate_billing, + "calculate_billing_timer", + ) def start_dispatch(self): """ @@ -273,18 +334,18 @@ class BillingQueueManager(TimedQueueManager): TimedQueueManager.stop_dispatch(self) def dispatch_calculate_billing(self): - self.post_work('calculate', 0) + self.post_work("calculate", 0) def do_dispatch(self): - self.post_work('poll', 0) + self.post_work("poll", 0) def do_work(self, run_type, group): - if run_type == 'poll': + if run_type == "poll": info("Polling billing") - LibreNMS.call_script('poll-billing.php') + LibreNMS.call_script("poll-billing.php") else: # run_type == 'calculate' info("Calculating billing") - LibreNMS.call_script('billing-calculate.php') + LibreNMS.call_script("billing-calculate.php") class PingQueueManager(TimedQueueManager): @@ -295,24 +356,24 @@ class PingQueueManager(TimedQueueManager): :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager """ - TimedQueueManager.__init__(self, config, lock_manager, 'ping', True) + TimedQueueManager.__init__(self, config, lock_manager, "ping", True) self._db = LibreNMS.DB(self.config) def do_dispatch(self): try: groups = self._db.query("SELECT DISTINCT (`poller_group`) FROM `devices`") for group in groups: - self.post_work('', group[0]) + self.post_work("", group[0]) except pymysql.err.Error as e: critical("DB Exception ({})".format(e)) def do_work(self, context, group): - if self.lock(group, 'group', timeout=self.config.ping.frequency): + if self.lock(group, "group", timeout=self.config.ping.frequency): try: info("Running fast ping") - LibreNMS.call_script('ping.php', ('-g', group)) + LibreNMS.call_script("ping.php", ("-g", group)) finally: - self.unlock(group, 'group') + self.unlock(group, "group") class ServicesQueueManager(TimedQueueManager): @@ -323,13 +384,15 @@ class ServicesQueueManager(TimedQueueManager): :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager """ - TimedQueueManager.__init__(self, config, lock_manager, 'services', True) + TimedQueueManager.__init__(self, config, lock_manager, "services", True) self._db = LibreNMS.DB(self.config) def do_dispatch(self): try: - devices = self._db.query("SELECT DISTINCT(`device_id`), `poller_group` FROM `services`" - " LEFT JOIN `devices` USING (`device_id`) WHERE `disabled`=0") + devices = self._db.query( + "SELECT DISTINCT(`device_id`), `poller_group` FROM `services`" + " LEFT JOIN `devices` USING (`device_id`) WHERE `disabled`=0" + ) for device in devices: self.post_work(device[0], device[1]) except pymysql.err.Error as e: @@ -339,12 +402,17 @@ class ServicesQueueManager(TimedQueueManager): if self.lock(device_id, timeout=self.config.services.frequency): try: info("Checking services on device {}".format(device_id)) - LibreNMS.call_script('check-services.php', ('-h', device_id)) + LibreNMS.call_script("check-services.php", ("-h", device_id)) except subprocess.CalledProcessError as e: if e.returncode == 5: - info("Device {} is down, cannot poll service, waiting {}s for retry" - .format(device_id, self.config.down_retry)) - self.lock(device_id, allow_relock=True, timeout=self.config.down_retry) + info( + "Device {} is down, cannot poll service, waiting {}s for retry".format( + device_id, self.config.down_retry + ) + ) + self.lock( + device_id, allow_relock=True, timeout=self.config.down_retry + ) else: self.unlock(device_id) @@ -357,16 +425,16 @@ class AlertQueueManager(TimedQueueManager): :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager """ - TimedQueueManager.__init__(self, config, lock_manager, 'alerting') + TimedQueueManager.__init__(self, config, lock_manager, "alerting") self._db = LibreNMS.DB(self.config) def do_dispatch(self): - self.post_work('alerts', 0) + self.post_work("alerts", 0) def do_work(self, device_id, group): try: info("Checking alerts") - LibreNMS.call_script('alerts.php') + LibreNMS.call_script("alerts.php") except subprocess.CalledProcessError as e: if e.returncode == 1: warning("There was an error issuing alerts: {}".format(e.output)) @@ -382,27 +450,32 @@ class PollerQueueManager(QueueManager): :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager """ - QueueManager.__init__(self, config, lock_manager, 'poller', True) + QueueManager.__init__(self, config, lock_manager, "poller", True) def do_work(self, device_id, group): if self.lock(device_id, timeout=self.config.poller.frequency): - info('Polling device {}'.format(device_id)) + info("Polling device {}".format(device_id)) try: - LibreNMS.call_script('poller.php', ('-h', device_id)) + LibreNMS.call_script("poller.php", ("-h", device_id)) except subprocess.CalledProcessError as e: if e.returncode == 6: - warning('Polling device {} unreachable, waiting {}s for retry'.format(device_id, - self.config.down_retry)) + warning( + "Polling device {} unreachable, waiting {}s for retry".format( + device_id, self.config.down_retry + ) + ) # re-lock to set retry timer - self.lock(device_id, allow_relock=True, timeout=self.config.down_retry) + self.lock( + device_id, allow_relock=True, timeout=self.config.down_retry + ) else: - error('Polling device {} failed! {}'.format(device_id, e)) + error("Polling device {} failed! {}".format(device_id, e)) self.unlock(device_id) else: self.unlock(device_id) else: - debug('Tried to poll {}, but it is locked'.format(device_id)) + debug("Tried to poll {}, but it is locked".format(device_id)) class DiscoveryQueueManager(TimedQueueManager): @@ -413,27 +486,36 @@ class DiscoveryQueueManager(TimedQueueManager): :param config: LibreNMS.ServiceConfig reference to the service config object :param lock_manager: the single instance of lock manager """ - TimedQueueManager.__init__(self, config, lock_manager, 'discovery', True) + TimedQueueManager.__init__(self, config, lock_manager, "discovery", True) self._db = LibreNMS.DB(self.config) def do_dispatch(self): try: - devices = self._db.query("SELECT `device_id`, `poller_group` FROM `devices` WHERE `disabled`=0") + devices = self._db.query( + "SELECT `device_id`, `poller_group` FROM `devices` WHERE `disabled`=0" + ) for device in devices: self.post_work(device[0], device[1]) except pymysql.err.Error as e: critical("DB Exception ({})".format(e)) def do_work(self, device_id, group): - if self.lock(device_id, timeout=LibreNMS.normalize_wait(self.config.discovery.frequency)): + if self.lock( + device_id, timeout=LibreNMS.normalize_wait(self.config.discovery.frequency) + ): try: info("Discovering device {}".format(device_id)) - LibreNMS.call_script('discovery.php', ('-h', device_id)) + LibreNMS.call_script("discovery.php", ("-h", device_id)) except subprocess.CalledProcessError as e: if e.returncode == 5: - info("Device {} is down, cannot discover, waiting {}s for retry" - .format(device_id, self.config.down_retry)) - self.lock(device_id, allow_relock=True, timeout=self.config.down_retry) + info( + "Device {} is down, cannot discover, waiting {}s for retry".format( + device_id, self.config.down_retry + ) + ) + self.lock( + device_id, allow_relock=True, timeout=self.config.down_retry + ) else: self.unlock(device_id) else: diff --git a/LibreNMS/service.py b/LibreNMS/service.py index cd6bb10922..215b4cff5a 100644 --- a/LibreNMS/service.py +++ b/LibreNMS/service.py @@ -29,7 +29,6 @@ except ImportError: pass - class ServiceConfig: def __init__(self): """ @@ -52,7 +51,9 @@ class ServiceConfig: self.calculate = calculate # config variables with defaults - BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir)) + BASE_DIR = os.path.abspath( + os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir) + ) node_id = None name = None @@ -78,7 +79,7 @@ class ServiceConfig: master_resolution = 1 master_timeout = 10 - redis_host = 'localhost' + redis_host = "localhost" redis_port = 6379 redis_db = 0 redis_pass = None @@ -87,74 +88,142 @@ class ServiceConfig: redis_sentinel_service = None redis_timeout = 60 - db_host = 'localhost' + db_host = "localhost" db_port = 0 db_socket = None - db_user = 'librenms' - db_pass = '' - db_name = 'librenms' + db_user = "librenms" + db_pass = "" + db_name = "librenms" watchdog_enabled = False - watchdog_logfile = 'logs/librenms.log' + watchdog_logfile = "logs/librenms.log" def populate(self): config = self._get_config_data() # populate config variables - self.node_id = os.getenv('NODE_ID') - self.set_name(config.get('distributed_poller_name', None)) - self.distributed = config.get('distributed_poller', ServiceConfig.distributed) - self.group = ServiceConfig.parse_group(config.get('distributed_poller_group', ServiceConfig.group)) + self.node_id = os.getenv("NODE_ID") + self.set_name(config.get("distributed_poller_name", None)) + self.distributed = config.get("distributed_poller", ServiceConfig.distributed) + self.group = ServiceConfig.parse_group( + config.get("distributed_poller_group", ServiceConfig.group) + ) # backward compatible options - self.poller.workers = config.get('poller_service_workers', ServiceConfig.poller.workers) - self.poller.frequency = config.get('poller_service_poll_frequency', ServiceConfig.poller.frequency) - self.discovery.frequency = config.get('poller_service_discover_frequency', ServiceConfig.discovery.frequency) - self.down_retry = config.get('poller_service_down_retry', ServiceConfig.down_retry) - self.log_level = config.get('poller_service_loglevel', ServiceConfig.log_level) + self.poller.workers = config.get( + "poller_service_workers", ServiceConfig.poller.workers + ) + self.poller.frequency = config.get( + "poller_service_poll_frequency", ServiceConfig.poller.frequency + ) + self.discovery.frequency = config.get( + "poller_service_discover_frequency", ServiceConfig.discovery.frequency + ) + self.down_retry = config.get( + "poller_service_down_retry", ServiceConfig.down_retry + ) + self.log_level = config.get("poller_service_loglevel", ServiceConfig.log_level) # new options - self.poller.enabled = config.get('service_poller_enabled', True) # unused - self.poller.workers = config.get('service_poller_workers', ServiceConfig.poller.workers) - self.poller.frequency = config.get('service_poller_frequency', ServiceConfig.poller.frequency) - self.discovery.enabled = config.get('service_discovery_enabled', True) # unused - self.discovery.workers = config.get('service_discovery_workers', ServiceConfig.discovery.workers) - self.discovery.frequency = config.get('service_discovery_frequency', ServiceConfig.discovery.frequency) - self.services.enabled = config.get('service_services_enabled', True) - self.services.workers = config.get('service_services_workers', ServiceConfig.services.workers) - self.services.frequency = config.get('service_services_frequency', ServiceConfig.services.frequency) - self.billing.enabled = config.get('service_billing_enabled', True) - self.billing.frequency = config.get('service_billing_frequency', ServiceConfig.billing.frequency) - self.billing.calculate = config.get('service_billing_calculate_frequency', ServiceConfig.billing.calculate) - self.alerting.enabled = config.get('service_alerting_enabled', True) - self.alerting.frequency = config.get('service_alerting_frequency', ServiceConfig.alerting.frequency) - self.ping.enabled = config.get('service_ping_enabled', False) - self.ping.frequency = config.get('ping_rrd_step', ServiceConfig.ping.frequency) - self.down_retry = config.get('service_poller_down_retry', ServiceConfig.down_retry) - self.log_level = config.get('service_loglevel', ServiceConfig.log_level) - self.update_enabled = config.get('service_update_enabled', ServiceConfig.update_enabled) - self.update_frequency = config.get('service_update_frequency', ServiceConfig.update_frequency) + self.poller.enabled = config.get("service_poller_enabled", True) # unused + self.poller.workers = config.get( + "service_poller_workers", ServiceConfig.poller.workers + ) + self.poller.frequency = config.get( + "service_poller_frequency", ServiceConfig.poller.frequency + ) + self.discovery.enabled = config.get("service_discovery_enabled", True) # unused + self.discovery.workers = config.get( + "service_discovery_workers", ServiceConfig.discovery.workers + ) + self.discovery.frequency = config.get( + "service_discovery_frequency", ServiceConfig.discovery.frequency + ) + self.services.enabled = config.get("service_services_enabled", True) + self.services.workers = config.get( + "service_services_workers", ServiceConfig.services.workers + ) + self.services.frequency = config.get( + "service_services_frequency", ServiceConfig.services.frequency + ) + self.billing.enabled = config.get("service_billing_enabled", True) + self.billing.frequency = config.get( + "service_billing_frequency", ServiceConfig.billing.frequency + ) + self.billing.calculate = config.get( + "service_billing_calculate_frequency", ServiceConfig.billing.calculate + ) + self.alerting.enabled = config.get("service_alerting_enabled", True) + self.alerting.frequency = config.get( + "service_alerting_frequency", ServiceConfig.alerting.frequency + ) + self.ping.enabled = config.get("service_ping_enabled", False) + self.ping.frequency = config.get("ping_rrd_step", ServiceConfig.ping.frequency) + self.down_retry = config.get( + "service_poller_down_retry", ServiceConfig.down_retry + ) + self.log_level = config.get("service_loglevel", ServiceConfig.log_level) + self.update_enabled = config.get( + "service_update_enabled", ServiceConfig.update_enabled + ) + self.update_frequency = config.get( + "service_update_frequency", ServiceConfig.update_frequency + ) - self.redis_host = os.getenv('REDIS_HOST', config.get('redis_host', ServiceConfig.redis_host)) - self.redis_db = os.getenv('REDIS_DB', config.get('redis_db', ServiceConfig.redis_db)) - self.redis_pass = os.getenv('REDIS_PASSWORD', config.get('redis_pass', ServiceConfig.redis_pass)) - self.redis_port = int(os.getenv('REDIS_PORT', config.get('redis_port', ServiceConfig.redis_port))) - self.redis_socket = os.getenv('REDIS_SOCKET', config.get('redis_socket', ServiceConfig.redis_socket)) - self.redis_sentinel = os.getenv('REDIS_SENTINEL', config.get('redis_sentinel', ServiceConfig.redis_sentinel)) - self.redis_sentinel_service = os.getenv('REDIS_SENTINEL_SERVICE', - config.get('redis_sentinel_service', - ServiceConfig.redis_sentinel_service)) - self.redis_timeout = int(os.getenv('REDIS_TIMEOUT', self.alerting.frequency if self.alerting.frequency != 0 else self.redis_timeout)) + self.redis_host = os.getenv( + "REDIS_HOST", config.get("redis_host", ServiceConfig.redis_host) + ) + self.redis_db = os.getenv( + "REDIS_DB", config.get("redis_db", ServiceConfig.redis_db) + ) + self.redis_pass = os.getenv( + "REDIS_PASSWORD", config.get("redis_pass", ServiceConfig.redis_pass) + ) + self.redis_port = int( + os.getenv("REDIS_PORT", config.get("redis_port", ServiceConfig.redis_port)) + ) + self.redis_socket = os.getenv( + "REDIS_SOCKET", config.get("redis_socket", ServiceConfig.redis_socket) + ) + self.redis_sentinel = os.getenv( + "REDIS_SENTINEL", config.get("redis_sentinel", ServiceConfig.redis_sentinel) + ) + self.redis_sentinel_service = os.getenv( + "REDIS_SENTINEL_SERVICE", + config.get("redis_sentinel_service", ServiceConfig.redis_sentinel_service), + ) + self.redis_timeout = int( + os.getenv( + "REDIS_TIMEOUT", + self.alerting.frequency + if self.alerting.frequency != 0 + else self.redis_timeout, + ) + ) - self.db_host = os.getenv('DB_HOST', config.get('db_host', ServiceConfig.db_host)) - self.db_name = os.getenv('DB_DATABASE', config.get('db_name', ServiceConfig.db_name)) - self.db_pass = os.getenv('DB_PASSWORD', config.get('db_pass', ServiceConfig.db_pass)) - self.db_port = int(os.getenv('DB_PORT', config.get('db_port', ServiceConfig.db_port))) - self.db_socket = os.getenv('DB_SOCKET', config.get('db_socket', ServiceConfig.db_socket)) - self.db_user = os.getenv('DB_USERNAME', config.get('db_user', ServiceConfig.db_user)) + self.db_host = os.getenv( + "DB_HOST", config.get("db_host", ServiceConfig.db_host) + ) + self.db_name = os.getenv( + "DB_DATABASE", config.get("db_name", ServiceConfig.db_name) + ) + self.db_pass = os.getenv( + "DB_PASSWORD", config.get("db_pass", ServiceConfig.db_pass) + ) + self.db_port = int( + os.getenv("DB_PORT", config.get("db_port", ServiceConfig.db_port)) + ) + self.db_socket = os.getenv( + "DB_SOCKET", config.get("db_socket", ServiceConfig.db_socket) + ) + self.db_user = os.getenv( + "DB_USERNAME", config.get("db_user", ServiceConfig.db_user) + ) - self.watchdog_enabled = config.get('service_watchdog_enabled', ServiceConfig.watchdog_enabled) - self.watchdog_logfile = config.get('log_file', ServiceConfig.watchdog_logfile) + self.watchdog_enabled = config.get( + "service_watchdog_enabled", ServiceConfig.watchdog_enabled + ) + self.watchdog_logfile = config.get("log_file", ServiceConfig.watchdog_logfile) # set convenient debug variable self.debug = logging.getLogger().isEnabledFor(logging.DEBUG) @@ -163,13 +232,19 @@ class ServiceConfig: try: logging.getLogger().setLevel(self.log_level) except ValueError: - error("Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format(self.log_level)) + error( + "Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format( + self.log_level + ) + ) logging.getLogger().setLevel(logging.INFO) def load_poller_config(self, db): try: settings = {} - cursor = db.query('SELECT * FROM `poller_cluster` WHERE `node_id`=%s', self.node_id) + cursor = db.query( + "SELECT * FROM `poller_cluster` WHERE `node_id`=%s", self.node_id + ) if cursor.rowcount == 0: return @@ -177,77 +252,88 @@ class ServiceConfig: name = cursor.description[index][0] settings[name] = setting - if settings['poller_name'] is not None: - self.set_name(settings['poller_name']) - if settings['poller_groups'] is not None: - self.group = ServiceConfig.parse_group(settings['poller_groups']) - if settings['poller_enabled'] is not None: - self.poller.enabled = settings['poller_enabled'] - if settings['poller_frequency'] is not None: - self.poller.frequency = settings['poller_frequency'] - if settings['poller_workers'] is not None: - self.poller.workers = settings['poller_workers'] - if settings['poller_down_retry'] is not None: - self.down_retry = settings['poller_down_retry'] - if settings['discovery_enabled'] is not None: - self.discovery.enabled = settings['discovery_enabled'] - if settings['discovery_frequency'] is not None: - self.discovery.frequency = settings['discovery_frequency'] - if settings['discovery_workers'] is not None: - self.discovery.workers = settings['discovery_workers'] - if settings['services_enabled'] is not None: - self.services.enabled = settings['services_enabled'] - if settings['services_frequency'] is not None: - self.services.frequency = settings['services_frequency'] - if settings['services_workers'] is not None: - self.services.workers = settings['services_workers'] - if settings['billing_enabled'] is not None: - self.billing.enabled = settings['billing_enabled'] - if settings['billing_frequency'] is not None: - self.billing.frequency = settings['billing_frequency'] - if settings['billing_calculate_frequency'] is not None: - self.billing.calculate = settings['billing_calculate_frequency'] - if settings['alerting_enabled'] is not None: - self.alerting.enabled = settings['alerting_enabled'] - if settings['alerting_frequency'] is not None: - self.alerting.frequency = settings['alerting_frequency'] - if settings['ping_enabled'] is not None: - self.ping.enabled = settings['ping_enabled'] - if settings['ping_frequency'] is not None: - self.ping.frequency = settings['ping_frequency'] - if settings['update_enabled'] is not None: - self.update_enabled = settings['update_enabled'] - if settings['update_frequency'] is not None: - self.update_frequency = settings['update_frequency'] - if settings['loglevel'] is not None: - self.log_level = settings['loglevel'] - if settings['watchdog_enabled'] is not None: - self.watchdog_enabled = settings['watchdog_enabled'] - if settings['watchdog_log'] is not None: - self.watchdog_logfile = settings['watchdog_log'] + if settings["poller_name"] is not None: + self.set_name(settings["poller_name"]) + if settings["poller_groups"] is not None: + self.group = ServiceConfig.parse_group(settings["poller_groups"]) + if settings["poller_enabled"] is not None: + self.poller.enabled = settings["poller_enabled"] + if settings["poller_frequency"] is not None: + self.poller.frequency = settings["poller_frequency"] + if settings["poller_workers"] is not None: + self.poller.workers = settings["poller_workers"] + if settings["poller_down_retry"] is not None: + self.down_retry = settings["poller_down_retry"] + if settings["discovery_enabled"] is not None: + self.discovery.enabled = settings["discovery_enabled"] + if settings["discovery_frequency"] is not None: + self.discovery.frequency = settings["discovery_frequency"] + if settings["discovery_workers"] is not None: + self.discovery.workers = settings["discovery_workers"] + if settings["services_enabled"] is not None: + self.services.enabled = settings["services_enabled"] + if settings["services_frequency"] is not None: + self.services.frequency = settings["services_frequency"] + if settings["services_workers"] is not None: + self.services.workers = settings["services_workers"] + if settings["billing_enabled"] is not None: + self.billing.enabled = settings["billing_enabled"] + if settings["billing_frequency"] is not None: + self.billing.frequency = settings["billing_frequency"] + if settings["billing_calculate_frequency"] is not None: + self.billing.calculate = settings["billing_calculate_frequency"] + if settings["alerting_enabled"] is not None: + self.alerting.enabled = settings["alerting_enabled"] + if settings["alerting_frequency"] is not None: + self.alerting.frequency = settings["alerting_frequency"] + if settings["ping_enabled"] is not None: + self.ping.enabled = settings["ping_enabled"] + if settings["ping_frequency"] is not None: + self.ping.frequency = settings["ping_frequency"] + if settings["update_enabled"] is not None: + self.update_enabled = settings["update_enabled"] + if settings["update_frequency"] is not None: + self.update_frequency = settings["update_frequency"] + if settings["loglevel"] is not None: + self.log_level = settings["loglevel"] + if settings["watchdog_enabled"] is not None: + self.watchdog_enabled = settings["watchdog_enabled"] + if settings["watchdog_log"] is not None: + self.watchdog_logfile = settings["watchdog_log"] except pymysql.err.Error: - warning('Unable to load poller (%s) config', self.node_id) + warning("Unable to load poller (%s) config", self.node_id) def _get_config_data(self): try: import dotenv + env_path = "{}/.env".format(self.BASE_DIR) info("Attempting to load .env from '%s'", env_path) dotenv.load_dotenv(dotenv_path=env_path, verbose=True) - if not os.getenv('NODE_ID'): + if not os.getenv("NODE_ID"): raise ImportError(".env does not contain a valid NODE_ID setting.") except ImportError as e: - exception("Could not import .env - check that the poller user can read the file, and that composer install has been run recently") + exception( + "Could not import .env - check that the poller user can read the file, and that composer install has been run recently" + ) sys.exit(3) - config_cmd = ['/usr/bin/env', 'php', '{}/config_to_json.php'.format(self.BASE_DIR), '2>&1'] + config_cmd = [ + "/usr/bin/env", + "php", + "{}/config_to_json.php".format(self.BASE_DIR), + "2>&1", + ] try: return json.loads(subprocess.check_output(config_cmd).decode()) except subprocess.CalledProcessError as e: - error("ERROR: Could not load or parse configuration! {}: {}" - .format(subprocess.list2cmdline(e.cmd), e.output.decode())) + error( + "ERROR: Could not load or parse configuration! {}: {}".format( + subprocess.list2cmdline(e.cmd), e.output.decode() + ) + ) @staticmethod def parse_group(g): @@ -257,7 +343,7 @@ class ServiceConfig: return [g] elif type(g) is str: try: - return [int(x) for x in set(g.split(','))] + return [int(x) for x in set(g.split(","))] except ValueError: pass @@ -289,14 +375,26 @@ class Service: self.attach_signals() self._lm = self.create_lock_manager() - self.daily_timer = LibreNMS.RecurringTimer(self.config.update_frequency, self.run_maintenance, 'maintenance') - self.stats_timer = LibreNMS.RecurringTimer(self.config.poller.frequency, self.log_performance_stats, 'performance') + self.daily_timer = LibreNMS.RecurringTimer( + self.config.update_frequency, self.run_maintenance, "maintenance" + ) + self.stats_timer = LibreNMS.RecurringTimer( + self.config.poller.frequency, self.log_performance_stats, "performance" + ) if self.config.watchdog_enabled: - info("Starting watchdog timer for log file: {}".format(self.config.watchdog_logfile)) - self.watchdog_timer = LibreNMS.RecurringTimer(self.config.poller.frequency, self.logfile_watchdog, 'watchdog') + info( + "Starting watchdog timer for log file: {}".format( + self.config.watchdog_logfile + ) + ) + self.watchdog_timer = LibreNMS.RecurringTimer( + self.config.poller.frequency, self.logfile_watchdog, "watchdog" + ) else: info("Watchdog is disabled.") - self.systemd_watchdog_timer = LibreNMS.RecurringTimer(10, self.systemd_watchdog, 'systemd-watchdog') + self.systemd_watchdog_timer = LibreNMS.RecurringTimer( + 10, self.systemd_watchdog, "systemd-watchdog" + ) self.is_master = False def service_age(self): @@ -309,7 +407,7 @@ class Service: signal(SIGINT, self.terminate) # capture sigint and exit gracefully signal(SIGHUP, self.reload) # capture sighup and restart gracefully - if 'psutil' not in sys.modules: + if "psutil" not in sys.modules: warning("psutil is not available, polling gap possible") else: signal(SIGCHLD, self.reap) # capture sigchld and reap the process @@ -321,13 +419,21 @@ class Service: # Speed things up by only looking at direct zombie children for p in psutil.Process().children(recursive=False): try: - cmd = p.cmdline() # cmdline is uncached, so needs to go here to avoid NoSuchProcess + cmd = ( + p.cmdline() + ) # cmdline is uncached, so needs to go here to avoid NoSuchProcess status = p.status() if status == psutil.STATUS_ZOMBIE: pid = p.pid r = os.waitpid(p.pid, os.WNOHANG) - warning('Reaped long running job "%s" in state %s with PID %d - job returned %d', cmd, status, r[0], r[1]) + warning( + 'Reaped long running job "%s" in state %s with PID %d - job returned %d', + cmd, + status, + r[0], + r[1], + ) except (OSError, psutil.NoSuchProcess): # process was already reaped continue @@ -346,17 +452,25 @@ class Service: # initialize and start the worker pools self.poller_manager = LibreNMS.PollerQueueManager(self.config, self._lm) - self.queue_managers['poller'] = self.poller_manager + self.queue_managers["poller"] = self.poller_manager self.discovery_manager = LibreNMS.DiscoveryQueueManager(self.config, self._lm) - self.queue_managers['discovery'] = self.discovery_manager + self.queue_managers["discovery"] = self.discovery_manager if self.config.alerting.enabled: - self.queue_managers['alerting'] = LibreNMS.AlertQueueManager(self.config, self._lm) + self.queue_managers["alerting"] = LibreNMS.AlertQueueManager( + self.config, self._lm + ) if self.config.services.enabled: - self.queue_managers['services'] = LibreNMS.ServicesQueueManager(self.config, self._lm) + self.queue_managers["services"] = LibreNMS.ServicesQueueManager( + self.config, self._lm + ) if self.config.billing.enabled: - self.queue_managers['billing'] = LibreNMS.BillingQueueManager(self.config, self._lm) + self.queue_managers["billing"] = LibreNMS.BillingQueueManager( + self.config, self._lm + ) if self.config.ping.enabled: - self.queue_managers['ping'] = LibreNMS.PingQueueManager(self.config, self._lm) + self.queue_managers["ping"] = LibreNMS.PingQueueManager( + self.config, self._lm + ) if self.config.update_enabled: self.daily_timer.start() self.stats_timer.start() @@ -365,11 +479,19 @@ class Service: self.watchdog_timer.start() info("LibreNMS Service: {} started!".format(self.config.unique_name)) - info("Poller group {}. Using Python {} and {} locks and queues" - .format('0 (default)' if self.config.group == [0] else self.config.group, python_version(), - 'redis' if isinstance(self._lm, LibreNMS.RedisLock) else 'internal')) + info( + "Poller group {}. Using Python {} and {} locks and queues".format( + "0 (default)" if self.config.group == [0] else self.config.group, + python_version(), + "redis" if isinstance(self._lm, LibreNMS.RedisLock) else "internal", + ) + ) if self.config.update_enabled: - info("Maintenance tasks will be run every {}".format(timedelta(seconds=self.config.update_frequency))) + info( + "Maintenance tasks will be run every {}".format( + timedelta(seconds=self.config.update_frequency) + ) + ) else: warning("Maintenance tasks are disabled.") @@ -403,7 +525,11 @@ class Service: self.dispatch_immediate_discovery(device_id, group) else: if self.is_master: - info("{} is no longer the master dispatcher".format(self.config.name)) + info( + "{} is no longer the master dispatcher".format( + self.config.name + ) + ) self.stop_dispatch_timers() self.is_master = False # no longer master sleep(self.config.master_resolution) @@ -414,10 +540,12 @@ class Service: self.shutdown() def _acquire_master(self): - return self._lm.lock('dispatch.master', self.config.unique_name, self.config.master_timeout, True) + return self._lm.lock( + "dispatch.master", self.config.unique_name, self.config.master_timeout, True + ) def _release_master(self): - self._lm.unlock('dispatch.master', self.config.unique_name) + self._lm.unlock("dispatch.master", self.config.unique_name) # ------------ Discovery ------------ def dispatch_immediate_discovery(self, device_id, group): @@ -434,16 +562,22 @@ class Service: elapsed = cur_time - self.last_poll.get(device_id, cur_time) self.last_poll[device_id] = cur_time # arbitrary limit to reduce spam - if elapsed > (self.config.poller.frequency - self.config.master_resolution): - debug("Dispatching polling for device {}, time since last poll {:.2f}s" - .format(device_id, elapsed)) + if elapsed > ( + self.config.poller.frequency - self.config.master_resolution + ): + debug( + "Dispatching polling for device {}, time since last poll {:.2f}s".format( + device_id, elapsed + ) + ) def fetch_immediate_device_list(self): try: poller_find_time = self.config.poller.frequency - 1 discovery_find_time = self.config.discovery.frequency - 1 - result = self._db.query('''SELECT `device_id`, + result = self._db.query( + """SELECT `device_id`, `poller_group`, COALESCE(`last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND), 1) AS `poll`, IF(snmp_disable=1 OR status=0, 0, IF (%s < `last_discovered_timetaken` * 1.25, 0, COALESCE(`last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND), 1))) AS `discover` @@ -454,15 +588,29 @@ class Service: `last_polled` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_polled_timetaken` SECOND) OR `last_discovered` <= DATE_ADD(DATE_ADD(NOW(), INTERVAL -%s SECOND), INTERVAL `last_discovered_timetaken` SECOND) ) - ORDER BY `last_polled_timetaken` DESC''', (poller_find_time, self.service_age(), discovery_find_time, poller_find_time, discovery_find_time)) + ORDER BY `last_polled_timetaken` DESC""", + ( + poller_find_time, + self.service_age(), + discovery_find_time, + poller_find_time, + discovery_find_time, + ), + ) self.db_failures = 0 return result except pymysql.err.Error: self.db_failures += 1 if self.db_failures > self.config.max_db_failures: - warning("Too many DB failures ({}), attempting to release master".format(self.db_failures)) + warning( + "Too many DB failures ({}), attempting to release master".format( + self.db_failures + ) + ) self._release_master() - sleep(self.config.master_resolution) # sleep to give another node a chance to acquire + sleep( + self.config.master_resolution + ) # sleep to give another node a chance to acquire return [] def run_maintenance(self): @@ -475,21 +623,24 @@ class Service: max_runtime = 86100 max_tries = int(max_runtime / wait) info("Waiting for schema lock") - while not self._lm.lock('schema-update', self.config.unique_name, max_runtime): + while not self._lm.lock("schema-update", self.config.unique_name, max_runtime): attempt += 1 if attempt >= max_tries: # don't get stuck indefinitely - warning('Reached max wait for other pollers to update, updating now') + warning("Reached max wait for other pollers to update, updating now") break sleep(wait) info("Running maintenance tasks") try: - output = LibreNMS.call_script('daily.sh') + output = LibreNMS.call_script("daily.sh") info("Maintenance tasks complete\n{}".format(output)) except subprocess.CalledProcessError as e: - error("Error in daily.sh:\n" + (e.output.decode() if e.output is not None else 'No output')) + error( + "Error in daily.sh:\n" + + (e.output.decode() if e.output is not None else "No output") + ) - self._lm.unlock('schema-update', self.config.unique_name) + self._lm.unlock("schema-update", self.config.unique_name) self.restart() @@ -501,19 +652,23 @@ class Service: :return: Instance of LockManager """ try: - return LibreNMS.RedisLock(namespace='librenms.lock', - host=self.config.redis_host, - port=self.config.redis_port, - db=self.config.redis_db, - password=self.config.redis_pass, - unix_socket_path=self.config.redis_socket, - sentinel=self.config.redis_sentinel, - sentinel_service=self.config.redis_sentinel_service, - socket_timeout=self.config.redis_timeout) + return LibreNMS.RedisLock( + namespace="librenms.lock", + host=self.config.redis_host, + port=self.config.redis_port, + db=self.config.redis_db, + password=self.config.redis_pass, + unix_socket_path=self.config.redis_socket, + sentinel=self.config.redis_sentinel, + sentinel_service=self.config.redis_sentinel_service, + socket_timeout=self.config.redis_timeout, + ) except ImportError: if self.config.distributed: critical("ERROR: Redis connection required for distributed polling") - critical("Please install redis-py, either through your os software repository or from PyPI") + critical( + "Please install redis-py, either through your os software repository or from PyPI" + ) self.exit(2) except Exception as e: if self.config.distributed: @@ -533,9 +688,9 @@ class Service: warning("Please restart manually") return - info('Restarting service... ') + info("Restarting service... ") - if 'psutil' not in sys.modules: + if "psutil" not in sys.modules: warning("psutil is not available, polling gap possible") self._stop_managers_and_wait() else: @@ -578,7 +733,7 @@ class Service: :param signalnum: UNIX signal number :param flag: Flags accompanying signal """ - info('Shutting down, waiting for running jobs to complete...') + info("Shutting down, waiting for running jobs to complete...") self.stop_dispatch_timers() self._release_master() @@ -592,7 +747,7 @@ class Service: self._stop_managers_and_wait() # try to release master lock - info('Shutdown of %s/%s complete', os.getpid(), threading.current_thread().name) + info("Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name) self.exit(0) def start_dispatch_timers(self): @@ -636,10 +791,13 @@ class Service: We do this be creating a file in the base directory (.lock.service) if it doesn't exist and obtaining an exclusive lock on that file. """ - lock_file = "{}/{}".format(self.config.BASE_DIR, '.lock.service') + lock_file = "{}/{}".format(self.config.BASE_DIR, ".lock.service") import fcntl - self._fp = open(lock_file, 'w') # keep a reference so the file handle isn't garbage collected + + self._fp = open( + lock_file, "w" + ) # keep a reference so the file handle isn't garbage collected self._fp.flush() try: fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB) @@ -652,46 +810,72 @@ class Service: try: # Report on the poller instance as a whole - self._db.query('INSERT INTO poller_cluster(node_id, poller_name, poller_version, poller_groups, last_report, master) ' - 'values("{0}", "{1}", "{2}", "{3}", NOW(), {4}) ' - 'ON DUPLICATE KEY UPDATE poller_version="{2}", poller_groups="{3}", last_report=NOW(), master={4}; ' - .format(self.config.node_id, self.config.name, "librenms-service", ','.join(str(g) for g in self.config.group), 1 if self.is_master else 0)) + self._db.query( + "INSERT INTO poller_cluster(node_id, poller_name, poller_version, poller_groups, last_report, master) " + 'values("{0}", "{1}", "{2}", "{3}", NOW(), {4}) ' + 'ON DUPLICATE KEY UPDATE poller_version="{2}", poller_groups="{3}", last_report=NOW(), master={4}; '.format( + self.config.node_id, + self.config.name, + "librenms-service", + ",".join(str(g) for g in self.config.group), + 1 if self.is_master else 0, + ) + ) # Find our ID - self._db.query('SELECT id INTO @parent_poller_id FROM poller_cluster WHERE node_id="{0}"; '.format(self.config.node_id)) + self._db.query( + 'SELECT id INTO @parent_poller_id FROM poller_cluster WHERE node_id="{0}"; '.format( + self.config.node_id + ) + ) for worker_type, manager in self.queue_managers.items(): worker_seconds, devices = manager.performance.reset() # Record the queue state - self._db.query('INSERT INTO poller_cluster_stats(parent_poller, poller_type, depth, devices, worker_seconds, workers, frequency) ' - 'values(@parent_poller_id, "{0}", {1}, {2}, {3}, {4}, {5}) ' - 'ON DUPLICATE KEY UPDATE depth={1}, devices={2}, worker_seconds={3}, workers={4}, frequency={5}; ' - .format(worker_type, - sum([manager.get_queue(group).qsize() for group in self.config.group]), - devices, - worker_seconds, - getattr(self.config, worker_type).workers, - getattr(self.config, worker_type).frequency) - ) + self._db.query( + "INSERT INTO poller_cluster_stats(parent_poller, poller_type, depth, devices, worker_seconds, workers, frequency) " + 'values(@parent_poller_id, "{0}", {1}, {2}, {3}, {4}, {5}) ' + "ON DUPLICATE KEY UPDATE depth={1}, devices={2}, worker_seconds={3}, workers={4}, frequency={5}; ".format( + worker_type, + sum( + [ + manager.get_queue(group).qsize() + for group in self.config.group + ] + ), + devices, + worker_seconds, + getattr(self.config, worker_type).workers, + getattr(self.config, worker_type).frequency, + ) + ) except pymysql.err.Error: - exception("Unable to log performance statistics - is the database still online?") + exception( + "Unable to log performance statistics - is the database still online?" + ) def systemd_watchdog(self): - if 'systemd.daemon' in sys.modules: + if "systemd.daemon" in sys.modules: notify("WATCHDOG=1") def logfile_watchdog(self): try: # check that lofgile has been written to within last poll period - logfile_mdiff = datetime.now().timestamp() - os.path.getmtime(self.config.watchdog_logfile) + logfile_mdiff = datetime.now().timestamp() - os.path.getmtime( + self.config.watchdog_logfile + ) except FileNotFoundError as e: error("Log file not found! {}".format(e)) return if logfile_mdiff > self.config.poller.frequency: - critical("BARK! Log file older than {}s, restarting service!".format(self.config.poller.frequency)) + critical( + "BARK! Log file older than {}s, restarting service!".format( + self.config.poller.frequency + ) + ) self.restart() else: info("Log file updated {}s ago".format(int(logfile_mdiff))) diff --git a/discovery-wrapper.py b/discovery-wrapper.py index 171f551ee6..1ae8c2d725 100755 --- a/discovery-wrapper.py +++ b/discovery-wrapper.py @@ -51,9 +51,9 @@ try: from optparse import OptionParser except ImportError as exc: - print('ERROR: missing one or more of the following python modules:') - print('threading, queue, sys, subprocess, time, os, json') - print('ERROR: %s' % exc) + print("ERROR: missing one or more of the following python modules:") + print("threading, queue, sys, subprocess, time, os, json") + print("ERROR: %s" % exc) sys.exit(2) APP_NAME = "discovery_wrapper" @@ -68,9 +68,9 @@ def memc_alive(): try: global memc key = str(uuid.uuid4()) - memc.set('discovery.ping.' + key, key, 60) - if memc.get('discovery.ping.' + key) == key: - memc.delete('discovery.ping.' + key) + memc.set("discovery.ping." + key, key, 60) + if memc.get("discovery.ping." + key) == key: + memc.delete("discovery.ping." + key) return True else: return False @@ -106,17 +106,19 @@ def printworker(): global distdisco if distdisco: if not IsNode: - memc_touch('discovery.master', 30) - nodes = memc.get('discovery.nodes') + memc_touch("discovery.master", 30) + nodes = memc.get("discovery.nodes") if nodes is None and not memc_alive(): - print("WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly.") + print( + "WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly." + ) distdisco = False nodes = nodeso if nodes is not nodeso: print("INFO: %s Node(s) Total" % (nodes)) nodeso = nodes else: - memc_touch('discovery.nodes', 30) + memc_touch("discovery.nodes", 30) try: worker_id, device_id, elapsed_time = print_queue.get(False) except: @@ -136,9 +138,15 @@ def printworker(): per_device_duration[device_id] = elapsed_time discovered_devices += 1 if elapsed_time < 300: - print("INFO: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time)) + print( + "INFO: worker %s finished device %s in %s seconds" + % (worker_id, device_id, elapsed_time) + ) else: - print("WARNING: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time)) + print( + "WARNING: worker %s finished device %s in %s seconds" + % (worker_id, device_id, elapsed_time) + ) print_queue.task_done() @@ -152,28 +160,48 @@ def poll_worker(): while True: device_id = poll_queue.get() # (c) 2015, GPLv3, Daniel Preussker <<> /dev/null" - command = "/usr/bin/env php %s -h %s %s 2>&1" % (discovery_path, device_id, output) + output = ( + "-d >> %s/discover_device_%s.log" % (log_dir, device_id) + if debug + else ">> /dev/null" + ) + command = "/usr/bin/env php %s -h %s %s 2>&1" % ( + discovery_path, + device_id, + output, + ) # TODO: Replace with command_runner subprocess.check_call(command, shell=True) elapsed_time = int(time.time() - start_time) - print_queue.put([threading.current_thread().name, device_id, elapsed_time]) + print_queue.put( + [threading.current_thread().name, device_id, elapsed_time] + ) except (KeyboardInterrupt, SystemExit): raise except: @@ -181,48 +209,60 @@ def poll_worker(): poll_queue.task_done() -if __name__ == '__main__': +if __name__ == "__main__": logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG) install_dir = os.path.dirname(os.path.realpath(__file__)) - LNMS.check_for_file(install_dir + '/.env') + LNMS.check_for_file(install_dir + "/.env") config = json.loads(LNMS.get_config_data(install_dir)) - discovery_path = config['install_dir'] + '/discovery.php' - log_dir = config['log_dir'] + discovery_path = config["install_dir"] + "/discovery.php" + log_dir = config["log_dir"] # (c) 2015, GPLv3, Daniel Preussker << << << 0: @@ -335,7 +394,7 @@ if __name__ == '__main__': print("Clearing Locks") x = minlocks while x <= maxlocks: - memc.delete('discovery.device.' + str(x)) + memc.delete("discovery.device." + str(x)) x = x + 1 print("%s Locks Cleared" % x) print("Clearing Nodes") @@ -349,17 +408,29 @@ if __name__ == '__main__': show_stopper = False if total_time > 21600: - print("WARNING: the process took more than 6 hours to finish, you need faster hardware or more threads") - print("INFO: in sequential style discovery the elapsed time would have been: %s seconds" % real_duration) + print( + "WARNING: the process took more than 6 hours to finish, you need faster hardware or more threads" + ) + print( + "INFO: in sequential style discovery the elapsed time would have been: %s seconds" + % real_duration + ) for device in per_device_duration: if per_device_duration[device] > 3600: - print("WARNING: device %s is taking too long: %s seconds" % (device, per_device_duration[device])) + print( + "WARNING: device %s is taking too long: %s seconds" + % (device, per_device_duration[device]) + ) show_stopper = True if show_stopper: - print("ERROR: Some devices are taking more than 3600 seconds, the script cannot recommend you what to do.") + print( + "ERROR: Some devices are taking more than 3600 seconds, the script cannot recommend you what to do." + ) else: recommend = int(total_time / 300.0 * amount_of_workers + 1) print( - "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend) + "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" + % recommend + ) sys.exit(2) diff --git a/librenms-service.py b/librenms-service.py index 3bffc11476..0be34c5d60 100755 --- a/librenms-service.py +++ b/librenms-service.py @@ -10,20 +10,36 @@ import LibreNMS from logging import info -if __name__ == '__main__': - parser = argparse.ArgumentParser(description='LibreNMS Service - manages polling and other periodic processes') - parser.add_argument('-g', '--group', type=int, help="Set the poller group for this poller") - parser.add_argument('-v', '--verbose', action='count', help="Show verbose output.") - parser.add_argument('-d', '--debug', action="store_true", help="Show debug output.") - parser.add_argument('-m', '--multiple', action="store_true", help="Allow multiple instances of the service.") - parser.add_argument('-t', '--timestamps', action="store_true", help="Include timestamps in the logs (not normally needed for syslog/journald") +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="LibreNMS Service - manages polling and other periodic processes" + ) + parser.add_argument( + "-g", "--group", type=int, help="Set the poller group for this poller" + ) + parser.add_argument("-v", "--verbose", action="count", help="Show verbose output.") + parser.add_argument("-d", "--debug", action="store_true", help="Show debug output.") + parser.add_argument( + "-m", + "--multiple", + action="store_true", + help="Allow multiple instances of the service.", + ) + parser.add_argument( + "-t", + "--timestamps", + action="store_true", + help="Include timestamps in the logs (not normally needed for syslog/journald", + ) args = parser.parse_args() if args.timestamps: - logging.basicConfig(format='%(asctime)s %(threadName)s(%(levelname)s):%(message)s') + logging.basicConfig( + format="%(asctime)s %(threadName)s(%(levelname)s):%(message)s" + ) else: - logging.basicConfig(format='%(threadName)s(%(levelname)s):%(message)s') + logging.basicConfig(format="%(threadName)s(%(levelname)s):%(message)s") if args.verbose: logging.getLogger().setLevel(logging.INFO) @@ -42,7 +58,11 @@ if __name__ == '__main__': service.config.single_instance = args.multiple if args.group: - service.config.group = [ args.group ] + service.config.group = [args.group] - info('Entering main LibreNMS service loop on {}/{}...'.format(os.getpid(), threading.current_thread().name)) + info( + "Entering main LibreNMS service loop on {}/{}...".format( + os.getpid(), threading.current_thread().name + ) + ) service.start() diff --git a/poller-wrapper.py b/poller-wrapper.py index 8ac94bd2f5..78e46c75fc 100755 --- a/poller-wrapper.py +++ b/poller-wrapper.py @@ -39,9 +39,9 @@ try: from optparse import OptionParser except ImportError as exc: - print('ERROR: missing one or more of the following python modules:') - print('threading, queue, sys, subprocess, time, os, json') - print('ERROR: %s' % exc) + print("ERROR: missing one or more of the following python modules:") + print("threading, queue, sys, subprocess, time, os, json") + print("ERROR: %s" % exc) sys.exit(2) @@ -60,9 +60,9 @@ def memc_alive(): try: global memc key = str(uuid.uuid4()) - memc.set('poller.ping.' + key, key, 60) - if memc.get('poller.ping.' + key) == key: - memc.delete('poller.ping.' + key) + memc.set("poller.ping." + key, key, 60) + if memc.get("poller.ping." + key) == key: + memc.delete("poller.ping." + key) return True else: return False @@ -82,7 +82,9 @@ def memc_touch(key, time): def get_time_tag(step): ts = int(time.time()) return ts - ts % step -#EOC0 + + +# EOC0 """ A seperate queue and a single worker for printing information to the screen prevents @@ -91,6 +93,8 @@ def get_time_tag(step): Some people, when confronted with a problem, think, "I know, I'll use threads," and then two they hav erpoblesms. """ + + def printworker(): nodeso = 0 while True: @@ -102,7 +106,9 @@ def printworker(): memc_touch(master_tag, 10) nodes = memc.get(nodes_tag) if nodes is None and not memc_alive(): - print("WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly.") + print( + "WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly." + ) distpoll = False nodes = nodeso if nodes is not nodeso: @@ -129,9 +135,15 @@ def printworker(): per_device_duration[device_id] = elapsed_time polled_devices += 1 if elapsed_time < step: - print("INFO: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time)) + print( + "INFO: worker %s finished device %s in %s seconds" + % (worker_id, device_id, elapsed_time) + ) else: - print("WARNING: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time)) + print( + "WARNING: worker %s finished device %s in %s seconds" + % (worker_id, device_id, elapsed_time) + ) print_queue.task_done() @@ -139,33 +151,57 @@ def printworker(): This class will fork off single instances of the poller.php process, record how long it takes, and push the resulting reports to the printer queue """ + + def poll_worker(): while True: device_id = poll_queue.get() # (c) 2015, GPLv3, Daniel Preussker <<> /dev/null" - command = "/usr/bin/env php %s -h %s %s 2>&1" % (poller_path, device_id, output) + output = ( + "-d >> %s/poll_device_%s.log" % (log_dir, device_id) + if debug + else ">> /dev/null" + ) + command = "/usr/bin/env php %s -h %s %s 2>&1" % ( + poller_path, + device_id, + output, + ) # TODO: replace with command_runner subprocess.check_call(command, shell=True) elapsed_time = int(time.time() - start_time) - print_queue.put([threading.current_thread().name, device_id, elapsed_time]) + print_queue.put( + [threading.current_thread().name, device_id, elapsed_time] + ) except (KeyboardInterrupt, SystemExit): raise except: @@ -173,32 +209,33 @@ def poll_worker(): poll_queue.task_done() -if __name__ == '__main__': +if __name__ == "__main__": logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG) install_dir = os.path.dirname(os.path.realpath(__file__)) - LNMS.check_for_file(install_dir + '/.env') + LNMS.check_for_file(install_dir + "/.env") config = json.loads(LNMS.get_config_data(install_dir)) - poller_path = config['install_dir'] + '/poller.php' - log_dir = config['log_dir'] + poller_path = config["install_dir"] + "/poller.php" + log_dir = config["log_dir"] - if 'rrd' in config and 'step' in config['rrd']: - step = config['rrd']['step'] + if "rrd" in config and "step" in config["rrd"]: + step = config["rrd"]["step"] else: step = 300 - # (c) 2015, GPLv3, Daniel Preussker << << << 0: @@ -339,7 +399,7 @@ if __name__ == '__main__': print("Clearing Locks for %s" % time_tag) x = minlocks while x <= maxlocks: - res = memc.delete('poller.device.%s.%s' % (x, time_tag)) + res = memc.delete("poller.device.%s.%s" % (x, time_tag)) x += 1 print("%s Locks Cleared" % x) print("Clearing Nodes") @@ -352,36 +412,57 @@ if __name__ == '__main__': show_stopper = False - db = LNMS.db_open(config['db_socket'], config['db_host'], config['db_port'], config['db_user'], config['db_pass'], config['db_name']) + db = LNMS.db_open( + config["db_socket"], + config["db_host"], + config["db_port"], + config["db_user"], + config["db_pass"], + config["db_name"], + ) cursor = db.cursor() - query = "update pollers set last_polled=NOW(), devices='%d', time_taken='%d' where poller_name='%s'" % ( - polled_devices, - total_time, - config['distributed_poller_name']) + query = ( + "update pollers set last_polled=NOW(), devices='%d', time_taken='%d' where poller_name='%s'" + % (polled_devices, total_time, config["distributed_poller_name"]) + ) response = cursor.execute(query) if response == 1: db.commit() else: - query = "insert into pollers set poller_name='%s', last_polled=NOW(), devices='%d', time_taken='%d'" % ( - config['distributed_poller_name'], polled_devices, total_time) + query = ( + "insert into pollers set poller_name='%s', last_polled=NOW(), devices='%d', time_taken='%d'" + % (config["distributed_poller_name"], polled_devices, total_time) + ) cursor.execute(query) db.commit() db.close() if total_time > step: print( - "WARNING: the process took more than %s seconds to finish, you need faster hardware or more threads" % step) - print("INFO: in sequential style polling the elapsed time would have been: %s seconds" % real_duration) + "WARNING: the process took more than %s seconds to finish, you need faster hardware or more threads" + % step + ) + print( + "INFO: in sequential style polling the elapsed time would have been: %s seconds" + % real_duration + ) for device in per_device_duration: if per_device_duration[device] > step: - print("WARNING: device %s is taking too long: %s seconds" % (device, per_device_duration[device])) + print( + "WARNING: device %s is taking too long: %s seconds" + % (device, per_device_duration[device]) + ) show_stopper = True if show_stopper: print( - "ERROR: Some devices are taking more than %s seconds, the script cannot recommend you what to do." % step) + "ERROR: Some devices are taking more than %s seconds, the script cannot recommend you what to do." + % step + ) else: recommend = int(total_time / step * amount_of_workers + 1) print( - "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend) + "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" + % recommend + ) sys.exit(2) diff --git a/scripts/check_requirements.py b/scripts/check_requirements.py index 5d5b409d60..6a329211e2 100755 --- a/scripts/check_requirements.py +++ b/scripts/check_requirements.py @@ -6,11 +6,9 @@ from pkg_resources import DistributionNotFound, VersionConflict args = sys.argv # verbose flag -verbose = '-v' in args +verbose = "-v" in args -requirements = [ - 'PyMySQL' -] +requirements = ["PyMySQL"] try: pkg_resources.require(requirements) diff --git a/services-wrapper.py b/services-wrapper.py index f596ad2483..5a4f1c08da 100755 --- a/services-wrapper.py +++ b/services-wrapper.py @@ -51,9 +51,9 @@ try: from optparse import OptionParser except ImportError as exc: - print('ERROR: missing one or more of the following python modules:') - print('threading, queue, sys, subprocess, time, os, json') - print('ERROR: %s' % exc) + print("ERROR: missing one or more of the following python modules:") + print("threading, queue, sys, subprocess, time, os, json") + print("ERROR: %s" % exc) sys.exit(2) @@ -72,9 +72,9 @@ def memc_alive(): try: global memc key = str(uuid.uuid4()) - memc.set('poller.ping.' + key, key, 60) - if memc.get('poller.ping.' + key) == key: - memc.delete('poller.ping.' + key) + memc.set("poller.ping." + key, key, 60) + if memc.get("poller.ping." + key) == key: + memc.delete("poller.ping." + key) return True else: return False @@ -94,7 +94,9 @@ def memc_touch(key, time): def get_time_tag(step): ts = int(time.time()) return ts - ts % step -#EOC0 + + +# EOC0 """ @@ -114,17 +116,19 @@ def printworker(): global servicedisco if servicedisco: if not IsNode: - memc_touch('service.master', 10) - nodes = memc.get('service.nodes') + memc_touch("service.master", 10) + nodes = memc.get("service.nodes") if nodes is None and not memc_alive(): - print("WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly.") + print( + "WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly." + ) servicedisco = False nodes = nodeso if nodes is not nodeso: print("INFO: %s Node(s) Total" % (nodes)) nodeso = nodes else: - memc_touch('service.nodes', 10) + memc_touch("service.nodes", 10) try: worker_id, device_id, elapsed_time = print_queue.get(False) except: @@ -144,11 +148,18 @@ def printworker(): per_device_duration[device_id] = elapsed_time service_devices += 1 if elapsed_time < 300: - print("INFO: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time)) + print( + "INFO: worker %s finished device %s in %s seconds" + % (worker_id, device_id, elapsed_time) + ) else: - print("WARNING: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time)) + print( + "WARNING: worker %s finished device %s in %s seconds" + % (worker_id, device_id, elapsed_time) + ) print_queue.task_done() + """ This class will fork off single instances of the check-services.php process, record how long it takes, and push the resulting reports to the printer queue @@ -159,28 +170,48 @@ def poll_worker(): while True: device_id = poll_queue.get() # (c) 2015, GPLv3, Daniel Preussker <<> /dev/null" + output = ( + "-d >> %s/services_device_%s.log" % (log_dir, device_id) + if debug + else ">> /dev/null" + ) # TODO replace with command_runner - command = "/usr/bin/env php %s -h %s %s 2>&1" % (service_path, device_id, output) + command = "/usr/bin/env php %s -h %s %s 2>&1" % ( + service_path, + device_id, + output, + ) subprocess.check_call(command, shell=True) elapsed_time = int(time.time() - start_time) - print_queue.put([threading.current_thread().name, device_id, elapsed_time]) + print_queue.put( + [threading.current_thread().name, device_id, elapsed_time] + ) except (KeyboardInterrupt, SystemExit): raise except: @@ -188,48 +219,60 @@ def poll_worker(): poll_queue.task_done() -if __name__ == '__main__': +if __name__ == "__main__": logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG) install_dir = os.path.dirname(os.path.realpath(__file__)) - LNMS.check_for_file(install_dir + '/.env') + LNMS.check_for_file(install_dir + "/.env") config = json.loads(LNMS.get_config_data(install_dir)) - service_path = config['install_dir'] + '/check-services.php' - log_dir = config['log_dir'] + service_path = config["install_dir"] + "/check-services.php" + log_dir = config["log_dir"] # (c) 2015, GPLv3, Daniel Preussker << << << 0: @@ -334,7 +398,7 @@ if __name__ == '__main__': print("Clearing Locks") x = minlocks while x <= maxlocks: - memc.delete('service.device.' + str(x)) + memc.delete("service.device." + str(x)) x = x + 1 print("%s Locks Cleared" % x) print("Clearing Nodes") @@ -348,17 +412,29 @@ if __name__ == '__main__': show_stopper = False if total_time > 300: - print("WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads") - print("INFO: in sequential style service checks the elapsed time would have been: %s seconds" % real_duration) + print( + "WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads" + ) + print( + "INFO: in sequential style service checks the elapsed time would have been: %s seconds" + % real_duration + ) for device in per_device_duration: if per_device_duration[device] > 300: - print("WARNING: device %s is taking too long: %s seconds" % (device, per_device_duration[device])) + print( + "WARNING: device %s is taking too long: %s seconds" + % (device, per_device_duration[device]) + ) show_stopper = True if show_stopper: - print("ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do.") + print( + "ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do." + ) else: recommend = int(total_time / 300.0 * amount_of_workers + 1) print( - "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend) + "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" + % recommend + ) sys.exit(2) diff --git a/snmp-scan.py b/snmp-scan.py index 193066d02a..71f9bde9c9 100755 --- a/snmp-scan.py +++ b/snmp-scan.py @@ -32,7 +32,7 @@ from subprocess import check_output, CalledProcessError from sys import stdout from time import time -Result = namedtuple('Result', ['ip', 'hostname', 'outcome', 'output']) +Result = namedtuple("Result", ["ip", "hostname", "outcome", "output"]) class Outcome: @@ -45,20 +45,20 @@ class Outcome: TERMINATED = 6 -POLLER_GROUP = '0' +POLLER_GROUP = "0" VERBOSE_LEVEL = 0 THREADS = 32 CONFIG = {} EXCLUDED_NETS = [] start_time = time() stats = { - 'count': 0, + "count": 0, Outcome.ADDED: 0, Outcome.UNPINGABLE: 0, Outcome.KNOWN: 0, Outcome.FAILED: 0, Outcome.EXCLUDED: 0, - Outcome.TERMINATED: 0 + Outcome.TERMINATED: 0, } @@ -69,31 +69,44 @@ def debug(message, level=2): def get_outcome_symbol(outcome): return { - Outcome.UNDEFINED: '?', # should not occur - Outcome.ADDED: '+', - Outcome.UNPINGABLE: '.', - Outcome.KNOWN: '*', - Outcome.FAILED: '-', - Outcome.TERMINATED: '' + Outcome.UNDEFINED: "?", # should not occur + Outcome.ADDED: "+", + Outcome.UNPINGABLE: ".", + Outcome.KNOWN: "*", + Outcome.FAILED: "-", + Outcome.TERMINATED: "", }[outcome] def handle_result(data): if VERBOSE_LEVEL > 0: - print('Scanned \033[1m{}\033[0m {}'.format( - ("{} ({})".format(data.hostname, data.ip) if data.hostname else data.ip), data.output)) + print( + "Scanned \033[1m{}\033[0m {}".format( + ( + "{} ({})".format(data.hostname, data.ip) + if data.hostname + else data.ip + ), + data.output, + ) + ) else: - print(get_outcome_symbol(data.outcome), end='') + print(get_outcome_symbol(data.outcome), end="") stdout.flush() - stats['count'] += 0 if data.outcome == Outcome.TERMINATED else 1 + stats["count"] += 0 if data.outcome == Outcome.TERMINATED else 1 stats[data.outcome] += 1 def check_ip_excluded(check_ip): for network_check in EXCLUDED_NETS: if check_ip in network_check: - debug("\033[91m{} excluded by autodiscovery.nets-exclude\033[0m".format(check_ip), 1) + debug( + "\033[91m{} excluded by autodiscovery.nets-exclude\033[0m".format( + check_ip + ), + 1, + ) stats[Outcome.EXCLUDED] += 1 return True return False @@ -113,7 +126,14 @@ def scan_host(scan_ip): try: - arguments = ['/usr/bin/env', 'php', 'addhost.php', '-g', POLLER_GROUP, hostname or scan_ip] + arguments = [ + "/usr/bin/env", + "php", + "addhost.php", + "-g", + POLLER_GROUP, + hostname or scan_ip, + ] if args.ping: arguments.insert(5, args.ping) add_output = check_output(arguments) @@ -121,45 +141,79 @@ def scan_host(scan_ip): except CalledProcessError as err: output = err.output.decode().rstrip() if err.returncode == 2: - if 'Could not ping' in output: + if "Could not ping" in output: return Result(scan_ip, hostname, Outcome.UNPINGABLE, output) else: return Result(scan_ip, hostname, Outcome.FAILED, output) elif err.returncode == 3: return Result(scan_ip, hostname, Outcome.KNOWN, output) except KeyboardInterrupt: - return Result(scan_ip, hostname, Outcome.TERMINATED, 'Terminated') + return Result(scan_ip, hostname, Outcome.TERMINATED, "Terminated") return Result(scan_ip, hostname, Outcome.UNDEFINED, output) -if __name__ == '__main__': +if __name__ == "__main__": ################### # Parse arguments # ################### - parser = argparse.ArgumentParser(description='Scan network for snmp hosts and add them to LibreNMS.', formatter_class=argparse.RawTextHelpFormatter) - parser.add_argument('network', action='append', nargs='*', type=str, help="""CIDR noted IP-Range to scan. Can be specified multiple times + parser = argparse.ArgumentParser( + description="Scan network for snmp hosts and add them to LibreNMS.", + formatter_class=argparse.RawTextHelpFormatter, + ) + parser.add_argument( + "network", + action="append", + nargs="*", + type=str, + help="""CIDR noted IP-Range to scan. Can be specified multiple times This argument is only required if 'nets' config is not set Example: 192.168.0.0/24 Example: 192.168.0.0/31 will be treated as an RFC3021 p-t-p network with two addresses, 192.168.0.0 and 192.168.0.1 -Example: 192.168.0.1/32 will be treated as a single host address""") - parser.add_argument('-P', '--ping', action='store_const', const="-b", default="", help="""Add the device as an ICMP only device if it replies to ping but not SNMP. -Example: """ + __file__ + """ -P 192.168.0.0/24""") - parser.add_argument('-t', dest='threads', type=int, - help="How many IPs to scan at a time. More will increase the scan speed," + - " but could overload your system. Default: {}".format(THREADS)) - parser.add_argument('-g', dest='group', type=str, - help="The poller group all scanned devices will be added to." - " Default: The first group listed in 'distributed_poller_group', or {} if not specificed".format(POLLER_GROUP)) - parser.add_argument('-l', '--legend', action='store_true', help="Print the legend.") - parser.add_argument('-v', '--verbose', action='count', - help="Show debug output. Specifying multiple times increases the verbosity.") +Example: 192.168.0.1/32 will be treated as a single host address""", + ) + parser.add_argument( + "-P", + "--ping", + action="store_const", + const="-b", + default="", + help="""Add the device as an ICMP only device if it replies to ping but not SNMP. +Example: """ + + __file__ + + """ -P 192.168.0.0/24""", + ) + parser.add_argument( + "-t", + dest="threads", + type=int, + help="How many IPs to scan at a time. More will increase the scan speed," + + " but could overload your system. Default: {}".format(THREADS), + ) + parser.add_argument( + "-g", + dest="group", + type=str, + help="The poller group all scanned devices will be added to." + " Default: The first group listed in 'distributed_poller_group', or {} if not specificed".format( + POLLER_GROUP + ), + ) + parser.add_argument("-l", "--legend", action="store_true", help="Print the legend.") + parser.add_argument( + "-v", + "--verbose", + action="count", + help="Show debug output. Specifying multiple times increases the verbosity.", + ) # compatibility arguments - parser.add_argument('-r', dest='network', action='append', help=argparse.SUPPRESS) - parser.add_argument('-d', '-i', dest='verbose', action='count', help=argparse.SUPPRESS) - parser.add_argument('-n', action='store_true', help=argparse.SUPPRESS) - parser.add_argument('-b', action='store_true', help=argparse.SUPPRESS) + parser.add_argument("-r", dest="network", action="append", help=argparse.SUPPRESS) + parser.add_argument( + "-d", "-i", dest="verbose", action="count", help=argparse.SUPPRESS + ) + parser.add_argument("-n", action="store_true", help=argparse.SUPPRESS) + parser.add_argument("-b", action="store_true", help=argparse.SUPPRESS) args = parser.parse_args() @@ -170,12 +224,20 @@ Example: """ + __file__ + """ -P 192.168.0.0/24""") install_dir = path.dirname(path.realpath(__file__)) chdir(install_dir) try: - CONFIG = json.loads(check_output(['/usr/bin/env', 'php', 'config_to_json.php']).decode()) + CONFIG = json.loads( + check_output(["/usr/bin/env", "php", "config_to_json.php"]).decode() + ) except CalledProcessError as e: - parser.error("Could not execute: {}\n{}".format(' '.join(e.cmd), e.output.decode().rstrip())) + parser.error( + "Could not execute: {}\n{}".format( + " ".join(e.cmd), e.output.decode().rstrip() + ) + ) exit(2) - POLLER_GROUP = args.group or str(CONFIG.get('distributed_poller_group')).split(',')[0] + POLLER_GROUP = ( + args.group or str(CONFIG.get("distributed_poller_group")).split(",")[0] + ) ####################### # Build network lists # @@ -190,34 +252,40 @@ Example: """ + __file__ + """ -P 192.168.0.0/24""") netargs.append(a) # make sure we have something to scan - if not CONFIG.get('nets', []) and not netargs: - parser.error('\'nets\' is not set in your LibreNMS config, you must specify a network to scan') + if not CONFIG.get("nets", []) and not netargs: + parser.error( + "'nets' is not set in your LibreNMS config, you must specify a network to scan" + ) # check for valid networks networks = [] - for net in (netargs if netargs else CONFIG.get('nets', [])): + for net in netargs if netargs else CONFIG.get("nets", []): try: - networks.append(ip_network(u'%s' % net, True)) - debug('Network parsed: {}'.format(net), 2) + networks.append(ip_network(u"%s" % net, True)) + debug("Network parsed: {}".format(net), 2) except ValueError as e: - parser.error('Invalid network format {}'.format(e)) + parser.error("Invalid network format {}".format(e)) - for net in CONFIG.get('autodiscovery', {}).get('nets-exclude', {}): + for net in CONFIG.get("autodiscovery", {}).get("nets-exclude", {}): try: EXCLUDED_NETS.append(ip_network(net, True)) - debug('Excluded network: {}'.format(net), 2) + debug("Excluded network: {}".format(net), 2) except ValueError as e: - parser.error('Invalid excluded network format {}, check your config.php'.format(e)) + parser.error( + "Invalid excluded network format {}, check your config.php".format(e) + ) ################# # Scan networks # ################# - debug('SNMP settings from config.php: {}'.format(CONFIG.get('snmp', {})), 2) + debug("SNMP settings from config.php: {}".format(CONFIG.get("snmp", {})), 2) if args.legend and not VERBOSE_LEVEL: - print('Legend:\n+ Added device\n* Known device\n- Failed to add device\n. Ping failed\n') + print( + "Legend:\n+ Added device\n* Known device\n- Failed to add device\n. Ping failed\n" + ) - print('Scanning IPs:') + print("Scanning IPs:") pool = Pool(processes=THREADS) @@ -240,9 +308,16 @@ Example: """ + __file__ + """ -P 192.168.0.0/24""") if VERBOSE_LEVEL == 0: print("\n") - base = 'Scanned {} IPs: {} known devices, added {} devices, failed to add {} devices' - summary = base.format(stats['count'], stats[Outcome.KNOWN], stats[Outcome.ADDED], stats[Outcome.FAILED]) + base = ( + "Scanned {} IPs: {} known devices, added {} devices, failed to add {} devices" + ) + summary = base.format( + stats["count"], + stats[Outcome.KNOWN], + stats[Outcome.ADDED], + stats[Outcome.FAILED], + ) if stats[Outcome.EXCLUDED]: - summary += ', {} ips excluded by config'.format(stats[Outcome.EXCLUDED]) + summary += ", {} ips excluded by config".format(stats[Outcome.EXCLUDED]) print(summary) - print('Runtime: {:.2f} seconds'.format(time() - start_time)) + print("Runtime: {:.2f} seconds".format(time() - start_time)) diff --git a/tests/tests.py b/tests/tests.py index 53784025d8..20864c8be7 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -21,73 +21,111 @@ class TestLocks(unittest.TestCase): @staticmethod def lock_thread(manager, lock_name, expiration, unlock_sleep=0): - manager.lock(lock_name, 'lock_thread', expiration) + manager.lock(lock_name, "lock_thread", expiration) if unlock_sleep: sleep(unlock_sleep) - manager.unlock(lock_name, 'lock_thread') + manager.unlock(lock_name, "lock_thread") def test_threading_lock(self): lm = LibreNMS.ThreadingLock() - thread = threading.Thread(target=self.lock_thread, args=(lm, 'first.lock', 2, 1)) + thread = threading.Thread( + target=self.lock_thread, args=(lm, "first.lock", 2, 1) + ) thread.daemon = True thread.start() sleep(0.05) - self.assertFalse(lm.lock('first.lock', 'main_thread', 0), "Acquired lock when it is held by thread") - self.assertFalse(lm.unlock('first.lock', 'main_thread'), "Unlocked lock main doesn't own") + self.assertFalse( + lm.lock("first.lock", "main_thread", 0), + "Acquired lock when it is held by thread", + ) + self.assertFalse( + lm.unlock("first.lock", "main_thread"), "Unlocked lock main doesn't own" + ) sleep(1.1) - self.assertTrue(lm.lock('first.lock', 'main_thread', 1), - "Could not acquire lock previously held by thread") - self.assertFalse(lm.lock('first.lock', 'main_thread', 1, False), "Was able to re-lock a lock main owns") - self.assertTrue(lm.lock('first.lock', 'main_thread', 1, True), "Could not re-lock a lock main owns") - self.assertTrue(lm.check_lock('first.lock')) - self.assertTrue(lm.unlock('first.lock', 'main_thread'), "Could not unlock lock main holds") - self.assertFalse(lm.unlock('first.lock', 'main_thread'), "Unlocked an unlocked lock?") - self.assertFalse(lm.check_lock('first.lock')) + self.assertTrue( + lm.lock("first.lock", "main_thread", 1), + "Could not acquire lock previously held by thread", + ) + self.assertFalse( + lm.lock("first.lock", "main_thread", 1, False), + "Was able to re-lock a lock main owns", + ) + self.assertTrue( + lm.lock("first.lock", "main_thread", 1, True), + "Could not re-lock a lock main owns", + ) + self.assertTrue(lm.check_lock("first.lock")) + self.assertTrue( + lm.unlock("first.lock", "main_thread"), "Could not unlock lock main holds" + ) + self.assertFalse( + lm.unlock("first.lock", "main_thread"), "Unlocked an unlocked lock?" + ) + self.assertFalse(lm.check_lock("first.lock")) def test_redis_lock(self): - if 'redis' not in sys.modules: - self.assertTrue(True, 'Skipped Redis tests') + if "redis" not in sys.modules: + self.assertTrue(True, "Skipped Redis tests") else: rc = redis.Redis() - rc.delete('lock:redis.lock') # make sure no previous data exists + rc.delete("lock:redis.lock") # make sure no previous data exists - lm = LibreNMS.RedisLock(namespace='lock') - thread = threading.Thread(target=self.lock_thread, args=(lm, 'redis.lock', 2, 1)) + lm = LibreNMS.RedisLock(namespace="lock") + thread = threading.Thread( + target=self.lock_thread, args=(lm, "redis.lock", 2, 1) + ) thread.daemon = True thread.start() sleep(0.05) - self.assertFalse(lm.lock('redis.lock', 'main_thread', 1), "Acquired lock when it is held by thread") - self.assertFalse(lm.unlock('redis.lock', 'main_thread'), "Unlocked lock main doesn't own") + self.assertFalse( + lm.lock("redis.lock", "main_thread", 1), + "Acquired lock when it is held by thread", + ) + self.assertFalse( + lm.unlock("redis.lock", "main_thread"), "Unlocked lock main doesn't own" + ) sleep(1.1) - self.assertTrue(lm.lock('redis.lock', 'main_thread', 1), - "Could not acquire lock previously held by thread") - self.assertFalse(lm.lock('redis.lock', 'main_thread', 1), "Relocked an existing lock") - self.assertTrue(lm.lock('redis.lock', 'main_thread', 1, True), "Could not re-lock a lock main owns") - self.assertTrue(lm.unlock('redis.lock', 'main_thread'), "Could not unlock lock main holds") - self.assertFalse(lm.unlock('redis.lock', 'main_thread'), "Unlocked an unlocked lock?") + self.assertTrue( + lm.lock("redis.lock", "main_thread", 1), + "Could not acquire lock previously held by thread", + ) + self.assertFalse( + lm.lock("redis.lock", "main_thread", 1), "Relocked an existing lock" + ) + self.assertTrue( + lm.lock("redis.lock", "main_thread", 1, True), + "Could not re-lock a lock main owns", + ) + self.assertTrue( + lm.unlock("redis.lock", "main_thread"), + "Could not unlock lock main holds", + ) + self.assertFalse( + lm.unlock("redis.lock", "main_thread"), "Unlocked an unlocked lock?" + ) def queue_thread(self, manager, expect, wait=True): - self.assertEqual(expect, manager.get(wait), 'Got unexpected data in thread') + self.assertEqual(expect, manager.get(wait), "Got unexpected data in thread") def test_redis_queue(self): - if 'redis' not in sys.modules: - self.assertTrue(True, 'Skipped Redis tests') + if "redis" not in sys.modules: + self.assertTrue(True, "Skipped Redis tests") else: rc = redis.Redis() - rc.delete('queue:testing') # make sure no previous data exists - qm = LibreNMS.RedisUniqueQueue('testing', namespace='queue') + rc.delete("queue:testing") # make sure no previous data exists + qm = LibreNMS.RedisUniqueQueue("testing", namespace="queue") thread = threading.Thread(target=self.queue_thread, args=(qm, None, False)) thread.daemon = True thread.start() - thread = threading.Thread(target=self.queue_thread, args=(qm, '2')) + thread = threading.Thread(target=self.queue_thread, args=(qm, "2")) thread.daemon = True thread.start() qm.put(2) @@ -96,9 +134,11 @@ class TestLocks(unittest.TestCase): qm.put(4) sleep(0.05) self.assertEqual(2, qm.qsize()) - self.assertEqual('3', qm.get()) - self.assertEqual('4', qm.get(), "Did not get second item in queue") - self.assertEqual(None, qm.get_nowait(), "Did not get None when queue should be empty") + self.assertEqual("3", qm.get()) + self.assertEqual("4", qm.get(), "Did not get second item in queue") + self.assertEqual( + None, qm.get_nowait(), "Did not get None when queue should be empty" + ) self.assertTrue(qm.empty(), "Queue should be empty") @@ -128,5 +168,6 @@ class TestTimer(unittest.TestCase): self.assertEqual(3, self.counter) timer.stop() -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main()