mirror of
				https://github.com/librenms/librenms.git
				synced 2024-10-07 16:52:45 +00:00 
			
		
		
		
	fix: poller-wrapper.py keeps running when a poller takes too log (#7722)
name space the memcache variables for each polling interval based on the rrd.step value. Make poller-wrapper.py respect the rrd.step setting
This commit is contained in:
		
				
					committed by
					
						 Neil Lathwood
						Neil Lathwood
					
				
			
			
				
	
			
			
			
						parent
						
							f04d7f1e94
						
					
				
				
					commit
					a56267e5ba
				
			| @@ -130,28 +130,44 @@ def memc_touch(key, time): | |||||||
|     except: |     except: | ||||||
|         pass |         pass | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def get_time_tag(step): | ||||||
|  |     ts = int(time.time()) | ||||||
|  |     return ts - ts % step | ||||||
|  |  | ||||||
|  |  | ||||||
|  | if 'rrd' in config and 'step' in config['rrd']: | ||||||
|  |     step = config['rrd']['step'] | ||||||
|  | else: | ||||||
|  |     step = 300 | ||||||
|  |  | ||||||
| if ('distributed_poller' in config and | if ('distributed_poller' in config and | ||||||
|     'distributed_poller_memcached_host' in config and |     'distributed_poller_memcached_host' in config and | ||||||
|     'distributed_poller_memcached_port' in config and |     'distributed_poller_memcached_port' in config and | ||||||
|         config['distributed_poller']): |         config['distributed_poller']): | ||||||
|  |  | ||||||
|  |     time_tag = str(get_time_tag(step)) | ||||||
|  |     master_tag = "poller.master." + time_tag | ||||||
|  |     nodes_tag = "poller.nodes." + time_tag | ||||||
|  |  | ||||||
|     try: |     try: | ||||||
|         import memcache |         import memcache | ||||||
|         import uuid |         import uuid | ||||||
|         memc = memcache.Client([config['distributed_poller_memcached_host'] + ':' + |         memc = memcache.Client([config['distributed_poller_memcached_host'] + ':' + | ||||||
|             str(config['distributed_poller_memcached_port'])]) |             str(config['distributed_poller_memcached_port'])]) | ||||||
|         if str(memc.get("poller.master")) == config['distributed_poller_name']: |         if str(memc.get(master_tag)) == config['distributed_poller_name']: | ||||||
|             print "This system is already joined as the poller master." |             print "This system is already joined as the poller master." | ||||||
|             sys.exit(2) |             sys.exit(2) | ||||||
|         if memc_alive(): |         if memc_alive(): | ||||||
|             if memc.get("poller.master") is None: |             if memc.get(master_tag) is None: | ||||||
|                 print "Registered as Master" |                 print "Registered as Master" | ||||||
|                 memc.set("poller.master", config['distributed_poller_name'], 10) |                 memc.set(master_tag, config['distributed_poller_name'], 10) | ||||||
|                 memc.set("poller.nodes", 0, 300) |                 memc.set(nodes_tag, 0, step) | ||||||
|                 IsNode = False |                 IsNode = False | ||||||
|             else: |             else: | ||||||
|                 print "Registered as Node joining Master %s" % memc.get("poller.master") |                 print "Registered as Node joining Master %s" % memc.get(master_tag) | ||||||
|                 IsNode = True |                 IsNode = True | ||||||
|                 memc.incr("poller.nodes") |                 memc.incr(nodes_tag) | ||||||
|             distpoll = True |             distpoll = True | ||||||
|         else: |         else: | ||||||
|             print "Could not connect to memcached, disabling distributed poller." |             print "Could not connect to memcached, disabling distributed poller." | ||||||
| @@ -235,8 +251,8 @@ def printworker(): | |||||||
|         global distpoll |         global distpoll | ||||||
|         if distpoll: |         if distpoll: | ||||||
|             if not IsNode: |             if not IsNode: | ||||||
|                 memc_touch('poller.master', 10) |                 memc_touch(master_tag, 10) | ||||||
|                 nodes = memc.get('poller.nodes') |                 nodes = memc.get(nodes_tag) | ||||||
|                 if nodes is None and not memc_alive(): |                 if nodes is None and not memc_alive(): | ||||||
|                     print "WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly." |                     print "WARNING: Lost Memcached. Taking over all devices. Nodes will quit shortly." | ||||||
|                     distpoll = False |                     distpoll = False | ||||||
| @@ -245,7 +261,7 @@ def printworker(): | |||||||
|                     print "INFO: %s Node(s) Total" % (nodes) |                     print "INFO: %s Node(s) Total" % (nodes) | ||||||
|                     nodeso = nodes |                     nodeso = nodes | ||||||
|             else: |             else: | ||||||
|                 memc_touch('poller.nodes', 10) |                 memc_touch(nodes_tag, 10) | ||||||
|             try: |             try: | ||||||
|                 worker_id, device_id, elapsed_time = print_queue.get(False) |                 worker_id, device_id, elapsed_time = print_queue.get(False) | ||||||
|             except: |             except: | ||||||
| @@ -264,7 +280,7 @@ def printworker(): | |||||||
|         real_duration += elapsed_time |         real_duration += elapsed_time | ||||||
|         per_device_duration[device_id] = elapsed_time |         per_device_duration[device_id] = elapsed_time | ||||||
|         polled_devices += 1 |         polled_devices += 1 | ||||||
|         if elapsed_time < 300: |         if elapsed_time < step: | ||||||
|             print "INFO: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time) |             print "INFO: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time) | ||||||
|         else: |         else: | ||||||
|             print "WARNING: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time) |             print "WARNING: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time) | ||||||
| @@ -280,9 +296,9 @@ def poll_worker(): | |||||||
|     while True: |     while True: | ||||||
|         device_id = poll_queue.get() |         device_id = poll_queue.get() | ||||||
| # (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5 | # (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC5 | ||||||
|         if not distpoll or memc.get('poller.device.' + str(device_id)) is None: |         if not distpoll or memc.get('poller.device.%s.%s'% (device_id, time_tag)) is None: | ||||||
|             if distpoll: |             if distpoll: | ||||||
|                 result = memc.add('poller.device.' + str(device_id), config['distributed_poller_name'], 300) |                 result = memc.add('poller.device.%s.%s'% (device_id, time_tag), config['distributed_poller_name'], step) | ||||||
|                 if not result: |                 if not result: | ||||||
|                     print "This device (%s) appears to be being polled by another poller" % (device_id) |                     print "This device (%s) appears to be being polled by another poller" % (device_id) | ||||||
|                     poll_queue.task_done() |                     poll_queue.task_done() | ||||||
| @@ -334,28 +350,28 @@ print "INFO: poller-wrapper polled %s devices in %s seconds with %s workers" % ( | |||||||
|  |  | ||||||
| # (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6 | # (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC6 | ||||||
| if distpoll or memc_alive(): | if distpoll or memc_alive(): | ||||||
|     master = memc.get("poller.master") |     master = memc.get(master_tag) | ||||||
|     if master == config['distributed_poller_name'] and not IsNode: |     if master == config['distributed_poller_name'] and not IsNode: | ||||||
|         print "Wait for all poller-nodes to finish" |         print "Wait for all poller-nodes to finish" | ||||||
|         nodes = memc.get("poller.nodes") |         nodes = memc.get(nodes_tag) | ||||||
|         while nodes > 0 and nodes is not None: |         while nodes > 0 and nodes is not None: | ||||||
|             try: |             try: | ||||||
|                 time.sleep(1) |                 time.sleep(1) | ||||||
|                 nodes = memc.get("poller.nodes") |                 nodes = memc.get(nodes_tag) | ||||||
|             except: |             except: | ||||||
|                 pass |                 pass | ||||||
|         print "Clearing Locks" |         print "Clearing Locks for %s" % time_tag | ||||||
|         x = minlocks |         x = minlocks | ||||||
|         while x <= maxlocks: |         while x <= maxlocks: | ||||||
|             memc.delete('poller.device.' + str(x)) |             res = memc.delete('poller.device.%s.%s' % (x, time_tag)) | ||||||
|             x = x + 1 |             x += 1 | ||||||
|         print "%s Locks Cleared" % x |         print "%s Locks Cleared" % x | ||||||
|         print "Clearing Nodes" |         print "Clearing Nodes" | ||||||
|         memc.delete("poller.master") |         memc.delete(master_tag) | ||||||
|         memc.delete("poller.nodes") |         memc.delete(nodes_tag) | ||||||
|     else: |     else: | ||||||
|         memc.decr("poller.nodes") |         memc.decr(nodes_tag) | ||||||
|     print "Finished %s." % time.time() |     print "Finished %.3fs after interval start." % (time.time() - int(time_tag)) | ||||||
| # EOC6 | # EOC6 | ||||||
|  |  | ||||||
| show_stopper = False | show_stopper = False | ||||||
| @@ -375,17 +391,17 @@ else: | |||||||
| db.close() | db.close() | ||||||
|  |  | ||||||
|  |  | ||||||
| if total_time > 300: | if total_time > step: | ||||||
|     print "WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads" |     print "WARNING: the process took more than %s seconds to finish, you need faster hardware or more threads" % step | ||||||
|     print "INFO: in sequential style polling the elapsed time would have been: %s seconds" % real_duration |     print "INFO: in sequential style polling the elapsed time would have been: %s seconds" % real_duration | ||||||
|     for device in per_device_duration: |     for device in per_device_duration: | ||||||
|         if per_device_duration[device] > 300: |         if per_device_duration[device] > step: | ||||||
|             print "WARNING: device %s is taking too long: %s seconds" % (device, per_device_duration[device]) |             print "WARNING: device %s is taking too long: %s seconds" % (device, per_device_duration[device]) | ||||||
|             show_stopper = True |             show_stopper = True | ||||||
|     if show_stopper: |     if show_stopper: | ||||||
|         print "ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do." |         print "ERROR: Some devices are taking more than %s seconds, the script cannot recommend you what to do." % step | ||||||
|     else: |     else: | ||||||
|         recommend = int(total_time / 300.0 * amount_of_workers + 1) |         recommend = int(total_time / step * amount_of_workers + 1) | ||||||
|         print "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend |         print "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend | ||||||
|  |  | ||||||
|     sys.exit(2) |     sys.exit(2) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user