move thread control to lock threads

This commit is contained in:
Clint Armstrong
2015-09-02 14:22:42 -04:00
parent ca1a6883a8
commit 3b6e0e68e9

View File

@@ -157,11 +157,7 @@ def poll_worker(device_id, action):
if action == 'discovery':
path = discover_path
command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id)
if getThreadLock('{0}.{1}'.format(action, device_id)):
subprocess.check_call(command, shell=True)
releaseThreadLock('{0}.{1}'.format(action, device_id))
else:
log.debug("DEBUG: Couldn't get lock on {0}.{1}".format(action, device_id))
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
if elapsed_time < 300:
log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time))
@@ -171,6 +167,9 @@ def poll_worker(device_id, action):
raise
except:
pass
finally:
releaseThreadLock(device_id, action)
def lockFree(lock, cursor=cursor):
@@ -194,12 +193,24 @@ for i in range(0, amount_of_workers):
thread_cursors[i].db = thread_db_connection.db
def getThreadLock(lock):
def getThreadQueueLock(device_id):
global thread_cursors
# This is how threads are limited, by the numver of cursors available
while True:
for thread_cursor in thread_cursors:
if not thread_cursor.in_use:
thread_cursor.in_use = 'queue.{0}'.format(device_id)
return getLock('queue.{0}'.format(device_id), thread_cursor.cursor)
time.sleep(.5)
def getThreadActionLock(device_id, action):
global thread_cursors
# This is how threads are limited, by the numver of cursors available
for thread_cursor in thread_cursors:
if not thread_cursor.in_use:
thread_cursor.in_use = lock
return getLock(lock, thread_cursor.cursor)
if thread_cursor.in_use == 'queue.{0}'.format(device_id):
thread_cursor.in_use = '{0}.{1}'.format(action, device_id)
return getLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor)
return False
@@ -209,12 +220,12 @@ def releaseLock(lock, cursor=cursor):
return cursor.fetchall()[0][0] == 1
def releaseThreadLock(lock):
def releaseThreadLock(device_id, action):
global thread_cursors
for thread_cursor in thread_cursors:
if thread_cursor.in_use == lock:
if thread_cursor.in_use == '{0}.{1}'.format(action, device_id):
thread_cursor.in_use = False
return releaseLock(lock, thread_cursor.cursor)
return releaseLock('{0}.{1}'.format(action, device_id), thread_cursor.cursor)
return False
@@ -254,6 +265,7 @@ dev_query = ('SELECT device_id, status,
'disabled = 0 '
'AND IS_FREE_LOCK(CONCAT("poll.", device_id)) '
'AND IS_FREE_LOCK(CONCAT("discovery.", device_id)) '
'AND IS_FREE_LOCK(CONCAT("queue.", device_id)) '
'AND ( last_poll_attempted < DATE_SUB(NOW(), INTERVAL {2} SECOND ) '
' OR last_poll_attempted IS NULL ) '
'{3} '
@@ -296,9 +308,6 @@ while True:
devices_scanned = 0
next_update = datetime.now() + timedelta(minutes=1)
while threading.active_count() >= amount_of_workers or not lockFree('schema_update'):
time.sleep(.5)
try:
cursor.execute(dev_query)
except:
@@ -307,6 +316,9 @@ while True:
devices = cursor.fetchall()
for device_id, status, next_poll, next_discovery in devices:
if not getThreadQueueLock(device_id):
continue
if next_poll and next_poll > datetime.now():
log.debug('DEBUG: Sleeping until {0} before polling {1}'.format(next_poll, device_id))
sleep_until(next_poll)
@@ -321,6 +333,9 @@ while True:
cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {0}'.format(device_id))
cursor.fetchall()
if not getThreadActionLock(device_id, action):
continue
t = threading.Thread(target=poll_worker, args=[device_id, action])
t.start()