Skip to content

Commit e22b412

Browse files
Fix flakey test testOverflowDisabledAsynchronous (#18891)
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
1 parent 84b46d7 commit e22b412

File tree

2 files changed

+6
-7
lines changed

2 files changed

+6
-7
lines changed

server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
113113
}
114114

115115
@ExperimentalApi
116-
public void fetchBlobAsync(BlobFetchRequest blobFetchRequest) throws IOException {
116+
public CompletableFuture<IndexInput> fetchBlobAsync(BlobFetchRequest blobFetchRequest) throws IOException {
117117
final Path key = blobFetchRequest.getFilePath();
118118
logger.trace("Asynchronous fetchBlob called for {}", key.toString());
119119
try {
@@ -133,7 +133,7 @@ public void fetchBlobAsync(BlobFetchRequest blobFetchRequest) throws IOException
133133
// decrement this reference _after_ creating the clone to be returned.
134134
// Making sure remote recovery thread-pool take care of background download
135135
try {
136-
cacheEntry.asyncLoadIndexInput(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY));
136+
return cacheEntry.asyncLoadIndexInput(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY));
137137
} catch (Exception exception) {
138138
fileCache.decRef(key);
139139
throw exception;

server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.ArrayList;
2929
import java.util.List;
3030
import java.util.Optional;
31+
import java.util.concurrent.CompletableFuture;
3132
import java.util.concurrent.CountDownLatch;
3233
import java.util.concurrent.ExecutorService;
3334
import java.util.concurrent.Executors;
@@ -313,14 +314,14 @@ public void testAsyncFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Excep
313314
executor.shutdownNow();
314315
}
315316

316-
public void testRefCount() throws IOException, InterruptedException {
317+
public void testRefCount() throws Exception {
317318
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
318319
String blobname = "test-blob";
319320
blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB));
320321
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build();
321322
// It will trigger async load
322-
transferManager.fetchBlobAsync(blobFetchRequest);
323-
Thread.sleep(2000);
323+
CompletableFuture<IndexInput> future = transferManager.fetchBlobAsync(blobFetchRequest);
324+
future.get();
324325
assertEquals(Optional.of(0), Optional.of(fileCache.getRef(blobFetchRequest.getFilePath())));
325326
assertNotNull(fileCache.get(blobFetchRequest.getFilePath()));
326327
fileCache.decRef(blobFetchRequest.getFilePath());
@@ -347,8 +348,6 @@ private IndexInput asyncFetchBlobWithName(String blobname) throws IOException {
347348
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build();
348349
// It will trigger async load
349350
transferManager.fetchBlobAsync(blobFetchRequest);
350-
assertNotNull(fileCache.get(blobFetchRequest.getFilePath()));
351-
fileCache.decRef(blobFetchRequest.getFilePath());
352351
// Making the read call to fetch from file cache
353352
return transferManager.fetchBlob(blobFetchRequest);
354353
}

0 commit comments

Comments
 (0)