Skip to content

Commit a5fccbe

Browse files
authored
Fix fetch Blob Async Decrement Reference (#19942)
Signed-off-by: Harsh Kothari <techarsh@amazon.com>
1 parent bf0820f commit a5fccbe

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ public IndexInput getIndexInput() throws IOException {
232232
@ExperimentalApi
233233
public CompletableFuture<IndexInput> asyncLoadIndexInput(Executor executor) {
234234
if (isClosed.get()) {
235+
fileCache.decRef(request.getFilePath());
235236
return CompletableFuture.failedFuture(new IllegalStateException("Already closed"));
236237
}
237238
if (isStarted.getAndSet(true) == false) {
@@ -252,6 +253,9 @@ public CompletableFuture<IndexInput> asyncLoadIndexInput(Executor executor) {
252253
}
253254
return null;
254255
});
256+
} else {
257+
// Decreasing the extra ref count introduced by compute
258+
fileCache.decRef(request.getFilePath());
255259
}
256260
return result;
257261
}

server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,18 @@ public void testRefCount() throws Exception {
324324
assertEquals(Optional.of(1), Optional.of(fileCache.getRef(blobFetchRequest.getFilePath())));
325325
}
326326

327+
public void testRefMultipleCount() throws Exception {
328+
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
329+
String blobname = "test-blob";
330+
blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB));
331+
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build();
332+
transferManager.fetchBlob(blobFetchRequest);
333+
assertNotNull(fileCache.getRef(blobFetchRequest.getFilePath()));
334+
transferManager.fetchBlobAsync(blobFetchRequest).join();
335+
waitUntil(() -> fileCache.getRef(blobFetchRequest.getFilePath()) == 1, 10, TimeUnit.SECONDS);
336+
assertEquals(Optional.of(1), Optional.of(fileCache.getRef(blobFetchRequest.getFilePath())));
337+
}
338+
327339
protected abstract void initializeTransferManager() throws IOException;
328340

329341
protected abstract void mockExceptionWhileReading() throws IOException;

0 commit comments

Comments
 (0)