PubSub: Add an option to streaming pull to invoke callbacks for batches of messages #8022
Conversation
In certain cases, automatically leasing Message instances upon creation might not be desired, thus an optional parameter is added to the Message initializer that allows skipping it. The default behavior is unchanged: new Message instances *are* still automatically leased upon creation.
Leasing messages through a request queue in the dispatcher causes a race condition with the ConsumeBidirectionalStream thread. A request to pause the background consumer can arrive when the Bidi consumer is just about to fetch the next batch of messages, and thus the latter gets paused only *after* fetching those messages. This commit synchronously leases received messages in the streaming pull manager callback. If that hits the lease management load limit, the background consumer is paused synchronously and will correctly pause *before* pulling another batch of messages.
If the Pub/Sub backend sends so many messages in a single response that adding all of them would overload the leaser, the StreamingPullManager now puts the excess messages into an internal holding buffer. The messages are released from the buffer when the leaser again has enough capacity (as defined by the FlowControl settings), and the message received callback is invoked then as well.
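The holding-buffer idea can be sketched roughly as follows. This is purely illustrative, not the actual library internals: the class, the `0.1` per-message load increment, and the method names are all made up for the sketch.

```python
import collections
import threading


class HoldingBufferSketch:
    """Illustrative sketch: park messages that would overload the leaser."""

    def __init__(self, max_load=1.0):
        self._max_load = max_load
        self._messages_on_hold = collections.deque()
        self._lock = threading.Lock()
        self._current_load = 0.0  # fraction of FlowControl capacity in use

    def on_response(self, received_messages, dispatch):
        # Called from the background consumer thread for each pull response.
        with self._lock:
            for msg in received_messages:
                if self._current_load >= self._max_load:
                    # Leaser is full; park the message instead of leasing it.
                    self._messages_on_hold.append(msg)
                else:
                    self._current_load += 0.1  # pretend each message adds load
                    dispatch(msg)

    def release_held_messages(self, dispatch):
        # Called when the leaser regains capacity (e.g. after acks).
        with self._lock:
            while self._messages_on_hold and self._current_load < self._max_load:
                msg = self._messages_on_hold.popleft()
                self._current_load += 0.1
                dispatch(msg)
```

The key property is that both dispatching and releasing happen under the same lock, so capacity checks and buffer updates are atomic.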
With the StreamingPullManager._on_response() callback adding received messages to the leaser synchronously (in the background consumer thread), a race condition can occur with the dispatcher thread, which can asynchronously add (remove) messages to (from) lease management, e.g. on ack() and nack() requests. The same applies to the related operations of possibly pausing/resuming the background consumer. This commit thus adds locks in key places, ensuring that these operations are atomic and not subject to race conditions.
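The locking pattern described in the commit message can be sketched as below. The class and field names are illustrative, not the real StreamingPullManager code; the point is that the load check and the pause/resume decision happen under one lock, so the consumer and dispatcher threads cannot interleave between them.

```python
import threading


class PauseResumeSketch:
    """Illustrative: make lease updates and pause/resume decisions atomic."""

    def __init__(self, max_messages=100):
        self._max_messages = max_messages
        self._leased = 0
        self._paused = False
        self._lock = threading.Lock()  # guards _leased and _paused together

    def add_to_lease(self, count):
        # Background consumer thread: lease new messages synchronously, and
        # pause *before* another batch could be pulled.
        with self._lock:
            self._leased += count
            if self._leased >= self._max_messages:
                self._paused = True

    def remove_from_lease(self, count):
        # Dispatcher thread: ack()/nack() drop messages from lease
        # management; done under the same lock, so a resume decision cannot
        # race with a concurrent pause decision.
        with self._lock:
            self._leased -= count
            if self._paused and self._leased < self._max_messages:
                self._paused = False
```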
lint failed
We should discuss this feature with the Pub/Sub team before merging. They want to limit the difference between implementations in different languages, if possible.
Subscriber client's async streaming pull can now be configured to invoke a callback for batches of received messages, as opposed to invoking a callback for each received message.
Hi, is there any ETA for merging this? It affects a pretty vital decision I have to make in my project, and the decision depends on this ETA. Thanks!
@C0rvax I can only give an unofficial answer, as I am not the decision maker here, but AFAIK this PR is unlikely to be merged in the near future. PubSub clients for other languages currently lack this feature, and consistency is desired across various implementations.
Thanks for your answer, it helps a lot!
It would be nice to have this feature, though, since Java, Ruby, and Go all have the streaming pull/concurrency feature, and Python does not.
@szeitlin The Python client also supports asynchronous (streaming) pull, and it's actually the preferred way of pulling messages (docs). The thing here is just that each received message is dispatched individually, i.e. one message per callback invocation, as opposed to multiple messages per invocation. Or did I misunderstand your comment?
We're trying the asynchronous approach, but ultimately we still have a use case where we'll need to write out messages in batch, which is going to be a bit of work to implement using the streaming pull method and checking to see when we've accumulated enough messages. I get the impression a few other people would appreciate this feature for the same reason, unless I'm misunderstanding how it's supposed to work?
The batch callback, at least as it was implemented here, simply means that if N messages are available to dispatch to the user callback, all of them are passed to the callback in a list, as opposed to N callback invocations, one for each individual message. The concern raised in internal discussions was that it is not defined what a "batch" actually is, which can be confusing to users. Publishing several messages in a single batch does not necessarily mean that the backend, too, will actually deliver those messages in a single batch, and the "received" batch can also contain messages from several different publish batches. Since the number of messages received in a single batch might vary, you would still have to implement the aggregation logic to know when enough messages have been accumulated. Just to avoid any misunderstanding, what would be your expectation of such a feature, i.e. what messages would you expect to receive in a particular batch?
Good question. I guess we were just hoping we could set a minimum batch size (in our case it's ok if things arrive out of order). |
The backend does not support that, unfortunately, thus any "minimum size" batching logic needs to be implemented on the client side. While the client library could do that internally, it cannot decide for the user what to do in edge cases, e.g. when no new messages arrive for a substantial period of time, but it does not yet hold enough messages to release the next batch. One way to achieve batching is to put received messages into a queue, which is then processed by a different thread that can batch/process them as needed, e.g. write them out when there are enough of them in the queue.

```python
import queue

received_queue = queue.Queue()
...
# Queue.put accepts a single argument, so it can serve directly as the
# message callback.
future = subscriber.subscribe("my/subscription/path", callback=received_queue.put)
try:
    future.result()
except KeyboardInterrupt:
    future.cancel()
```

Would that help?
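Building on the queue idea above, the consumer side could be sketched as a thread that releases batches of a minimum size, with a timeout covering the edge case mentioned earlier where messages stop arriving before a full batch has accumulated. All names here (`batch_consumer`, `handle_batch`, the sentinel convention) are illustrative, not part of the library.

```python
import queue


def batch_consumer(received_queue, handle_batch, min_batch=10, timeout=5.0):
    """Drain a queue of messages and hand them out in batches.

    A partial batch is flushed if no new message arrives within `timeout`
    seconds, so quiet periods do not stall processing indefinitely. A `None`
    item acts as a shutdown sentinel.
    """
    batch = []
    while True:
        try:
            msg = received_queue.get(timeout=timeout)
        except queue.Empty:
            if batch:  # timed out with a partial batch: flush what we have
                handle_batch(batch)
                batch = []
            continue
        if msg is None:  # sentinel: flush the remainder and stop
            if batch:
                handle_batch(batch)
            return
        batch.append(msg)
        if len(batch) >= min_batch:
            handle_batch(batch)
            batch = []
```

This would typically run in a daemon thread, e.g. `threading.Thread(target=batch_consumer, args=(received_queue, write_out), daemon=True).start()`, alongside the streaming pull future.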
This is an interesting point, we will try that if the way we have it isn't fast enough. Thanks! |
Closes #4994.
Do not merge yet!
This has been removed from GA for now (comment), because other implementations currently do not have this feature.
This PR adds an option to receive asynchronously pulled messages in callbacks in batches, rather than a single message per callback invocation.
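The callback contract this PR proposes can be illustrated with a stand-in for the real client; `FakeMessage` below is purely a test double, and the list-of-messages callback signature is as described in this PR, not a released API.

```python
class FakeMessage:
    """Stand-in for google.cloud.pubsub_v1.subscriber.message.Message."""

    def __init__(self, data):
        self.data = data
        self.acked = False

    def ack(self):
        self.acked = True


def batch_callback(messages):
    # With the proposed batch option, the user callback receives a *list* of
    # messages per invocation instead of a single message.
    for message in messages:
        message.ack()


# Simulating one "batch" dispatch from the streaming pull manager:
batch = [FakeMessage(b"a"), FakeMessage(b"b"), FakeMessage(b"c")]
batch_callback(batch)
```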
How to test
Steps to reproduce:
`batch=True`
Actual result (before the fix): The provided `callback` is invoked once for each message.
Expected result (after the fix): The provided `callback` is invoked once for each batch of published messages (subject to `FlowControl.max_messages` limits). The callback is invoked with a list of messages as the argument.
TODO list