Skip to content

Commit 36844f5

Browse files
authored
Adding changes to support async fetch blob for transfer manager (#18754)
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
1 parent 1f761f2 commit 36844f5

14 files changed

+268
-16
lines changed

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,14 @@ protected void closeInternal() {
712712
if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_SETTING) &&
713713
// TODO : Need to remove this check after support for hot indices is added in Composite Directory
714714
this.indexSettings.isWarmIndex()) {
715-
directory = compositeDirectoryFactory.newDirectory(this.indexSettings, path, directoryFactory, remoteDirectory, fileCache);
715+
directory = compositeDirectoryFactory.newDirectory(
716+
this.indexSettings,
717+
path,
718+
directoryFactory,
719+
remoteDirectory,
720+
fileCache,
721+
threadPool
722+
);
716723
} else {
717724
directory = directoryFactory.newDirectory(this.indexSettings, path);
718725
}

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.opensearch.index.store.remote.filecache.FileCache.RestoredCachedIndexInput;
2929
import org.opensearch.index.store.remote.utils.FileTypeUtils;
3030
import org.opensearch.index.store.remote.utils.TransferManager;
31+
import org.opensearch.threadpool.ThreadPool;
3132

3233
import java.io.FileNotFoundException;
3334
import java.io.IOException;
@@ -59,25 +60,28 @@ public class CompositeDirectory extends FilterDirectory {
5960
protected final RemoteSegmentStoreDirectory remoteDirectory;
6061
protected final FileCache fileCache;
6162
protected final TransferManager transferManager;
63+
protected final ThreadPool threadPool;
6264

6365
/**
6466
* Constructor to initialise the composite directory
6567
* @param localDirectory corresponding to the local FSDirectory
6668
* @param remoteDirectory corresponding to the remote directory
6769
* @param fileCache used to cache the remote files locally
6870
*/
69-
public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, FileCache fileCache) {
71+
public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, FileCache fileCache, ThreadPool threadPool) {
7072
super(localDirectory);
7173
validate(localDirectory, remoteDirectory, fileCache);
7274
this.localDirectory = (FSDirectory) localDirectory;
7375
this.remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectory;
7476
this.fileCache = fileCache;
77+
this.threadPool = threadPool;
7578
transferManager = new TransferManager(
7679
(name, position, length) -> new InputStreamIndexInput(
7780
CompositeDirectory.this.remoteDirectory.openBlockInput(name, position, length, IOContext.DEFAULT),
7881
length
7982
),
80-
fileCache
83+
fileCache,
84+
threadPool
8185
);
8286
}
8387

server/src/main/java/org/opensearch/index/store/DefaultCompositeDirectoryFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.index.shard.ShardPath;
1616
import org.opensearch.index.store.remote.filecache.FileCache;
1717
import org.opensearch.plugins.IndexStorePlugin;
18+
import org.opensearch.threadpool.ThreadPool;
1819

1920
import java.io.IOException;
2021

@@ -31,10 +32,11 @@ public Directory newDirectory(
3132
ShardPath shardPath,
3233
IndexStorePlugin.DirectoryFactory localDirectoryFactory,
3334
Directory remoteDirectory,
34-
FileCache fileCache
35+
FileCache fileCache,
36+
ThreadPool threadPool
3537
) throws IOException {
3638
logger.trace("Creating composite directory from core - Default CompositeDirectoryFactory");
3739
Directory localDirectory = localDirectoryFactory.newDirectory(indexSettings, shardPath);
38-
return new CompositeDirectory(localDirectory, remoteDirectory, fileCache);
40+
return new CompositeDirectory(localDirectory, remoteDirectory, fileCache, threadPool);
3941
}
4042
}

server/src/main/java/org/opensearch/index/store/remote/directory/RemoteSnapshotDirectoryFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
101101
assert indexShardSnapshot instanceof BlobStoreIndexShardSnapshot
102102
: "indexShardSnapshot should be an instance of BlobStoreIndexShardSnapshot";
103103
final BlobStoreIndexShardSnapshot snapshot = (BlobStoreIndexShardSnapshot) indexShardSnapshot;
104-
TransferManager transferManager = new TransferManager(blobContainer::readBlob, remoteStoreFileCache);
104+
TransferManager transferManager = new TransferManager(blobContainer::readBlob, remoteStoreFileCache, threadPool);
105105
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager);
106106
});
107107
}

server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
*
3636
* @opensearch.internal
3737
*/
38-
abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAccessInput {
38+
public abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAccessInput {
3939
private static final Logger logger = LogManager.getLogger(OnDemandBlockIndexInput.class);
4040

4141
public static final String CLEANER_THREAD_NAME_PREFIX = "index-input-cleaner";
@@ -372,6 +372,10 @@ public static Builder builder() {
372372
return new Builder();
373373
}
374374

375+
/**
376+
* Builder for {@link OnDemandBlockIndexInput}. The default block size is 8MiB
377+
* (see {@link Builder#DEFAULT_BLOCK_SIZE_SHIFT}).
378+
*/
375379
public static class Builder {
376380
// Block size shift (default value is 23 == 2^23 == 8MiB)
377381
public static final int DEFAULT_BLOCK_SIZE_SHIFT = 23;
@@ -407,7 +411,7 @@ public Builder length(long length) {
407411
return this;
408412
}
409413

410-
Builder blockSizeShift(int blockSizeShift) {
414+
public Builder blockSizeShift(int blockSizeShift) {
411415
assert blockSizeShift < 31 : "blockSizeShift must be < 31";
412416
this.blockSizeShift = blockSizeShift;
413417
this.blockSize = 1 << blockSizeShift;

server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
3232
/**
3333
* Where this class fetches IndexInput parts from
3434
*/
35-
final TransferManager transferManager;
35+
protected final TransferManager transferManager;
3636

3737
/**
3838
* FileInfo contains snapshot metadata references for this IndexInput
@@ -96,7 +96,7 @@ public OnDemandBlockSnapshotIndexInput(
9696
);
9797
}
9898

99-
OnDemandBlockSnapshotIndexInput(
99+
protected OnDemandBlockSnapshotIndexInput(
100100
OnDemandBlockIndexInput.Builder builder,
101101
FileInfo fileInfo,
102102
FSDirectory directory,

server/src/main/java/org/opensearch/index/store/remote/filecache/CachedIndexInput.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@
99
package org.opensearch.index.store.remote.filecache;
1010

1111
import org.apache.lucene.store.IndexInput;
12+
import org.opensearch.common.annotation.ExperimentalApi;
1213
import org.opensearch.common.annotation.PublicApi;
1314

1415
import java.io.IOException;
16+
import java.util.concurrent.CompletableFuture;
17+
import java.util.concurrent.Executor;
1518

1619
/**
1720
* Interface for an entry in the {@link FileCache} that can return an
@@ -29,6 +32,15 @@ public interface CachedIndexInput extends AutoCloseable {
2932
*/
3033
IndexInput getIndexInput() throws IOException;
3134

35+
/**
36+
* Trigger and get the completable future of index input
37+
* @return CompletableFuture of IndexInput
38+
*/
39+
@ExperimentalApi
40+
default CompletableFuture<IndexInput> asyncLoadIndexInput(Executor executor) {
41+
return null;
42+
}
43+
3244
/**
3345
* @return length in bytes
3446
*/

server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.lucene.store.IOContext;
1414
import org.apache.lucene.store.IndexInput;
15+
import org.opensearch.common.annotation.ExperimentalApi;
1516
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
1617
import org.opensearch.index.store.remote.filecache.FileCache;
1718
import org.opensearch.index.store.remote.filecache.FileCachedIndexInput;
19+
import org.opensearch.threadpool.ThreadPool;
1820

1921
import java.io.BufferedOutputStream;
2022
import java.io.IOException;
@@ -28,6 +30,7 @@
2830
import java.security.PrivilegedExceptionAction;
2931
import java.util.concurrent.CompletableFuture;
3032
import java.util.concurrent.CompletionException;
33+
import java.util.concurrent.Executor;
3134
import java.util.concurrent.atomic.AtomicBoolean;
3235

3336
/**
@@ -49,10 +52,12 @@ public interface StreamReader {
4952

5053
private final StreamReader streamReader;
5154
private final FileCache fileCache;
55+
private final ThreadPool threadPool;
5256

53-
public TransferManager(final StreamReader streamReader, final FileCache fileCache) {
57+
public TransferManager(final StreamReader streamReader, final FileCache fileCache, ThreadPool threadPool) {
5458
this.streamReader = streamReader;
5559
this.fileCache = fileCache;
60+
this.threadPool = threadPool;
5661
}
5762

5863
/**
@@ -107,6 +112,38 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
107112
}
108113
}
109114

115+
@ExperimentalApi
116+
public void fetchBlobAsync(BlobFetchRequest blobFetchRequest) throws IOException {
117+
final Path key = blobFetchRequest.getFilePath();
118+
logger.trace("Asynchronous fetchBlob called for {}", key.toString());
119+
try {
120+
CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
121+
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
122+
logger.trace("Transfer Manager - IndexInput closed or not in cache");
123+
// Doesn't exist or is closed, either way create a new one
124+
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
125+
} else {
126+
logger.trace("Transfer Manager - Required blob Already in cache: {}", blobFetchRequest.toString());
127+
// already in the cache and ready to be used (open)
128+
return cachedIndexInput;
129+
}
130+
});
131+
// Cache entry was either retrieved from the cache or newly added, either
132+
// way the reference count has been incremented by one. We can only
133+
// decrement this reference _after_ creating the clone to be returned.
134+
// Making sure remote recovery thread-pool take care of background download
135+
try {
136+
cacheEntry.asyncLoadIndexInput(threadPool.executor(ThreadPool.Names.REMOTE_RECOVERY));
137+
} catch (Exception exception) {
138+
fileCache.decRef(key);
139+
throw exception;
140+
}
141+
} catch (Exception cause) {
142+
logger.error("Exception while asynchronous fetching blob key:{}, Exception {}", key, cause.getMessage());
143+
throw (RuntimeException) cause;
144+
}
145+
}
146+
110147
private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) {
111148
try {
112149
// This local file cache is ref counted and may not strictly enforce configured capacity.
@@ -195,6 +232,33 @@ public IndexInput getIndexInput() throws IOException {
195232
}
196233
}
197234

235+
@ExperimentalApi
236+
public CompletableFuture<IndexInput> asyncLoadIndexInput(Executor executor) {
237+
if (isClosed.get()) {
238+
return CompletableFuture.failedFuture(new IllegalStateException("Already closed"));
239+
}
240+
if (isStarted.getAndSet(true) == false) {
241+
// Create new future and set it as the result
242+
CompletableFuture.supplyAsync(() -> {
243+
try {
244+
return createIndexInput(fileCache, streamReader, request);
245+
} catch (Exception e) {
246+
fileCache.remove(request.getFilePath());
247+
throw new CompletionException(e);
248+
}
249+
}, executor).handle((indexInput, throwable) -> {
250+
fileCache.decRef(request.getFilePath());
251+
if (throwable != null) {
252+
result.completeExceptionally(throwable);
253+
} else {
254+
result.complete(indexInput);
255+
}
256+
return null;
257+
});
258+
}
259+
return result;
260+
}
261+
198262
@Override
199263
public long length() {
200264
return request.getBlobLength();

server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.index.store.IndexStoreListener;
4444
import org.opensearch.index.store.remote.filecache.FileCache;
4545
import org.opensearch.indices.recovery.RecoveryState;
46+
import org.opensearch.threadpool.ThreadPool;
4647

4748
import java.io.IOException;
4849
import java.util.Collections;
@@ -104,7 +105,8 @@ Directory newDirectory(
104105
ShardPath shardPath,
105106
DirectoryFactory localDirectoryFactory,
106107
Directory remoteDirectory,
107-
FileCache fileCache
108+
FileCache fileCache,
109+
ThreadPool threadPool
108110
) throws IOException;
109111
}
110112

server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void setup() throws IOException {
6666
localDirectory = FSDirectory.open(createTempDir());
6767
removeExtraFSFiles();
6868
fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
69-
compositeDirectory = new CompositeDirectory(localDirectory, remoteSegmentStoreDirectory, fileCache);
69+
compositeDirectory = new CompositeDirectory(localDirectory, remoteSegmentStoreDirectory, fileCache, threadPool);
7070
addFilesToDirectory(LOCAL_FILES);
7171
}
7272

0 commit comments

Comments
 (0)