Files
librenms-librenms/poller-service.py

289 lines
11 KiB
Python
Raw Normal View History

2015-07-06 10:52:32 -04:00
#! /usr/bin/env python
2015-07-06 10:47:18 -04:00
"""
poller-service A service to wrap SNMP polling. It will poll up to $threads devices at a time, and will not re-poll
2015-07-07 14:33:33 -04:00
devices that have been polled within the last $poll_frequency seconds. It will prioritize devices based
on the last time polled. If resources are sufficient, this service should poll every device every
2015-07-07 13:02:13 -04:00
$poll_frequency seconds, but should gracefully degrade if resources are insufficient, polling devices as
2015-07-06 10:47:18 -04:00
frequently as possible. This service is based on poller-wrapper.py.
Author: Clint Armstrong <clint@clintarmstrong.net>
Date: July 2015
2015-07-07 14:16:55 -04:00
License: BSD
2015-07-06 10:47:18 -04:00
"""
import json
import os
import subprocess
import sys
import threading
import time
import MySQLdb
2015-07-07 11:20:09 -04:00
import logging
import logging.handlers
2015-07-06 12:00:21 -04:00
from datetime import datetime, timedelta
2015-07-06 10:47:18 -04:00
2015-07-07 11:20:09 -04:00
# Module-wide logger: every record goes to the local syslog socket with a
# 'poller-service: ' prefix. The level starts at DEBUG.
log = logging.getLogger('poller-service')
log.setLevel(logging.DEBUG)
handler = logging.handlers.SysLogHandler(address='/dev/log')
formatter = logging.Formatter('poller-service: %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)

# config.php is looked up in the directory that holds this script
# (symlinks resolved).
install_dir = os.path.dirname(os.path.realpath(__file__))
config_file = install_dir + '/config.php'

log.info('INFO: Starting poller-service')
2015-07-06 15:10:48 -04:00
2015-07-07 14:33:33 -04:00
2015-07-06 10:47:18 -04:00
def get_config_data():
    """Return the raw output of config_to_json.php.

    Runs ``<install_dir>/config_to_json.php`` through ``/usr/bin/env php`` and
    returns everything the process wrote to stdout. If the process cannot be
    started, a critical message is logged and the interpreter exits with
    status 2.
    """
    config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir]
    try:
        proc = subprocess.Popen(config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
    except (OSError, ValueError):
        # Only launch failures are handled here; a bare except would also
        # trap KeyboardInterrupt and SystemExit.
        log.critical("ERROR: Could not execute: %s" % config_cmd)
        sys.exit(2)
    return proc.communicate()[0]
# Fail early, with a clear message, when config.php cannot be opened.
try:
    with open(config_file):
        pass
except IOError:
    log.critical("ERROR: Oh dear... %s does not seem readable" % config_file)
    sys.exit(2)

# Parse the JSON printed by config_to_json.php. Only decoding problems are
# handled here, so a SystemExit raised inside get_config_data() is no longer
# intercepted and re-reported as a parse failure.
try:
    config = json.loads(get_config_data())
except (ValueError, TypeError):
    log.critical("ERROR: Could not load or parse configuration, are PATHs correct?")
    sys.exit(2)
2015-07-07 11:29:04 -04:00
# Apply the optional 'poller_service_loglevel' setting; INFO is used when the
# setting is absent.
try:
    configured_level = config['poller_service_loglevel']
except KeyError:
    loglevel = logging.INFO
else:
    loglevel = logging.getLevelName(configured_level.upper())

# getLevelName() answers with a string instead of an int for a name it does
# not know; fall back to INFO in that case.
if not isinstance(loglevel, int):
    log.warning('ERROR: {} is not a valid log level'.format(str(loglevel)))
    loglevel = logging.INFO
log.setLevel(loglevel)
2015-07-07 11:29:04 -04:00
2015-07-06 10:47:18 -04:00
# Scripts launched by the workers.
poller_path = config['install_dir'] + '/poller.php'
discover_path = config['install_dir'] + '/discovery.php'

# Database credentials.
db_username = config['db_user']
db_password = config['db_pass']
db_dbname = config['db_name']

# db_host is used verbatim unless it has the form '<host>:<port>'. A value
# starting with 'unix:' is never split. db_port stays 0 when no port was given;
# the connection code then connects without a port argument.
db_server = config['db_host']
db_port = 0
if db_server[:5].lower() != 'unix:' and ':' in db_server:
    # Split on the LAST colon only, so a host part that itself contains
    # colons is kept intact instead of being cut at the first one.
    db_server, db_port_text = db_server.rsplit(':', 1)
    db_port = int(db_port_text)
2015-07-06 12:00:21 -04:00
2015-07-06 10:47:18 -04:00
def _positive_int_setting(key, default, unit):
    """Return config[key] as a positive int, or default when key is not set.

    key     -- name of the setting in the loaded configuration
    default -- value returned when the setting is absent
    unit    -- word used in the error message ('threads' or 'seconds')

    A value that is not an integer, or that is zero or negative, is logged as
    critical and the interpreter exits with status 2.
    """
    try:
        value = int(config[key])
    except KeyError:
        return default
    except (TypeError, ValueError):
        log.critical("ERROR: %s is not a valid value for %s" % (config[key], key))
        sys.exit(2)
    # Negative numbers are rejected together with 0: they are just as unusable
    # as a thread count or an interval.
    if value <= 0:
        log.critical("ERROR: %s %s is not a valid value" % (value, unit))
        sys.exit(2)
    return value


# Upper bound checked against threading.active_count() before starting a worker.
amount_of_workers = _positive_int_setting('poller_service_workers', 16, 'threads')
# Seconds added to the start of the last poll to get the next poll time.
poll_frequency = _positive_int_setting('poller_service_poll_frequency', 300, 'seconds')
# Seconds added to the start of the last discovery to get the next discovery time.
discover_frequency = _positive_int_setting('poller_service_discover_frequency', 21600, 'seconds')
# Seconds a device stays on the don't-retry list after a worker was started for it.
down_retry = _positive_int_setting('poller_service_down_retry', 60, 'seconds')
2015-07-06 14:51:03 -04:00
2015-07-06 10:47:18 -04:00
# Open the database connection and the cursor that the lock helpers and the
# main loop use. A port is passed only when one was configured (db_port != 0).
# NOTE(review): a 'unix:...' db_host is handed to MySQLdb as host= unchanged;
# confirm the driver accepts that form.
connect_args = dict(host=db_server, user=db_username, passwd=db_password, db=db_dbname)
if db_port != 0:
    connect_args['port'] = db_port
try:
    db = MySQLdb.connect(**connect_args)
    db.autocommit(True)
    cursor = db.cursor()
except MySQLdb.Error as e:
    # Database errors only; a bare except would also trap KeyboardInterrupt.
    log.critical("ERROR: Could not connect to MySQL database!")
    log.critical("ERROR: %s" % e)
    sys.exit(2)
2015-07-06 15:10:48 -04:00
2015-07-07 13:02:13 -04:00
def poll_worker(device_id, action):
    """Run the poller or discovery script for one device and log the duration.

    device_id -- value passed to the PHP script's -h option
    action    -- 'discovery' runs discover_path; anything else runs poller_path

    The script's stdout and stderr are discarded. A run that cannot be started
    or that exits with a non-zero status is logged as a warning and otherwise
    ignored, so the function does not raise for a failed run.
    KeyboardInterrupt and SystemExit are not intercepted.
    """
    path = discover_path if action == 'discovery' else poller_path
    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters from being re-interpreted.
    command = ['/usr/bin/env', 'php', path, '-h', str(device_id)]
    start_time = time.time()
    try:
        with open(os.devnull, 'w') as devnull:
            subprocess.check_call(command, stdout=devnull, stderr=devnull)
    except Exception as e:
        # Best effort: report the failure instead of hiding it silently.
        log.warning("WARNING: worker failed on device %s: %s" % (device_id, e))
        return
    elapsed_time = int(time.time() - start_time)
    # Runs of 300 seconds or more are reported at warning level.
    if elapsed_time < 300:
        log.debug("DEBUG: worker finished device %s in %s seconds" % (device_id, elapsed_time))
    else:
        log.warning("WARNING: worker finished device %s in %s seconds" % (device_id, elapsed_time))
2015-07-06 12:00:21 -04:00
2015-07-06 15:10:48 -04:00
2015-07-06 14:51:03 -04:00
def lockFree(lock):
    """Return True when IS_FREE_LOCK(lock) yields 1 on the module-level cursor.

    lock -- name of the MySQL named lock to test

    Any other result (including NULL) gives False. The name is bound as a
    query parameter, so the driver does the quoting.
    """
    cursor.execute("SELECT IS_FREE_LOCK(%s)", (lock,))
    return cursor.fetchall()[0][0] == 1
2015-07-06 12:00:21 -04:00
2015-07-06 15:10:48 -04:00
2015-07-06 14:51:03 -04:00
def getLock(lock):
    """Try to take the MySQL named lock without waiting; return True on success.

    lock -- name of the MySQL named lock to acquire

    Runs GET_LOCK(lock, 0) on the module-level cursor and returns True only
    when the result is 1. The name is bound as a query parameter, so the
    driver does the quoting.
    """
    cursor.execute("SELECT GET_LOCK(%s, 0)", (lock,))
    return cursor.fetchall()[0][0] == 1
2015-07-06 15:10:48 -04:00
2015-07-06 14:51:03 -04:00
def releaseLock(lock):
    """Release the MySQL named lock; return True when RELEASE_LOCK yields 1.

    lock -- name of the MySQL named lock to release

    Runs RELEASE_LOCK(lock) on the module-level cursor. Any result other than
    1 (including NULL) gives False. The name is bound as a query parameter,
    so the driver does the quoting.
    """
    cursor.execute("SELECT RELEASE_LOCK(%s)", (lock,))
    return cursor.fetchall()[0][0] == 1
2015-07-07 14:33:33 -04:00
def sleep_until(timestamp):
    """Block the calling thread until datetime.now() reaches timestamp.

    timestamp -- naive datetime, compared against datetime.now()

    Returns immediately when timestamp is already in the past.
    """
    # total_seconds() covers the whole interval. The .seconds attribute leaves
    # out the days component and the fractional second, which made long waits
    # far too short and sub-second waits zero.
    remaining = (timestamp - datetime.now()).total_seconds()
    time.sleep(max(0, remaining))
# Optional SQL fragment restricting the device query to the configured poller
# group(s); empty when 'distributed_poller_group' is not set.
if 'distributed_poller_group' in config:
    poller_group = 'and poller_group IN({}) '.format(str(config['distributed_poller_group']))
else:
    poller_group = ''

# For every enabled device that holds neither a "polling." nor a "queued."
# lock, compute when it is due again:
#   next_poll      = (last_polled - last_polled_timetaken) + poll_frequency
#   next_discovery = (last_discovered - last_discovered_timetaken) + discover_frequency
# i.e. the interval is counted from the START of the previous run. Rows come
# back ordered by next_poll, earliest first.
dev_query = (
    'SELECT device_id, '
    'DATE_ADD( '
    ' DATE_SUB( '
    ' last_polled, '
    ' INTERVAL last_polled_timetaken SECOND '
    ' ), '
    ' INTERVAL {0} SECOND) AS next_poll, '
    'DATE_ADD( '
    ' DATE_SUB( '
    ' last_discovered, '
    ' INTERVAL last_discovered_timetaken SECOND '
    ' ), '
    ' INTERVAL {1} SECOND) AS next_discovery '
    'FROM devices WHERE '
    'disabled = 0 '
    'AND IS_FREE_LOCK(CONCAT("polling.", device_id)) '
    'AND IS_FREE_LOCK(CONCAT("queued.", device_id)) '
    '{2} '
    'ORDER BY next_poll asc '
).format(poll_frequency, discover_frequency, poller_group)

# State carried across iterations of the main loop.
dont_retry = {}          # device_id -> datetime before which it is not started again
threads = 0              # last thread count written to the debug log
devices_scanned = 0      # workers started since the last statistics update
next_update = datetime.now() + timedelta(minutes=5)  # when statistics are written next
2015-07-06 14:51:03 -04:00
# Main scheduling loop: each pass starts at most one worker thread, then
# queries the device list again.
while True:
    # Log the thread count whenever it changes.
    cur_threads = threading.active_count()
    if cur_threads != threads:
        threads = cur_threads
        log.debug('DEBUG: {} threads currently active'.format(threads))

    # About every five minutes: record this poller's statistics, reset the
    # counter and drop expired entries from the don't-retry list.
    if next_update < datetime.now():
        # next_update was set five minutes after the current window opened.
        window_start = next_update - timedelta(minutes=5)
        # total_seconds() covers the whole interval; .seconds leaves out days.
        seconds_taken = int((datetime.now() - window_start).total_seconds())
        # Values are bound as parameters so the poller name is quoted by the
        # driver. The assignments after ON DUPLICATE KEY UPDATE need a comma
        # between each pair, including before time_taken.
        update_query = ("INSERT INTO pollers(poller_name, "
                        " last_polled, "
                        " devices, "
                        " time_taken) "
                        " values(%s, NOW(), %s, %s) "
                        "ON DUPLICATE KEY UPDATE "
                        " poller_name=values(poller_name), "
                        " last_polled=values(last_polled), "
                        " devices=values(devices), "
                        " time_taken=values(time_taken) ")
        cursor.execute(update_query, (config['distributed_poller_name'],
                                      devices_scanned,
                                      seconds_taken))
        log.info('INFO: {} devices scanned in the last 5 minutes'.format(devices_scanned))
        devices_scanned = 0
        next_update = datetime.now() + timedelta(minutes=5)
        now = datetime.now()
        dont_retry = dict((dev, retry_at) for dev, retry_at in dont_retry.items() if retry_at > now)

    # Wait until the thread count drops below the configured limit.
    while threading.active_count() >= amount_of_workers:
        time.sleep(.5)

    cursor.execute(dev_query)
    devices = cursor.fetchall()
    # Devices seen in this pass that are still on the don't-retry list,
    # mapped to the time they may be started again.
    retry_devs_found = {}
    for device_id, next_poll, next_discovery in devices:
        # add queue lock, so we lock the next device against any other pollers
        # if this fails, the device is locked by another poller already
        if not getLock('queued.{}'.format(device_id)):
            continue
        if not lockFree('polling.{}'.format(device_id)):
            releaseLock('queued.{}'.format(device_id))
            continue
        if device_id in dont_retry:
            retry_devs_found[device_id] = dont_retry[device_id]
            releaseLock('queued.{}'.format(device_id))
            continue

        if next_poll and next_poll > datetime.now():
            # Nothing is due yet: sleep until this device is due, or until
            # the earliest skipped device may be retried, whichever is first.
            poll_action = 'polling'
            sleep_until_time = next_poll
            if retry_devs_found:
                next_retry_device = min(retry_devs_found, key=retry_devs_found.get)
                next_retry_time = retry_devs_found[next_retry_device]
                if next_retry_time < next_poll:
                    # Give up the queue lock of the device being set aside
                    # before switching, so it is not left locked.
                    releaseLock('queued.{}'.format(device_id))
                    device_id = next_retry_device
                    if not getLock('queued.{}'.format(device_id)):
                        continue
                    poll_action = 'retrying failed device'
                    sleep_until_time = next_retry_time
            log.debug('DEBUG: Sleeping until {0} before {1} {2}'.format(sleep_until_time, poll_action, device_id))
            sleep_until(sleep_until_time)

        # NOTE(review): next_discovery still belongs to the row read from the
        # query, even when device_id was switched to a retry device above.
        action = 'poll'
        if not next_discovery or next_discovery < datetime.now():
            action = 'discovery'
        log.debug('DEBUG: Starting {} of device {}'.format(action, device_id))
        devices_scanned += 1
        dont_retry[device_id] = datetime.now() + timedelta(seconds=down_retry)
        t = threading.Thread(target=poll_worker, args=[device_id, action])
        t.start()
        # If we made it this far, break out of the loop and query again.
        break
    else:
        # No worker was started in this pass (no rows, or every row was
        # skipped): pause briefly instead of re-running the query in a tight loop.
        time.sleep(.5)