Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,8 @@ public static CoordinatorRecord newOffsetCommitRecord(
.setMetadata(offsetAndMetadata.metadata)
.setCommitTimestamp(offsetAndMetadata.commitTimestampMs)
// Version 1 has a non-empty expireTimestamp field
.setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP)),
.setExpireTimestamp(offsetAndMetadata.expireTimestampMs.orElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
.setTopicId(offsetAndMetadata.topicId),
version
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update offsetCommitValueVersion to return version 4 if topic id is present?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I missed this.

)
);
Expand All @@ -527,9 +528,7 @@ static short offsetCommitValueVersion(boolean expireTimestampMs) {
if (expireTimestampMs) {
return 1;
} else {
// Serialize with the highest supported non-flexible version
// until a tagged field is introduced or the version is bumped.
return 3;
return 4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering if we can use OffsetCommitValue.highestSupportedVersion here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to use hard coded version here to ensure that they are not bumped unintentionally. This is what we have been doing for the other records too.

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
Expand Down Expand Up @@ -65,20 +66,27 @@ public class OffsetAndMetadata {
*/
public final long recordOffset;

/**
* The topic id used to commit the offset.
*/
public final Uuid topicId;

public OffsetAndMetadata(
long committedOffset,
OptionalInt leaderEpoch,
String metadata,
long commitTimestampMs,
OptionalLong expireTimestampMs
OptionalLong expireTimestampMs,
Uuid topicId
) {
this(
-1L,
committedOffset,
leaderEpoch,
metadata,
commitTimestampMs,
expireTimestampMs
expireTimestampMs,
topicId
);
}

Expand All @@ -88,14 +96,16 @@ public OffsetAndMetadata(
OptionalInt leaderEpoch,
String metadata,
long commitTimestampMs,
OptionalLong expireTimestampMs
OptionalLong expireTimestampMs,
Uuid topicId
) {
this.recordOffset = recordOffset;
this.committedOffset = committedOffset;
this.leaderEpoch = Objects.requireNonNull(leaderEpoch);
this.metadata = Objects.requireNonNull(metadata);
this.commitTimestampMs = commitTimestampMs;
this.expireTimestampMs = Objects.requireNonNull(expireTimestampMs);
this.topicId = topicId;
}

@Override
Expand All @@ -105,6 +115,7 @@ public String toString() {
", metadata=" + metadata +
", commitTimestampMs=" + commitTimestampMs +
", expireTimestampMs=" + expireTimestampMs +
", topicId=" + topicId +
", recordOffset=" + recordOffset +
')';
}
Expand All @@ -121,6 +132,7 @@ public boolean equals(Object o) {
if (recordOffset != that.recordOffset) return false;
if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false;
if (!Objects.equals(metadata, that.metadata)) return false;
if (!Objects.equals(topicId, that.topicId)) return false;
return Objects.equals(expireTimestampMs, that.expireTimestampMs);
}

Expand All @@ -129,6 +141,7 @@ public int hashCode() {
int result = (int) (committedOffset ^ (committedOffset >>> 32));
result = 31 * result + (leaderEpoch != null ? leaderEpoch.hashCode() : 0);
result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
result = 31 * result + (topicId != null ? topicId.hashCode() : 0);
result = 31 * result + (int) (commitTimestampMs ^ (commitTimestampMs >>> 32));
result = 31 * result + (expireTimestampMs != null ? expireTimestampMs.hashCode() : 0);
result = 31 * result + (int) (recordOffset ^ (recordOffset >>> 32));
Expand All @@ -148,14 +161,16 @@ public static OffsetAndMetadata fromRecord(
ofSentinel(record.leaderEpoch()),
record.metadata(),
record.commitTimestamp(),
ofSentinel(record.expireTimestamp())
ofSentinel(record.expireTimestamp()),
record.topicId()
);
}

/**
* @return An OffsetAndMetadata created from an OffsetCommitRequestPartition request.
*/
public static OffsetAndMetadata fromRequest(
Uuid topicId,
OffsetCommitRequestData.OffsetCommitRequestPartition partition,
long currentTimeMs,
OptionalLong expireTimestampMs
Expand All @@ -166,7 +181,8 @@ public static OffsetAndMetadata fromRequest(
partition.committedMetadata() == null ?
OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
currentTimeMs,
expireTimestampMs
expireTimestampMs,
topicId
);
}

Expand All @@ -183,7 +199,8 @@ public static OffsetAndMetadata fromRequest(
partition.committedMetadata() == null ?
OffsetAndMetadata.NO_METADATA : partition.committedMetadata(),
currentTimeMs,
OptionalLong.empty()
OptionalLong.empty(),
Uuid.ZERO_UUID
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ public CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> commitOffs
.setErrorCode(Errors.NONE.code()));

final OffsetAndMetadata offsetAndMetadata = OffsetAndMetadata.fromRequest(
topic.topicId(),
partition,
currentTimeMs,
expireTimestampMs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
{ "name": "commitTimestamp", "type": "int64", "versions": "0+",
"about": "The time at which the commit was added to the log."},
{ "name": "expireTimestamp", "type": "int64", "versions": "1", "default": -1, "ignorable": true,
"about": "The time at which the offset will expire."}
"about": "The time at which the offset will expire."},
{ "name": "topicId", "type": "uuid", "versions": "4+", "taggedVersions": "4+", "tag": 0, "ignorable": true,
"about": "The topic id of the committed offset."}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -68,6 +70,7 @@
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Stream;

import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
Expand Down Expand Up @@ -720,11 +723,19 @@ public void testEmptyGroupMetadataRecord() {
@Test
public void testOffsetCommitValueVersion() {
assertEquals((short) 1, GroupCoordinatorRecordHelpers.offsetCommitValueVersion(true));
assertEquals((short) 3, GroupCoordinatorRecordHelpers.offsetCommitValueVersion(false));
assertEquals((short) 4, GroupCoordinatorRecordHelpers.offsetCommitValueVersion(false));
}

@Test
public void testNewOffsetCommitRecord() {
private static Stream<Uuid> uuids() {
return Stream.of(
Uuid.ZERO_UUID,
Uuid.randomUuid()
);
}

@ParameterizedTest
@MethodSource("uuids")
public void testNewOffsetCommitRecord(Uuid topicId) {
OffsetCommitKey key = new OffsetCommitKey()
.setGroup("group-id")
.setTopic("foo")
Expand All @@ -734,7 +745,8 @@ public void testNewOffsetCommitRecord() {
.setLeaderEpoch(10)
.setMetadata("metadata")
.setCommitTimestamp(1234L)
.setExpireTimestamp(-1L);
.setExpireTimestamp(-1L)
.setTopicId(topicId);

CoordinatorRecord expectedRecord = CoordinatorRecord.record(
key,
Expand All @@ -749,11 +761,14 @@ public void testNewOffsetCommitRecord() {
"foo",
1,
new OffsetAndMetadata(
-1L,
100L,
OptionalInt.of(10),
"metadata",
1234L,
OptionalLong.empty())
OptionalLong.empty(),
topicId
)
));

value.setLeaderEpoch(-1);
Expand All @@ -767,7 +782,9 @@ public void testNewOffsetCommitRecord() {
OptionalInt.empty(),
"metadata",
1234L,
OptionalLong.empty())
OptionalLong.empty(),
topicId
)
));
}

Expand Down Expand Up @@ -798,7 +815,9 @@ public void testNewOffsetCommitRecordWithExpireTimestamp() {
OptionalInt.of(10),
"metadata",
1234L,
OptionalLong.of(5678L))
OptionalLong.of(5678L),
Uuid.ZERO_UUID
)
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,69 @@
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.server.util.MockTime;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class OffsetAndMetadataTest {
@Test
public void testAttributes() {
Uuid topicId = Uuid.randomUuid();
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
100L,
OptionalInt.of(10),
"metadata",
1234L,
OptionalLong.of(5678L)
OptionalLong.of(5678L),
topicId
);

assertEquals(100L, offsetAndMetadata.committedOffset);
assertEquals(OptionalInt.of(10), offsetAndMetadata.leaderEpoch);
assertEquals("metadata", offsetAndMetadata.metadata);
assertEquals(1234L, offsetAndMetadata.commitTimestampMs);
assertEquals(OptionalLong.of(5678L), offsetAndMetadata.expireTimestampMs);
assertEquals(topicId, offsetAndMetadata.topicId);
}

@Test
public void testFromRecord() {
private static Stream<Uuid> uuids() {
return Stream.of(
Uuid.ZERO_UUID,
Uuid.randomUuid()
);
}

@ParameterizedTest
@MethodSource("uuids")
public void testFromRecord(Uuid uuid) {
OffsetCommitValue record = new OffsetCommitValue()
.setOffset(100L)
.setLeaderEpoch(-1)
.setMetadata("metadata")
.setCommitTimestamp(1234L)
.setExpireTimestamp(-1L);
.setExpireTimestamp(-1L)
.setTopicId(uuid);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm: when we read an older OffsetCommitValue, will we get a ZERO_UUID instead of a null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. It uses ZERO_UUID by default.


assertEquals(new OffsetAndMetadata(
10L,
100L,
OptionalInt.empty(),
"metadata",
1234L,
OptionalLong.empty()
OptionalLong.empty(),
uuid
), OffsetAndMetadata.fromRecord(10L, record));

record
Expand All @@ -74,12 +91,14 @@ public void testFromRecord() {
OptionalInt.of(12),
"metadata",
1234L,
OptionalLong.of(5678L)
OptionalLong.of(5678L),
uuid
), OffsetAndMetadata.fromRecord(11L, record));
}

@Test
public void testFromRequest() {
@ParameterizedTest
@MethodSource("uuids")
public void testFromRequest(Uuid uuid) {
MockTime time = new MockTime();

OffsetCommitRequestData.OffsetCommitRequestPartition partition =
Expand All @@ -95,8 +114,10 @@ public void testFromRequest() {
OptionalInt.empty(),
"",
time.milliseconds(),
OptionalLong.empty()
OptionalLong.empty(),
uuid
), OffsetAndMetadata.fromRequest(
uuid,
partition,
time.milliseconds(),
OptionalLong.empty()
Expand All @@ -113,8 +134,10 @@ public void testFromRequest() {
OptionalInt.of(10),
"hello",
time.milliseconds(),
OptionalLong.empty()
OptionalLong.empty(),
uuid
), OffsetAndMetadata.fromRequest(
uuid,
partition,
time.milliseconds(),
OptionalLong.empty()
Expand All @@ -127,8 +150,10 @@ public void testFromRequest() {
OptionalInt.of(10),
"hello",
time.milliseconds(),
OptionalLong.of(5678L)
OptionalLong.of(5678L),
uuid
), OffsetAndMetadata.fromRequest(
uuid,
partition,
time.milliseconds(),
OptionalLong.of(5678L)
Expand All @@ -153,7 +178,8 @@ public void testFromTransactionalRequest() {
OptionalInt.empty(),
"",
time.milliseconds(),
OptionalLong.empty()
OptionalLong.empty(),
Uuid.ZERO_UUID
), OffsetAndMetadata.fromRequest(
partition,
time.milliseconds()
Expand All @@ -170,7 +196,8 @@ public void testFromTransactionalRequest() {
OptionalInt.of(10),
"hello",
time.milliseconds(),
OptionalLong.empty()
OptionalLong.empty(),
Uuid.ZERO_UUID
), OffsetAndMetadata.fromRequest(
partition,
time.milliseconds()
Expand Down
Loading