From 79f77a8f8b558048a61a5463cc61bc8f84e06171 Mon Sep 17 00:00:00 2001 From: raidzin Date: Mon, 13 Oct 2025 11:44:46 +0300 Subject: [PATCH] fix: ZeroMQ broker --- taskiq/brokers/zmq_broker.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/taskiq/brokers/zmq_broker.py b/taskiq/brokers/zmq_broker.py index 84af799b..36cfbfa6 100644 --- a/taskiq/brokers/zmq_broker.py +++ b/taskiq/brokers/zmq_broker.py @@ -61,6 +61,16 @@ async def startup(self) -> None: self.socket.bind(self.pub_host) await super().startup() + async def shutdown(self) -> None: + """ + Shutdown for zmq broker. + + This function closes actual connections to sockets + """ + if not self.is_worker_process: + self.socket.unbind(self.pub_host) + return await super().shutdown() + async def kick(self, message: BrokerMessage) -> None: """ Kicking message. @@ -77,8 +87,7 @@ async def kick(self, message: BrokerMessage) -> None: ] for idx in range(math.ceil(len(message.message) / part_len)) ] - with self.socket.connect(self.pub_host) as sock: - await sock.send_multipart(parts) + await self.socket.send_multipart(parts) async def listen(self) -> AsyncGenerator[bytes, None]: """