Skip to content

Commit

Permalink
Resend QoS query only on reconnect, not periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
Elena Shylko committed Nov 22, 2024
1 parent 60e1bf4 commit 4300931
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 30 deletions.
30 changes: 14 additions & 16 deletions gmqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import logging
import uuid
from copy import copy
from typing import Union, Sequence

from .mqtt.protocol import MQTTProtocol
Expand Down Expand Up @@ -147,13 +148,10 @@ def __init__(self, client_id, clean_session=True, optimistic_acknowledgement=Tru
self._will_message = will_message

# TODO: this constant may be moved to config
self._retry_deliver_timeout = kwargs.pop('retry_deliver_timeout', 5)
self._persistent_storage = kwargs.pop('persistent_storage', HeapPersistentStorage(self._retry_deliver_timeout))
self._persistent_storage = kwargs.pop('persistent_storage', HeapPersistentStorage())

self._topic_alias_maximum = kwargs.get('topic_alias_maximum', 0)

self._resend_task = asyncio.ensure_future(self._resend_qos_messages())

self._logger = logger or logging.getLogger(__name__)

def get_subscription_by_identifier(self, subscription_identifier):
Expand All @@ -178,28 +176,29 @@ async def _resend_qos_messages(self):

if await self._persistent_storage.is_empty:
self._logger.debug('[QoS query IS EMPTY]')
await asyncio.sleep(self._retry_deliver_timeout)
return
elif self._connection.is_closing():
self._logger.debug('[Some msg need to resend] Transport is closing, sleeping')
await asyncio.sleep(self._retry_deliver_timeout)
self._logger.debug('[Some msg need to resend] Transport is closing')
return
else:
self._logger.debug('[Some msg need to resend] processing message')
msg = await self._persistent_storage.pop_message()
msgs = copy(await self._persistent_storage.get_all())
self._logger.debug('[msgs need to resend] processing %s messages', len(msgs))

if msg:
(mid, package) = msg
await self._persistent_storage.clear()

for msg in msgs:
(_, mid, package) = msg

try:
self._connection.send_package(package)
except Exception as exc:
self._logger.error('[ERROR WHILE RESENDING] mid: %s', mid, exc_info=exc)

await self._persistent_storage.push_message(mid, package)
await asyncio.sleep(0.001)
else:
await asyncio.sleep(self._retry_deliver_timeout)

self._resend_task = asyncio.ensure_future(self._resend_qos_messages())
async def _clear_resend_qos_queue(self):
await self._persistent_storage.clear()


@property
def properties(self):
Expand Down Expand Up @@ -276,7 +275,6 @@ async def reconnect(self, delay=False):

async def disconnect(self, reason_code=0, **properties):
self._is_active = False
self._resend_task.cancel()
await self._disconnect(reason_code=reason_code, **properties)

async def _disconnect(self, reason_code=0, **properties):
Expand Down
10 changes: 7 additions & 3 deletions gmqtt/mqtt/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,11 @@ def _update_keepalive_if_needed(self):
def _handle_connack_packet(self, cmd, packet):
self._connected.set()

(flags, result) = struct.unpack("!BB", packet[:2])
(session_present, result) = struct.unpack("!BB", packet[:2])
if session_present:
asyncio.ensure_future(self._resend_qos_messages())
else:
asyncio.ensure_future(self._clear_resend_qos_queue())

if result != 0:
self._logger.warning('[CONNACK] %s', hex(result))
Expand Down Expand Up @@ -287,8 +291,8 @@ def _handle_connack_packet(self, cmd, packet):
# TODO: Implement checking for the flags and results
# see 3.2.2.3 Connect Return code of the http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.pdf

self._logger.debug('[CONNACK] flags: %s, result: %s', hex(flags), hex(result))
self.on_connect(self, flags, result, self.properties)
self._logger.debug('[CONNACK] session_present: %s, result: %s', hex(session_present), hex(result))
self.on_connect(self, session_present, result, self.properties)

def _handle_publish_packet(self, cmd, raw_packet):
header = cmd
Expand Down
27 changes: 16 additions & 11 deletions gmqtt/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ async def wait_empty(self) -> None:
# this method must implement an atomic transaction from async network exchange perspective.
raise NotImplementedError

async def clear(self):
raise NotImplementedError

async def get_all(self):
raise NotImplementedError


class HeapPersistentStorage(BasePersistentStorage):
def __init__(self, timeout):
def __init__(self):
self._queue = []
self._timeout = timeout
self._empty_waiters: Set[asyncio.Future] = set()

def _notify_waiters(self, waiters: Set[asyncio.Future], notify: Callable[[asyncio.Future], None]) -> None:
Expand All @@ -46,17 +51,10 @@ async def push_message(self, mid, raw_package):
heapq.heappush(self._queue, (tm, mid, raw_package))

async def pop_message(self):
current_time = asyncio.get_event_loop().time()

(tm, mid, raw_package) = heapq.heappop(self._queue)

if current_time - tm > self._timeout:
self._check_empty()
return mid, raw_package
else:
heapq.heappush(self._queue, (tm, mid, raw_package))

return None
self._check_empty()
return mid, raw_package

async def remove_message_by_mid(self, mid):
message = next(filter(lambda x: x[1] == mid, self._queue), None)
Expand All @@ -74,3 +72,10 @@ async def wait_empty(self) -> None:
waiter = asyncio.get_running_loop().create_future()
self._empty_waiters.add(waiter)
await waiter

async def clear(self):
self._queue = []
self._notify_waiters(self._empty_waiters, lambda waiter: waiter.set_result(None))

async def get_all(self):
return self._queue

0 comments on commit 4300931

Please sign in to comment.