-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
in_kafka: boost throughput #9625
base: master
Are you sure you want to change the base?
Conversation
Documentation is here: fluent/fluent-bit-docs#1520 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the auto-commit default is not right to keep current behaviour. Plus we probably should make the poll rate configurable if we're changing this all anyway.
Can we add some unit tests as well?
rd_kafka_commit(ctx->kafka.rk, NULL, 0); | ||
|
||
if(!ctx->enable_auto_commit) { | ||
/* TO-DO: commit the record based on `ret` */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to sort all TODOs, we can't just leave them in there. I know it was in the original but we should attempt to sort or add more info.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well i din't introduce the TO-DO. It existed before. It just went into the if statement. Unfortunately github shows to little context so you can't see it.
plugins/in_kafka/in_kafka.c
Outdated
{ | ||
FLB_CONFIG_MAP_BOOL, "enable_auto_commit", FLB_IN_KAFKA_ENABLE_AUTO_COMMIT, | ||
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, enable_auto_commit), | ||
"Relay on kafka auto-commit and commit messages in batches" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is a typo and should be rely
/* TO-DO: commit the record based on `ret` */ | ||
rd_kafka_commit(ctx->kafka.rk, NULL, 0); | ||
|
||
if(!ctx->enable_auto_commit) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current behaviour is to have auto-commit false but the default below is now true which means we are not maintaining the old behaviour by default.
56a331a
to
713453e
Compare
Polling every 1ms and committing each message individually results in rather pure performance in high volume Kafka clusters. Commiting in batches (relay on auto-commit of kafka) drastically improves performance. Signed-off-by: CoreidCC <sws-github@coreid.cc>
having 1ms timeout might make sense if the input plugin is running in the main thread (not introducing delay for others). but if we run in our very own thread then we should not over- ride the fetch.wait.max.ms configuration value from the kafka-consumer. this in conjuntion with using autocommit again boosts the throuhput significantly. Signed-off-by: CoreidCC <sws-github@coreid.cc>
Signed-off-by: CoreidCC <sws-github@coreid.cc>
We have a Kafka cluster with about 40k messages and 25MB of data per seconds. Fluent-bit stands no change to keep up with this load in its current state where it
a) commits each message individually
b) a poll-timeout of just one 1ms (this completely overrides fetch.wait.max.ms from kafka)
Even Logstash is faster and vector is just consuming all these messages with ease.
probably related to "Batch processing is required in in_kafka. #8030"
Enter
[N/A]
in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
To activate the changes one need to
[INPUT]
Name kafka
threaded true -> sets timeout so that it will be limited by fetch.wait.max.ms in any practical scenario
enable_auto_commit true -> disable explicit commit call
-> The change doesn't do any dynamic allocations at all and therefore cant introduce any mem-leaks
-> The change has no impact on packaging at all
Documentation
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.