Add protocol/parsing changes [KIP-881] #4189
Conversation
force-pushed the branch from fcb8a73 to cdb5340
force-pushed the branch from bf236c3 to c4cb7c9
Rebased onto master.
* Metadata and leader epoch refactor: store private metadata in a struct that contains the public one.
* Remove rd_kafka_broker_id_rack_pair; replaced by rd_kafka_metadata_internal_t.
* Style fix, documentation, remove internal accessors.
* Remove internal struct from assignor cb; add an internal accessor function for that.
* Use a define instead of a function call for rd_kafka_metadata_get_internal.
* Make the macro parenthesized.

Co-authored-by: Milind L <miluthra@confluent.io>
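For context on the refactor described in that commit message, here is a minimal sketch of the general shape: a private struct that embeds the public metadata as its first member, plus a parenthesized macro accessor. The field and macro names here are illustrative assumptions, not the actual librdkafka definitions.

```c
#include <stdint.h>
#include <librdkafka/rdkafka.h> /* public rd_kafka_metadata_t */

/* Hypothetical per-broker internal info; the real struct differs. */
typedef struct rd_kafka_metadata_broker_internal_sketch_s {
        int32_t id;    /* broker id */
        char *rack_id; /* broker rack, NULL if the broker reported none */
} rd_kafka_metadata_broker_internal_sketch_t;

typedef struct rd_kafka_metadata_internal_sketch_s {
        rd_kafka_metadata_t metadata; /* public struct placed first so a
                                       * rd_kafka_metadata_t pointer can be
                                       * cast back to the internal type */
        rd_kafka_metadata_broker_internal_sketch_t *brokers; /* same count as
                                                              * metadata.broker_cnt */
} rd_kafka_metadata_internal_sketch_t;

/* A define instead of a function call; parenthesized so it composes safely
 * inside larger expressions. */
#define rd_kafka_metadata_get_internal_sketch(md)                             \
        ((const rd_kafka_metadata_internal_sketch_t *)(md))
```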
Looks good! Just check that documentation param.
src/rdkafka_broker.c
Outdated
@@ -5282,6 +5282,7 @@ int rd_kafka_brokers_add(rd_kafka_t *rk, const char *brokerlist) {
 *
 * @param rkbp if non-NULL, will be set to the broker object with
 *        refcount increased, or NULL on error.
 * @param rack if non-NULL, it will set the rack of the broker object.
Is this parameter documentation stale?
Yes, it was; I've fixed it now. Merging after the CI run.
Thanks!
There are two commits in this change:
Change the embedded MemberMetadata protocol; see ConsumerProtocolSubscription.json for reference.
Add RackId (and GenerationId) to the message we send, and on receive, parse RackId (and GenerationId) and store them inside rd_kafka_group_member_t so they can be accessed by the assignors. In addition to the code changes, this adds a unit test that checks serialization/deserialization.
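As a rough illustration of this first commit (purely a sketch with assumed field names, not the actual rd_kafka_group_member_t layout), the parsed values end up on the group member roughly like this:

```c
#include <stdint.h>

/* Sketch only: hypothetical fields, following the ConsumerProtocolSubscription
 * schema, that hold the values parsed from a member's MemberMetadata so the
 * assignors can read them. Real names/types in rd_kafka_group_member_t may
 * differ. */
typedef struct rd_kafka_group_member_sketch_s {
        /* ... existing fields: member id, subscribed topics, userdata ... */
        char *rkgm_rack_id;      /* RackId, NULL if the member sent none */
        int32_t rkgm_generation; /* GenerationId, -1 if the member sent none */
} rd_kafka_group_member_sketch_t;
```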
Change the Metadata response parsing code: currently, the Rack for each broker is simply skipped over. This change stores it inside the METADATA_OP and then extracts it to pass it to the assignors.
This has to happen through a new field in the op union, since we can't change the publicly exposed metadata type without significant effort.
Another approach, rejected, was to add a rack field to rd_kafka_broker_t and use rd_kafka_broker_update, which dispatches an op on the broker thread, to update a broker's rack. This was rejected for two reasons: first, the actual update happens on the broker thread while the assignor runs on the main thread, so the latest broker rack might not yet be visible; second, the broker id may change, but the assignor uses the latest broker id straight from the metadata, so a find_broker_by_nodeid() call to get the rack could fail.
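A minimal sketch of what the chosen approach enables for an assignor, assuming the internal-metadata sketch above; rack_for_broker() is a hypothetical helper, not an actual librdkafka function:

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical lookup: resolve a broker's rack from the per-broker rack info
 * carried alongside the public metadata, using the broker id taken straight
 * from the metadata (so no find_broker_by_nodeid() call is needed). */
static const char *
rack_for_broker(const rd_kafka_metadata_internal_sketch_t *mdi,
                int32_t broker_id) {
        int i;
        for (i = 0; i < mdi->metadata.broker_cnt; i++) {
                if (mdi->brokers[i].id == broker_id)
                        return mdi->brokers[i].rack_id; /* may be NULL */
        }
        return NULL; /* unknown broker id */
}
```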
This can be merged at any time; it doesn't cause any breakage in the existing functionality.
This is based off KIP-320, because KIP-320 is a prerequisite for KIP-881 and contains significant changes in similar areas of the code, so basing on it prevents merge conflicts at a later stage.