Skip to content

Commit bfe17b6

Browse files
author
Ajay Kumar Movva
committed
Addressing comments by fixing threadpool and ref counts
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
1 parent 673885b commit bfe17b6

File tree

8 files changed

+63
-44
lines changed

8 files changed

+63
-44
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4343
- Make node duress values cacheable ([#18649](https://github.com/opensearch-project/OpenSearch/pull/18649))
4444
- Change default value of remote_data_ratio, which is used in Searchable Snapshots and Writeable Warm from 0 to 5 and min allowed value to 1 ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
4545
- Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)
46+
- Adding changes to support async fetch blob for transfer manager [#18754](https://github.com/opensearch-project/OpenSearch/pull/18754)
4647

4748
### Dependencies
4849
- Bump `stefanzweifel/git-auto-commit-action` from 5 to 6 ([#18524](https://github.com/opensearch-project/OpenSearch/pull/18524))

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, F
8080
CompositeDirectory.this.remoteDirectory.openBlockInput(name, position, length, IOContext.DEFAULT),
8181
length
8282
),
83-
fileCache
83+
fileCache,
84+
threadPool
8485
);
8586
}
8687

server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
101101
assert indexShardSnapshot instanceof BlobStoreIndexShardSnapshot
102102
: "indexShardSnapshot should be an instance of BlobStoreIndexShardSnapshot";
103103
final BlobStoreIndexShardSnapshot snapshot = (BlobStoreIndexShardSnapshot) indexShardSnapshot;
104-
TransferManager transferManager = new TransferManager(blobContainer::readBlob, remoteStoreFileCache);
104+
TransferManager transferManager = new TransferManager(blobContainer::readBlob, remoteStoreFileCache, threadPool);
105105
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager);
106106
});
107107
}

server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public OnDemandBlockSnapshotIndexInput(
9696
);
9797
}
9898

99-
OnDemandBlockSnapshotIndexInput(
99+
protected OnDemandBlockSnapshotIndexInput(
100100
OnDemandBlockIndexInput.Builder builder,
101101
FileInfo fileInfo,
102102
FSDirectory directory,

server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
1717
import org.opensearch.index.store.remote.filecache.FileCache;
1818
import org.opensearch.index.store.remote.filecache.FileCachedIndexInput;
19+
import org.opensearch.threadpool.ThreadPool;
1920

2021
import java.io.BufferedOutputStream;
2122
import java.io.IOException;
@@ -51,10 +52,12 @@ public interface StreamReader {
5152

5253
private final StreamReader streamReader;
5354
private final FileCache fileCache;
55+
private final ThreadPool threadPool;
5456

55-
public TransferManager(final StreamReader streamReader, final FileCache fileCache) {
57+
public TransferManager(final StreamReader streamReader, final FileCache fileCache, ThreadPool threadPool) {
5658
this.streamReader = streamReader;
5759
this.fileCache = fileCache;
60+
this.threadPool = threadPool;
5861
}
5962

6063
/**
@@ -110,7 +113,7 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
110113
}
111114

112115
@ExperimentalApi
113-
public CompletableFuture<IndexInput> fetchBlobAsync(BlobFetchRequest blobFetchRequest, Executor executor) throws IOException {
116+
public void fetchBlobAsync(BlobFetchRequest blobFetchRequest) throws IOException {
114117
final Path key = blobFetchRequest.getFilePath();
115118
logger.trace("Asynchronous fetchBlob called for {}", key.toString());
116119
try {
@@ -128,12 +131,15 @@ public CompletableFuture<IndexInput> fetchBlobAsync(BlobFetchRequest blobFetchRe
128131
// Cache entry was either retrieved from the cache or newly added, either
129132
// way the reference count has been incremented by one. We can only
130133
// decrement this reference _after_ creating the clone to be returned.
134+
// Making sure remote recovery thread-pool take care of background download
131135
try {
132-
return cacheEntry.asyncLoadIndexInput(executor);
133-
} finally {
136+
cacheEntry.asyncLoadIndexInput(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY));
137+
} catch (Exception exception) {
134138
fileCache.decRef(key);
139+
throw exception;
135140
}
136141
} catch (Exception cause) {
142+
logger.error("Exception while asynchronous fetching blob key:{}, Exception {}", key, cause.getMessage());
137143
throw (RuntimeException) cause;
138144
}
139145
}
@@ -241,6 +247,7 @@ public CompletableFuture<IndexInput> asyncLoadIndexInput(Executor executor) {
241247
throw new CompletionException(e);
242248
}
243249
}, executor).handle((indexInput, throwable) -> {
250+
fileCache.decRef(request.getFilePath());
244251
if (throwable != null) {
245252
result.completeExceptionally(throwable);
246253
} else {

server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerBlobContainerReaderTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class TransferManagerBlobContainerReaderTests extends TransferManagerTest
2727
protected void initializeTransferManager() throws IOException {
2828
blobContainer = mock(BlobContainer.class);
2929
doAnswer(i -> new ByteArrayInputStream(createData())).when(blobContainer).readBlob(eq("blob"), anyLong(), anyLong());
30-
transferManager = new TransferManager(blobContainer::readBlob, fileCache);
30+
transferManager = new TransferManager(blobContainer::readBlob, fileCache, threadPool);
3131
}
3232

3333
protected void mockExceptionWhileReading() throws IOException {

server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerRemoteDirectoryReaderTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ protected void initializeTransferManager() throws IOException {
3737
remoteDirectory.openBlockInput(name, position, length, data.length, IOContext.DEFAULT),
3838
length
3939
),
40-
fileCache
40+
fileCache,
41+
threadPool
4142
);
4243
}
4344

server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,16 @@
2727
import java.io.IOException;
2828
import java.util.ArrayList;
2929
import java.util.List;
30-
import java.util.concurrent.CompletableFuture;
31-
import java.util.concurrent.CompletionException;
30+
import java.util.Optional;
3231
import java.util.concurrent.CountDownLatch;
33-
import java.util.concurrent.Executor;
3432
import java.util.concurrent.ExecutorService;
3533
import java.util.concurrent.Executors;
3634
import java.util.concurrent.Future;
3735
import java.util.concurrent.TimeUnit;
3836

3937
import static org.hamcrest.Matchers.equalTo;
4038
import static org.hamcrest.Matchers.greaterThan;
39+
import static org.mockito.Mockito.doReturn;
4140
import static org.mockito.Mockito.mock;
4241

4342
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@@ -51,17 +50,21 @@ public abstract class TransferManagerTestCase extends OpenSearchTestCase {
5150
protected MMapDirectory directory;
5251
protected TransferManager transferManager;
5352
protected ThreadPool threadPool;
53+
protected ExecutorService executorService;
5454

5555
@Before
5656
public void setUp() throws Exception {
5757
super.setUp();
5858
threadPool = mock(ThreadPool.class);
5959
directory = new MMapDirectory(createTempDir(), SimpleFSLockFactory.INSTANCE);
6060
initializeTransferManager();
61+
executorService = Executors.newFixedThreadPool(3);
62+
doReturn(executorService).when(threadPool).executor(ThreadPool.Names.REMOTE_RECOVERY);
6163
}
6264

6365
@After
6466
public void tearDown() throws Exception {
67+
executorService.shutdownNow();
6568
super.tearDown();
6669
}
6770

@@ -84,8 +87,7 @@ public void testSingleAccessAsynchronous() throws Exception {
8487
ExecutorService executor = Executors.newFixedThreadPool(3);
8588
IndexInput indexInput = null;
8689
try {
87-
indexInput = asyncFetchBlobWithName("file", executor);
88-
;
90+
indexInput = asyncFetchBlobWithName("file");
8991
assertIndexInputIsFunctional(indexInput);
9092
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB));
9193
} finally {
@@ -127,21 +129,21 @@ public void testConcurrentAccess() throws Exception {
127129
public void testConcurrentAccessAsynchronous() throws Exception {
128130
// Kick off multiple threads that all concurrently request the same resource
129131
final String blobname = "file";
130-
ExecutorService executor = Executors.newFixedThreadPool(3);
132+
ExecutorService executor = Executors.newFixedThreadPool(8);
131133
try {
132-
final List<CompletableFuture<IndexInput>> futures = new ArrayList<>();
134+
final List<Future<IndexInput>> futures = new ArrayList<>();
133135
for (int i = 0; i < 8; i++) {
134-
futures.add(asyncFetchBlob(blobname, executor));
136+
futures.add(executor.submit(() -> asyncFetchBlobWithName(blobname)));
135137
}
136138
// Wait for all threads to complete
137-
for (CompletableFuture<IndexInput> future : futures) {
139+
for (Future<IndexInput> future : futures) {
138140
future.get(1, TimeUnit.SECONDS);
139141
}
140142
// Assert that all IndexInputs are independently positioned by seeking
141143
// to the end and closing each one. If not independent, then this would
142144
// result in EOFExceptions and/or NPEs.
143-
for (CompletableFuture<IndexInput> future : futures) {
144-
try (IndexInput i = future.join().clone()) {
145+
for (Future<IndexInput> future : futures) {
146+
try (IndexInput i = future.get()) {
145147
assertIndexInputIsFunctional(i);
146148
}
147149
}
@@ -198,10 +200,10 @@ public void testOverflowDisabled() throws Exception {
198200
public void testOverflowDisabledAsynchronous() throws Exception {
199201
ExecutorService executor = Executors.newFixedThreadPool(3);
200202
initializeTransferManager();
201-
IndexInput i1 = asyncFetchBlobWithName("1", executor);
202-
IndexInput i2 = asyncFetchBlobWithName("2", executor);
203+
IndexInput i1 = asyncFetchBlobWithName("1");
204+
IndexInput i2 = asyncFetchBlobWithName("2");
203205

204-
assertThrows(IOException.class, () -> { IndexInput i3 = asyncFetchBlobWithName("3", executor); });
206+
assertThrows(IOException.class, () -> { IndexInput i3 = asyncFetchBlobWithName("3"); });
205207
executor.shutdownNow();
206208
}
207209

@@ -222,13 +224,15 @@ public void testDownloadFailsAsyncDownload() throws Exception {
222224
ExecutorService executor = Executors.newFixedThreadPool(3);
223225
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
224226
blobParts.add(new BlobFetchRequest.BlobPart("failure-blob", 0, EIGHT_MB));
225-
expectThrows(
226-
CompletionException.class,
227-
() -> transferManager.fetchBlobAsync(
228-
BlobFetchRequest.builder().fileName("file").directory(directory).blobParts(blobParts).build(),
229-
executor
230-
).join().clone()
231-
);
227+
expectThrows(IOException.class, () -> {
228+
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder()
229+
.fileName("file")
230+
.directory(directory)
231+
.blobParts(blobParts)
232+
.build();
233+
transferManager.fetchBlobAsync(blobFetchRequest);
234+
try (IndexInput indexInput = transferManager.fetchBlob(blobFetchRequest)) {}
235+
});
232236
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo(0L));
233237
MatcherAssert.assertThat(fileCache.usage(), equalTo(0L));
234238
executor.shutdownNow();
@@ -285,7 +289,7 @@ public void testAsyncFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Excep
285289
blockingThread.start();
286290

287291
// Assert that a different blob can be fetched and will not block on the first blob
288-
try (IndexInput i = asyncFetchBlobWithName("file", executor)) {
292+
try (IndexInput i = asyncFetchBlobWithName("file")) {
289293
assertIndexInputIsFunctional(i);
290294
}
291295

@@ -296,6 +300,22 @@ public void testAsyncFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Excep
296300
executor.shutdownNow();
297301
}
298302

303+
public void testRefCount() throws IOException, InterruptedException {
304+
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
305+
String blobname = "test-blob";
306+
blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB));
307+
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build();
308+
// It will trigger async load
309+
transferManager.fetchBlobAsync(blobFetchRequest);
310+
Thread.sleep(2000);
311+
assertEquals(Optional.of(0), Optional.of(fileCache.getRef(blobFetchRequest.getFilePath())));
312+
assertNotNull(fileCache.get(blobFetchRequest.getFilePath()));
313+
fileCache.decRef(blobFetchRequest.getFilePath());
314+
// Making the read call to fetch from file cache
315+
IndexInput indexInput = transferManager.fetchBlob(blobFetchRequest);
316+
assertEquals(Optional.of(1), Optional.of(fileCache.getRef(blobFetchRequest.getFilePath())));
317+
}
318+
299319
protected abstract void initializeTransferManager() throws IOException;
300320

301321
protected abstract void mockExceptionWhileReading() throws IOException;
@@ -308,23 +328,12 @@ private IndexInput fetchBlobWithName(String blobname) throws IOException {
308328
return transferManager.fetchBlob(BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build());
309329
}
310330

311-
private CompletableFuture<IndexInput> asyncFetchBlob(String blobname, Executor executor) throws IOException {
312-
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
313-
blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB));
314-
// It will trigger async load
315-
return transferManager.fetchBlobAsync(
316-
BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build(),
317-
executor
318-
);
319-
}
320-
321-
private IndexInput asyncFetchBlobWithName(String blobname, Executor executor) throws IOException {
331+
private IndexInput asyncFetchBlobWithName(String blobname) throws IOException {
322332
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
323333
blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB));
324334
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build();
325335
// It will trigger async load
326-
transferManager.fetchBlobAsync(blobFetchRequest, executor);
327-
336+
transferManager.fetchBlobAsync(blobFetchRequest);
328337
assertNotNull(fileCache.get(blobFetchRequest.getFilePath()));
329338
fileCache.decRef(blobFetchRequest.getFilePath());
330339
// Making the read call to fetch from file cache

0 commit comments

Comments
 (0)