mirror of
https://github.com/librenms/librenms.git
synced 2024-10-07 16:52:45 +00:00
thread locking in python
This commit is contained in:
@@ -33,6 +33,7 @@ import MySQLdb
|
||||
import logging
|
||||
import logging.handlers
|
||||
from datetime import datetime, timedelta
|
||||
from collections import namedtuple
|
||||
|
||||
log = logging.getLogger('poller-service')
|
||||
log.setLevel(logging.DEBUG)
|
||||
@@ -129,17 +130,20 @@ try:
|
||||
except KeyError:
|
||||
down_retry = 60
|
||||
|
||||
try:
|
||||
if db_port == 0:
|
||||
db = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname)
|
||||
else:
|
||||
db = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname)
|
||||
db.autocommit(True)
|
||||
cursor = db.cursor()
|
||||
except:
|
||||
log.critical("ERROR: Could not connect to MySQL database!")
|
||||
sys.exit(2)
|
||||
def connectDB():
|
||||
try:
|
||||
if db_port == 0:
|
||||
db_inst = MySQLdb.connect(host=db_server, user=db_username, passwd=db_password, db=db_dbname)
|
||||
else:
|
||||
db_inst = MySQLdb.connect(host=db_server, port=db_port, user=db_username, passwd=db_password, db=db_dbname)
|
||||
db_inst.autocommit(True)
|
||||
cursor_inst = db_inst.cursor()
|
||||
return cursor_inst
|
||||
except:
|
||||
log.critical("ERROR: Could not connect to MySQL database!")
|
||||
sys.exit(2)
|
||||
|
||||
cursor = connectDB()
|
||||
|
||||
def poll_worker(device_id, action):
|
||||
try:
|
||||
@@ -148,7 +152,9 @@ def poll_worker(device_id, action):
|
||||
if action == 'discovery':
|
||||
path = discover_path
|
||||
command = "/usr/bin/env php %s -h %s >> /dev/null 2>&1" % (path, device_id)
|
||||
subprocess.check_call(command, shell=True)
|
||||
if getThreadLock('{0}.{1}'.format(action, device_id)):
|
||||
subprocess.check_call(command, shell=True)
|
||||
releaseThreadLock('{0}.{1}'.format(action, device_id))
|
||||
elapsed_time = int(time.time() - start_time)
|
||||
if elapsed_time < 300:
|
||||
log.debug("DEBUG: worker finished %s of device %s in %s seconds" % (action, device_id, elapsed_time))
|
||||
@@ -160,27 +166,49 @@ def poll_worker(device_id, action):
|
||||
pass
|
||||
|
||||
|
||||
def lockFree(lock):
|
||||
global cursor
|
||||
def lockFree(lock, cursor=cursor):
|
||||
query = "SELECT IS_FREE_LOCK('{0}')".format(lock)
|
||||
cursor.execute(query)
|
||||
return cursor.fetchall()[0][0] == 1
|
||||
|
||||
|
||||
def getLock(lock):
|
||||
global cursor
|
||||
def getLock(lock, cursor=cursor):
|
||||
query = "SELECT GET_LOCK('{0}', 0)".format(lock)
|
||||
cursor.execute(query)
|
||||
return cursor.fetchall()[0][0] == 1
|
||||
|
||||
|
||||
def releaseLock(lock):
|
||||
global cursor
|
||||
thread_cursors = []
|
||||
for i in range(0, amount_of_workers):
|
||||
thread_cursors.append(namedtuple('Cursor{}'.format(i), ['in_use', 'cursor']))
|
||||
thread_cursors[i].in_use = False
|
||||
thread_cursors[i].cursor = connectDB
|
||||
|
||||
|
||||
def getThreadLock(lock):
|
||||
global thread_cursors
|
||||
for thread_cursor in thread_cursors:
|
||||
if not thread_cursor.in_use:
|
||||
thread_cursor.in_use = lock
|
||||
return getLock(lock, thread_cursor.cursor)
|
||||
return False
|
||||
|
||||
|
||||
def releaseLock(lock, cursor=cursor):
|
||||
query = "SELECT RELEASE_LOCK('{0}')".format(lock)
|
||||
cursor.execute(query)
|
||||
return cursor.fetchall()[0][0] == 1
|
||||
|
||||
|
||||
def releaseThreadLock(lock):
|
||||
global thread_cursors
|
||||
for thread_cursor in thread_cursors:
|
||||
if thread_cursor.in_use == lock:
|
||||
thread_cursor.in_use = False
|
||||
return releaseLock(lock, thread_cursor.cursor)
|
||||
return False
|
||||
|
||||
|
||||
def sleep_until(timestamp):
|
||||
now = datetime.now()
|
||||
if timestamp > now:
|
||||
|
||||
Reference in New Issue
Block a user