
[MID-164] Process kafka messages sequentially and commit manually #135

Merged · 16 commits into master from remove-concurrent-reads · Jul 12, 2022

Conversation

@ulich (Contributor) commented on Jul 11, 2022

This ensures that message processing is reliable. Before this change, the handling of a message was pushed off to a goroutine and the next message was read immediately. As soon as the next message was read, the previously read message was implicitly marked as successfully processed, because the offset was committed automatically (relying on the default enable.auto.commit=true, which commits every 5 seconds by default). After this change, make sure that you either do not set enable.auto.commit at all (this library will set it to false) or set enable.auto.commit=false explicitly.
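For illustration only, here is a minimal sketch of what a raw confluent-kafka-go consumer configuration with auto-commit disabled looks like. The broker address and group id are placeholders, and in practice this library sets enable.auto.commit=false for you:

```go
package main

import (
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// newConsumer builds a consumer with auto-commit disabled, so offsets are only
// advanced by an explicit commit after the handler has finished. A message that
// was read but not yet committed is redelivered after a crash or redeployment.
func newConsumer() (*kafka.Consumer, error) {
	return kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost:9092", // placeholder
		"group.id":           "my-service",     // placeholder
		"enable.auto.commit": false,
		"auto.offset.reset":  "earliest",
	})
}
```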

If there was a crash or redeployment between reading a message and the handler function finishing (especially a problem when the retry middleware is used, which can lead to very long handler execution times), the message would have been lost and would not be reprocessed when the k8s pod came back up.

Additionally, if thousands of messages were pushed to a topic, the consumer read all of them in quick succession, pushing each one onto its own goroutine and essentially processing all messages in parallel. This caused problems with memory consumption and CPU usage.

This change will make the processing sequential. One consumer will read one message at a time. There will also be an explicit commit after the message handler is finished processing.
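Continuing the sketch above (add "log" to its imports), this is roughly what such a sequential read-handle-commit loop looks like with confluent-kafka-go. The handle callback and the error logging are stand-ins for this library's internal handler wiring and errFn, not its actual implementation:

```go
// run reads one message at a time, runs the handler to completion, and only
// then commits the offset. Nothing is read ahead, so a crash before the commit
// means the message is redelivered instead of being lost.
func run(consumer *kafka.Consumer, handle func(*kafka.Message) error) error {
	for {
		msg, err := consumer.ReadMessage(-1) // block until a message arrives
		if err != nil {
			return err
		}
		if err := handle(msg); err != nil {
			// Stand-in for the library's errFn: surface the handler error
			// instead of swallowing it.
			log.Printf("handler failed at offset %v: %v", msg.TopicPartition.Offset, err)
		}
		// Explicit commit: the message counts as processed only after the handler returns.
		if _, err := consumer.CommitMessage(msg); err != nil {
			return err
		}
	}
}
```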

Now that each message is processed sequentially, delivery is always ordered, so the config.WithDeliveryOrder method is no longer needed and has been removed.

This change will reduce the throughput of message processing if you don't modify your application source code, as one consumer will only process one message at a time. If you expect a high volume of messages, you can change your service to start multiple consumers instead of scaling to more k8s pods.
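As one possible way to do that (an assumption, not something prescribed by this library), a service could start several consumers in the same consumer group within one process, each running its own sequential loop. This continues the sketches above, reusing the hypothetical newConsumer and run helpers (add "sync" to the imports); the topic name and consumer count are placeholders:

```go
// startConsumers runs n consumers in parallel; each one still processes its
// own messages strictly one at a time and commits after each handler run.
func startConsumers(n int, handle func(*kafka.Message) error) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c, err := newConsumer() // consumer sketch from above
			if err != nil {
				log.Fatalf("create consumer: %v", err)
			}
			defer c.Close()
			if err := c.SubscribeTopics([]string{"my-topic"}, nil); err != nil { // placeholder topic
				log.Fatalf("subscribe: %v", err)
			}
			if err := run(c, handle); err != nil {
				log.Printf("consumer stopped: %v", err)
			}
		}()
	}
	wg.Wait()
}
```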

Extra stuff:

  • When a handler returns an error, this is now passed to the errFn
  • Use latest confluent-kafka-go v1.9.1

This ensures that message processing is more reliable. By pushing the message handling off to a goroutine, we immediately "acknowledged" the kafka message, and if a redeployment or an app crash happened while a message was being processed, that message would have been lost.

Additionally, when there were thousands of messages in the kafka topic, they would all have been read into memory and processed (more or less) simultaneously, leading to very high memory consumption.
@jcyamacho previously approved these changes Jul 12, 2022
@ulich marked this pull request as ready for review on July 12, 2022 10:35
@jcyamacho (Contributor) commented:

drone is all green; there is an issue with the hook:
[screenshot]

@ulich merged commit 66a4aaa into master on Jul 12, 2022
@ulich deleted the remove-concurrent-reads branch on July 12, 2022 10:43