Change Kafka Lookup Extractor to not register consumer group #12842
Conversation
The current behaviour of the Kafka lookup extractor is to assign a unique ID to the consumer group and set `auto.offset.reset` to `earliest`, so that previously committed offsets are never reused. This does the job, but it also pollutes the Kafka broker with a bunch of "ghost" consumer groups that will never be used again. To fix this, we now set `enable.auto.commit` to `false`, which prevents the ghost consumer groups from being created in the first place.
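For concreteness, the difference amounts to something like the following consumer-properties sketch, using the standard Kafka client's `ConsumerConfig` constants. This is an illustration of the idea, not the exact Druid code:

```java
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerConfigSketch {
    public static void main(String[] args) {
        // Before (sketch): a fresh group.id on every startup forced a full
        // re-read, but each restart left a never-reused "ghost" group behind.
        Properties before = new Properties();
        before.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
        before.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // After (sketch): offsets are simply never committed, so there is
        // nothing to "forget" and no ghost consumer group accumulates.
        Properties after = new Properties();
        after.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        after.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    }
}
```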
Provide some additional detail on functionality and configuration. Hopefully this will make it clearer how the extractor works for developers who aren't so familiar with Kafka.
The extractor works by consuming the configured Kafka topic from the beginning, and appending every record to an internal map. The key of the Kafka record is used as the key of the map, and the payload of the record is used as the value. At query time, a lookup can be used to transform the key into the associated value. See [lookups](../../querying/lookups.md) for how to configure and use lookups in a query. Keys and values are both stored as strings by the lookup extractor.

The extractor remains subscribed to the topic, so new records are added to the lookup map as they appear. This allows lookup values to be updated in near-realtime. If two records are added to the topic with the same key, the record with the larger offset will replace the previous record in the lookup map. A record with a `null` payload will be treated as a tombstone record, and the associated key will be removed from the lookup map.
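For readers less familiar with Kafka, the consume-into-a-map pattern described above looks roughly like the sketch below, written against the plain Kafka consumer API. The topic name and broker address are placeholders, and this is an illustration of the pattern, not Druid's actual `KafkaLookupExtractorFactory` implementation:

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LookupMapSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Never commit offsets; with assign() below, no consumer group is registered at all.
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        Map<String, String> lookupMap = new ConcurrentHashMap<>();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Assign all partitions directly and rewind to the beginning,
            // so the whole topic is replayed into the map on every start.
            List<TopicPartition> partitions = consumer.partitionsFor("lookup-topic").stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    if (record.value() == null) {
                        // Tombstone: a null payload removes the key from the map.
                        lookupMap.remove(record.key());
                    } else {
                        // Larger offsets for the same key overwrite earlier values.
                        lookupMap.put(record.key(), record.value());
                    }
                }
            }
        }
    }
}
```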
The PR that has the code to remove a payload with a `null` message is yet to be merged.
Yes, this change will need to wait for the PR to be merged. I will resolve any merge conflicts when that happens.
Thank you for these changes. Can you look into the CI failures? They look like legit failures.
Description
The Kafka lookup extractor has to consume an entire topic from the beginning in order to build the internal lookup map. Previously, the extractor would always use a randomly generated Kafka `group.id`. This meant that the service would register a new consumer group every time it started, essentially "forgetting" its previously committed consumer offsets. This guarantees that the service will always consume the entire topic.

This has the unintended side-effect of also leaving a lot of "ghost" consumers registered with the Kafka cluster. These consumer groups will never be used again, so they just hang around on the broker until Kafka decides to delete them (by default, after 2 days). This needlessly adds bloat to the Kafka broker.
This has been fixed by setting the Kafka consumer config `enable.auto.commit` to `false`. This means that the consumer never attempts to commit offsets, achieving the same result as before without leaving a bunch of "ghost" consumer groups registered on the broker.

I also took the chance to flesh out the documentation a whole bunch.
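One way to sanity-check the fix is to list the broker's consumer groups across extractor restarts; with the change in place, the list should no longer grow with randomly named groups. A sketch using the Kafka `AdminClient` (broker address is a placeholder):

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ListGroupsSketch {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Print every consumer group currently registered on the broker.
            admin.listConsumerGroups().all().get()
                 .forEach(group -> System.out.println(group.groupId()));
        }
    }
}
```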
Key changed/added classes in this PR
`org.apache.druid.query.lookup.KafkaLookupExtractorFactory`
This PR has: