diff --git a/poller-service.py b/poller-service.py index 38eb8c397b..9ebbb46b93 100755 --- a/poller-service.py +++ b/poller-service.py @@ -157,11 +157,7 @@ def poll_worker(device_id, action): if action == 'discovery': path = discover_path command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id) - if getThreadLock('{0}.{1}'.format(action, device_id)): - subprocess.check_call(command, shell=True) - releaseThreadLock('{0}.{1}'.format(action, device_id)) - else: - log.debug("DEBUG: Couldn't get lock on {0}.{1}".format(action, device_id)) + subprocess.check_call(command, shell=True) elapsed_time = int(time.time() - start_time) if elapsed_time < 300: log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) @@ -171,6 +167,9 @@ def poll_worker(device_id, action): raise except: pass + finally: + releaseThreadLock(device_id, action) + def lockFree(lock, cursor=cursor): @@ -194,12 +193,24 @@ for i in range(0, amount_of_workers): thread_cursors[i].db = thread_db_connection.db -def getThreadLock(lock): +def getThreadQueueLock(device_id): global thread_cursors + # This is how threads are limited, by the number of cursors available + while True: + for thread_cursor in thread_cursors: + if not thread_cursor.in_use: + thread_cursor.in_use = 'queue.{0}'.format(device_id) + return getLock('queue.{0}'.format(device_id), thread_cursor.cursor) + time.sleep(.5) + + +def getThreadActionLock(device_id, action): + global thread_cursors + # This is how threads are limited, by the number of cursors available for thread_cursor in thread_cursors: - if not thread_cursor.in_use: - thread_cursor.in_use = lock - return getLock(lock, thread_cursor.cursor) + if thread_cursor.in_use == 'queue.{0}'.format(device_id): + thread_cursor.in_use = '{0}.{1}'.format(action, device_id) + return getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor) return False @@ -209,12 +220,12 @@ def releaseLock(lock, cursor=cursor): return 
cursor.fetchall()[0][0] == 1 -def releaseThreadLock(lock): +def releaseThreadLock(device_id, action): global thread_cursors for thread_cursor in thread_cursors: - if thread_cursor.in_use == lock: + if thread_cursor.in_use == '{0}.{1}'.format(action, device_id): thread_cursor.in_use = False - return releaseLock(lock, thread_cursor.cursor) + return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor) return False @@ -254,6 +265,7 @@ dev_query = ('SELECT device_id, status, 'disabled = 0 ' 'AND IS_FREE_LOCK(CONCAT("poll.", device_id)) ' 'AND IS_FREE_LOCK(CONCAT("discovery.", device_id)) ' + 'AND IS_FREE_LOCK(CONCAT("queue.", device_id)) ' 'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) ' ' OR last_poll_attempted IS NULL ) ' '{3} ' @@ -296,9 +308,6 @@ while True: devices_scanned = 0 next_update = datetime.now() + timedelta(minutes=1) - while threading.active_count() >= amount_of_workers or not lockFree('schema_update'): - time.sleep(.5) - try: cursor.execute(dev_query) except: @@ -307,6 +316,9 @@ while True: devices = cursor.fetchall() for device_id, status, next_poll, next_discovery in devices: + if not getThreadQueueLock(device_id): + continue + if next_poll and next_poll > datetime.now(): log.debug('DEBUG: Sleeping until {0} before polling {1}'.format(next_poll, device_id)) sleep_until(next_poll) @@ -321,6 +333,9 @@ while True: cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id)) cursor.fetchall() + if not getThreadActionLock(device_id, action): + continue + t = threading.Thread(target=poll_worker, args=[device_id, action]) t.start()