The content of this metadata is RSM-dependent and is opaque to the broker, i.e.
+ * it's not interpreted, only stored along with the rest of the remote log segment metadata.
+ *
+ *
Examples of such metadata are:
+ *
+ *
The storage path on the remote storage in case it's nondeterministic or version-dependent.
+ *
The actual size of all the files related to the segment on the remote storage.
+ *
+ *
+ *
The maximum size the broker accepts and stores is controlled by
+ * the {@code remote.log.metadata.custom.metadata.max.bytes} setting.
+ */
+ public static class CustomMetadata {
+ private final byte[] value;
+
+ public CustomMetadata(byte[] value) {
+ this.value = value;
+ }
+
+ public byte[] value() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CustomMetadata that = (CustomMetadata) o;
+ return Arrays.equals(value, that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(value);
+ }
+
+ @Override
+ public String toString() {
+ return "CustomMetadata{" + value.length + " bytes}";
+ }
+ }
}
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
index a01df9602d250..210615ef53fbf 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataUpdate.java
@@ -18,8 +18,10 @@
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import java.util.Objects;
+import java.util.Optional;
/**
* It describes the metadata update about the log segment in the remote storage. This is currently used to update the
@@ -34,6 +36,11 @@ public class RemoteLogSegmentMetadataUpdate extends RemoteLogMetadata {
*/
private final RemoteLogSegmentId remoteLogSegmentId;
+ /**
+ * Custom metadata.
+ */
+ private final Optional customMetadata;
+
/**
* It indicates the state in which the action is executed on this segment.
*/
@@ -42,13 +49,17 @@ public class RemoteLogSegmentMetadataUpdate extends RemoteLogMetadata {
/**
* @param remoteLogSegmentId Universally unique remote log segment id.
 * @param eventTimestampMs Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage.
+ * @param customMetadata Custom metadata.
* @param state State of the remote log segment.
* @param brokerId Broker id from which this event is generated.
*/
public RemoteLogSegmentMetadataUpdate(RemoteLogSegmentId remoteLogSegmentId, long eventTimestampMs,
- RemoteLogSegmentState state, int brokerId) {
+ Optional customMetadata,
+ RemoteLogSegmentState state,
+ int brokerId) {
super(brokerId, eventTimestampMs);
this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, "remoteLogSegmentId can not be null");
+ this.customMetadata = Objects.requireNonNull(customMetadata, "customMetadata can not be null");
this.state = Objects.requireNonNull(state, "state can not be null");
}
@@ -59,6 +70,13 @@ public RemoteLogSegmentId remoteLogSegmentId() {
return remoteLogSegmentId;
}
+ /**
+ * @return Custom metadata.
+ */
+ public Optional customMetadata() {
+ return customMetadata;
+ }
+
/**
* It represents the state of the remote log segment. It can be one of the values of {@link RemoteLogSegmentState}.
*/
@@ -81,6 +99,7 @@ public boolean equals(Object o) {
}
RemoteLogSegmentMetadataUpdate that = (RemoteLogSegmentMetadataUpdate) o;
return Objects.equals(remoteLogSegmentId, that.remoteLogSegmentId) &&
+ Objects.equals(customMetadata, that.customMetadata) &&
state == that.state &&
eventTimestampMs() == that.eventTimestampMs() &&
brokerId() == that.brokerId();
@@ -88,14 +107,15 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return Objects.hash(remoteLogSegmentId, state, eventTimestampMs(), brokerId());
+ return Objects.hash(remoteLogSegmentId, customMetadata, state, eventTimestampMs(), brokerId());
}
@Override
public String toString() {
return "RemoteLogSegmentMetadataUpdate{" +
"remoteLogSegmentId=" + remoteLogSegmentId +
- ", state=" + state +
+ ", customMetadata=" + customMetadata +
+ ", state=" + state +
", eventTimestampMs=" + eventTimestampMs() +
", brokerId=" + brokerId() +
'}';
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
index cc26109969ccd..fa819979b2cba 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageManager.java
@@ -18,9 +18,11 @@
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import java.io.Closeable;
import java.io.InputStream;
+import java.util.Optional;
/**
* This interface provides the lifecycle of remote log segments that includes copy, fetch, and delete from remote
@@ -81,10 +83,11 @@ enum IndexType {
*
* @param remoteLogSegmentMetadata metadata about the remote log segment.
* @param logSegmentData data to be copied to tiered storage.
+ * @return custom metadata to be added to the segment metadata after copying.
* @throws RemoteStorageException if there are any errors in storing the data of the segment.
*/
- void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- LogSegmentData logSegmentData)
+ Optional copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ LogSegmentData logSegmentData)
throws RemoteStorageException;
/**
diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
index 8a83033aa0444..dfd905ce0cdb3 100644
--- a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
@@ -16,14 +16,18 @@
*/
package org.apache.kafka.server.log.remote.storage;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;
+import java.util.Optional;
public class NoOpRemoteStorageManager implements RemoteStorageManager {
@Override
- public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
- LogSegmentData logSegmentData) {
+ public Optional copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ LogSegmentData logSegmentData) {
+ return Optional.empty();
}
@Override
diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
new file mode 100644
index 0000000000000..4cd2b350441c5
--- /dev/null
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadataTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.junit.jupiter.api.Test;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataTest {
+ private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(),
+ new TopicPartition("foo", 0));
+
+ @Test
+ void createWithUpdates() {
+ int brokerId = 0;
+ int eventTimestamp = 0;
+ int brokerIdFinished = 1;
+ int timestampFinished = 1;
+ long startOffset = 0L;
+ long endOffset = 100L;
+ int segmentSize = 123;
+ long maxTimestamp = -1L;
+
+ Map segmentLeaderEpochs = new HashMap<>();
+ segmentLeaderEpochs.put(0, 0L);
+ RemoteLogSegmentId segmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
+ RemoteLogSegmentMetadata segmentMetadata = new RemoteLogSegmentMetadata(segmentId, startOffset, endOffset,
+ maxTimestamp, brokerId, eventTimestamp, segmentSize,
+ segmentLeaderEpochs);
+
+ CustomMetadata customMetadata = new CustomMetadata(new byte[]{0, 1, 2, 3});
+ RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
+ segmentId, timestampFinished, Optional.of(customMetadata), RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ brokerIdFinished);
+ RemoteLogSegmentMetadata updatedMetadata = segmentMetadata.createWithUpdates(segmentMetadataUpdate);
+
+ RemoteLogSegmentMetadata expectedUpdatedMetadata = new RemoteLogSegmentMetadata(
+ segmentId, startOffset, endOffset,
+ maxTimestamp, brokerIdFinished, timestampFinished, segmentSize, Optional.of(customMetadata),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ segmentLeaderEpochs
+ );
+ assertEquals(expectedUpdatedMetadata, updatedMetadata);
+
+ // Check that the original metadata have not changed.
+ assertEquals(segmentId, segmentMetadata.remoteLogSegmentId());
+ assertEquals(startOffset, segmentMetadata.startOffset());
+ assertEquals(endOffset, segmentMetadata.endOffset());
+ assertEquals(maxTimestamp, segmentMetadata.maxTimestampMs());
+ assertEquals(brokerId, segmentMetadata.brokerId());
+ assertEquals(eventTimestamp, segmentMetadata.eventTimestampMs());
+ assertEquals(segmentSize, segmentMetadata.segmentSizeInBytes());
+ assertEquals(segmentLeaderEpochs, segmentMetadata.segmentLeaderEpochs());
+ }
+}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
index 15e4562a2da03..0b0b2817061fa 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
@@ -77,7 +77,8 @@ protected void loadRemoteLogSegmentMetadata(RemoteLogMetadataSnapshotFile.Snapsh
private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(RemoteLogSegmentMetadataSnapshot snapshot) {
return new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, snapshot.segmentId()), snapshot.startOffset(),
snapshot.endOffset(), snapshot.maxTimestampMs(), snapshot.brokerId(), snapshot.eventTimestampMs(),
- snapshot.segmentSizeInBytes(), snapshot.state(), snapshot.segmentLeaderEpochs());
+ snapshot.segmentSizeInBytes(), snapshot.customMetadata(), snapshot.state(), snapshot.segmentLeaderEpochs()
+ );
}
/**
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
index c936d5056f8d7..ec1ed6a66d15c 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import java.util.Collections;
@@ -27,6 +28,7 @@
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
+import java.util.Optional;
/**
* This class represents the entry containing the metadata about a remote log segment. This is similar to
@@ -68,6 +70,11 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
*/
private final int segmentSizeInBytes;
+ /**
+ * Custom metadata.
+ */
+ private final Optional customMetadata;
+
/**
* It indicates the state in which the action is executed on this segment.
*/
@@ -79,13 +86,14 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
* {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch
* then it should have an entry with epoch mapping to start-offset of this segment.
*
- * @param segmentId Universally unique remote log segment id.
+ * @param segmentId Universally unique remote log segment id.
* @param startOffset Start offset of this segment (inclusive).
* @param endOffset End offset of this segment (inclusive).
* @param maxTimestampMs Maximum timestamp in milliseconds in this segment.
* @param brokerId Broker id from which this event is generated.
* @param eventTimestampMs Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage.
* @param segmentSizeInBytes Size of this segment in bytes.
+ * @param customMetadata Custom metadata.
* @param state State of the respective segment of remoteLogSegmentId.
* @param segmentLeaderEpochs leader epochs occurred within this segment.
*/
@@ -96,6 +104,7 @@ public RemoteLogSegmentMetadataSnapshot(Uuid segmentId,
int brokerId,
long eventTimestampMs,
int segmentSizeInBytes,
+ Optional customMetadata,
RemoteLogSegmentState state,
Map segmentLeaderEpochs) {
super(brokerId, eventTimestampMs);
@@ -106,6 +115,7 @@ public RemoteLogSegmentMetadataSnapshot(Uuid segmentId,
this.endOffset = endOffset;
this.maxTimestampMs = maxTimestampMs;
this.segmentSizeInBytes = segmentSizeInBytes;
+ this.customMetadata = Objects.requireNonNull(customMetadata, "customMetadata can not be null");
if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
throw new IllegalArgumentException("segmentLeaderEpochs can not be null or empty");
@@ -117,7 +127,8 @@ public RemoteLogSegmentMetadataSnapshot(Uuid segmentId,
public static RemoteLogSegmentMetadataSnapshot create(RemoteLogSegmentMetadata metadata) {
return new RemoteLogSegmentMetadataSnapshot(metadata.remoteLogSegmentId().id(), metadata.startOffset(), metadata.endOffset(),
metadata.maxTimestampMs(), metadata.brokerId(), metadata.eventTimestampMs(),
- metadata.segmentSizeInBytes(), metadata.state(), metadata.segmentLeaderEpochs());
+ metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(), metadata.segmentLeaderEpochs()
+ );
}
/**
@@ -162,6 +173,13 @@ public NavigableMap segmentLeaderEpochs() {
return segmentLeaderEpochs;
}
+ /**
+ * @return Custom metadata.
+ */
+ public Optional customMetadata() {
+ return customMetadata;
+ }
+
/**
* Returns the current state of this remote log segment. It can be any of the below
*
@@ -185,13 +203,19 @@ public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof RemoteLogSegmentMetadataSnapshot)) return false;
RemoteLogSegmentMetadataSnapshot that = (RemoteLogSegmentMetadataSnapshot) o;
- return startOffset == that.startOffset && endOffset == that.endOffset && maxTimestampMs == that.maxTimestampMs && segmentSizeInBytes == that.segmentSizeInBytes && Objects.equals(
- segmentId, that.segmentId) && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) && state == that.state;
+ return startOffset == that.startOffset
+ && endOffset == that.endOffset
+ && maxTimestampMs == that.maxTimestampMs
+ && segmentSizeInBytes == that.segmentSizeInBytes
+ && Objects.equals(customMetadata, that.customMetadata)
+ && Objects.equals(segmentId, that.segmentId)
+ && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs)
+ && state == that.state;
}
@Override
public int hashCode() {
- return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, state);
+ return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
}
@Override
@@ -203,6 +227,7 @@ public String toString() {
", maxTimestampMs=" + maxTimestampMs +
", segmentLeaderEpochs=" + segmentLeaderEpochs +
", segmentSizeInBytes=" + segmentSizeInBytes +
+ ", customMetadata=" + customMetadata +
", state=" + state +
'}';
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
index bd613f8c9cdf1..ad47ee05c84ee 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
@@ -19,11 +19,13 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataSnapshotRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetadataTransform {
@@ -39,6 +41,7 @@ public ApiMessageAndVersion toApiMessageAndVersion(RemoteLogSegmentMetadataSnaps
.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata.segmentLeaderEpochs()))
.setRemoteLogSegmentState(segmentMetadata.state().id());
+ segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record, record.highestSupportedVersion());
}
@@ -59,6 +62,7 @@ public RemoteLogSegmentMetadataSnapshot fromApiMessageAndVersion(ApiMessageAndVe
segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), segmentLeaderEpoch.offset());
}
+ Optional customMetadata = Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
return new RemoteLogSegmentMetadataSnapshot(record.segmentId(),
record.startOffset(),
record.endOffset(),
@@ -66,6 +70,7 @@ public RemoteLogSegmentMetadataSnapshot fromApiMessageAndVersion(ApiMessageAndVe
record.brokerId(),
record.eventTimestampMs(),
record.segmentSizeInBytes(),
+ customMetadata,
RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
segmentLeaderEpochs);
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
index 4282b9e71f302..9e893d2cbc3f0 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
@@ -22,12 +22,14 @@
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform {
@@ -43,6 +45,7 @@ public ApiMessageAndVersion toApiMessageAndVersion(RemoteLogSegmentMetadata segm
.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata))
.setRemoteLogSegmentState(segmentMetadata.state().id());
+ segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record, record.highestSupportedVersion());
}
@@ -75,6 +78,7 @@ public RemoteLogSegmentMetadata fromApiMessageAndVersion(ApiMessageAndVersion ap
segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), segmentLeaderEpoch.offset());
}
+ Optional customMetadata = Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
RemoteLogSegmentMetadata remoteLogSegmentMetadata =
new RemoteLogSegmentMetadata(remoteLogSegmentId, record.startOffset(), record.endOffset(),
record.maxTimestampMs(), record.brokerId(),
@@ -82,6 +86,7 @@ public RemoteLogSegmentMetadata fromApiMessageAndVersion(ApiMessageAndVersion ap
segmentLeaderEpochs);
RemoteLogSegmentMetadataUpdate rlsmUpdate
= new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, record.eventTimestampMs(),
+ customMetadata,
RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
record.brokerId());
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
index 3db776520c644..e2d2bf8049cf4 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
@@ -21,9 +21,12 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import java.util.Optional;
+
public class RemoteLogSegmentMetadataUpdateTransform implements RemoteLogMetadataTransform {
public ApiMessageAndVersion toApiMessageAndVersion(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) {
@@ -32,6 +35,7 @@ public ApiMessageAndVersion toApiMessageAndVersion(RemoteLogSegmentMetadataUpdat
.setBrokerId(segmentMetadataUpdate.brokerId())
.setEventTimestampMs(segmentMetadataUpdate.eventTimestampMs())
.setRemoteLogSegmentState(segmentMetadataUpdate.state().id());
+ segmentMetadataUpdate.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record, record.highestSupportedVersion());
}
@@ -42,8 +46,9 @@ public RemoteLogSegmentMetadataUpdate fromApiMessageAndVersion(ApiMessageAndVers
TopicIdPartition topicIdPartition = new TopicIdPartition(entry.topicIdPartition().id(),
new TopicPartition(entry.topicIdPartition().name(), entry.topicIdPartition().partition()));
+ Optional customMetadata = Optional.ofNullable(record.customMetadata()).map(CustomMetadata::new);
return new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(topicIdPartition, entry.id()),
- record.eventTimestampMs(), RemoteLogSegmentState.forId(record.remoteLogSegmentState()), record.brokerId());
+ record.eventTimestampMs(), customMetadata, RemoteLogSegmentState.forId(record.remoteLogSegmentState()), record.brokerId());
}
private RemoteLogSegmentMetadataUpdateRecord.RemoteLogSegmentIdEntry createRemoteLogSegmentIdEntry(RemoteLogSegmentMetadataUpdate data) {
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
index 6284e4cb78e6b..0260264e34497 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/ClassLoaderAwareRemoteStorageManager.java
@@ -18,9 +18,12 @@
import org.apache.kafka.storage.internals.log.StorageAction;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
+import java.util.Optional;
/**
* A wrapper class of {@link RemoteStorageManager} that sets the context class loader when calling the respective
@@ -66,12 +69,9 @@ private T withClassLoader(StorageAction action) t
}
}
- public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+ public Optional copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
LogSegmentData logSegmentData) throws RemoteStorageException {
- withClassLoader(() -> {
- delegate.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData);
- return null;
- });
+ return withClassLoader(() -> delegate.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData));
}
@Override
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
index 17a2746fdb36d..1167ee73c6675 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java
@@ -81,6 +81,13 @@ public final class RemoteLogManagerConfig {
public static final String REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC = "Listener name of the local broker to which it should get connected if " +
"needed by RemoteLogMetadataManager implementation.";
+ public static final String REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP = "remote.log.metadata.custom.metadata.max.bytes";
+ public static final String REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC = "The maximum size of custom metadata in bytes that the broker " +
+ "should accept from a remote storage plugin. If custom metadata exceeds this limit, the updated segment metadata " +
+ "will not be stored, an attempt will be made to delete the copied data, " +
+ "and the remote copying task for this topic-partition will stop with an error.";
+ public static final int DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES = 128;
+
public static final String REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP = "remote.log.index.file.cache.total.size.bytes";
public static final String REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_DOC = "The total size of the space allocated to store index files fetched " +
"from remote storage in the local storage.";
@@ -181,6 +188,12 @@ public final class RemoteLogManagerConfig {
new ConfigDef.NonEmptyString(),
MEDIUM,
REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_DOC)
+ .defineInternal(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP,
+ INT,
+ DEFAULT_REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES,
+ atLeast(0),
+ LOW,
+ REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_DOC)
.defineInternal(REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
LONG,
DEFAULT_REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES,
@@ -260,6 +273,7 @@ public final class RemoteLogManagerConfig {
private final String remoteLogMetadataManagerPrefix;
private final HashMap remoteLogMetadataManagerProps;
private final String remoteLogMetadataManagerListenerName;
+ private final int remoteLogMetadataCustomMetadataMaxBytes;
public RemoteLogManagerConfig(AbstractConfig config) {
this(config.getBoolean(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP),
@@ -276,6 +290,7 @@ public RemoteLogManagerConfig(AbstractConfig config) {
config.getDouble(REMOTE_LOG_MANAGER_TASK_RETRY_JITTER_PROP),
config.getInt(REMOTE_LOG_READER_THREADS_PROP),
config.getInt(REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP),
+ config.getInt(REMOTE_LOG_METADATA_CUSTOM_METADATA_MAX_BYTES_PROP),
config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP),
config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP) != null
? config.originalsWithPrefix(config.getString(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP))
@@ -301,6 +316,7 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem,
double remoteLogManagerTaskRetryJitter,
int remoteLogReaderThreads,
int remoteLogReaderMaxPendingTasks,
+ int remoteLogMetadataCustomMetadataMaxBytes,
String remoteStorageManagerPrefix,
Map remoteStorageManagerProps, /* properties having keys stripped out with remoteStorageManagerPrefix */
String remoteLogMetadataManagerPrefix,
@@ -324,6 +340,7 @@ public RemoteLogManagerConfig(boolean enableRemoteStorageSystem,
this.remoteLogMetadataManagerPrefix = remoteLogMetadataManagerPrefix;
this.remoteLogMetadataManagerProps = new HashMap<>(remoteLogMetadataManagerProps);
this.remoteLogMetadataManagerListenerName = remoteLogMetadataManagerListenerName;
+ this.remoteLogMetadataCustomMetadataMaxBytes = remoteLogMetadataCustomMetadataMaxBytes;
}
public boolean enableRemoteStorageSystem() {
@@ -382,6 +399,10 @@ public String remoteLogMetadataManagerListenerName() {
return remoteLogMetadataManagerListenerName;
}
+ public int remoteLogMetadataCustomMetadataMaxBytes() {
+ return remoteLogMetadataCustomMetadataMaxBytes;
+ }
+
public String remoteStorageManagerPrefix() {
return remoteStorageManagerPrefix;
}
@@ -412,6 +433,7 @@ public boolean equals(Object o) {
&& remoteLogManagerTaskRetryJitter == that.remoteLogManagerTaskRetryJitter
&& remoteLogReaderThreads == that.remoteLogReaderThreads
&& remoteLogReaderMaxPendingTasks == that.remoteLogReaderMaxPendingTasks
+ && remoteLogMetadataCustomMetadataMaxBytes == that.remoteLogMetadataCustomMetadataMaxBytes
&& Objects.equals(remoteStorageManagerClassName, that.remoteStorageManagerClassName)
&& Objects.equals(remoteStorageManagerClassPath, that.remoteStorageManagerClassPath)
&& Objects.equals(remoteLogMetadataManagerClassName, that.remoteLogMetadataManagerClassName)
@@ -427,7 +449,7 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(enableRemoteStorageSystem, remoteStorageManagerClassName, remoteStorageManagerClassPath,
remoteLogMetadataManagerClassName, remoteLogMetadataManagerClassPath, remoteLogMetadataManagerListenerName,
- remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, remoteLogManagerTaskIntervalMs,
+ remoteLogMetadataCustomMetadataMaxBytes, remoteLogIndexFileCacheTotalSizeBytes, remoteLogManagerThreadPoolSize, remoteLogManagerTaskIntervalMs,
remoteLogManagerTaskRetryBackoffMs, remoteLogManagerTaskRetryBackoffMaxMs, remoteLogManagerTaskRetryJitter,
remoteLogReaderThreads, remoteLogReaderMaxPendingTasks, remoteStorageManagerProps, remoteLogMetadataManagerProps,
remoteStorageManagerPrefix, remoteLogMetadataManagerPrefix);
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
index d18144e4dfe12..c737135a6a212 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
@@ -116,6 +116,14 @@
"versions": "0+",
"about": "Segment size in bytes."
},
+ {
+ "name": "CustomMetadata",
+ "type": "bytes",
+ "default": "null",
+ "versions": "0+",
+ "nullableVersions": "0+",
+ "about": "Custom metadata."
+ },
{
"name": "RemoteLogSegmentState",
"type": "int8",
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
index dbb29139c19c1..20fb17325729d 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
@@ -82,6 +82,14 @@
"versions": "0+",
"about": "Segment size in bytes"
},
+ {
+ "name": "CustomMetadata",
+ "type": "bytes",
+ "default": "null",
+ "versions": "0+",
+ "nullableVersions": "0+",
+ "about": "Custom metadata."
+ },
{
"name": "RemoteLogSegmentState",
"type": "int8",
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
index 24003dcbce849..48aa34d4e91b9 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataUpdateRecord.json
@@ -72,6 +72,14 @@
"versions": "0+",
"about": "Epoch time in milli seconds at which this event is generated."
},
+ {
+ "name": "CustomMetadata",
+ "type": "bytes",
+ "default": "null",
+ "versions": "0+",
+ "nullableVersions": "0+",
+ "about": "Custom metadata."
+ },
{
"name": "RemoteLogSegmentState",
"type": "int8",
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
index 5f77417a45805..d5341e07b07ad 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.test.TestUtils;
@@ -50,8 +51,10 @@ public void testFileBasedRemoteLogMetadataCacheWithUnreferencedSegments() throws
0, 100, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
1024 * 1024, Collections.singletonMap(0, 0L));
cache.addCopyInProgressSegment(metadata1);
- RemoteLogSegmentMetadataUpdate metadataUpdate1 = new RemoteLogSegmentMetadataUpdate(segmentId1, System.currentTimeMillis(),
- RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+ RemoteLogSegmentMetadataUpdate metadataUpdate1 = new RemoteLogSegmentMetadataUpdate(
+ segmentId1, System.currentTimeMillis(),
+ Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
cache.updateRemoteLogSegmentMetadata(metadataUpdate1);
Optional receivedMetadata = cache.remoteLogSegmentMetadata(0, 0L);
assertTrue(receivedMetadata.isPresent());
@@ -63,8 +66,10 @@ public void testFileBasedRemoteLogMetadataCacheWithUnreferencedSegments() throws
0, 900, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
1024 * 1024, Collections.singletonMap(0, 0L));
cache.addCopyInProgressSegment(metadata2);
- RemoteLogSegmentMetadataUpdate metadataUpdate2 = new RemoteLogSegmentMetadataUpdate(segmentId2, System.currentTimeMillis(),
- RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+ RemoteLogSegmentMetadataUpdate metadataUpdate2 = new RemoteLogSegmentMetadataUpdate(
+ segmentId2, System.currentTimeMillis(),
+ Optional.of(new CustomMetadata(new byte[]{4, 5, 6, 7})),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
cache.updateRemoteLogSegmentMetadata(metadataUpdate2);
// Fetch segment for leader epoch:0 and start offset:0, it should be the newly added segment.
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
index 789997feed299..6fe0846280330 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
@@ -31,6 +31,7 @@
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
public class RemoteLogMetadataCacheTest {
@@ -57,7 +58,7 @@ public void testAPIsWithInvalidArgs() {
-1L, BROKER_ID_0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
RemoteLogSegmentMetadata updatedMetadata = segmentMetadata
.createWithUpdates(new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
- time.milliseconds(), state, BROKER_ID_1));
+ time.milliseconds(), Optional.empty(), state, BROKER_ID_1));
Assertions.assertThrows(IllegalArgumentException.class, () ->
cache.addCopyInProgressSegment(updatedMetadata));
}
@@ -67,7 +68,9 @@ public void testAPIsWithInvalidArgs() {
Assertions.assertThrows(RemoteResourceNotFoundException.class, () -> {
RemoteLogSegmentId nonExistingId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(nonExistingId,
- time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
+ time.milliseconds(),
+ Optional.empty(),
+ RemoteLogSegmentState.DELETE_SEGMENT_STARTED, BROKER_ID_1));
});
// Check for invalid state transition.
@@ -75,7 +78,9 @@ public void testAPIsWithInvalidArgs() {
RemoteLogSegmentMetadata segmentMetadata = createSegmentUpdateWithState(cache, Collections.singletonMap(0, 0L), 0,
100, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
cache.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
- time.milliseconds(), RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
+ time.milliseconds(),
+ Optional.empty(),
+ RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, BROKER_ID_1));
});
}
@@ -90,8 +95,11 @@ private RemoteLogSegmentMetadata createSegmentUpdateWithState(RemoteLogMetadataC
BROKER_ID_0, time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
cache.addCopyInProgressSegment(segmentMetadata);
- RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId,
- time.milliseconds(), state, BROKER_ID_1);
+ RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(
+ segmentId,
+ time.milliseconds(),
+ Optional.empty(),
+ state, BROKER_ID_1);
cache.updateRemoteLogSegmentMetadata(segMetadataUpdate);
return segmentMetadata.createWithUpdates(segMetadataUpdate);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index 3f9db8cee13b5..e3d1a2aee0cd3 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -23,6 +23,8 @@
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
@@ -30,6 +32,7 @@
import java.io.PrintStream;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -46,15 +49,22 @@ public void testFormat() throws IOException {
segLeaderEpochs.put(1, 20L);
segLeaderEpochs.put(2, 80L);
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, SEGMENT_ID);
+ Optional customMetadata = Optional.of(new CustomMetadata(new byte[10]));
RemoteLogSegmentMetadata remoteLogMetadata = new RemoteLogSegmentMetadata(
remoteLogSegmentId, 0L, 100L, -1L, 1,
- 123L, 1024, segLeaderEpochs);
+ 123L, 1024, customMetadata,
+ RemoteLogSegmentState.COPY_SEGMENT_STARTED, segLeaderEpochs);
byte[] metadataBytes = new RemoteLogMetadataSerde().serialize(remoteLogMetadata);
ConsumerRecord metadataRecord = new ConsumerRecord<>("__remote_log_metadata", 0, 0, null, metadataBytes);
String expected = String.format(
- "partition: 0, offset: 0, value: RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=%s:foo-0, id=%s}, startOffset=0, endOffset=100, brokerId=1, maxTimestampMs=-1, eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80}, segmentSizeInBytes=1024, state=COPY_SEGMENT_STARTED}\n",
+ "partition: 0, offset: 0, value: " +
+ "RemoteLogSegmentMetadata{remoteLogSegmentId=RemoteLogSegmentId{topicIdPartition=%s:foo-0, id=%s}, " +
+ "startOffset=0, endOffset=100, brokerId=1, maxTimestampMs=-1, " +
+ "eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 2=80}, segmentSizeInBytes=1024, " +
+ "customMetadata=Optional[CustomMetadata{10 bytes}], " +
+ "state=COPY_SEGMENT_STARTED}\n",
TOPIC_ID, SEGMENT_ID);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos)) {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
index 402d1a2994a7b..5b48790c7fdc9 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSerdeTest.java
@@ -24,6 +24,7 @@
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
@@ -34,6 +35,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
public class RemoteLogMetadataSerdeTest {
@@ -69,12 +71,17 @@ private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata() {
segLeaderEpochs.put(2, 80L);
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadata(remoteLogSegmentId, 0L, 100L, -1L, 1,
- time.milliseconds(), 1024, segLeaderEpochs);
+ time.milliseconds(), 1024,
+ Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
+ RemoteLogSegmentState.COPY_SEGMENT_STARTED,
+ segLeaderEpochs
+ );
}
private RemoteLogSegmentMetadataUpdate createRemoteLogSegmentMetadataUpdate() {
RemoteLogSegmentId remoteLogSegmentId = new RemoteLogSegmentId(TP0, Uuid.randomUuid());
return new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, time.milliseconds(),
+ Optional.of(new CustomMetadata(new byte[] {0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 2);
}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
index 1b4602887be0f..dbfbbf3b0440e 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
@@ -61,10 +62,13 @@ public void testWriteReadCommittedLogMetadataFile() throws Exception {
long startOffset = 0;
for (int i = 0; i < 100; i++) {
long endOffset = startOffset + 100L;
+ CustomMetadata customMetadata = new CustomMetadata(new byte[]{(byte) i});
remoteLogSegmentMetadatas.add(
new RemoteLogSegmentMetadataSnapshot(Uuid.randomUuid(), startOffset, endOffset,
System.currentTimeMillis(), 1, 100, 1024,
- RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, startOffset)));
+ Optional.of(customMetadata),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, startOffset)
+ ));
startOffset = endOffset + 1;
}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
index 87e7683329332..504f47e17a584 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTransformTest.java
@@ -27,6 +27,7 @@
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemotePartitionDeleteMetadataTransform;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
@@ -35,6 +36,7 @@
import org.junit.jupiter.api.Test;
import java.util.Collections;
+import java.util.Optional;
public class RemoteLogMetadataTransformTest {
private static final TopicIdPartition TP0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
@@ -58,6 +60,7 @@ public void testRemoteLogSegmentMetadataUpdateTransform() {
RemoteLogSegmentMetadataUpdate metadataUpdate =
new RemoteLogSegmentMetadataUpdate(new RemoteLogSegmentId(TP0, Uuid.randomUuid()), time.milliseconds(),
+ Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, 1);
ApiMessageAndVersion apiMessageAndVersion = metadataUpdateTransform.toApiMessageAndVersion(metadataUpdate);
RemoteLogSegmentMetadataUpdate metadataUpdateFromRecord = metadataUpdateTransform.fromApiMessageAndVersion(apiMessageAndVersion);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index b847e7cba3fb9..6928c6e281f62 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -92,7 +92,8 @@ public void testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot
});
RemoteLogSegmentMetadataUpdate segment0Update = new RemoteLogSegmentMetadataUpdate(
- segment0Id, time.milliseconds(), RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
+ segment0Id, time.milliseconds(), Optional.empty(),
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED, BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segment0Update);
RemoteLogSegmentMetadata expectedSegment0Metadata = segment0Metadata.createWithUpdates(segment0Update);
@@ -167,6 +168,7 @@ public void testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot
remoteLogSegmentLifecycleManager
.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
time.milliseconds(),
+ Optional.empty(),
RemoteLogSegmentState.DELETE_SEGMENT_STARTED,
BROKER_ID_1));
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 10).isPresent());
@@ -176,6 +178,7 @@ public void testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager remot
remoteLogSegmentLifecycleManager
.updateRemoteLogSegmentMetadata(new RemoteLogSegmentMetadataUpdate(expectedSegment0Metadata.remoteLogSegmentId(),
time.milliseconds(),
+ Optional.empty(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
BROKER_ID_1));
Assertions.assertFalse(remoteLogSegmentLifecycleManager.remoteLogSegmentMetadata(0, 10).isPresent());
@@ -218,7 +221,9 @@ private RemoteLogSegmentMetadata createSegmentUpdateWithState(RemoteLogSegmentLi
time.milliseconds(), SEG_SIZE, segmentLeaderEpochs);
remoteLogSegmentLifecycleManager.addRemoteLogSegmentMetadata(segmentMetadata);
- RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(), state, BROKER_ID_1);
+ RemoteLogSegmentMetadataUpdate segMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentId, time.milliseconds(),
+ Optional.empty(),
+ state, BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segMetadataUpdate);
return segmentMetadata.createWithUpdates(segMetadataUpdate);
@@ -367,6 +372,7 @@ public void testCacheSegmentsWithDeleteSegmentFinishedState(RemoteLogSegmentLife
RemoteLogSegmentMetadataUpdate segmentMetadataUpdate = new RemoteLogSegmentMetadataUpdate(segmentMetadata.remoteLogSegmentId(),
time.milliseconds(),
+ Optional.empty(),
RemoteLogSegmentState.DELETE_SEGMENT_FINISHED,
BROKER_ID_1);
remoteLogSegmentLifecycleManager.updateRemoteLogSegmentMetadata(segmentMetadataUpdate);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
new file mode 100644
index 0000000000000..17d424249ff3b
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata.CustomMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class RemoteLogSegmentMetadataSnapshotTransformTest {
+ @ParameterizedTest
+ @MethodSource("parameters")
+ void testToAndFromMessage(Optional customMetadata) {
+ Map segmentLeaderEpochs = new HashMap<>();
+ segmentLeaderEpochs.put(0, 0L);
+ RemoteLogSegmentMetadataSnapshot snapshot = new RemoteLogSegmentMetadataSnapshot(
+ Uuid.randomUuid(),
+ 0L, 100L, -1L, 0, 0, 1234,
+ customMetadata,
+ RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
+ segmentLeaderEpochs
+ );
+
+ RemoteLogSegmentMetadataSnapshotTransform transform = new RemoteLogSegmentMetadataSnapshotTransform();
+ ApiMessageAndVersion message = transform.toApiMessageAndVersion(snapshot);
+ assertEquals(snapshot, transform.fromApiMessageAndVersion(message));
+ }
+
+ private static Stream