Skip to content

Commit 239abe7

Browse files
author
Ajay Kumar Movva
committed
Adding changes to support async fetch blob for transfer manager
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
1 parent 1f761f2 commit 239abe7

File tree

11 files changed

+231
-10
lines changed

11 files changed

+231
-10
lines changed

server/src/main/java/org/opensearch/index/IndexService.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,14 @@ protected void closeInternal() {
712712
if (FeatureFlags.isEnabled(FeatureFlags.WRITABLE_WARM_INDEX_SETTING) &&
713713
// TODO : Need to remove this check after support for hot indices is added in Composite Directory
714714
this.indexSettings.isWarmIndex()) {
715-
directory = compositeDirectoryFactory.newDirectory(this.indexSettings, path, directoryFactory, remoteDirectory, fileCache);
715+
directory = compositeDirectoryFactory.newDirectory(
716+
this.indexSettings,
717+
path,
718+
directoryFactory,
719+
remoteDirectory,
720+
fileCache,
721+
threadPool
722+
);
716723
} else {
717724
directory = directoryFactory.newDirectory(this.indexSettings, path);
718725
}

server/src/main/java/org/opensearch/index/store/CompositeDirectory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.opensearch.index.store.remote.filecache.FileCache.RestoredCachedIndexInput;
2929
import org.opensearch.index.store.remote.utils.FileTypeUtils;
3030
import org.opensearch.index.store.remote.utils.TransferManager;
31+
import org.opensearch.threadpool.ThreadPool;
3132

3233
import java.io.FileNotFoundException;
3334
import java.io.IOException;
@@ -59,19 +60,21 @@ public class CompositeDirectory extends FilterDirectory {
5960
protected final RemoteSegmentStoreDirectory remoteDirectory;
6061
protected final FileCache fileCache;
6162
protected final TransferManager transferManager;
63+
private final ThreadPool threadPool;
6264

6365
/**
6466
* Constructor to initialise the composite directory
6567
* @param localDirectory corresponding to the local FSDirectory
6668
* @param remoteDirectory corresponding to the remote directory
6769
* @param fileCache used to cache the remote files locally
6870
*/
69-
public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, FileCache fileCache) {
71+
public CompositeDirectory(Directory localDirectory, Directory remoteDirectory, FileCache fileCache, ThreadPool threadPool) {
7072
super(localDirectory);
7173
validate(localDirectory, remoteDirectory, fileCache);
7274
this.localDirectory = (FSDirectory) localDirectory;
7375
this.remoteDirectory = (RemoteSegmentStoreDirectory) remoteDirectory;
7476
this.fileCache = fileCache;
77+
this.threadPool = threadPool;
7578
transferManager = new TransferManager(
7679
(name, position, length) -> new InputStreamIndexInput(
7780
CompositeDirectory.this.remoteDirectory.openBlockInput(name, position, length, IOContext.DEFAULT),

server/src/main/java/org/opensearch/index/store/DefaultCompositeDirectoryFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.index.shard.ShardPath;
1616
import org.opensearch.index.store.remote.filecache.FileCache;
1717
import org.opensearch.plugins.IndexStorePlugin;
18+
import org.opensearch.threadpool.ThreadPool;
1819

1920
import java.io.IOException;
2021

@@ -31,10 +32,11 @@ public Directory newDirectory(
3132
ShardPath shardPath,
3233
IndexStorePlugin.DirectoryFactory localDirectoryFactory,
3334
Directory remoteDirectory,
34-
FileCache fileCache
35+
FileCache fileCache,
36+
ThreadPool threadPool
3537
) throws IOException {
3638
logger.trace("Creating composite directory from core - Default CompositeDirectoryFactory");
3739
Directory localDirectory = localDirectoryFactory.newDirectory(indexSettings, shardPath);
38-
return new CompositeDirectory(localDirectory, remoteDirectory, fileCache);
40+
return new CompositeDirectory(localDirectory, remoteDirectory, fileCache, threadPool);
3941
}
4042
}

server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockIndexInput.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
*
3636
* @opensearch.internal
3737
*/
38-
abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAccessInput {
38+
public abstract class OnDemandBlockIndexInput extends IndexInput implements RandomAccessInput {
3939
private static final Logger logger = LogManager.getLogger(OnDemandBlockIndexInput.class);
4040

4141
public static final String CLEANER_THREAD_NAME_PREFIX = "index-input-cleaner";
@@ -372,6 +372,10 @@ public static Builder builder() {
372372
return new Builder();
373373
}
374374

375+
/**
376+
* Builder for {@link OnDemandBlockIndexInput}. The default block size is 8MiB
377+
* (see {@link Builder#DEFAULT_BLOCK_SIZE_SHIFT}).
378+
*/
375379
public static class Builder {
376380
// Block size shift (default value is 23 == 2^23 == 8MiB)
377381
public static final int DEFAULT_BLOCK_SIZE_SHIFT = 23;
@@ -407,7 +411,7 @@ public Builder length(long length) {
407411
return this;
408412
}
409413

410-
Builder blockSizeShift(int blockSizeShift) {
414+
public Builder blockSizeShift(int blockSizeShift) {
411415
assert blockSizeShift < 31 : "blockSizeShift must be < 31";
412416
this.blockSizeShift = blockSizeShift;
413417
this.blockSize = 1 << blockSizeShift;

server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
3232
/**
3333
* Where this class fetches IndexInput parts from
3434
*/
35-
final TransferManager transferManager;
35+
protected final TransferManager transferManager;
3636

3737
/**
3838
* FileInfo contains snapshot metadata references for this IndexInput

server/src/main/java/org/opensearch/index/store/remote/filecache/CachedIndexInput.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,12 @@
99
package org.opensearch.index.store.remote.filecache;
1010

1111
import org.apache.lucene.store.IndexInput;
12+
import org.opensearch.common.annotation.ExperimentalApi;
1213
import org.opensearch.common.annotation.PublicApi;
1314

1415
import java.io.IOException;
16+
import java.util.concurrent.CompletableFuture;
17+
import java.util.concurrent.Executor;
1518

1619
/**
1720
* Interface for an entry in the {@link FileCache} that can return an
@@ -29,6 +32,15 @@ public interface CachedIndexInput extends AutoCloseable {
2932
*/
3033
IndexInput getIndexInput() throws IOException;
3134

35+
/**
36+
* Trigger and get the completable future of index input
37+
* @return CompletableFuture of IndexInput
38+
*/
39+
@ExperimentalApi
40+
default CompletableFuture<IndexInput> asyncLoadIndexInput(Executor executor) {
41+
return null;
42+
}
43+
3244
/**
3345
* @return length in bytes
3446
*/

server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.lucene.store.IOContext;
1414
import org.apache.lucene.store.IndexInput;
15+
import org.opensearch.common.annotation.ExperimentalApi;
1516
import org.opensearch.index.store.remote.filecache.CachedIndexInput;
1617
import org.opensearch.index.store.remote.filecache.FileCache;
1718
import org.opensearch.index.store.remote.filecache.FileCachedIndexInput;
@@ -28,6 +29,7 @@
2829
import java.security.PrivilegedExceptionAction;
2930
import java.util.concurrent.CompletableFuture;
3031
import java.util.concurrent.CompletionException;
32+
import java.util.concurrent.Executor;
3133
import java.util.concurrent.atomic.AtomicBoolean;
3234

3335
/**
@@ -107,6 +109,35 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
107109
}
108110
}
109111

112+
@ExperimentalApi
113+
public CompletableFuture<IndexInput> fetchBlobAsync(BlobFetchRequest blobFetchRequest, Executor executor) throws IOException {
114+
final Path key = blobFetchRequest.getFilePath();
115+
logger.trace("Asynchronous fetchBlob called for {}", key.toString());
116+
try {
117+
CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
118+
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
119+
logger.trace("Transfer Manager - IndexInput closed or not in cache");
120+
// Doesn't exist or is closed, either way create a new one
121+
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
122+
} else {
123+
logger.trace("Transfer Manager - Required blob Already in cache: {}", blobFetchRequest.toString());
124+
// already in the cache and ready to be used (open)
125+
return cachedIndexInput;
126+
}
127+
});
128+
// Cache entry was either retrieved from the cache or newly added, either
129+
// way the reference count has been incremented by one. We can only
130+
// decrement this reference _after_ creating the clone to be returned.
131+
try {
132+
return cacheEntry.asyncLoadIndexInput(executor);
133+
} finally {
134+
fileCache.decRef(key);
135+
}
136+
} catch (Exception cause) {
137+
throw (RuntimeException) cause;
138+
}
139+
}
140+
110141
private static FileCachedIndexInput createIndexInput(FileCache fileCache, StreamReader streamReader, BlobFetchRequest request) {
111142
try {
112143
// This local file cache is ref counted and may not strictly enforce configured capacity.
@@ -195,6 +226,32 @@ public IndexInput getIndexInput() throws IOException {
195226
}
196227
}
197228

229+
@ExperimentalApi
230+
public CompletableFuture<IndexInput> asyncLoadIndexInput(Executor executor) {
231+
if (isClosed.get()) {
232+
return CompletableFuture.failedFuture(new IllegalStateException("Already closed"));
233+
}
234+
if (isStarted.getAndSet(true) == false) {
235+
// Create new future and set it as the result
236+
CompletableFuture.supplyAsync(() -> {
237+
try {
238+
return createIndexInput(fileCache, streamReader, request);
239+
} catch (Exception e) {
240+
fileCache.remove(request.getFilePath());
241+
throw new CompletionException(e);
242+
}
243+
}, executor).handle((indexInput, throwable) -> {
244+
if (throwable != null) {
245+
result.completeExceptionally(throwable);
246+
} else {
247+
result.complete(indexInput);
248+
}
249+
return null;
250+
});
251+
}
252+
return result;
253+
}
254+
198255
@Override
199256
public long length() {
200257
return request.getBlobLength();

server/src/main/java/org/opensearch/plugins/IndexStorePlugin.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.index.store.IndexStoreListener;
4444
import org.opensearch.index.store.remote.filecache.FileCache;
4545
import org.opensearch.indices.recovery.RecoveryState;
46+
import org.opensearch.threadpool.ThreadPool;
4647

4748
import java.io.IOException;
4849
import java.util.Collections;
@@ -104,7 +105,8 @@ Directory newDirectory(
104105
ShardPath shardPath,
105106
DirectoryFactory localDirectoryFactory,
106107
Directory remoteDirectory,
107-
FileCache fileCache
108+
FileCache fileCache,
109+
ThreadPool threadPool
108110
) throws IOException;
109111
}
110112

server/src/test/java/org/opensearch/index/store/CompositeDirectoryTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void setup() throws IOException {
6666
localDirectory = FSDirectory.open(createTempDir());
6767
removeExtraFSFiles();
6868
fileCache = FileCacheFactory.createConcurrentLRUFileCache(FILE_CACHE_CAPACITY, new NoopCircuitBreaker(CircuitBreaker.REQUEST));
69-
compositeDirectory = new CompositeDirectory(localDirectory, remoteSegmentStoreDirectory, fileCache);
69+
compositeDirectory = new CompositeDirectory(localDirectory, remoteSegmentStoreDirectory, fileCache, threadPool);
7070
addFilesToDirectory(LOCAL_FILES);
7171
}
7272

server/src/test/java/org/opensearch/index/store/DefaultCompositeDirectoryFactoryTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ public void testNewDirectory() throws IOException {
5959
shardPath,
6060
localDirectoryFactory,
6161
remoteSegmentStoreDirectory,
62-
fileCache
62+
fileCache,
63+
threadPool
6364
);
6465
assertNotNull(directory);
6566
assert (directory instanceof CompositeDirectory);

0 commit comments

Comments
 (0)