mirror of
				https://github.com/librenms/librenms.git
				synced 2024-10-07 16:52:45 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			478 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			478 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
import subprocess
 | 
						|
import threading
 | 
						|
import timeit
 | 
						|
from collections import deque
 | 
						|
 | 
						|
from logging import critical, info, debug, exception
 | 
						|
from math import ceil
 | 
						|
from queue import Queue
 | 
						|
from time import time
 | 
						|
 | 
						|
from .service import Service, ServiceConfig
 | 
						|
from .queuemanager import (
 | 
						|
    QueueManager,
 | 
						|
    TimedQueueManager,
 | 
						|
    BillingQueueManager,
 | 
						|
    PingQueueManager,
 | 
						|
    ServicesQueueManager,
 | 
						|
    AlertQueueManager,
 | 
						|
    PollerQueueManager,
 | 
						|
    DiscoveryQueueManager,
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
def normalize_wait(seconds):
 | 
						|
    return ceil(seconds - (time() % seconds))
 | 
						|
 | 
						|
 | 
						|
def call_script(script, args=()):
 | 
						|
    """
 | 
						|
    Run a LibreNMS script.  Captures all output and throws an exception if a non-zero
 | 
						|
    status is returned.  Blocks parent signals (like SIGINT and SIGTERM).
 | 
						|
    :param script: the name of the executable relative to the base directory
 | 
						|
    :param args: a tuple of arguments to send to the command
 | 
						|
    :returns the output of the command
 | 
						|
    """
 | 
						|
    if script.endswith(".php"):
 | 
						|
        # save calling the sh process
 | 
						|
        base = ("/usr/bin/env", "php")
 | 
						|
    else:
 | 
						|
        base = ()
 | 
						|
 | 
						|
    base_dir = os.path.realpath(os.path.dirname(__file__) + "/..")
 | 
						|
    cmd = base + ("{}/{}".format(base_dir, script),) + tuple(map(str, args))
 | 
						|
    debug("Running {}".format(cmd))
 | 
						|
    # preexec_fn=os.setsid here keeps process signals from propagating (close_fds=True is default)
 | 
						|
    return subprocess.check_call(
 | 
						|
        cmd,
 | 
						|
        stdout=subprocess.DEVNULL,
 | 
						|
        stderr=subprocess.STDOUT,
 | 
						|
        preexec_fn=os.setsid,
 | 
						|
        close_fds=True,
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
class DB:
 | 
						|
    def __init__(self, config, auto_connect=True):
 | 
						|
        """
 | 
						|
        Simple DB wrapper
 | 
						|
        :param config: The poller config object
 | 
						|
        """
 | 
						|
        self.config = config
 | 
						|
        self._db = {}
 | 
						|
 | 
						|
        if auto_connect:
 | 
						|
            self.connect()
 | 
						|
 | 
						|
    def connect(self):
 | 
						|
        try:
 | 
						|
            import pymysql
 | 
						|
 | 
						|
            pymysql.install_as_MySQLdb()
 | 
						|
            info("Using pure python SQL client")
 | 
						|
        except ImportError:
 | 
						|
            info("Using other SQL client")
 | 
						|
 | 
						|
        try:
 | 
						|
            import MySQLdb
 | 
						|
        except ImportError:
 | 
						|
            critical("ERROR: missing a mysql python module")
 | 
						|
            critical(
 | 
						|
                "Install either 'PyMySQL' or 'mysqlclient' from your OS software repository or from PyPI"
 | 
						|
            )
 | 
						|
            raise
 | 
						|
 | 
						|
        try:
 | 
						|
            args = {
 | 
						|
                "host": self.config.db_host,
 | 
						|
                "port": self.config.db_port,
 | 
						|
                "user": self.config.db_user,
 | 
						|
                "passwd": self.config.db_pass,
 | 
						|
                "db": self.config.db_name,
 | 
						|
            }
 | 
						|
            if self.config.db_socket:
 | 
						|
                args["unix_socket"] = self.config.db_socket
 | 
						|
 | 
						|
            conn = MySQLdb.connect(**args)
 | 
						|
            conn.autocommit(True)
 | 
						|
            conn.ping(True)
 | 
						|
            self._db[threading.get_ident()] = conn
 | 
						|
        except Exception as e:
 | 
						|
            critical("ERROR: Could not connect to MySQL database! {}".format(e))
 | 
						|
            raise
 | 
						|
 | 
						|
    def db_conn(self):
 | 
						|
        """
 | 
						|
        Refers to a database connection via thread identifier
 | 
						|
        :return: database connection handle
 | 
						|
        """
 | 
						|
 | 
						|
        # Does a connection exist for this thread
 | 
						|
        if threading.get_ident() not in self._db.keys():
 | 
						|
            self.connect()
 | 
						|
 | 
						|
        return self._db[threading.get_ident()]
 | 
						|
 | 
						|
    def query(self, query, args=None):
 | 
						|
        """
 | 
						|
        Open a cursor, fetch the query with args, close the cursor and return it.
 | 
						|
        :rtype: MySQLdb.Cursor
 | 
						|
        :param query:
 | 
						|
        :param args:
 | 
						|
        :return: the cursor with results
 | 
						|
        """
 | 
						|
        try:
 | 
						|
            cursor = self.db_conn().cursor()
 | 
						|
            cursor.execute(query, args)
 | 
						|
            cursor.close()
 | 
						|
            return cursor
 | 
						|
        except Exception as e:
 | 
						|
            critical("DB Connection exception {}".format(e))
 | 
						|
            self.close()
 | 
						|
            raise
 | 
						|
 | 
						|
    def close(self):
 | 
						|
        """
 | 
						|
        Close the connection owned by this thread.
 | 
						|
        """
 | 
						|
        conn = self._db.pop(threading.get_ident(), None)
 | 
						|
        if conn:
 | 
						|
            conn.close()
 | 
						|
 | 
						|
 | 
						|
class RecurringTimer:
 | 
						|
    def __init__(self, duration, target, thread_name=None):
 | 
						|
        self.duration = duration
 | 
						|
        self.target = target
 | 
						|
        self._timer_thread = None
 | 
						|
        self._thread_name = thread_name
 | 
						|
        self._event = threading.Event()
 | 
						|
 | 
						|
    def _loop(self):
 | 
						|
        while not self._event.is_set():
 | 
						|
            self._event.wait(normalize_wait(self.duration))
 | 
						|
            if not self._event.is_set():
 | 
						|
                self.target()
 | 
						|
 | 
						|
    def start(self):
 | 
						|
        self._timer_thread = threading.Thread(target=self._loop)
 | 
						|
        if self._thread_name:
 | 
						|
            self._timer_thread.name = self._thread_name
 | 
						|
        self._event.clear()
 | 
						|
        self._timer_thread.start()
 | 
						|
 | 
						|
    def stop(self):
 | 
						|
        self._event.set()
 | 
						|
 | 
						|
 | 
						|
class Lock:
 | 
						|
    """ Base lock class this is not thread safe"""
 | 
						|
 | 
						|
    def __init__(self):
 | 
						|
        self._locks = {}  # store a tuple (owner, expiration)
 | 
						|
 | 
						|
    def lock(self, name, owner, expiration, allow_owner_relock=False):
 | 
						|
        """
 | 
						|
        Obtain the named lock.
 | 
						|
        :param allow_owner_relock:
 | 
						|
        :param name: str the name of the lock
 | 
						|
        :param owner: str a unique name for the locking node
 | 
						|
        :param expiration: int in seconds
 | 
						|
        """
 | 
						|
        if (
 | 
						|
            (name not in self._locks)
 | 
						|
            or (  # lock doesn't exist
 | 
						|
                allow_owner_relock and self._locks.get(name, [None])[0] == owner
 | 
						|
            )
 | 
						|
            or time() > self._locks[name][1]  # owner has permission  # lock has expired
 | 
						|
        ):
 | 
						|
            self._locks[name] = (owner, expiration + time())
 | 
						|
            return self._locks[name][0] == owner
 | 
						|
 | 
						|
        return False
 | 
						|
 | 
						|
    def unlock(self, name, owner):
 | 
						|
        """
 | 
						|
        Release the named lock.
 | 
						|
        :param name: str the name of the lock
 | 
						|
        :param owner: str a unique name for the locking node
 | 
						|
        """
 | 
						|
        if (name in self._locks) and self._locks[name][0] == owner:
 | 
						|
            self._locks.pop(name, None)
 | 
						|
            return True
 | 
						|
        return False
 | 
						|
 | 
						|
    def check_lock(self, name):
 | 
						|
        lock = self._locks.get(name, None)
 | 
						|
        if lock:
 | 
						|
            return lock[1] > time()
 | 
						|
        return False
 | 
						|
 | 
						|
    def print_locks(self):
 | 
						|
        debug(self._locks)
 | 
						|
 | 
						|
 | 
						|
class ThreadingLock(Lock):
 | 
						|
    """A subclass of Lock that uses thread-safe locking"""
 | 
						|
 | 
						|
    def __init__(self):
 | 
						|
        Lock.__init__(self)
 | 
						|
        self._lock = threading.Lock()
 | 
						|
 | 
						|
    def lock(self, name, owner, expiration, allow_owner_relock=False):
 | 
						|
        """
 | 
						|
        Obtain the named lock.
 | 
						|
        :param allow_owner_relock:
 | 
						|
        :param name: str the name of the lock
 | 
						|
        :param owner: str a unique name for the locking node
 | 
						|
        :param expiration: int in seconds
 | 
						|
        """
 | 
						|
        with self._lock:
 | 
						|
            return Lock.lock(self, name, owner, expiration, allow_owner_relock)
 | 
						|
 | 
						|
    def unlock(self, name, owner):
 | 
						|
        """
 | 
						|
        Release the named lock.
 | 
						|
        :param name: str the name of the lock
 | 
						|
        :param owner: str a unique name for the locking node
 | 
						|
        """
 | 
						|
        with self._lock:
 | 
						|
            return Lock.unlock(self, name, owner)
 | 
						|
 | 
						|
    def check_lock(self, name):
 | 
						|
        return Lock.check_lock(self, name)
 | 
						|
 | 
						|
    def print_locks(self):
 | 
						|
        Lock.print_locks(self)
 | 
						|
 | 
						|
 | 
						|
class RedisLock(Lock):
 | 
						|
    def __init__(self, namespace="lock", **redis_kwargs):
 | 
						|
        import redis
 | 
						|
        from redis.sentinel import Sentinel
 | 
						|
 | 
						|
        redis_kwargs["decode_responses"] = True
 | 
						|
        if redis_kwargs.get("sentinel") and redis_kwargs.get("sentinel_service"):
 | 
						|
            sentinels = [
 | 
						|
                tuple(l.split(":")) for l in redis_kwargs.pop("sentinel").split(",")
 | 
						|
            ]
 | 
						|
            sentinel_service = redis_kwargs.pop("sentinel_service")
 | 
						|
            kwargs = {
 | 
						|
                k: v
 | 
						|
                for k, v in redis_kwargs.items()
 | 
						|
                if k in ["decode_responses", "password", "db", "socket_timeout"]
 | 
						|
            }
 | 
						|
            self._redis = Sentinel(sentinels, **kwargs).master_for(sentinel_service)
 | 
						|
        else:
 | 
						|
            kwargs = {k: v for k, v in redis_kwargs.items() if "sentinel" not in k}
 | 
						|
            self._redis = redis.Redis(**kwargs)
 | 
						|
        self._redis.ping()
 | 
						|
        self._namespace = namespace
 | 
						|
        info(
 | 
						|
            "Created redis lock manager with socket_timeout of {}s".format(
 | 
						|
                redis_kwargs["socket_timeout"]
 | 
						|
            )
 | 
						|
        )
 | 
						|
 | 
						|
    def __key(self, name):
 | 
						|
        return "{}:{}".format(self._namespace, name)
 | 
						|
 | 
						|
    def lock(self, name, owner, expiration=1, allow_owner_relock=False):
 | 
						|
        """
 | 
						|
        Obtain the named lock.
 | 
						|
        :param allow_owner_relock: bool
 | 
						|
        :param name: str the name of the lock
 | 
						|
        :param owner: str a unique name for the locking node
 | 
						|
        :param expiration: int in seconds, 0 expiration means forever
 | 
						|
        """
 | 
						|
        import redis
 | 
						|
 | 
						|
        try:
 | 
						|
            if int(expiration) < 1:
 | 
						|
                expiration = 1
 | 
						|
 | 
						|
            key = self.__key(name)
 | 
						|
            non_existing = not (allow_owner_relock and self._redis.get(key) == owner)
 | 
						|
            return self._redis.set(key, owner, ex=int(expiration), nx=non_existing)
 | 
						|
        except redis.exceptions.ResponseError as e:
 | 
						|
            exception(
 | 
						|
                "Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s",
 | 
						|
                name,
 | 
						|
                owner,
 | 
						|
                expiration,
 | 
						|
                allow_owner_relock,
 | 
						|
            )
 | 
						|
 | 
						|
    def unlock(self, name, owner):
 | 
						|
        """
 | 
						|
        Release the named lock.
 | 
						|
        :param name: str the name of the lock
 | 
						|
        :param owner: str a unique name for the locking node
 | 
						|
        """
 | 
						|
        key = self.__key(name)
 | 
						|
        if self._redis.get(key) == owner:
 | 
						|
            self._redis.delete(key)
 | 
						|
            return True
 | 
						|
        return False
 | 
						|
 | 
						|
    def check_lock(self, name):
 | 
						|
        return self._redis.get(self.__key(name)) is not None
 | 
						|
 | 
						|
    def print_locks(self):
 | 
						|
        keys = self._redis.keys(self.__key("*"))
 | 
						|
        for key in keys:
 | 
						|
            print(
 | 
						|
                "{} locked by {}, expires in {} seconds".format(
 | 
						|
                    key, self._redis.get(key), self._redis.ttl(key)
 | 
						|
                )
 | 
						|
            )
 | 
						|
 | 
						|
 | 
						|
class RedisUniqueQueue(object):
 | 
						|
    def __init__(self, name, namespace="queue", **redis_kwargs):
 | 
						|
        import redis
 | 
						|
        from redis.sentinel import Sentinel
 | 
						|
 | 
						|
        redis_kwargs["decode_responses"] = True
 | 
						|
        if redis_kwargs.get("sentinel") and redis_kwargs.get("sentinel_service"):
 | 
						|
            sentinels = [
 | 
						|
                tuple(l.split(":")) for l in redis_kwargs.pop("sentinel").split(",")
 | 
						|
            ]
 | 
						|
            sentinel_service = redis_kwargs.pop("sentinel_service")
 | 
						|
            kwargs = {
 | 
						|
                k: v
 | 
						|
                for k, v in redis_kwargs.items()
 | 
						|
                if k in ["decode_responses", "password", "db", "socket_timeout"]
 | 
						|
            }
 | 
						|
            self._redis = Sentinel(sentinels, **kwargs).master_for(sentinel_service)
 | 
						|
        else:
 | 
						|
            kwargs = {k: v for k, v in redis_kwargs.items() if "sentinel" not in k}
 | 
						|
            self._redis = redis.Redis(**kwargs)
 | 
						|
        self._redis.ping()
 | 
						|
        self.key = "{}:{}".format(namespace, name)
 | 
						|
        info(
 | 
						|
            "Created redis queue with socket_timeout of {}s".format(
 | 
						|
                redis_kwargs["socket_timeout"]
 | 
						|
            )
 | 
						|
        )
 | 
						|
 | 
						|
        # clean up from previous implementations
 | 
						|
        if self._redis.type(self.key) != "zset":
 | 
						|
            self._redis.delete(self.key)
 | 
						|
 | 
						|
    def qsize(self):
 | 
						|
        return self._redis.zcount(self.key, "-inf", "+inf")
 | 
						|
 | 
						|
    def empty(self):
 | 
						|
        return self.qsize() == 0
 | 
						|
 | 
						|
    def put(self, item):
 | 
						|
        self._redis.zadd(self.key, {item: time()}, nx=True)
 | 
						|
 | 
						|
    def get(self, block=True, timeout=None):
 | 
						|
        if block:
 | 
						|
            item = self._redis.bzpopmin(self.key, timeout=timeout)
 | 
						|
        else:
 | 
						|
            item = self._redis.zpopmin(self.key)
 | 
						|
 | 
						|
        if item:
 | 
						|
            item = item[1]
 | 
						|
        return item
 | 
						|
 | 
						|
    def get_nowait(self):
 | 
						|
        return self.get(False)
 | 
						|
 | 
						|
 | 
						|
class UniqueQueue(Queue):
 | 
						|
    def _init(self, maxsize):
 | 
						|
        self.queue = deque()
 | 
						|
        self.setqueue = set()
 | 
						|
 | 
						|
    def _put(self, item):
 | 
						|
        if item not in self.setqueue:
 | 
						|
            self.setqueue.add(item)
 | 
						|
            self.queue.append(item)
 | 
						|
 | 
						|
    def _get(self):
 | 
						|
        item = self.queue.popleft()
 | 
						|
        self.setqueue.remove(item)
 | 
						|
        return item
 | 
						|
 | 
						|
 | 
						|
class PerformanceCounter(object):
 | 
						|
    """
 | 
						|
    This is a simple counter to record execution time and number of jobs. It's unique to each
 | 
						|
    poller instance, so does not need to be globally syncronised, just locally.
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self):
 | 
						|
        self._count = 0
 | 
						|
        self._jobs = 0
 | 
						|
        self._lock = threading.Lock()
 | 
						|
 | 
						|
    def add(self, n):
 | 
						|
        """
 | 
						|
        Add n to the counter and increment the number of jobs by 1
 | 
						|
        :param n: Number to increment by
 | 
						|
        """
 | 
						|
        with self._lock:
 | 
						|
            self._count += n
 | 
						|
            self._jobs += 1
 | 
						|
 | 
						|
    def split(self, precise=False):
 | 
						|
        """
 | 
						|
        Return the current counter value and keep going
 | 
						|
        :param precise: Whether floating point precision is desired
 | 
						|
        :return: ((INT or FLOAT), INT)
 | 
						|
        """
 | 
						|
        return (self._count if precise else int(self._count)), self._jobs
 | 
						|
 | 
						|
    def reset(self, precise=False):
 | 
						|
        """
 | 
						|
        Return the current counter value and then zero it.
 | 
						|
        :param precise: Whether floating point precision is desired
 | 
						|
        :return: ((INT or FLOAT), INT)
 | 
						|
        """
 | 
						|
        with self._lock:
 | 
						|
            c = self._count
 | 
						|
            j = self._jobs
 | 
						|
            self._count = 0
 | 
						|
            self._jobs = 0
 | 
						|
 | 
						|
            return (c if precise else int(c)), j
 | 
						|
 | 
						|
 | 
						|
class TimeitContext(object):
 | 
						|
    """
 | 
						|
    Wrapper around timeit to allow the timing of larger blocks of code by wrapping them in "with"
 | 
						|
    """
 | 
						|
 | 
						|
    def __init__(self):
 | 
						|
        self._t = timeit.default_timer()
 | 
						|
 | 
						|
    def __enter__(self):
 | 
						|
        return self
 | 
						|
 | 
						|
    def __exit__(self, *args):
 | 
						|
        del self._t
 | 
						|
 | 
						|
    def delta(self):
 | 
						|
        """
 | 
						|
        Calculate the elapsed time since the context was initialised
 | 
						|
        :return: FLOAT
 | 
						|
        """
 | 
						|
        if not self._t:
 | 
						|
            raise ArithmeticError("Timer has not been started, cannot return delta")
 | 
						|
 | 
						|
        return timeit.default_timer() - self._t
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def start(cls):
 | 
						|
        """
 | 
						|
        Factory method for TimeitContext
 | 
						|
        :param cls:
 | 
						|
        :return: TimeitContext
 | 
						|
        """
 | 
						|
        return cls()
 |