
Call handleMessage in sequence, instead of using Promise.all() #248

Closed
wants to merge 1 commit into from

Conversation

kdojeteri

Description

I removed await Promise.all() in favor of for (...) { await }, because Promise.all could result in messages not being handled in order.

Motivation and Context

Message ordering is important. The same can be said about the order in which messages are processed. In many cases, if not most, messages need to be processed one after the other, in the order they arrive.

Promise.all() runs the handlers for all messages "in parallel". Strictly speaking, each handler runs synchronously until its first await; but since handleMessage is declared as an async function, I presume it will usually contain an await. In practice, that means messages are not processed in sequence, each waiting for the previous one to finish, but all at once.

My code fixes the issue by eliminating the parallelism and instead introducing a for loop in which we await the completion of each message handler before continuing.

This keeps the order in which handleMessage is called, but it changes the order of when it finishes. It is therefore potentially a breaking change.
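The difference can be sketched in a minimal stand-alone demo (this is not the library's actual code; `handleMessage` is a stand-in handler whose delay simulates async work):

```javascript
// Strategy used before this PR: start every handler, then wait for all of them.
async function processAll(messages, handleMessage) {
  // All handlers start before any finishes its first await, so completion
  // order depends on each handler's internal timing, not on arrival order.
  await Promise.all(messages.map((m) => handleMessage(m)));
}

// Strategy proposed by this PR: one handler at a time.
async function processInSequence(messages, handleMessage) {
  // The next handler does not start until the previous one has resolved.
  for (const m of messages) {
    await handleMessage(m);
  }
}

// Demonstration: handlers with varying delays finish out of order under
// Promise.all, but strictly in arrival order under the sequential loop.
const finished = [];
const handler = (m) =>
  new Promise((resolve) =>
    setTimeout(() => {
      finished.push(m.id);
      resolve();
    }, m.delay)
  );

(async () => {
  finished.length = 0;
  await processAll([{ id: 1, delay: 30 }, { id: 2, delay: 10 }], handler);
  console.log(finished); // [2, 1] -- completion follows delay, not arrival

  finished.length = 0;
  await processInSequence([{ id: 1, delay: 30 }, { id: 2, delay: 10 }], handler);
  console.log(finished); // [1, 2] -- arrival order preserved
})();
```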

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Checklist:

  • My code follows the code style of this project.
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I have read the CONTRIBUTING document.
  • I have added tests to cover my changes.
  • All new and existing tests passed.

Removed await Promise.all() in favor of for (...) { await } 
Promise.all could result in messages not being handled in order.
@kdojeteri kdojeteri requested a review from a team as a code owner January 29, 2021 09:14
@niklasR
Contributor

niklasR commented Jan 29, 2021

The library by default only polls and processes one message at a time, making this a non-issue.
The user actively has to choose a higher batchSize for this to become an "issue". Even in that case, the library allows the user to pass in a custom handleMessageBatch function to handle the batch, in which the user can process the messages in whatever order they want.

Not a bug.

@kdojeteri
Author

kdojeteri commented Jan 29, 2021

In our codebase, we initialize the Consumer with a batchSize of 10. I can't imagine we're alone in this: if you use a FIFO queue, which has a limit of 300 API calls per second, and poll for just one message at a time, you cut your queue's throughput by a factor of 10.

Initially, we tried using handleMessage to handle the messages in sequence. We quickly switched to handleMessageBatch, as you suggest, because handleMessage didn't do what we assumed it would.

But now we have another problem. If one of the messages in our queue throws an error, all messages from the batch, even the already successfully processed ones, are invalidated. This issue is mentioned in #189 and #245, along with a workaround.

I see multiple ways of resolving issues #189 and #245:

  • Document the workaround: Deleting messages manually from the queue in handleMessageBatch becomes the official way to indicate message handling success
  • Design and implement a way to tell which messages have been handled successfully: like in johncmunson's proposed patch
  • Consider merging this pull request: As you say, by default the library only polls and processes one message at a time. That's the default, i.e. the expected behavior. Let's say I use handleMessage to process one message at a time. When I raise the batchSize to 10 to increase the throughput from 300 to 3000 messages per second, messages are suddenly processed out of order. Since I expect handleMessage to run in order, like it did before, I'd say that is an issue.
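The first option (manual deletion) can be sketched as library-independent logic. This is a hedged illustration of the workaround discussed in #189/#245, not sqs-consumer's API: `handleMessage` and `deleteMessage` are hypothetical callbacks standing in for the user's handler and the SQS DeleteMessage call.

```javascript
// Process a batch strictly in order, deleting each message as soon as it
// succeeds, so a failure later in the batch cannot invalidate earlier ones.
async function handleBatchInOrder(messages, handleMessage, deleteMessage) {
  const succeeded = [];
  for (const message of messages) {
    try {
      await handleMessage(message);
      // Delete immediately: even if a later message throws, this one is done.
      await deleteMessage(message);
      succeeded.push(message);
    } catch (err) {
      // Stop at the first failure. Already-deleted messages stay deleted;
      // unprocessed ones return to the queue after the visibility timeout.
      break;
    }
  }
  return succeeded;
}
```

Wired into sqs-consumer, this would run inside the user-supplied handleMessageBatch, with deleteMessage delegating to the SQS client's DeleteMessage call using each message's ReceiptHandle.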

@trubi

trubi commented Feb 1, 2021

If an issue only appears when you change some parameter, that does not mean it's not a bug. It's a bug which emerges under specific conditions, but it's still a bug, and a pretty critical one. I've been struggling with this as well and it caused a lot of trouble.
If you don't want to merge this, then you should at least clearly state in your documentation that the batchSize parameter does not work as expected.
If I'm using a FIFO queue but this library shuffles all events during processing, I can't imagine a single use case where that wouldn't be considered a critical issue.

@ellisium

Can you give a status update on whether this will be merged or not? At least users would get a clear view of whether they have to fork it.
Thanks

@nicholasgriffintn
Member

Reading this comment: #248 (comment)

I think this was sufficiently answered and given the age, I'm unsure if the requirement still exists.

If you still think this is an issue or requires better documentation, please open a new PR.

@github-actions

This pull request has already been merged/closed. If you experience issues related to these changes, please open a new issue referencing this pull request.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Mar 17, 2023