KAFKA-14367; Add OffsetCommit to the new GroupCoordinator interface #12886

dajac merged 11 commits into apache:trunk

Conversation
dajac force-pushed from 4e2e94d to f6bdd11
OffsetCommit to the new GroupCoordinator interface
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
Looks like this needs a rebase. I will take another pass when that is complete.
dajac force-pushed from f6bdd11 to dfe1561
| // "default" expiration timestamp is now + retention (and retention may be overridden if v2) | ||
| // expire timestamp is computed differently for v1 and v2. | ||
| // - If v1 and no explicit commit timestamp is provided we treat it the same as v5. | ||
| // - If v1 and explicit retention time is provided we calculate expiration timestamp based on that |
This comment is a little confusing.
So it seems like I understand v1 semantics -- we use the commit timestamp, if provided, for "now".
For v2 and beyond, I'm a bit confused about the last two bullets. It makes it seem like there is no difference between v2-v4 and v5+, but I think the difference is that the retention can no longer be overridden in v5+. That part is unclear in the last bullet as it says "partition expiration" but "RetentionTimeMs" is the field name.
This is my understanding based on the code:

version | can define commit time (aka "now") | can define retention time
--------|------------------------------------|--------------------------
1       | yes                                | no
2       | no                                 | yes
3       | no                                 | yes
4       | no                                 | yes
5+      | no                                 | no
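To make the table concrete, here is a minimal sketch of these per-version semantics. The helper and parameter names (`expireTimestampMs`, `defaultRetentionMs`, and so on) are illustrative assumptions, not the actual code in the patch:

```java
// Illustrative only: encodes the version table above. Names are assumptions.
public final class OffsetExpirySketch {
    private static final long DEFAULT_TIMESTAMP = -1L; // sentinel for "not set"

    public static long expireTimestampMs(short version,
                                         long commitTimestamp,
                                         long retentionTimeMs,
                                         long nowMs,
                                         long defaultRetentionMs) {
        // v1 may carry an explicit commit timestamp that replaces "now".
        long effectiveNow = (version == 1 && commitTimestamp != DEFAULT_TIMESTAMP)
            ? commitTimestamp
            : nowMs;
        // v2-v4 may override the broker's default retention; v5+ always uses
        // the broker default because RetentionTimeMs was removed from the schema.
        long retention = (version >= 2 && version <= 4 && retentionTimeMs != DEFAULT_TIMESTAMP)
            ? retentionTimeMs
            : defaultRetentionMs;
        return effectiveNow + retention;
    }
}
```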
I realize this comment was copy-pasted, but we can clean it up I think :)
I think that the term "partition expiration" comes from OffsetAndMetadata.expireTimestamp. expireTimestamp is indeed derived from RetentionTimeMs which is no longer available from version 5.
Hmm. I'm not sure we made this comment much clearer.
I think the main flaw is that it says we can only override retention time in v2 (which matches the JSON spec), yet the first two bullets mention "explicit retention time". I'm not really sure what that means.
The second thing is enumerating the versions. I think it's just clearer to say that some versions have the option to explicitly set retention time; v5, and any request without it set, ignores the expireTimestamp field.
I rewrote the comment. Let me know what you think.
Thanks David! Looks much clearer. I think the only thing to note is that when commit time is used, it seems like commit time + retention gives us expiration. It wasn't completely clear from the comment that that's how the commit time fit into the equation, but it can be inferred. Up to you if you want to change it.
Yeah, I thought it was implicit from the comment that the commit time replaces "now" in this case. I think we can leave it as it is.
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
jeffkbkim left a comment:

Thanks for the PR, left some comments.
OffsetCommitResponseData data = new OffsetCommitResponseData();
HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();

private OffsetCommitResponseTopic getOrCreateTopic(
nit: getOrAddTopic makes more sense to me
That could work as well but I personally prefer getOrCreate in this case. getOrCreate is used pretty extensively in the code base as well.
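For context, here is a sketch of what getOrCreateTopic could look like, inferred from the `data` and `byTopicName` fields shown in the diff above; it is an assumption, not a quote of the actual patch:

```java
// Assumed shape, based on the fields visible in the diff context.
private OffsetCommitResponseTopic getOrCreateTopic(String topicName) {
    OffsetCommitResponseTopic topic = byTopicName.get(topicName);
    if (topic == null) {
        // First time we see this topic: create it and register it in both
        // the response payload and the lookup map.
        topic = new OffsetCommitResponseTopic().setName(topicName);
        data.topics().add(topic);
        byTopicName.put(topicName, topic);
    }
    return topic;
}
```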
byTopicName.put(newTopic.name(), newTopic);
} else {
  // Otherwise, we add the partitions to the existing one.
  existingTopic.partitions().addAll(newTopic.partitions());
Q: from the code it seems that existingTopic can only include partitions that failed in some way. we are assuming that there will be no overlap between existing partitions and newTopic partitions. should we add a check?
That's right. I thought about adding a check but it is costly because the only way to check is to iterate over the existing partitions to check if the new one is there. Given that we know that partitions are not supposed to be duplicated by the user of this class, I thought that it was not necessary. What do you think?
I think it is ok to keep as is, but maybe make a comment that we assume there are no overlapping partitions?
As a side note, if there was overlap, we would just have two of the same partition in the response, right? One with the error and one without?
that makes sense. @jolshan that seems to be the case, though since a failed partition is added first the non-error state may overwrite the error when the consumer parses the response.
also a +1 on leaving a small note that the code assumes no overlap.
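To make the trade-off concrete, this is roughly what the check being discussed would look like. It is hypothetical and is not in the patch, precisely because of the nested-loop cost dajac mentions:

```java
// Hypothetical guard verifying the no-overlap assumption.
// O(existing * new) over the response's generated partition lists.
private static void assertNoOverlap(OffsetCommitResponseTopic existingTopic,
                                    OffsetCommitResponseTopic newTopic) {
    for (OffsetCommitResponsePartition newPartition : newTopic.partitions()) {
        for (OffsetCommitResponsePartition existing : existingTopic.partitions()) {
            if (existing.partitionIndex() == newPartition.partitionIndex()) {
                throw new IllegalStateException("Duplicate partition "
                    + newPartition.partitionIndex() + " for topic " + existingTopic.name());
            }
        }
    }
}
```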
}

override def commitOffsets(
  context: RequestContext,
i noticed this isn't used here as well as for the other coordinator APIs (other than joinGroup). what's the reason for having this parameter? are we expecting to use this in the new group coordinator?
I have put it everywhere for consistency. We may need it for other methods in the new group coordinator.
commitTimestamp = partition.commitTimestamp match {
  case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs
  case customTimestamp => customTimestamp
},
i wasn't able to find where we validate the commit timestamp. how do we handle timestamps that are less than -1? i am also curious about retention time ms.
It seems that they are not validated anywhere. We basically store whatever we get. As a result, if the provided retention or the commit timestamp are negative, the offset will be expired immediately. This is in line with the behavior prior to this patch. We could improve it (if we want) separately.
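If validation were ever added as a separate improvement, it could be as small as this sketch. It is hypothetical, not part of the patch, and `InvalidRequestException` is just one plausible error choice:

```java
// Hypothetical validation: reject timestamps below the -1 sentinel instead
// of storing values that would make the offset expire immediately.
static void validateTimestampMs(long timestampMs, String fieldName) {
    if (timestampMs < OffsetCommitRequest.DEFAULT_TIMESTAMP) {
        throw new InvalidRequestException(
            fieldName + " must be >= -1, but got " + timestampMs);
    }
}
```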
  OffsetAndMetadata.NoMetadata
else
  partitionData.committedMetadata
// For version > 0, store offsets to Coordinator.
requestHelper.sendMaybeThrottle(request, responseBuilder.build())
CompletableFuture.completedFuture(())
} else if (request.header.apiVersion == 0) {
  // For version 0, always store offsets to ZK.
if (isDebugEnabled)
  combinedCommitStatus.forKeyValue { (topicPartition, error) =>
    if (error != Errors.NONE) {
      debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
        s"on partition $topicPartition failed due to ${error.exceptionName}")
    }
i think we lost this debug log, can we add it back? at least when the future from newGroupCoordinator.commitOffsets completes exceptionally. and consider adding it for TOPIC_AUTHORIZATION_FAILED and UNKNOWN_TOPIC_OR_PARTITION errors.
We have removed all those debug logs in the previous PRs because the request log gives us the same information in the end.
.setGroupId("group")
.setMemberId("member")
.setGenerationId(10)
.setRetentionTimeMs(1000)
can we test values less than -1?
We could, but it does not add any value as the value is just copied to OffsetAndMetadata. As explained earlier, there is no validation for this.
new OffsetCommitRequestData.OffsetCommitRequestPartition()
  .setPartitionIndex(0)
  .setCommittedOffset(100)
  .setCommitTimestamp(now)
can we test values less than -1?
We could, but it does not add any value as the value is just copied to OffsetAndMetadata. As explained earlier, there is no validation for this.
@jeffkbkim @jolshan Updated and rebased the PR.
jolshan left a comment:

Thanks David -- the one test failure looks unrelated. I'll give Jeff a chance to take a look as well before merging.
jeffkbkim left a comment:

Left some minor comments/questions, LGTM otherwise.
);

/**
 * Commit offsets for a given Group.
are the descriptions for fetchOffsets(), fetchAllOffsets(), commitOffsets() "Group" instead of "Generic Group" since they can apply to both Generic and Consumer groups? just noticed the difference in join/sync/leave group.
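For reference, the declaration under discussion plausibly looks like the following. The parameter list is inferred from the sibling methods added in this series (joinGroup, fetchOffsets) rather than quoted from the diff:

```java
/**
 * Commit offsets for a given Group.
 */
// Parameter list is an assumption based on the sibling interface methods.
CompletableFuture<OffsetCommitResponseData> commitOffsets(
    RequestContext context,
    OffsetCommitRequestData request,
    BufferSupplier bufferSupplier
);
```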
@jeffkbkim @jolshan I updated the PR.

Still looks good to me 😄

Failed tests are not related.

I will go ahead and merge it. I can do follow-ups if needed.
KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886)

This patch adds `OffsetCommit` to the new `GroupCoordinator` interface and updates `KafkaApis` to use it.

Reviewers: Omnia G H Ibrahim <o.g.h.ibrahim@gmail.com>, Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>