KAFKA-14367; Add OffsetFetch to the new GroupCoordinator interface#12870
KAFKA-14367; Add OffsetFetch to the new GroupCoordinator interface#12870jolshan merged 10 commits intoapache:trunkfrom
OffsetFetch to the new GroupCoordinator interface#12870Conversation
bfe7da5 to
df9ee42
Compare
|
I have to add a few unit tests in KafkaApisTest. There are none at the moment. |
There was a problem hiding this comment.
I have removed this test because I think that this is wrong. When the same group id is specified multiple times, we should rather throw an invalid request rather than doing this. Thoughts?
This was basically due to the fact that we were using an hash map so the last occurrence of a group won. We don't use the hashmap anymore with the patch. Instead, we return an entry for every group present in the request.
There was a problem hiding this comment.
Was this behavior documented or just a quirk of the implementation?
I'm also curious if making this change (invalid requests) will break someone's clients.
There was a problem hiding this comment.
I think that it is a quirk of the implementation. It is because we used HashMaps before this patch so the last one wins.
There was a problem hiding this comment.
In my opinion, it is pretty bad to just ignore one group passed in the request like this.
There was a problem hiding this comment.
Are we deciding to actually throw the error? If so we should document that.
Otherwise, maybe we can add a comment in the request file that requesting the group multiple times in the request will also give us the response multiple times as of this release.
There was a problem hiding this comment.
I need to look into other APIs to see how we usually handle this. At a first glance, it seems that we are not consistent.
There was a problem hiding this comment.
Did we ever get a conclusion here? If so let's just document what we decided.
There was a problem hiding this comment.
+1 for having duplicate responses if a group is repeated. i don't think this warrants an invalid request but we should conform to the norm if there is one.
There was a problem hiding this comment.
I have looked at other APIs and we are not consistent, unfortunately. I believe that my current implementation (returning a response for each provided group in the same order) is the right way and likely the expected way. We could consider this as a bug in the KIP-709 implementation. I have asked about this in the KIP-709 discuss thread as well.
222da0e to
4b1b652
Compare
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
Outdated
Show resolved
Hide resolved
|
Was looking for the tests and then remembered this comment: Reminder on this to add the tests 😄 |
| @@ -19,6 +19,7 @@ | |||
| import java.util.Map.Entry; | |||
There was a problem hiding this comment.
As I follow-up, I would like to clean this class. There are way too many ways to construct this object and the logic is pretty complicated. It is the same for the request. Let's do this separately.
There was a problem hiding this comment.
sounds like a good plan. Thanks!
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
Show resolved
Hide resolved
jeffkbkim
left a comment
There was a problem hiding this comment.
thanks for the PR! the code looks much more simplified and cleaner. left some comments.
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
Outdated
Show resolved
Hide resolved
| newPartition = new OffsetFetchResponsePartition() | ||
| .setPartitionIndex(partition.partitionIndex()) | ||
| .setErrorCode(group.errorCode()); | ||
| } else { |
There was a problem hiding this comment.
i'm a bit confused on the else statement. we can hit this either when version >= 2 or group error == NONE.
based on the comment above, it seems that for version >= 2 we don't have to put error at the partition level. also, do we need to add offset/metadata if there is an error for version >= 2?
There was a problem hiding this comment.
It is still possible to have a partition level error with version >= 2 (e.g. UNSTABLE_OFFSET_COMMIT). To answer your second point, if there is an error, the offset/metadata should be correctly set at this stage so we can just copy whatever we have got here.
There was a problem hiding this comment.
that makes sense. thanks for the clarification
| DESCRIBE, TOPIC, allPartitionData)(_.topic) | ||
| (Errors.NONE, authorizedPartitionData) | ||
| } | ||
| private def fetchAllOffsets( |
There was a problem hiding this comment.
fetchAllOffsetsForGroup and fetchOffsetsForGroup makes it more readable for me. wdyt?
looking at the new GroupCoordinator interface seems like they have a counterpart method. thoughts on changing both?
There was a problem hiding this comment.
Renaming in KafkaApis is reasonable. For GroupCoordinator, the ForGroup seems a bit redundant so I would rather keep it as it is there.
There was a problem hiding this comment.
+1 for having duplicate responses if a group is repeated. i don't think this warrants an invalid request but we should conform to the norm if there is one.
|
@jolshan @jeffkbkim I have updated the PR. Could you take another look? |
|
Looks fairly reasonable. I'm going to rebuild though to see if some of the tests look flaky. |
|
Failed tests seem unrelated: |
jolshan
left a comment
There was a problem hiding this comment.
Thanks for the hard work David :)
* apache-github/trunk: KAFKA-14601: Improve exception handling in KafkaEventQueue apache#13089 KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886) KAFKA-14530: Check state updater more often (apache#13017) KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (apache#13103) KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301) KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092) KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870) KAFKA-14557; Lock metadata log dir (apache#13058) MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101) KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818) KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032) KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087) KAFKA-14279: Add 3.3.x streams system tests (apache#13077) MINOR: bump streams quickstart pom versions and add to list in gradle.properties (apache#13064) MINOR: Update KRaft cluster upgrade documentation for 3.4 (apache#13063) KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller. (apache#12998) KAFKA-14570: Fix parenthesis in verifyFullFetchResponsePartitions output (apache#13072) MINOR: Remove public mutable fields from ProducerAppendInfo (apache#13091)
…master * apache-github/trunk: (23 commits) MINOR: Include the inner exception stack trace when re-throwing an exception (apache#12229) MINOR: Fix docs to state that sendfile implemented in `TransferableRecords` instead of `MessageSet` (apache#13109) Update ProducerConfig.java (apache#13115) KAFKA-14618; Fix off by one error in snapshot id (apache#13108) KAFKA-13709 (follow-up): Avoid mention of 'exactly-once delivery' or 'delivery guarantees' in Connect (apache#13106) KAFKA-14367; Add `TxnOffsetCommit` to the new `GroupCoordinator` interface (apache#12901) KAFKA-14568: Move FetchDataInfo and related to storage module (apache#13085) KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created (apache#13104) KAFKA-14601: Improve exception handling in KafkaEventQueue apache#13089 KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886) KAFKA-14530: Check state updater more often (apache#13017) KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (apache#13103) KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301) KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092) KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870) KAFKA-14557; Lock metadata log dir (apache#13058) MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101) KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818) KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032) KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087) ...
apache#12870) This patch adds OffsetFetch to the new GroupCoordinator interface and updates KafkaApis to use it. Reviewers: Philip Nee <pnee@confluent.i>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>
This patch adds
OffsetFetchto the newGroupCoordinatorinterface and updatesKafkaApisto use it. The changes inKafkaApisare larger than what I was hoping for. I think that we should refactor this part of the code even further but I leave this for a further PR.Committer Checklist (excluded from commit message)