From a56267e5ba469232e2af9d0e61036ee6c2a08d38 Mon Sep 17 00:00:00 2001 From: Tony Murray Date: Tue, 21 Nov 2017 03:22:53 -0600 Subject: [PATCH] fix: poller-wrapper.py keeps running when a poller takes too long (#7722) name space the memcache variables for each polling interval based on the rrd.step value. Make poller-wrapper.py respect the rrd.step setting --- poller-wrapper.py | 70 +++++++++++++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 27 deletions(-) diff --git a/poller-wrapper.py b/poller-wrapper.py index ad8d27b540..513819c78f 100755 --- a/poller-wrapper.py +++ b/poller-wrapper.py @@ -130,28 +130,44 @@ def memc_touch(key, time): except: pass + +def get_time_tag(step): + ts = int(time.time()) + return ts - ts % step + + +if 'rrd' in config and 'step' in config['rrd']: + step = config['rrd']['step'] +else: + step = 300 + if ('distributed_poller' in config and 'distributed_poller_memcached_host' in config and 'distributed_poller_memcached_port' in config and config['distributed_poller']): + + time_tag = str(get_time_tag(step)) + master_tag = "poller.master." + time_tag + nodes_tag = "poller.nodes." + time_tag + try: import memcache import uuid memc = memcache.Client([config['distributed_poller_memcached_host'] + ':' + str(config['distributed_poller_memcached_port'])]) - if str(memc.get("poller.master")) == config['distributed_poller_name']: + if str(memc.get(master_tag)) == config['distributed_poller_name']: print "This system is already joined as the poller master." 
sys.exit(2) if memc_alive(): - if memc.get("poller.master") is None: + if memc.get(master_tag) is None: print "Registered as Master" - memc.set("poller.master", config['distributed_poller_name'], 10) - memc.set("poller.nodes", 0, 300) + memc.set(master_tag, config['distributed_poller_name'], 10) + memc.set(nodes_tag, 0, step) IsNode = False else: - print "Registered as Node joining Master %s" % memc.get("poller.master") + print "Registered as Node joining Master %s" % memc.get(master_tag) IsNode = True - memc.incr("poller.nodes") + memc.incr(nodes_tag) distpoll = True else: print "Could not connect to memcached, disabling distributed poller." @@ -235,8 +251,8 @@ def printworker(): global distpoll if distpoll: if not IsNode: - memc_touch('poller.master', 10) - nodes = memc.get('poller.nodes') + memc_touch(master_tag, 10) + nodes = memc.get(nodes_tag) if nodes is None and not memc_alive(): print "WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly." distpoll = False @@ -245,7 +261,7 @@ def printworker(): print "INFO: %s Node(s) Total" % (nodes) nodeso = nodes else: - memc_touch('poller.nodes', 10) + memc_touch(nodes_tag, 10) try: worker_id, device_id, elapsed_time = print_queue.get(False) except: @@ -264,7 +280,7 @@ def printworker(): real_duration += elapsed_time per_device_duration[device_id] = elapsed_time polled_devices += 1 - if elapsed_time < 300: + if elapsed_time < step: print "INFO: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time) else: print "WARNING: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time) @@ -280,9 +296,9 @@ def poll_worker(): while True: device_id = poll_queue.get() # (c) 2015, GPLv3, Daniel Preussker << << 0 and nodes is not None: try: time.sleep(1) - nodes = memc.get("poller.nodes") + nodes = memc.get(nodes_tag) except: pass - print "Clearing Locks" + print "Clearing Locks for %s" % time_tag x = minlocks while x <= maxlocks: - memc.delete('poller.device.' 
+ str(x)) - x = x + 1 + res = memc.delete('poller.device.%s.%s' % (x, time_tag)) + x += 1 print "%s Locks Cleared" % x print "Clearing Nodes" - memc.delete("poller.master") - memc.delete("poller.nodes") + memc.delete(master_tag) + memc.delete(nodes_tag) else: - memc.decr("poller.nodes") - print "Finished %s." % time.time() + memc.decr(nodes_tag) + print "Finished %.3fs after interval start." % (time.time() - int(time_tag)) # EOC6 show_stopper = False @@ -375,17 +391,17 @@ else: db.close() -if total_time > 300: - print "WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads" +if total_time > step: + print "WARNING: the process took more than %s seconds to finish, you need faster hardware or more threads" % step print "INFO: in sequential style polling the elapsed time would have been: %s seconds" % real_duration for device in per_device_duration: - if per_device_duration[device] > 300: + if per_device_duration[device] > step: print "WARNING: device %s is taking too long: %s seconds" % (device, per_device_duration[device]) show_stopper = True if show_stopper: - print "ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do." + print "ERROR: Some devices are taking more than %s seconds, the script cannot recommend you what to do." % step else: - recommend = int(total_time / 300.0 * amount_of_workers + 1) + recommend = int(total_time / step * amount_of_workers + 1) print "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend sys.exit(2)