Skip to content

Commit

Permalink
fix(s3stream): add missing inflight read key on async read ahead (#829)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <niesc@automq.com>
  • Loading branch information
SCNieh authored Dec 13, 2023
1 parent 8c6585d commit 72d34d8
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset, lo
}

public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, long endOffset, int maxBytes, ReadAheadAgent agent, UUID uuid) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[S3BlockCache] read0, stream={}, {}-{}, total bytes: {}, uuid: {} ", streamId, startOffset, endOffset, maxBytes, uuid);
}

if (startOffset >= endOffset || maxBytes <= 0) {
return CompletableFuture.completedFuture(new ReadDataBlock(Collections.emptyList(), CacheAccessType.BLOCK_CACHE_MISS));
}
Expand All @@ -124,7 +128,8 @@ public CompletableFuture<ReadDataBlock> read0(long streamId, long startOffset, l
CompletableFuture<Void> inflightReadAheadTask = inflightReadAheadTasks.get(new ReadAheadTaskKey(streamId, nextStartOffset));
if (inflightReadAheadTask != null) {
CompletableFuture<ReadDataBlock> readCf = new CompletableFuture<>();
inflightReadAheadTask.whenComplete((nil, ex) -> FutureUtil.propagate(read0(streamId, startOffset, endOffset, maxBytes, agent, uuid), readCf));
inflightReadAheadTask.whenComplete((nil, ex) -> FutureUtil.exec(() -> FutureUtil.propagate(
read0(streamId, startOffset, endOffset, maxBytes, agent, uuid), readCf), readCf, LOGGER, "read0"));
return readCf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ public void asyncReadAhead(long streamId, long startOffset, long endOffset, int
DefaultS3BlockCache.ReadAheadTaskKey readAheadTaskKey = new DefaultS3BlockCache.ReadAheadTaskKey(streamId, startOffset);
// put a placeholder task at start offset to prevent next cache miss request spawn duplicated read ahead task
inflightReadAheadTaskMap.putIfAbsent(readAheadTaskKey, new CompletableFuture<>());
context.taskKeySet.add(readAheadTaskKey);
getDataBlockIndices(streamId, endOffset, context)
.thenAcceptAsync(v ->
handleAsyncReadAhead(streamId, startOffset, endOffset, maxBytes, agent, timer, context), streamReaderExecutor)
Expand Down

0 comments on commit 72d34d8

Please sign in to comment.