Skip to content

Commit 51dd56c

Browse files
committed
make fetchBlobAsync return future
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
1 parent e9af8db commit 51dd56c

File tree

2 files changed

+8
-5
lines changed

2 files changed

+8
-5
lines changed

server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
113113
}
114114

115115
@ExperimentalApi
116-
public void fetchBlobAsync(BlobFetchRequest blobFetchRequest) throws IOException {
116+
public CompletableFuture<IndexInput> fetchBlobAsync(BlobFetchRequest blobFetchRequest) throws IOException {
117117
final Path key = blobFetchRequest.getFilePath();
118118
logger.trace("Asynchronous fetchBlob called for {}", key.toString());
119119
try {
@@ -133,7 +133,7 @@ public void fetchBlobAsync(BlobFetchRequest blobFetchRequest) throws IOException
133133
// decrement this reference _after_ creating the clone to be returned.
134134
// Making sure remote recovery thread-pool take care of background download
135135
try {
136-
cacheEntry.asyncLoadIndexInput(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY));
136+
return cacheEntry.asyncLoadIndexInput(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY));
137137
} catch (Exception exception) {
138138
fileCache.decRef(key);
139139
throw exception;

server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.ArrayList;
2929
import java.util.List;
3030
import java.util.Optional;
31+
import java.util.concurrent.CompletableFuture;
3132
import java.util.concurrent.CountDownLatch;
3233
import java.util.concurrent.ExecutorService;
3334
import java.util.concurrent.Executors;
@@ -313,14 +314,16 @@ public void testAsyncFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Excep
313314
executor.shutdownNow();
314315
}
315316

316-
public void testRefCount() throws IOException, InterruptedException {
317+
public void testRefCount() throws Exception {
317318
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
318319
String blobname = "test-blob";
319320
blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB));
320321
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build();
321322
// It will trigger async load
322-
transferManager.fetchBlobAsync(blobFetchRequest);
323-
fileCache.get(blobFetchRequest.getFilePath());
323+
CompletableFuture<IndexInput> future = transferManager.fetchBlobAsync(blobFetchRequest);
324+
future.get();
325+
assertEquals(Optional.of(0), Optional.of(fileCache.getRef(blobFetchRequest.getFilePath())));
326+
assertNotNull(fileCache.get(blobFetchRequest.getFilePath()));
324327
fileCache.decRef(blobFetchRequest.getFilePath());
325328
// Making the read call to fetch from file cache
326329
IndexInput indexInput = transferManager.fetchBlob(blobFetchRequest);

0 commit comments

Comments
 (0)