Add connection exception handling to all redis calls (#14467)

This commit is contained in:
François
2022-11-02 01:04:33 +01:00
committed by GitHub
parent 5a69ac687d
commit 70d37adf66

View File

@ -467,13 +467,18 @@ class RedisLock(Lock):
key = self.__key(name)
non_existing = not (allow_owner_relock and self._redis.get(key) == owner)
return self._redis.set(key, owner, ex=int(expiration), nx=non_existing)
except redis.exceptions.ResponseError as e:
except (
redis.exceptions.ResponseError,
redis.exceptions.TimeoutError,
redis.exceptions.ConnectionError,
) as e:
logger.critical(
"Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s",
"Unable to obtain lock, local state: name: %s, owner: %s, expiration: %s, allow_owner_relock: %s. %s",
name,
owner,
expiration,
allow_owner_relock,
repr(e),
)
def unlock(self, name, owner):
@ -482,14 +487,34 @@ class RedisLock(Lock):
:param name: str the name of the lock
:param owner: str a unique name for the locking node
"""
key = self.__key(name)
if self._redis.get(key) == owner:
self._redis.delete(key)
return True
import redis # pylint: disable=import-error
try:
key = self.__key(name)
if self._redis.get(key) == owner:
self._redis.delete(key)
return True
except (redis.exceptions.TimeoutError, redis.exceptions.ConnectionError) as e:
logger.critical(
"Unable to release lock because of a redis timeout/connection error, local state: name: %s, owner: %s. %s",
name,
owner,
repr(e),
)
return False
def check_lock(self, name):
return self._redis.get(self.__key(name)) is not None
import redis # pylint: disable=import-error
try:
return self._redis.get(self.__key(name)) is not None
except (redis.exceptions.TimeoutError, redis.exceptions.ConnectionError) as e:
logger.critical(
"Unable check the status of lock %s because of a redis timeout/connection error: %s",
name,
repr(e),
)
return True
def print_locks(self):
keys = self._redis.keys(self.__key("*"))
@ -534,20 +559,47 @@ class RedisUniqueQueue(object):
self._redis.delete(self.key)
def qsize(self):
return self._redis.zcount(self.key, "-inf", "+inf")
import redis # pylint: disable=import-error
try:
return self._redis.zcount(self.key, "-inf", "+inf")
except (redis.exceptions.TimeoutError, redis.exceptions.ConnectionError) as e:
logger.critical(
"Failed to retrieve the size of the queue because of a redis timeout/connection error: %s",
repr(e),
)
return 0
def empty(self):
return self.qsize() == 0
def put(self, item):
self._redis.zadd(self.key, {item: time()}, nx=True)
import redis # pylint: disable=import-error
try:
self._redis.zadd(self.key, {item: time()}, nx=True)
except (redis.exceptions.TimeoutError, redis.exceptions.ConnectionError) as e:
logger.critical(
"Failed to put item %s in queue because of a redis timeout/connection error: %s",
str(item),
repr(e),
)
def get(self, block=True, timeout=None):
import redis # pylint: disable=import-error
try:
if block:
item = self._redis.bzpopmin(self.key, timeout=timeout)
else:
item = self._redis.zpopmin(self.key)
except (redis.exceptions.TimeoutError, redis.exceptions.ConnectionError) as e:
logger.critical(
"Failed to get item %s from queue because of a redis timeout/connection error: %s",
self.key,
repr(e),
)
item = None
# Unfortunately we cannot use _redis.exceptions.ResponseError Exception here
# Since it would trigger another exception in queuemanager
except Exception as e: