feat(metadata): add missing tags for the old stream #1346

Merged · 1 commit · Jun 3, 2024

This change bumps the OpenStreams request/response schemas to version 1, threads stream tags through openStream, and backfills tags onto stream metadata for streams created before tag support existed.
checkstyle/suppressions.xml (1 addition, 0 deletions)
@@ -43,6 +43,7 @@
<suppress checks="ClassFanOutComplexity" files="RemoteLogManagerTest.java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
<suppress checks="ClassDataAbstractionCoupling" files="ControllerStreamManager.java"/>

<!-- server tests -->
<suppress checks="MethodLength|JavaNCSS|NPath" files="DescribeTopicPartitionsRequestHandlerTest.java"/>

@@ -21,7 +21,7 @@
"broker"
],
"name": "OpenStreamsRequest",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{
@@ -55,7 +55,14 @@
"versions": "0+",
"entityType": "streamEpoch",
"about": "The epoch of the requesting stream in the requesting broker"
}
},
{ "name": "Tags", "type": "[]Tag", "versions": "1+",
"about": "The stream tags.", "fields": [
{ "name": "Key", "type": "string", "versions": "1+", "mapKey": true,
"about": "The tag key." },
{ "name": "Value", "type": "string", "versions": "1+",
"about": "The tag value." }
]}
]
}
]
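The request schema now spans versions 0-1, and the tag map is only serialized at version 1 and above; a version-0 request simply omits it, which keeps old clients wire-compatible. As a hedged sketch of how the new field is populated through the generated message classes (the same OpenStreamsRequestData.Tag/TagCollection accessors used by the client change further down; the import package is an assumption about the usual Kafka generator layout):

```java
import java.util.Map;

import org.apache.kafka.common.message.OpenStreamsRequestData; // assumed package

class OpenStreamsTagsSketch {
    // Convert a plain tag map into the v1-only Tags field; Key is declared
    // with "mapKey": true, so the collection is keyed by the tag key.
    static OpenStreamsRequestData.TagCollection toTagCollection(Map<String, String> tags) {
        OpenStreamsRequestData.TagCollection collection = new OpenStreamsRequestData.TagCollection();
        tags.forEach((k, v) -> collection.add(
            new OpenStreamsRequestData.Tag().setKey(k).setValue(v)));
        return collection;
    }
}
```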

@@ -17,7 +17,7 @@
"apiKey": 502,
"type": "response",
"name": "OpenStreamsResponse",
"validVersions": "0",
"validVersions": "0-1",
"flexibleVersions": "0+",
"fields": [
{

@@ -176,10 +176,15 @@ public Builder toRequestBuilder() {
}

@Override
public CompletableFuture<StreamMetadata> openStream(long streamId, long epoch) {
public CompletableFuture<StreamMetadata> openStream(long streamId, long epoch, Map<String, String> tags) {
OpenStreamRequest request = new OpenStreamRequest()
.setStreamId(streamId)
.setStreamEpoch(epoch);
if (version.get().isStreamTagsSupported() && tags != null && !tags.isEmpty()) {
OpenStreamsRequestData.TagCollection tagCollection = new OpenStreamsRequestData.TagCollection();
tags.forEach((k, v) -> tagCollection.add(new OpenStreamsRequestData.Tag().setKey(k).setValue(v)));
request.setTags(tagCollection);
}
WrapRequest req = new BatchRequest() {
@Override
public Builder addSubRequest(Builder builder) {
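The feature check here is what keeps mixed-version clusters safe: tags are only attached when the negotiated AutoMQ feature level reports stream-tag support, otherwise the request keeps its version-0 shape that every node can parse. A minimal stand-in sketch of that gate (BooleanSupplier replaces the real version check, which is not part of this diff):

```java
import java.util.Map;
import java.util.function.BooleanSupplier;

// Stand-in sketch of the gating pattern: only expose tags for serialization
// when the cluster-wide feature level supports them (request version 1+).
class GatedTagsSketch {
    static Map<String, String> tagsToSend(BooleanSupplier streamTagsSupported, Map<String, String> tags) {
        if (streamTagsSupported.getAsBoolean() && tags != null && !tags.isEmpty()) {
            return tags;      // serialized as the v1 Tags field
        }
        return Map.of();      // field omitted, version-0 semantics
    }
}
```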

@@ -646,7 +646,7 @@ object ElasticLog extends Logging {
val metaStreamId = Unpooled.wrappedBuffer(value.get()).readLong()
awaitStreamReadyForOpen(openStreamChecker, topicId.get, topicPartition.partition(), metaStreamId, leaderEpoch, logIdent = logIdent)
// open partition meta stream
val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).build())
val stream = client.streamClient().openStream(metaStreamId, OpenStreamOptions.builder().epoch(leaderEpoch).tags(streamTags).build())
.thenApply(stream => new MetaStream(stream, META_SCHEDULE_EXECUTOR, logIdent))
.get()
info(s"${logIdent}opened existing meta stream: streamId==$metaStreamId")

@@ -55,7 +55,7 @@ public LazyStream(String name, long streamId, StreamClient client, int replicaCo
if (streamId != NOOP_STREAM_ID) {
try {
// open existing stream
inner = client.openStream(streamId, OpenStreamOptions.builder().epoch(epoch).build()).get();
inner = client.openStream(streamId, OpenStreamOptions.builder().epoch(epoch).tags(tags).build()).get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {

@@ -42,7 +42,7 @@ public class S3StreamMetadata {
*/
private final TimelineLong startOffset;
private final TimelineObject<StreamState> currentState;
private final Map<String, String> tags;
private Map<String, String> tags;
private final TimelineHashMap<Integer/*rangeIndex*/, RangeMetadata> ranges;
private final TimelineHashMap<Long/*objectId*/, S3StreamObject> streamObjects;

@@ -108,6 +108,10 @@ public Map<String, String> tags() {
return tags;
}

public void setTags(Map<String, String> tags) {
this.tags = tags;
}

public Map<Integer, RangeMetadata> ranges() {
return ranges;
}

@@ -295,12 +295,20 @@ public ControllerResult<OpenStreamResponse> openStream(int nodeId, long nodeEpoc
List<ApiMessageAndVersion> records = new ArrayList<>();
int newRangeIndex = streamMetadata.currentRangeIndex() + 1;
// stream update record
records.add(new ApiMessageAndVersion(new S3StreamRecord()
AutoMQVersion autoMQVersion = featureControlManager.autoMQVersion();
S3StreamRecord s3StreamRecord = new S3StreamRecord()
.setStreamId(streamId)
.setEpoch(epoch)
.setRangeIndex(newRangeIndex)
.setStartOffset(streamMetadata.startOffset())
.setStreamState(StreamState.OPENED.toByte()), version.streamRecordVersion()));
.setStreamState(StreamState.OPENED.toByte());
if (request.tags().size() > 0 && autoMQVersion.isStreamTagsSupported()) {
// Backward compatibility: backfill tags that are missing from streams created by an older version.
S3StreamRecord.TagCollection tags = new S3StreamRecord.TagCollection();
request.tags().forEach(tag -> tags.add(new S3StreamRecord.Tag().setKey(tag.key()).setValue(tag.value())));
s3StreamRecord.setTags(tags);
}
records.add(new ApiMessageAndVersion(s3StreamRecord, autoMQVersion.streamRecordVersion()));
// get new range's start offset
// by default, regard this range as the first range in the stream and use 0 as the start offset
long startOffset = 0;
@@ -1140,6 +1148,11 @@ public void replay(S3StreamRecord record) {
streamMetadata.currentEpoch(record.epoch());
streamMetadata.currentRangeIndex(record.rangeIndex());
streamMetadata.currentState(StreamState.fromByte(record.streamState()));
if (streamMetadata.tags().isEmpty() && record.tags().size() > 0) {
Map<String, String> tags = new HashMap<>();
record.tags().forEach(tag -> tags.put(tag.key(), tag.value()));
streamMetadata.setTags(tags);
}
return;
}
Map<String, String> tags = new HashMap<>();
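This hunk is the core of the PR: when an old stream is reopened, the controller now writes an S3StreamRecord that carries the request's tags, and replay copies them into the in-memory metadata only when none are recorded yet. A self-contained sketch of that backfill-once rule (a stand-in class, not the real S3StreamMetadata):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the replay-side backfill: copy tags from the replayed record
// only when the in-memory stream metadata has none yet, so streams created
// before tag support pick them up exactly once and already-tagged streams
// are left untouched.
class TagBackfillSketch {
    private Map<String, String> tags = new HashMap<>();

    void replayTags(Map<String, String> recordTags) {
        if (tags.isEmpty() && !recordTags.isEmpty()) {
            tags = new HashMap<>(recordTags);
        }
    }

    Map<String, String> tags() {
        return tags;
    }
}
```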

@@ -18,7 +18,7 @@
public class CreateStreamOptions {
private int replicaCount;
private long epoch;
private Map<String, String> tags = new HashMap<>();
private final Map<String, String> tags = new HashMap<>();

private CreateStreamOptions() {
}

@@ -12,11 +12,14 @@
package com.automq.stream.api;

import com.automq.stream.utils.Arguments;
import java.util.HashMap;
import java.util.Map;

public class OpenStreamOptions {
private WriteMode writeMode = WriteMode.SINGLE;
private ReadMode readMode = ReadMode.MULTIPLE;
private long epoch;
private final Map<String, String> tags = new HashMap<>();

private OpenStreamOptions() {
}
@@ -37,6 +40,10 @@ public long epoch() {
return epoch;
}

public Map<String, String> tags() {
return tags;
}

public enum WriteMode {
SINGLE(0), MULTIPLE(1);

@@ -85,6 +92,16 @@ public Builder epoch(long epoch) {
return this;
}

public Builder tag(String key, String value) {
options.tags.put(key, value);
return this;
}

public Builder tags(Map<String, String> tags) {
options.tags.putAll(tags);
return this;
}

public OpenStreamOptions build() {
return options;
}
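For callers, the extended builder reads as follows. This is a usage sketch based only on the methods visible in this diff; the com.automq.stream.api package layout and the tag keys are assumptions:

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;

import com.automq.stream.api.OpenStreamOptions;
import com.automq.stream.api.Stream;
import com.automq.stream.api.StreamClient;

// Usage sketch only: the builder methods are the ones added in this diff,
// but the package layout and the example tag keys are assumptions.
class OpenWithTagsExample {
    static Stream reopen(StreamClient client, long streamId, long epoch)
            throws ExecutionException, InterruptedException {
        OpenStreamOptions options = OpenStreamOptions.builder()
            .epoch(epoch)
            .tag("owner", "example")                 // add a single illustrative tag
            .tags(Map.of("purpose", "meta-stream"))  // or merge a whole map at once
            .build();
        return client.openStream(streamId, options).get();
    }
}
```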

@@ -98,7 +98,7 @@ public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options
TimerUtil timerUtil = new TimerUtil();
return FutureUtil.exec(() -> streamManager.createStream(options.tags()).thenCompose(streamId -> {
StreamOperationStats.getInstance().createStreamLatency.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
return openStream0(streamId, options.epoch());
return openStream0(streamId, options.epoch(), options.tags());
}), LOGGER, "createAndOpenStream");
});
}
@@ -107,7 +107,7 @@ public CompletableFuture<Stream> createAndOpenStream(CreateStreamOptions options
public CompletableFuture<Stream> openStream(long streamId, OpenStreamOptions openStreamOptions) {
return runInLock(() -> {
checkState();
return FutureUtil.exec(() -> openStream0(streamId, openStreamOptions.epoch()), LOGGER, "openStream");
return FutureUtil.exec(() -> openStream0(streamId, openStreamOptions.epoch(), openStreamOptions.tags()), LOGGER, "openStream");
});
}

@@ -129,10 +129,10 @@ private void startStreamObjectsCompactions() {
}, 5, 5, TimeUnit.MINUTES);
}

private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
private CompletableFuture<Stream> openStream0(long streamId, long epoch, Map<String, String> tags) {
return runInLock(() -> {
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<Stream> cf = streamManager.openStream(streamId, epoch).
CompletableFuture<Stream> cf = streamManager.openStream(streamId, epoch, tags).
thenApply(metadata -> {
StreamWrapper stream = new StreamWrapper(newStream(metadata));
runInLock(() -> openedStreams.put(streamId, stream));

@@ -249,6 +249,11 @@ public synchronized CompletableFuture<StreamMetadata> openStream(long streamId,
return CompletableFuture.completedFuture(stream);
}

@Override
public CompletableFuture<StreamMetadata> openStream(long streamId, long epoch, Map<String, String> tags) {
return openStream(streamId, epoch);
}

@Override
public synchronized CompletableFuture<Void> trimStream(long streamId, long epoch, long newStartOffset) {
StreamMetadata stream = streams.get(streamId);

@@ -56,7 +56,11 @@ default CompletableFuture<Long> createStream() {
* @param epoch stream epoch.
* @return {@link StreamMetadata}
*/
CompletableFuture<StreamMetadata> openStream(long streamId, long epoch);
default CompletableFuture<StreamMetadata> openStream(long streamId, long epoch) {
return openStream(streamId, epoch, Collections.emptyMap());
}

CompletableFuture<StreamMetadata> openStream(long streamId, long epoch, Map<String, String> tags);

/**
* Trim stream to new start offset.
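The interface change follows the usual evolution pattern: the old two-argument openStream becomes a default method delegating to the new tag-aware overload, so existing call sites keep compiling unchanged while implementations only need to provide the three-argument signature. A minimal sketch of that pattern (the return type is a stand-in, not the real StreamMetadata):

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch of the interface-evolution pattern: the legacy two-argument call
// delegates to the tag-aware overload with an empty map.
interface StreamManagerSketch {
    CompletableFuture<Object> openStream(long streamId, long epoch, Map<String, String> tags);

    default CompletableFuture<Object> openStream(long streamId, long epoch) {
        return openStream(streamId, epoch, Collections.emptyMap());
    }
}
```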

@@ -29,6 +29,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -61,7 +62,7 @@ public void testShutdown_withOpeningStream() {
when(stream.streamId()).thenReturn(1L);

CompletableFuture<StreamMetadata> cf = new CompletableFuture<>();
when(streamManager.openStream(anyLong(), anyLong())).thenReturn(cf);
when(streamManager.openStream(anyLong(), anyLong(), anyMap())).thenReturn(cf);

doAnswer(args -> stream).when(client).newStream(any());

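Because openStream now takes a tags argument, every Mockito stubbing of it has to match three parameters; Mockito does not allow mixing matchers with raw values, so anyMap() joins the existing anyLong() matchers. A self-contained sketch of the adjusted stubbing (the interface here is a stand-in, not the real StreamManager):

```java
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Stand-in interface so the sketch compiles without the real StreamManager.
interface StreamManagerStub {
    CompletableFuture<Object> openStream(long streamId, long epoch, Map<String, String> tags);
}

class OpenStreamStubbingSketch {
    static CompletableFuture<Object> stubOpenStream() {
        StreamManagerStub streamManager = mock(StreamManagerStub.class);
        CompletableFuture<Object> cf = new CompletableFuture<>();
        // All three arguments use matchers, matching the new signature.
        when(streamManager.openStream(anyLong(), anyLong(), anyMap())).thenReturn(cf);
        return cf;
    }
}
```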