Skip to content

Commit

Permalink
Add an exception handler for the pubsub thread (#2132)
Browse files Browse the repository at this point in the history
* Add an exception handler for the pubsub thread

After a connection error to Redis the pubsub thread was stopped and the
worker could not receive commands anymore.

This commit adds an exception handler for the thread that adds a log
message and ignores `redis.exceptions.ConnectionError`. Any other
exception is re-raised.

redis-py's internal mechanism allows the pubsub thread to recover its
connection and reinstall the pubsub channel subscription to allow the
worker to receive commands again after connection errors.

It tries to behave the same as the main worker loop retry mechanism but
without the backoff wait factor.

Fixes #1836
Fixes #2070

* Add test for untested line, improve logging and comments

Add *args & **kwargs to tests.fixtures.raise_exc to pass tests with Python 3.7
  • Loading branch information
fcharlier authored Oct 17, 2024
1 parent f4283af commit 3e2e26e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 4 deletions.
25 changes: 23 additions & 2 deletions rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
except ImportError:
pass
from redis import Redis
from redis.client import Pipeline, PubSub
from redis.client import Pipeline, PubSub, PubSubWorkerThread

try:
from signal import SIGKILL
Expand Down Expand Up @@ -950,12 +950,33 @@ def run_maintenance_tasks(self):
self.clean_registries()
Group.clean_registries(connection=self.connection)

def _pubsub_exception_handler(self, exc: Exception, pubsub: "PubSub", pubsub_thread: "PubSubWorkerThread") -> None:
    """Decide whether the pubsub thread survives an exception.

    Connection errors are logged and followed by a fixed pause so the
    pubsub thread keeps running and retries indefinitely, the same way the
    main worker loop does (but without a backoff factor). redis-py's
    internal mechanism will restore the channel subscriptions once the
    connection is re-established.

    Any other exception is logged and re-raised.

    Args:
        exc: The exception raised inside the pubsub thread.
        pubsub: The PubSub object the thread is serving (unused here).
        pubsub_thread: The thread that raised (unused here).

    Raises:
        Exception: ``exc`` itself, when it is not a
            ``redis.exceptions.ConnectionError``.
    """
    # Single source of truth for both the log message and the sleep.
    retry_delay = 2  # seconds

    if isinstance(exc, redis.exceptions.ConnectionError):
        self.log.error(
            "Could not connect to Redis instance: %s Retrying in %d seconds...",
            exc,
            retry_delay,
        )
        time.sleep(retry_delay)
        return

    self.log.warning("Pubsub thread exiting on %s", exc)
    # Raise the received exception explicitly: a bare ``raise`` only works
    # while the caller is still inside its ``except`` block.
    raise exc

def subscribe(self):
    """Subscribe to this worker's channel.

    Opens a pubsub object on the worker connection, registers
    ``handle_payload`` for the worker's channel and starts a single daemon
    thread to serve it. ``_pubsub_exception_handler`` is installed on that
    thread as its exception handler.
    """
    self.log.info('Subscribing to channel %s', self.pubsub_channel_name)
    self.pubsub = self.connection.pubsub()
    self.pubsub.subscribe(**{self.pubsub_channel_name: self.handle_payload})
    # Start exactly one listener thread; a second run_in_thread() call would
    # leave an extra thread running with no reference kept to it.
    self.pubsub_thread = self.pubsub.run_in_thread(
        sleep_time=0.2, daemon=True, exception_handler=self._pubsub_exception_handler
    )

def get_heartbeat_ttl(self, job: 'Job') -> int:
"""Get's the TTL for the next heartbeat.
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def do_nothing():
pass


def raise_exc(*args, **kwargs):
    """Always raise ``Exception('raise_exc error')``.

    Accepts and ignores any positional or keyword arguments, so it can be
    called with whatever arguments the caller supplies.
    """
    raise Exception('raise_exc error')

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.10/Redis4/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.11/Redis4/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.10/Redis6/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.10/Redis5/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.10/Redis7/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.11/Redis5/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.11/Redis7/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.11/Redis6/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.12/Redis4/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.12/Redis5/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.12/Redis6/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.12/Redis7/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.8/Redis4/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.8/Redis6/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.8/Redis7/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.8/Redis5/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.9/Redis5/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.9/Redis4/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.9/Redis7/redis-py3.5.0

raise_exc error

Check failure on line 51 in tests/fixtures.py

View workflow job for this annotation

GitHub Actions / Python3.9/Redis6/redis-py3.5.0

raise_exc error


Expand Down
28 changes: 27 additions & 1 deletion tests/test_commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import time
from multiprocessing import Process
from unittest import mock

from redis import Redis

Expand All @@ -9,7 +10,7 @@
from rq.serializers import JSONSerializer
from rq.worker import WorkerStatus
from tests import RQTestCase
from tests.fixtures import _send_kill_horse_command, _send_shutdown_command, long_running_job
from tests.fixtures import _send_kill_horse_command, _send_shutdown_command, long_running_job, raise_exc_mock


def start_work(queue_name, worker_name, connection_kwargs):
Expand All @@ -35,6 +36,31 @@ def test_shutdown_command(self):
worker.work()
p.join(1)

def test_pubsub_thread_survives_connection_error(self):
    """Ensure that the pubsub thread is still alive after its Redis connection is killed"""
    connection = self.connection
    worker = Worker('foo', connection=connection)
    worker.subscribe()

    try:
        assert worker.pubsub_thread.is_alive()

        for client in connection.client_list():
            connection.client_kill(client["addr"])

        # Give the pubsub thread real time to notice the dropped connection.
        # join() returns at once if the thread died and otherwise waits out
        # the timeout, so the liveness check below is meaningful.
        worker.pubsub_thread.join(timeout=0.5)
        assert worker.pubsub_thread.is_alive()
    finally:
        # Do not leave the listener thread running after the test.
        worker.pubsub_thread.stop()

def test_pubsub_thread_exits_other_error(self):
    """Ensure that the pubsub thread exits on other than redis.exceptions.ConnectionError"""
    connection = self.connection
    worker = Worker('foo', connection=connection)

    with mock.patch("redis.client.PubSub.get_message", new_callable=raise_exc_mock):
        worker.subscribe()
        # Wait while the patch is still active: if the context exited before
        # the thread's first get_message() call, the real method would run
        # and the thread would never exit. The timeout keeps a regression
        # from hanging the whole test run.
        worker.pubsub_thread.join(timeout=5)

    assert not worker.pubsub_thread.is_alive()

def test_kill_horse_command(self):
"""Ensure that shutdown command works properly."""
connection = self.connection
Expand Down

0 comments on commit 3e2e26e

Please sign in to comment.