| 
									
										
										
										
											2016-04-11 15:37:09 +02:00
										 |  |  | #! /usr/bin/env python2 | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | """
 | 
					
						
							|  |  |  |  poller-service A service to wrap SNMP polling.  It will poll up to $threads devices at a time, and will not re-poll | 
					
						
							| 
									
										
										
										
											2015-07-07 14:33:33 -04:00
										 |  |  |                 devices that have been polled within the last $poll_frequency seconds. It will prioritize devices based | 
					
						
							|  |  |  |                 on the last time polled. If resources are sufficient, this service should poll every device every | 
					
						
							| 
									
										
										
										
											2015-07-07 13:02:13 -04:00
										 |  |  |                 $poll_frequency seconds, but should gracefully degrade if resources are inefficient, polling devices as | 
					
						
							| 
									
										
										
										
											2015-07-16 12:11:45 -04:00
										 |  |  |                 frequently as possible. This service is based on Job Snijders' poller-wrapper.py. | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |  Author:        Clint Armstrong <clint@clintarmstrong.net> | 
					
						
							|  |  |  |  Date:          July 2015 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-16 12:11:45 -04:00
										 |  |  |  License:       BSD 2-Clause | 
					
						
							| 
									
										
										
										
											2015-08-21 07:44:05 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  | Copyright (c) 2015, Clint Armstrong | 
					
						
							|  |  |  | All rights reserved. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import json | 
					
						
							|  |  |  | import os | 
					
						
							|  |  |  | import subprocess | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import threading | 
					
						
							|  |  |  | import time | 
					
						
							|  |  |  | import MySQLdb | 
					
						
							| 
									
										
										
										
											2015-07-07 11:20:09 -04:00
										 |  |  | import logging | 
					
						
							|  |  |  | import logging.handlers | 
					
						
							| 
									
										
										
										
											2015-07-06 12:00:21 -04:00
										 |  |  | from datetime import datetime, timedelta | 
					
						
							| 
									
										
										
										
											2015-09-02 11:54:27 -04:00
										 |  |  | from collections import namedtuple | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-07 11:20:09 -04:00
										 |  |  | log = logging.getLogger('poller-service') | 
					
						
							|  |  |  | log.setLevel(logging.DEBUG) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | formatter = logging.Formatter('poller-service: %(message)s') | 
					
						
							| 
									
										
										
										
											2015-07-07 14:33:33 -04:00
										 |  |  | handler = logging.handlers.SysLogHandler(address='/dev/log') | 
					
						
							| 
									
										
										
										
											2015-07-07 11:20:09 -04:00
										 |  |  | handler.setFormatter(formatter) | 
					
						
							|  |  |  | log.addHandler(handler) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | install_dir = os.path.dirname(os.path.realpath(__file__)) | 
					
						
							| 
									
										
										
										
											2015-07-06 10:52:59 -04:00
										 |  |  | config_file = install_dir + '/config.php' | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-07 11:40:29 -04:00
										 |  |  | log.info('INFO: Starting poller-service') | 
					
						
							| 
									
										
										
										
											2015-07-06 15:10:48 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-07 14:33:33 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-08 13:51:10 -04:00
										 |  |  | class DB: | 
					
						
							|  |  |  |     conn = None | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-14 09:34:13 -04:00
										 |  |  |     def __init__(self): | 
					
						
							| 
									
										
										
										
											2015-09-14 13:38:46 -04:00
										 |  |  |         self.in_use = threading.Lock() | 
					
						
							| 
									
										
										
										
											2015-09-14 09:43:34 -04:00
										 |  |  |         self.connect() | 
					
						
							| 
									
										
										
										
											2015-09-14 09:34:13 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-08 13:51:10 -04:00
										 |  |  |     def connect(self): | 
					
						
							| 
									
										
										
										
											2015-09-14 13:38:46 -04:00
										 |  |  |         self.in_use.acquire(True) | 
					
						
							| 
									
										
										
										
											2015-09-09 08:37:00 -04:00
										 |  |  |         while True: | 
					
						
							| 
									
										
										
										
											2015-09-14 09:00:55 -04:00
										 |  |  |             try: | 
					
						
							|  |  |  |                 self.conn.close() | 
					
						
							|  |  |  |             except: | 
					
						
							|  |  |  |                 pass | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-09 08:37:00 -04:00
										 |  |  |             try: | 
					
						
							|  |  |  |                 if db_port == 0: | 
					
						
							|  |  |  |                     self.conn = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) | 
					
						
							|  |  |  |                 else: | 
					
						
							|  |  |  |                     self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) | 
					
						
							|  |  |  |                 break | 
					
						
							|  |  |  |             except (AttributeError, MySQLdb.OperationalError): | 
					
						
							| 
									
										
										
										
											2015-09-09 09:20:03 -04:00
										 |  |  |                 log.warning('WARNING: MySQL Error, reconnecting.') | 
					
						
							| 
									
										
										
										
											2015-09-09 08:37:00 -04:00
										 |  |  |                 time.sleep(.5) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-08 13:51:10 -04:00
										 |  |  |         self.conn.autocommit(True) | 
					
						
							| 
									
										
										
										
											2015-09-09 08:25:00 -04:00
										 |  |  |         self.conn.ping(True) | 
					
						
							| 
									
										
										
										
											2015-09-14 13:38:46 -04:00
										 |  |  |         self.in_use.release() | 
					
						
							| 
									
										
										
										
											2015-09-08 13:51:10 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def query(self, sql): | 
					
						
							| 
									
										
										
										
											2015-09-14 13:38:46 -04:00
										 |  |  |         self.in_use.acquire(True) | 
					
						
							| 
									
										
										
										
											2015-09-09 09:20:03 -04:00
										 |  |  |         while True: | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 cursor = self.conn.cursor() | 
					
						
							|  |  |  |                 cursor.execute(sql) | 
					
						
							|  |  |  |                 ret = cursor.fetchall() | 
					
						
							|  |  |  |                 cursor.close() | 
					
						
							| 
									
										
										
										
											2015-09-14 13:38:46 -04:00
										 |  |  |                 self.in_use.release() | 
					
						
							| 
									
										
										
										
											2015-09-09 09:20:03 -04:00
										 |  |  |                 return ret | 
					
						
							|  |  |  |             except (AttributeError, MySQLdb.OperationalError): | 
					
						
							|  |  |  |                 log.warning('WARNING: MySQL Operational Error during query, reconnecting.') | 
					
						
							| 
									
										
										
										
											2015-09-14 13:38:46 -04:00
										 |  |  |                 self.in_use.release() | 
					
						
							| 
									
										
										
										
											2015-09-09 09:20:03 -04:00
										 |  |  |                 self.connect() | 
					
						
							|  |  |  |             except (AttributeError, MySQLdb.ProgrammingError): | 
					
						
							|  |  |  |                 log.warning('WARNING: MySQL Programming Error during query, attempting query again.') | 
					
						
							| 
									
										
										
										
											2015-09-14 09:41:03 -04:00
										 |  |  |                 cursor.close() | 
					
						
							| 
									
										
										
										
											2015-09-08 13:51:10 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | def get_config_data(): | 
					
						
							| 
									
										
										
										
											2015-07-06 10:52:59 -04:00
										 |  |  |     config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir] | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  |     try: | 
					
						
							|  |  |  |         proc = subprocess.Popen(config_cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE) | 
					
						
							|  |  |  |     except: | 
					
						
							| 
									
										
										
										
											2015-07-07 11:20:09 -04:00
										 |  |  |         log.critical("ERROR: Could not execute: %s" % config_cmd) | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  |         sys.exit(2) | 
					
						
							| 
									
										
										
										
											2015-07-18 21:09:49 -04:00
										 |  |  |     return proc.communicate()[0].decode() | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  | try: | 
					
						
							|  |  |  |     with open(config_file) as f: | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | except IOError as e: | 
					
						
							| 
									
										
										
										
											2015-07-07 11:20:09 -04:00
										 |  |  |     log.critical("ERROR: Oh dear... %s does not seem readable" % config_file) | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  |     sys.exit(2) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | try: | 
					
						
							|  |  |  |     config = json.loads(get_config_data()) | 
					
						
							|  |  |  | except: | 
					
						
							| 
									
										
										
										
											2015-07-07 11:20:09 -04:00
										 |  |  |     log.critical("ERROR: Could not load or parse configuration, are PATHs correct?") | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  |     sys.exit(2) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-07 11:29:04 -04:00
										 |  |  | try: | 
					
						
							| 
									
										
										
										
											2015-07-18 21:38:27 -04:00
										 |  |  |     loglevel = int(config['poller_service_loglevel']) | 
					
						
							| 
									
										
										
										
											2015-07-07 11:29:04 -04:00
										 |  |  | except KeyError: | 
					
						
							| 
									
										
										
										
											2015-07-18 21:31:01 -04:00
										 |  |  |     loglevel = 20 | 
					
						
							| 
									
										
										
										
											2015-07-18 21:38:27 -04:00
										 |  |  | except ValueError: | 
					
						
							|  |  |  |     loglevel = logging.getLevelName(config['poller_service_loglevel']) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | try: | 
					
						
							|  |  |  |     log.setLevel(loglevel) | 
					
						
							|  |  |  | except ValueError: | 
					
						
							| 
									
										
										
										
											2015-07-18 21:31:01 -04:00
										 |  |  |     log.warning('ERROR: {0} is not a valid log level. If using python 3.4.0-3.4.1 you must specify loglevel by number'.format(str(loglevel))) | 
					
						
							| 
									
										
										
										
											2015-07-18 21:38:27 -04:00
										 |  |  |     log.setLevel(20) | 
					
						
							| 
									
										
										
										
											2015-07-07 11:29:04 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | poller_path = config['install_dir'] + '/poller.php' | 
					
						
							| 
									
										
										
										
											2015-07-07 13:02:13 -04:00
										 |  |  | discover_path = config['install_dir'] + '/discovery.php' | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | db_username = config['db_user'] | 
					
						
							|  |  |  | db_password = config['db_pass'] | 
					
						
							| 
									
										
										
										
											2017-01-27 23:16:04 +00:00
										 |  |  | db_port = int(config['db_port']) | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  | if config['db_host'][:5].lower() == 'unix:': | 
					
						
							| 
									
										
										
										
											2015-07-06 15:10:48 -04:00
										 |  |  |     db_server = config['db_host'] | 
					
						
							|  |  |  |     db_port = 0 | 
					
						
							| 
									
										
										
										
											2017-04-06 16:02:37 -05:00
										 |  |  | elif config['db_socket']: | 
					
						
							|  |  |  |     db_server = config['db_socket'] | 
					
						
							|  |  |  |     db_port = 0 | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | else: | 
					
						
							| 
									
										
										
										
											2015-07-06 15:10:48 -04:00
										 |  |  |     db_server = config['db_host'] | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  | db_dbname = config['db_name'] | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-06 12:00:21 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | try: | 
					
						
							| 
									
										
										
										
											2015-07-07 11:40:29 -04:00
										 |  |  |     amount_of_workers = int(config['poller_service_workers']) | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  |     if amount_of_workers == 0: | 
					
						
							| 
									
										
										
										
											2015-07-16 11:55:04 -04:00
										 |  |  |         amount_of_workers = 16 | 
					
						
							| 
									
										
										
										
											2015-07-07 11:40:29 -04:00
										 |  |  | except KeyError: | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  |     amount_of_workers = 16 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | try: | 
					
						
							| 
									
										
										
										
											2015-07-07 13:02:13 -04:00
										 |  |  |     poll_frequency = int(config['poller_service_poll_frequency']) | 
					
						
							|  |  |  |     if poll_frequency == 0: | 
					
						
							| 
									
										
										
										
											2015-07-16 11:55:04 -04:00
										 |  |  |         poll_frequency = 300 | 
					
						
							| 
									
										
										
										
											2015-07-07 11:40:29 -04:00
										 |  |  | except KeyError: | 
					
						
							| 
									
										
										
										
											2015-07-07 13:02:13 -04:00
										 |  |  |     poll_frequency = 300 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | try: | 
					
						
							|  |  |  |     discover_frequency = int(config['poller_service_discover_frequency']) | 
					
						
							| 
									
										
										
										
											2015-07-07 13:10:36 -04:00
										 |  |  |     if discover_frequency == 0: | 
					
						
							| 
									
										
										
										
											2015-07-16 11:55:04 -04:00
										 |  |  |         discover_frequency = 21600 | 
					
						
							| 
									
										
										
										
											2015-07-07 13:02:13 -04:00
										 |  |  | except KeyError: | 
					
						
							| 
									
										
										
										
											2015-07-07 13:10:36 -04:00
										 |  |  |     discover_frequency = 21600 | 
					
						
							| 
									
										
										
										
											2015-07-06 10:47:18 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-06 14:51:03 -04:00
										 |  |  | try: | 
					
						
							| 
									
										
										
										
											2015-07-07 11:40:29 -04:00
										 |  |  |     down_retry = int(config['poller_service_down_retry']) | 
					
						
							| 
									
										
										
										
											2015-07-06 14:51:03 -04:00
										 |  |  |     if down_retry == 0: | 
					
						
							| 
									
										
										
										
											2015-07-16 11:55:04 -04:00
										 |  |  |         down_retry = 60 | 
					
						
							| 
									
										
										
										
											2015-07-07 11:40:29 -04:00
										 |  |  | except KeyError: | 
					
						
							| 
									
										
										
										
											2015-07-07 13:33:16 -04:00
										 |  |  |     down_retry = 60 | 
					
						
							| 
									
										
										
										
											2015-07-06 14:51:03 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-14 08:40:20 -04:00
										 |  |  | try: | 
					
						
							|  |  |  |     retry_query = int(config['poller_service_retry_query']) | 
					
						
							|  |  |  |     if retry_query == 0: | 
					
						
							|  |  |  |         retry_query = 1 | 
					
						
							|  |  |  | except KeyError: | 
					
						
							|  |  |  |     retry_query = 1 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-14 09:34:13 -04:00
										 |  |  | try: | 
					
						
							|  |  |  |     single_connection = bool(config['poller_service_single_connection']) | 
					
						
							|  |  |  | except KeyError: | 
					
						
							|  |  |  |     single_connection = False | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-08 13:51:10 -04:00
										 |  |  | db = DB() | 
					
						
							| 
									
										
										
										
											2015-07-06 15:10:48 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-08 13:51:10 -04:00
										 |  |  | def lockFree(lock, db=db): | 
					
						
							| 
									
										
										
										
											2015-07-18 20:04:44 -04:00
										 |  |  |     query = "SELECT IS_FREE_LOCK('{0}')".format(lock) | 
					
						
							| 
									
										
										
										
											2015-09-09 09:22:31 -04:00
										 |  |  |     return db.query(query)[0][0] == 1 | 
					
						
							| 
									
										
										
										
											2015-07-06 12:00:21 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-06 15:10:48 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-08 13:51:10 -04:00
										 |  |  | def getLock(lock, db=db): | 
					
						
							| 
									
										
										
										
											2015-07-18 20:04:44 -04:00
										 |  |  |     query = "SELECT GET_LOCK('{0}', 0)".format(lock) | 
					
						
							| 
									
										
										
										
											2015-09-09 09:22:31 -04:00
										 |  |  |     return db.query(query)[0][0] == 1 | 
					
						
							| 
									
										
										
										
											2015-07-06 14:51:03 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-06 15:10:48 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-08 13:51:10 -04:00
										 |  |  | def releaseLock(lock, db=db): | 
					
						
							| 
									
										
										
										
											2015-07-18 20:04:44 -04:00
										 |  |  |     query = "SELECT RELEASE_LOCK('{0}')".format(lock) | 
					
						
							| 
									
										
										
										
											2015-09-08 13:51:10 -04:00
										 |  |  |     cursor = db.query(query) | 
					
						
							| 
									
										
										
										
											2015-09-09 09:22:31 -04:00
										 |  |  |     return db.query(query)[0][0] == 1 | 
					
						
							| 
									
										
										
										
											2015-07-06 14:51:03 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-07 14:33:33 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-07 10:09:55 -04:00
										 |  |  | def sleep_until(timestamp): | 
					
						
							|  |  |  |     now = datetime.now() | 
					
						
							|  |  |  |     if timestamp > now: | 
					
						
							|  |  |  |         sleeptime = (timestamp - now).seconds | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         sleeptime = 0 | 
					
						
							|  |  |  |     time.sleep(sleeptime) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-18 20:04:44 -04:00
										 |  |  | poller_group = ('and poller_group IN({0}) ' | 
					
						
							| 
									
										
										
										
											2015-07-07 14:33:33 -04:00
										 |  |  |                 .format(str(config['distributed_poller_group'])) if 'distributed_poller_group' in config else '') | 
					
						
							| 
									
										
										
										
											2015-07-07 10:09:55 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  | # Add last_polled and last_polled_timetaken so we can sort by the time the last poll started, with the goal | 
					
						
							|  |  |  | # of having each device complete a poll within the given time range. | 
					
						
							| 
									
										
										
										
											2015-07-13 14:32:00 -04:00
										 |  |  | dev_query = ('SELECT device_id, status,                                          ' | 
					
						
							| 
									
										
										
										
											2015-08-29 02:41:47 +02:00
										 |  |  |              'CAST(                                                              ' | 
					
						
							|  |  |  |              '  DATE_ADD(                                                        ' | 
					
						
							|  |  |  |              '    DATE_SUB(                                                      ' | 
					
						
							|  |  |  |              '      last_polled,                                                 ' | 
					
						
							|  |  |  |              '      INTERVAL last_polled_timetaken SECOND                        ' | 
					
						
							|  |  |  |              '    ),                                                             ' | 
					
						
							|  |  |  |              '    INTERVAL {0} SECOND)                                           ' | 
					
						
							| 
									
										
										
										
											2015-09-08 10:30:08 -04:00
										 |  |  |              '  AS DATETIME                                                      ' | 
					
						
							| 
									
										
										
										
											2015-08-29 02:41:47 +02:00
										 |  |  |              ') AS next_poll,                                                    ' | 
					
						
							|  |  |  |              'CAST(                                                              ' | 
					
						
							|  |  |  |              '  DATE_ADD(                                                        ' | 
					
						
							|  |  |  |              '    DATE_SUB(                                                      ' | 
					
						
							|  |  |  |              '      last_discovered,                                             ' | 
					
						
							|  |  |  |              '      INTERVAL last_discovered_timetaken SECOND                    ' | 
					
						
							|  |  |  |              '    ),                                                             ' | 
					
						
							|  |  |  |              '    INTERVAL {1} SECOND)                                           ' | 
					
						
							| 
									
										
										
										
											2015-09-08 10:30:08 -04:00
										 |  |  |              '  AS DATETIME                                                      ' | 
					
						
							| 
									
										
										
										
											2015-08-29 02:41:47 +02:00
										 |  |  |              ') as next_discovery                                                ' | 
					
						
							| 
									
										
										
										
											2015-07-13 10:15:06 -04:00
										 |  |  |              'FROM devices WHERE                                                 ' | 
					
						
							|  |  |  |              'disabled = 0                                                       ' | 
					
						
							| 
									
										
										
										
											2015-09-02 12:56:59 -04:00
										 |  |  |              'AND IS_FREE_LOCK(CONCAT("poll.", device_id))                       ' | 
					
						
							|  |  |  |              'AND IS_FREE_LOCK(CONCAT("discovery.", device_id))                       ' | 
					
						
							| 
									
										
										
										
											2015-09-02 14:22:42 -04:00
										 |  |  |              'AND IS_FREE_LOCK(CONCAT("queue.", device_id))                       ' | 
					
						
							| 
									
										
										
										
											2015-07-13 10:15:06 -04:00
										 |  |  |              'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND )  ' | 
					
						
							|  |  |  |              '  OR last_poll_attempted IS NULL )                                 ' | 
					
						
							|  |  |  |              '{3}                                                                ' | 
					
						
							| 
									
										
										
										
											2015-07-17 13:33:24 -04:00
										 |  |  |              'ORDER BY next_poll asc                                             ' | 
					
						
							| 
									
										
										
										
											2015-09-09 14:47:32 -04:00
										 |  |  |              'LIMIT 1                                                            ').format(poll_frequency, | 
					
						
							| 
									
										
										
										
											2015-07-13 10:15:06 -04:00
										 |  |  |                                                                                            discover_frequency, | 
					
						
							|  |  |  |                                                                                            down_retry, | 
					
						
							|  |  |  |                                                                                            poller_group) | 
					
						
							| 
									
										
										
										
											2015-07-07 10:09:55 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-13 10:59:26 -04:00
										 |  |  | next_update = datetime.now() + timedelta(minutes=1) | 
					
						
							| 
									
										
										
										
											2015-07-07 11:29:04 -04:00
										 |  |  | devices_scanned = 0 | 
					
						
							| 
									
										
										
										
											2015-07-06 14:51:03 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-14 08:40:20 -04:00
										 |  |  | dont_query_until = datetime.fromtimestamp(0) | 
					
						
							| 
									
										
										
										
											2015-07-07 11:20:09 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-09 16:40:33 -04:00
										 |  |  | def poll_worker(): | 
					
						
							| 
									
										
										
										
											2015-09-09 14:47:32 -04:00
										 |  |  |     global dev_query | 
					
						
							|  |  |  |     global devices_scanned | 
					
						
							| 
									
										
										
										
											2015-09-14 08:40:20 -04:00
										 |  |  |     global dont_query_until | 
					
						
							| 
									
										
										
										
											2015-09-14 09:35:55 -04:00
										 |  |  |     global single_connection | 
					
						
							| 
									
										
										
										
											2015-09-09 17:01:44 -04:00
										 |  |  |     thread_id = threading.current_thread().name | 
					
						
							| 
									
										
										
										
											2015-09-14 09:35:55 -04:00
										 |  |  | 
 | 
					
						
							|  |  |  |     if single_connection: | 
					
						
							|  |  |  |         global db | 
					
						
							|  |  |  |     else: | 
					
						
							|  |  |  |         db = DB() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-09 14:47:32 -04:00
										 |  |  |     while True: | 
					
						
							| 
									
										
										
										
											2015-09-14 08:40:20 -04:00
										 |  |  |         if datetime.now() < dont_query_until: | 
					
						
							|  |  |  |             time.sleep(1) | 
					
						
							|  |  |  |             continue | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-09 15:01:27 -04:00
										 |  |  |         dev_row = db.query(dev_query) | 
					
						
							|  |  |  |         if len(dev_row) < 1: | 
					
						
							| 
									
										
										
										
											2015-09-14 08:40:20 -04:00
										 |  |  |             dont_query_until = datetime.now() + timedelta(seconds=retry_query) | 
					
						
							| 
									
										
										
										
											2015-09-09 15:23:04 -04:00
										 |  |  |             time.sleep(1) | 
					
						
							| 
									
										
										
										
											2015-09-09 15:01:27 -04:00
										 |  |  |             continue | 
					
						
							|  |  |  |              | 
					
						
							|  |  |  |         device_id, status, next_poll, next_discovery  = dev_row[0] | 
					
						
							| 
									
										
										
										
											2015-07-13 11:10:14 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-09 14:47:32 -04:00
										 |  |  |         if not getLock('queue.{0}'.format(device_id), db): | 
					
						
							| 
									
										
										
										
											2015-09-09 15:09:19 -04:00
										 |  |  |             releaseLock('queue.{0}'.format(device_id), db) | 
					
						
							| 
									
										
										
										
											2015-09-02 14:22:42 -04:00
										 |  |  |             continue | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-07 13:10:36 -04:00
										 |  |  |         if next_poll and next_poll > datetime.now(): | 
					
						
							| 
									
										
										
										
											2015-09-09 14:58:39 -04:00
										 |  |  |             log.debug('DEBUG: Thread {0} Sleeping until {1} before polling {2}'.format(thread_id, next_poll, device_id)) | 
					
						
							| 
									
										
										
										
											2015-07-13 09:38:03 -04:00
										 |  |  |             sleep_until(next_poll) | 
					
						
							| 
									
										
										
										
											2015-07-07 10:09:55 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-07 13:02:13 -04:00
										 |  |  |         action = 'poll' | 
					
						
							| 
									
										
										
										
											2015-07-13 14:32:00 -04:00
										 |  |  |         if (not next_discovery or next_discovery < datetime.now()) and status == 1: | 
					
						
							| 
									
										
										
										
											2015-07-07 13:02:13 -04:00
										 |  |  |             action = 'discovery' | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-09 14:58:39 -04:00
										 |  |  |         log.debug('DEBUG: Thread {0} Starting {1} of device {2}'.format(thread_id, action, device_id)) | 
					
						
							| 
									
										
										
										
											2015-07-07 11:20:09 -04:00
										 |  |  |         devices_scanned += 1 | 
					
						
							| 
									
										
										
										
											2015-07-13 09:38:03 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-09 09:22:31 -04:00
										 |  |  |         db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) | 
					
						
							| 
									
										
										
										
											2015-07-13 09:38:03 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-09 14:47:32 -04:00
										 |  |  |         if not getLock('{0}.{1}'.format(action, device_id), db): | 
					
						
							|  |  |  |             releaseLock('{0}.{1}'.format(action, device_id), db) | 
					
						
							| 
									
										
										
										
											2015-09-09 15:09:19 -04:00
										 |  |  |             releaseLock('queue.{0}'.format(device_id), db) | 
					
						
							| 
									
										
										
										
											2015-09-02 14:22:42 -04:00
										 |  |  |             continue | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-09 15:09:19 -04:00
										 |  |  |         releaseLock('queue.{0}'.format(device_id), db) | 
					
						
							| 
									
										
										
										
											2015-09-09 14:47:32 -04:00
										 |  |  |         try: | 
					
						
							|  |  |  |             start_time = time.time() | 
					
						
							|  |  |  |             path = poller_path | 
					
						
							|  |  |  |             if action == 'discovery': | 
					
						
							|  |  |  |                 path = discover_path | 
					
						
							|  |  |  |             command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id) | 
					
						
							|  |  |  |             subprocess.check_call(command, shell=True) | 
					
						
							|  |  |  |             elapsed_time = int(time.time() - start_time) | 
					
						
							|  |  |  |             if elapsed_time < 300: | 
					
						
							| 
									
										
										
										
											2015-09-09 14:58:39 -04:00
										 |  |  |                 log.debug("DEBUG: Thread {0} finished {1} of device {2} in {3} seconds".format(thread_id, action, device_id, elapsed_time)) | 
					
						
							| 
									
										
										
										
											2015-09-09 14:47:32 -04:00
										 |  |  |             else: | 
					
						
							| 
									
										
										
										
											2015-09-09 14:58:39 -04:00
										 |  |  |                 log.warning("WARNING: Thread {0} finished {1} of device {2} in {3} seconds".format(thread_id, action, device_id, elapsed_time)) | 
					
						
							| 
									
										
										
										
											2015-09-09 14:47:32 -04:00
										 |  |  |         except (KeyboardInterrupt, SystemExit): | 
					
						
							|  |  |  |             raise | 
					
						
							|  |  |  |         except: | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  |         finally: | 
					
						
							|  |  |  |             releaseLock('{0}.{1}'.format(action, device_id), db) | 
					
						
							|  |  |  |          | 
					
						
							| 
									
										
										
										
											2015-07-06 14:51:03 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-09 14:47:32 -04:00
										 |  |  | for i in range(0, amount_of_workers): | 
					
						
							| 
									
										
										
										
											2015-09-09 16:40:33 -04:00
										 |  |  |     t = threading.Thread(target=poll_worker) | 
					
						
							| 
									
										
										
										
											2015-09-09 17:01:44 -04:00
										 |  |  |     t.name = i | 
					
						
							| 
									
										
										
										
											2015-09-09 16:40:33 -04:00
										 |  |  |     t.daemon = True | 
					
						
							|  |  |  |     t.start() | 
					
						
							| 
									
										
										
										
											2015-07-13 09:48:18 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-07-13 10:06:57 -04:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-09-09 14:47:32 -04:00
										 |  |  | while True: | 
					
						
							| 
									
										
										
										
											2015-09-09 15:23:04 -04:00
										 |  |  |     sleep_until(next_update) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds | 
					
						
							|  |  |  |     update_query = ('INSERT INTO pollers(poller_name,     ' | 
					
						
							|  |  |  |                     '                    last_polled,     ' | 
					
						
							|  |  |  |                     '                    devices,         ' | 
					
						
							|  |  |  |                     '                    time_taken)      ' | 
					
						
							|  |  |  |                     '  values("{0}", NOW(), "{1}", "{2}") ' | 
					
						
							|  |  |  |                     'ON DUPLICATE KEY UPDATE              ' | 
					
						
							|  |  |  |                     '  last_polled=values(last_polled),   ' | 
					
						
							|  |  |  |                     '  devices=values(devices),           ' | 
					
						
							|  |  |  |                     '  time_taken=values(time_taken)      ').format(config['distributed_poller_name'].strip(), | 
					
						
							|  |  |  |                                                                     devices_scanned, | 
					
						
							|  |  |  |                                                                     seconds_taken) | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         db.query(update_query) | 
					
						
							|  |  |  |     except: | 
					
						
							|  |  |  |         log.critical('ERROR: MySQL query error. Is your schema up to date?') | 
					
						
							|  |  |  |         sys.exit(2) | 
					
						
							|  |  |  |     log.info('INFO: {0} devices scanned in the last minute'.format(devices_scanned)) | 
					
						
							|  |  |  |     devices_scanned = 0 | 
					
						
							|  |  |  |     next_update = datetime.now() + timedelta(minutes=1) |