diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java index 457d090fc535e..ad17bfe34f24c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java @@ -517,7 +517,8 @@ public static CoordinatorRecord newOffsetCommitRecord( .setMetadata(offsetAndMetadata.metadata) .setCommitTimestamp(offsetAndMetadata.commitTimestampMs) // Version 1 has a non-empty expireTimestamp field - .setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)), + .setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)) + .setTopicId(offsetAndMetadata.topicId), version ) ); @@ -527,9 +528,7 @@ static short offsetCommitValueVersion(boolean expireTimestampMs) { if (expireTimestampMs) { return 1; } else { - // Serialize with the highest supported non-flexible version - // until a tagged field is introduced or the version is bumped. - return 3; + return 4; } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java index a5635f58ac565..b8c8ee28238eb 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; @@ -65,12 +66,18 @@ public class OffsetAndMetadata { */ public final long recordOffset; + /** + * The topic id used to commit the offset. + */ + public final Uuid topicId; + public OffsetAndMetadata( long committedOffset, OptionalInt leaderEpoch, String metadata, long commitTimestampMs, - OptionalLong expireTimestampMs + OptionalLong expireTimestampMs, + Uuid topicId ) { this( -1L, @@ -78,7 +85,8 @@ public OffsetAndMetadata( leaderEpoch, metadata, commitTimestampMs, - expireTimestampMs + expireTimestampMs, + topicId ); } @@ -88,7 +96,8 @@ public OffsetAndMetadata( OptionalInt leaderEpoch, String metadata, long commitTimestampMs, - OptionalLong expireTimestampMs + OptionalLong expireTimestampMs, + Uuid topicId ) { this.recordOffset = recordOffset; this.committedOffset = committedOffset; @@ -96,6 +105,7 @@ public OffsetAndMetadata( this.metadata = Objects.requireNonNull(metadata); this.commitTimestampMs = commitTimestampMs; this.expireTimestampMs = Objects.requireNonNull(expireTimestampMs); + this.topicId = topicId; } @Override @@ -105,6 +115,7 @@ public String toString() { ", metadata=" + metadata + ", commitTimestampMs=" + commitTimestampMs + ", expireTimestampMs=" + expireTimestampMs + + ", topicId=" + topicId + ", recordOffset=" + recordOffset + ')'; } @@ -121,6 +132,7 @@ public boolean equals(Object o) { if (recordOffset != that.recordOffset) return false; if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false; if (!Objects.equals(metadata, that.metadata)) return false; + if (!Objects.equals(topicId, that.topicId)) return false; return Objects.equals(expireTimestampMs, that.expireTimestampMs); } @@ -129,6 +141,7 @@ public int hashCode() { int result = (int) (committedOffset ^ (committedOffset >>> 32)); result = 31 * result + (leaderEpoch != null ? leaderEpoch.hashCode() : 0); result = 31 * result + (metadata != null ? metadata.hashCode() : 0); + result = 31 * result + (topicId != null ? topicId.hashCode() : 0); result = 31 * result + (int) (commitTimestampMs ^ (commitTimestampMs >>> 32)); result = 31 * result + (expireTimestampMs != null ? expireTimestampMs.hashCode() : 0); result = 31 * result + (int) (recordOffset ^ (recordOffset >>> 32)); @@ -148,7 +161,8 @@ public static OffsetAndMetadata fromRecord( ofSentinel(record.leaderEpoch()), record.metadata(), record.commitTimestamp(), - ofSentinel(record.expireTimestamp()) + ofSentinel(record.expireTimestamp()), + record.topicId() ); } @@ -156,6 +170,7 @@ public static OffsetAndMetadata fromRecord( * @return An OffsetAndMetadata created from an OffsetCommitRequestPartition request. */ public static OffsetAndMetadata fromRequest( + Uuid topicId, OffsetCommitRequestData.OffsetCommitRequestPartition partition, long currentTimeMs, OptionalLong expireTimestampMs @@ -166,7 +181,8 @@ public static OffsetAndMetadata fromRequest( partition.committedMetadata() == null ? OffsetAndMetadata.NO_METADATA : partition.committedMetadata(), currentTimeMs, - expireTimestampMs + expireTimestampMs, + topicId ); } @@ -183,7 +199,8 @@ public static OffsetAndMetadata fromRequest( partition.committedMetadata() == null ? OffsetAndMetadata.NO_METADATA : partition.committedMetadata(), currentTimeMs, - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ); } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 0fa997c557a09..cded489990521 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -481,6 +481,7 @@ public CoordinatorResult commitOffs .setErrorCode(Errors.NONE.code())); final OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.fromRequest( + topic.topicId(), partition, currentTimeMs, expireTimestampMs diff --git a/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json b/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json index 8f7d32d544796..d9eca6cebe80e 100644 --- a/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json +++ b/group-coordinator/src/main/resources/common/message/OffsetCommitValue.json @@ -32,6 +32,8 @@ { "name": "commitTimestamp", "type": "int64", "versions": "0+", "about": "The time at which the commit was added to the log."}, { "name": "expireTimestamp", "type": "int64", "versions": "1", "default": -1, "ignorable": true, - "about": "The time at which the offset will expire."} + "about": "The time at which the offset will expire."}, + { "name": "topicId", "type": "uuid", "versions": "4+", "taggedVersions": "4+", "tag": 0, "ignorable": true, + "about": "The topic id of the committed offset."} ] } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java index 8bd0987ac4697..be2e9df5fea86 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpersTest.java @@ -55,6 +55,8 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import java.util.ArrayList; import java.util.Arrays; @@ -68,6 +70,7 @@ import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; +import java.util.stream.Stream; import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment; @@ -720,11 +723,19 @@ public void testEmptyGroupMetadataRecord() { @Test public void testOffsetCommitValueVersion() { assertEquals((short) 1, GroupCoordinatorRecordHelpers.offsetCommitValueVersion(true)); - assertEquals((short) 3, GroupCoordinatorRecordHelpers.offsetCommitValueVersion(false)); + assertEquals((short) 4, GroupCoordinatorRecordHelpers.offsetCommitValueVersion(false)); } - @Test - public void testNewOffsetCommitRecord() { + private static Stream uuids() { + return Stream.of( + Uuid.ZERO_UUID, + Uuid.randomUuid() + ); + } + + @ParameterizedTest + @MethodSource("uuids") + public void testNewOffsetCommitRecord(Uuid topicId) { OffsetCommitKey key = new OffsetCommitKey() .setGroup("group-id") .setTopic("foo") @@ -734,7 +745,8 @@ public void testNewOffsetCommitRecord() { .setLeaderEpoch(10) .setMetadata("metadata") .setCommitTimestamp(1234L) - .setExpireTimestamp(-1L); + .setExpireTimestamp(-1L) + .setTopicId(topicId); CoordinatorRecord expectedRecord = CoordinatorRecord.record( key, @@ -749,11 +761,14 @@ public void testNewOffsetCommitRecord() { "foo", 1, new OffsetAndMetadata( + -1L, 100L, OptionalInt.of(10), "metadata", 1234L, - OptionalLong.empty()) + OptionalLong.empty(), + topicId + ) )); value.setLeaderEpoch(-1); @@ -767,7 +782,9 @@ public void testNewOffsetCommitRecord() { OptionalInt.empty(), "metadata", 1234L, - OptionalLong.empty()) + OptionalLong.empty(), + topicId + ) )); } @@ -798,7 +815,9 @@ public void testNewOffsetCommitRecordWithExpireTimestamp() { OptionalInt.of(10), "metadata", 1234L, - OptionalLong.of(5678L)) + OptionalLong.of(5678L), + Uuid.ZERO_UUID + ) )); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java index e6be1f27883e8..2cc585b06afd2 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java @@ -16,27 +16,33 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; import org.apache.kafka.coordinator.group.generated.OffsetCommitValue; import org.apache.kafka.server.util.MockTime; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import java.util.OptionalInt; import java.util.OptionalLong; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; public class OffsetAndMetadataTest { @Test public void testAttributes() { + Uuid topicId = Uuid.randomUuid(); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata( 100L, OptionalInt.of(10), "metadata", 1234L, - OptionalLong.of(5678L) + OptionalLong.of(5678L), + topicId ); assertEquals(100L, offsetAndMetadata.committedOffset); @@ -44,16 +50,26 @@ public void testAttributes() { assertEquals("metadata", offsetAndMetadata.metadata); assertEquals(1234L, offsetAndMetadata.commitTimestampMs); assertEquals(OptionalLong.of(5678L), offsetAndMetadata.expireTimestampMs); + assertEquals(topicId, offsetAndMetadata.topicId); } - @Test - public void testFromRecord() { + private static Stream uuids() { + return Stream.of( + Uuid.ZERO_UUID, + Uuid.randomUuid() + ); + } + + @ParameterizedTest + @MethodSource("uuids") + public void testFromRecord(Uuid uuid) { OffsetCommitValue record = new OffsetCommitValue() .setOffset(100L) .setLeaderEpoch(-1) .setMetadata("metadata") .setCommitTimestamp(1234L) - .setExpireTimestamp(-1L); + .setExpireTimestamp(-1L) + .setTopicId(uuid); assertEquals(new OffsetAndMetadata( 10L, @@ -61,7 +77,8 @@ public void testFromRecord() { OptionalInt.empty(), "metadata", 1234L, - OptionalLong.empty() + OptionalLong.empty(), + uuid ), OffsetAndMetadata.fromRecord(10L, record)); record @@ -74,12 +91,14 @@ public void testFromRecord() { OptionalInt.of(12), "metadata", 1234L, - OptionalLong.of(5678L) + OptionalLong.of(5678L), + uuid ), OffsetAndMetadata.fromRecord(11L, record)); } - @Test - public void testFromRequest() { + @ParameterizedTest + @MethodSource("uuids") + public void testFromRequest(Uuid uuid) { MockTime time = new MockTime(); OffsetCommitRequestData.OffsetCommitRequestPartition partition = @@ -95,8 +114,10 @@ public void testFromRequest() { OptionalInt.empty(), "", time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + uuid ), OffsetAndMetadata.fromRequest( + uuid, partition, time.milliseconds(), OptionalLong.empty() @@ -113,8 +134,10 @@ public void testFromRequest() { OptionalInt.of(10), "hello", time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + uuid ), OffsetAndMetadata.fromRequest( + uuid, partition, time.milliseconds(), OptionalLong.empty() @@ -127,8 +150,10 @@ public void testFromRequest() { OptionalInt.of(10), "hello", time.milliseconds(), - OptionalLong.of(5678L) + OptionalLong.of(5678L), + uuid ), OffsetAndMetadata.fromRequest( + uuid, partition, time.milliseconds(), OptionalLong.of(5678L) @@ -153,7 +178,8 @@ public void testFromTransactionalRequest() { OptionalInt.empty(), "", time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ), OffsetAndMetadata.fromRequest( partition, time.milliseconds() @@ -170,7 +196,8 @@ public void testFromTransactionalRequest() { OptionalInt.of(10), "hello", time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ), OffsetAndMetadata.fromRequest( partition, time.milliseconds() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java index 2f1cb354a5a86..63bbad944678f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImplTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.coordinator.group; +import org.apache.kafka.common.Uuid; + import org.junit.jupiter.api.Test; import java.util.OptionalInt; @@ -39,7 +41,8 @@ public void testIsOffsetExpired() { OptionalInt.of(1), "metadata", commitTimestamp, - expireTimestampMs + expireTimestampMs, + Uuid.ZERO_UUID ); // Test when expire timestamp exists (older versions with per partition retention) @@ -56,7 +59,8 @@ public void testIsOffsetExpired() { OptionalInt.of(1), "metadata", commitTimestamp, - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ); // 3. Current timestamp - base timestamp >= offsets retention => should expire diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 382b2a9b0e571..83246fb43bd62 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -68,6 +68,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import java.net.InetAddress; import java.time.Duration; @@ -80,6 +81,7 @@ import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; +import java.util.stream.Stream; import static org.apache.kafka.common.requests.OffsetFetchResponse.INVALID_OFFSET; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.OFFSET_COMMITS_SENSOR_NAME; @@ -449,7 +451,8 @@ public void commitOffset( OptionalInt.of(leaderEpoch), "metadata", commitTimestamp, - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ) )); } @@ -580,6 +583,13 @@ public boolean hasOffset( } } + private static Stream uuids() { + return Stream.of( + Uuid.ZERO_UUID, + Uuid.randomUuid() + ); + } + @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) public void testOffsetCommitWithUnknownGroup(short version) { @@ -906,7 +916,8 @@ public void testGenericGroupOffsetCommitWithRetentionTime() { OptionalInt.empty(), "", context.time.milliseconds(), - OptionalLong.of(context.time.milliseconds() + 1234L) + OptionalLong.of(context.time.milliseconds() + 1234L), + Uuid.ZERO_UUID ) )), result.records() @@ -971,8 +982,9 @@ public void testGenericGroupOffsetCommitMaintainsSession() { assertFalse(group.hasMember(member.memberId())); } - @Test - public void testSimpleGroupOffsetCommit() { + @ParameterizedTest + @MethodSource("uuids") + public void testSimpleGroupOffsetCommit(Uuid topicId) { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); CoordinatorResult result = context.commitOffset( @@ -981,6 +993,7 @@ public void testSimpleGroupOffsetCommit() { .setTopics(List.of( new OffsetCommitRequestData.OffsetCommitRequestTopic() .setName("bar") + .setTopicId(topicId) .setPartitions(List.of( new OffsetCommitRequestData.OffsetCommitRequestPartition() .setPartitionIndex(0) @@ -994,6 +1007,7 @@ public void testSimpleGroupOffsetCommit() { .setTopics(List.of( new OffsetCommitResponseData.OffsetCommitResponseTopic() .setName("bar") + .setTopicId(topicId) .setPartitions(List.of( new OffsetCommitResponseData.OffsetCommitResponsePartition() .setPartitionIndex(0) @@ -1013,7 +1027,8 @@ public void testSimpleGroupOffsetCommit() { OptionalInt.empty(), "", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + topicId ) )), result.records() @@ -1072,7 +1087,8 @@ public void testSimpleGroupOffsetCommitWithInstanceId() { OptionalInt.empty(), "", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ) )), result.records() @@ -1235,15 +1251,17 @@ public void testConsumerGroupOffsetCommitFromAdminClient() { OptionalInt.empty(), "", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ) )), result.records() ); } - @Test - public void testConsumerGroupOffsetCommit() { + @ParameterizedTest + @MethodSource("uuids") + public void testConsumerGroupOffsetCommit(Uuid topicId) { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); // Create an empty group. @@ -1267,74 +1285,7 @@ public void testConsumerGroupOffsetCommit() { .setTopics(List.of( new OffsetCommitRequestData.OffsetCommitRequestTopic() .setName("bar") - .setPartitions(List.of( - new OffsetCommitRequestData.OffsetCommitRequestPartition() - .setPartitionIndex(0) - .setCommittedOffset(100L) - .setCommittedLeaderEpoch(10) - .setCommittedMetadata("metadata") - )) - )) - ); - - assertEquals( - new OffsetCommitResponseData() - .setTopics(List.of( - new OffsetCommitResponseData.OffsetCommitResponseTopic() - .setName("bar") - .setPartitions(List.of( - new OffsetCommitResponseData.OffsetCommitResponsePartition() - .setPartitionIndex(0) - .setErrorCode(Errors.NONE.code()) - )) - )), - result.response() - ); - - assertEquals( - List.of(GroupCoordinatorRecordHelpers.newOffsetCommitRecord( - "foo", - "bar", - 0, - new OffsetAndMetadata( - 100L, - OptionalInt.of(10), - "metadata", - context.time.milliseconds(), - OptionalLong.empty() - ) - )), - result.records() - ); - } - - @Test - public void testConsumerGroupOffsetCommitWithTopicIds() { - Uuid topicId = Uuid.randomUuid(); - OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); - - // Create an empty group. - ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup( - "foo", - true - ); - - // Add member. - group.updateMember(new ConsumerGroupMember.Builder("member") - .setMemberEpoch(10) - .setPreviousMemberEpoch(10) - .build() - ); - - CoordinatorResult result = context.commitOffset( - new OffsetCommitRequestData() - .setGroupId("foo") - .setMemberId("member") - .setGenerationIdOrMemberEpoch(10) - .setTopics(List.of( - new OffsetCommitRequestData.OffsetCommitRequestTopic() .setTopicId(topicId) - .setName("bar") .setPartitions(List.of( new OffsetCommitRequestData.OffsetCommitRequestPartition() .setPartitionIndex(0) @@ -1349,8 +1300,8 @@ public void testConsumerGroupOffsetCommitWithTopicIds() { new OffsetCommitResponseData() .setTopics(List.of( new OffsetCommitResponseData.OffsetCommitResponseTopic() - .setTopicId(topicId) .setName("bar") + .setTopicId(topicId) .setPartitions(List.of( new OffsetCommitResponseData.OffsetCommitResponsePartition() .setPartitionIndex(0) @@ -1370,7 +1321,8 @@ public void testConsumerGroupOffsetCommitWithTopicIds() { OptionalInt.of(10), "metadata", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + topicId ) )), result.records() @@ -1446,7 +1398,8 @@ public void testConsumerGroupOffsetCommitWithOffsetMetadataTooLarge() { OptionalInt.of(10), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ) )), result.records() @@ -1512,7 +1465,8 @@ public void testConsumerGroupTransactionalOffsetCommit() { OptionalInt.of(10), "metadata", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ) )), result.records() @@ -1669,7 +1623,8 @@ public void testGenericGroupTransactionalOffsetCommit() { OptionalInt.of(10), "metadata", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ) )), result.records() @@ -2692,8 +2647,9 @@ private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPar .setMetadata(""); } - @Test - public void testReplay() { + @ParameterizedTest + @MethodSource("uuids") + public void testReplay(Uuid topicId) { OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build(); verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata( @@ -2702,7 +2658,8 @@ public void testReplay() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + topicId )); verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata( @@ -2711,7 +2668,8 @@ public void testReplay() { OptionalInt.of(10), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + topicId )); verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata( @@ -2720,7 +2678,8 @@ public void testReplay() { OptionalInt.of(10), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + topicId )); verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata( @@ -2729,7 +2688,8 @@ public void testReplay() { OptionalInt.of(10), "small", context.time.milliseconds(), - OptionalLong.of(12345L) + OptionalLong.of(12345L), + topicId )); } @@ -2743,7 +2703,8 @@ public void testTransactionalReplay() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); verifyTransactionalReplay(context, 5, "foo", "bar", 1, new OffsetAndMetadata( @@ -2752,7 +2713,8 @@ public void testTransactionalReplay() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); verifyTransactionalReplay(context, 5, "bar", "zar", 0, new OffsetAndMetadata( @@ -2761,7 +2723,8 @@ public void testTransactionalReplay() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); verifyTransactionalReplay(context, 5, "bar", "zar", 1, new OffsetAndMetadata( @@ -2770,7 +2733,8 @@ public void testTransactionalReplay() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); verifyTransactionalReplay(context, 6, "foo", "bar", 2, new OffsetAndMetadata( @@ -2779,7 +2743,8 @@ public void testTransactionalReplay() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); verifyTransactionalReplay(context, 6, "foo", "bar", 3, new OffsetAndMetadata( @@ -2788,7 +2753,8 @@ public void testTransactionalReplay() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); } @@ -2803,7 +2769,8 @@ public void testReplayWithTombstoneAndPendingTransactionalOffsets() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); verifyTransactionalReplay(context, 10L, "foo", "bar", 0, new OffsetAndMetadata( @@ -2812,7 +2779,8 @@ public void testReplayWithTombstoneAndPendingTransactionalOffsets() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); verifyTransactionalReplay(context, 10L, "foo", "bar", 1, new OffsetAndMetadata( @@ -2821,7 +2789,8 @@ public void testReplayWithTombstoneAndPendingTransactionalOffsets() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); // Delete the offsets. @@ -2853,7 +2822,8 @@ public void testReplayTransactionEndMarkerWithCommit() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); // Add pending transactional commit for producer id 5. @@ -2863,7 +2833,8 @@ public void testReplayTransactionEndMarkerWithCommit() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); // Add pending transactional commit for producer id 6. @@ -2873,7 +2844,8 @@ public void testReplayTransactionEndMarkerWithCommit() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); // Replaying an end marker with an unknown producer id should not fail. @@ -2897,7 +2869,8 @@ public void testReplayTransactionEndMarkerWithCommit() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ), context.offsetMetadataManager.offset( "foo", "bar", @@ -2933,7 +2906,8 @@ public void testReplayTransactionEndMarkerKeepsTheMostRecentCommittedOffset() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); // Add regular offset commit. @@ -2943,7 +2917,8 @@ public void testReplayTransactionEndMarkerKeepsTheMostRecentCommittedOffset() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); // Replaying an end marker to commit transaction of producer id 5. @@ -2965,7 +2940,8 @@ public void testReplayTransactionEndMarkerKeepsTheMostRecentCommittedOffset() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID ), context.offsetMetadataManager.offset( "foo", "bar", @@ -2984,7 +2960,8 @@ public void testOffsetCommitsNumberMetricWithTransactionalOffsets() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); // Add pending transactional commit for producer id 5. @@ -2994,7 +2971,8 @@ public void testOffsetCommitsNumberMetricWithTransactionalOffsets() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); // Add pending transactional commit for producer id 6. @@ -3004,7 +2982,8 @@ public void testOffsetCommitsNumberMetricWithTransactionalOffsets() { OptionalInt.empty(), "small", context.time.milliseconds(), - OptionalLong.empty() + OptionalLong.empty(), + Uuid.ZERO_UUID )); // Commit all the transactions. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java index a8b4d639e1c73..dfcb415fd3ea1 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java @@ -1124,7 +1124,7 @@ public void testOffsetExpirationCondition() { long currentTimestamp = 30000L; long commitTimestamp = 20000L; long offsetsRetentionMs = 10000L; - OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty()); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID); MockTime time = new MockTime(); long currentStateTimestamp = time.milliseconds(); ClassicGroup group = new ClassicGroup(new LogContext(), "groupId", EMPTY, time); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java index 333df21d9c33d..a6b91e5a83b88 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java @@ -1198,7 +1198,7 @@ public void testOffsetExpirationCondition() { long currentTimestamp = 30000L; long commitTimestamp = 20000L; long offsetsRetentionMs = 10000L; - OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty()); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID); ConsumerGroup group = new ConsumerGroup(new SnapshotRegistry(new LogContext()), "group-id", mock(GroupCoordinatorMetricsShard.class)); Optional offsetExpirationCondition = group.offsetExpirationCondition(); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java index e9038b33402e2..99e13bbf15545 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java @@ -791,7 +791,7 @@ public void testOffsetExpirationCondition() { long currentTimestamp = 30000L; long commitTimestamp = 20000L; long offsetsRetentionMs = 10000L; - OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty()); + OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(15000L, OptionalInt.empty(), "", commitTimestamp, OptionalLong.empty(), Uuid.ZERO_UUID); StreamsGroup group = new StreamsGroup(LOG_CONTEXT, new SnapshotRegistry(LOG_CONTEXT), "group-id", mock(GroupCoordinatorMetricsShard.class)); Optional offsetExpirationCondition = group.offsetExpirationCondition(); diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java index 331b252dcc4c1..d774d0fed79c9 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/OffsetMessageFormatterTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.tools.consumer; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.protocol.MessageUtil; import org.apache.kafka.coordinator.group.generated.GroupMetadataKey; import org.apache.kafka.coordinator.group.generated.GroupMetadataValue; @@ -28,7 +29,6 @@ import java.util.stream.Stream; public class OffsetMessageFormatterTest extends CoordinatorRecordMessageFormatterTest { - private static final OffsetCommitKey OFFSET_COMMIT_KEY = new OffsetCommitKey() .setGroup("group-id") .setTopic("foo") @@ -38,7 +38,8 @@ public class OffsetMessageFormatterTest extends CoordinatorRecordMessageFormatte .setLeaderEpoch(10) .setMetadata("metadata") .setCommitTimestamp(1234L) - .setExpireTimestamp(5678L); + .setExpireTimestamp(5678L) + .setTopicId(Uuid.fromString("MKXx1fIkQy2J9jXHhK8m1w")); private static final GroupMetadataKey GROUP_METADATA_KEY = new GroupMetadataKey().setGroup("group-id"); private static final GroupMetadataValue GROUP_METADATA_VALUE = new GroupMetadataValue() .setProtocolType("consumer") @@ -121,7 +122,8 @@ protected Stream parameters() { "data":{"offset":100, "leaderEpoch":10, "metadata":"metadata", - "commitTimestamp":1234}}} + "commitTimestamp":1234, + "topicId":"MKXx1fIkQy2J9jXHhK8m1w"}}} """ ), Arguments.of(