Skip to content

Commit 6c2d152

Browse files
author
Ajay Kumar Movva
committed
Addressing comments by fixing threadpool and ref counts
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
1 parent 239abe7 commit 6c2d152

File tree

8 files changed

+63
-48
lines changed

8 files changed

+63
-48
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4343
- Make node duress values cacheable ([#18649](https://github.com/opensearch-project/OpenSearch/pull/18649))
4444
- Change default value of remote_data_ratio, which is used in Searchable Snapshots and Writeable Warm from 0 to 5 and min allowed value to 1 ([#18767](https://github.com/opensearch-project/OpenSearch/pull/18767))
4545
- Making multi rate limiters in repository dynamic [#18069](https://github.com/opensearch-project/OpenSearch/pull/18069)
46+
- Adding changes to support async fetch blob for transfer manager [#18754](https://github.com/opensearch-project/OpenSearch/pull/18754)
4647

4748
### Dependencies
4849
- Bump `stefanzweifel/git-auto-commit-action` from 5 to 6 ([#18524](https://github.com/opensearch-project/OpenSearch/pull/18524))

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, F
8080
CompositeDirectory.this.remoteDirectory.openBlockInput(name, position, length, IOContext.DEFAULT),
8181
length
8282
),
83-
fileCache
83+
fileCache,
84+
threadPool
8485
);
8586
}
8687

server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
101101
assert indexShardSnapshot instanceof BlobStoreIndexShardSnapshot
102102
: "indexShardSnapshot should be an instance of BlobStoreIndexShardSnapshot";
103103
final BlobStoreIndexShardSnapshot snapshot = (BlobStoreIndexShardSnapshot) indexShardSnapshot;
104-
TransferManager transferManager = new TransferManager(blobContainer::readBlob, remoteStoreFileCache);
104+
TransferManager transferManager = new TransferManager(blobContainer::readBlob, remoteStoreFileCache, threadPool);
105105
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager);
106106
});
107107
}

server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public OnDemandBlockSnapshotIndexInput(
9696
);
9797
}
9898

99-
OnDemandBlockSnapshotIndexInput(
99+
protected OnDemandBlockSnapshotIndexInput(
100100
OnDemandBlockIndexInput.Builder builder,
101101
FileInfo fileInfo,
102102
FSDirectory directory,

server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
1717
import org.opensearch.index.store.remote.filecache.FileCache;
1818
import org.opensearch.index.store.remote.filecache.FileCachedIndexInput;
19+
import org.opensearch.threadpool.ThreadPool;
1920

2021
import java.io.BufferedOutputStream;
2122
import java.io.IOException;
@@ -51,10 +52,12 @@ public interface StreamReader {
5152

5253
private final StreamReader streamReader;
5354
private final FileCache fileCache;
55+
private final ThreadPool threadPool;
5456

55-
public TransferManager(final StreamReader streamReader, final FileCache fileCache) {
57+
public TransferManager(final StreamReader streamReader, final FileCache fileCache, ThreadPool threadPool) {
5658
this.streamReader = streamReader;
5759
this.fileCache = fileCache;
60+
this.threadPool = threadPool;
5861
}
5962

6063
/**
@@ -110,7 +113,7 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
110113
}
111114

112115
@ExperimentalApi
113-
public CompletableFuture<IndexInput> fetchBlobAsync(BlobFetchRequest blobFetchRequest, Executor executor) throws IOException {
116+
public void fetchBlobAsync(BlobFetchRequest blobFetchRequest) throws IOException {
114117
final Path key = blobFetchRequest.getFilePath();
115118
logger.trace("Asynchronous fetchBlob called for {}", key.toString());
116119
try {
@@ -128,12 +131,15 @@ public CompletableFuture<IndexInput> fetchBlobAsync(BlobFetchRequest blobFetchRe
128131
// Cache entry was either retrieved from the cache or newly added, either
129132
// way the reference count has been incremented by one. We can only
130133
// decrement this reference _after_ creating the clone to be returned.
134+
// Making sure remote recovery thread-pool take care of background download
131135
try {
132-
return cacheEntry.asyncLoadIndexInput(executor);
133-
} finally {
136+
cacheEntry.asyncLoadIndexInput(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY));
137+
} catch (Exception exception) {
134138
fileCache.decRef(key);
139+
throw exception;
135140
}
136141
} catch (Exception cause) {
142+
logger.error("Exception while asynchronous fetching blob key:{}, Exception {}", key, cause.getMessage());
137143
throw (RuntimeException) cause;
138144
}
139145
}
@@ -241,6 +247,7 @@ public CompletableFuture<IndexInput> asyncLoadIndexInput(Executor executor) {
241247
throw new CompletionException(e);
242248
}
243249
}, executor).handle((indexInput, throwable) -> {
250+
fileCache.decRef(request.getFilePath());
244251
if (throwable != null) {
245252
result.completeExceptionally(throwable);
246253
} else {

server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerBlobContainerReaderTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class TransferManagerBlobContainerReaderTests extends TransferManagerTest
2727
protected void initializeTransferManager() throws IOException {
2828
blobContainer = mock(BlobContainer.class);
2929
doAnswer(i -> new ByteArrayInputStream(createData())).when(blobContainer).readBlob(eq("blob"), anyLong(), anyLong());
30-
transferManager = new TransferManager(blobContainer::readBlob, fileCache);
30+
transferManager = new TransferManager(blobContainer::readBlob, fileCache, threadPool);
3131
}
3232

3333
protected void mockExceptionWhileReading() throws IOException {

server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerRemoteDirectoryReaderTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ protected void initializeTransferManager() throws IOException {
3737
remoteDirectory.openBlockInput(name, position, length, data.length, IOContext.DEFAULT),
3838
length
3939
),
40-
fileCache
40+
fileCache,
41+
threadPool
4142
);
4243
}
4344

server/src/test/java/org/opensearch/index/store/remote/utils/TransferManagerTestCase.java

Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,16 @@
2727
import java.io.IOException;
2828
import java.util.ArrayList;
2929
import java.util.List;
30-
import java.util.concurrent.CompletableFuture;
31-
import java.util.concurrent.CompletionException;
30+
import java.util.Optional;
3231
import java.util.concurrent.CountDownLatch;
33-
import java.util.concurrent.Executor;
3432
import java.util.concurrent.ExecutorService;
3533
import java.util.concurrent.Executors;
3634
import java.util.concurrent.Future;
3735
import java.util.concurrent.TimeUnit;
3836

3937
import static org.hamcrest.Matchers.equalTo;
4038
import static org.hamcrest.Matchers.greaterThan;
39+
import static org.mockito.Mockito.doReturn;
4140
import static org.mockito.Mockito.mock;
4241

4342
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@@ -51,17 +50,21 @@ public abstract class TransferManagerTestCase extends OpenSearchTestCase {
5150
protected MMapDirectory directory;
5251
protected TransferManager transferManager;
5352
protected ThreadPool threadPool;
53+
protected ExecutorService executorService;
5454

5555
@Before
5656
public void setUp() throws Exception {
5757
super.setUp();
5858
threadPool = mock(ThreadPool.class);
5959
directory = new MMapDirectory(createTempDir(), SimpleFSLockFactory.INSTANCE);
6060
initializeTransferManager();
61+
executorService = Executors.newFixedThreadPool(3);
62+
doReturn(executorService).when(threadPool).executor(ThreadPool.Names.REMOTE_RECOVERY);
6163
}
6264

6365
@After
6466
public void tearDown() throws Exception {
67+
executorService.shutdownNow();
6568
super.tearDown();
6669
}
6770

@@ -81,11 +84,9 @@ public void testSingleAccess() throws Exception {
8184
}
8285

8386
public void testSingleAccessAsynchronous() throws Exception {
84-
ExecutorService executor = Executors.newFixedThreadPool(3);
8587
IndexInput indexInput = null;
8688
try {
87-
indexInput = asyncFetchBlobWithName("file", executor);
88-
;
89+
indexInput = asyncFetchBlobWithName("file");
8990
assertIndexInputIsFunctional(indexInput);
9091
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo((long) EIGHT_MB));
9192
} finally {
@@ -95,7 +96,6 @@ public void testSingleAccessAsynchronous() throws Exception {
9596
}
9697
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo(0L));
9798
MatcherAssert.assertThat(fileCache.usage(), equalTo((long) EIGHT_MB));
98-
executor.shutdownNow();
9999
}
100100

101101
public void testConcurrentAccess() throws Exception {
@@ -127,21 +127,21 @@ public void testConcurrentAccess() throws Exception {
127127
public void testConcurrentAccessAsynchronous() throws Exception {
128128
// Kick off multiple threads that all concurrently request the same resource
129129
final String blobname = "file";
130-
ExecutorService executor = Executors.newFixedThreadPool(3);
130+
ExecutorService executor = Executors.newFixedThreadPool(8);
131131
try {
132-
final List<CompletableFuture<IndexInput>> futures = new ArrayList<>();
132+
final List<Future<IndexInput>> futures = new ArrayList<>();
133133
for (int i = 0; i < 8; i++) {
134-
futures.add(asyncFetchBlob(blobname, executor));
134+
futures.add(executor.submit(() -> asyncFetchBlobWithName(blobname)));
135135
}
136136
// Wait for all threads to complete
137-
for (CompletableFuture<IndexInput> future : futures) {
137+
for (Future<IndexInput> future : futures) {
138138
future.get(1, TimeUnit.SECONDS);
139139
}
140140
// Assert that all IndexInputs are independently positioned by seeking
141141
// to the end and closing each one. If not independent, then this would
142142
// result in EOFExceptions and/or NPEs.
143-
for (CompletableFuture<IndexInput> future : futures) {
144-
try (IndexInput i = future.join().clone()) {
143+
for (Future<IndexInput> future : futures) {
144+
try (IndexInput i = future.get()) {
145145
assertIndexInputIsFunctional(i);
146146
}
147147
}
@@ -198,10 +198,10 @@ public void testOverflowDisabled() throws Exception {
198198
public void testOverflowDisabledAsynchronous() throws Exception {
199199
ExecutorService executor = Executors.newFixedThreadPool(3);
200200
initializeTransferManager();
201-
IndexInput i1 = asyncFetchBlobWithName("1", executor);
202-
IndexInput i2 = asyncFetchBlobWithName("2", executor);
201+
IndexInput i1 = asyncFetchBlobWithName("1");
202+
IndexInput i2 = asyncFetchBlobWithName("2");
203203

204-
assertThrows(IOException.class, () -> { IndexInput i3 = asyncFetchBlobWithName("3", executor); });
204+
assertThrows(IOException.class, () -> { IndexInput i3 = asyncFetchBlobWithName("3"); });
205205
executor.shutdownNow();
206206
}
207207

@@ -219,19 +219,19 @@ public void testDownloadFails() throws Exception {
219219

220220
public void testDownloadFailsAsyncDownload() throws Exception {
221221
mockExceptionWhileReading();
222-
ExecutorService executor = Executors.newFixedThreadPool(3);
223222
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
224223
blobParts.add(new BlobFetchRequest.BlobPart("failure-blob", 0, EIGHT_MB));
225-
expectThrows(
226-
CompletionException.class,
227-
() -> transferManager.fetchBlobAsync(
228-
BlobFetchRequest.builder().fileName("file").directory(directory).blobParts(blobParts).build(),
229-
executor
230-
).join().clone()
231-
);
224+
expectThrows(IOException.class, () -> {
225+
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder()
226+
.fileName("file")
227+
.directory(directory)
228+
.blobParts(blobParts)
229+
.build();
230+
transferManager.fetchBlobAsync(blobFetchRequest);
231+
try (IndexInput indexInput = transferManager.fetchBlob(blobFetchRequest)) {}
232+
});
232233
MatcherAssert.assertThat(fileCache.activeUsage(), equalTo(0L));
233234
MatcherAssert.assertThat(fileCache.usage(), equalTo(0L));
234-
executor.shutdownNow();
235235
}
236236

237237
public void testFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Exception {
@@ -285,7 +285,7 @@ public void testAsyncFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Excep
285285
blockingThread.start();
286286

287287
// Assert that a different blob can be fetched and will not block on the first blob
288-
try (IndexInput i = asyncFetchBlobWithName("file", executor)) {
288+
try (IndexInput i = asyncFetchBlobWithName("file")) {
289289
assertIndexInputIsFunctional(i);
290290
}
291291

@@ -296,6 +296,22 @@ public void testAsyncFetchesToDifferentBlobsDoNotBlockOnEachOther() throws Excep
296296
executor.shutdownNow();
297297
}
298298

299+
public void testRefCount() throws IOException, InterruptedException {
300+
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
301+
String blobname = "test-blob";
302+
blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB));
303+
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build();
304+
// It will trigger async load
305+
transferManager.fetchBlobAsync(blobFetchRequest);
306+
Thread.sleep(2000);
307+
assertEquals(Optional.of(0), Optional.of(fileCache.getRef(blobFetchRequest.getFilePath())));
308+
assertNotNull(fileCache.get(blobFetchRequest.getFilePath()));
309+
fileCache.decRef(blobFetchRequest.getFilePath());
310+
// Making the read call to fetch from file cache
311+
IndexInput indexInput = transferManager.fetchBlob(blobFetchRequest);
312+
assertEquals(Optional.of(1), Optional.of(fileCache.getRef(blobFetchRequest.getFilePath())));
313+
}
314+
299315
protected abstract void initializeTransferManager() throws IOException;
300316

301317
protected abstract void mockExceptionWhileReading() throws IOException;
@@ -308,23 +324,12 @@ private IndexInput fetchBlobWithName(String blobname) throws IOException {
308324
return transferManager.fetchBlob(BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build());
309325
}
310326

311-
private CompletableFuture<IndexInput> asyncFetchBlob(String blobname, Executor executor) throws IOException {
312-
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
313-
blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB));
314-
// It will trigger async load
315-
return transferManager.fetchBlobAsync(
316-
BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build(),
317-
executor
318-
);
319-
}
320-
321-
private IndexInput asyncFetchBlobWithName(String blobname, Executor executor) throws IOException {
327+
private IndexInput asyncFetchBlobWithName(String blobname) throws IOException {
322328
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
323329
blobParts.add(new BlobFetchRequest.BlobPart("blob", 0, EIGHT_MB));
324330
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder().fileName(blobname).directory(directory).blobParts(blobParts).build();
325331
// It will trigger async load
326-
transferManager.fetchBlobAsync(blobFetchRequest, executor);
327-
332+
transferManager.fetchBlobAsync(blobFetchRequest);
328333
assertNotNull(fileCache.get(blobFetchRequest.getFilePath()));
329334
fileCache.decRef(blobFetchRequest.getFilePath());
330335
// Making the read call to fetch from file cache

0 commit comments

Comments
 (0)