Parameter queue_maxsize of Client.messages() has a global side-effect #250
Hi there, this is a very well-thought-out issue, thanks for that 👍

One solution to this problem is to implement some sort of "distributor" that receives all messages and sorts them into queues for the different consumers. With that, we can rewrite your example such that the slow consumer no longer misses messages:

```python
import asyncio

import aiomqtt


async def fast_producer(client: aiomqtt.Client):
    while True:
        await asyncio.sleep(0.1)
        await client.publish("fast", "fast")


async def fast_consumer():
    while True:
        message = await fast_queue.get()
        print(f"Fast consumer received: {message.payload}")


async def slow_producer(client: aiomqtt.Client):
    while True:
        await asyncio.sleep(10)
        await client.publish("slow", "slow")


async def slow_consumer():
    while True:
        message = await slow_queue.get()
        print(f"Slow consumer received: {message.payload}")
        await asyncio.sleep(5)  # slow call


fast_queue = asyncio.Queue(maxsize=10)
slow_queue = asyncio.Queue(maxsize=1)


async def distributor(client: aiomqtt.Client):
    async with client.messages() as messages:
        await client.subscribe("fast")
        await client.subscribe("slow")
        async for message in messages:
            try:
                if message.topic.matches("fast"):
                    fast_queue.put_nowait(message)
                elif message.topic.matches("slow"):
                    slow_queue.put_nowait(message)
            except asyncio.QueueFull:
                print("One of the message queues is full. Discarding message.")


async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        tasks = [
            asyncio.create_task(fast_producer(client)),
            asyncio.create_task(fast_consumer()),
            asyncio.create_task(slow_producer(client)),
            asyncio.create_task(slow_consumer()),
            asyncio.create_task(distributor(client)),
        ]
        await asyncio.gather(*tasks)


asyncio.run(main())
```

Does that work for your use case? People sometimes have very complex requirements for how messages are handled. To make the default case (single queue) easier, and to make the queueing more flexible at the same time, we're thinking about switching to a single client-wide queue for `Client.messages()`.
My real use case is more complex and could not be solved by a single global queue. The problem with the distributor (as well as the client-wide queue) is that it only works for static topics that are known ahead of time. One use case that isn't covered is request-response: with MQTTv5 and its response topic feature, you can implement the request-response pattern, where each request dynamically creates its own response topic. At least with a client-wide queue it is more explicit that the queue really is global. I do, however, lament the loss of functionality that will need to be reimplemented by users of the library.
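To illustrate why static routing falls short there, here is a minimal sketch of the request side of that pattern. It assumes aiomqtt forwards paho-mqtt `Properties` through `Client.publish()`; the topic names and `send_request` helper are made up for illustration:

```python
import uuid

import aiomqtt
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties


async def send_request(client: aiomqtt.Client) -> str:
    # The response topic is generated per request at runtime, so no
    # statically configured distributor (or queue) can know it up front.
    response_topic = f"responses/{uuid.uuid4()}"
    await client.subscribe(response_topic)
    properties = Properties(PacketTypes.PUBLISH)
    properties.ResponseTopic = response_topic
    await client.publish("requests", "ping", properties=properties)
    return response_topic
```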
Hi alex007sirois, thanks for opening this issue. :) Let me have a look. Also thanks to you, @empicano, for quickly providing a detailed answer and workaround. 👍

IMHO, the overall problem here is the lack of backpressure in aiomqtt. That is, a way to communicate "you're sending too many messages; I can't keep up; please slow down". Right now, our current "solution" is to provide `queue_maxsize`. Backpressure (or the lack thereof) is the reason we are in this mess. One queue or multiple queues: it's the same underlying problem.
I completely agree. You trade one problem for another, I'm afraid: you fix the memory leak, but you may now experience message loss (when the queue is full). Maybe this is acceptable in your use case, if you can tolerate potential message loss or guarantee through your application's design that it doesn't happen. We provide the `queue_maxsize` parameter for exactly this trade-off. In any case, there are no easy fixes here. We have to solve backpressure somehow to get to the root of this issue. :) That's not something that I personally have the bandwidth to do at the moment.
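As an aside, the trade-off shows up in a single line of the distributor above: `put_nowait` drops messages when a queue is full, whereas an awaited `put` would block instead. A sketch of the latter, reusing the hypothetical queues from that example; this is not a fix, just the other side of the trade-off:

```python
import asyncio

fast_queue: asyncio.Queue = asyncio.Queue(maxsize=10)
slow_queue: asyncio.Queue = asyncio.Queue(maxsize=1)


async def route(message) -> None:
    # Backpressure variant: block until the consumer has room. No message
    # is lost here, but the whole distributor stalls, delivery to *all*
    # consumers stops, and the broker keeps sending regardless -- the
    # buffering just moves elsewhere.
    if message.topic.matches("slow"):
        await slow_queue.put(message)
    elif message.topic.matches("fast"):
        await fast_queue.put(message)
```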
Thanks for the interesting discussion on this! 😎 I'll close this, as we switched to a single client-wide queue with 2.0, which incidentally also fixes this issue. Please reopen if there's anything left unsolved or unclear!
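For reference, the single client-wide queue in 2.0 is consumed roughly like this (a sketch based on the 2.x documentation, where `client.messages` is an attribute that is iterated directly; details may differ between versions):

```python
import asyncio

import aiomqtt


async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        await client.subscribe("temperature/#")
        # All subscriptions feed one client-wide queue.
        async for message in client.messages:
            print(message.topic, message.payload)


asyncio.run(main())
```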
Indeed, with 2.0 multiple consumer tasks can read from the single client-wide queue:

```python
import asyncio

import aiomqtt


async def handle(client):
    async for message in client._messages():
        print(message.topic, message.payload)


async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        await client.subscribe("temperature/#")
        async with asyncio.TaskGroup() as t:
            t.create_task(handle(client))
            t.create_task(handle(client))


asyncio.run(main())
```

I'd like to change `_messages()` into a part of the public API at some point. Backpressure is still not solved, I'm afraid, but I suggest we open another issue for that.
The issue arose when migrating from `filtered_messages()` to `messages()`. We had issues with unbounded queues that were not being consumed and caused memory leaks. We were able to locate the leak by setting `queue_maxsize`.

Unbounded queues are dangerous, so we added a limit to every queue. However, since messages are put into the queue before the topic is checked, you cannot reliably set `queue_maxsize`: it depends on the implicit global state of the sum of all subscriptions. If one subscribed topic receives a lot of messages, it fills the queue and relevant messages from slower topics are discarded. Every time we subscribe to a new topic, or a topic starts being published to more often, we need to scan the whole codebase to adjust the queue sizes. So we are stuck between two sub-optimal solutions.

An optional topic filter passed to `Client.messages()` would make `queue_maxsize` more usable. In the meantime, I will add a callback to `_on_message_callbacks`.
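For illustration, here is roughly what that workaround could look like. Note that `_on_message_callbacks` is a private attribute, and the callback signature (being called with each incoming message) is an assumption:

```python
import asyncio

import aiomqtt

# Hypothetical per-topic queue; only "slow" messages compete for its slots.
slow_queue: asyncio.Queue = asyncio.Queue(maxsize=1)


def route_slow(message) -> None:
    # Filter by topic *before* queueing, so this queue's maxsize no longer
    # depends on traffic from unrelated subscriptions.
    if str(message.topic) == "slow":
        try:
            slow_queue.put_nowait(message)
        except asyncio.QueueFull:
            pass


async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        client._on_message_callbacks.append(route_slow)  # private API
        await client.subscribe("slow")
        while True:
            message = await slow_queue.get()
            print(f"Slow consumer received: {message.payload}")


asyncio.run(main())
```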
A little example: one fast topic and one slow topic. The slow consumer may miss some messages...
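The example code itself didn't survive in this copy of the thread; the following is a reconstruction of the scenario as described, using the 1.x `queue_maxsize` parameter from the issue title (the original snippet may have differed):

```python
import asyncio

import aiomqtt


async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        # One bounded queue receives messages from *all* subscriptions.
        async with client.messages(queue_maxsize=1) as messages:
            await client.subscribe("fast")  # published many times per second
            await client.subscribe("slow")  # published rarely
            async for message in messages:
                if message.topic.matches("slow"):
                    # Slow handling blocks the loop; meanwhile "fast"
                    # messages fill the queue and further messages --
                    # including "slow" ones -- are discarded.
                    await asyncio.sleep(5)
                print(message.topic, message.payload)


asyncio.run(main())
```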