feat(s3stream): introduce tracing to s3stream #866

Merged: 4 commits, Dec 28, 2023
4 changes: 2 additions & 2 deletions pom.xml
@@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.9.0-SNAPSHOT</s3stream.version>
<s3stream.version>0.10.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
@@ -134,7 +134,7 @@
<version>${s3stream.version}</version>
<exclusions>
<exclusion>
<groupId>io.opentelemetry.instrumentation</groupId>
<groupId>io.opentelemetry.*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
52 changes: 48 additions & 4 deletions s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.9.0-SNAPSHOT</version>
<version>0.10.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
@@ -34,7 +34,8 @@
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<alapha.opentelemetry.version>1.32.0-alpha</alapha.opentelemetry.version>
<opentelemetry.version>1.32.0</opentelemetry.version>
<aspectj.version>1.9.20.1</aspectj.version>
</properties>
<dependencies>
<dependency>
@@ -113,10 +114,25 @@
<artifactId>jackson-databind</artifactId>
<version>2.16.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-runtime-telemetry-java8</artifactId>
<version>${alapha.opentelemetry.version}</version>
<artifactId>opentelemetry-instrumentation-annotations</artifactId>
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>${aspectj.version}</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>${aspectj.version}</version>
</dependency>
</dependencies>

@@ -239,6 +255,34 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>dev.aspectj</groupId>
<artifactId>aspectj-maven-plugin</artifactId>
<version>1.13.1</version>
<dependencies>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjtools</artifactId>
<version>${aspectj.version}</version>
</dependency>
</dependencies>
<configuration>
<complianceLevel>${maven.compiler.target}</complianceLevel>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<showWeaveInfo>true</showWeaveInfo>
<verbose>true</verbose>
<Xlint>ignore</Xlint>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
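
The new dependencies above are what wire the tracing in at build time: opentelemetry-instrumentation-annotations supplies the @WithSpan and @SpanAttribute annotations used in the Java changes below, while aspectjrt/aspectjweaver plus the aspectj-maven-plugin weave span-creation code around the annotated methods during compilation. The excerpt does not show how the annotations are turned into spans; the following is only a minimal sketch of what such an annotation-driven aspect can look like, assuming the tracer is obtained from GlobalOpenTelemetry (class and tracer names are illustrative, not the aspect shipped by this PR).

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;

// Illustrative only: a minimal annotation-style aspect for @WithSpan methods.
@Aspect
public class WithSpanAspectSketch {
    private static final Tracer TRACER = GlobalOpenTelemetry.getTracer("s3stream-sketch");

    // Wrap every method carrying OpenTelemetry's @WithSpan annotation in a span.
    @Around("execution(@io.opentelemetry.instrumentation.annotations.WithSpan * *(..))")
    public Object traceAnnotatedMethod(ProceedingJoinPoint pjp) throws Throwable {
        Span span = TRACER.spanBuilder(pjp.getSignature().toShortString()).startSpan();
        try (Scope ignored = span.makeCurrent()) {
            return pjp.proceed();
        } catch (Throwable t) {
            span.recordException(t);
            throw t;
        } finally {
            // Note: for methods returning CompletableFuture this ends the span before the
            // future completes; a real aspect would need to handle async results.
            span.end();
        }
    }
}
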
12 changes: 9 additions & 3 deletions s3stream/src/main/java/com/automq/stream/api/Stream.java
@@ -18,6 +18,8 @@
package com.automq.stream.api;

import com.automq.stream.api.exceptions.StreamClientException;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;

import java.util.concurrent.CompletableFuture;

@@ -54,7 +56,11 @@ public interface Stream {
* @return - complete success with async {@link AppendResult}, when append success.
* - complete exception with {@link StreamClientException}, when append fail. TODO: specify the exception.
*/
CompletableFuture<AppendResult> append(RecordBatch recordBatch);
CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch);

default CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
return append(AppendContext.DEFAULT, recordBatch);
}

/**
* Fetch recordBatch list from stream. Note the startOffset may be in the middle in the first recordBatch.
@@ -67,10 +73,10 @@
* @return - complete success with {@link FetchResult}, when fetch success.
* - complete exception with {@link StreamClientException}, when startOffset is bigger than stream end offset.
*/
CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint, ReadOptions readOptions);
CompletableFuture<FetchResult> fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint);

default CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytesHint) {
return fetch(startOffset, endOffset, maxBytesHint, ReadOptions.DEFAULT);
return fetch(FetchContext.DEFAULT, startOffset, endOffset, maxBytesHint);
}

/**
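
In the Stream interface the context-aware methods become the primitives, and the plain overloads remain as default methods that pass AppendContext.DEFAULT / FetchContext.DEFAULT, so common call sites stay source-compatible; ReadOptions now travel inside the FetchContext (context.readOptions() in the later files) instead of being a separate parameter. A minimal caller-side sketch, assuming only what the diff shows (how a non-default context is constructed is not part of this excerpt):

import com.automq.stream.api.AppendResult;
import com.automq.stream.api.FetchResult;
import com.automq.stream.api.RecordBatch;
import com.automq.stream.api.Stream;
import com.automq.stream.s3.context.FetchContext;

import java.util.concurrent.CompletableFuture;

class StreamContextUsageSketch {
    // Existing call sites keep compiling unchanged; the default overload forwards
    // AppendContext.DEFAULT to the new context-aware method.
    CompletableFuture<AppendResult> appendOldStyle(Stream stream, RecordBatch batch) {
        return stream.append(batch);
    }

    // Context-aware callers pass a FetchContext explicitly; the default context is used
    // here as a stand-in, since building a custom context is outside this excerpt.
    CompletableFuture<FetchResult> fetchWithContext(Stream stream) {
        return stream.fetch(FetchContext.DEFAULT, 0L, 100L, 1024 * 1024);
    }
}
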
41 changes: 28 additions & 13 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -17,12 +17,13 @@

package com.automq.stream.s3;

import com.automq.stream.api.ReadOptions;
import com.automq.stream.api.exceptions.FastReadFailFastException;
import com.automq.stream.s3.cache.CacheAccessType;
import com.automq.stream.s3.cache.LogCache;
import com.automq.stream.s3.cache.ReadDataBlock;
import com.automq.stream.s3.cache.S3BlockCache;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
@@ -32,6 +33,7 @@
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.streams.StreamManager;
import com.automq.stream.s3.trace.context.TraceContext;
import com.automq.stream.s3.wal.WriteAheadLog;
import com.automq.stream.utils.FutureTicker;
import com.automq.stream.utils.FutureUtil;
@@ -40,6 +42,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -259,14 +263,15 @@ public void shutdown() {


@Override
public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
@WithSpan
public CompletableFuture<Void> append(AppendContext context, StreamRecordBatch streamRecord) {
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<Void> cf = new CompletableFuture<>();
// encoded before append to free heap ByteBuf.
streamRecord.encoded();
WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, -1L, cf);
WalWriteRequest writeRequest = new WalWriteRequest(streamRecord, -1L, cf, context);
handleAppendRequest(writeRequest);
append0(writeRequest, false);
append0(context, writeRequest, false);
cf.whenComplete((nil, ex) -> {
streamRecord.release();
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STORAGE);
@@ -279,7 +284,7 @@ public CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
*
* @return backoff status.
*/
public boolean append0(WalWriteRequest request, boolean fromBackoff) {
public boolean append0(AppendContext context, WalWriteRequest request, boolean fromBackoff) {
// TODO: storage status check, fast fail the request when storage closed.
if (!fromBackoff && !backoffRecords.isEmpty()) {
backoffRecords.offer(request);
@@ -304,7 +309,7 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) {
Lock lock = confirmOffsetCalculator.addLock();
lock.lock();
try {
appendResult = deltaWAL.append(streamRecord.encoded());
appendResult = deltaWAL.append(new TraceContext(context), streamRecord.encoded());
} finally {
lock.unlock();
}
@@ -344,7 +349,7 @@ private void tryDrainBackoffRecords() {
if (request == null) {
break;
}
if (append0(request, true)) {
if (append0(request.context, request, true)) {
LOGGER.warn("try drain backoff record fail, still backoff");
break;
}
@@ -356,20 +361,30 @@
}

@Override
public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes, ReadOptions readOptions) {
@WithSpan
public CompletableFuture<ReadDataBlock> read(FetchContext context,
@SpanAttribute long streamId,
@SpanAttribute long startOffset,
@SpanAttribute long endOffset,
@SpanAttribute int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
CompletableFuture<ReadDataBlock> cf = new CompletableFuture<>();
FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, readOptions), cf);
FutureUtil.propagate(read0(context, streamId, startOffset, endOffset, maxBytes), cf);
cf.whenComplete((nil, ex) -> S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.READ_STORAGE));
return cf;
}

private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, ReadOptions readOptions) {
List<StreamRecordBatch> logCacheRecords = deltaWALCache.get(streamId, startOffset, endOffset, maxBytes);
@WithSpan
private CompletableFuture<ReadDataBlock> read0(FetchContext context,
@SpanAttribute long streamId,
@SpanAttribute long startOffset,
@SpanAttribute long endOffset,
@SpanAttribute int maxBytes) {
List<StreamRecordBatch> logCacheRecords = deltaWALCache.get(context, streamId, startOffset, endOffset, maxBytes);
if (!logCacheRecords.isEmpty() && logCacheRecords.get(0).getBaseOffset() <= startOffset) {
return CompletableFuture.completedFuture(new ReadDataBlock(logCacheRecords, CacheAccessType.DELTA_WAL_CACHE_HIT));
}
if (readOptions.fastRead()) {
if (context.readOptions().fastRead()) {
// fast read fail fast when need read from block cache.
logCacheRecords.forEach(StreamRecordBatch::release);
logCacheRecords.clear();
@@ -380,7 +395,7 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset,
}
Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.warn("read from block cache timeout, stream={}, {}, maxBytes: {}", streamId, startOffset, maxBytes), 1, TimeUnit.MINUTES);
long finalEndOffset = endOffset;
return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(blockCacheRst -> {
return blockCache.read(context, streamId, startOffset, endOffset, maxBytes).thenApply(blockCacheRst -> {
List<StreamRecordBatch> rst = new ArrayList<>(blockCacheRst.getRecords());
int remainingBytesSize = maxBytes - rst.stream().mapToInt(StreamRecordBatch::size).sum();
int readIndex = -1;
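
In S3Storage the annotations do two things: @WithSpan opens a span around append/read/read0, and @SpanAttribute records the annotated parameters on that span, while the context object itself is threaded down into the WAL (new TraceContext(context)) and the block cache so that downstream work can join the same trace. As a rough mental model, the woven code for read() behaves approximately like the hand-written equivalent below (attribute key names are illustrative; the real weaving and the async CompletableFuture handling are more involved):

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;

class ManualReadSpanSketch {
    private static final Tracer TRACER = GlobalOpenTelemetry.getTracer("s3stream-sketch");

    // Roughly what @WithSpan + @SpanAttribute on S3Storage.read() amount to once woven:
    // a span named after the method, with the annotated parameters attached as attributes.
    void readEquivalent(long streamId, long startOffset, long endOffset, int maxBytes) {
        Span span = TRACER.spanBuilder("S3Storage.read").startSpan();
        span.setAttribute("streamId", streamId);
        span.setAttribute("startOffset", startOffset);
        span.setAttribute("endOffset", endOffset);
        span.setAttribute("maxBytes", maxBytes);
        try (Scope ignored = span.makeCurrent()) {
            // ... the actual read path (delta WAL cache lookup, block cache read) runs here ...
        } finally {
            span.end();
        }
    }
}
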
30 changes: 20 additions & 10 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -21,14 +21,15 @@
import com.automq.stream.RecordBatchWithContextWrapper;
import com.automq.stream.api.AppendResult;
import com.automq.stream.api.FetchResult;
import com.automq.stream.api.ReadOptions;
import com.automq.stream.api.RecordBatch;
import com.automq.stream.api.RecordBatchWithContext;
import com.automq.stream.api.Stream;
import com.automq.stream.api.exceptions.ErrorCode;
import com.automq.stream.api.exceptions.FastReadFailFastException;
import com.automq.stream.api.exceptions.StreamClientException;
import com.automq.stream.s3.cache.CacheAccessType;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
@@ -38,6 +39,8 @@
import com.automq.stream.utils.FutureUtil;
import com.automq.stream.utils.GlobalSwitch;
import io.netty.buffer.Unpooled;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -139,7 +142,8 @@ public long nextOffset() {
}

@Override
public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
@WithSpan
public CompletableFuture<AppendResult> append(AppendContext context, RecordBatch recordBatch) {
TimerUtil timerUtil = new TimerUtil();
writeLock.lock();
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.APPEND_STREAM_WRITE_LOCK);
@@ -148,7 +152,7 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
if (networkInboundLimiter != null) {
networkInboundLimiter.forceConsume(recordBatch.rawPayload().remaining());
}
return append0(recordBatch);
return append0(context, recordBatch);
}, LOGGER, "append");
pendingAppends.add(cf);
cf.whenComplete((nil, ex) -> {
@@ -161,13 +165,14 @@ public CompletableFuture<AppendResult> append(RecordBatch recordBatch) {
}
}

private CompletableFuture<AppendResult> append0(RecordBatch recordBatch) {
@WithSpan
private CompletableFuture<AppendResult> append0(AppendContext context, RecordBatch recordBatch) {
if (!status.isWritable()) {
return FutureUtil.failedFuture(new StreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is not writable"));
}
long offset = nextOffset.getAndAdd(recordBatch.count());
StreamRecordBatch streamRecordBatch = new StreamRecordBatch(streamId, epoch, offset, recordBatch.count(), Unpooled.wrappedBuffer(recordBatch.rawPayload()));
CompletableFuture<AppendResult> cf = storage.append(streamRecordBatch).thenApply(nil -> {
CompletableFuture<AppendResult> cf = storage.append(context, streamRecordBatch).thenApply(nil -> {
updateConfirmOffset(offset + recordBatch.count());
return new DefaultAppendResult(offset);
});
@@ -186,12 +191,16 @@ private CompletableFuture<AppendResult> append0(RecordBatch recordBatch) {
}

@Override
public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, int maxBytes, ReadOptions readOptions) {
@WithSpan
public CompletableFuture<FetchResult> fetch(FetchContext context,
@SpanAttribute long startOffset,
@SpanAttribute long endOffset,
@SpanAttribute int maxBytes) {
TimerUtil timerUtil = new TimerUtil();
readLock.lock();
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM_READ_LOCK);
try {
CompletableFuture<FetchResult> cf = exec(() -> fetch0(startOffset, endOffset, maxBytes, readOptions), LOGGER, "fetch");
CompletableFuture<FetchResult> cf = exec(() -> fetch0(context, startOffset, endOffset, maxBytes), LOGGER, "fetch");
pendingFetches.add(cf);
cf.whenComplete((rs, ex) -> {
S3StreamMetricsManager.recordOperationLatency(timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.FETCH_STREAM);
@@ -216,7 +225,8 @@ public CompletableFuture<FetchResult> fetch(long startOffset, long endOffset, in
}
}

private CompletableFuture<FetchResult> fetch0(long startOffset, long endOffset, int maxBytes, ReadOptions readOptions) {
@WithSpan
private CompletableFuture<FetchResult> fetch0(FetchContext context, long startOffset, long endOffset, int maxBytes) {
if (!status.isReadable()) {
return FutureUtil.failedFuture(new StreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + " stream is already closed"));
}
@@ -237,12 +247,12 @@ private CompletableFuture<FetchResult> fetch0(long startOffset, long endOffset,
if (startOffset == endOffset) {
return CompletableFuture.completedFuture(new DefaultFetchResult(Collections.emptyList(), CacheAccessType.DELTA_WAL_CACHE_HIT, false));
}
return storage.read(streamId, startOffset, endOffset, maxBytes, readOptions).thenApply(dataBlock -> {
return storage.read(context, streamId, startOffset, endOffset, maxBytes).thenApply(dataBlock -> {
List<StreamRecordBatch> records = dataBlock.getRecords();
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("{} stream fetch, startOffset: {}, endOffset: {}, maxBytes: {}, records: {}", logIdent, startOffset, endOffset, maxBytes, records.size());
}
return new DefaultFetchResult(records, dataBlock.getCacheAccessType(), readOptions.pooledBuf());
return new DefaultFetchResult(records, dataBlock.getCacheAccessType(), context.readOptions().pooledBuf());
});
}

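
One thing the diff does not show is how these spans become visible: s3stream depends only on opentelemetry-api, so unless the embedding process registers an SDK, the annotated methods trace against a no-op implementation. A minimal bootstrap sketch for the host application, assuming it adds opentelemetry-sdk and opentelemetry-exporter-otlp itself and that the tracer is resolved through GlobalOpenTelemetry (neither of which is shown in this excerpt):

import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;

class TracingBootstrapSketch {
    // Register a global SDK so that spans created via GlobalOpenTelemetry are exported
    // instead of being dropped by the default no-op implementation.
    static void init() {
        OtlpGrpcSpanExporter exporter = OtlpGrpcSpanExporter.builder()
            .setEndpoint("http://localhost:4317") // hypothetical collector endpoint
            .build();
        SdkTracerProvider tracerProvider = SdkTracerProvider.builder()
            .addSpanProcessor(BatchSpanProcessor.builder(exporter).build())
            .build();
        OpenTelemetrySdk.builder()
            .setTracerProvider(tracerProvider)
            .buildAndRegisterGlobal();
    }
}
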
15 changes: 12 additions & 3 deletions s3stream/src/main/java/com/automq/stream/s3/Storage.java
@@ -17,8 +17,9 @@

package com.automq.stream.s3;

import com.automq.stream.api.ReadOptions;
import com.automq.stream.s3.cache.ReadDataBlock;
import com.automq.stream.s3.context.AppendContext;
import com.automq.stream.s3.context.FetchContext;
import com.automq.stream.s3.model.StreamRecordBatch;

import java.util.concurrent.CompletableFuture;
@@ -37,9 +38,17 @@ public interface Storage {
*
* @param streamRecord {@link StreamRecordBatch}
*/
CompletableFuture<Void> append(StreamRecordBatch streamRecord);
CompletableFuture<Void> append(AppendContext context, StreamRecordBatch streamRecord);

CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes, ReadOptions readOptions);
default CompletableFuture<Void> append(StreamRecordBatch streamRecord) {
return append(AppendContext.DEFAULT, streamRecord);
}

CompletableFuture<ReadDataBlock> read(FetchContext context, long streamId, long startOffset, long endOffset, int maxBytes);

default CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, long endOffset, int maxBytes) {
return read(FetchContext.DEFAULT, streamId, startOffset, endOffset, maxBytes);
}

/**
* Force stream record in WAL upload to s3
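
The Storage interface evolves with the same pattern as Stream above: the context-aware signatures are the real methods, and the old ones survive as default methods that delegate with a DEFAULT context, so existing callers keep working through the defaults. Stripped of the s3stream specifics, the pattern is just this (generic names, not the actual classes):

import java.util.concurrent.CompletableFuture;

// A generic illustration of the back-compat pattern used throughout this PR:
// add a context-first overload, keep the old signature as a default method.
interface StoreSketch {
    final class Ctx {
        static final Ctx DEFAULT = new Ctx();
    }

    CompletableFuture<Void> put(Ctx context, byte[] record);

    // Old call sites keep compiling and behave exactly as before.
    default CompletableFuture<Void> put(byte[] record) {
        return put(Ctx.DEFAULT, record);
    }
}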