mirror of
				https://github.com/librenms/librenms.git
				synced 2024-10-07 16:52:45 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			382 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			382 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#! /usr/bin/env python2
 | 
						|
"""
 | 
						|
 services-wrapper A small tool which wraps around check-services.php and tries to
 | 
						|
                guide the services process with a more modern approach with a
 | 
						|
                Queue and workers.
 | 
						|
 | 
						|
 Based on the original version of poller-wrapper.py by Job Snijders
 | 
						|
 | 
						|
 Author:        Neil Lathwood <neil@librenms.org>
 | 
						|
 Date:          Oct 2016
 | 
						|
 | 
						|
 Usage:         This program accepts one command line argument: the number of threads
 | 
						|
                that should run simultaneously. If no argument is given it will assume
 | 
						|
                a default of 1 thread.
 | 
						|
 | 
						|
 Ubuntu Linux:  apt-get install python-mysqldb
 | 
						|
 FreeBSD:       cd /usr/ports/*/py-MySQLdb && make install clean
 | 
						|
 | 
						|
 License:       This program is free software: you can redistribute it and/or modify it
 | 
						|
                under the terms of the GNU General Public License as published by the
 | 
						|
                Free Software Foundation, either version 3 of the License, or (at your
 | 
						|
                option) any later version.
 | 
						|
 | 
						|
                This program is distributed in the hope that it will be useful, but
 | 
						|
                WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
                MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
 | 
						|
                Public License for more details.
 | 
						|
 | 
						|
                You should have received a copy of the GNU General Public License along
 | 
						|
                with this program. If not, see http://www.gnu.org/licenses/.
 | 
						|
 | 
						|
                LICENSE.txt contains a copy of the full GPLv3 licensing conditions.
 | 
						|
"""
 | 
						|
try:
 | 
						|
 | 
						|
    import json
 | 
						|
    import os
 | 
						|
    import Queue
 | 
						|
    import subprocess
 | 
						|
    import sys
 | 
						|
    import threading
 | 
						|
    import time
 | 
						|
 | 
						|
except:
 | 
						|
    print "ERROR: missing one or more of the following python modules:"
 | 
						|
    print "threading, Queue, sys, subprocess, time, os, json"
 | 
						|
    sys.exit(2)
 | 
						|
 | 
						|
try:
 | 
						|
    import MySQLdb
 | 
						|
except:
 | 
						|
    print "ERROR: missing the mysql python module:"
 | 
						|
    print "On ubuntu: apt-get install python-mysqldb"
 | 
						|
    print "On FreeBSD: cd /usr/ports/*/py-MySQLdb && make install clean"
 | 
						|
    sys.exit(2)
 | 
						|
 | 
						|
"""
 | 
						|
    Fetch configuration details from the config_to_json.php script
 | 
						|
"""
 | 
						|
 | 
						|
install_dir = os.path.dirname(os.path.realpath(__file__))
 | 
						|
config_file = install_dir + '/config.php'
 | 
						|
 | 
						|
 | 
						|
def get_config_data():
 | 
						|
    config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir]
 | 
						|
    try:
 | 
						|
        proc = subprocess.Popen(config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
 | 
						|
    except:
 | 
						|
        print "ERROR: Could not execute: %s" % config_cmd
 | 
						|
        sys.exit(2)
 | 
						|
    return proc.communicate()[0]
 | 
						|
 | 
						|
try:
 | 
						|
    with open(config_file) as f:
 | 
						|
        pass
 | 
						|
except IOError as e:
 | 
						|
    print "ERROR: Oh dear... %s does not seem readable" % config_file
 | 
						|
    sys.exit(2)
 | 
						|
 | 
						|
try:
 | 
						|
    config = json.loads(get_config_data())
 | 
						|
except:
 | 
						|
    print "ERROR: Could not load or parse configuration, are PATHs correct?"
 | 
						|
    sys.exit(2)
 | 
						|
 | 
						|
service_path = config['install_dir'] + '/check-services.php'
 | 
						|
db_username = config['db_user']
 | 
						|
db_password = config['db_pass']
 | 
						|
 | 
						|
if config['db_host'][:5].lower() == 'unix:':
 | 
						|
    db_server = config['db_host']
 | 
						|
    db_port = 0
 | 
						|
elif ':' in config['db_host']:
 | 
						|
    db_server = config['db_host'].rsplit(':')[0]
 | 
						|
    db_port = int(config['db_host'].rsplit(':')[1])
 | 
						|
else:
 | 
						|
    db_server = config['db_host']
 | 
						|
    db_port = 0
 | 
						|
 | 
						|
db_dbname = config['db_name']
 | 
						|
 | 
						|
 | 
						|
def db_open():
 | 
						|
    try:
 | 
						|
        if db_port == 0:
 | 
						|
            db = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname)
 | 
						|
        else:
 | 
						|
            db = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname)
 | 
						|
        return db
 | 
						|
    except:
 | 
						|
        print "ERROR: Could not connect to MySQL database!"
 | 
						|
        sys.exit(2)
 | 
						|
 | 
						|
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
 | 
						|
if 'distributed_poller_group' in config:
 | 
						|
    service_group = str(config['distributed_poller_group'])
 | 
						|
else:
 | 
						|
    service_group = False
 | 
						|
 | 
						|
 | 
						|
def memc_alive():
 | 
						|
    try:
 | 
						|
        global memc
 | 
						|
        key = str(uuid.uuid4())
 | 
						|
        memc.set('service.ping.' + key, key, 60)
 | 
						|
        if memc.get('service.ping.' + key) == key:
 | 
						|
            memc.delete('service.ping.' + key)
 | 
						|
            return True
 | 
						|
        else:
 | 
						|
            return False
 | 
						|
    except:
 | 
						|
        return False
 | 
						|
 | 
						|
 | 
						|
def memc_touch(key, time):
 | 
						|
    try:
 | 
						|
        global memc
 | 
						|
        val = memc.get(key)
 | 
						|
        memc.set(key, val, time)
 | 
						|
    except:
 | 
						|
        pass
 | 
						|
 | 
						|
if ('distributed_poller' in config and
 | 
						|
    'distributed_poller_memcached_host' in config and
 | 
						|
    'distributed_poller_memcached_port' in config and
 | 
						|
        config['distributed_poller']):
 | 
						|
    try:
 | 
						|
        import memcache
 | 
						|
        import uuid
 | 
						|
        memc = memcache.Client([config['distributed_poller_memcached_host'] + ':' +
 | 
						|
            str(config['distributed_poller_memcached_port'])])
 | 
						|
        if str(memc.get("service.master")) == config['distributed_poller_name']:
 | 
						|
            print "This system is already joined as the service master."
 | 
						|
            sys.exit(2)
 | 
						|
        if memc_alive():
 | 
						|
            if memc.get("service.master") is None:
 | 
						|
                print "Registered as Master"
 | 
						|
                memc.set("service.master", config['distributed_poller_name'], 10)
 | 
						|
                memc.set("service.nodes", 0, 300)
 | 
						|
                IsNode = False
 | 
						|
            else:
 | 
						|
                print "Registered as Node joining Master %s" % memc.get("service.master")
 | 
						|
                IsNode = True
 | 
						|
                memc.incr("service.nodes")
 | 
						|
            servicedisco = True
 | 
						|
        else:
 | 
						|
            print "Could not connect to memcached, disabling distributed service checks."
 | 
						|
            servicedisco = False
 | 
						|
            IsNode = False
 | 
						|
    except SystemExit:
 | 
						|
        raise
 | 
						|
    except ImportError:
 | 
						|
        print "ERROR: missing memcache python module:"
 | 
						|
        print "On deb systems: apt-get install python-memcache"
 | 
						|
        print "On other systems: easy_install python-memcached"
 | 
						|
        print "Disabling distributed discovery."
 | 
						|
        servicedisco = False
 | 
						|
else:
 | 
						|
    servicedisco = False
 | 
						|
# EOC1
 | 
						|
 | 
						|
s_time = time.time()
 | 
						|
real_duration = 0
 | 
						|
per_device_duration = {}
 | 
						|
service_devices = 0
 | 
						|
 | 
						|
"""
 | 
						|
    Take the amount of threads we want to run in parallel from the commandline
 | 
						|
    if None are given or the argument was garbage, fall back to default of 16
 | 
						|
"""
 | 
						|
try:
 | 
						|
    amount_of_workers = int(sys.argv[1])
 | 
						|
    if amount_of_workers == 0:
 | 
						|
        print "ERROR: 0 threads is not a valid value"
 | 
						|
        sys.exit(2)
 | 
						|
except:
 | 
						|
    amount_of_workers = 1
 | 
						|
 | 
						|
devices_list = []
 | 
						|
 | 
						|
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
 | 
						|
if service_group is not False:
 | 
						|
    query = "SELECT DISTINCT(`services`.`device_id`) FROM `services` LEFT JOIN `devices` ON `services`.`device_id` = `devices`.`device_id` WHERE `devices`.`poller_group` IN(" + service_group + ") AND `devices`.`disabled` = 0"
 | 
						|
else:
 | 
						|
    query = "SELECT DISTINCT(`services`.`device_id`) FROM `services` LEFT JOIN `devices` ON `services`.`device_id` = `devices`.`device_id` WHERE `devices`.`disabled` = 0"
 | 
						|
# EOC2
 | 
						|
 | 
						|
 | 
						|
db = db_open()
 | 
						|
cursor = db.cursor()
 | 
						|
cursor.execute(query)
 | 
						|
devices = cursor.fetchall()
 | 
						|
for row in devices:
 | 
						|
    devices_list.append(int(row[0]))
 | 
						|
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC3
 | 
						|
if servicedisco and not IsNode:
 | 
						|
    query = "SELECT MAX(`device_id`), MIN(`device_id`) FROM `services`"
 | 
						|
    cursor.execute(query)
 | 
						|
    devices = cursor.fetchall()
 | 
						|
    maxlocks = devices[0][0]
 | 
						|
    minlocks = devices[0][1]
 | 
						|
# EOC3
 | 
						|
db.close()
 | 
						|
 | 
						|
"""
 | 
						|
    A seperate queue and a single worker for printing information to the screen prevents
 | 
						|
    the good old joke:
 | 
						|
 | 
						|
        Some people, when confronted with a problem, think,
 | 
						|
        "I know, I'll use threads," and then they two they hav erpoblesms.
 | 
						|
"""
 | 
						|
 | 
						|
 | 
						|
def printworker():
 | 
						|
    nodeso = 0
 | 
						|
    while True:
 | 
						|
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC4
 | 
						|
        global IsNode
 | 
						|
        global servicedisco
 | 
						|
        if servicedisco:
 | 
						|
            if not IsNode:
 | 
						|
                memc_touch('service.master', 10)
 | 
						|
                nodes = memc.get('service.nodes')
 | 
						|
                if nodes is None and not memc_alive():
 | 
						|
                    print "WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly."
 | 
						|
                    servicedisco = False
 | 
						|
                    nodes = nodeso
 | 
						|
                if nodes is not nodeso:
 | 
						|
                    print "INFO: %s Node(s) Total" % (nodes)
 | 
						|
                    nodeso = nodes
 | 
						|
            else:
 | 
						|
                memc_touch('service.nodes', 10)
 | 
						|
            try:
 | 
						|
                worker_id, device_id, elapsed_time = print_queue.get(False)
 | 
						|
            except:
 | 
						|
                pass
 | 
						|
                try:
 | 
						|
                    time.sleep(1)
 | 
						|
                except:
 | 
						|
                    pass
 | 
						|
                continue
 | 
						|
        else:
 | 
						|
            worker_id, device_id, elapsed_time = print_queue.get()
 | 
						|
# EOC4
 | 
						|
        global real_duration
 | 
						|
        global per_device_duration
 | 
						|
        global service_devices
 | 
						|
        real_duration += elapsed_time
 | 
						|
        per_device_duration[device_id] = elapsed_time
 | 
						|
        service_devices += 1
 | 
						|
        if elapsed_time < 300:
 | 
						|
            print "INFO: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time)
 | 
						|
        else:
 | 
						|
            print "WARNING: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time)
 | 
						|
        print_queue.task_done()
 | 
						|
 | 
						|
"""
 | 
						|
    This class will fork off single instances of the check-services.php process, record
 | 
						|
    how long it takes, and push the resulting reports to the printer queue
 | 
						|
"""
 | 
						|
 | 
						|
 | 
						|
def poll_worker():
 | 
						|
    while True:
 | 
						|
        device_id = poll_queue.get()
 | 
						|
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
 | 
						|
        if not servicedisco or memc.get('service.device.' + str(device_id)) is None:
 | 
						|
            if servicedisco:
 | 
						|
                result = memc.add('service.device.' + str(device_id), config['distributed_poller_name'], 300)
 | 
						|
                if not result:
 | 
						|
                    print "This device (%s) appears to be being service checked by another service node" % (device_id)
 | 
						|
                    poll_queue.task_done()
 | 
						|
                    continue
 | 
						|
                if not memc_alive() and IsNode:
 | 
						|
                    print "Lost Memcached, Not service checking Device %s as Node. Master will check it." % device_id
 | 
						|
                    poll_queue.task_done()
 | 
						|
                    continue
 | 
						|
# EOC5
 | 
						|
            try:
 | 
						|
                start_time = time.time()
 | 
						|
                command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (service_path, device_id)
 | 
						|
                subprocess.check_call(command, shell=True)
 | 
						|
                elapsed_time = int(time.time() - start_time)
 | 
						|
                print_queue.put([threading.current_thread().name, device_id, elapsed_time])
 | 
						|
            except (KeyboardInterrupt, SystemExit):
 | 
						|
                raise
 | 
						|
            except:
 | 
						|
                pass
 | 
						|
        poll_queue.task_done()
 | 
						|
 | 
						|
poll_queue = Queue.Queue()
 | 
						|
print_queue = Queue.Queue()
 | 
						|
 | 
						|
print "INFO: starting the service check at %s with %s threads" % (time.strftime("%Y-%m-%d %H:%M:%S"),
 | 
						|
        amount_of_workers)
 | 
						|
 | 
						|
for device_id in devices_list:
 | 
						|
    poll_queue.put(device_id)
 | 
						|
 | 
						|
for i in range(amount_of_workers):
 | 
						|
    t = threading.Thread(target=poll_worker)
 | 
						|
    t.setDaemon(True)
 | 
						|
    t.start()
 | 
						|
 | 
						|
p = threading.Thread(target=printworker)
 | 
						|
p.setDaemon(True)
 | 
						|
p.start()
 | 
						|
 | 
						|
try:
 | 
						|
    poll_queue.join()
 | 
						|
    print_queue.join()
 | 
						|
except (KeyboardInterrupt, SystemExit):
 | 
						|
    raise
 | 
						|
 | 
						|
total_time = int(time.time() - s_time)
 | 
						|
 | 
						|
print "INFO: services-wrapper checked %s devices in %s seconds with %s workers" % (service_devices, total_time, amount_of_workers)
 | 
						|
 | 
						|
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
 | 
						|
if servicedisco or memc_alive():
 | 
						|
    master = memc.get("service.master")
 | 
						|
    if master == config['distributed_poller_name'] and not IsNode:
 | 
						|
        print "Wait for all service-nodes to finish"
 | 
						|
        nodes = memc.get("service.nodes")
 | 
						|
        while nodes > 0 and nodes is not None:
 | 
						|
            try:
 | 
						|
                time.sleep(1)
 | 
						|
                nodes = memc.get("service.nodes")
 | 
						|
            except:
 | 
						|
                pass
 | 
						|
        print "Clearing Locks"
 | 
						|
        x = minlocks
 | 
						|
        while x <= maxlocks:
 | 
						|
            memc.delete('service.device.' + str(x))
 | 
						|
            x = x + 1
 | 
						|
        print "%s Locks Cleared" % x
 | 
						|
        print "Clearing Nodes"
 | 
						|
        memc.delete("service.master")
 | 
						|
        memc.delete("service.nodes")
 | 
						|
    else:
 | 
						|
        memc.decr("service.nodes")
 | 
						|
    print "Finished %s." % time.time()
 | 
						|
# EOC6
 | 
						|
 | 
						|
show_stopper = False
 | 
						|
 | 
						|
if total_time > 300:
 | 
						|
    print "WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads"
 | 
						|
    print "INFO: in sequential style service checks the elapsed time would have been: %s seconds" % real_duration
 | 
						|
    for device in per_device_duration:
 | 
						|
        if per_device_duration[device] > 300:
 | 
						|
            print "WARNING: device %s is taking too long: %s seconds" % (device, per_device_duration[device])
 | 
						|
            show_stopper = True
 | 
						|
    if show_stopper:
 | 
						|
        print "ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do."
 | 
						|
    else:
 | 
						|
        recommend = int(total_time / 300.0 * amount_of_workers + 1)
 | 
						|
        print "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend
 | 
						|
 | 
						|
    sys.exit(2)
 |