diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java index 8f320a3ae..7f1e5dede 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/DefaultS3BlockCache.java @@ -114,6 +114,10 @@ public CompletableFuture read(long streamId, long startOffset, lo } public CompletableFuture read0(long streamId, long startOffset, long endOffset, int maxBytes, ReadAheadAgent agent, UUID uuid) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[S3BlockCache] read0, stream={}, {}-{}, total bytes: {}, uuid: {} ", streamId, startOffset, endOffset, maxBytes, uuid); + } + if (startOffset >= endOffset || maxBytes <= 0) { return CompletableFuture.completedFuture(new ReadDataBlock(Collections.emptyList(), CacheAccessType.BLOCK_CACHE_MISS)); } @@ -124,7 +128,8 @@ public CompletableFuture read0(long streamId, long startOffset, l CompletableFuture inflightReadAheadTask = inflightReadAheadTasks.get(new ReadAheadTaskKey(streamId, nextStartOffset)); if (inflightReadAheadTask != null) { CompletableFuture readCf = new CompletableFuture<>(); - inflightReadAheadTask.whenComplete((nil, ex) -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, agent, uuid), readCf)); + inflightReadAheadTask.whenComplete((nil, ex) -> FutureUtil.exec(() -> FutureUtil.propagate( + read0(streamId, startOffset, endOffset, maxBytes, agent, uuid), readCf), readCf, LOGGER, "read0")); return readCf; } diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java index 13ebf917f..013868f65 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/StreamReader.java @@ -260,6 +260,7 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int DefaultS3BlockCache.ReadAheadTaskKey readAheadTaskKey = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset); // put a placeholder task at start offset to prevent next cache miss request spawn duplicated read ahead task inflightReadAheadTaskMap.putIfAbsent(readAheadTaskKey, new CompletableFuture<>()); + context.taskKeySet.add(readAheadTaskKey); getDataBlockIndices(streamId, endOffset, context) .thenAcceptAsync(v -> handleAsyncReadAhead(streamId, startOffset, endOffset, maxBytes, agent, timer, context), streamReaderExecutor)