Skip to content

Commit

Permalink
fix(s3stream): fix timeout detect
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
  • Loading branch information
superhx committed Dec 5, 2023
1 parent 14caaef commit ece0ec2
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.6.7-SNAPSHOT</s3stream.version>
<s3stream.version>0.6.8-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.6.7-SNAPSHOT</version>
<version>0.6.8-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
4 changes: 2 additions & 2 deletions s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset,
if (!logCacheRecords.isEmpty()) {
endOffset = logCacheRecords.get(0).getBaseOffset();
}
Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.error("read from block cache timeout, stream={}, {}, maxBytes: {}", streamId, startOffset, maxBytes), 1, TimeUnit.MINUTES);
Timeout timeout = timeoutDetect.newTimeout(t -> LOGGER.warn("read from block cache timeout, stream={}, {}, maxBytes: {}", streamId, startOffset, maxBytes), 1, TimeUnit.MINUTES);
return blockCache.read(streamId, startOffset, endOffset, maxBytes).thenApply(readDataBlock -> {
List<StreamRecordBatch> rst = new ArrayList<>(readDataBlock.getRecords());
int remainingBytesSize = maxBytes - rst.stream().mapToInt(StreamRecordBatch::size).sum();
Expand All @@ -378,11 +378,11 @@ private CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset,
continuousCheck(rst);
return new ReadDataBlock(rst, readDataBlock.getCacheAccessType());
}).whenComplete((rst, ex) -> {
timeout.cancel();
if (ex != null) {
LOGGER.error("read from block cache failed, stream={}, {}-{}, maxBytes: {}",
streamId, startOffset, maxBytes, ex);
logCacheRecords.forEach(StreamRecordBatch::release);
timeout.cancel();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ public CompletableFuture<ByteBuf> rangeRead(String path, long start, long end, T
}

Timeout timeout = timeoutDetect.newTimeout((t) -> LOGGER.warn("rangeRead {} {}-{} timeout", path, start, end), 1, TimeUnit.MINUTES);
cf.whenComplete((rst, ex) -> timeout.cancel());
return cf;
return cf.whenComplete((rst, ex) -> timeout.cancel());
}

private void rangeRead0(String path, long start, long end, CompletableFuture<ByteBuf> cf) {
Expand Down

0 comments on commit ece0ec2

Please sign in to comment.