Adds Distributed Polling via Memcached.

This commit is contained in:
f0o
2015-03-15 16:29:59 +00:00
parent fe06a0a443
commit ffd5a3d928
5 changed files with 199 additions and 15 deletions

View File

@@ -74,9 +74,66 @@ db_password = config['db_pass']
db_server = config['db_host']
db_dbname = config['db_name']
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
# Poller group served by this instance, kept as a string because it is
# concatenated into the device-selection SQL; False means "no group filter".
poller_group = str(config['distributed_poller_group']) if 'distributed_poller_group' in config else False
def memc_alive():
    """Check whether the memcached server answers a write/read round trip.

    A random key under ``poller.ping.`` is stored with a 60 second expiry,
    read back, and deleted again when the value matches.

    Returns:
        True if the value read back equals the value written; False if it
        does not, or if any error occurs (including the module-level
        ``memc`` client not having been created).
    """
    # Imported locally so the probe does not depend on an import that only
    # happens inside the conditional distributed-poller setup.
    import uuid

    try:
        token = str(uuid.uuid4())
        ping_key = 'poller.ping.' + token
        memc.set(ping_key, token, 60)
        if memc.get(ping_key) != token:
            return False
        memc.delete(ping_key)
        return True
    except Exception:
        # Best effort by design: every failure means "not alive".  Catching
        # Exception rather than using a bare except lets KeyboardInterrupt
        # and SystemExit propagate.
        return False
def memc_touch(key, time):
    """Refresh the expiry of an existing memcached entry.

    The current value is read and written back under the same key with a
    new expiry.  The read-then-write is not atomic.

    Args:
        key: Name of the memcached entry to refresh.
        time: Expiry handed to ``memc.set`` (the name shadows the ``time``
            module inside this function only).

    Returns:
        None.  Errors are deliberately ignored; the call is best effort.
    """
    try:
        current = memc.get(key)
        # A missing or expired entry reads back as None.  Writing that back
        # would re-create the key holding None, so leave it absent instead.
        if current is not None:
            memc.set(key, current, time)
    except Exception:
        # Best effort: a failed refresh must not stop the caller.  Unlike a
        # bare except, this lets KeyboardInterrupt/SystemExit propagate.
        pass
if 'distributed_poller' in config and 'distributed_poller_memcached_host' in config and 'distributed_poller_memcached_port' in config and config['distributed_poller'] == True:
try:
import memcache, uuid
memc = memcache.Client([config['distributed_poller_memcached_host']+':'+str(config['distributed_poller_memcached_port'])])
if memc_alive() == True:
if memc.get("poller.master") == None:
print "Registered as Master"
memc.set("poller.master",config['distributed_poller_name'],10)
memc.set("poller.nodes",0,300)
IsNode = False
else:
print "Registered as Node joining Master %s" % memc.get("poller.master")
IsNode = True
memc.incr("poller.nodes")
distpoll = True
else:
print "Could not connect to memcached, disabling distributed poller."
distpoll = False
IsNode = False
except:
print "ERROR: missing memcache python module:"
print "On deb systems: apt-get install python-memcache"
print "On other systems: easy_install python-memcached"
print "Disabling distributed poller."
distpoll = False
else:
distpoll = False
# EOC1
# Wall-clock start of this run; total_time is later computed from it.
s_time = time.time()
# Running totals that printworker updates as each device result arrives.
real_duration = 0
per_device_duration = {}
polled_devices = 0
"""
Take the amount of threads we want to run in parallel from the commandline
@@ -105,13 +162,25 @@ except:
thus increasing our chances of completing _all_ the work in exactly the time it takes to
poll the slowest device! cool stuff he
"""
# Device selection: every enabled device, slowest-polled first.
# NOTE(review): this first assignment is overwritten by both branches below;
# it looks like the pre-change line of the diff - confirm against the file.
query = "select device_id from devices where disabled = 0 order by last_polled_timetaken desc"
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC2
# With a poller group configured, restrict to that group plus group 0.
# NOTE(review): poller_group is concatenated into the SQL text; it comes from
# config, so the config value must be trusted (or bound as a parameter).
if poller_group is not False:
query = "select device_id from devices where (poller_group = " + poller_group + " or poller_group = 0) and disabled = 0 order by last_polled_timetaken desc"
else:
query = "select device_id from devices where disabled = 0 order by last_polled_timetaken desc"
# EOC2
cursor.execute(query)
devices = cursor.fetchall()
# Collect the ids as ints, in query order.
for row in devices:
devices_list.append(int(row[0]))
# NOTE(review): the EOC3 section that follows in this view still calls
# cursor.execute - confirm the connection is not closed before that runs.
db.close()
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC3
if distpoll is True and IsNode is False:
    # Only the master fetches the device id range; it bounds the per-device
    # lock keys that are deleted from memcached after polling.
    query = "select max(device_id),min(device_id) from devices"
    cursor.execute(query)
    devices = cursor.fetchall()
    maxlocks, minlocks = devices[0][0], devices[0][1]
# EOC3
"""
A separate queue and a single worker for printing information to the screen prevents
@@ -122,12 +191,42 @@ db.close()
"""
def printworker():
nodeso = 0
while True:
worker_id, device_id, elapsed_time = print_queue.get()
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC4
global IsNode
global distpoll
if distpoll is True:
if IsNode is False:
memc_touch('poller.master',10)
nodes = memc.get('poller.nodes')
if nodes is None and memc_alive() == False:
print "WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly."
distpoll = False
nodes = nodeso
if nodes is not nodeso:
print "INFO: %s Node(s) Total" % (nodes)
nodeso = nodes
else:
memc_touch('poller.nodes',10)
try:
worker_id, device_id, elapsed_time = print_queue.get(False)
except:
pass
try:
time.sleep(1)
except:
pass
continue
else:
worker_id, device_id, elapsed_time = print_queue.get()
# EOC4
global real_duration
global per_device_duration
global polled_devices
real_duration += elapsed_time
per_device_duration[device_id] = elapsed_time
polled_devices += 1
if elapsed_time < 300:
print "INFO: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time)
else:
@@ -142,16 +241,29 @@ def printworker():
# Worker loop: take device ids from poll_queue forever, run the PHP poller for
# each one and report the elapsed time through print_queue.
# NOTE(review): this diff view shows two copies of the try block (one before
# and one after the EOC5 section); presumably the first is the pre-change
# text - confirm against the real file before editing.
def poll_worker():
while True:
device_id = poll_queue.get()
try:
start_time = time.time()
# Output of the PHP poller is discarded; check_call raises on a non-zero exit.
command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (poller_path, device_id)
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
print_queue.put([threading.current_thread().name, device_id, elapsed_time])
except (KeyboardInterrupt, SystemExit):
raise
except:
pass
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5
# Poll only when distributed polling is off or no lock key exists for the device.
if distpoll == False or memc.get('poller.device.'+str(device_id)) == None:
if distpoll == True:
# Try to take the per-device lock (300 second expiry); a False result is
# treated as "another poller already holds it".
result = memc.add('poller.device.'+str(device_id),config['distributed_poller_name'],300)
if result == False:
print "This device (%s) appears to be being polled by another poller" % (device_id)
poll_queue.task_done()
continue
# A node that can no longer reach memcached skips the device.
if memc_alive() == False and IsNode is True:
print "Lost Memcached, Not polling Device %s as Node. Master will poll it." % device_id
poll_queue.task_done()
continue
# EOC5
try:
start_time = time.time()
command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (poller_path, device_id)
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
print_queue.put([threading.current_thread().name, device_id, elapsed_time])
except (KeyboardInterrupt, SystemExit):
# Interpreter-exit signals are passed on unchanged.
raise
except:
# Any other failure is dropped silently, so a failed device produces no
# print_queue entry.
pass
poll_queue.task_done()
poll_queue = Queue.Queue()
@@ -179,10 +291,47 @@ except (KeyboardInterrupt, SystemExit):
# Wall-clock duration of the whole run, truncated to whole seconds.
total_time = int(time.time() - s_time)
# NOTE(review): two summary prints appear here; the first reports
# len(devices_list), the second polled_devices. Presumably these are the
# pre-change and post-change lines of the diff - confirm against the file.
print "INFO: poller-wrapper polled %s devices in %s seconds with %s workers" % (len(devices_list), total_time, amount_of_workers)
print "INFO: poller-wrapper polled %s devices in %s seconds with %s workers" % (polled_devices, total_time, amount_of_workers)
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6
# Post-run cleanup of the shared memcached state.
# memc_alive() returns False on any error, so when distpoll is not True this
# condition only holds if memcached can actually be reached.
if distpoll == True or memc_alive() is True:
master = memc.get("poller.master")
# The poller that registered as master (and is not a node) waits for the
# nodes, then removes the locks and the registration keys.
if master == config['distributed_poller_name'] and IsNode == False:
print "Wait for all poller-nodes to finish"
nodes = memc.get("poller.nodes")
# Poll the node counter once per second until it is no longer positive.
# NOTE(review): the None test comes after the comparison, so this relies on
# Python 2 ordering of None against integers.
while nodes > 0 and nodes is not None:
try:
time.sleep(1)
nodes = memc.get("poller.nodes")
except:
pass
print "Clearing Locks"
# Delete a lock key for every id in the range fetched earlier, whether or
# not a key exists for that id.
x = minlocks
while x <= maxlocks:
memc.delete('poller.device.'+str(x))
x = x+1
# NOTE(review): x is the final value of the loop counter, not the number of
# keys that were deleted.
print "%s Locks Cleared" % x
print "Clearing Nodes"
memc.delete("poller.master")
memc.delete("poller.nodes")
else:
# Every other poller decrements the node counter.
memc.decr("poller.nodes")
print "Finished %s." % time.time()
# EOC6
show_stopper = False
# Record this run in the pollers table: update the row for this poller name,
# and insert one instead when the update did not report exactly one row.
query = "update pollers set last_polled=NOW(), devices='%d', time_taken='%d' where poller_name='%s'" % (
    polled_devices, total_time, config['distributed_poller_name'])
response = cursor.execute(query)
if response != 1:
    query = "insert into pollers set poller_name='%s', last_polled=NOW(), devices='%d', time_taken='%d'" % (
        config['distributed_poller_name'], polled_devices, total_time)
    cursor.execute(query)
db.commit()
db.close()
if total_time > 300:
recommend = int(total_time / 300.0 * amount_of_workers + 1)
print "WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads"
@@ -195,4 +344,5 @@ if total_time > 300:
print "ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do."
if show_stopper == False:
print "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend
sys.exit(2)