Full Python code fusion / refactor and hardening (#13094)

* Add inline command_runner library

* New service/discovery/poller wrapper

* Convert old wrapper scripts to bootstrap loaders for wrapper.py

* Add command_runner to current requirements

* Move wrapper.py to LibreNMS module directory

* Reformat files

* File reformatting

* bootstrap files reformatting

* Merge service and wrapper database connections and get_config_data functions

* Moved subprocess calls to command_runner

* Merge LibreNMS library and __init__

* Reformat files

* Normalize logging use

* Reformatting code

* Fix missing argument for error log

* Fix refactor typo in DBConfig class

* Add default timeout for config.php data fetching

* Distributed discovery should finish with a timestamp instead of an epoch

* Fix docstring inside dict that prevented the service key from working

* Fix poller insert statement

* Fix service wrapper typo

* Update docstring since we changed function behavior

* Normalize SQL statements

* Convert optparse to argparse

* Revert discovery thread number

* Handle debug logging

* Fix file option typo

* Reformat code

* Add credits to source package

* Rename logs depending on the wrapper type

* Cap max logfile size to 10MB

* Reformat code

* Add exception for Redis < 5.0

* Make sure we always log something from service

* Fix bogus description
Orsiris de Jong
2021-08-10 01:49:29 +02:00
committed by GitHub
parent a4a04e6f09
commit 9c534a1a90
11 changed files with 1525 additions and 1669 deletions


@@ -1,15 +1,17 @@
import json
import logging
import os
import subprocess
import sys
import tempfile
import threading
import timeit
from collections import deque
from logging import critical, info, debug, exception
from logging.handlers import RotatingFileHandler
from math import ceil
from queue import Queue
from time import time
from .service import Service, ServiceConfig
from .command_runner import command_runner
from .queuemanager import (
QueueManager,
TimedQueueManager,
@@ -20,6 +22,158 @@ from .queuemanager import (
PollerQueueManager,
DiscoveryQueueManager,
)
from .service import Service, ServiceConfig
# Hard limit on script execution time so scripts cannot hang indefinitely
DEFAULT_SCRIPT_TIMEOUT = 3600
MAX_LOGFILE_SIZE = (1024 ** 2) * 10 # 10 Megabytes max log files
logger = logging.getLogger(__name__)
# Logging functions ########################################################
# Original logger functions from ofunctions.logger_utils package
FORMATTER = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s")
def logger_get_console_handler():
try:
console_handler = logging.StreamHandler(sys.stdout)
except OSError as exc:
print("Cannot log to stdout, trying stderr. Message %s" % exc)
try:
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(FORMATTER)
return console_handler
except OSError as exc:
print("Cannot log to stderr neither. Message %s" % exc)
return False
else:
console_handler.setFormatter(FORMATTER)
return console_handler
def logger_get_file_handler(log_file):
err_output = None
try:
file_handler = RotatingFileHandler(
log_file,
mode="a",
encoding="utf-8",
maxBytes=MAX_LOGFILE_SIZE,
backupCount=3,
)
except OSError as exc:
try:
print(
"Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s"
% exc
)
err_output = str(exc)
temp_log_file = tempfile.gettempdir() + os.sep + __name__ + ".log"
print("Trying temporary log file in " + temp_log_file)
file_handler = RotatingFileHandler(
temp_log_file,
mode="a",
encoding="utf-8",
maxBytes=MAX_LOGFILE_SIZE,
backupCount=1,
)
file_handler.setFormatter(FORMATTER)
err_output += "\nUsing [%s]" % temp_log_file
return file_handler, err_output
except OSError as exc:
print(
"Cannot create temporary log file either. Will not log to file. Message: %s"
% exc
)
return False
else:
file_handler.setFormatter(FORMATTER)
return file_handler, err_output
def logger_get_logger(log_file=None, temp_log_file=None, debug=False):
# If a name is given to getLogger, then modules can't log to the root logger
_logger = logging.getLogger()
if debug is True:
_logger.setLevel(logging.DEBUG)
else:
_logger.setLevel(logging.INFO)
console_handler = logger_get_console_handler()
if console_handler:
_logger.addHandler(console_handler)
if log_file is not None:
file_handler, err_output = logger_get_file_handler(log_file)
if file_handler:
_logger.addHandler(file_handler)
_logger.propagate = False
if err_output is not None:
print(err_output)
_logger.warning(
"Failed to use log file [%s], %s.", log_file, err_output
)
if temp_log_file is not None:
if os.path.isfile(temp_log_file):
try:
os.remove(temp_log_file)
except OSError:
logger.warning("Cannot remove temp log file [%s]." % temp_log_file)
file_handler, err_output = logger_get_file_handler(temp_log_file)
if file_handler:
_logger.addHandler(file_handler)
_logger.propagate = False
if err_output is not None:
print(err_output)
_logger.warning(
"Failed to use log file [%s], %s.", log_file, err_output
)
return _logger
# Generic functions ########################################################
def check_for_file(file):
try:
with open(file):
pass
except IOError as exc:
logger.error("File '%s' is not readable" % file)
logger.debug("Traceback:", exc_info=True)
sys.exit(2)
# Config functions #########################################################
def get_config_data(base_dir):
check_for_file(os.path.join(base_dir, ".env"))
try:
import dotenv
env_path = "{}/.env".format(base_dir)
logger.info("Attempting to load .env from '%s'", env_path)
dotenv.load_dotenv(dotenv_path=env_path, verbose=True)
if not os.getenv("NODE_ID"):
logger.critical(".env does not contain a valid NODE_ID setting.")
except ImportError as e:
logger.critical(
"Could not import .env - check that the poller user can read the file, and that composer install has been run recently"
)
config_cmd = ["/usr/bin/env", "php", "%s/config_to_json.php" % base_dir]
try:
exit_code, output = command_runner(config_cmd, timeout=300)
if exit_code == 0:
return json.loads(output)
raise EnvironmentError
except Exception as exc:
logger.critical("ERROR: Could not execute command [%s]: %s" % (config_cmd, exc))
logger.debug("Traceback:", exc_info=True)
def normalize_wait(seconds):
@@ -28,8 +182,9 @@ def normalize_wait(seconds):
def call_script(script, args=()):
"""
Run a LibreNMS script. Captures all output and throws an exception if a non-zero
status is returned. Blocks parent signals (like SIGINT and SIGTERM).
Run a LibreNMS script. Captures all output and returns the exit code.
Blocks parent signals (like SIGINT and SIGTERM).
Kills the script if it takes too long.
:param script: the name of the executable relative to the base directory
:param args: a tuple of arguments to send to the command
:returns a tuple of (exit_code, output) from the command
@@ -42,14 +197,10 @@ def call_script(script, args=()):
base_dir = os.path.realpath(os.path.dirname(__file__) + "/..")
cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args))
debug("Running {}".format(cmd))
logger.debug("Running {}".format(cmd))
# preexec_fn=os.setsid here keeps process signals from propagating (close_fds=True is default)
return subprocess.check_call(
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
preexec_fn=os.setsid,
close_fds=True,
return command_runner(
cmd, preexec_fn=os.setsid, close_fds=True, timeout=DEFAULT_SCRIPT_TIMEOUT
)
@@ -70,15 +221,15 @@ class DB:
import pymysql
pymysql.install_as_MySQLdb()
info("Using pure python SQL client")
logger.info("Using pure python SQL client")
except ImportError:
info("Using other SQL client")
logger.info("Using other SQL client")
try:
import MySQLdb
except ImportError:
critical("ERROR: missing a mysql python module")
critical(
logger.critical("ERROR: missing a mysql python module")
logger.critical(
"Install either 'PyMySQL' or 'mysqlclient' from your OS software repository or from PyPI"
)
raise
@@ -99,7 +250,7 @@ class DB:
conn.ping(True)
self._db[threading.get_ident()] = conn
except Exception as e:
critical("ERROR: Could not connect to MySQL database! {}".format(e))
logger.critical("ERROR: Could not connect to MySQL database! {}".format(e))
raise
def db_conn(self):
@@ -128,7 +279,7 @@ class DB:
cursor.close()
return cursor
except Exception as e:
critical("DB Connection exception {}".format(e))
logger.critical("DB Connection exception {}".format(e))
self.close()
raise
@@ -167,7 +318,7 @@ class RecurringTimer:
class Lock:
""" Base lock class this is not thread safe"""
"""Base lock class this is not thread safe"""
def __init__(self):
self._locks = {} # store a tuple (owner, expiration)
@@ -210,7 +361,7 @@ class Lock:
return False
def print_locks(self):
debug(self._locks)
logger.debug(self._locks)
class ThreadingLock(Lock):
@@ -269,7 +420,7 @@ class RedisLock(Lock):
self._redis = redis.Redis(**kwargs)
self._redis.ping()
self._namespace = namespace
info(
logger.info(
"Created redis lock manager with socket_timeout of {}s".format(
redis_kwargs["socket_timeout"]
)
@@ -296,7 +447,7 @@ class RedisLock(Lock):
non_existing = not (allow_owner_relock and self._redis.get(key) == owner)
return self._redis.set(key, owner, ex=int(expiration), nx=non_existing)
except redis.exceptions.ResponseError as e:
exception(
logger.critical(
"Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s",
name,
owner,
@@ -351,7 +502,7 @@ class RedisUniqueQueue(object):
self._redis = redis.Redis(**kwargs)
self._redis.ping()
self.key = "{}:{}".format(namespace, name)
info(
logger.info(
"Created redis queue with socket_timeout of {}s".format(
redis_kwargs["socket_timeout"]
)
@@ -371,10 +522,20 @@ class RedisUniqueQueue(object):
self._redis.zadd(self.key, {item: time()}, nx=True)
def get(self, block=True, timeout=None):
if block:
item = self._redis.bzpopmin(self.key, timeout=timeout)
else:
item = self._redis.zpopmin(self.key)
try:
if block:
item = self._redis.bzpopmin(self.key, timeout=timeout)
else:
item = self._redis.zpopmin(self.key)
# Unfortunately we cannot catch redis.exceptions.ResponseError here,
# since it would trigger another exception in queuemanager
except Exception as e:
logger.critical(
"It seems like BZPOPMIN command is not supported by redis. Redis >= 5.0 required: {}".format(
e
)
)
sys.exit(1)
if item:
item = item[1]

LibreNMS/command_runner.py (new file, 397 lines)

@@ -0,0 +1,397 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is part of command_runner module
"""
command_runner is a quick tool to launch commands from Python, get exit code
and output, and handle most errors that may happen
Versioning semantics:
Major version: backward compatibility breaking changes
Minor version: New functionality
Patch version: Backwards compatible bug fixes
"""
__intname__ = "command_runner"
__author__ = "Orsiris de Jong"
__copyright__ = "Copyright (C) 2015-2021 Orsiris de Jong"
__licence__ = "BSD 3 Clause"
__version__ = "0.7.0"
__build__ = "2021080201"
import os
import shlex
import subprocess
import sys
from datetime import datetime
from logging import getLogger
from time import sleep
# Python 2.7 compat fixes (queue was Queue)
try:
import Queue
except ImportError:
import queue as Queue
import threading
# Python 2.7 compat fixes (missing typing and FileNotFoundError)
try:
from typing import Union, Optional, List, Tuple, NoReturn, Any
except ImportError:
pass
try:
FileNotFoundError
except NameError:
# pylint: disable=W0622 (redefined-builtin)
FileNotFoundError = IOError
try:
TimeoutExpired = subprocess.TimeoutExpired
except AttributeError:
class TimeoutExpired(BaseException):
"""
Basic redeclaration when subprocess.TimeoutExpired does not exist, python <= 3.3
"""
pass
logger = getLogger(__intname__)
PIPE = subprocess.PIPE
def thread_with_result_queue(fn):
"""
Python 2.7 compatible substitute for concurrent.futures:
executes the decorated function in a thread and attaches a queue for result communication
"""
def wrapped_fn(queue, *args, **kwargs):
result = fn(*args, **kwargs)
queue.put(result)
def wrapper(*args, **kwargs):
queue = Queue.Queue()
child_thread = threading.Thread(
target=wrapped_fn, args=(queue,) + args, kwargs=kwargs
)
child_thread.start()
child_thread.result_queue = queue
return child_thread
return wrapper
def command_runner(
command, # type: Union[str, List[str]]
valid_exit_codes=None, # type: Optional[List[int]]
timeout=1800, # type: Optional[int]
shell=False, # type: bool
encoding=None, # type: str
stdout=None, # type: Union[int, str]
stderr=None, # type: Union[int, str]
windows_no_window=False, # type: bool
**kwargs # type: Any
):
# type: (...) -> Tuple[Optional[int], str]
"""
Unix & Windows compatible subprocess wrapper that handles output encoding and timeouts
Newer Python check_output already handles encoding and timeouts, but this one is retro-compatible
It is still recommended to set cp437 for windows and utf-8 for unix
Also allows a list of valid exit codes (i.e. no error when the exit code equals an accepted arbitrary int)
command should be a list of strings, e.g. ['ping', '127.0.0.1', '-c 2']
command can also be a single string, e.g. 'ping 127.0.0.1 -c 2' if shell=True or if the OS is Windows
Accepts all of subprocess.Popen's arguments
Whenever possible, avoid shell=True in order to preserve better security
When no stdout option is given, we'll get output into the returned tuple
When stdout = PIPE or subprocess.PIPE, output is also displayed on the fly on stdout
When stdout = filename or stderr = filename, we'll write output to the given file
Returns a tuple (exit_code, output)
"""
# Choose default encoding when none set
# cp437 encoding assures we catch most special characters from cmd.exe
if not encoding:
encoding = "cp437" if os.name == "nt" else "utf-8"
# Fix when unix command was given as single string
# This is more secure than setting shell=True
if os.name == "posix" and shell is False and isinstance(command, str):
command = shlex.split(command)
# Set default values for kwargs
errors = kwargs.pop(
"errors", "backslashreplace"
) # Don't let encoding issues make you mad
universal_newlines = kwargs.pop("universal_newlines", False)
creationflags = kwargs.pop("creationflags", 0)
# subprocess.CREATE_NO_WINDOW was added in Python 3.7 for Windows OS only
if (
windows_no_window
and sys.version_info[0] >= 3
and sys.version_info[1] >= 7
and os.name == "nt"
):
# Disable the following pylint error since the code also runs on nt platform, but
# triggers an error on Unix
# pylint: disable=E1101
creationflags = creationflags | subprocess.CREATE_NO_WINDOW
# Decide whether we write to output variable only (stdout=None), to output variable and stdout (stdout=PIPE)
# or to output variable and to file (stdout='path/to/file')
if stdout is None:
_stdout = subprocess.PIPE
stdout_to_file = False
live_output = False
elif isinstance(stdout, str):
# We will send anything to file
_stdout = open(stdout, "wb")
stdout_to_file = True
live_output = False
else:
# We will send anything to given stdout pipe
_stdout = stdout
stdout_to_file = False
live_output = True
# The only situation where we don't add stderr to stdout is if a specific target file was given
if isinstance(stderr, str):
_stderr = open(stderr, "wb")
stderr_to_file = True
else:
_stderr = subprocess.STDOUT
stderr_to_file = False
@thread_with_result_queue
def _pipe_read(pipe, read_timeout, encoding, errors):
# type: (subprocess.PIPE, Union[int, float], str, str) -> str
"""
Reads from the given subprocess pipe for at most read_timeout seconds
"""
pipe_output = ""
begin_time = datetime.now()
try:
while True:
current_output = pipe.readline()
# Compatibility for earlier Python versions where Popen has no 'encoding' nor 'errors' arguments
if isinstance(current_output, bytes):
try:
current_output = current_output.decode(encoding, errors=errors)
except TypeError:
# handle TypeError: don't know how to handle UnicodeDecodeError in error callback
current_output = current_output.decode(
encoding, errors="ignore"
)
pipe_output += current_output
if live_output:
sys.stdout.write(current_output)
if (
len(current_output) <= 0
or (datetime.now() - begin_time).total_seconds() > read_timeout
):
break
# Pipe may not have readline() anymore when process gets killed
except AttributeError:
pass
return pipe_output
def _poll_process(
process, # type: Union[subprocess.Popen[str], subprocess.Popen]
timeout, # type: int
encoding, # type: str
errors, # type: str
):
# type: (...) -> Tuple[Optional[int], str]
"""
Reads from process output pipe until:
- Timeout is reached, in which case we'll terminate the process
- Process ends by itself
Returns an encoded string of the pipe output
"""
output = ""
begin_time = datetime.now()
timeout_reached = False
while process.poll() is None:
child = _pipe_read(
process.stdout, read_timeout=1, encoding=encoding, errors=errors
)
try:
output += child.result_queue.get(timeout=1)
except Queue.Empty:
pass
except ValueError:
# What happens when str cannot be concatenated
pass
if timeout:
if (datetime.now() - begin_time).total_seconds() > timeout:
# Try to terminate nicely before killing the process
process.terminate()
# Let the process terminate itself before trying to kill it not nicely
# Under windows, terminate() and kill() are equivalent
sleep(0.5)
if process.poll() is None:
process.kill()
timeout_reached = True
# Get remaining output from process after a grace period
sleep(0.5)
exit_code = process.poll()
if timeout:
# Wait a little longer so we can collect any remaining queue content after the process ends
final_read_timeout = max(
1, timeout - (datetime.now() - begin_time).total_seconds()
)
else:
final_read_timeout = 1
child = _pipe_read(
process.stdout,
read_timeout=final_read_timeout,
encoding=encoding,
errors=errors,
)
try:
output += child.result_queue.get(timeout=final_read_timeout)
except Queue.Empty:
pass
except ValueError:
# What happens when str cannot be concatenated
pass
if timeout_reached:
raise TimeoutExpired(process, timeout)
return exit_code, output
try:
# Python >= 3.3 has SubProcessError(TimeoutExpired) class
# Python >= 3.6 has encoding & error arguments
# universal_newlines=True makes netstat command fail under windows
# timeout does not work under Python 2.7 with subprocess32 < 3.5
# decoder may be cp437 or unicode_escape for dos commands or utf-8 for powershell
# Disabling pylint error for the same reason as above
# pylint: disable=E1123
if sys.version_info >= (3, 6):
process = subprocess.Popen(
command,
stdout=_stdout,
stderr=_stderr,
shell=shell,
universal_newlines=universal_newlines,
encoding=encoding,
errors=errors,
creationflags=creationflags,
**kwargs
)
else:
process = subprocess.Popen(
command,
stdout=_stdout,
stderr=_stderr,
shell=shell,
universal_newlines=universal_newlines,
creationflags=creationflags,
**kwargs
)
try:
exit_code, output = _poll_process(process, timeout, encoding, errors)
except KeyboardInterrupt:
exit_code = -252
output = "KeyboardInterrupted"
try:
process.kill()
except AttributeError:
pass
logger.debug(
'Command "{}" returned with exit code "{}". Command output was:'.format(
command, exit_code
)
)
except subprocess.CalledProcessError as exc:
exit_code = exc.returncode
try:
output = exc.output
except AttributeError:
output = "command_runner: Could not obtain output from command."
if exit_code in valid_exit_codes if valid_exit_codes is not None else [0]:
logger.debug(
'Command "{}" returned with exit code "{}". Command output was:'.format(
command, exit_code
)
)
logger.error(
'Command "{}" failed with exit code "{}". Command output was:'.format(
command, exc.returncode
)
)
logger.error(output)
except FileNotFoundError as exc:
logger.error('Command "{}" failed, file not found: {}'.format(command, exc))
exit_code, output = -253, exc.__str__()
# OSError can also catch FileNotFoundErrors
# pylint: disable=W0705 (duplicate-except)
except (OSError, IOError) as exc:
logger.error('Command "{}" failed because of OS: {}'.format(command, exc))
exit_code, output = -253, exc.__str__()
except TimeoutExpired:
message = 'Timeout {} seconds expired for command "{}" execution.'.format(
timeout, command
)
logger.error(message)
if stdout_to_file:
_stdout.write(message.encode(encoding, errors=errors))
exit_code, output = -254, "Timeout of {} seconds expired.".format(timeout)
# We need to be able to catch a broad exception
# pylint: disable=W0703
except Exception as exc:
logger.error(
'Command "{}" failed for unknown reasons: {}'.format(command, exc),
exc_info=True,
)
logger.debug("Error:", exc_info=True)
exit_code, output = -255, exc.__str__()
finally:
if stdout_to_file:
_stdout.close()
if stderr_to_file:
_stderr.close()
logger.debug(output)
return exit_code, output
def deferred_command(command, defer_time=300):
# type: (str, int) -> NoReturn
"""
This is basically an ugly hack to launch commands which are detached from the parent process.
Especially useful to launch an auto-update or self-deletion of a running
executable a given number of seconds after it finishes
"""
# Use ping as a standard timer in shell since it's present on virtually *any* system
if os.name == "nt":
deferrer = "ping 127.0.0.1 -n {} > NUL & ".format(defer_time)
else:
deferrer = "ping 127.0.0.1 -c {} > /dev/null && ".format(defer_time)
# We'll create an independent shell process that will not be attached to any stdio interface
# Our command shall be a single string since shell=True
subprocess.Popen(
deferrer + command,
shell=True,
stdin=None,
stdout=None,
stderr=None,
close_fds=True,
)


@@ -1,174 +0,0 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
import os
import logging
import tempfile
import subprocess
import threading
import time
from logging.handlers import RotatingFileHandler
try:
import MySQLdb
except ImportError:
try:
import pymysql
pymysql.install_as_MySQLdb()
import MySQLdb
except ImportError as exc:
print("ERROR: missing the mysql python module please run:")
print("pip install -r requirements.txt")
print("ERROR: %s" % exc)
sys.exit(2)
logger = logging.getLogger(__name__)
# Logging functions ########################################################
FORMATTER = logging.Formatter("%(asctime)s :: %(levelname)s :: %(message)s")
def logger_get_console_handler():
try:
console_handler = logging.StreamHandler(sys.stdout)
except OSError as exc:
print("Cannot log to stdout, trying stderr. Message %s" % exc)
try:
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(FORMATTER)
return console_handler
except OSError as exc:
print("Cannot log to stderr neither. Message %s" % exc)
return False
else:
console_handler.setFormatter(FORMATTER)
return console_handler
def logger_get_file_handler(log_file):
err_output = None
try:
file_handler = RotatingFileHandler(
log_file, mode="a", encoding="utf-8", maxBytes=1024000, backupCount=3
)
except OSError as exc:
try:
print(
"Cannot create logfile. Trying to obtain temporary log file.\nMessage: %s"
% exc
)
err_output = str(exc)
temp_log_file = tempfile.gettempdir() + os.sep + __name__ + ".log"
print("Trying temporary log file in " + temp_log_file)
file_handler = RotatingFileHandler(
temp_log_file,
mode="a",
encoding="utf-8",
maxBytes=1000000,
backupCount=1,
)
file_handler.setFormatter(FORMATTER)
err_output += "\nUsing [%s]" % temp_log_file
return file_handler, err_output
except OSError as exc:
print(
"Cannot create temporary log file either. Will not log to file. Message: %s"
% exc
)
return False
else:
file_handler.setFormatter(FORMATTER)
return file_handler, err_output
def logger_get_logger(log_file=None, temp_log_file=None, debug=False):
# If a name is given to getLogger, than modules can't log to the root logger
_logger = logging.getLogger()
if debug is True:
_logger.setLevel(logging.DEBUG)
else:
_logger.setLevel(logging.INFO)
console_handler = logger_get_console_handler()
if console_handler:
_logger.addHandler(console_handler)
if log_file is not None:
file_handler, err_output = logger_get_file_handler(log_file)
if file_handler:
_logger.addHandler(file_handler)
_logger.propagate = False
if err_output is not None:
print(err_output)
_logger.warning(
"Failed to use log file [%s], %s.", log_file, err_output
)
if temp_log_file is not None:
if os.path.isfile(temp_log_file):
try:
os.remove(temp_log_file)
except OSError:
logger.warning("Cannot remove temp log file [%s]." % temp_log_file)
file_handler, err_output = logger_get_file_handler(temp_log_file)
if file_handler:
_logger.addHandler(file_handler)
_logger.propagate = False
if err_output is not None:
print(err_output)
_logger.warning(
"Failed to use log file [%s], %s.", log_file, err_output
)
return _logger
# Generic functions ########################################################
def check_for_file(file):
try:
with open(file) as f:
pass
except IOError as exc:
logger.error("Oh dear... %s does not seem readable" % file)
logger.debug("ERROR:", exc_info=True)
sys.exit(2)
# Config functions #########################################################
def get_config_data(install_dir):
config_cmd = ["/usr/bin/env", "php", "%s/config_to_json.php" % install_dir]
try:
proc = subprocess.Popen(
config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE
)
return proc.communicate()[0].decode()
except Exception as e:
print("ERROR: Could not execute: %s" % config_cmd)
print(e)
sys.exit(2)
# Database functions #######################################################
def db_open(db_socket, db_server, db_port, db_username, db_password, db_dbname):
try:
options = dict(
host=db_server,
port=int(db_port),
user=db_username,
passwd=db_password,
db=db_dbname,
)
if db_socket:
options["unix_socket"] = db_socket
return MySQLdb.connect(**options)
except Exception as dbexc:
print("ERROR: Could not connect to MySQL database!")
print("ERROR: %s" % dbexc)
sys.exit(2)


@@ -1,14 +1,16 @@
import logging
import pymysql
import subprocess
import threading
import traceback
from logging import debug, info, error, critical, warning
from queue import Empty
from subprocess import CalledProcessError
import LibreNMS
logger = logging.getLogger(__name__)
class QueueManager:
def __init__(
self, config, lock_manager, type_desc, uses_groups=False, auto_start=True
@@ -39,8 +41,8 @@ class QueueManager:
self._stop_event = threading.Event()
info("Groups: {}".format(self.config.group))
info(
logger.info("Groups: {}".format(self.config.group))
logger.info(
"{} QueueManager created: {} workers, {}s frequency".format(
self.type.title(),
self.get_poller_config().workers,
@@ -52,9 +54,9 @@ class QueueManager:
self.start()
def _service_worker(self, queue_id):
debug("Worker started {}".format(threading.current_thread().getName()))
logger.debug("Worker started {}".format(threading.current_thread().getName()))
while not self._stop_event.is_set():
debug(
logger.debug(
"Worker {} checking queue {} ({}) for work".format(
threading.current_thread().getName(),
queue_id,
@@ -68,13 +70,13 @@ class QueueManager:
if (
device_id is not None
): # None returned by redis after timeout when empty
debug(
logger.debug(
"Worker {} ({}) got work {} ".format(
threading.current_thread().getName(), queue_id, device_id
)
)
with LibreNMS.TimeitContext.start() as t:
debug("Queues: {}".format(self._queues))
logger.debug("Queues: {}".format(self._queues))
target_desc = (
"{} ({})".format(device_id if device_id else "", queue_id)
if queue_id
@@ -83,7 +85,7 @@ class QueueManager:
self.do_work(device_id, queue_id)
runtime = t.delta()
info(
logger.info(
"Completed {} run for {} in {:.2f}s".format(
self.type, target_desc, runtime
)
@@ -92,13 +94,13 @@ class QueueManager:
except Empty:
pass # ignore empty queue exception from subprocess.Queue
except CalledProcessError as e:
error(
logger.error(
"{} poller script error! {} returned {}: {}".format(
self.type.title(), e.cmd, e.returncode, e.output
)
)
except Exception as e:
error("{} poller exception! {}".format(self.type.title(), e))
logger.error("{} poller exception! {}".format(self.type.title(), e))
traceback.print_exc()
def post_work(self, payload, queue_id):
@@ -108,7 +110,7 @@ class QueueManager:
:param queue_id: which queue to post to, 0 is the default
"""
self.get_queue(queue_id).put(payload)
debug(
logger.debug(
"Posted work for {} to {}:{} queue size: {}".format(
payload, self.type, queue_id, self.get_queue(queue_id).qsize()
)
@@ -131,7 +133,7 @@ class QueueManager:
thread_name = "{}_{}-{}".format(self.type.title(), group, i + 1)
self.spawn_worker(thread_name, group)
debug(
logger.debug(
"Started {} {} threads for group {}".format(
group_workers, self.type, group
)
@@ -196,7 +198,7 @@ class QueueManager:
:param group:
:return:
"""
info("Creating queue {}".format(self.queue_name(queue_type, group)))
logger.info("Creating queue {}".format(self.queue_name(queue_type, group)))
try:
return LibreNMS.RedisUniqueQueue(
self.queue_name(queue_type, group),
@@ -213,15 +215,19 @@ class QueueManager:
except ImportError:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")
critical(
logger.critical(
"ERROR: Redis connection required for distributed polling"
)
logger.critical(
"Please install redis-py, either through your os software repository or from PyPI"
)
exit(2)
except Exception as e:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")
critical("Could not connect to Redis. {}".format(e))
logger.critical(
"ERROR: Redis connection required for distributed polling"
)
logger.critical("Could not connect to Redis. {}".format(e))
exit(2)
return LibreNMS.UniqueQueue()
@@ -341,11 +347,19 @@ class BillingQueueManager(TimedQueueManager):
def do_work(self, run_type, group):
if run_type == "poll":
info("Polling billing")
LibreNMS.call_script("poll-billing.php")
logger.info("Polling billing")
exit_code, output = LibreNMS.call_script("poll-billing.php")
if exit_code != 0:
logger.warning(
"Error {} in Polling billing:\n{}".format(exit_code, output)
)
else: # run_type == 'calculate'
info("Calculating billing")
LibreNMS.call_script("billing-calculate.php")
logger.info("Calculating billing")
exit_code, output = LibreNMS.call_script("billing-calculate.php")
if exit_code != 0:
logger.warning(
"Error {} in Calculating billing:\n{}".format(exit_code, output)
)
class PingQueueManager(TimedQueueManager):
@@ -365,13 +379,19 @@ class PingQueueManager(TimedQueueManager):
for group in groups:
self.post_work("", group[0])
except pymysql.err.Error as e:
critical("DB Exception ({})".format(e))
logger.critical("DB Exception ({})".format(e))
def do_work(self, context, group):
if self.lock(group, "group", timeout=self.config.ping.frequency):
try:
info("Running fast ping")
LibreNMS.call_script("ping.php", ("-g", group))
logger.info("Running fast ping")
exit_code, output = LibreNMS.call_script("ping.php", ("-g", group))
if exit_code != 0:
logger.warning(
"Running fast ping for {} failed with error code {}: {}".format(
group, exit_code, output
)
)
finally:
self.unlock(group, "group")
@@ -396,16 +416,19 @@ class ServicesQueueManager(TimedQueueManager):
for device in devices:
self.post_work(device[0], device[1])
except pymysql.err.Error as e:
critical("DB Exception ({})".format(e))
logger.critical("DB Exception ({})".format(e))
def do_work(self, device_id, group):
if self.lock(device_id, timeout=self.config.services.frequency):
try:
info("Checking services on device {}".format(device_id))
LibreNMS.call_script("check-services.php", ("-h", device_id))
except subprocess.CalledProcessError as e:
if e.returncode == 5:
info(
logger.info("Checking services on device {}".format(device_id))
exit_code, output = LibreNMS.call_script(
"check-services.php", ("-h", device_id)
)
if exit_code == 0:
self.unlock(device_id)
else:
if exit_code == 5:
logger.info(
"Device {} is down, cannot poll service, waiting {}s for retry".format(
device_id, self.config.down_retry
)
@@ -413,8 +436,12 @@ class ServicesQueueManager(TimedQueueManager):
self.lock(
device_id, allow_relock=True, timeout=self.config.down_retry
)
else:
self.unlock(device_id)
else:
logger.warning(
"Unknown error while checking services on device {} with exit code {}: {}".format(
device_id, exit_code, output
)
)
class AlertQueueManager(TimedQueueManager):
@@ -432,14 +459,13 @@ class AlertQueueManager(TimedQueueManager):
self.post_work("alerts", 0)
def do_work(self, device_id, group):
try:
info("Checking alerts")
LibreNMS.call_script("alerts.php")
except subprocess.CalledProcessError as e:
if e.returncode == 1:
warning("There was an error issuing alerts: {}".format(e.output))
logger.info("Checking alerts")
exit_code, output = LibreNMS.call_script("alerts.php")
if exit_code != 0:
if exit_code == 1:
logger.warning("There was an error issuing alerts: {}".format(output))
else:
raise
raise CalledProcessError(exit_code, "alerts.php", output)
class PollerQueueManager(QueueManager):
@@ -454,13 +480,14 @@ class PollerQueueManager(QueueManager):
def do_work(self, device_id, group):
if self.lock(device_id, timeout=self.config.poller.frequency):
info("Polling device {}".format(device_id))
logger.info("Polling device {}".format(device_id))
try:
LibreNMS.call_script("poller.php", ("-h", device_id))
except subprocess.CalledProcessError as e:
if e.returncode == 6:
warning(
exit_code, output = LibreNMS.call_script("poller.php", ("-h", device_id))
if exit_code == 0:
self.unlock(device_id)
else:
if exit_code == 6:
logger.warning(
"Polling device {} unreachable, waiting {}s for retry".format(
device_id, self.config.down_retry
)
@@ -470,12 +497,14 @@ class PollerQueueManager(QueueManager):
device_id, allow_relock=True, timeout=self.config.down_retry
)
else:
error("Polling device {} failed! {}".format(device_id, e))
logger.error(
"Polling device {} failed with exit code {}: {}".format(
device_id, exit_code, output
)
)
self.unlock(device_id)
else:
self.unlock(device_id)
else:
debug("Tried to poll {}, but it is locked".format(device_id))
logger.debug("Tried to poll {}, but it is locked".format(device_id))
class DiscoveryQueueManager(TimedQueueManager):
@@ -497,18 +526,19 @@ class DiscoveryQueueManager(TimedQueueManager):
for device in devices:
self.post_work(device[0], device[1])
except pymysql.err.Error as e:
critical("DB Exception ({})".format(e))
logger.critical("DB Exception ({})".format(e))
def do_work(self, device_id, group):
if self.lock(
device_id, timeout=LibreNMS.normalize_wait(self.config.discovery.frequency)
):
try:
info("Discovering device {}".format(device_id))
LibreNMS.call_script("discovery.php", ("-h", device_id))
except subprocess.CalledProcessError as e:
if e.returncode == 5:
info(
logger.info("Discovering device {}".format(device_id))
exit_code, output = LibreNMS.call_script("discovery.php", ("-h", device_id))
if exit_code == 0:
self.unlock(device_id)
else:
if exit_code == 5:
logger.info(
"Device {} is down, cannot discover, waiting {}s for retry".format(
device_id, self.config.down_retry
)
@@ -517,6 +547,9 @@ class DiscoveryQueueManager(TimedQueueManager):
device_id, allow_relock=True, timeout=self.config.down_retry
)
else:
logger.error(
"Discovering device {} failed with exit code {}: {}".format(
device_id, exit_code, output
)
)
self.unlock(device_id)
else:
self.unlock(device_id)


@@ -1,10 +1,7 @@
import LibreNMS
import json
import logging
import os
import pymysql
import subprocess
import threading
import sys
import time
@@ -16,7 +13,6 @@ except ImportError:
from datetime import timedelta
from datetime import datetime
from logging import debug, info, warning, error, critical, exception
from platform import python_version
from time import sleep
from socket import gethostname
@@ -28,6 +24,8 @@ try:
except ImportError:
pass
logger = logging.getLogger(__name__)
class ServiceConfig:
def __init__(self):
@@ -99,7 +97,7 @@ class ServiceConfig:
watchdog_logfile = "logs/librenms.log"
def populate(self):
config = self._get_config_data()
config = LibreNMS.get_config_data(self.BASE_DIR)
# populate config variables
self.node_id = os.getenv("NODE_ID")
@@ -232,7 +230,7 @@ class ServiceConfig:
try:
logging.getLogger().setLevel(self.log_level)
except ValueError:
error(
logger.error(
"Unknown log level {}, must be one of 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'".format(
self.log_level
)
@@ -301,39 +299,7 @@ class ServiceConfig:
if settings["watchdog_log"] is not None:
self.watchdog_logfile = settings["watchdog_log"]
except pymysql.err.Error:
warning("Unable to load poller (%s) config", self.node_id)
def _get_config_data(self):
try:
import dotenv
env_path = "{}/.env".format(self.BASE_DIR)
info("Attempting to load .env from '%s'", env_path)
dotenv.load_dotenv(dotenv_path=env_path, verbose=True)
if not os.getenv("NODE_ID"):
raise ImportError(".env does not contain a valid NODE_ID setting.")
except ImportError as e:
exception(
"Could not import .env - check that the poller user can read the file, and that composer install has been run recently"
)
sys.exit(3)
config_cmd = [
"/usr/bin/env",
"php",
"{}/config_to_json.php".format(self.BASE_DIR),
"2>&1",
]
try:
return json.loads(subprocess.check_output(config_cmd).decode())
except subprocess.CalledProcessError as e:
error(
"ERROR: Could not load or parse configuration! {}: {}".format(
subprocess.list2cmdline(e.cmd), e.output.decode()
)
)
logger.warning("Unable to load poller (%s) config", self.node_id)
@staticmethod
def parse_group(g):
@@ -347,7 +313,7 @@ class ServiceConfig:
except ValueError:
pass
error("Could not parse group string, defaulting to 0")
logger.error("Could not parse group string, defaulting to 0")
return [0]
@@ -382,7 +348,7 @@ class Service:
self.config.poller.frequency, self.log_performance_stats, "performance"
)
if self.config.watchdog_enabled:
info(
logger.info(
"Starting watchdog timer for log file: {}".format(
self.config.watchdog_logfile
)
@@ -391,7 +357,7 @@ class Service:
self.config.poller.frequency, self.logfile_watchdog, "watchdog"
)
else:
info("Watchdog is disabled.")
logger.info("Watchdog is disabled.")
self.systemd_watchdog_timer = LibreNMS.RecurringTimer(
10, self.systemd_watchdog, "systemd-watchdog"
)
@@ -401,14 +367,16 @@ class Service:
return time.time() - self.start_time
def attach_signals(self):
info("Attaching signal handlers on thread %s", threading.current_thread().name)
logger.info(
"Attaching signal handlers on thread %s", threading.current_thread().name
)
signal(SIGTERM, self.terminate) # capture sigterm and exit gracefully
signal(SIGQUIT, self.terminate) # capture sigquit and exit gracefully
signal(SIGINT, self.terminate) # capture sigint and exit gracefully
signal(SIGHUP, self.reload) # capture sighup and restart gracefully
if "psutil" not in sys.modules:
warning("psutil is not available, polling gap possible")
logger.warning("psutil is not available, polling gap possible")
else:
signal(SIGCHLD, self.reap) # capture sigchld and reap the process
@@ -427,7 +395,7 @@ class Service:
if status == psutil.STATUS_ZOMBIE:
pid = p.pid
r = os.waitpid(p.pid, os.WNOHANG)
warning(
logger.warning(
'Reaped long running job "%s" in state %s with PID %d - job returned %d',
cmd,
status,
@@ -439,7 +407,7 @@ class Service:
continue
def start(self):
debug("Performing startup checks...")
logger.debug("Performing startup checks...")
if self.config.single_instance:
self.check_single_instance() # don't allow more than one service at a time
@@ -448,7 +416,7 @@ class Service:
raise RuntimeWarning("Not allowed to start Poller twice")
self._started = True
debug("Starting up queue managers...")
logger.debug("Starting up queue managers...")
# initialize and start the worker pools
self.poller_manager = LibreNMS.PollerQueueManager(self.config, self._lm)
@@ -478,8 +446,8 @@ class Service:
if self.config.watchdog_enabled:
self.watchdog_timer.start()
info("LibreNMS Service: {} started!".format(self.config.unique_name))
info(
logger.info("LibreNMS Service: {} started!".format(self.config.unique_name))
logger.info(
"Poller group {}. Using Python {} and {} locks and queues".format(
"0 (default)" if self.config.group == [0] else self.config.group,
python_version(),
@@ -487,19 +455,19 @@ class Service:
)
)
if self.config.update_enabled:
info(
logger.info(
"Maintenance tasks will be run every {}".format(
timedelta(seconds=self.config.update_frequency)
)
)
else:
warning("Maintenance tasks are disabled.")
logger.warning("Maintenance tasks are disabled.")
# Main dispatcher loop
try:
while not self.terminate_flag:
if self.reload_flag:
info("Picked up reload flag, calling the reload process")
logger.info("Picked up reload flag, calling the reload process")
self.restart()
if self.reap_flag:
@@ -509,7 +477,9 @@ class Service:
master_lock = self._acquire_master()
if master_lock:
if not self.is_master:
info("{} is now the master dispatcher".format(self.config.name))
logger.info(
"{} is now the master dispatcher".format(self.config.name)
)
self.is_master = True
self.start_dispatch_timers()
@@ -525,7 +495,7 @@ class Service:
self.dispatch_immediate_discovery(device_id, group)
else:
if self.is_master:
info(
logger.info(
"{} is no longer the master dispatcher".format(
self.config.name
)
@@ -536,7 +506,7 @@ class Service:
except KeyboardInterrupt:
pass
info("Dispatch loop terminated")
logger.info("Dispatch loop terminated")
self.shutdown()
def _acquire_master(self):
@@ -565,7 +535,7 @@ class Service:
if elapsed > (
self.config.poller.frequency - self.config.master_resolution
):
debug(
logger.debug(
"Dispatching polling for device {}, time since last poll {:.2f}s".format(
device_id, elapsed
)
@@ -602,7 +572,7 @@ class Service:
except pymysql.err.Error:
self.db_failures += 1
if self.db_failures > self.config.max_db_failures:
warning(
logger.warning(
"Too many DB failures ({}), attempting to release master".format(
self.db_failures
)
@@ -622,23 +592,22 @@ class Service:
wait = 5
max_runtime = 86100
max_tries = int(max_runtime / wait)
info("Waiting for schema lock")
logger.info("Waiting for schema lock")
while not self._lm.lock("schema-update", self.config.unique_name, max_runtime):
attempt += 1
if attempt >= max_tries: # don't get stuck indefinitely
warning("Reached max wait for other pollers to update, updating now")
logger.warning(
"Reached max wait for other pollers to update, updating now"
)
break
sleep(wait)
info("Running maintenance tasks")
try:
output = LibreNMS.call_script("daily.sh")
info("Maintenance tasks complete\n{}".format(output))
except subprocess.CalledProcessError as e:
error(
"Error in daily.sh:\n"
+ (e.output.decode() if e.output is not None else "No output")
)
logger.info("Running maintenance tasks")
exit_code, output = LibreNMS.call_script("daily.sh")
if exit_code == 0:
logger.info("Maintenance tasks complete\n{}".format(output))
else:
logger.error("Error {} in daily.sh:\n{}".format(exit_code, output))
self._lm.unlock("schema-update", self.config.unique_name)
@@ -665,15 +634,19 @@ class Service:
)
except ImportError:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")
critical(
logger.critical(
"ERROR: Redis connection required for distributed polling"
)
logger.critical(
"Please install redis-py, either through your os software repository or from PyPI"
)
self.exit(2)
except Exception as e:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")
critical("Could not connect to Redis. {}".format(e))
logger.critical(
"ERROR: Redis connection required for distributed polling"
)
logger.critical("Could not connect to Redis. {}".format(e))
self.exit(2)
return LibreNMS.ThreadingLock()
@@ -684,14 +657,16 @@ class Service:
Has the effect of reloading the python files from disk.
"""
if sys.version_info < (3, 4, 0):
warning("Skipping restart as running under an incompatible interpreter")
warning("Please restart manually")
logger.warning(
"Skipping restart as running under an incompatible interpreter"
)
logger.warning("Please restart manually")
return
info("Restarting service... ")
logger.info("Restarting service... ")
if "psutil" not in sys.modules:
warning("psutil is not available, polling gap possible")
logger.warning("psutil is not available, polling gap possible")
self._stop_managers_and_wait()
else:
self._stop_managers()
@@ -715,7 +690,9 @@ class Service:
:param signalnum: UNIX signal number
:param flag: Flags accompanying signal
"""
info("Received signal on thread %s, handling", threading.current_thread().name)
logger.info(
"Received signal on thread %s, handling", threading.current_thread().name
)
self.reload_flag = True
def terminate(self, signalnum=None, flag=None):
@@ -724,7 +701,9 @@ class Service:
:param signalnum: UNIX signal number
:param flag: Flags accompanying signal
"""
info("Received signal on thread %s, handling", threading.current_thread().name)
logger.info(
"Received signal on thread %s, handling", threading.current_thread().name
)
self.terminate_flag = True
def shutdown(self, signalnum=None, flag=None):
@@ -733,7 +712,7 @@ class Service:
:param signalnum: UNIX signal number
:param flag: Flags accompanying signal
"""
info("Shutting down, waiting for running jobs to complete...")
logger.info("Shutting down, waiting for running jobs to complete...")
self.stop_dispatch_timers()
self._release_master()
@@ -747,7 +726,9 @@ class Service:
self._stop_managers_and_wait()
# try to release master lock
info("Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name)
logger.info(
"Shutdown of %s/%s complete", os.getpid(), threading.current_thread().name
)
self.exit(0)
def start_dispatch_timers(self):
@@ -802,11 +783,11 @@ class Service:
try:
fcntl.lockf(self._fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
warning("Another instance is already running, quitting.")
logger.warning("Another instance is already running, quitting.")
self.exit(2)
def log_performance_stats(self):
info("Counting up time spent polling")
logger.info("Counting up time spent polling")
try:
# Report on the poller instance as a whole
@@ -851,8 +832,9 @@ class Service:
)
)
except pymysql.err.Error:
exception(
"Unable to log performance statistics - is the database still online?"
logger.critical(
"Unable to log performance statistics - is the database still online?",
exc_info=True,
)
def systemd_watchdog(self):
@@ -867,18 +849,19 @@ class Service:
self.config.watchdog_logfile
)
except FileNotFoundError as e:
error("Log file not found! {}".format(e))
logger.error("Log file not found! {}".format(e))
return
if logfile_mdiff > self.config.poller.frequency:
critical(
logger.critical(
"BARK! Log file older than {}s, restarting service!".format(
self.config.poller.frequency
)
),
exc_info=True,
)
self.restart()
else:
info("Log file updated {}s ago".format(int(logfile_mdiff)))
logger.info("Log file updated {}s ago".format(int(logfile_mdiff)))
def exit(self, code=0):
sys.stdout.flush()

LibreNMS/wrapper.py (new file, 627 lines)

@@ -0,0 +1,627 @@
#! /usr/bin/env python3
"""
wrapper: A small tool which wraps the service, discovery and poller PHP scripts
in order to run them as threads with Queue and workers
Authors: Orsiris de Jong <contact@netpower.fr>
Neil Lathwood <neil@librenms.org>
Job Snijders <job.snijders@atrato.com>
Distributed poller code (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org>
All code parts that belong to Daniel are enclosed in EOC comments
Date: Jul 2021
Usage: This program accepts three command line arguments
- the number of threads (defaults to 1 for discovery / service, and 16 for poller)
- the wrapper type (service, discovery or poller)
- optional debug boolean
Ubuntu Linux: apt-get install python-mysqldb
FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean
RHEL 7: yum install MySQL-python
RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient
Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8 / AlmaLinux 8.4
License: This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.
You should have received a copy of the GNU General Public License along
with this program. If not, see https://www.gnu.org/licenses/.
LICENSE.txt contains a copy of the full GPLv3 licensing conditions.
"""
import logging
import os
import queue
import sys
import threading
import time
import uuid
from argparse import ArgumentParser
import LibreNMS
from LibreNMS.command_runner import command_runner
logger = logging.getLogger(__name__)
PER_DEVICE_TIMEOUT = (
900 # Timeout in seconds for any poller / service / discovery action per device
)
DISTRIBUTED_POLLING = False # Is overridden by config.php
REAL_DURATION = 0
DISCOVERED_DEVICES_COUNT = 0
PER_DEVICE_DURATION = {}
MEMC = None
IS_NODE = None
STEPPING = None
MASTER_TAG = None
NODES_TAG = None
TIME_TAG = ""
"""
Per wrapper type configuration
All time related variables are in seconds
"""
wrappers = {
"service": {
"executable": "check-services.php",
"table_name": "services",
"memc_touch_time": 10,
"stepping": 300,
"nodes_stepping": 300,
"total_exec_time": 300,
},
"discovery": {
"executable": "discovery.php",
"table_name": "devices",
"memc_touch_time": 30,
"stepping": 300,
"nodes_stepping": 3600,
"total_exec_time": 21600,
},
"poller": {
"executable": "poller.php",
"table_name": "devices",
"memc_touch_time": 10,
"stepping": 300,
"nodes_stepping": 300,
"total_exec_time": 300,
},
}
"""
Threading helper functions
"""
# <<<EOC
def memc_alive(name): # Type: str
"""
Checks if memcache is working by injecting a random string and trying to read it again
"""
try:
key = str(uuid.uuid4())
MEMC.set(name + ".ping." + key, key, 60)
if MEMC.get(name + ".ping." + key) == key:
MEMC.delete(name + ".ping." + key)
return True
return False
except:
return False
def memc_touch(key, _time): # Type: str # Type: int
"""
Updates a memcached key's expiration time
"""
try:
val = MEMC.get(key)
MEMC.set(key, val, _time)
except:
pass
def get_time_tag(step): # Type: int
"""
Get the current time tag as timestamp modulo stepping
"""
timestamp = int(time.time())
return timestamp - timestamp % step
# EOC
def print_worker(print_queue, wrapper_type): # Type: Queue # Type: str
"""
A separate queue and a single worker for printing information to the screen prevents
the good old joke:
Some people, when confronted with a problem, think,
"I know, I'll use threads," and then they have two problems.
"""
nodeso = 0
while True:
# <<<EOC
global IS_NODE
global DISTRIBUTED_POLLING
if DISTRIBUTED_POLLING:
if not IS_NODE:
memc_touch(MASTER_TAG, wrappers[wrapper_type]["memc_touch_time"])
nodes = MEMC.get(NODES_TAG)
if nodes is None and not memc_alive(wrapper_type):
logger.warning(
"Lost Memcached. Taking over all devices. Nodes will quit shortly."
)
DISTRIBUTED_POLLING = False
nodes = nodeso
if nodes != nodeso:
logger.info("{} Node(s) Total".format(nodes))
nodeso = nodes
else:
memc_touch(NODES_TAG, wrappers[wrapper_type]["memc_touch_time"])
try:
worker_id, device_id, elapsed_time = print_queue.get(False)
except:
pass
try:
time.sleep(1)
except:
pass
continue
else:
worker_id, device_id, elapsed_time = print_queue.get()
# EOC
global REAL_DURATION
global PER_DEVICE_DURATION
global DISCOVERED_DEVICES_COUNT
REAL_DURATION += elapsed_time
PER_DEVICE_DURATION[device_id] = elapsed_time
DISCOVERED_DEVICES_COUNT += 1
if elapsed_time < STEPPING:
logger.info(
"worker {} finished device {} in {} seconds".format(
worker_id, device_id, elapsed_time
)
)
else:
logger.warning(
"worker {} finished device {} in {} seconds".format(
worker_id, device_id, elapsed_time
)
)
print_queue.task_done()
def poll_worker(
poll_queue, # Type: Queue
print_queue, # Type: Queue
config, # Type: dict
log_dir, # Type: str
wrapper_type, # Type: str
debug, # Type: bool
):
"""
This function will fork off single instances of the php process, record
how long it takes, and push the resulting reports to the printer queue
"""
while True:
device_id = poll_queue.get()
# <<<EOC
if (
not DISTRIBUTED_POLLING
or MEMC.get("{}.device.{}{}".format(wrapper_type, device_id, TIME_TAG))
is None
):
if DISTRIBUTED_POLLING:
result = MEMC.add(
"{}.device.{}{}".format(wrapper_type, device_id, TIME_TAG),
config["distributed_poller_name"],
STEPPING,
)
if not result:
logger.info(
"The device {} appears to be being checked by another node".format(
device_id
)
)
poll_queue.task_done()
continue
if not memc_alive(wrapper_type) and IS_NODE:
logger.warning(
"Lost Memcached, Not checking Device {} as Node. Master will check it.".format(
device_id
)
)
poll_queue.task_done()
continue
# EOC
try:
start_time = time.time()
device_log = os.path.join(
log_dir, "{}_device_{}.log".format(wrapper_type, device_id)
)
command = "/usr/bin/env php {} -h {}".format(
wrappers[wrapper_type]["executable"], device_id
)
if debug:
command = command + " -d"
exit_code, output = command_runner(
command, shell=True, timeout=PER_DEVICE_TIMEOUT
)
logger.debug(output)
if debug:
with open(device_log, "w", encoding="utf-8") as dev_log_file:
dev_log_file.write(output)
elapsed_time = int(time.time() - start_time)
print_queue.put(
[threading.current_thread().name, device_id, elapsed_time]
)
except (KeyboardInterrupt, SystemExit):
raise
except Exception:
logger.error("Unknown problem happened: ")
logger.error("Traceback:", exc_info=True)
poll_queue.task_done()
class DBConfig:
"""
Bare minimal config class for LibreNMS.service.DB class usage
"""
def __init__(self, _config):
self.db_socket = _config["db_socket"]
self.db_host = _config["db_host"]
self.db_port = int(_config["db_port"])
self.db_user = _config["db_user"]
self.db_pass = _config["db_pass"]
self.db_name = _config["db_name"]
def wrapper(
wrapper_type, # Type: str
amount_of_workers, # Type: int
config, # Type: dict
log_dir, # Type: str
_debug=False, # Type: bool
): # -> None
"""
Actual code that runs various php scripts, in single node mode or distributed poller mode
"""
global MEMC
global IS_NODE
global DISTRIBUTED_POLLING
global MASTER_TAG
global NODES_TAG
global TIME_TAG
global STEPPING
# Setup wrapper dependent variables
STEPPING = wrappers[wrapper_type]["stepping"]
if wrapper_type == "poller":
if "rrd" in config and "step" in config["rrd"]:
STEPPING = config["rrd"]["step"]
TIME_TAG = "." + str(get_time_tag(STEPPING))
MASTER_TAG = "{}.master{}".format(wrapper_type, TIME_TAG)
NODES_TAG = "{}.nodes{}".format(wrapper_type, TIME_TAG)
# <<<EOC
if "distributed_poller_group" in config:
poller_group = str(config["distributed_poller_group"])
else:
poller_group = False
if (
"distributed_poller" in config
and "distributed_poller_memcached_host" in config
and "distributed_poller_memcached_port" in config
and config["distributed_poller"]
):
try:
import memcache
MEMC = memcache.Client(
[
config["distributed_poller_memcached_host"]
+ ":"
+ str(config["distributed_poller_memcached_port"])
]
)
if str(MEMC.get(MASTER_TAG)) == config["distributed_poller_name"]:
logger.info("This system is already joined as the service master.")
sys.exit(2)
if memc_alive(wrapper_type):
if MEMC.get(MASTER_TAG) is None:
logger.info("Registered as Master")
MEMC.set(MASTER_TAG, config["distributed_poller_name"], 10)
MEMC.set(NODES_TAG, 0, wrappers[wrapper_type]["nodes_stepping"])
IS_NODE = False
else:
logger.info(
"Registered as Node joining Master {}".format(
MEMC.get(MASTER_TAG)
)
)
IS_NODE = True
MEMC.incr(NODES_TAG)
DISTRIBUTED_POLLING = True
else:
logger.warning(
"Could not connect to memcached, disabling distributed service checks."
)
DISTRIBUTED_POLLING = False
IS_NODE = False
except SystemExit:
raise
except ImportError:
logger.critical("ERROR: missing memcache python module:")
logger.critical("On deb systems: apt-get install python3-memcache")
logger.critical("On other systems: pip3 install python-memcached")
logger.critical("Disabling distributed discovery.")
DISTRIBUTED_POLLING = False
else:
DISTRIBUTED_POLLING = False
# EOC
s_time = time.time()
devices_list = []
if wrapper_type == "service":
# <<<EOC
if poller_group is not False:
query = (
"SELECT DISTINCT(services.device_id) FROM services LEFT JOIN devices ON "
"services.device_id = devices.device_id WHERE devices.poller_group IN({}) AND "
"devices.disabled = 0".format(poller_group)
)
else:
query = (
"SELECT DISTINCT(services.device_id) FROM services LEFT JOIN devices ON "
"services.device_id = devices.device_id WHERE devices.disabled = 0"
)
# EOC
elif wrapper_type in ["discovery", "poller"]:
"""
This query specifically orders the results by the last_polled_timetaken column,
putting the devices likely to be slow at the top of the queue and thus
increasing our chances of completing _all_ the work in roughly the time it
takes to poll the slowest device. Cool stuff, eh?
"""
# <<<EOC
if poller_group is not False:
query = (
"SELECT device_id FROM devices WHERE poller_group IN ({}) AND "
"disabled = 0 ORDER BY last_polled_timetaken DESC".format(poller_group)
)
else:
query = "SELECT device_id FROM devices WHERE disabled = 0 ORDER BY last_polled_timetaken DESC"
# EOC
else:
logger.critical("Bogus wrapper type called")
sys.exit(3)
sconfig = DBConfig(config)
db_connection = LibreNMS.DB(sconfig)
cursor = db_connection.query(query)
devices = cursor.fetchall()
for row in devices:
devices_list.append(int(row[0]))
# <<<EOC
if DISTRIBUTED_POLLING and not IS_NODE:
query = "SELECT max(device_id),min(device_id) FROM {}".format(
wrappers[wrapper_type]["table_name"]
)
cursor = db_connection.query(query)
devices = cursor.fetchall()
maxlocks = devices[0][0] or 0
minlocks = devices[0][1] or 0
# EOC
poll_queue = queue.Queue()
print_queue = queue.Queue()
logger.info(
"starting the {} check at {} with {} threads for {} devices".format(
wrapper_type,
time.strftime("%Y-%m-%d %H:%M:%S"),
amount_of_workers,
len(devices_list),
)
)
for device_id in devices_list:
poll_queue.put(device_id)
for _ in range(amount_of_workers):
worker = threading.Thread(
target=poll_worker,
kwargs={
"poll_queue": poll_queue,
"print_queue": print_queue,
"config": config,
"log_dir": log_dir,
"wrapper_type": wrapper_type,
"debug": _debug,
},
)
worker.setDaemon(True)
worker.start()
pworker = threading.Thread(
target=print_worker,
kwargs={"print_queue": print_queue, "wrapper_type": wrapper_type},
)
pworker.setDaemon(True)
pworker.start()
try:
poll_queue.join()
print_queue.join()
except (KeyboardInterrupt, SystemExit):
raise
total_time = int(time.time() - s_time)
logger.info(
"{}-wrapper checked {} devices in {} seconds with {} workers".format(
wrapper_type, DISCOVERED_DEVICES_COUNT, total_time, amount_of_workers
)
)
# <<<EOC
if DISTRIBUTED_POLLING or memc_alive(wrapper_type):
master = MEMC.get(MASTER_TAG)
if master == config["distributed_poller_name"] and not IS_NODE:
logger.info("Wait for all service-nodes to finish")
nodes = MEMC.get(NODES_TAG)
while nodes is not None and nodes > 0:
try:
time.sleep(1)
nodes = MEMC.get(NODES_TAG)
except:
pass
logger.info("Clearing Locks for {}".format(NODES_TAG))
x = minlocks
while x <= maxlocks:
MEMC.delete("{}.device.{}".format(wrapper_type, x))
x = x + 1
logger.info("{} Locks Cleared".format(x))
logger.info("Clearing Nodes")
MEMC.delete(MASTER_TAG)
MEMC.delete(NODES_TAG)
else:
MEMC.decr(NODES_TAG)
logger.info("Finished {}.".format(time.strftime("%Y-%m-%d %H:%M:%S")))
# EOC
# Update poller statistics
if wrapper_type == "poller":
query = "UPDATE pollers SET last_polled=NOW(), devices='{}', time_taken='{}' WHERE poller_name='{}'".format(
DISCOVERED_DEVICES_COUNT, total_time, config["distributed_poller_name"]
)
cursor = db_connection.query(query)
if cursor.rowcount < 1:
query = "INSERT INTO pollers SET poller_name='{}', last_polled=NOW(), devices='{}', time_taken='{}'".format(
config["distributed_poller_name"], DISCOVERED_DEVICES_COUNT, total_time
)
db_connection.query(query)
db_connection.close()
if total_time > wrappers[wrapper_type]["total_exec_time"]:
logger.warning(
"the process took more than {} seconds to finish, you need faster hardware or more threads".format(
wrappers[wrapper_type]["total_exec_time"]
)
)
logger.warning(
"in sequential style service checks the elapsed time would have been: {} seconds".format(
REAL_DURATION
)
)
show_stopper = False
for device in PER_DEVICE_DURATION:
if PER_DEVICE_DURATION[device] > wrappers[wrapper_type]["nodes_stepping"]:
logger.warning(
"device {} is taking too long: {} seconds".format(
device, PER_DEVICE_DURATION[device]
)
)
show_stopper = True
if show_stopper:
logger.error(
"Some devices are taking more than {} seconds, the script cannot recommend you what to do.".format(
wrappers[wrapper_type]["nodes_stepping"]
)
)
else:
recommend = int(total_time / STEPPING * amount_of_workers + 1)
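# Worked example (numbers are made up): total_time=600, STEPPING=300 and
# amount_of_workers=8 give int(600 / 300 * 8 + 1) = 17, i.e. the thread
# count estimated to fit the same workload inside a single interval.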
logger.warning(
"Consider setting a minimum of {} threads. (This does not constitute professional advice!)".format(
recommend
)
)
sys.exit(2)
if __name__ == "__main__":
parser = ArgumentParser(
prog="wrapper.py",
usage="usage: %(prog)s [options] <wrapper_type> <workers>\n"
"wrapper_type = 'service', 'poller' or 'disccovery'"
"workers defaults to 1 for service and discovery, and 16 for poller "
"(Do not set too high, or you will get an OOM)",
description="Spawn multiple librenms php processes in parallel.",
)
parser.add_argument(
"-d",
"--debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
parser.add_argument(
dest="wrapper",
default=None,
help="Execute wrapper for 'service', 'poller' or 'discovery'",
)
parser.add_argument(
dest="threads", action="store_true", default=None, help="Number of workers"
)
args = parser.parse_args()
debug = args.debug
wrapper_type = args.wrapper
amount_of_workers = args.threads
if wrapper_type not in ["service", "discovery", "poller"]:
parser.error("Invalid wrapper type '{}'".format(wrapper_type))
sys.exit(4)
config = LibreNMS.get_config_data(
os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
)
log_dir = config["log_dir"]
log_file = os.path.join(log_dir, wrapper_type + ".log")
logger = LibreNMS.logger_get_logger(log_file, debug=debug)
try:
amount_of_workers = int(amount_of_workers)
except (IndexError, ValueError, TypeError):
amount_of_workers = (
16 if wrapper_type == "poller" else 1
) # Defaults to 1 for service/discovery, 16 for poller
logger.warning(
"Bogus number of workers given. Using default number ({}) of workers.".format(
amount_of_workers
)
)
wrapper(wrapper_type, amount_of_workers, config, log_dir, _debug=debug)
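# Example invocations (thread counts are illustrative):
#   python3 wrapper.py discovery 1
#   python3 wrapper.py poller 16 -d   # debug output grows the logs quickly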

View File

@@ -1,436 +1,57 @@
#! /usr/bin/env python3
"""
discovery-wrapper A small tool which wraps around discovery and tries to
guide the discovery process with a more modern approach using a
Queue and workers.
Based on the original version of poller-wrapper.py by Job Snijders
Author: Neil Lathwood <neil@librenms.org>
Orsiris de Jong <contact@netpower.fr>
Date: Oct 2019
Usage: This program accepts one command line argument: the number of threads
that should run simultaneously. If no argument is given it will assume
a default of 1 thread.
Ubuntu Linux: apt-get install python-mysqldb
FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean
RHEL 7: yum install MySQL-python
RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient
Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8
License: This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.
You should have received a copy of the GNU General Public License along
with this program. If not, see https://www.gnu.org/licenses/.
LICENSE.txt contains a copy of the full GPLv3 licensing conditions.
This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups
"""
import LibreNMS.library as LNMS
import os
from argparse import ArgumentParser
import LibreNMS
import LibreNMS.wrapper as wrapper
WRAPPER_TYPE = "discovery"
DEFAULT_WORKERS = 1
"""
Take the amount of threads we want to run in parallel from the commandline
if None are given or the argument was garbage, fall back to default
"""
usage = (
"usage: %(prog)s [options] <amount_of_workers> (Default: {}"
"(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS)
)
description = "Spawn multiple discovery.php processes in parallel."
parser = ArgumentParser(usage=usage, description=description)
parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS)
parser.add_argument(
"-d",
"--debug",
dest="debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
args = parser.parse_args()
config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__)))
log_dir = config["log_dir"]
log_file = os.path.join(log_dir, WRAPPER_TYPE + ".log")
logger = LibreNMS.logger_get_logger(log_file, debug=args.debug)
try:
import json
import os
import queue
import subprocess
import sys
import threading
import time
from optparse import OptionParser
except ImportError as exc:
print("ERROR: missing one or more of the following python modules:")
print("threading, queue, sys, subprocess, time, os, json")
print("ERROR: %s" % exc)
sys.exit(2)
APP_NAME = "discovery_wrapper"
LOG_FILE = "logs/" + APP_NAME + ".log"
_DEBUG = False
distdisco = False
real_duration = 0
discovered_devices = 0
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC0
def memc_alive():
try:
global memc
key = str(uuid.uuid4())
memc.set("discovery.ping." + key, key, 60)
if memc.get("discovery.ping." + key) == key:
memc.delete("discovery.ping." + key)
return True
else:
return False
except:
return False
def memc_touch(key, time):
try:
global memc
val = memc.get(key)
memc.set(key, val, time)
except:
pass
# EOC0
"""
A separate queue and a single worker for printing information to the screen prevents
the good old joke:
Some people, when confronted with a problem, think,
"I know, I'll use threads," and then they two they hav erpoblesms.
"""
def printworker():
nodeso = 0
while True:
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC4
global IsNode
global distdisco
if distdisco:
if not IsNode:
memc_touch("discovery.master", 30)
nodes = memc.get("discovery.nodes")
if nodes is None and not memc_alive():
print(
"WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly."
)
distdisco = False
nodes = nodeso
if nodes is not nodeso:
print("INFO: %s Node(s) Total" % (nodes))
nodeso = nodes
else:
memc_touch("discovery.nodes", 30)
try:
worker_id, device_id, elapsed_time = print_queue.get(False)
except:
pass
try:
time.sleep(1)
except:
pass
continue
else:
worker_id, device_id, elapsed_time = print_queue.get()
# EOC4
global real_duration
global per_device_duration
global discovered_devices
real_duration += elapsed_time
per_device_duration[device_id] = elapsed_time
discovered_devices += 1
if elapsed_time < 300:
print(
"INFO: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
else:
print(
"WARNING: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
print_queue.task_done()
"""
This function will fork off single instances of the discovery.php process, record
how long it takes, and push the resulting reports to the printer queue
"""
def poll_worker():
while True:
device_id = poll_queue.get()
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
if not distdisco or memc.get("discovery.device." + str(device_id)) is None:
if distdisco:
result = memc.add(
"discovery.device." + str(device_id),
config["distributed_poller_name"],
300,
)
if not result:
print(
"This device (%s) appears to be being discovered by another discovery node"
% (device_id)
)
poll_queue.task_done()
continue
if not memc_alive() and IsNode:
print(
"Lost Memcached, Not discovering Device %s as Node. Master will discover it."
% device_id
)
poll_queue.task_done()
continue
# EOC5
try:
start_time = time.time()
output = (
"-d >> %s/discover_device_%s.log" % (log_dir, device_id)
if debug
else ">> /dev/null"
)
command = "/usr/bin/env php %s -h %s %s 2>&1" % (
discovery_path,
device_id,
output,
)
# TODO: Replace with command_runner
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
print_queue.put(
[threading.current_thread().name, device_id, elapsed_time]
)
except (KeyboardInterrupt, SystemExit):
raise
except:
pass
poll_queue.task_done()
if __name__ == "__main__":
logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG)
install_dir = os.path.dirname(os.path.realpath(__file__))
LNMS.check_for_file(install_dir + "/.env")
config = json.loads(LNMS.get_config_data(install_dir))
discovery_path = config["install_dir"] + "/discovery.php"
log_dir = config["log_dir"]
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
if "distributed_poller_group" in config:
discovery_group = str(config["distributed_poller_group"])
else:
discovery_group = False
if (
"distributed_poller" in config
and "distributed_poller_memcached_host" in config
and "distributed_poller_memcached_port" in config
and config["distributed_poller"]
):
try:
import memcache
import uuid
memc = memcache.Client(
[
config["distributed_poller_memcached_host"]
+ ":"
+ str(config["distributed_poller_memcached_port"])
]
)
if str(memc.get("discovery.master")) == config["distributed_poller_name"]:
print("This system is already joined as the discovery master.")
sys.exit(2)
if memc_alive():
if memc.get("discovery.master") is None:
print("Registered as Master")
memc.set("discovery.master", config["distributed_poller_name"], 30)
memc.set("discovery.nodes", 0, 3600)
IsNode = False
else:
print(
"Registered as Node joining Master %s"
% memc.get("discovery.master")
)
IsNode = True
memc.incr("discovery.nodes")
distdisco = True
else:
print(
"Could not connect to memcached, disabling distributed discovery."
)
distdisco = False
IsNode = False
except SystemExit:
raise
except ImportError:
print("ERROR: missing memcache python module:")
print("On deb systems: apt-get install python3-memcache")
print("On other systems: pip3 install python-memcached")
print("Disabling distributed discovery.")
distdisco = False
else:
distdisco = False
# EOC1
s_time = time.time()
real_duration = 0
per_device_duration = {}
discovered_devices = 0
"""
Take the amount of threads we want to run in parallel from the commandline
if None are given or the argument was garbage, fall back to default of 1
"""
usage = "usage: %prog [options] <workers> (Default: 1 Do not set too high)"
description = "Spawn multiple discovery.php processes in parallel."
parser = OptionParser(usage=usage, description=description)
parser.add_option(
"-d",
"--debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
(options, args) = parser.parse_args()
debug = options.debug
try:
amount_of_workers = int(args[0])
except (IndexError, ValueError):
amount_of_workers = 1
devices_list = []
"""
This query specifically orders the results by the last_polled_timetaken variable
because this way, we put the devices likely to be slow at the top of the queue,
thus improving our chances of completing _all_ the work in roughly the time it takes to
discover the slowest device! Cool stuff, eh?
"""
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
if discovery_group is not False:
query = (
"select device_id from devices where poller_group IN("
+ discovery_group
+ ") and disabled = 0 order by last_polled_timetaken desc"
amount_of_workers = int(args.amount_of_workers)
except (IndexError, ValueError):
amount_of_workers = DEFAULT_WORKERS
logger.warning(
"Bogus number of workers given. Using default number ({}) of workers.".format(
amount_of_workers
)
else:
query = "select device_id from devices where disabled = 0 order by last_polled_timetaken desc"
# EOC2
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
int(config["db_port"]),
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
cursor.execute(query)
devices = cursor.fetchall()
for row in devices:
devices_list.append(int(row[0]))
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC3
if distdisco and not IsNode:
query = "select max(device_id),min(device_id) from devices"
cursor.execute(query)
devices = cursor.fetchall()
maxlocks = devices[0][0] or 0
minlocks = devices[0][1] or 0
# EOC3
db.close()
poll_queue = queue.Queue()
print_queue = queue.Queue()
print(
"INFO: starting the discovery at %s with %s threads, slowest devices first"
% (time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers)
)
for device_id in devices_list:
poll_queue.put(device_id)
for i in range(amount_of_workers):
t = threading.Thread(target=poll_worker)
t.setDaemon(True)
t.start()
p = threading.Thread(target=printworker)
p.setDaemon(True)
p.start()
try:
poll_queue.join()
print_queue.join()
except (KeyboardInterrupt, SystemExit):
raise
total_time = int(time.time() - s_time)
print(
"INFO: discovery-wrapper polled %s devices in %s seconds with %s workers"
% (discovered_devices, total_time, amount_of_workers)
)
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
if distdisco or memc_alive():
master = memc.get("discovery.master")
if master == config["distributed_poller_name"] and not IsNode:
print("Wait for all discovery-nodes to finish")
nodes = memc.get("discovery.nodes")
while nodes is not None and nodes > 0:
try:
time.sleep(1)
nodes = memc.get("discovery.nodes")
except:
pass
print("Clearing Locks")
x = minlocks
while x <= maxlocks:
memc.delete("discovery.device." + str(x))
x = x + 1
print("%s Locks Cleared" % x)
print("Clearing Nodes")
memc.delete("discovery.master")
memc.delete("discovery.nodes")
else:
memc.decr("discovery.nodes")
print("Finished %s." % time.time())
# EOC6
show_stopper = False
if total_time > 21600:
print(
"WARNING: the process took more than 6 hours to finish, you need faster hardware or more threads"
)
print(
"INFO: in sequential style discovery the elapsed time would have been: %s seconds"
% real_duration
)
for device in per_device_duration:
if per_device_duration[device] > 3600:
print(
"WARNING: device %s is taking too long: %s seconds"
% (device, per_device_duration[device])
)
show_stopper = True
if show_stopper:
print(
"ERROR: Some devices are taking more than 3600 seconds, the script cannot recommend you what to do."
)
else:
recommend = int(total_time / 300.0 * amount_of_workers + 1)
print(
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)"
% recommend
)
sys.exit(2)
wrapper.wrapper(
WRAPPER_TYPE,
amount_of_workers=amount_of_workers,
config=config,
log_dir=log_dir,
_debug=args.debug,
)

View File

@@ -43,9 +43,10 @@ if __name__ == "__main__":
if args.verbose:
logging.getLogger().setLevel(logging.INFO)
if args.debug:
elif args.debug:
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.WARNING)
info("Configuring LibreNMS service")
try:

View File

@@ -1,468 +1,57 @@
#! /usr/bin/env python3
"""
poller-wrapper A small tool which wraps around the poller and tries to
guide the polling process with a more modern approach using a
Queue and workers
Authors: Job Snijders <job.snijders@atrato.com>
Orsiris de Jong <contact@netpower.fr>
Date: Oct 2019
Usage: This program accepts one command line argument: the number of threads
that should run simultaneously. If no argument is given it will assume
a default of 16 threads.
Ubuntu Linux: apt-get install python-mysqldb
FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean
RHEL 7: yum install MySQL-python
RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient
Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8.0
License: To the extent possible under law, Job Snijders has waived all
copyright and related or neighboring rights to this script.
This script has been put into the Public Domain. This work is
published from: The Netherlands.
This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups
"""
import LibreNMS.library as LNMS
import os
from argparse import ArgumentParser
import LibreNMS
import LibreNMS.wrapper as wrapper
WRAPPER_TYPE = "poller"
DEFAULT_WORKERS = 16
"""
Take the amount of threads we want to run in parallel from the commandline
if None are given or the argument was garbage, fall back to default
"""
usage = (
"usage: %(prog)s [options] <amount_of_workers> (Default: {}"
"(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS)
)
description = "Spawn multiple poller.php processes in parallel."
parser = ArgumentParser(usage=usage, description=description)
parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS)
parser.add_argument(
"-d",
"--debug",
dest="debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
args = parser.parse_args()
config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__)))
log_dir = config["log_dir"]
log_file = os.path.join(log_dir, WRAPPER_TYPE + ".log")
logger = LibreNMS.logger_get_logger(log_file, debug=args.debug)
try:
import json
import os
import queue
import subprocess
import sys
import threading
import time
from optparse import OptionParser
except ImportError as exc:
print("ERROR: missing one or more of the following python modules:")
print("threading, queue, sys, subprocess, time, os, json")
print("ERROR: %s" % exc)
sys.exit(2)
APP_NAME = "poller_wrapper"
LOG_FILE = "logs/" + APP_NAME + ".log"
_DEBUG = False
distpoll = False
real_duration = 0
polled_devices = 0
"""
Threading helper functions
"""
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC0
def memc_alive():
try:
global memc
key = str(uuid.uuid4())
memc.set("poller.ping." + key, key, 60)
if memc.get("poller.ping." + key) == key:
memc.delete("poller.ping." + key)
return True
else:
return False
except:
return False
def memc_touch(key, time):
try:
global memc
val = memc.get(key)
memc.set(key, val, time)
except:
pass
def get_time_tag(step):
ts = int(time.time())
return ts - ts % step
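# e.g. with step=300, every timestamp inside the same 5-minute window maps
# to one tag (ts=1628165234 -> 1628165100), so all pollers agree on which
# interval they are locking.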
# EOC0
"""
A separate queue and a single worker for printing information to the screen prevents
the good old joke:
Some people, when confronted with a problem, think,
"I know, I'll use threads," and then two they hav erpoblesms.
"""
def printworker():
nodeso = 0
while True:
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC4
global IsNode
global distpoll
if distpoll:
if not IsNode:
memc_touch(master_tag, 10)
nodes = memc.get(nodes_tag)
if nodes is None and not memc_alive():
print(
"WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly."
)
distpoll = False
nodes = nodeso
if nodes is not nodeso:
print("INFO: %s Node(s) Total" % (nodes))
nodeso = nodes
else:
memc_touch(nodes_tag, 10)
try:
worker_id, device_id, elapsed_time = print_queue.get(False)
except:
pass
try:
time.sleep(1)
except:
pass
continue
else:
worker_id, device_id, elapsed_time = print_queue.get()
# EOC4
global real_duration
global per_device_duration
global polled_devices
real_duration += elapsed_time
per_device_duration[device_id] = elapsed_time
polled_devices += 1
if elapsed_time < step:
print(
"INFO: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
else:
print(
"WARNING: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
print_queue.task_done()
"""
This function will fork off single instances of the poller.php process, record
how long it takes, and push the resulting reports to the printer queue
"""
def poll_worker():
while True:
device_id = poll_queue.get()
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
if (
not distpoll
or memc.get("poller.device.%s.%s" % (device_id, time_tag)) is None
):
if distpoll:
result = memc.add(
"poller.device.%s.%s" % (device_id, time_tag),
config["distributed_poller_name"],
step,
)
if not result:
print(
"This device (%s) appears to be being polled by another poller"
% (device_id)
)
poll_queue.task_done()
continue
if not memc_alive() and IsNode:
print(
"Lost Memcached, Not polling Device %s as Node. Master will poll it."
% device_id
)
poll_queue.task_done()
continue
# EOC5
try:
start_time = time.time()
output = (
"-d >> %s/poll_device_%s.log" % (log_dir, device_id)
if debug
else ">> /dev/null"
)
command = "/usr/bin/env php %s -h %s %s 2>&1" % (
poller_path,
device_id,
output,
)
# TODO: replace with command_runner
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
print_queue.put(
[threading.current_thread().name, device_id, elapsed_time]
)
except (KeyboardInterrupt, SystemExit):
raise
except:
pass
poll_queue.task_done()
if __name__ == "__main__":
logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG)
install_dir = os.path.dirname(os.path.realpath(__file__))
LNMS.check_for_file(install_dir + "/.env")
config = json.loads(LNMS.get_config_data(install_dir))
poller_path = config["install_dir"] + "/poller.php"
log_dir = config["log_dir"]
if "rrd" in config and "step" in config["rrd"]:
step = config["rrd"]["step"]
else:
step = 300
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
if "distributed_poller_group" in config:
poller_group = str(config["distributed_poller_group"])
else:
poller_group = False
if (
"distributed_poller" in config
and "distributed_poller_memcached_host" in config
and "distributed_poller_memcached_port" in config
and config["distributed_poller"]
):
time_tag = str(get_time_tag(step))
master_tag = "poller.master." + time_tag
nodes_tag = "poller.nodes." + time_tag
try:
import memcache
import uuid
memc = memcache.Client(
[
config["distributed_poller_memcached_host"]
+ ":"
+ str(config["distributed_poller_memcached_port"])
]
)
if str(memc.get(master_tag)) == config["distributed_poller_name"]:
print("This system is already joined as the poller master.")
sys.exit(2)
if memc_alive():
if memc.get(master_tag) is None:
print("Registered as Master")
memc.set(master_tag, config["distributed_poller_name"], 10)
memc.set(nodes_tag, 0, step)
IsNode = False
else:
print("Registered as Node joining Master %s" % memc.get(master_tag))
IsNode = True
memc.incr(nodes_tag)
distpoll = True
else:
print("Could not connect to memcached, disabling distributed poller.")
distpoll = False
IsNode = False
except SystemExit:
raise
except ImportError:
print("ERROR: missing memcache python module:")
print("On deb systems: apt-get install python3-memcache")
print("On other systems: pip3 install python-memcached")
print("Disabling distributed poller.")
distpoll = False
else:
distpoll = False
# EOC1
s_time = time.time()
real_duration = 0
per_device_duration = {}
polled_devices = 0
"""
Take the amount of threads we want to run in parallel from the commandline
if None are given or the argument was garbage, fall back to default of 16
"""
usage = "usage: %prog [options] <workers> (Default: 16 (Do not set too high)"
description = "Spawn multiple poller.php processes in parallel."
parser = OptionParser(usage=usage, description=description)
parser.add_option(
"-d",
"--debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
(options, args) = parser.parse_args()
debug = options.debug
try:
amount_of_workers = int(args[0])
except (IndexError, ValueError):
amount_of_workers = 16
devices_list = []
"""
This query specifically orders the results by the last_polled_timetaken variable
because this way, we put the devices likely to be slow at the top of the queue,
thus improving our chances of completing _all_ the work in roughly the time it takes to
poll the slowest device! Cool stuff, eh?
"""
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
if poller_group is not False:
query = (
"select device_id from devices where poller_group IN("
+ poller_group
+ ") and disabled = 0 order by last_polled_timetaken desc"
amount_of_workers = int(args.amount_of_workers)
except (IndexError, ValueError):
amount_of_workers = DEFAULT_WORKERS
logger.warning(
"Bogus number of workers given. Using default number ({}) of workers.".format(
amount_of_workers
)
else:
query = "select device_id from devices where disabled = 0 order by last_polled_timetaken desc"
# EOC2
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
config["db_port"],
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
cursor.execute(query)
devices = cursor.fetchall()
for row in devices:
devices_list.append(int(row[0]))
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC3
if distpoll and not IsNode:
query = "select max(device_id),min(device_id) from devices"
cursor.execute(query)
devices = cursor.fetchall()
maxlocks = devices[0][0] or 0
minlocks = devices[0][1] or 0
# EOC3
db.close()
poll_queue = queue.Queue()
print_queue = queue.Queue()
print(
"INFO: starting the poller at %s with %s threads, slowest devices first"
% (time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers)
)
for device_id in devices_list:
poll_queue.put(device_id)
for i in range(amount_of_workers):
t = threading.Thread(target=poll_worker)
t.setDaemon(True)
t.start()
p = threading.Thread(target=printworker)
p.setDaemon(True)
p.start()
try:
poll_queue.join()
print_queue.join()
except (KeyboardInterrupt, SystemExit):
raise
total_time = int(time.time() - s_time)
print(
"INFO: poller-wrapper polled %s devices in %s seconds with %s workers"
% (polled_devices, total_time, amount_of_workers)
)
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
if distpoll or memc_alive():
master = memc.get(master_tag)
if master == config["distributed_poller_name"] and not IsNode:
print("Wait for all poller-nodes to finish")
nodes = memc.get(nodes_tag)
while nodes is not None and nodes > 0:
try:
time.sleep(1)
nodes = memc.get(nodes_tag)
except:
pass
print("Clearing Locks for %s" % time_tag)
x = minlocks
while x <= maxlocks:
res = memc.delete("poller.device.%s.%s" % (x, time_tag))
x += 1
print("%s Locks Cleared" % x)
print("Clearing Nodes")
memc.delete(master_tag)
memc.delete(nodes_tag)
else:
memc.decr(nodes_tag)
print("Finished %.3fs after interval start." % (time.time() - int(time_tag)))
# EOC6
show_stopper = False
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
config["db_port"],
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
query = (
"update pollers set last_polled=NOW(), devices='%d', time_taken='%d' where poller_name='%s'"
% (polled_devices, total_time, config["distributed_poller_name"])
)
response = cursor.execute(query)
if response == 1:
db.commit()
else:
query = (
"insert into pollers set poller_name='%s', last_polled=NOW(), devices='%d', time_taken='%d'"
% (config["distributed_poller_name"], polled_devices, total_time)
)
cursor.execute(query)
db.commit()
db.close()
if total_time > step:
print(
"WARNING: the process took more than %s seconds to finish, you need faster hardware or more threads"
% step
)
print(
"INFO: in sequential style polling the elapsed time would have been: %s seconds"
% real_duration
)
for device in per_device_duration:
if per_device_duration[device] > step:
print(
"WARNING: device %s is taking too long: %s seconds"
% (device, per_device_duration[device])
)
show_stopper = True
if show_stopper:
print(
"ERROR: Some devices are taking more than %s seconds, the script cannot recommend you what to do."
% step
)
else:
recommend = int(total_time / step * amount_of_workers + 1)
print(
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)"
% recommend
)
sys.exit(2)
wrapper.wrapper(
WRAPPER_TYPE,
amount_of_workers=amount_of_workers,
config=config,
log_dir=log_dir,
_debug=args.debug,
)

View File

@@ -3,3 +3,4 @@ python-dotenv
redis>=3.0
setuptools
psutil
command_runner>=0.7.0
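The command_runner pin above backs the new wrapper code, which runs the PHP
scripts through command_runner instead of raw subprocess calls; a minimal
sketch of the call pattern (the command line here is illustrative):

from command_runner import command_runner

# Returns the exit code and the combined output of the child process,
# killing it once the timeout expires instead of hanging indefinitely.
exit_code, output = command_runner("/usr/bin/env php poller.php -h 42", timeout=3600)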

View File

@@ -1,440 +1,57 @@
#! /usr/bin/env python3
"""
services-wrapper A small tool which wraps around check-services.php and tries to
guide the services process with a more modern approach using a
Queue and workers.
Based on the original version of poller-wrapper.py by Job Snijders
Author: Neil Lathwood <neil@librenms.org>
Orsiris de Jong <contact@netpower.fr>
Date: Oct 2019
Usage: This program accepts one command line argument: the number of threads
that should run simultaneously. If no argument is given it will assume
a default of 1 thread.
Ubuntu Linux: apt-get install python-mysqldb
FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean
RHEL 7: yum install MySQL-python
RHEL 8: dnf install mariadb-connector-c-devel gcc && python -m pip install mysqlclient
Tested on: Python 3.6.8 / PHP 7.2.11 / CentOS 8
License: This program is free software: you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your
option) any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.
You should have received a copy of the GNU General Public License along
with this program. If not, see https://www.gnu.org/licenses/.
LICENSE.txt contains a copy of the full GPLv3 licensing conditions.
This is a Bootstrap script for wrapper.py, in order to retain compatibility with earlier LibreNMS setups
"""
import LibreNMS.library as LNMS
import os
from argparse import ArgumentParser
import LibreNMS
import LibreNMS.wrapper as wrapper
WRAPPER_TYPE = "service"
DEFAULT_WORKERS = 1
"""
Take the amount of threads we want to run in parallel from the commandline
if None are given or the argument was garbage, fall back to default
"""
usage = (
"usage: %(prog)s [options] <amount_of_workers> (Default: {}"
"(Do not set too high, or you will get an OOM)".format(DEFAULT_WORKERS)
)
description = "Spawn multiple check-services.php processes in parallel."
parser = ArgumentParser(usage=usage, description=description)
parser.add_argument(dest="amount_of_workers", default=DEFAULT_WORKERS)
parser.add_argument(
"-d",
"--debug",
dest="debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
args = parser.parse_args()
config = LibreNMS.get_config_data(os.path.dirname(os.path.realpath(__file__)))
log_dir = config["log_dir"]
log_file = os.path.join(log_dir, WRAPPER_TYPE + ".log")
logger = LibreNMS.logger_get_logger(log_file, debug=args.debug)
try:
import json
import os
import queue
import subprocess
import sys
import threading
import time
from optparse import OptionParser
except ImportError as exc:
print("ERROR: missing one or more of the following python modules:")
print("threading, queue, sys, subprocess, time, os, json")
print("ERROR: %s" % exc)
sys.exit(2)
APP_NAME = "services_wrapper"
LOG_FILE = "logs/" + APP_NAME + ".log"
_DEBUG = False
servicedisco = False
real_duration = 0
service_devices = 0
"""
Threading helper functions
"""
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC0
def memc_alive():
try:
global memc
key = str(uuid.uuid4())
memc.set("poller.ping." + key, key, 60)
if memc.get("poller.ping." + key) == key:
memc.delete("poller.ping." + key)
return True
else:
return False
except:
return False
def memc_touch(key, time):
try:
global memc
val = memc.get(key)
memc.set(key, val, time)
except:
pass
def get_time_tag(step):
ts = int(time.time())
return ts - ts % step
# EOC0
"""
A separate queue and a single worker for printing information to the screen prevents
the good old joke:
Some people, when confronted with a problem, think,
"I know, I'll use threads," and then they two they hav erpoblesms.
"""
def printworker():
nodeso = 0
while True:
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC4
global IsNode
global servicedisco
if servicedisco:
if not IsNode:
memc_touch("service.master", 10)
nodes = memc.get("service.nodes")
if nodes is None and not memc_alive():
print(
"WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly."
)
servicedisco = False
nodes = nodeso
if nodes is not nodeso:
print("INFO: %s Node(s) Total" % (nodes))
nodeso = nodes
else:
memc_touch("service.nodes", 10)
try:
worker_id, device_id, elapsed_time = print_queue.get(False)
except:
pass
try:
time.sleep(1)
except:
pass
continue
else:
worker_id, device_id, elapsed_time = print_queue.get()
# EOC4
global real_duration
global per_device_duration
global service_devices
real_duration += elapsed_time
per_device_duration[device_id] = elapsed_time
service_devices += 1
if elapsed_time < 300:
print(
"INFO: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
else:
print(
"WARNING: worker %s finished device %s in %s seconds"
% (worker_id, device_id, elapsed_time)
)
print_queue.task_done()
"""
This function will fork off single instances of the check-services.php process, record
how long it takes, and push the resulting reports to the printer queue
"""
def poll_worker():
while True:
device_id = poll_queue.get()
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
if not servicedisco or memc.get("service.device." + str(device_id)) is None:
if servicedisco:
result = memc.add(
"service.device." + str(device_id),
config["distributed_poller_name"],
300,
)
if not result:
print(
"This device (%s) appears to be being service checked by another service node"
% (device_id)
)
poll_queue.task_done()
continue
if not memc_alive() and IsNode:
print(
"Lost Memcached, Not service checking Device %s as Node. Master will check it."
% device_id
)
poll_queue.task_done()
continue
# EOC5
try:
start_time = time.time()
output = (
"-d >> %s/services_device_%s.log" % (log_dir, device_id)
if debug
else ">> /dev/null"
)
# TODO replace with command_runner
command = "/usr/bin/env php %s -h %s %s 2>&1" % (
service_path,
device_id,
output,
)
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
print_queue.put(
[threading.current_thread().name, device_id, elapsed_time]
)
except (KeyboardInterrupt, SystemExit):
raise
except:
pass
poll_queue.task_done()
if __name__ == "__main__":
logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG)
install_dir = os.path.dirname(os.path.realpath(__file__))
LNMS.check_for_file(install_dir + "/.env")
config = json.loads(LNMS.get_config_data(install_dir))
service_path = config["install_dir"] + "/check-services.php"
log_dir = config["log_dir"]
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
if "distributed_poller_group" in config:
service_group = str(config["distributed_poller_group"])
else:
service_group = False
if (
"distributed_poller" in config
and "distributed_poller_memcached_host" in config
and "distributed_poller_memcached_port" in config
and config["distributed_poller"]
):
try:
import memcache
import uuid
memc = memcache.Client(
[
config["distributed_poller_memcached_host"]
+ ":"
+ str(config["distributed_poller_memcached_port"])
]
)
if str(memc.get("service.master")) == config["distributed_poller_name"]:
print("This system is already joined as the service master.")
sys.exit(2)
if memc_alive():
if memc.get("service.master") is None:
print("Registered as Master")
memc.set("service.master", config["distributed_poller_name"], 10)
memc.set("service.nodes", 0, 300)
IsNode = False
else:
print(
"Registered as Node joining Master %s"
% memc.get("service.master")
)
IsNode = True
memc.incr("service.nodes")
servicedisco = True
else:
print(
"Could not connect to memcached, disabling distributed service checks."
)
servicedisco = False
IsNode = False
except SystemExit:
raise
except ImportError:
print("ERROR: missing memcache python module:")
print("On deb systems: apt-get install python3-memcache")
print("On other systems: pip3 install python-memcached")
print("Disabling distributed discovery.")
servicedisco = False
else:
servicedisco = False
# EOC1
s_time = time.time()
real_duration = 0
per_device_duration = {}
service_devices = 0
"""
Take the amount of threads we want to run in parallel from the commandline
if None are given or the argument was garbage, fall back to default of 1
"""
usage = "usage: %prog [options] <workers> (Default: 1 (Do not set too high)"
description = "Spawn multiple check-services.php processes in parallel."
parser = OptionParser(usage=usage, description=description)
parser.add_option(
"-d",
"--debug",
action="store_true",
default=False,
help="Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space.",
)
(options, args) = parser.parse_args()
debug = options.debug
try:
amount_of_workers = int(args[0])
except (IndexError, ValueError):
amount_of_workers = 1
devices_list = []
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
if service_group is not False:
query = (
"SELECT DISTINCT(`services`.`device_id`) FROM `services` LEFT JOIN `devices` ON `services`.`device_id` = `devices`.`device_id` WHERE `devices`.`poller_group` IN("
+ service_group
+ ") AND `devices`.`disabled` = 0"
amount_of_workers = int(args.amount_of_workers)
except (IndexError, ValueError):
amount_of_workers = DEFAULT_WORKERS
logger.warning(
"Bogus number of workers given. Using default number ({}) of workers.".format(
amount_of_workers
)
else:
query = "SELECT DISTINCT(`services`.`device_id`) FROM `services` LEFT JOIN `devices` ON `services`.`device_id` = `devices`.`device_id` WHERE `devices`.`disabled` = 0"
# EOC2
db = LNMS.db_open(
config["db_socket"],
config["db_host"],
config["db_port"],
config["db_user"],
config["db_pass"],
config["db_name"],
)
cursor = db.cursor()
cursor.execute(query)
devices = cursor.fetchall()
for row in devices:
devices_list.append(int(row[0]))
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC3
if servicedisco and not IsNode:
query = "SELECT MAX(`device_id`), MIN(`device_id`) FROM `services`"
cursor.execute(query)
devices = cursor.fetchall()
maxlocks = devices[0][0] or 0
minlocks = devices[0][1] or 0
# EOC3
db.close()
poll_queue = queue.Queue()
print_queue = queue.Queue()
print(
"INFO: starting the service check at %s with %s threads"
% (time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers)
)
for device_id in devices_list:
poll_queue.put(device_id)
for i in range(amount_of_workers):
t = threading.Thread(target=poll_worker)
t.setDaemon(True)
t.start()
p = threading.Thread(target=printworker)
p.setDaemon(True)
p.start()
try:
poll_queue.join()
print_queue.join()
except (KeyboardInterrupt, SystemExit):
raise
total_time = int(time.time() - s_time)
print(
"INFO: services-wrapper checked %s devices in %s seconds with %s workers"
% (service_devices, total_time, amount_of_workers)
)
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
if servicedisco or memc_alive():
master = memc.get("service.master")
if master == config["distributed_poller_name"] and not IsNode:
print("Wait for all service-nodes to finish")
nodes = memc.get("service.nodes")
while nodes is not None and nodes > 0:
try:
time.sleep(1)
nodes = memc.get("service.nodes")
except:
pass
print("Clearing Locks")
x = minlocks
while x <= maxlocks:
memc.delete("service.device." + str(x))
x = x + 1
print("%s Locks Cleared" % x)
print("Clearing Nodes")
memc.delete("service.master")
memc.delete("service.nodes")
else:
memc.decr("service.nodes")
print("Finished %s." % time.time())
# EOC6
show_stopper = False
if total_time > 300:
print(
"WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads"
)
print(
"INFO: in sequential style service checks the elapsed time would have been: %s seconds"
% real_duration
)
for device in per_device_duration:
if per_device_duration[device] > 300:
print(
"WARNING: device %s is taking too long: %s seconds"
% (device, per_device_duration[device])
)
show_stopper = True
if show_stopper:
print(
"ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do."
)
else:
recommend = int(total_time / 300.0 * amount_of_workers + 1)
print(
"WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)"
% recommend
)
sys.exit(2)
wrapper.wrapper(
WRAPPER_TYPE,
amount_of_workers=amount_of_workers,
config=config,
log_dir=log_dir,
_debug=args.debug,
)