Here is my minimal setup:

from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

redis_async_result = RedisAsyncResultBackend(
    redis_url="redis://localhost:6379",
)

# Or you can use PubSubBroker if you need broadcasting
broker = ListQueueBroker(
    url="redis://localhost:6379",
).with_result_backend(
    result_backend=redis_async_result,
)
Starting this worker and then killing it with ctrl+c:
taskiq worker config.taskiq_scheduler:broker
[2024-11-08 18:13:02,105][taskiq.worker][INFO ][MainProcess] Pid of a main process: 22684
[2024-11-08 18:13:02,105][taskiq.worker][INFO ][MainProcess] Starting 2 worker processes.
[2024-11-08 18:13:02,109][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 22686
[2024-11-08 18:13:02,110][taskiq.process-manager][INFO ][MainProcess] Started process worker-1 with pid 22687
^CWorker process interrupted.
Shutting down the broker.
Worker process interrupted.
Shutting down the broker.
[2024-11-08 18:13:03,078][taskiq.process-manager][WARNING][MainProcess] Workers are scheduled for shutdown.
Task exception was never retrieved
future: <Task finished name='Task-1' coro=<Receiver.listen() done, defined at /Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py:321> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionError('Connection closed by server.')])>
Traceback (most recent call last):
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 148, in start_listen
loop.run_until_complete(receiver.listen())
File "uvloop/loop.pyx", line 1512, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1505, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1379, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 557, in uvloop.loop.Loop._run
File "uvloop/handles/poll.pyx", line 216, in uvloop.loop.__on_uvpoll_event
File "uvloop/cbhandles.pyx", line 83, in uvloop.loop.Handle._run
File "uvloop/cbhandles.pyx", line 66, in uvloop.loop.Handle._run
File "uvloop/loop.pyx", line 399, in uvloop.loop.Loop._read_from_self
File "uvloop/loop.pyx", line 404, in uvloop.loop.Loop._invoke_signals
File "uvloop/loop.pyx", line 379, in uvloop.loop.Loop._ceval_process_signals
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 107, in interrupt_handler
raise KeyboardInterrupt
KeyboardInterrupt
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 333, in listen
| async with anyio.create_task_group() as gr:
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 763, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 361, in prefetcher
| message = await iterator.__anext__()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq_redis/redis_broker.py", line 129, in listen
| yield (await redis_conn.brpop(self.queue_name))[
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 616, in execute_command
| return await conn.retry.call_with_retry(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/retry.py", line 62, in call_with_retry
| await fail(error)
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 603, in _disconnect_raise
| raise error
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
| return await do()
| ^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 590, in _send_command_parse_response
| return await self.parse_response(conn, command_name, **options)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 637, in parse_response
| response = await connection.read_response()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/connection.py", line 543, in read_response
| response = await self._parser.read_response(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 82, in read_response
| response = await self._read_response(disable_decoding=disable_decoding)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 90, in _read_response
| raw = await self._readline()
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/base.py", line 221, in _readline
| raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
| redis.exceptions.ConnectionError: Connection closed by server.
+------------------------------------
Task exception was never retrieved
future: <Task finished name='Task-1' coro=<Receiver.listen() done, defined at /Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py:321> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionError('Connection closed by server.')])>
Traceback (most recent call last):
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 148, in start_listen
loop.run_until_complete(receiver.listen())
File "uvloop/loop.pyx", line 1512, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1505, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1379, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 557, in uvloop.loop.Loop._run
File "uvloop/handles/poll.pyx", line 216, in uvloop.loop.__on_uvpoll_event
File "uvloop/cbhandles.pyx", line 83, in uvloop.loop.Handle._run
File "uvloop/cbhandles.pyx", line 66, in uvloop.loop.Handle._run
File "uvloop/loop.pyx", line 399, in uvloop.loop.Loop._read_from_self
File "uvloop/loop.pyx", line 404, in uvloop.loop.Loop._invoke_signals
File "uvloop/loop.pyx", line 379, in uvloop.loop.Loop._ceval_process_signals
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 107, in interrupt_handler
raise KeyboardInterrupt
KeyboardInterrupt
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 333, in listen
| async with anyio.create_task_group() as gr:
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 763, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 361, in prefetcher
| message = await iterator.__anext__()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq_redis/redis_broker.py", line 129, in listen
| yield (await redis_conn.brpop(self.queue_name))[
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 616, in execute_command
| return await conn.retry.call_with_retry(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/retry.py", line 62, in call_with_retry
| await fail(error)
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 603, in _disconnect_raise
| raise error
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
| return await do()
| ^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 590, in _send_command_parse_response
| return await self.parse_response(conn, command_name, **options)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/client.py", line 637, in parse_response
| response = await connection.read_response()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/asyncio/connection.py", line 543, in read_response
| response = await self._parser.read_response(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 82, in read_response
| response = await self._read_response(disable_decoding=disable_decoding)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/resp2.py", line 90, in _read_response
| raw = await self._readline()
| ^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/redis/_parsers/base.py", line 221, in _readline
| raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
| redis.exceptions.ConnectionError: Connection closed by server.
+------------------------------------
Shouldn't it be a silent exit? Exactly the same thing happens with the NATS backend, but not with RabbitMQ.
Logs for the taskiq-nats case are below:
[2024-11-08 15:11:07,684][taskiq.worker][INFO ][MainProcess] Pid of a main process: 9387
[2024-11-08 15:11:07,684][taskiq.worker][INFO ][MainProcess] Starting 2 worker processes.
[2024-11-08 15:11:07,688][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 9389
[2024-11-08 15:11:07,690][taskiq.process-manager][INFO ][MainProcess] Started process worker-1 with pid 9390
^C[2024-11-08 15:13:45,281][taskiq.process-manager][DEBUG ][MainProcess] Got signal 2.
[2024-11-08 15:13:45,283][taskiq.process-manager][WARNING][MainProcess] Workers are scheduled for shutdown.
Worker process interrupted.
Worker process interrupted.
Shutting down the broker.
Shutting down the broker.
Task exception was never retrieved
future: <Task finished name='Task-1' coro=<Receiver.listen() done, defined at /Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py:321> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError()])>
Traceback (most recent call last):
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 148, in start_listen
loop.run_until_complete(receiver.listen())
File "uvloop/loop.pyx", line 1512, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1505, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1379, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 557, in uvloop.loop.Loop._run
File "uvloop/handles/poll.pyx", line 216, in uvloop.loop.__on_uvpoll_event
File "uvloop/cbhandles.pyx", line 83, in uvloop.loop.Handle._run
File "uvloop/cbhandles.pyx", line 66, in uvloop.loop.Handle._runTask exception was never retrieved
future: <Task finished name='Task-1' coro=<Receiver.listen() done, defined at /Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py:321> exception=ExceptionGroup('unhandled errors in a TaskGroup', [ConnectionClosedError()])>
Traceback (most recent call last):
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 148, in start_listen
loop.run_until_complete(receiver.listen())
File "uvloop/loop.pyx", line 1512, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1505, in uvloop.loop.Loop.run_until_complete
File "uvloop/loop.pyx", line 1379, in uvloop.loop.Loop.run_forever
File "uvloop/loop.pyx", line 557, in uvloop.loop.Loop._run
File "uvloop/handles/poll.pyx", line 216, in uvloop.loop.__on_uvpoll_event
File "uvloop/cbhandles.pyx", line 83, in uvloop.loop.Handle._run
File "uvloop/cbhandles.pyx", line 66, in uvloop.loop.Handle._run
File "uvloop/loop.pyx", line 399, in uvloop.loop.Loop._read_from_self
File "uvloop/loop.pyx", line 404, in uvloop.loop.Loop._invoke_signals
File "uvloop/loop.pyx", line 379, in uvloop.loop.Loop._ceval_process_signals
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 107, in interrupt_handler
raise KeyboardInterrupt
KeyboardInterrupt
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 333, in listen
| async with anyio.create_task_group() as gr:
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 763, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/aio/subscription.py", line 189, in next_msg
| msg = await future
| ^^^^^^^^^^^^
| File "/Users/Silver/.local/share/mise/installs/python/3.12.5/lib/python3.12/asyncio/tasks.py", line 520, in wait_for
| return await fut
| ^^^^^^^^^
| File "/Users/Silver/.local/share/mise/installs/python/3.12.5/lib/python3.12/asyncio/queues.py", line 158, in get
| await getter
| asyncio.exceptions.CancelledError
|
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 361, in prefetcher
| message = await iterator.__anext__()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq_nats/broker.py", line 232, in listen
| nats_messages: typing.List[NatsMessage] = await self.consumer.fetch(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/js/client.py", line 1073, in fetch
| msg = await self._fetch_one(expires, timeout, heartbeat)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/js/client.py", line 1125, in _fetch_one
| msg = await self._sub.next_msg(timeout=deadline)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/aio/subscription.py", line 196, in next_msg
| raise errors.ConnectionClosedError
| nats.errors.ConnectionClosedError: nats: connection closed
+------------------
File "uvloop/loop.pyx", line 399, in uvloop.loop.Loop._read_from_self
File "uvloop/loop.pyx", line 404, in uvloop.loop.Loop._invoke_signals
File "uvloop/loop.pyx", line 379, in uvloop.loop.Loop._ceval_process_signals
File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/cli/worker/run.py", line 107, in interrupt_handler
raise KeyboardInterrupt
KeyboardInterrupt
During handling of the above exception, another exception occurred:
+ Exception Group Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 333, in listen
| async with anyio.create_task_group() as gr:
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 763, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/aio/subscription.py", line 189, in next_msg
| msg = await future
| ^^^^^^^^^^^^
| File "/Users/Silver/.local/share/mise/installs/python/3.12.5/lib/python3.12/asyncio/tasks.py", line 520, in wait_for
| return await fut
| ^^^^^^^^^
| File "/Users/Silver/.local/share/mise/installs/python/3.12.5/lib/python3.12/asyncio/queues.py", line 158, in get
| await getter
| asyncio.exceptions.CancelledError
|
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq/receiver/receiver.py", line 361, in prefetcher
| message = await iterator.__anext__()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/taskiq_nats/broker.py", line 232, in listen
| nats_messages: typing.List[NatsMessage] = await self.consumer.fetch(
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/js/client.py", line 1073, in fetch
| msg = await self._fetch_one(expires, timeout, heartbeat)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/js/client.py", line 1125, in _fetch_one
| msg = await self._sub.next_msg(timeout=deadline)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
| File "/Users/Silver/Projects/GitHub/mybaze/.venv/lib/python3.12/site-packages/nats/aio/subscription.py", line 196, in next_msg
| raise errors.ConnectionClosedError
| nats.errors.ConnectionClosedError: nats: connection closed
+------------------------------------
------------------
[2024-11-08 15:13:45,439][root][DEBUG ][MainProcess] Got event: ShutdownAction()
[2024-11-08 15:13:45,439][taskiq.process-manager][DEBUG ][MainProcess] Process manager closed.
Interestingly, in the taskiq-nats case there is no issue when the result backend is not used:
taskiq worker config.taskiq_scheduler:broker
[2024-11-08 18:32:59,106][taskiq.worker][INFO ][MainProcess] Pid of a main process: 25309
[2024-11-08 18:32:59,106][taskiq.worker][INFO ][MainProcess] Starting 2 worker processes.
[2024-11-08 18:32:59,110][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 25311
[2024-11-08 18:32:59,111][taskiq.process-manager][INFO ][MainProcess] Started process worker-1 with pid 25312
^CWorker process interrupted.
Worker process interrupted.
Shutting down the broker.
Shutting down the broker.
[2024-11-08 18:32:59,753][taskiq.process-manager][WARNING][MainProcess] Workers are scheduled for shutdown.
However, with taskiq-redis it makes no difference whether a result backend is configured or not.
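To illustrate what I mean by a silent exit, here is a minimal sketch of the pattern in the tracebacks above: a listener blocked on a read fails with a connection error once the broker is shut down. It is not taskiq's actual code and uses only the standard library. `FakeConnection`, `listen` and `shutting_down` are hypothetical names made up for this example. The idea is to treat a connection error that arrives after shutdown was requested as a normal stop, while still raising it in any other situation.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a broker connection (not a taskiq or redis API)."""

    def __init__(self) -> None:
        self._closed = asyncio.Event()

    async def read(self) -> bytes:
        # Blocks like a pending read would, then fails once the connection is closed.
        await self._closed.wait()
        raise ConnectionError("Connection closed by server.")

    def close(self) -> None:
        self._closed.set()


async def listen(conn: FakeConnection, shutting_down: asyncio.Event) -> str:
    """Read until the connection fails; a failure during shutdown is a normal stop."""
    try:
        while True:
            await conn.read()
    except ConnectionError:
        if shutting_down.is_set():
            return "stopped quietly"
        raise  # A dropped connection outside of shutdown is still a real error.


async def main() -> str:
    conn = FakeConnection()
    shutting_down = asyncio.Event()
    listener = asyncio.create_task(listen(conn, shutting_down))
    await asyncio.sleep(0)  # Let the listener start waiting on the connection.
    shutting_down.set()     # What an interrupt handler would do first.
    conn.close()            # Broker shutdown then closes the connection.
    return await listener


result = asyncio.run(main())
print(result)
```

With the flag set before the connection is closed, the listener returns instead of leaving an unretrieved task exception behind.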