Add redis sentinel support to dispatcher service (#10598)

* Add redis sentinel support to dispatcher service

* Update docs for redis sentinel support

* Don't re-raise python exception in service
This commit is contained in:
bewing
2019-10-01 01:51:07 -05:00
committed by Tony Murray
parent 2825a7f7ec
commit 74724a4618
4 changed files with 37 additions and 7 deletions

View File

@@ -231,8 +231,16 @@ class ThreadingLock(Lock):
class RedisLock(Lock):
def __init__(self, namespace='lock', **redis_kwargs):
import redis
from redis.sentinel import Sentinel
redis_kwargs['decode_responses'] = True
self._redis = redis.Redis(**redis_kwargs)
if redis_kwargs.get('sentinel') and redis_kwargs.get('sentinel_service'):
sentinels = [tuple(l.split(':')) for l in redis_kwargs.pop('sentinel').split(',')]
sentinel_service = redis_kwargs.pop('sentinel_service')
kwargs = {k: v for k, v in redis_kwargs.items() if k in ["decode_responses", "password", "db"]}
self._redis = Sentinel(sentinels, **kwargs).master_for(sentinel_service)
else:
kwargs = {k: v for k, v in redis_kwargs.items() if "sentinel" not in k}
self._redis = redis.Redis(**kwargs)
self._redis.ping()
self._namespace = namespace
@@ -284,8 +292,16 @@ class RedisLock(Lock):
class RedisUniqueQueue(object):
def __init__(self, name, namespace='queue', **redis_kwargs):
import redis
from redis.sentinel import Sentinel
redis_kwargs['decode_responses'] = True
self._redis = redis.Redis(**redis_kwargs)
if redis_kwargs.get('sentinel') and redis_kwargs.get('sentinel_service'):
sentinels = [tuple(l.split(':')) for l in redis_kwargs.pop('sentinel').split(',')]
sentinel_service = redis_kwargs.pop('sentinel_service')
kwargs = {k: v for k, v in redis_kwargs.items() if k in ["decode_responses", "password", "db"]}
self._redis = Sentinel(sentinels, **kwargs).master_for(sentinel_service)
else:
kwargs = {k: v for k, v in redis_kwargs.items() if "sentinel" not in k}
self._redis = redis.Redis(**kwargs)
self._redis.ping()
self.key = "{}:{}".format(namespace, name)

View File

@@ -165,8 +165,10 @@ class QueueManager:
port=self.config.redis_port,
db=self.config.redis_db,
password=self.config.redis_pass,
unix_socket_path=self.config.redis_socket
)
unix_socket_path=self.config.redis_socket,
sentinel=self.config.redis_sentinel,
sentinel_service=self.config.redis_sentinel_service)
except ImportError:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")

View File

@@ -71,6 +71,8 @@ class ServiceConfig:
redis_db = 0
redis_pass = None
redis_socket = None
redis_sentinel = None
redis_sentinel_service = None
db_host = 'localhost'
db_port = 0
@@ -122,6 +124,10 @@ class ServiceConfig:
self.redis_pass = os.getenv('REDIS_PASSWORD', config.get('redis_pass', ServiceConfig.redis_pass))
self.redis_port = int(os.getenv('REDIS_PORT', config.get('redis_port', ServiceConfig.redis_port)))
self.redis_socket = os.getenv('REDIS_SOCKET', config.get('redis_socket', ServiceConfig.redis_socket))
self.redis_sentinel = os.getenv('REDIS_SENTINEL', config.get('redis_sentinel', ServiceConfig.redis_sentinel))
self.redis_sentinel_service = os.getenv('REDIS_SENTINEL_SERVICE',
config.get('redis_sentinel_service',
ServiceConfig.redis_sentinel_service))
self.db_host = os.getenv('DB_HOST', config.get('db_host', ServiceConfig.db_host))
self.db_name = os.getenv('DB_DATABASE', config.get('db_name', ServiceConfig.db_name))
@@ -363,7 +369,9 @@ class Service:
port=self.config.redis_port,
db=self.config.redis_db,
password=self.config.redis_pass,
unix_socket_path=self.config.redis_socket)
unix_socket_path=self.config.redis_socket,
sentinel=self.config.redis_sentinel,
sentinel_service=self.config.redis_sentinel_service)
except ImportError:
if self.config.distributed:
critical("ERROR: Redis connection required for distributed polling")