Store the last poll attempt time in SQL to simplify down_retry handling

This commit is contained in:
Clint Armstrong
2015-07-13 09:38:03 -04:00
parent 01b7648136
commit f12725fc81

View File

@@ -140,9 +140,9 @@ def poll_worker(device_id, action):
subprocess.check_call(command, shell=True)
elapsed_time = int(time.time() - start_time)
if elapsed_time < 300:
log.debug("DEBUG: worker finished device %s in %s seconds" % (device_id, elapsed_time))
log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time))
else:
log.warning("WARNING: worker finished device %s in %s seconds" % (device_id, elapsed_time))
log.warning("WARNING: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time))
except (KeyboardInterrupt, SystemExit):
raise
except:
@@ -200,12 +200,14 @@ dev_query = ('SELECT device_id, '
'disabled = 0 '
'AND IS_FREE_LOCK(CONCAT("polling.", device_id)) '
'AND IS_FREE_LOCK(CONCAT("queued.", device_id)) '
'{2} '
'AND last_poll_attempted < DATE_SUB( '
' NOW(), INTERVAL {2} SECOND ) '
'{3} '
'ORDER BY next_poll asc ').format(poll_frequency,
discover_frequency,
down_retry,
poller_group)
dont_retry = {}
threads = 0
next_update = datetime.now() + timedelta(minutes=5)
devices_scanned = 0
@@ -220,28 +222,26 @@ while True:
seconds_taken = (datetime.now() - (next_update - timedelta(minutes=5))).seconds
update_query = ('INSERT INTO pollers(poller_name, '
' last_polled, '
' devices, '
' devices, '
' time_taken) '
' values("{0}", NOW(), "{1}", "{2}") '
'ON DUPLICATE KEY UPDATE '
' last_polled=values(last_polled), '
' devices=values(devices) '
' time_taken=values(time_taken) ').format(config['distributed_poller_name'],
devices_scanned,
seconds_taken)
devices_scanned,
seconds_taken)
cursor.execute(update_query)
cursor.fetchall()
log.info('INFO: {} devices scanned in the last 5 minutes'.format(devices_scanned))
devices_scanned = 0
next_update = datetime.now() + timedelta(minutes=5)
dont_retry = dict((dev, time) for dev, time in dont_retry.iteritems() if time > datetime.now())
while threading.active_count() >= amount_of_workers:
time.sleep(.5)
cursor.execute(dev_query)
devices = cursor.fetchall()
retry_devs_found = {}
for device_id, next_poll, next_discovery in devices:
# add queue lock, so we lock the next device against any other pollers
# if this fails, the device is locked by another poller already
@@ -256,22 +256,8 @@ while True:
continue
if next_poll and next_poll > datetime.now():
poll_action = 'polling'
sleep_until_time = next_poll
try:
next_retry_device = min(retry_devs_found, key=retry_devs_found.get)
next_retry_time = retry_devs_found[next_retry_device]
if next_retry_time < next_poll:
device_id = next_retry_device
if not getLock('queued.{}'.format(device_id)):
continue
poll_action = 'retrying failed device'
sleep_until_time = next_retry_time
except ValueError:
pass
log.debug('DEBUG: Sleeping until {0} before {1} {2}'.format(next_poll, poll_action, device_id))
sleep_until(sleep_until_time)
log.debug('DEBUG: Sleeping until {0} before polling {2}'.format(next_poll, device_id))
sleep_until(next_poll)
action = 'poll'
if not next_discovery or next_discovery < datetime.now():
@@ -279,7 +265,10 @@ while True:
log.debug('DEBUG: Starting {} of device {}'.format(action, device_id))
devices_scanned += 1
dont_retry[device_id] = datetime.now() + timedelta(seconds=down_retry)
cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {}'.format(device_id))
cursor.fetchall()
t = threading.Thread(target=poll_worker, args=[device_id, action])
t.start()