cleaned up

This commit is contained in:
Clint Armstrong
2015-07-06 15:10:48 -04:00
parent 4d3b7a3c0f
commit 99cf049645

View File

@ -26,6 +26,7 @@ from datetime import datetime, timedelta
# Directory that holds this script (path resolved through os.path.realpath),
# and the PHP configuration file expected directly inside it.
install_dir = os.path.dirname(os.path.realpath(__file__))
config_file = '%s/config.php' % install_dir
def get_config_data():
config_cmd = ['/usr/bin/env', 'php', '%s/config_to_json.php' % install_dir]
try:
@ -53,24 +54,17 @@ db_username = config['db_user']
# Database connection settings taken from the parsed configuration.
db_password = config['db_pass']

# config['db_host'] is accepted in three forms:
#   'unix:...'     -> kept verbatim as the server, port 0
#   'host:port'    -> split into server name and integer port
#   'host'         -> server name only, port 0
if config['db_host'][:5].lower() == 'unix:':
    db_server = config['db_host']
    db_port = 0
elif ':' in config['db_host']:
    # Split on the LAST colon only. Without a maxsplit, rsplit(':') behaves
    # exactly like split(':'), so a host containing more than one colon
    # would yield the wrong server name and a bogus (or unparsable) port.
    db_server, db_port = config['db_host'].rsplit(':', 1)
    db_port = int(db_port)
else:
    db_server = config['db_host']
    db_port = 0

db_dbname = config['db_name']
# (c) 2015, GPLv3, Daniel Preussker <f0o@devilcode.org> <<<EOC1
# Poller group as a string, or False when the configuration has no
# 'distributed_poller_group' key.
poller_group = (str(config['distributed_poller_group'])
                if 'distributed_poller_group' in config else False)
# EOC1
# Wall-clock start of this run; the total elapsed time is computed from it later.
s_time = time.time()
# NOTE(review): presumably the sum of all per-device poll times (it is reported
# as the "sequential style" duration) — the accumulation is not visible here.
real_duration = 0
# Seconds taken per device, keyed by device; read when reporting slow devices.
per_device_duration = {}
@ -123,6 +117,7 @@ except:
# Some people, when confronted with a problem, think,
# "I know, I'll use threads," and then two they hav erpoblesms.
def printworker():
nodeso = 0
while True:
@ -139,6 +134,7 @@ def printworker():
print "WARNING: worker %s finished device %s in %s seconds" % (worker_id, device_id, elapsed_time)
print_queue.task_done()
def poll_worker(device_id):
try:
start_time = time.time()
@ -148,36 +144,35 @@ def poll_worker(device_id):
print_queue.put([threading.current_thread().name, device_id, elapsed_time])
except (KeyboardInterrupt, SystemExit):
raise
#except:
# pass
except:
pass
# Queue that poll workers fill with [thread name, device id, elapsed time] entries.
print_queue = Queue.Queue()
# NOTE(review): the startup message appears twice below (wrapped and single-line
# forms); this looks like the before/after of a reformat — confirm only one
# should remain.
print "INFO: starting the poller at %s with %s threads" % (time.strftime("%Y-%m-%d %H:%M:%S"),
amount_of_workers)
print "INFO: starting the poller at %s with %s threads" % (time.strftime("%Y-%m-%d %H:%M:%S"), amount_of_workers)
# Run printworker in a daemon thread so it does not by itself keep the process alive.
p = threading.Thread(target=printworker)
p.setDaemon(True)
p.start()
def lockFree(lock):
    """Report whether the named database lock is currently free.

    Runs ``SELECT IS_FREE_LOCK('<lock>')`` on the module-level ``cursor``
    and returns True only when the first column of the first result row
    equals 1.
    """
    statement = "SELECT IS_FREE_LOCK('{}')".format(lock)
    cursor.execute(statement)
    rows = cursor.fetchall()
    return rows[0][0] == 1
def getLock(lock):
    """Try to take the named database lock.

    Runs ``SELECT GET_LOCK('<lock>', 0)`` (timeout argument 0) on the
    module-level ``cursor`` and returns True only when the first column of
    the first result row equals 1.
    """
    statement = "SELECT GET_LOCK('{}', 0)".format(lock)
    cursor.execute(statement)
    rows = cursor.fetchall()
    return rows[0][0] == 1
def releaseLock(lock):
    """Release the named database lock.

    Runs ``SELECT RELEASE_LOCK('<lock>')`` on the module-level ``cursor``
    and returns True only when the first column of the first result row
    equals 1.
    """
    statement = "SELECT RELEASE_LOCK('{}')".format(lock)
    cursor.execute(statement)
    rows = cursor.fetchall()
    return rows[0][0] == 1
@ -188,29 +183,20 @@ while True:
while threading.active_count() >= amount_of_workers:
time.sleep(.5)
#print 'querying for devices'
query = 'select device_id,last_polled from devices {} disabled = 0 order by last_polled asc'.format(
'where poller_group IN({}) and'.format(poller_group) if poller_group else '')
'where poller_group IN({}) and'.format(str(config['distributed_poller_group']))
if 'distributed_poller_group' in config else '')
cursor.execute(query)
devices = cursor.fetchall()
dead_retry_in = frequency
#print "first 5 devices: {}".format(devices[:5])
for device_id, last_polled in devices:
# print 'trying device {}'.format(device_id)
# time.sleep(1)
if not lockFree('polling.{}'.format(device_id)):
# print 'polling lock is not free on {} continuing'.format(device_id)
# time.sleep(1)
continue
if not lockFree('queued.{}'.format(device_id)):
# print 'queued lock is not free on {} continuing'.format(device_id)
# time.sleep(1)
continue
try:
if ((recently_scanned[device_id] + timedelta(seconds=down_retry)) - datetime.now()).seconds > 1:
dead_retry_in = ((recently_scanned[device_id] + timedelta(seconds=down_retry)) - datetime.now()).seconds
# print 'device {} recently scanned already'.format(device_id)
# time.sleep(1)
continue
except KeyError:
pass
@ -218,8 +204,6 @@ while True:
# add queue lock, so if we sleep, we lock the next device against any other pollers, break
# if acquiring lock fails
if not getLock('queued.{}'.format(device_id)):
# print 'getting queue lock on {} failed'.format(device_id)
# time.sleep(1)
break
if last_polled > datetime.now() - timedelta(seconds=frequency):
@ -235,9 +219,7 @@ while True:
print 'Starting poll of device {}, last polled {}'.format(device_id, last_polled)
recently_scanned[device_id] = datetime.now()
t = threading.Thread(target=poll_worker, args=[device_id])
#t.setDaemon(True)
t.start()
#print 'thread launched'
releaseLock('queued.{}'.format(device_id))
@ -248,36 +230,3 @@ try:
print_queue.join()
except (KeyboardInterrupt, SystemExit):
raise
# End-of-run report: elapsed wall-clock seconds since s_time, plus a summary line.
total_time = int(time.time() - s_time)
print "INFO: poller-wrapper polled %s devices in %s seconds with %s workers" % (polled_devices, total_time, amount_of_workers)
show_stopper = False
# Record this run in the pollers table, trying an UPDATE of this poller's row first.
# NOTE(review): the SQL is assembled with % string formatting — confirm that
# config['distributed_poller_name'] can never contain a quote character.
query = "update pollers set last_polled=NOW(), devices='%d', time_taken='%d' where poller_name='%s'" % (polled_devices,
total_time, config['distributed_poller_name'])
response = cursor.execute(query)
# NOTE(review): presumably execute() returns the affected-row count, so any
# result other than 1 falls through to the INSERT — confirm against the driver.
if response == 1:
db.commit()
else:
query = "insert into pollers set poller_name='%s', last_polled=NOW(), devices='%d', time_taken='%d'" % (
config['distributed_poller_name'], polled_devices, total_time)
cursor.execute(query)
db.commit()
db.close()
# Runs longer than 300 seconds trigger warnings and a thread-count suggestion.
if total_time > 300:
print "WARNING: the process took more than 5 minutes to finish, you need faster hardware or more threads"
print "INFO: in sequential style polling the elapsed time would have been: %s seconds" % real_duration
# Flag every device whose own recorded duration exceeded 300 seconds.
for device in per_device_duration:
if per_device_duration[device] > 300:
print "WARNING: device %s is taking too long: %s seconds" % (device, per_device_duration[device])
show_stopper = True
if show_stopper:
print "ERROR: Some devices are taking more than 300 seconds, the script cannot recommend you what to do."
else:
# Scale the worker count by total_time / 300 and add one.
recommend = int(total_time / 300.0 * amount_of_workers + 1)
print "WARNING: Consider setting a minimum of %d threads. (This does not constitute professional advice!)" % recommend
# NOTE(review): indentation is lost in this view, so it is unclear whether this
# exit belongs to the "total_time > 300" branch — confirm before relying on it.
sys.exit(2)