Skip to content

Commit

Permalink
find new master every time
Browse files Browse the repository at this point in the history
  • Loading branch information
Niccolum committed Dec 7, 2023
1 parent a1ab79d commit 85b5bc9
Showing 1 changed file with 25 additions and 11 deletions.
36 changes: 25 additions & 11 deletions redbeat/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from kombu.utils.objects import cached_property
from kombu.utils.url import maybe_sanitize_url
from redis.client import StrictRedis
from redis.sentinel import Sentinel, MasterNotFoundError
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential

from .decoder import RedBeatJSONDecoder, RedBeatJSONEncoder, to_timestamp
Expand Down Expand Up @@ -50,6 +51,9 @@
return 1
"""

REDBEAT_REDIS_KEY = "redbeat_redis"
REDBEAT_SENTINEL_KEY = "redbeat_sentinel"


class RetryingConnection:
"""A proxy for the Redis connection that delegates all the calls to
Expand All @@ -64,6 +68,7 @@ def __init__(self, retry_period, wrapped_connection):
retry=(
retry_if_exception_type(redis.exceptions.ConnectionError)
| retry_if_exception_type(redis.exceptions.TimeoutError)
| retry_if_exception_type(MasterNotFoundError)
),
reraise=True,
wait=wait_exponential(multiplier=1, max=self.RETRY_MAX_WAIT),
Expand Down Expand Up @@ -113,15 +118,15 @@ def ensure_conf(app):
def get_redis(app=None):
app = app_or_default(app)
conf = ensure_conf(app)
if not hasattr(app, 'redbeat_redis') or app.redbeat_redis is None:
redis_options = conf.redbeat_redis_options
redis_options = conf.redbeat_redis_options

if not hasattr(app, REDBEAT_REDIS_KEY) or getattr(app, REDBEAT_REDIS_KEY) is None:
retry_period = redis_options.get('retry_period')
if redis_options.get('cluster', False):
from redis.cluster import RedisCluster

connection = RedisCluster.from_url(conf.redis_url, **redis_options)
elif conf.redis_url.startswith('redis-sentinel') and 'sentinels' in redis_options:
from redis.sentinel import Sentinel
connection_kwargs = {}
if isinstance(conf.redis_use_ssl, dict):
connection_kwargs['ssl'] = True
Expand All @@ -135,9 +140,7 @@ def get_redis(app=None):
sentinel_kwargs=redis_options.get('sentinel_kwargs'),
**connection_kwargs
)
connection = sentinel.master_for(
redis_options.get('service_name', 'master'), db=redis_options.get('db', 0)
)
_set_redbeat_connect(app, REDBEAT_SENTINEL_KEY, sentinel, retry_period)
elif conf.redis_url.startswith('rediss'):
ssl_options = {'ssl_cert_reqs': ssl.CERT_REQUIRED}
if isinstance(conf.redis_use_ssl, dict):
Expand All @@ -152,12 +155,23 @@ def get_redis(app=None):
else:
connection = StrictRedis.from_url(conf.redis_url, decode_responses=True)

if retry_period is None:
app.redbeat_redis = connection
else:
app.redbeat_redis = RetryingConnection(retry_period, connection)
_set_redbeat_connect(app, REDBEAT_REDIS_KEY, connection, retry_period)

if hasattr(app, REDBEAT_SENTINEL_KEY) and isinstance(getattr(app, REDBEAT_SENTINEL_KEY), Sentinel):
sentinel = getattr(app, REDBEAT_SENTINEL_KEY)
connection = sentinel.master_for(
redis_options.get('service_name', 'master'), db=redis_options.get('db', 0)
)
_set_redbeat_connect(app, REDBEAT_REDIS_KEY, connection, retry_period)

return getattr(app, REDBEAT_REDIS_KEY)


return app.redbeat_redis
def _set_redbeat_connect(app, connect_name, connection, retry_period):
if retry_period is None:
setattr(app, connect_name, connection)
else:
setattr(app, connect_name, RetryingConnection(retry_period, connection))


ADD_ENTRY_ERROR = """\
Expand Down

0 comments on commit 85b5bc9

Please sign in to comment.