From f12725fc814371868ad0a7e164fce7bf4cef215d Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Mon, 13 Jul 2015 09:38:03 -0400 Subject: [PATCH] store last attempted in sql for simpler down_retry --- poller-service.py | 43 ++++++++++++++++--------------------------- 1 file changed, 16 insertions(+), 27 deletions(-) diff --git a/poller-service.py b/poller-service.py index c794704f19..43ee9875c4 100755 --- a/poller-service.py +++ b/poller-service.py @@ -140,9 +140,9 @@ def poll_worker(device_id, action): subprocess.check_call(command, shell=True) elapsed_time = int(time.time() - start_time) if elapsed_time < 300: - log.debug("DEBUG: worker finished device %s in %s seconds" % (device_id, elapsed_time)) + log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) else: - log.warning("WARNING: worker finished device %s in %s seconds" % (device_id, elapsed_time)) + log.warning("WARNING: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time)) except (KeyboardInterrupt, SystemExit): raise except: @@ -200,12 +200,14 @@ dev_query = ('SELECT device_id, ' 'disabled = 0 ' 'AND IS_FREE_LOCK(CONCAT("polling.", device_id)) ' 'AND IS_FREE_LOCK(CONCAT("queued.", device_id)) ' - '{2} ' + 'AND last_poll_attempted < DATE_SUB( ' + ' NOW(), INTERVAL {2} SECOND ) ' + '{3} ' 'ORDER BY next_poll asc ').format(poll_frequency, discover_frequency, + down_retry, poller_group) -dont_retry = {} threads = 0 next_update = datetime.now() + timedelta(minutes=5) devices_scanned = 0 @@ -220,28 +222,26 @@ while True: seconds_taken = (datetime.now() - (next_update - timedelta(minutes=5))).seconds update_query = ('INSERT INTO pollers(poller_name, ' ' last_polled, ' - ' devices, ' + ' devices, ' ' time_taken) ' ' values("{0}", NOW(), "{1}", "{2}") ' 'ON DUPLICATE KEY UPDATE ' ' last_polled=values(last_polled), ' ' devices=values(devices) ' ' time_taken=values(time_taken) ').format(config['distributed_poller_name'], - devices_scanned, - seconds_taken) + devices_scanned, + seconds_taken) cursor.execute(update_query) + cursor.fetchall() log.info('INFO: {} devices scanned in the last 5 minutes'.format(devices_scanned)) devices_scanned = 0 next_update = datetime.now() + timedelta(minutes=5) - dont_retry = dict((dev, time) for dev, time in dont_retry.iteritems() if time > datetime.now()) - while threading.active_count() >= amount_of_workers: time.sleep(.5) cursor.execute(dev_query) devices = cursor.fetchall() - retry_devs_found = {} for device_id, next_poll, next_discovery in devices: # add queue lock, so we lock the next device against any other pollers # if this fails, the device is locked by another poller already @@ -256,22 +256,8 @@ while True: continue if next_poll and next_poll > datetime.now(): - poll_action = 'polling' - sleep_until_time = next_poll - try: - next_retry_device = min(retry_devs_found, key=retry_devs_found.get) - next_retry_time = retry_devs_found[next_retry_device] - if next_retry_time < next_poll: - device_id = next_retry_device - if not getLock('queued.{}'.format(device_id)): - continue - poll_action = 'retrying failed device' - sleep_until_time = next_retry_time - except ValueError: - pass - - log.debug('DEBUG: Sleeping until {0} before {1} {2}'.format(next_poll, poll_action, device_id)) - sleep_until(sleep_until_time) + log.debug('DEBUG: Sleeping until {0} before polling {2}'.format(next_poll, device_id)) + sleep_until(next_poll) action = 'poll' if not next_discovery or next_discovery < datetime.now(): @@ -279,7 +265,10 @@ while True: log.debug('DEBUG: Starting {} of device {}'.format(action, device_id)) devices_scanned += 1 - dont_retry[device_id] = datetime.now() + timedelta(seconds=down_retry) + + cursor.execute('UPDATE devices SET last_poll_attempted = NOW() WHERE device_id = {}'.format(device_id)) + cursor.fetchall() + t = threading.Thread(target=poll_worker, args=[device_id, action]) t.start()