Skip to content

Commit

Permalink
find new master every time
Browse files Browse the repository at this point in the history
  • Loading branch information
Niccolum committed Dec 7, 2023
1 parent a1ab79d commit d387c98
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions redbeat/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from kombu.utils.objects import cached_property
from kombu.utils.url import maybe_sanitize_url
from redis.client import StrictRedis
from redis.sentinel import Sentinel, MasterNotFoundError
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential

from .decoder import RedBeatJSONDecoder, RedBeatJSONEncoder, to_timestamp
Expand Down Expand Up @@ -64,6 +65,7 @@ def __init__(self, retry_period, wrapped_connection):
retry=(
retry_if_exception_type(redis.exceptions.ConnectionError)
| retry_if_exception_type(redis.exceptions.TimeoutError)
| retry_if_exception_type(MasterNotFoundError)
),
reraise=True,
wait=wait_exponential(multiplier=1, max=self.RETRY_MAX_WAIT),
Expand Down Expand Up @@ -113,15 +115,15 @@ def ensure_conf(app):
def get_redis(app=None):
app = app_or_default(app)
conf = ensure_conf(app)
redis_options = conf.redbeat_redis_options

if not hasattr(app, 'redbeat_redis') or app.redbeat_redis is None:
redis_options = conf.redbeat_redis_options
retry_period = redis_options.get('retry_period')
if redis_options.get('cluster', False):
from redis.cluster import RedisCluster

connection = RedisCluster.from_url(conf.redis_url, **redis_options)
elif conf.redis_url.startswith('redis-sentinel') and 'sentinels' in redis_options:
from redis.sentinel import Sentinel
connection_kwargs = {}
if isinstance(conf.redis_use_ssl, dict):
connection_kwargs['ssl'] = True
Expand All @@ -135,9 +137,7 @@ def get_redis(app=None):
sentinel_kwargs=redis_options.get('sentinel_kwargs'),
**connection_kwargs
)
connection = sentinel.master_for(
redis_options.get('service_name', 'master'), db=redis_options.get('db', 0)
)
_set_redbeat_connect(app, "redbeat_sentinel", sentinel, retry_period)
elif conf.redis_url.startswith('rediss'):
ssl_options = {'ssl_cert_reqs': ssl.CERT_REQUIRED}
if isinstance(conf.redis_use_ssl, dict):
Expand All @@ -152,14 +152,24 @@ def get_redis(app=None):
else:
connection = StrictRedis.from_url(conf.redis_url, decode_responses=True)

if retry_period is None:
app.redbeat_redis = connection
else:
app.redbeat_redis = RetryingConnection(retry_period, connection)
_set_redbeat_connect(app, "redbeat_redis", connection, retry_period)

if hasattr(app, 'redbeat_sentinel') and isinstance(app.redbeat_sentinel, Sentinel):
connection = app.redbeat_sentinel.master_for(
redis_options.get('service_name', 'master'), db=redis_options.get('db', 0)
)
_set_redbeat_connect(app, "redbeat_redis", connection, retry_period)

return app.redbeat_redis


def _set_redbeat_connect(app, connect_name, connection, retry_period):
if retry_period is None:
setattr(app, connect_name, connection)
else:
setattr(app, connect_name, RetryingConnection(retry_period, connection))


ADD_ENTRY_ERROR = """\
Couldn't add entry %r to redis schedule: %r. Contents: %r
Expand Down

0 comments on commit d387c98

Please sign in to comment.