Skip to content

Commit 7d7feae

Browse files
committed
fix: ZeroMQ broker
1 parent 1f2e24e commit 7d7feae

File tree

1 file changed

+11
-2
lines changed

1 file changed

+11
-2
lines changed

taskiq/brokers/zmq_broker.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,16 @@ async def startup(self) -> None:
6161
self.socket.bind(self.pub_host)
6262
await super().startup()
6363

64+
async def shutdown(self) -> None:
65+
"""
66+
Shutdown for zmq broker.
67+
68+
This function closes actual connections to sockets
69+
"""
70+
if not self.is_worker_process:
71+
self.socket.unbind()
72+
return await super().shutdown()
73+
6474
async def kick(self, message: BrokerMessage) -> None:
6575
"""
6676
Kicking message.
@@ -77,8 +87,7 @@ async def kick(self, message: BrokerMessage) -> None:
7787
]
7888
for idx in range(math.ceil(len(message.message) / part_len))
7989
]
80-
with self.socket.connect(self.pub_host) as sock:
81-
await sock.send_multipart(parts)
90+
await self.socket.send_multipart(parts)
8291

8392
async def listen(self) -> AsyncGenerator[bytes, None]:
8493
"""

0 commit comments

Comments
 (0)