2020-05-15 07:37:34 +02:00
#! /usr/bin/env python3
2017-01-12 08:29:29 +00:00
"""
services-wrapper: A small tool which wraps around check-services.php and tries to
guide the services process with a more modern approach with a
Queue and workers.
Based on the original version of poller-wrapper.py by Job Snijders
Author : Neil Lathwood < neil @librenms.org >
2020-05-15 07:37:34 +02:00
Orsiris de Jong < contact @netpower.fr >
Date : Oct 2019
2017-01-12 08:29:29 +00:00
Usage : This program accepts one command line argument : the number of threads
that should run simultaneously . If no argument is given it will assume
a default of 1 thread .
Ubuntu Linux : apt - get install python - mysqldb
FreeBSD : cd / usr / ports / * / py - MySQLdb & & make install clean
2020-05-15 07:37:34 +02:00
RHEL 7 : yum install MySQL - python
RHEL 8 : dnf install mariadb - connector - c - devel gcc & & python - m pip install mysqlclient
Tested on : Python 3.6 .8 / PHP 7.2 .11 / CentOS 8
2017-01-12 08:29:29 +00:00
License : This program is free software : you can redistribute it and / or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation , either version 3 of the License , or ( at your
option ) any later version .
This program is distributed in the hope that it will be useful , but
WITHOUT ANY WARRANTY ; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the GNU General
Public License for more details .
You should have received a copy of the GNU General Public License along
2021-02-09 00:29:04 +01:00
with this program . If not , see https : / / www . gnu . org / licenses / .
2017-01-12 08:29:29 +00:00
LICENSE . txt contains a copy of the full GPLv3 licensing conditions .
"""
2020-05-15 07:37:34 +02:00
import LibreNMS . library as LNMS
2017-01-12 08:29:29 +00:00
try :
import json
import os
2020-05-15 07:37:34 +02:00
import queue
2017-01-12 08:29:29 +00:00
import subprocess
import sys
import threading
import time
2018-06-20 16:56:51 -05:00
from optparse import OptionParser
2017-01-12 08:29:29 +00:00
2020-05-15 07:37:34 +02:00
except ImportError as exc :
print ( ' ERROR: missing one or more of the following python modules: ' )
print ( ' threading, queue, sys, subprocess, time, os, json ' )
print ( ' ERROR: %s ' % exc )
2017-01-12 08:29:29 +00:00
sys . exit ( 2 )
2020-05-15 07:37:34 +02:00
APP_NAME = " services_wrapper "
LOG_FILE = " logs/ " + APP_NAME + " .log "
_DEBUG = False
servicedisco = False
real_duration = 0
service_devices = 0
2017-01-12 08:29:29 +00:00
"""
2020-05-15 07:37:34 +02:00
Threading helper functions
2017-01-12 08:29:29 +00:00
"""
2020-05-15 07:37:34 +02:00
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC0
2017-01-12 08:29:29 +00:00
def memc_alive():
    """Report whether memcached completes a write/read round trip.

    A random UUID is stored under a throw-away ping key with a 60 second
    expiry and read back again. The ping key is deleted after a successful
    round trip.

    Returns:
        bool: True when the value read back equals the value written,
        False when it differs or when any ordinary error occurs.
    """
    try:
        token = str(uuid.uuid4())
        ping_key = ' poller.ping. ' + token
        memc.set(ping_key, token, 60)
        if memc.get(ping_key) == token:
            memc.delete(ping_key)
            return True
        return False
    except Exception:
        # `memc` and `uuid` are only bound by the start-up code when
        # distributed polling is configured, so a NameError also lands here.
        # KeyboardInterrupt/SystemExit are deliberately NOT swallowed.
        return False
def memc_touch(key, time):
    """Re-store a memcached entry with a new expiry, best effort.

    The current value of *key* is read and written back with *time* as the
    expiry argument of ``memc.set``.

    Args:
        key: memcached key to refresh.
        time: expiry value forwarded to ``memc.set``. The parameter name
            shadows the ``time`` module inside this function only.

    Returns:
        None. Ordinary errors are ignored on purpose so that callers' loops
        keep running when memcached is unavailable.
    """
    try:
        current = memc.get(key)
        memc.set(key, current, time)
    except Exception:
        # Best effort only; KeyboardInterrupt/SystemExit still propagate.
        pass
2020-05-15 07:37:34 +02:00
def get_time_tag(step):
    """Return the current Unix time rounded down to a multiple of *step*.

    Args:
        step: bucket width, in the same unit as ``time.time()`` (seconds).

    Returns:
        The whole-second timestamp minus its remainder modulo *step*.
    """
    now = int(time.time())
    remainder = now % step
    return now - remainder
#EOC0
2017-01-12 08:29:29 +00:00
"""
A separate queue and a single worker for printing information to the screen prevents
the good old joke :
Some people , when confronted with a problem , think ,
" I know, I ' ll use threads, " and then they two they hav erpoblesms .
"""
def printworker():
    """Consume worker reports from ``print_queue`` and print them, forever.

    Each report is a ``(worker_id, device_id, elapsed_time)`` triple. For
    every report the module-level totals ``real_duration``,
    ``per_device_duration`` and ``service_devices`` are updated, one line is
    printed (WARNING instead of INFO when the run took 300 seconds or more)
    and the queue item is marked done.

    While distributed checking (``servicedisco``) is active the loop also
    refreshes the memcached bookkeeping keys on every pass, polls the queue
    without blocking and sleeps one second when it is empty. On the master
    it additionally reports changes of the node count and disables
    ``servicedisco`` when memcached is lost.

    The function never returns; it is meant to run in a daemon thread.
    """
    global servicedisco
    global real_duration
    global service_devices

    nodeso = 0  # node count that was last reported
    while True:
        # (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC4
        if servicedisco:
            if not IsNode:
                memc_touch(' service.master ', 10)
                nodes = memc.get(' service.nodes ')
                if nodes is None and not memc_alive():
                    print(" WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly. ")
                    servicedisco = False
                    nodes = nodeso
                # Compare by value: identity comparison of ints is only
                # reliable for the small cached integers.
                if nodes != nodeso:
                    print(" INFO: %s Node(s) Total " % (nodes))
                    nodeso = nodes
            else:
                memc_touch(' service.nodes ', 10)
            try:
                worker_id, device_id, elapsed_time = print_queue.get(False)
            except queue.Empty:
                # Nothing to print yet; wait before refreshing the keys again.
                time.sleep(1)
                continue
        else:
            worker_id, device_id, elapsed_time = print_queue.get()
        # EOC4

        real_duration += elapsed_time
        per_device_duration[device_id] = elapsed_time
        service_devices += 1
        if elapsed_time < 300:
            print(" INFO: worker %s finished device %s in %s seconds " % (worker_id, device_id, elapsed_time))
        else:
            print(" WARNING: worker %s finished device %s in %s seconds " % (worker_id, device_id, elapsed_time))
        print_queue.task_done()
"""
This function forks off single instances of the check-services.php process, records
how long each one takes, and pushes the resulting reports to the printer queue
"""
def poll_worker():
    """Take device ids from ``poll_queue`` and run the service check for each.

    For every device id a shell command invoking the PHP script at
    ``service_path`` is executed. On success a
    ``[thread name, device_id, elapsed seconds]`` report is put on
    ``print_queue``. A failing run is reported on stdout and produces no
    report. Every queue item is marked done exactly once.

    While distributed checking (``servicedisco``) is active, a device is
    skipped when its memcached lock already exists, when the lock cannot be
    added, or when memcached is unreachable and this process is a node.

    The function never returns; it is meant to run in a daemon thread.
    """
    while True:
        device_id = poll_queue.get()
        # (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
        if not servicedisco or memc.get(' service.device. ' + str(device_id)) is None:
            if servicedisco:
                # Try to take the per-device lock (300 second expiry).
                result = memc.add(' service.device. ' + str(device_id), config[' distributed_poller_name '], 300)
                if not result:
                    print(" This device ( %s ) appears to be being service checked by another service node " % (device_id))
                    poll_queue.task_done()
                    continue
                if not memc_alive() and IsNode:
                    print(" Lost Memcached, Not service checking Device %s as Node. Master will check it. " % device_id)
                    poll_queue.task_done()
                    continue
            # EOC5
            try:
                start_time = time.time()
                output = " -d >> %s /services_device_ %s .log " % (log_dir, device_id) if debug else " >> /dev/null "
                # TODO replace with command_runner
                command = " /usr/bin/env php %s -h %s %s 2>&1 " % (service_path, device_id, output)
                subprocess.check_call(command, shell=True)

                elapsed_time = int(time.time() - start_time)
                print_queue.put([threading.current_thread().name, device_id, elapsed_time])
            except Exception as exc:
                # A failed check must not kill the worker, but it should be
                # visible. KeyboardInterrupt/SystemExit still propagate.
                print("WARNING: service check for device %s failed: %s" % (device_id, exc))
        poll_queue.task_done()
2020-05-15 07:37:34 +02:00
if __name__ == '__main__':
    # Script entry point: load the LibreNMS configuration, optionally join
    # the memcached-coordinated group of service pollers, read the device
    # list from the database, run the checks with a pool of worker threads
    # and finally print a summary (exit status 2 when the run was too slow).
    logger = LNMS.logger_get_logger(LOG_FILE, debug=_DEBUG)

    install_dir = os.path.dirname(os.path.realpath(__file__))
    LNMS.check_for_file(install_dir + ' /.env ')
    config = json.loads(LNMS.get_config_data(install_dir))

    service_path = config[' install_dir '] + ' /check-services.php '
    log_dir = config[' log_dir ']

    # (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
    if ' distributed_poller_group ' in config:
        service_group = str(config[' distributed_poller_group '])
    else:
        service_group = False

    # Bound up front so the name exists on every path below, including the
    # one where the memcache module cannot be imported.
    IsNode = False

    if (' distributed_poller ' in config and
            ' distributed_poller_memcached_host ' in config and
            ' distributed_poller_memcached_port ' in config and
            config[' distributed_poller ']):
        try:
            import memcache
            import uuid
            memc = memcache.Client([config[' distributed_poller_memcached_host '] + ' : ' +
                                    str(config[' distributed_poller_memcached_port '])])
            if str(memc.get(" service.master ")) == config[' distributed_poller_name ']:
                print(" This system is already joined as the service master. ")
                sys.exit(2)
            if memc_alive():
                if memc.get(" service.master ") is None:
                    # Nobody holds the master key yet: claim it.
                    print(" Registered as Master ")
                    memc.set(" service.master ", config[' distributed_poller_name '], 10)
                    memc.set(" service.nodes ", 0, 300)
                    IsNode = False
                else:
                    print(" Registered as Node joining Master %s " % memc.get(" service.master "))
                    IsNode = True
                    memc.incr(" service.nodes ")
                servicedisco = True
            else:
                print(" Could not connect to memcached, disabling distributed service checks. ")
                servicedisco = False
                IsNode = False
        except ImportError:
            print(" ERROR: missing memcache python module: ")
            print(" On deb systems: apt-get install python3-memcache ")
            print(" On other systems: pip3 install python-memcached ")
            print(" Disabling distributed discovery. ")
            servicedisco = False
    else:
        servicedisco = False
    # EOC1

    s_time = time.time()
    real_duration = 0
    per_device_duration = {}
    service_devices = 0

    # Take the number of threads to run in parallel from the command line.
    # If none is given or the argument is garbage, fall back to 1.
    usage = " usage: % prog [options] <workers> (Default: 1 (Do not set too high) "
    description = " Spawn multiple check-services.php processes in parallel. "
    parser = OptionParser(usage=usage, description=description)
    # Option strings and the action name must not carry padding spaces:
    # optparse raises OptionError for anything but "-x" / "--long" forms and
    # for unknown action names.
    parser.add_option('-d', '--debug', action='store_true', default=False,
                      help=" Enable debug output. WARNING: Leaving this enabled will consume a lot of disk space. ")
    (options, args) = parser.parse_args()

    debug = options.debug
    try:
        amount_of_workers = int(args[0])
    except (IndexError, ValueError):
        amount_of_workers = 1

    # (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
    # NOTE(review): service_group is spliced into the SQL text unescaped; it
    # comes from the local configuration, not from user input.
    if service_group is not False:
        query = " SELECT DISTINCT(`services`.`device_id`) FROM `services` LEFT JOIN `devices` ON `services`.`device_id` = `devices`.`device_id` WHERE `devices`.`poller_group` IN( " + service_group + " ) AND `devices`.`disabled` = 0 "
    else:
        query = " SELECT DISTINCT(`services`.`device_id`) FROM `services` LEFT JOIN `devices` ON `services`.`device_id` = `devices`.`device_id` WHERE `devices`.`disabled` = 0 "
    # EOC2

    db = LNMS.db_open(config[' db_socket '], config[' db_host '], config[' db_port '], config[' db_user '], config[' db_pass '], config[' db_name '])
    cursor = db.cursor()
    cursor.execute(query)
    devices = cursor.fetchall()
    devices_list = [int(row[0]) for row in devices]

    # (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC3
    if servicedisco and not IsNode:
        # The master remembers the device id range so that it can clear the
        # per-device lock keys once the run is over.
        query = " SELECT MAX(`device_id`), MIN(`device_id`) FROM `services` "
        cursor.execute(query)
        devices = cursor.fetchall()
        maxlocks = devices[0][0] or 0
        minlocks = devices[0][1] or 0
    # EOC3
    db.close()

    poll_queue = queue.Queue()
    print_queue = queue.Queue()

    print(" INFO: starting the service check at %s with %s threads " % (time.strftime(" % Y- % m- %d % H: % M: % S "),
                                                                        amount_of_workers))

    for device_id in devices_list:
        poll_queue.put(device_id)

    # Daemon threads: the workers loop forever and must not keep the
    # process alive once the queues have been drained.
    for _ in range(amount_of_workers):
        t = threading.Thread(target=poll_worker)
        t.daemon = True
        t.start()

    p = threading.Thread(target=printworker)
    p.daemon = True
    p.start()

    poll_queue.join()
    print_queue.join()

    total_time = int(time.time() - s_time)

    print(" INFO: services-wrapper checked %s devices in %s seconds with %s workers " % (service_devices, total_time, amount_of_workers))

    # (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
    if servicedisco or memc_alive():
        master = memc.get(" service.master ")
        if master == config[' distributed_poller_name '] and not IsNode:
            print(" Wait for all service-nodes to finish ")
            nodes = memc.get(" service.nodes ")
            while nodes is not None and nodes > 0:
                try:
                    time.sleep(1)
                    nodes = memc.get(" service.nodes ")
                except Exception:
                    # Keep waiting on memcached hiccups, but let Ctrl-C and
                    # SystemExit end the wait.
                    pass
            print(" Clearing Locks ")
            cleared = 0
            for lock_id in range(minlocks, maxlocks + 1):
                memc.delete(' service.device. ' + str(lock_id))
                cleared += 1
            # Report how many lock keys were deleted, not the last id + 1.
            print(" %s Locks Cleared " % cleared)
            print(" Clearing Nodes ")
            memc.delete(" service.master ")
            memc.delete(" service.nodes ")
        else:
            memc.decr(" service.nodes ")
        print(" Finished %s . " % time.time())
    # EOC6

    show_stopper = False

    if total_time > 300:
        print(" WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads ")
        print(" INFO: in sequential style service checks the elapsed time would have been: %s seconds " % real_duration)
        for device, duration in per_device_duration.items():
            if duration > 300:
                print(" WARNING: device %s is taking too long: %s seconds " % (device, duration))
                show_stopper = True
        if show_stopper:
            print(" ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do. ")
        else:
            recommend = int(total_time / 300.0 * amount_of_workers + 1)
            print(
                " WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!) " % recommend)

        sys.exit(2)