From 98b50b60bafddad04348e7ecac5cf5a8be6749dd Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 11:54:27 -0400 Subject: [PATCH 01/56] thread locking in python --- poller-service.py | 62 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/poller-service.py b/poller-service.py index 099bf72139..a310541e06 100755 --- a/poller-service.py +++ b/poller-service.py @@ -33,6 +33,7 @@ import MySQLdb import logging import logging.handlers from datetime import datetime, timedelta +from collections import namedtuple log = logging.getLogger('poller-service') log.setLevel(logging.DEBUG) @@ -129,17 +130,20 @@ try: except KeyError: down_retry = 60 -try: - if db_port == 0: - db = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) - else: - db = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) - db.autocommit(True) - cursor = db.cursor() -except: - log.critical("ERROR: Could not connect to MySQL database!") - sys.exit(2) +def connectDB(): + try: + if db_port == 0: + db_inst = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) + else: + db_inst = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) + db_inst.autocommit(True) + cursor_inst = db_inst.cursor() + return cursor_inst + except: + log.critical("ERROR: Could not connect to MySQL database!") + sys.exit(2) +cursor = connectDB() def poll_worker(device_id, action): try: @@ -148,7 +152,9 @@ def poll_worker(device_id, action): if action == 'discovery': path = discover_path command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id) - subprocess.check_call(command, shell=True) + if getThreadLock('{0}.{1}'.format(action, device_id)): + subprocess.check_call(command, shell=True) + releaseThreadLock('{0}.{1}'.format(action, device_id)) elapsed_time = int(time.time() - start_time) if elapsed_time < 300: log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) @@ -160,27 +166,49 @@ def poll_worker(device_id, action): pass -def lockFree(lock): - global cursor +def lockFree(lock, cursor=cursor): query = "SELECT IS_FREE_LOCK('{0}')".format(lock) cursor.execute(query) return cursor.fetchall()[0][0] == 1 -def getLock(lock): - global cursor +def getLock(lock, cursor=cursor): query = "SELECT GET_LOCK('{0}', 0)".format(lock) cursor.execute(query) return cursor.fetchall()[0][0] == 1 -def releaseLock(lock): - global cursor +thread_cursors = [] +for i in range(0, amount_of_workers): + thread_cursors.append(namedtuple('Cursor{}'.format(i), ['in_use', 'cursor'])) + thread_cursors[i].in_use = False + thread_cursors[i].cursor = connectDB + + +def getThreadLock(lock): + global thread_cursors + for thread_cursor in thread_cursors: + if not thread_cursor.in_use: + thread_cursor.in_use = lock + return getLock(lock, thread_cursor.cursor) + return False + + +def releaseLock(lock, cursor=cursor): query = "SELECT RELEASE_LOCK('{0}')".format(lock) cursor.execute(query) return cursor.fetchall()[0][0] == 1 +def releaseThreadLock(lock): + global thread_cursors + for thread_cursor in thread_cursors: + if thread_cursor.in_use == lock: + thread_cursor.in_use = False + return releaseLock(lock, thread_cursor.cursor) + return False + + def sleep_until(timestamp): now = datetime.now() if timestamp > now: From 964896d6b09a0d65dbb9ffcfca14125d5f9582ff Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 12:46:01 -0400 Subject: [PATCH 02/56] return db object to keep it from going out of scope --- poller-service.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/poller-service.py b/poller-service.py index a310541e06..60f7370689 100755 --- a/poller-service.py +++ b/poller-service.py @@ -138,12 +138,17 @@ def connectDB(): db_inst = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) db_inst.autocommit(True) cursor_inst = db_inst.cursor() - return cursor_inst + ret = namedtuple('db_connection', ['db', 'cursor']) + ret.db = db_inst + ret.cursor = cursor_inst + return ret except: log.critical("ERROR: Could not connect to MySQL database!") sys.exit(2) -cursor = connectDB() +db_connection = connectDB() +db = db_connection.db +cursor = db_connection.cursor def poll_worker(device_id, action): try: @@ -180,9 +185,11 @@ def getLock(lock, cursor=cursor): thread_cursors = [] for i in range(0, amount_of_workers): - thread_cursors.append(namedtuple('Cursor{}'.format(i), ['in_use', 'cursor'])) + thread_cursors.append(namedtuple('Cursor{}'.format(i), ['in_use', 'cursor', 'db'])) thread_cursors[i].in_use = False - thread_cursors[i].cursor = connectDB + thread_db_connection = connectDB() + thread_cursors[i].cursor = thread_db_connection.cursor + thread_cursors[i].db = thread_db_connection.db def getThreadLock(lock): From aafca03ec706e7f3d2fb240c743ba1be15464419 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 12:50:24 -0400 Subject: [PATCH 03/56] queue locks are no longer necessary --- poller-service.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/poller-service.py b/poller-service.py index 60f7370689..6c5fd2c88c 100755 --- a/poller-service.py +++ b/poller-service.py @@ -251,7 +251,6 @@ dev_query = ('SELECT device_id, status, 'FROM devices WHERE ' 'disabled = 0 ' 'AND IS_FREE_LOCK(CONCAT("polling.", device_id)) ' - 'AND IS_FREE_LOCK(CONCAT("queued.", device_id)) ' 'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) ' ' OR last_poll_attempted IS NULL ) ' '{3} ' @@ -305,14 +304,6 @@ while True: devices = cursor.fetchall() for device_id, status, next_poll, next_discovery in devices: - # add queue lock, so we lock the next device against any other pollers - # if this fails, the device is locked by another poller already - if not getLock('queued.{0}'.format(device_id)): - continue - if not lockFree('polling.{0}'.format(device_id)): - releaseLock('queued.{0}'.format(device_id)) - continue - if next_poll and next_poll > datetime.now(): log.debug('DEBUG: Sleeping until {0} before polling {1}'.format(next_poll, device_id)) sleep_until(next_poll) From d11ad883bb44a695dcbbef52896294454a4b0b9c Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 12:56:59 -0400 Subject: [PATCH 04/56] make sure lock name matches action --- poller-service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index 6c5fd2c88c..c5fed7077a 100755 --- a/poller-service.py +++ b/poller-service.py @@ -250,7 +250,8 @@ dev_query = ('SELECT device_id, status, ') as next_discovery ' 'FROM devices WHERE ' 'disabled = 0 ' - 'AND IS_FREE_LOCK(CONCAT("polling.", device_id)) ' + 'AND IS_FREE_LOCK(CONCAT("poll.", device_id)) ' + 'AND IS_FREE_LOCK(CONCAT("discovery.", device_id)) ' 'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) ' ' OR last_poll_attempted IS NULL ) ' '{3} ' From 020e40ab7f31ff521bdd1083b197182351c9e547 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 13:01:38 -0400 Subject: [PATCH 05/56] python 2.6 compat --- poller-service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index c5fed7077a..9f75d6a949 100755 --- a/poller-service.py +++ b/poller-service.py @@ -185,7 +185,7 @@ def getLock(lock, cursor=cursor): thread_cursors = [] for i in range(0, amount_of_workers): - thread_cursors.append(namedtuple('Cursor{}'.format(i), ['in_use', 'cursor', 'db'])) + thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'cursor', 'db'])) thread_cursors[i].in_use = False thread_db_connection = connectDB() thread_cursors[i].cursor = thread_db_connection.cursor From fb899c1ebe05fae10169a3b99e14c01041e7b96b Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 13:30:33 -0400 Subject: [PATCH 06/56] log when can't get lock --- poller-service.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/poller-service.py b/poller-service.py index 9f75d6a949..38eb8c397b 100755 --- a/poller-service.py +++ b/poller-service.py @@ -160,6 +160,8 @@ def poll_worker(device_id, action): if getThreadLock('{0}.{1}'.format(action, device_id)): subprocess.check_call(command, shell=True) releaseThreadLock('{0}.{1}'.format(action, device_id)) + else: + log.debug("DEBUG: Couldn't get lock on {0}.{1}".format(action, device_id)) elapsed_time = int(time.time() - start_time) if elapsed_time < 300: log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) From 8d7a937e03f20dd990226146f801090657a52ce2 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 14:22:42 -0400 Subject: [PATCH 07/56] move thread control to lock threads --- poller-service.py | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/poller-service.py b/poller-service.py index 38eb8c397b..9ebbb46b93 100755 --- a/poller-service.py +++ b/poller-service.py @@ -157,11 +157,7 @@ def poll_worker(device_id, action): if action == 'discovery': path = discover_path command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id) - if getThreadLock('{0}.{1}'.format(action, device_id)): - subprocess.check_call(command, shell=True) - releaseThreadLock('{0}.{1}'.format(action, device_id)) - else: - log.debug("DEBUG: Couldn't get lock on {0}.{1}".format(action, device_id)) + subprocess.check_call(command, shell=True) elapsed_time = int(time.time() - start_time) if elapsed_time < 300: log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) @@ -171,6 +167,9 @@ def poll_worker(device_id, action): raise except: pass + finally: + releaseThreadLock(device_id, action) + def lockFree(lock, cursor=cursor): @@ -194,12 +193,24 @@ for i in range(0, amount_of_workers): thread_cursors[i].db = thread_db_connection.db -def getThreadLock(lock): +def getThreadQueueLock(device_id): global thread_cursors + # This is how threads are limited, by the numver of cursors available + while True: + for thread_cursor in thread_cursors: + if not thread_cursor.in_use: + thread_cursor.in_use = 'queue.{0}'.format(device_id) + return getLock('queue.{0}'.format(device_id), thread_cursor.cursor) + time.sleep(.5) + + +def getThreadActionLock(device_id, action): + global thread_cursors + # This is how threads are limited, by the numver of cursors available for thread_cursor in thread_cursors: - if not thread_cursor.in_use: - thread_cursor.in_use = lock - return getLock(lock, thread_cursor.cursor) + if thread_cursor.in_use == 'queue.{0}'.format(device_id): + thread_cursor.in_use = '{0}.{1}'.format(action, device_id) + return getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor) return False @@ -209,12 +220,12 @@ def releaseLock(lock, cursor=cursor): return cursor.fetchall()[0][0] == 1 -def releaseThreadLock(lock): +def releaseThreadLock(device_id, action): global thread_cursors for thread_cursor in thread_cursors: - if thread_cursor.in_use == lock: + if thread_cursor.in_use == '{0}.{1}'.format(action, device_id): thread_cursor.in_use = False - return releaseLock(lock, thread_cursor.cursor) + return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor) return False @@ -254,6 +265,7 @@ dev_query = ('SELECT device_id, status, 'disabled = 0 ' 'AND IS_FREE_LOCK(CONCAT("poll.", device_id)) ' 'AND IS_FREE_LOCK(CONCAT("discovery.", device_id)) ' + 'AND IS_FREE_LOCK(CONCAT("queue.", device_id)) ' 'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) ' ' OR last_poll_attempted IS NULL ) ' '{3} ' @@ -296,9 +308,6 @@ while True: devices_scanned = 0 next_update = datetime.now() + timedelta(minutes=1) - while threading.active_count() >= amount_of_workers or not lockFree('schema_update'): - time.sleep(.5) - try: cursor.execute(dev_query) except: @@ -307,6 +316,9 @@ while True: devices = cursor.fetchall() for device_id, status, next_poll, next_discovery in devices: + if not getThreadQueueLock(device_id): + continue + if next_poll and next_poll > datetime.now(): log.debug('DEBUG: Sleeping until {0} before polling {1}'.format(next_poll, device_id)) sleep_until(next_poll) @@ -321,6 +333,9 @@ while True: cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) cursor.fetchall() + if not getThreadActionLock(device_id, action): + continue + t = threading.Thread(target=poll_worker, args=[device_id, action]) t.start() From b7ecea93233f538198acfc40fd2efdb772577bac Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 14:42:57 -0400 Subject: [PATCH 08/56] make sure to clear the thread if unsuccessful --- poller-service.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/poller-service.py b/poller-service.py index 9ebbb46b93..ccab47f489 100755 --- a/poller-service.py +++ b/poller-service.py @@ -200,7 +200,10 @@ def getThreadQueueLock(device_id): for thread_cursor in thread_cursors: if not thread_cursor.in_use: thread_cursor.in_use = 'queue.{0}'.format(device_id) - return getLock('queue.{0}'.format(device_id), thread_cursor.cursor) + if getLock('queue.{0}'.format(device_id), thread_cursor.cursor): + return True + else: + thread_cursor.in_use = False time.sleep(.5) @@ -210,7 +213,10 @@ def getThreadActionLock(device_id, action): for thread_cursor in thread_cursors: if thread_cursor.in_use == 'queue.{0}'.format(device_id): thread_cursor.in_use = '{0}.{1}'.format(action, device_id) - return getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor) + if getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor): + return True + else: + thread_cursor.in_use = False return False From ecc4a696c7c9f7a4a1c857900d5ac7ae8e50e3d0 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 15:01:25 -0400 Subject: [PATCH 09/56] release queue if getting action lock fails --- poller-service.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/poller-service.py b/poller-service.py index ccab47f489..9c17e0fe51 100755 --- a/poller-service.py +++ b/poller-service.py @@ -204,6 +204,8 @@ def getThreadQueueLock(device_id): return True else: thread_cursor.in_use = False + return False + log.debug("DEBUG: No threads avaliable") time.sleep(.5) @@ -217,6 +219,8 @@ def getThreadActionLock(device_id, action): return True else: thread_cursor.in_use = False + releaseLock('queue.{0}'.format(device_id), thread_cursor.cursor) + return False return False From 65b7d69e9a1183f787f93b429911421beb88974a Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 15:23:47 -0400 Subject: [PATCH 10/56] always release queue lock --- poller-service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index 9c17e0fe51..6d4296207c 100755 --- a/poller-service.py +++ b/poller-service.py @@ -215,11 +215,11 @@ def getThreadActionLock(device_id, action): for thread_cursor in thread_cursors: if thread_cursor.in_use == 'queue.{0}'.format(device_id): thread_cursor.in_use = '{0}.{1}'.format(action, device_id) + releaseLock('queue.{0}'.format(device_id), thread_cursor.cursor) if getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor): return True else: thread_cursor.in_use = False - releaseLock('queue.{0}'.format(device_id), thread_cursor.cursor) return False return False From ca7daae7cdfbbce74f7ab14e8425bf63a55f699b Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 15:38:45 -0400 Subject: [PATCH 11/56] remove spammy debug log --- poller-service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index 6d4296207c..a9b292e4e4 100755 --- a/poller-service.py +++ b/poller-service.py @@ -205,7 +205,6 @@ def getThreadQueueLock(device_id): else: thread_cursor.in_use = False return False - log.debug("DEBUG: No threads avaliable") time.sleep(.5) From dde75439d27aacc0004f4afaf5c3fa004b7ed809 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 15:39:27 -0400 Subject: [PATCH 12/56] only log worker threads, not main python thread --- poller-service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index a9b292e4e4..e393f9ac71 100755 --- a/poller-service.py +++ b/poller-service.py @@ -292,7 +292,7 @@ while True: cur_threads = threading.active_count() if cur_threads != threads: threads = cur_threads - log.debug('DEBUG: {0} threads currently active'.format(threads)) + log.debug('DEBUG: {0} threads currently active'.format(str(threads - 1))) if next_update < datetime.now(): seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds From 68a970663fd1213e1a9cb1aca3e19e07b620d36d Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Tue, 8 Sep 2015 10:30:08 -0400 Subject: [PATCH 13/56] MySQL doesn't support specifying microsecond precision, and MariaDB defaults to zero, so it can be removed to be compatible with both. --- poller-service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/poller-service.py b/poller-service.py index e393f9ac71..3f72ca4b94 100755 --- a/poller-service.py +++ b/poller-service.py @@ -259,7 +259,7 @@ dev_query = ('SELECT device_id, status, ' INTERVAL last_polled_timetaken SECOND ' ' ), ' ' INTERVAL {0} SECOND) ' - ' AS DATETIME(0) ' + ' AS DATETIME ' ') AS next_poll, ' 'CAST( ' ' DATE_ADD( ' @@ -268,7 +268,7 @@ dev_query = ('SELECT device_id, status, ' INTERVAL last_discovered_timetaken SECOND ' ' ), ' ' INTERVAL {1} SECOND) ' - ' AS DATETIME(0) ' + ' AS DATETIME ' ') as next_discovery ' 'FROM devices WHERE ' 'disabled = 0 ' From 49c786f394ffae4a322dbfc0d6956228427509a8 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Tue, 8 Sep 2015 13:51:10 -0400 Subject: [PATCH 14/56] make more resiliant to MySQL disconnects --- poller-service.py | 73 ++++++++++++++++++++++++----------------------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/poller-service.py b/poller-service.py index 3f72ca4b94..a11064e3fb 100755 --- a/poller-service.py +++ b/poller-service.py @@ -49,6 +49,27 @@ config_file = install_dir + '/config.php' log.info('INFO: Starting poller-service') +class DB: + conn = None + + def connect(self): + if db_port == 0: + self.conn = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) + else: + self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) + self.conn.autocommit(True) + + def query(self, sql): + try: + cursor = self.conn.cursor() + cursor.execute(sql) + except (AttributeError, MySQLdb.OperationalError): + self.connect() + cursor = self.conn.cursor() + cursor.execute(sql) + return cursor + + def get_config_data(): config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir] try: @@ -130,25 +151,7 @@ try: except KeyError: down_retry = 60 -def connectDB(): - try: - if db_port == 0: - db_inst = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) - else: - db_inst = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) - db_inst.autocommit(True) - cursor_inst = db_inst.cursor() - ret = namedtuple('db_connection', ['db', 'cursor']) - ret.db = db_inst - ret.cursor = cursor_inst - return ret - except: - log.critical("ERROR: Could not connect to MySQL database!") - sys.exit(2) - -db_connection = connectDB() -db = db_connection.db -cursor = db_connection.cursor +db = DB() def poll_worker(device_id, action): try: @@ -172,25 +175,23 @@ def poll_worker(device_id, action): -def lockFree(lock, cursor=cursor): +def lockFree(lock, db=db): query = "SELECT IS_FREE_LOCK('{0}')".format(lock) - cursor.execute(query) + cursor = db.query(query) return cursor.fetchall()[0][0] == 1 -def getLock(lock, cursor=cursor): +def getLock(lock, db=db): query = "SELECT GET_LOCK('{0}', 0)".format(lock) - cursor.execute(query) + cursor = db.query(query) return cursor.fetchall()[0][0] == 1 thread_cursors = [] for i in range(0, amount_of_workers): - thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'cursor', 'db'])) + thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'db'])) thread_cursors[i].in_use = False - thread_db_connection = connectDB() - thread_cursors[i].cursor = thread_db_connection.cursor - thread_cursors[i].db = thread_db_connection.db + thread_cursors[i].db = DB() def getThreadQueueLock(device_id): @@ -200,7 +201,7 @@ def getThreadQueueLock(device_id): for thread_cursor in thread_cursors: if not thread_cursor.in_use: thread_cursor.in_use = 'queue.{0}'.format(device_id) - if getLock('queue.{0}'.format(device_id), thread_cursor.cursor): + if getLock('queue.{0}'.format(device_id), thread_cursor.db): return True else: thread_cursor.in_use = False @@ -214,8 +215,8 @@ def getThreadActionLock(device_id, action): for thread_cursor in thread_cursors: if thread_cursor.in_use == 'queue.{0}'.format(device_id): thread_cursor.in_use = '{0}.{1}'.format(action, device_id) - releaseLock('queue.{0}'.format(device_id), thread_cursor.cursor) - if getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor): + releaseLock('queue.{0}'.format(device_id), thread_cursor.db) + if getLock('{0}.{1}'.format(action, device_id), thread_cursor.db): return True else: thread_cursor.in_use = False @@ -223,9 +224,9 @@ def getThreadActionLock(device_id, action): return False -def releaseLock(lock, cursor=cursor): +def releaseLock(lock, db=db): query = "SELECT RELEASE_LOCK('{0}')".format(lock) - cursor.execute(query) + cursor = db.query(query) return cursor.fetchall()[0][0] == 1 @@ -234,7 +235,7 @@ def releaseThreadLock(device_id, action): for thread_cursor in thread_cursors: if thread_cursor.in_use == '{0}.{1}'.format(action, device_id): thread_cursor.in_use = False - return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor) + return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.db) return False @@ -308,7 +309,7 @@ while True: devices_scanned, seconds_taken) try: - cursor.execute(update_query) + cursor = db.query(update_query) except: log.critical('ERROR: MySQL query error. Is your schema up to date?') sys.exit(2) @@ -318,7 +319,7 @@ while True: next_update = datetime.now() + timedelta(minutes=1) try: - cursor.execute(dev_query) + cursor = db.query(dev_query) except: log.critical('ERROR: MySQL query error. Is your schema up to date?') sys.exit(2) @@ -339,7 +340,7 @@ while True: log.debug('DEBUG: Starting {0} of device {1}'.format(action, device_id)) devices_scanned += 1 - cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) + cursor = db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) cursor.fetchall() if not getThreadActionLock(device_id, action): From 31cb8dddb7d65fd0589aad26f1894d3b5002e313 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 11:54:27 -0400 Subject: [PATCH 15/56] thread locking in python --- poller-service.py | 124 ++++++++++++++++++---------------------------- 1 file changed, 49 insertions(+), 75 deletions(-) diff --git a/poller-service.py b/poller-service.py index a11064e3fb..a310541e06 100755 --- a/poller-service.py +++ b/poller-service.py @@ -49,27 +49,6 @@ config_file = install_dir + '/config.php' log.info('INFO: Starting poller-service') -class DB: - conn = None - - def connect(self): - if db_port == 0: - self.conn = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) - else: - self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) - self.conn.autocommit(True) - - def query(self, sql): - try: - cursor = self.conn.cursor() - cursor.execute(sql) - except (AttributeError, MySQLdb.OperationalError): - self.connect() - cursor = self.conn.cursor() - cursor.execute(sql) - return cursor - - def get_config_data(): config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir] try: @@ -151,7 +130,20 @@ try: except KeyError: down_retry = 60 -db = DB() +def connectDB(): + try: + if db_port == 0: + db_inst = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) + else: + db_inst = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) + db_inst.autocommit(True) + cursor_inst = db_inst.cursor() + return cursor_inst + except: + log.critical("ERROR: Could not connect to MySQL database!") + sys.exit(2) + +cursor = connectDB() def poll_worker(device_id, action): try: @@ -160,7 +152,9 @@ def poll_worker(device_id, action): if action == 'discovery': path = discover_path command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id) - subprocess.check_call(command, shell=True) + if getThreadLock('{0}.{1}'.format(action, device_id)): + subprocess.check_call(command, shell=True) + releaseThreadLock('{0}.{1}'.format(action, device_id)) elapsed_time = int(time.time() - start_time) if elapsed_time < 300: log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) @@ -170,72 +164,48 @@ def poll_worker(device_id, action): raise except: pass - finally: - releaseThreadLock(device_id, action) - -def lockFree(lock, db=db): +def lockFree(lock, cursor=cursor): query = "SELECT IS_FREE_LOCK('{0}')".format(lock) - cursor = db.query(query) + cursor.execute(query) return cursor.fetchall()[0][0] == 1 -def getLock(lock, db=db): +def getLock(lock, cursor=cursor): query = "SELECT GET_LOCK('{0}', 0)".format(lock) - cursor = db.query(query) + cursor.execute(query) return cursor.fetchall()[0][0] == 1 thread_cursors = [] for i in range(0, amount_of_workers): - thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'db'])) + thread_cursors.append(namedtuple('Cursor{}'.format(i), ['in_use', 'cursor'])) thread_cursors[i].in_use = False - thread_cursors[i].db = DB() + thread_cursors[i].cursor = connectDB -def getThreadQueueLock(device_id): +def getThreadLock(lock): global thread_cursors - # This is how threads are limited, by the numver of cursors available - while True: - for thread_cursor in thread_cursors: - if not thread_cursor.in_use: - thread_cursor.in_use = 'queue.{0}'.format(device_id) - if getLock('queue.{0}'.format(device_id), thread_cursor.db): - return True - else: - thread_cursor.in_use = False - return False - time.sleep(.5) - - -def getThreadActionLock(device_id, action): - global thread_cursors - # This is how threads are limited, by the numver of cursors available for thread_cursor in thread_cursors: - if thread_cursor.in_use == 'queue.{0}'.format(device_id): - thread_cursor.in_use = '{0}.{1}'.format(action, device_id) - releaseLock('queue.{0}'.format(device_id), thread_cursor.db) - if getLock('{0}.{1}'.format(action, device_id), thread_cursor.db): - return True - else: - thread_cursor.in_use = False - return False + if not thread_cursor.in_use: + thread_cursor.in_use = lock + return getLock(lock, thread_cursor.cursor) return False -def releaseLock(lock, db=db): +def releaseLock(lock, cursor=cursor): query = "SELECT RELEASE_LOCK('{0}')".format(lock) - cursor = db.query(query) + cursor.execute(query) return cursor.fetchall()[0][0] == 1 -def releaseThreadLock(device_id, action): +def releaseThreadLock(lock): global thread_cursors for thread_cursor in thread_cursors: - if thread_cursor.in_use == '{0}.{1}'.format(action, device_id): + if thread_cursor.in_use == lock: thread_cursor.in_use = False - return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.db) + return releaseLock(lock, thread_cursor.cursor) return False @@ -260,7 +230,7 @@ dev_query = ('SELECT device_id, status, ' INTERVAL last_polled_timetaken SECOND ' ' ), ' ' INTERVAL {0} SECOND) ' - ' AS DATETIME ' + ' AS DATETIME(0) ' ') AS next_poll, ' 'CAST( ' ' DATE_ADD( ' @@ -269,13 +239,12 @@ dev_query = ('SELECT device_id, status, ' INTERVAL last_discovered_timetaken SECOND ' ' ), ' ' INTERVAL {1} SECOND) ' - ' AS DATETIME ' + ' AS DATETIME(0) ' ') as next_discovery ' 'FROM devices WHERE ' 'disabled = 0 ' - 'AND IS_FREE_LOCK(CONCAT("poll.", device_id)) ' - 'AND IS_FREE_LOCK(CONCAT("discovery.", device_id)) ' - 'AND IS_FREE_LOCK(CONCAT("queue.", device_id)) ' + 'AND IS_FREE_LOCK(CONCAT("polling.", device_id)) ' + 'AND IS_FREE_LOCK(CONCAT("queued.", device_id)) ' 'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) ' ' OR last_poll_attempted IS NULL ) ' '{3} ' @@ -293,7 +262,7 @@ while True: cur_threads = threading.active_count() if cur_threads != threads: threads = cur_threads - log.debug('DEBUG: {0} threads currently active'.format(str(threads - 1))) + log.debug('DEBUG: {0} threads currently active'.format(threads)) if next_update < datetime.now(): seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds @@ -309,7 +278,7 @@ while True: devices_scanned, seconds_taken) try: - cursor = db.query(update_query) + cursor.execute(update_query) except: log.critical('ERROR: MySQL query error. Is your schema up to date?') sys.exit(2) @@ -318,15 +287,23 @@ while True: devices_scanned = 0 next_update = datetime.now() + timedelta(minutes=1) + while threading.active_count() >= amount_of_workers or not lockFree('schema_update'): + time.sleep(.5) + try: - cursor = db.query(dev_query) + cursor.execute(dev_query) except: log.critical('ERROR: MySQL query error. Is your schema up to date?') sys.exit(2) devices = cursor.fetchall() for device_id, status, next_poll, next_discovery in devices: - if not getThreadQueueLock(device_id): + # add queue lock, so we lock the next device against any other pollers + # if this fails, the device is locked by another poller already + if not getLock('queued.{0}'.format(device_id)): + continue + if not lockFree('polling.{0}'.format(device_id)): + releaseLock('queued.{0}'.format(device_id)) continue if next_poll and next_poll > datetime.now(): @@ -340,12 +317,9 @@ while True: log.debug('DEBUG: Starting {0} of device {1}'.format(action, device_id)) devices_scanned += 1 - cursor = db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) + cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) cursor.fetchall() - if not getThreadActionLock(device_id, action): - continue - t = threading.Thread(target=poll_worker, args=[device_id, action]) t.start() From a63a748a21ac003cfc5cfc21ba8a41733cc7d41b Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 12:46:01 -0400 Subject: [PATCH 16/56] return db object to keep it from going out of scope --- poller-service.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/poller-service.py b/poller-service.py index a310541e06..60f7370689 100755 --- a/poller-service.py +++ b/poller-service.py @@ -138,12 +138,17 @@ def connectDB(): db_inst = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) db_inst.autocommit(True) cursor_inst = db_inst.cursor() - return cursor_inst + ret = namedtuple('db_connection', ['db', 'cursor']) + ret.db = db_inst + ret.cursor = cursor_inst + return ret except: log.critical("ERROR: Could not connect to MySQL database!") sys.exit(2) -cursor = connectDB() +db_connection = connectDB() +db = db_connection.db +cursor = db_connection.cursor def poll_worker(device_id, action): try: @@ -180,9 +185,11 @@ def getLock(lock, cursor=cursor): thread_cursors = [] for i in range(0, amount_of_workers): - thread_cursors.append(namedtuple('Cursor{}'.format(i), ['in_use', 'cursor'])) + thread_cursors.append(namedtuple('Cursor{}'.format(i), ['in_use', 'cursor', 'db'])) thread_cursors[i].in_use = False - thread_cursors[i].cursor = connectDB + thread_db_connection = connectDB() + thread_cursors[i].cursor = thread_db_connection.cursor + thread_cursors[i].db = thread_db_connection.db def getThreadLock(lock): From d60dc000a3052bc716d6505ed6768231533b7cb9 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 12:50:24 -0400 Subject: [PATCH 17/56] queue locks are no longer necessary --- poller-service.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/poller-service.py b/poller-service.py index 60f7370689..6c5fd2c88c 100755 --- a/poller-service.py +++ b/poller-service.py @@ -251,7 +251,6 @@ dev_query = ('SELECT device_id, status, 'FROM devices WHERE ' 'disabled = 0 ' 'AND IS_FREE_LOCK(CONCAT("polling.", device_id)) ' - 'AND IS_FREE_LOCK(CONCAT("queued.", device_id)) ' 'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) ' ' OR last_poll_attempted IS NULL ) ' '{3} ' @@ -305,14 +304,6 @@ while True: devices = cursor.fetchall() for device_id, status, next_poll, next_discovery in devices: - # add queue lock, so we lock the next device against any other pollers - # if this fails, the device is locked by another poller already - if not getLock('queued.{0}'.format(device_id)): - continue - if not lockFree('polling.{0}'.format(device_id)): - releaseLock('queued.{0}'.format(device_id)) - continue - if next_poll and next_poll > datetime.now(): log.debug('DEBUG: Sleeping until {0} before polling {1}'.format(next_poll, device_id)) sleep_until(next_poll) From d377b70e192428ba53114ca84817b3a815644b9e Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 12:56:59 -0400 Subject: [PATCH 18/56] make sure lock name matches action --- poller-service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index 6c5fd2c88c..c5fed7077a 100755 --- a/poller-service.py +++ b/poller-service.py @@ -250,7 +250,8 @@ dev_query = ('SELECT device_id, status, ') as next_discovery ' 'FROM devices WHERE ' 'disabled = 0 ' - 'AND IS_FREE_LOCK(CONCAT("polling.", device_id)) ' + 'AND IS_FREE_LOCK(CONCAT("poll.", device_id)) ' + 'AND IS_FREE_LOCK(CONCAT("discovery.", device_id)) ' 'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) ' ' OR last_poll_attempted IS NULL ) ' '{3} ' From eddb04aaef0064757f490ce31f4b6590cb75d9b7 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 13:01:38 -0400 Subject: [PATCH 19/56] python 2.6 compat --- poller-service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index c5fed7077a..9f75d6a949 100755 --- a/poller-service.py +++ b/poller-service.py @@ -185,7 +185,7 @@ def getLock(lock, cursor=cursor): thread_cursors = [] for i in range(0, amount_of_workers): - thread_cursors.append(namedtuple('Cursor{}'.format(i), ['in_use', 'cursor', 'db'])) + thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'cursor', 'db'])) thread_cursors[i].in_use = False thread_db_connection = connectDB() thread_cursors[i].cursor = thread_db_connection.cursor From ca1a6883a82c81a0ff896491d526e20351f5add4 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 13:30:33 -0400 Subject: [PATCH 20/56] log when can't get lock --- poller-service.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/poller-service.py b/poller-service.py index 9f75d6a949..38eb8c397b 100755 --- a/poller-service.py +++ b/poller-service.py @@ -160,6 +160,8 @@ def poll_worker(device_id, action): if getThreadLock('{0}.{1}'.format(action, device_id)): subprocess.check_call(command, shell=True) releaseThreadLock('{0}.{1}'.format(action, device_id)) + else: + log.debug("DEBUG: Couldn't get lock on {0}.{1}".format(action, device_id)) elapsed_time = int(time.time() - start_time) if elapsed_time < 300: log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) From 3b6e0e68e94efd493639f6e33af5a613040a55c4 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 14:22:42 -0400 Subject: [PATCH 21/56] move thread control to lock threads --- poller-service.py | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/poller-service.py b/poller-service.py index 38eb8c397b..9ebbb46b93 100755 --- a/poller-service.py +++ b/poller-service.py @@ -157,11 +157,7 @@ def poll_worker(device_id, action): if action == 'discovery': path = discover_path command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id) - if getThreadLock('{0}.{1}'.format(action, device_id)): - subprocess.check_call(command, shell=True) - releaseThreadLock('{0}.{1}'.format(action, device_id)) - else: - log.debug("DEBUG: Couldn't get lock on {0}.{1}".format(action, device_id)) + subprocess.check_call(command, shell=True) elapsed_time = int(time.time() - start_time) if elapsed_time < 300: log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) @@ -171,6 +167,9 @@ def poll_worker(device_id, action): raise except: pass + finally: + releaseThreadLock(device_id, action) + def lockFree(lock, cursor=cursor): @@ -194,12 +193,24 @@ for i in range(0, amount_of_workers): thread_cursors[i].db = thread_db_connection.db -def getThreadLock(lock): +def getThreadQueueLock(device_id): global thread_cursors + # This is how threads are limited, by the numver of cursors available + while True: + for thread_cursor in thread_cursors: + if not thread_cursor.in_use: + thread_cursor.in_use = 'queue.{0}'.format(device_id) + return getLock('queue.{0}'.format(device_id), thread_cursor.cursor) + time.sleep(.5) + + +def getThreadActionLock(device_id, action): + global thread_cursors + # This is how threads are limited, by the numver of cursors available for thread_cursor in thread_cursors: - if not thread_cursor.in_use: - thread_cursor.in_use = lock - return getLock(lock, thread_cursor.cursor) + if thread_cursor.in_use == 'queue.{0}'.format(device_id): + thread_cursor.in_use = '{0}.{1}'.format(action, device_id) + return getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor) return False @@ -209,12 +220,12 @@ def releaseLock(lock, cursor=cursor): return cursor.fetchall()[0][0] == 1 -def releaseThreadLock(lock): +def releaseThreadLock(device_id, action): global thread_cursors for thread_cursor in thread_cursors: - if thread_cursor.in_use == lock: + if thread_cursor.in_use == '{0}.{1}'.format(action, device_id): thread_cursor.in_use = False - return releaseLock(lock, thread_cursor.cursor) + return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor) return False @@ -254,6 +265,7 @@ dev_query = ('SELECT device_id, status, 'disabled = 0 ' 'AND IS_FREE_LOCK(CONCAT("poll.", device_id)) ' 'AND IS_FREE_LOCK(CONCAT("discovery.", device_id)) ' + 'AND IS_FREE_LOCK(CONCAT("queue.", device_id)) ' 'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) ' ' OR last_poll_attempted IS NULL ) ' '{3} ' @@ -296,9 +308,6 @@ while True: devices_scanned = 0 next_update = datetime.now() + timedelta(minutes=1) - while threading.active_count() >= amount_of_workers or not lockFree('schema_update'): - time.sleep(.5) - try: cursor.execute(dev_query) except: @@ -307,6 +316,9 @@ while True: devices = cursor.fetchall() for device_id, status, next_poll, next_discovery in devices: + if not getThreadQueueLock(device_id): + continue + if next_poll and next_poll > datetime.now(): log.debug('DEBUG: Sleeping until {0} before polling {1}'.format(next_poll, device_id)) sleep_until(next_poll) @@ -321,6 +333,9 @@ while True: cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) cursor.fetchall() + if not getThreadActionLock(device_id, action): + continue + t = threading.Thread(target=poll_worker, args=[device_id, action]) t.start() From 840d1e86b966527097c2e4722c323e1eb85eeac4 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 14:42:57 -0400 Subject: [PATCH 22/56] make sure to clear the thread if unsuccessful --- poller-service.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/poller-service.py b/poller-service.py index 9ebbb46b93..ccab47f489 100755 --- a/poller-service.py +++ b/poller-service.py @@ -200,7 +200,10 @@ def getThreadQueueLock(device_id): for thread_cursor in thread_cursors: if not thread_cursor.in_use: thread_cursor.in_use = 'queue.{0}'.format(device_id) - return getLock('queue.{0}'.format(device_id), thread_cursor.cursor) + if getLock('queue.{0}'.format(device_id), thread_cursor.cursor): + return True + else: + thread_cursor.in_use = False time.sleep(.5) @@ -210,7 +213,10 @@ def getThreadActionLock(device_id, action): for thread_cursor in thread_cursors: if thread_cursor.in_use == 'queue.{0}'.format(device_id): thread_cursor.in_use = '{0}.{1}'.format(action, device_id) - return getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor) + if getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor): + return True + else: + thread_cursor.in_use = False return False From 5b647d8e70c8a3ca4c52cba4f330da8271a54b8c Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 15:01:25 -0400 Subject: [PATCH 23/56] release queue if getting action lock fails --- poller-service.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/poller-service.py b/poller-service.py index ccab47f489..9c17e0fe51 100755 --- a/poller-service.py +++ b/poller-service.py @@ -204,6 +204,8 @@ def getThreadQueueLock(device_id): return True else: thread_cursor.in_use = False + return False + log.debug("DEBUG: No threads avaliable") time.sleep(.5) @@ -217,6 +219,8 @@ def getThreadActionLock(device_id, action): return True else: thread_cursor.in_use = False + releaseLock('queue.{0}'.format(device_id), thread_cursor.cursor) + return False return False From b0e7c9346b02f033f0733574f302de1cd2b1f7ba Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 15:23:47 -0400 Subject: [PATCH 24/56] always release queue lock --- poller-service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index 9c17e0fe51..6d4296207c 100755 --- a/poller-service.py +++ b/poller-service.py @@ -215,11 +215,11 @@ def getThreadActionLock(device_id, action): for thread_cursor in thread_cursors: if thread_cursor.in_use == 'queue.{0}'.format(device_id): thread_cursor.in_use = '{0}.{1}'.format(action, device_id) + releaseLock('queue.{0}'.format(device_id), thread_cursor.cursor) if getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor): return True else: thread_cursor.in_use = False - releaseLock('queue.{0}'.format(device_id), thread_cursor.cursor) return False return False From 0a00bb617866f3901b3926e066d2fe6f98dff32a Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 15:38:45 -0400 Subject: [PATCH 25/56] remove spammy debug log --- poller-service.py | 1 - 1 file changed, 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index 6d4296207c..a9b292e4e4 100755 --- a/poller-service.py +++ b/poller-service.py @@ -205,7 +205,6 @@ def getThreadQueueLock(device_id): else: thread_cursor.in_use = False return False - log.debug("DEBUG: No threads avaliable") time.sleep(.5) From 32adff03f147546a6eea11804c63eaa3da296dca Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 2 Sep 2015 15:39:27 -0400 Subject: [PATCH 26/56] only log worker threads, not main python thread --- poller-service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index a9b292e4e4..e393f9ac71 100755 --- a/poller-service.py +++ b/poller-service.py @@ -292,7 +292,7 @@ while True: cur_threads = threading.active_count() if cur_threads != threads: threads = cur_threads - log.debug('DEBUG: {0} threads currently active'.format(threads)) + log.debug('DEBUG: {0} threads currently active'.format(str(threads - 1))) if next_update < datetime.now(): seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds From ac63a8937aa8f1386d4162f23c597e0b3c2a5ce7 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Tue, 8 Sep 2015 10:30:08 -0400 Subject: [PATCH 27/56] MySQL doesn't support specifying microsecond precision, and MariaDB defaults to zero, so it can be removed to be compatible with both. --- poller-service.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/poller-service.py b/poller-service.py index e393f9ac71..3f72ca4b94 100755 --- a/poller-service.py +++ b/poller-service.py @@ -259,7 +259,7 @@ dev_query = ('SELECT device_id, status, ' INTERVAL last_polled_timetaken SECOND ' ' ), ' ' INTERVAL {0} SECOND) ' - ' AS DATETIME(0) ' + ' AS DATETIME ' ') AS next_poll, ' 'CAST( ' ' DATE_ADD( ' @@ -268,7 +268,7 @@ dev_query = ('SELECT device_id, status, ' INTERVAL last_discovered_timetaken SECOND ' ' ), ' ' INTERVAL {1} SECOND) ' - ' AS DATETIME(0) ' + ' AS DATETIME ' ') as next_discovery ' 'FROM devices WHERE ' 'disabled = 0 ' From b08300e42510d038bce038d427c09cfed6dc98e4 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Tue, 8 Sep 2015 13:51:10 -0400 Subject: [PATCH 28/56] make more resiliant to MySQL disconnects --- poller-service.py | 73 ++++++++++++++++++++++++----------------------- 1 file changed, 37 insertions(+), 36 deletions(-) diff --git a/poller-service.py b/poller-service.py index 3f72ca4b94..a11064e3fb 100755 --- a/poller-service.py +++ b/poller-service.py @@ -49,6 +49,27 @@ config_file = install_dir + '/config.php' log.info('INFO: Starting poller-service') +class DB: + conn = None + + def connect(self): + if db_port == 0: + self.conn = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) + else: + self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) + self.conn.autocommit(True) + + def query(self, sql): + try: + cursor = self.conn.cursor() + cursor.execute(sql) + except (AttributeError, MySQLdb.OperationalError): + self.connect() + cursor = self.conn.cursor() + cursor.execute(sql) + return cursor + + def get_config_data(): config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir] try: @@ -130,25 +151,7 @@ try: except KeyError: down_retry = 60 -def connectDB(): - try: - if db_port == 0: - db_inst = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) - else: - db_inst = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) - db_inst.autocommit(True) - cursor_inst = db_inst.cursor() - ret = namedtuple('db_connection', ['db', 'cursor']) - ret.db = db_inst - ret.cursor = cursor_inst - return ret - except: - log.critical("ERROR: Could not connect to MySQL database!") - sys.exit(2) - -db_connection = connectDB() -db = db_connection.db -cursor = db_connection.cursor +db = DB() def poll_worker(device_id, action): try: @@ -172,25 +175,23 @@ def poll_worker(device_id, action): -def lockFree(lock, cursor=cursor): +def lockFree(lock, db=db): query = "SELECT IS_FREE_LOCK('{0}')".format(lock) - cursor.execute(query) + cursor = db.query(query) return cursor.fetchall()[0][0] == 1 -def getLock(lock, cursor=cursor): +def getLock(lock, db=db): query = "SELECT GET_LOCK('{0}', 0)".format(lock) - cursor.execute(query) + cursor = db.query(query) return cursor.fetchall()[0][0] == 1 thread_cursors = [] for i in range(0, amount_of_workers): - thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'cursor', 'db'])) + thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'db'])) thread_cursors[i].in_use = False - thread_db_connection = connectDB() - thread_cursors[i].cursor = thread_db_connection.cursor - thread_cursors[i].db = thread_db_connection.db + thread_cursors[i].db = DB() def getThreadQueueLock(device_id): @@ -200,7 +201,7 @@ def getThreadQueueLock(device_id): for thread_cursor in thread_cursors: if not thread_cursor.in_use: thread_cursor.in_use = 'queue.{0}'.format(device_id) - if getLock('queue.{0}'.format(device_id), thread_cursor.cursor): + if getLock('queue.{0}'.format(device_id), thread_cursor.db): return True else: thread_cursor.in_use = False @@ -214,8 +215,8 @@ def getThreadActionLock(device_id, action): for thread_cursor in thread_cursors: if thread_cursor.in_use == 'queue.{0}'.format(device_id): thread_cursor.in_use = '{0}.{1}'.format(action, device_id) - releaseLock('queue.{0}'.format(device_id), thread_cursor.cursor) - if getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor): + releaseLock('queue.{0}'.format(device_id), thread_cursor.db) + if getLock('{0}.{1}'.format(action, device_id), thread_cursor.db): return True else: thread_cursor.in_use = False @@ -223,9 +224,9 @@ def getThreadActionLock(device_id, action): return False -def releaseLock(lock, cursor=cursor): +def releaseLock(lock, db=db): query = "SELECT RELEASE_LOCK('{0}')".format(lock) - cursor.execute(query) + cursor = db.query(query) return cursor.fetchall()[0][0] == 1 @@ -234,7 +235,7 @@ def releaseThreadLock(device_id, action): for thread_cursor in thread_cursors: if thread_cursor.in_use == '{0}.{1}'.format(action, device_id): thread_cursor.in_use = False - return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor) + return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.db) return False @@ -308,7 +309,7 @@ while True: devices_scanned, seconds_taken) try: - cursor.execute(update_query) + cursor = db.query(update_query) except: log.critical('ERROR: MySQL query error. Is your schema up to date?') sys.exit(2) @@ -318,7 +319,7 @@ while True: next_update = datetime.now() + timedelta(minutes=1) try: - cursor.execute(dev_query) + cursor = db.query(dev_query) except: log.critical('ERROR: MySQL query error. Is your schema up to date?') sys.exit(2) @@ -339,7 +340,7 @@ while True: log.debug('DEBUG: Starting {0} of device {1}'.format(action, device_id)) devices_scanned += 1 - cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) + cursor = db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) cursor.fetchall() if not getThreadActionLock(device_id, action): From 9610dac1301f7611e1f10d6f6bc3eb3a252fc9b8 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 08:25:00 -0400 Subject: [PATCH 29/56] mysql ping with reconnect --- poller-service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/poller-service.py b/poller-service.py index a11064e3fb..61dcc84cf1 100755 --- a/poller-service.py +++ b/poller-service.py @@ -58,6 +58,7 @@ class DB: else: self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) self.conn.autocommit(True) + self.conn.ping(True) def query(self, sql): try: From 55768cf4c90d2139e563765639f780ba3feb66b5 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 08:37:00 -0400 Subject: [PATCH 30/56] catch error during connectino and retry --- poller-service.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/poller-service.py b/poller-service.py index 61dcc84cf1..289c298676 100755 --- a/poller-service.py +++ b/poller-service.py @@ -53,10 +53,18 @@ class DB: conn = None def connect(self): - if db_port == 0: - self.conn = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) - else: - self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) + while True: + try: + if db_port == 0: + self.conn = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) + else: + self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) + break + except (AttributeError, MySQLdb.OperationalError): + log.warning('WARNING: MySQL Error, reconnecting.') + time.sleep(.5) + pass + self.conn.autocommit(True) self.conn.ping(True) From 201f516a76283884b9b09bc13e18aec0f1815650 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 08:42:37 -0400 Subject: [PATCH 31/56] better log messages --- poller-service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index 289c298676..41d17efff3 100755 --- a/poller-service.py +++ b/poller-service.py @@ -61,7 +61,7 @@ class DB: self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) break except (AttributeError, MySQLdb.OperationalError): - log.warning('WARNING: MySQL Error, reconnecting.') + log.warning('WARNING: MySQL Error during connect, reconnecting.') time.sleep(.5) pass @@ -73,6 +73,7 @@ class DB: cursor = self.conn.cursor() cursor.execute(sql) except (AttributeError, MySQLdb.OperationalError): + log.warning('WARNING: MySQL Error during query, reconnecting.') self.connect() cursor = self.conn.cursor() cursor.execute(sql) From 004111574af426c45810929acefbb3f502908084 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 09:20:03 -0400 Subject: [PATCH 32/56] close cursor after query --- poller-service.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/poller-service.py b/poller-service.py index 41d17efff3..b72aaa9531 100755 --- a/poller-service.py +++ b/poller-service.py @@ -61,23 +61,25 @@ class DB: self.conn = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname) break except (AttributeError, MySQLdb.OperationalError): - log.warning('WARNING: MySQL Error during connect, reconnecting.') + log.warning('WARNING: MySQL Error, reconnecting.') time.sleep(.5) - pass self.conn.autocommit(True) self.conn.ping(True) def query(self, sql): - try: - cursor = self.conn.cursor() - cursor.execute(sql) - except (AttributeError, MySQLdb.OperationalError): - log.warning('WARNING: MySQL Error during query, reconnecting.') - self.connect() - cursor = self.conn.cursor() - cursor.execute(sql) - return cursor + while True: + try: + cursor = self.conn.cursor() + cursor.execute(sql) + ret = cursor.fetchall() + cursor.close() + return ret + except (AttributeError, MySQLdb.OperationalError): + log.warning('WARNING: MySQL Operational Error during query, reconnecting.') + self.connect() + except (AttributeError, MySQLdb.ProgrammingError): + log.warning('WARNING: MySQL Programming Error during query, attempting query again.') def get_config_data(): From 7f69855f418197bcfd3ade95a83a7050fdbca555 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 09:22:31 -0400 Subject: [PATCH 33/56] all cursor handling in db class --- poller-service.py | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/poller-service.py b/poller-service.py index b72aaa9531..c78e12e530 100755 --- a/poller-service.py +++ b/poller-service.py @@ -189,14 +189,12 @@ def poll_worker(device_id, action): def lockFree(lock, db=db): query = "SELECT IS_FREE_LOCK('{0}')".format(lock) - cursor = db.query(query) - return cursor.fetchall()[0][0] == 1 + return db.query(query)[0][0] == 1 def getLock(lock, db=db): query = "SELECT GET_LOCK('{0}', 0)".format(lock) - cursor = db.query(query) - return cursor.fetchall()[0][0] == 1 + return db.query(query)[0][0] == 1 thread_cursors = [] @@ -239,7 +237,7 @@ def getThreadActionLock(device_id, action): def releaseLock(lock, db=db): query = "SELECT RELEASE_LOCK('{0}')".format(lock) cursor = db.query(query) - return cursor.fetchall()[0][0] == 1 + return db.query(query)[0][0] == 1 def releaseThreadLock(device_id, action): @@ -321,22 +319,20 @@ while True: devices_scanned, seconds_taken) try: - cursor = db.query(update_query) + db.query(update_query) except: log.critical('ERROR: MySQL query error. Is your schema up to date?') sys.exit(2) - cursor.fetchall() log.info('INFO: {0} devices scanned in the last minute'.format(devices_scanned)) devices_scanned = 0 next_update = datetime.now() + timedelta(minutes=1) try: - cursor = db.query(dev_query) + devices = db.query(dev_query) except: log.critical('ERROR: MySQL query error. Is your schema up to date?') sys.exit(2) - devices = cursor.fetchall() for device_id, status, next_poll, next_discovery in devices: if not getThreadQueueLock(device_id): continue @@ -352,8 +348,7 @@ while True: log.debug('DEBUG: Starting {0} of device {1}'.format(action, device_id)) devices_scanned += 1 - cursor = db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) - cursor.fetchall() + db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) if not getThreadActionLock(device_id, action): continue From b2f7eba5c72de978be839b0cabf9fc6ca13b09b6 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 09:26:00 -0400 Subject: [PATCH 34/56] always close cursor --- poller-service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/poller-service.py b/poller-service.py index c78e12e530..fc42af4a11 100755 --- a/poller-service.py +++ b/poller-service.py @@ -80,6 +80,7 @@ class DB: self.connect() except (AttributeError, MySQLdb.ProgrammingError): log.warning('WARNING: MySQL Programming Error during query, attempting query again.') + cursor.close() def get_config_data(): From 969a22ffa91e81035a23fa049a0c3ea587b46075 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 14:47:32 -0400 Subject: [PATCH 35/56] move everything into the thread --- poller-service.py | 169 +++++++++++++++------------------------------- 1 file changed, 56 insertions(+), 113 deletions(-) diff --git a/poller-service.py b/poller-service.py index fc42af4a11..342aaa6ad4 100755 --- a/poller-service.py +++ b/poller-service.py @@ -166,27 +166,6 @@ except KeyError: db = DB() -def poll_worker(device_id, action): - try: - start_time = time.time() - path = poller_path - if action == 'discovery': - path = discover_path - command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id) - subprocess.check_call(command, shell=True) - elapsed_time = int(time.time() - start_time) - if elapsed_time < 300: - log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) - else: - log.warning("WARNING: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) - except (KeyboardInterrupt, SystemExit): - raise - except: - pass - finally: - releaseThreadLock(device_id, action) - - def lockFree(lock, db=db): query = "SELECT IS_FREE_LOCK('{0}')".format(lock) @@ -198,58 +177,12 @@ def getLock(lock, db=db): return db.query(query)[0][0] == 1 -thread_cursors = [] -for i in range(0, amount_of_workers): - thread_cursors.append(namedtuple('Cursor{0}'.format(i), ['in_use', 'db'])) - thread_cursors[i].in_use = False - thread_cursors[i].db = DB() - - -def getThreadQueueLock(device_id): - global thread_cursors - # This is how threads are limited, by the numver of cursors available - while True: - for thread_cursor in thread_cursors: - if not thread_cursor.in_use: - thread_cursor.in_use = 'queue.{0}'.format(device_id) - if getLock('queue.{0}'.format(device_id), thread_cursor.db): - return True - else: - thread_cursor.in_use = False - return False - time.sleep(.5) - - -def getThreadActionLock(device_id, action): - global thread_cursors - # This is how threads are limited, by the numver of cursors available - for thread_cursor in thread_cursors: - if thread_cursor.in_use == 'queue.{0}'.format(device_id): - thread_cursor.in_use = '{0}.{1}'.format(action, device_id) - releaseLock('queue.{0}'.format(device_id), thread_cursor.db) - if getLock('{0}.{1}'.format(action, device_id), thread_cursor.db): - return True - else: - thread_cursor.in_use = False - return False - return False - - def releaseLock(lock, db=db): query = "SELECT RELEASE_LOCK('{0}')".format(lock) cursor = db.query(query) return db.query(query)[0][0] == 1 -def releaseThreadLock(device_id, action): - global thread_cursors - for thread_cursor in thread_cursors: - if thread_cursor.in_use == '{0}.{1}'.format(action, device_id): - thread_cursor.in_use = False - return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.db) - return False - - def sleep_until(timestamp): now = datetime.now() if timestamp > now: @@ -291,21 +224,70 @@ dev_query = ('SELECT device_id, status, ' OR last_poll_attempted IS NULL ) ' '{3} ' 'ORDER BY next_poll asc ' - 'LIMIT 5 ').format(poll_frequency, + 'LIMIT 1 ').format(poll_frequency, discover_frequency, down_retry, poller_group) -threads = 0 next_update = datetime.now() + timedelta(minutes=1) devices_scanned = 0 -while True: - cur_threads = threading.active_count() - if cur_threads != threads: - threads = cur_threads - log.debug('DEBUG: {0} threads currently active'.format(str(threads - 1))) +def poll_worker(): + global dev_query + global devices_scanned + db = DB() + while True: + device_id, status, next_poll, next_discovery = db.query(dev_query)[0] + + if not getLock('queue.{0}'.format(device_id), db): + continue + + if next_poll and next_poll > datetime.now(): + log.debug('DEBUG: Sleeping until {0} before polling {1}'.format(next_poll, device_id)) + sleep_until(next_poll) + + action = 'poll' + if (not next_discovery or next_discovery < datetime.now()) and status == 1: + action = 'discovery' + + log.debug('DEBUG: Starting {0} of device {1}'.format(action, device_id)) + devices_scanned += 1 + + db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) + + if not getLock('{0}.{1}'.format(action, device_id), db): + releaseLock('{0}.{1}'.format(action, device_id), db) + continue + + releaseLock('{0}.{1}'.format(action, device_id), db) + try: + start_time = time.time() + path = poller_path + if action == 'discovery': + path = discover_path + command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id) + subprocess.check_call(command, shell=True) + elapsed_time = int(time.time() - start_time) + if elapsed_time < 300: + log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) + else: + log.warning("WARNING: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) + except (KeyboardInterrupt, SystemExit): + raise + except: + pass + finally: + releaseLock('{0}.{1}'.format(action, device_id), db) + + +t = [] +for i in range(0, amount_of_workers): + t.append(threading.Thread(target=poll_worker)) + t[i].start() + + +while True: if next_update < datetime.now(): seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds update_query = ('INSERT INTO pollers(poller_name, ' @@ -327,42 +309,3 @@ while True: log.info('INFO: {0} devices scanned in the last minute'.format(devices_scanned)) devices_scanned = 0 next_update = datetime.now() + timedelta(minutes=1) - - try: - devices = db.query(dev_query) - except: - log.critical('ERROR: MySQL query error. Is your schema up to date?') - sys.exit(2) - - for device_id, status, next_poll, next_discovery in devices: - if not getThreadQueueLock(device_id): - continue - - if next_poll and next_poll > datetime.now(): - log.debug('DEBUG: Sleeping until {0} before polling {1}'.format(next_poll, device_id)) - sleep_until(next_poll) - - action = 'poll' - if (not next_discovery or next_discovery < datetime.now()) and status == 1: - action = 'discovery' - - log.debug('DEBUG: Starting {0} of device {1}'.format(action, device_id)) - devices_scanned += 1 - - db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) - - if not getThreadActionLock(device_id, action): - continue - - t = threading.Thread(target=poll_worker, args=[device_id, action]) - t.start() - - # If we made it this far, break out of the loop and query again. - break - - # This point is only reached if the query is empty, so sleep half a second before querying again. - time.sleep(.5) - - # Make sure we're not holding any device queue locks in this connection before querying again - # by locking a different string. - getLock('unlock.{0}'.format(config['distributed_poller_name'])) From c914ba75a1829de4abccd8efc137ce25da04ee50 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 14:58:39 -0400 Subject: [PATCH 36/56] identify thread numbers --- poller-service.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/poller-service.py b/poller-service.py index 342aaa6ad4..8cfa1ee406 100755 --- a/poller-service.py +++ b/poller-service.py @@ -233,7 +233,7 @@ next_update = datetime.now() + timedelta(minutes=1) devices_scanned = 0 -def poll_worker(): +def poll_worker(thread_id): global dev_query global devices_scanned db = DB() @@ -244,14 +244,14 @@ def poll_worker(): continue if next_poll and next_poll > datetime.now(): - log.debug('DEBUG: Sleeping until {0} before polling {1}'.format(next_poll, device_id)) + log.debug('DEBUG: Thread {0} Sleeping until {1} before polling {2}'.format(thread_id, next_poll, device_id)) sleep_until(next_poll) action = 'poll' if (not next_discovery or next_discovery < datetime.now()) and status == 1: action = 'discovery' - log.debug('DEBUG: Starting {0} of device {1}'.format(action, device_id)) + log.debug('DEBUG: Thread {0} Starting {1} of device {2}'.format(thread_id, action, device_id)) devices_scanned += 1 db.query('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) @@ -270,9 +270,9 @@ def poll_worker(): subprocess.check_call(command, shell=True) elapsed_time = int(time.time() - start_time) if elapsed_time < 300: - log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) + log.debug("DEBUG: Thread {0} finished {1} of device {2} in {3} seconds".format(thread_id, action, device_id, elapsed_time)) else: - log.warning("WARNING: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) + log.warning("WARNING: Thread {0} finished {1} of device {2} in {3} seconds".format(thread_id, action, device_id, elapsed_time)) except (KeyboardInterrupt, SystemExit): raise except: @@ -283,7 +283,7 @@ def poll_worker(): t = [] for i in range(0, amount_of_workers): - t.append(threading.Thread(target=poll_worker)) + t.append(threading.Thread(target=poll_worker, args=[i])) t[i].start() From 8de136f50c5eba0fcd1db903192770bcc7c6ce4f Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 15:01:27 -0400 Subject: [PATCH 37/56] warn on empty queries --- poller-service.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index 8cfa1ee406..fbf681c93c 100755 --- a/poller-service.py +++ b/poller-service.py @@ -238,7 +238,12 @@ def poll_worker(thread_id): global devices_scanned db = DB() while True: - device_id, status, next_poll, next_discovery = db.query(dev_query)[0] + dev_row = db.query(dev_query) + if len(dev_row) < 1: + log.warning("WARNING: Thread {0} returned no devices from query".format(thread_id)) + continue + + device_id, status, next_poll, next_discovery = dev_row[0] if not getLock('queue.{0}'.format(device_id), db): continue From d76e9e78891e73e77e21f8e477fd88db85695329 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 15:05:39 -0400 Subject: [PATCH 38/56] release queue locks --- poller-service.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/poller-service.py b/poller-service.py index fbf681c93c..b424041302 100755 --- a/poller-service.py +++ b/poller-service.py @@ -240,12 +240,13 @@ def poll_worker(thread_id): while True: dev_row = db.query(dev_query) if len(dev_row) < 1: - log.warning("WARNING: Thread {0} returned no devices from query".format(thread_id)) + #log.warning("WARNING: Thread {0} returned no devices from query".format(thread_id)) continue device_id, status, next_poll, next_discovery = dev_row[0] if not getLock('queue.{0}'.format(device_id), db): + releaseLock('queue.{0}'.format(device_id), db): continue if next_poll and next_poll > datetime.now(): @@ -263,9 +264,10 @@ def poll_worker(thread_id): if not getLock('{0}.{1}'.format(action, device_id), db): releaseLock('{0}.{1}'.format(action, device_id), db) + releaseLock('queue.{0}'.format(device_id), db): continue - releaseLock('{0}.{1}'.format(action, device_id), db) + releaseLock('queue.{0}'.format(device_id), db): try: start_time = time.time() path = poller_path From ae8d29c4b6e11d1162a6a801a62669cd9ce5fcdb Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 15:07:55 -0400 Subject: [PATCH 39/56] make threads daemon threads --- poller-service.py | 1 + 1 file changed, 1 insertion(+) diff --git a/poller-service.py b/poller-service.py index b424041302..9bb9d570d1 100755 --- a/poller-service.py +++ b/poller-service.py @@ -291,6 +291,7 @@ def poll_worker(thread_id): t = [] for i in range(0, amount_of_workers): t.append(threading.Thread(target=poll_worker, args=[i])) + t[i].daemon = True t[i].start() From 06a809d96d23c4938e3922c7be7768f4af7f1b29 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 15:09:19 -0400 Subject: [PATCH 40/56] unnecessary commas --- poller-service.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/poller-service.py b/poller-service.py index 9bb9d570d1..eccfcd3614 100755 --- a/poller-service.py +++ b/poller-service.py @@ -246,7 +246,7 @@ def poll_worker(thread_id): device_id, status, next_poll, next_discovery = dev_row[0] if not getLock('queue.{0}'.format(device_id), db): - releaseLock('queue.{0}'.format(device_id), db): + releaseLock('queue.{0}'.format(device_id), db) continue if next_poll and next_poll > datetime.now(): @@ -264,10 +264,10 @@ def poll_worker(thread_id): if not getLock('{0}.{1}'.format(action, device_id), db): releaseLock('{0}.{1}'.format(action, device_id), db) - releaseLock('queue.{0}'.format(device_id), db): + releaseLock('queue.{0}'.format(device_id), db) continue - releaseLock('queue.{0}'.format(device_id), db): + releaseLock('queue.{0}'.format(device_id), db) try: start_time = time.time() path = poller_path From 01bef1ff01a8bb010e08361fff7c15bf4ef9b3b9 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 15:23:04 -0400 Subject: [PATCH 41/56] smarter sleeping to try and lighten cpu load --- poller-service.py | 46 ++++++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/poller-service.py b/poller-service.py index eccfcd3614..18effffe24 100755 --- a/poller-service.py +++ b/poller-service.py @@ -240,7 +240,8 @@ def poll_worker(thread_id): while True: dev_row = db.query(dev_query) if len(dev_row) < 1: - #log.warning("WARNING: Thread {0} returned no devices from query".format(thread_id)) + # Sleep 1 second after getting an empty query, don't hammer the sql server for no reason. + time.sleep(1) continue device_id, status, next_poll, next_discovery = dev_row[0] @@ -296,24 +297,25 @@ for i in range(0, amount_of_workers): while True: - if next_update < datetime.now(): - seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds - update_query = ('INSERT INTO pollers(poller_name, ' - ' last_polled, ' - ' devices, ' - ' time_taken) ' - ' values("{0}", NOW(), "{1}", "{2}") ' - 'ON DUPLICATE KEY UPDATE ' - ' last_polled=values(last_polled), ' - ' devices=values(devices), ' - ' time_taken=values(time_taken) ').format(config['distributed_poller_name'].strip(), - devices_scanned, - seconds_taken) - try: - db.query(update_query) - except: - log.critical('ERROR: MySQL query error. Is your schema up to date?') - sys.exit(2) - log.info('INFO: {0} devices scanned in the last minute'.format(devices_scanned)) - devices_scanned = 0 - next_update = datetime.now() + timedelta(minutes=1) + sleep_until(next_update) + + seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds + update_query = ('INSERT INTO pollers(poller_name, ' + ' last_polled, ' + ' devices, ' + ' time_taken) ' + ' values("{0}", NOW(), "{1}", "{2}") ' + 'ON DUPLICATE KEY UPDATE ' + ' last_polled=values(last_polled), ' + ' devices=values(devices), ' + ' time_taken=values(time_taken) ').format(config['distributed_poller_name'].strip(), + devices_scanned, + seconds_taken) + try: + db.query(update_query) + except: + log.critical('ERROR: MySQL query error. Is your schema up to date?') + sys.exit(2) + log.info('INFO: {0} devices scanned in the last minute'.format(devices_scanned)) + devices_scanned = 0 + next_update = datetime.now() + timedelta(minutes=1) From a0288763d9d9a985fa127e61d7bbdd660963b965 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 16:40:33 -0400 Subject: [PATCH 42/56] use native thread id --- poller-service.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/poller-service.py b/poller-service.py index 18effffe24..f724dffe46 100755 --- a/poller-service.py +++ b/poller-service.py @@ -233,9 +233,10 @@ next_update = datetime.now() + timedelta(minutes=1) devices_scanned = 0 -def poll_worker(thread_id): +def poll_worker(): global dev_query global devices_scanned + thread_id = threading.current_thread().ident db = DB() while True: dev_row = db.query(dev_query) @@ -289,11 +290,10 @@ def poll_worker(thread_id): releaseLock('{0}.{1}'.format(action, device_id), db) -t = [] for i in range(0, amount_of_workers): - t.append(threading.Thread(target=poll_worker, args=[i])) - t[i].daemon = True - t[i].start() + t = threading.Thread(target=poll_worker) + t.daemon = True + t.start() while True: From 9b2c9c2fd39c2dd6f694815aa36d180efaa406d5 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 16:52:12 -0400 Subject: [PATCH 43/56] debug stats about threads --- poller-service.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index f724dffe46..25da75edd5 100755 --- a/poller-service.py +++ b/poller-service.py @@ -236,7 +236,7 @@ devices_scanned = 0 def poll_worker(): global dev_query global devices_scanned - thread_id = threading.current_thread().ident + thread_id = threading.current_thread().name db = DB() while True: dev_row = db.query(dev_query) @@ -282,6 +282,7 @@ def poll_worker(): log.debug("DEBUG: Thread {0} finished {1} of device {2} in {3} seconds".format(thread_id, action, device_id, elapsed_time)) else: log.warning("WARNING: Thread {0} finished {1} of device {2} in {3} seconds".format(thread_id, action, device_id, elapsed_time)) + thread_stats[thread_id]['last_completed_poll'] = datetime.now() except (KeyboardInterrupt, SystemExit): raise except: @@ -290,8 +291,11 @@ def poll_worker(): releaseLock('{0}.{1}'.format(action, device_id), db) +thread_stats = {} for i in range(0, amount_of_workers): + thread_stats[i] = {} t = threading.Thread(target=poll_worker) + t.name = i t.daemon = True t.start() @@ -299,6 +303,9 @@ for i in range(0, amount_of_workers): while True: sleep_until(next_update) + for thread, data in thread_stats.iteritems(): + log.debug('DEBUG: Thread {0} last completed at {1}'.format(thread, data['last_completed_poll'])) + seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds update_query = ('INSERT INTO pollers(poller_name, ' ' last_polled, ' From 310711b4e47568adca955d95386da832c4840f3e Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 16:55:36 -0400 Subject: [PATCH 44/56] prepopulate last completed dict --- poller-service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index 25da75edd5..f9a9929701 100755 --- a/poller-service.py +++ b/poller-service.py @@ -293,7 +293,7 @@ def poll_worker(): thread_stats = {} for i in range(0, amount_of_workers): - thread_stats[i] = {} + thread_stats[i] = {'last_completed_poll', 0} t = threading.Thread(target=poll_worker) t.name = i t.daemon = True From a74a5c8d814b6b2711e95970a86b1fc171856094 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 17:00:26 -0400 Subject: [PATCH 45/56] Revert "prepopulate last completed dict" This reverts commit 310711b4e47568adca955d95386da832c4840f3e. --- poller-service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index f9a9929701..25da75edd5 100755 --- a/poller-service.py +++ b/poller-service.py @@ -293,7 +293,7 @@ def poll_worker(): thread_stats = {} for i in range(0, amount_of_workers): - thread_stats[i] = {'last_completed_poll', 0} + thread_stats[i] = {} t = threading.Thread(target=poll_worker) t.name = i t.daemon = True From 39ec135ed3f00fc5c00a19bd669de8231f2c13e7 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 17:00:36 -0400 Subject: [PATCH 46/56] Revert "debug stats about threads" This reverts commit 9b2c9c2fd39c2dd6f694815aa36d180efaa406d5. --- poller-service.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/poller-service.py b/poller-service.py index 25da75edd5..f724dffe46 100755 --- a/poller-service.py +++ b/poller-service.py @@ -236,7 +236,7 @@ devices_scanned = 0 def poll_worker(): global dev_query global devices_scanned - thread_id = threading.current_thread().name + thread_id = threading.current_thread().ident db = DB() while True: dev_row = db.query(dev_query) @@ -282,7 +282,6 @@ def poll_worker(): log.debug("DEBUG: Thread {0} finished {1} of device {2} in {3} seconds".format(thread_id, action, device_id, elapsed_time)) else: log.warning("WARNING: Thread {0} finished {1} of device {2} in {3} seconds".format(thread_id, action, device_id, elapsed_time)) - thread_stats[thread_id]['last_completed_poll'] = datetime.now() except (KeyboardInterrupt, SystemExit): raise except: @@ -291,11 +290,8 @@ def poll_worker(): releaseLock('{0}.{1}'.format(action, device_id), db) -thread_stats = {} for i in range(0, amount_of_workers): - thread_stats[i] = {} t = threading.Thread(target=poll_worker) - t.name = i t.daemon = True t.start() @@ -303,9 +299,6 @@ for i in range(0, amount_of_workers): while True: sleep_until(next_update) - for thread, data in thread_stats.iteritems(): - log.debug('DEBUG: Thread {0} last completed at {1}'.format(thread, data['last_completed_poll'])) - seconds_taken = (datetime.now() - (next_update - timedelta(minutes=1))).seconds update_query = ('INSERT INTO pollers(poller_name, ' ' last_polled, ' From a761a4be77e62ac65a90dbc353bcf2ba3144e838 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Wed, 9 Sep 2015 17:01:44 -0400 Subject: [PATCH 47/56] use thread name as id --- poller-service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index f724dffe46..ce93108e37 100755 --- a/poller-service.py +++ b/poller-service.py @@ -236,7 +236,7 @@ devices_scanned = 0 def poll_worker(): global dev_query global devices_scanned - thread_id = threading.current_thread().ident + thread_id = threading.current_thread().name db = DB() while True: dev_row = db.query(dev_query) @@ -292,6 +292,7 @@ def poll_worker(): for i in range(0, amount_of_workers): t = threading.Thread(target=poll_worker) + t.name = i t.daemon = True t.start() From 728b92f0ff320b36b322c2b43baf44f5ea4bd059 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Mon, 14 Sep 2015 08:40:20 -0400 Subject: [PATCH 48/56] first attempt at lessening mysql load --- doc/Extensions/Poller-Service.md | 3 ++- poller-service.py | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/doc/Extensions/Poller-Service.md b/doc/Extensions/Poller-Service.md index 6d33e27b92..cb6af28e86 100644 --- a/doc/Extensions/Poller-Service.md +++ b/doc/Extensions/Poller-Service.md @@ -4,7 +4,7 @@ The Poller service is an alternative to polling and discovery cron jobs and provides support for distributed polling without memcache. It is multi-threaded and runs continuously discovering and polling devices with the oldest data attempting to honor the polling frequency configured in `config.php`. This service replaces all the required cron jobs except for `/opt/librenms/daily.sh` and `/opt/librenms/alerts.php`. -Configure the maximum number of threads for the service in `$config['poller_service_workers']`. Configure the minimum desired polling frequency in `$config['poller_service_poll_frequency']` and the minimum desired discovery frequency in `$config['poller_service_discover_frequency']`. The service will not poll or discover devices which have data newer than this this configured age in seconds. Configure how frequently the service will attempt to poll devices which are down in `$config['poller_service_down_retry']`. +Configure the maximum number of threads for the service in `$config['poller_service_workers']`. Configure the minimum desired polling frequency in `$config['poller_service_poll_frequency']` and the minimum desired discovery frequency in `$config['poller_service_discover_frequency']`. The service will not poll or discover devices which have data newer than this this configured age in seconds. Configure how frequently the service will attempt to poll devices which are down in `$config['poller_service_down_retry']`. If you have enough pollers that the worker threads run out of work, the service will query looking for devices every `$config['poller_service_retry_query']` seconds. The poller service is designed to gracefully degrade. If not all devices can be polled within the configured frequency, the service will continuously poll devices refreshing as frequently as possible using the configured number of threads. @@ -18,6 +18,7 @@ $config['poller_service_workers'] = 16; $config['poller_service_poll_frequency'] = 300; $config['poller_service_discover_frequency'] = 21600; $config['poller_service_down_retry'] = 60; +$config['poller_service_retry_query'] = 1; ``` ## Distributed Polling diff --git a/poller-service.py b/poller-service.py index ce93108e37..fff7968d78 100755 --- a/poller-service.py +++ b/poller-service.py @@ -164,6 +164,13 @@ try: except KeyError: down_retry = 60 +try: + retry_query = int(config['poller_service_retry_query']) + if retry_query == 0: + retry_query = 1 +except KeyError: + retry_query = 1 + db = DB() @@ -232,16 +239,22 @@ dev_query = ('SELECT device_id, status, next_update = datetime.now() + timedelta(minutes=1) devices_scanned = 0 +dont_query_until = datetime.fromtimestamp(0) def poll_worker(): global dev_query global devices_scanned + global dont_query_until thread_id = threading.current_thread().name db = DB() while True: + if datetime.now() < dont_query_until: + time.sleep(1) + continue + dev_row = db.query(dev_query) if len(dev_row) < 1: - # Sleep 1 second after getting an empty query, don't hammer the sql server for no reason. + dont_query_until = datetime.now() + timedelta(seconds=retry_query) time.sleep(1) continue From cb3189e944e196764c7e4a8ed6ff442621d8e485 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Mon, 14 Sep 2015 09:00:55 -0400 Subject: [PATCH 49/56] always try to close the connection before reconnecting. --- poller-service.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/poller-service.py b/poller-service.py index fff7968d78..93efcd7254 100755 --- a/poller-service.py +++ b/poller-service.py @@ -54,6 +54,11 @@ class DB: def connect(self): while True: + try: + self.conn.close() + except: + pass + try: if db_port == 0: self.conn = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname) From a89018ab918b6a7855733eea7b9326a7361047eb Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Mon, 14 Sep 2015 09:34:13 -0400 Subject: [PATCH 50/56] add blocking to db query --- poller-service.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/poller-service.py b/poller-service.py index 93efcd7254..84f2936489 100755 --- a/poller-service.py +++ b/poller-service.py @@ -52,7 +52,11 @@ log.info('INFO: Starting poller-service') class DB: conn = None + def __init__(self): + self.in_use = False + def connect(self): + self.in_use = True while True: try: self.conn.close() @@ -71,17 +75,24 @@ class DB: self.conn.autocommit(True) self.conn.ping(True) + self.in_use = False def query(self, sql): + while self.in_use: + continue + + self.in_use = True while True: try: cursor = self.conn.cursor() cursor.execute(sql) ret = cursor.fetchall() cursor.close() + self.in_use = False return ret except (AttributeError, MySQLdb.OperationalError): log.warning('WARNING: MySQL Operational Error during query, reconnecting.') + self.in_use = False self.connect() except (AttributeError, MySQLdb.ProgrammingError): log.warning('WARNING: MySQL Programming Error during query, attempting query again.') @@ -176,6 +187,11 @@ try: except KeyError: retry_query = 1 +try: + single_connection = bool(config['poller_service_single_connection']) +except KeyError: + single_connection = False + db = DB() From b68df56b14849b425c5e33767edfb792d159a765 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Mon, 14 Sep 2015 09:35:55 -0400 Subject: [PATCH 51/56] use global db connection if configured --- poller-service.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index 84f2936489..c46d81a4de 100755 --- a/poller-service.py +++ b/poller-service.py @@ -266,8 +266,14 @@ def poll_worker(): global dev_query global devices_scanned global dont_query_until + global single_connection thread_id = threading.current_thread().name - db = DB() + + if single_connection: + global db + else: + db = DB() + while True: if datetime.now() < dont_query_until: time.sleep(1) From f5d6ad62b330721af78b8b2d2bfcbbe8f23acaad Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Mon, 14 Sep 2015 09:39:51 -0400 Subject: [PATCH 52/56] catch cursor close --- poller-service.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/poller-service.py b/poller-service.py index c46d81a4de..a9cae26239 100755 --- a/poller-service.py +++ b/poller-service.py @@ -96,7 +96,10 @@ class DB: self.connect() except (AttributeError, MySQLdb.ProgrammingError): log.warning('WARNING: MySQL Programming Error during query, attempting query again.') - cursor.close() + try: + cursor.close() + except: + pass def get_config_data(): From 0b4645e3eae2b1a63bad7e942bd438c1a4787775 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Mon, 14 Sep 2015 09:41:03 -0400 Subject: [PATCH 53/56] Revert "catch cursor close" This reverts commit f5d6ad62b330721af78b8b2d2bfcbbe8f23acaad. --- poller-service.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/poller-service.py b/poller-service.py index a9cae26239..c46d81a4de 100755 --- a/poller-service.py +++ b/poller-service.py @@ -96,10 +96,7 @@ class DB: self.connect() except (AttributeError, MySQLdb.ProgrammingError): log.warning('WARNING: MySQL Programming Error during query, attempting query again.') - try: - cursor.close() - except: - pass + cursor.close() def get_config_data(): From 12f2cdfcc80d836a5aa753f4aef51f3a913145f1 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Mon, 14 Sep 2015 09:43:34 -0400 Subject: [PATCH 54/56] connect on init, check for in_use on connect --- poller-service.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/poller-service.py b/poller-service.py index c46d81a4de..7b1c1c40aa 100755 --- a/poller-service.py +++ b/poller-service.py @@ -54,8 +54,12 @@ class DB: def __init__(self): self.in_use = False + self.connect() def connect(self): + while self.in_use: + continue + self.in_use = True while True: try: From 3476724cad3fb3b04163467c6f79da69b63fd8f9 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Mon, 14 Sep 2015 10:16:07 -0400 Subject: [PATCH 55/56] docs --- doc/Extensions/Poller-Service.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/doc/Extensions/Poller-Service.md b/doc/Extensions/Poller-Service.md index cb6af28e86..5e484ae27d 100644 --- a/doc/Extensions/Poller-Service.md +++ b/doc/Extensions/Poller-Service.md @@ -19,6 +19,7 @@ $config['poller_service_poll_frequency'] = 300; $config['poller_service_discover_frequency'] = 21600; $config['poller_service_down_retry'] = 60; $config['poller_service_retry_query'] = 1; +$config['poller_service_single_connection'] = false; ``` ## Distributed Polling @@ -27,6 +28,9 @@ Distributed polling is possible, and uses the same configuration options as are ## Multi-Master MySQL considerations Because locks are not replicated in Multi-Master MySQL configurations, if you are using such a configuration, you will need to make sure that all pollers are using the same MySQL server. +## Single Connection +If you are running MariaDB 10.2 or newer, you can tell poller-service to use a single mysql connectino for managing locks by setting `$config['poller_service_single_connection']` to `true`. *DO NOT* configure this for any version of MariaDB less than 10.2 or any version of MySQL. + ## Service Installation An upstart configuration `poller-service.conf` is provided. To install run `ln -s /opt/librenms/poller-service.conf /etc/init/poller-service.conf`. The service will start on boot and can be started manually by running `start poller-service`. If you recieve an error that the service does not exist, run `initctl reload-configuration`. The service is configured to run as the user `librenms` and will fail if that user does not exist. From 5e3d968ee999e3764df20ff455b6db21b2e0b2e5 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Mon, 14 Sep 2015 13:38:46 -0400 Subject: [PATCH 56/56] use threading locks instead of a boolean variable and busy loop --- poller-service.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/poller-service.py b/poller-service.py index 7b1c1c40aa..b545e4bcd4 100755 --- a/poller-service.py +++ b/poller-service.py @@ -53,14 +53,11 @@ class DB: conn = None def __init__(self): - self.in_use = False + self.in_use = threading.Lock() self.connect() def connect(self): - while self.in_use: - continue - - self.in_use = True + self.in_use.acquire(True) while True: try: self.conn.close() @@ -79,24 +76,21 @@ class DB: self.conn.autocommit(True) self.conn.ping(True) - self.in_use = False + self.in_use.release() def query(self, sql): - while self.in_use: - continue - - self.in_use = True + self.in_use.acquire(True) while True: try: cursor = self.conn.cursor() cursor.execute(sql) ret = cursor.fetchall() cursor.close() - self.in_use = False + self.in_use.release() return ret except (AttributeError, MySQLdb.OperationalError): log.warning('WARNING: MySQL Operational Error during query, reconnecting.') - self.in_use = False + self.in_use.release() self.connect() except (AttributeError, MySQLdb.ProgrammingError): log.warning('WARNING: MySQL Programming Error during query, attempting query again.')