Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

find new master every time #267

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions redbeat/schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from kombu.utils.objects import cached_property
from kombu.utils.url import maybe_sanitize_url
from redis.client import StrictRedis
from redis.sentinel import MasterNotFoundError, Sentinel
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential

from .decoder import RedBeatJSONDecoder, RedBeatJSONEncoder, to_timestamp
Expand Down Expand Up @@ -50,6 +51,9 @@
return 1
"""

REDBEAT_REDIS_KEY = "redbeat_redis"
REDBEAT_SENTINEL_KEY = "redbeat_sentinel"


class RetryingConnection:
"""A proxy for the Redis connection that delegates all the calls to
Expand All @@ -64,6 +68,7 @@ def __init__(self, retry_period, wrapped_connection):
retry=(
retry_if_exception_type(redis.exceptions.ConnectionError)
| retry_if_exception_type(redis.exceptions.TimeoutError)
| retry_if_exception_type(MasterNotFoundError)
),
reraise=True,
wait=wait_exponential(multiplier=1, max=self.RETRY_MAX_WAIT),
Expand Down Expand Up @@ -113,9 +118,10 @@ def ensure_conf(app):
def get_redis(app=None):
app = app_or_default(app)
conf = ensure_conf(app)
if not hasattr(app, 'redbeat_redis') or app.redbeat_redis is None:
redis_options = conf.redbeat_redis_options
retry_period = redis_options.get('retry_period')
redis_options = conf.redbeat_redis_options
retry_period = redis_options.get('retry_period')

if not hasattr(app, REDBEAT_REDIS_KEY) or getattr(app, REDBEAT_REDIS_KEY) is None:
if redis_options.get('cluster', False):
from redis.cluster import RedisCluster

Expand All @@ -136,9 +142,8 @@ def get_redis(app=None):
sentinel_kwargs=redis_options.get('sentinel_kwargs'),
**connection_kwargs,
)
connection = sentinel.master_for(
redis_options.get('service_name', 'master'), db=redis_options.get('db', 0)
)
_set_redbeat_connect(app, REDBEAT_SENTINEL_KEY, sentinel, retry_period)
connection = None
elif conf.redis_url.startswith('rediss'):
ssl_options = {'ssl_cert_reqs': ssl.CERT_REQUIRED}
if isinstance(conf.redis_use_ssl, dict):
Expand All @@ -153,12 +158,24 @@ def get_redis(app=None):
else:
connection = StrictRedis.from_url(conf.redis_url, decode_responses=True)

if retry_period is None:
app.redbeat_redis = connection
else:
app.redbeat_redis = RetryingConnection(retry_period, connection)
if connection:
_set_redbeat_connect(app, REDBEAT_REDIS_KEY, connection, retry_period)

if hasattr(app, REDBEAT_SENTINEL_KEY) and isinstance(getattr(app, REDBEAT_SENTINEL_KEY), Sentinel):
sentinel = getattr(app, REDBEAT_SENTINEL_KEY)
connection = sentinel.master_for(
redis_options.get('service_name', 'master'), db=redis_options.get('db', 0)
)
_set_redbeat_connect(app, REDBEAT_REDIS_KEY, connection, retry_period)

return getattr(app, REDBEAT_REDIS_KEY)


return app.redbeat_redis
def _set_redbeat_connect(app, connect_name, connection, retry_period):
if retry_period is None:
setattr(app, connect_name, connection)
else:
setattr(app, connect_name, RetryingConnection(retry_period, connection))


ADD_ENTRY_ERROR = """\
Expand Down