Skip to content

Commit a599cd4

Browse files
authored
Concurrently load file cache from different directories (#18685)
Signed-off-by: Harsh Kothari <techarsh@amazon.com>
1 parent 94c5f21 commit a599cd4

File tree

4 files changed

+155
-6
lines changed

4 files changed

+155
-6
lines changed

server/src/main/java/org/opensearch/env/NodeEnvironment.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1291,6 +1291,12 @@ public static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath, Se
12911291
return indexSubPaths;
12921292
}
12931293

1294+
public static void processDirectoryFiles(Path path, List<Path> indexSubPaths) throws IOException {
1295+
try (Stream<Path> shardStream = Files.list(path)) {
1296+
shardStream.filter(NodeEnvironment::isShardPath).map(Path::toAbsolutePath).forEach(indexSubPaths::add);
1297+
}
1298+
}
1299+
12941300
@Deprecated(forRemoval = true)
12951301
public static List<Path> collectFileCacheDataPath(NodePath fileCacheNodePath) throws IOException {
12961302
return collectFileCacheDataPath(fileCacheNodePath, Settings.EMPTY);
@@ -1301,9 +1307,7 @@ private static void processDirectory(Path directoryPath, List<Path> indexSubPath
13011307
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(directoryPath)) {
13021308
for (Path indexPath : indexStream) {
13031309
if (Files.isDirectory(indexPath)) {
1304-
try (Stream<Path> shardStream = Files.list(indexPath)) {
1305-
shardStream.filter(NodeEnvironment::isShardPath).map(Path::toAbsolutePath).forEach(indexSubPaths::add);
1306-
}
1310+
processDirectoryFiles(indexPath, indexSubPaths);
13071311
}
13081312
}
13091313
}

server/src/main/java/org/opensearch/index/store/remote/filecache/FileCache.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.lucene.store.IndexInput;
14+
import org.opensearch.common.SetOnce;
1415
import org.opensearch.common.annotation.PublicApi;
1516
import org.opensearch.core.common.breaker.CircuitBreaker;
1617
import org.opensearch.core.common.breaker.CircuitBreakingException;
@@ -23,13 +24,17 @@
2324

2425
import java.io.IOException;
2526
import java.io.UncheckedIOException;
27+
import java.nio.file.DirectoryStream;
2628
import java.nio.file.Files;
2729
import java.nio.file.Path;
30+
import java.util.ArrayList;
2831
import java.util.List;
32+
import java.util.concurrent.RecursiveAction;
2933
import java.util.function.BiFunction;
3034
import java.util.function.Predicate;
3135
import java.util.stream.Stream;
3236

37+
import static org.opensearch.env.NodeEnvironment.processDirectoryFiles;
3338
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION;
3439
import static org.opensearch.index.store.remote.utils.FileTypeUtils.INDICES_FOLDER_IDENTIFIER;
3540

@@ -335,4 +340,65 @@ public boolean isClosed() {
335340
@Override
336341
public void close() throws Exception {}
337342
}
343+
344+
/**
345+
* A recursive task for loading file cache entries from disk during node startup.
346+
* Uses fork-join parallelism to efficiently scan directories and restore cached files.
347+
*/
348+
public static class LoadTask extends RecursiveAction {
349+
private final Path path;
350+
private final FileCache fc;
351+
private final boolean processedDirectory;
352+
private final SetOnce<UncheckedIOException> exception;
353+
354+
public LoadTask(Path path, FileCache fc, SetOnce<UncheckedIOException> exception) {
355+
this.path = path;
356+
this.fc = fc;
357+
this.exception = exception;
358+
this.processedDirectory = false;
359+
}
360+
361+
public LoadTask(Path path, FileCache fc, SetOnce<UncheckedIOException> exception, boolean processedDirectory) {
362+
this.path = path;
363+
this.fc = fc;
364+
this.exception = exception;
365+
this.processedDirectory = processedDirectory;
366+
}
367+
368+
@Override
369+
public void compute() {
370+
List<LoadTask> subTasks = new ArrayList<>();
371+
try {
372+
if (processedDirectory) {
373+
this.fc.restoreFromDirectory(List.of(path));
374+
} else {
375+
if (Files.isDirectory(path)) {
376+
try (DirectoryStream<Path> indexStream = Files.newDirectoryStream(path)) {
377+
for (Path indexPath : indexStream) {
378+
if (Files.isDirectory(indexPath)) {
379+
List<Path> indexSubPaths = new ArrayList<>();
380+
processDirectoryFiles(indexPath, indexSubPaths);
381+
for (Path indexSubPath : indexSubPaths) {
382+
subTasks.add(new LoadTask(indexSubPath, fc, exception, true));
383+
}
384+
}
385+
}
386+
}
387+
}
388+
}
389+
} catch (IOException | UncheckedIOException e) {
390+
try {
391+
if (e instanceof UncheckedIOException) {
392+
exception.set((UncheckedIOException) e);
393+
} else {
394+
exception.set(new UncheckedIOException("Unable to process directories.", (IOException) e));
395+
}
396+
} catch (SetOnce.AlreadySetException ignore) {
397+
398+
}
399+
return;
400+
}
401+
invokeAll(subTasks);
402+
}
403+
}
338404
}

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@
299299
import java.io.BufferedWriter;
300300
import java.io.Closeable;
301301
import java.io.IOException;
302+
import java.io.UncheckedIOException;
302303
import java.net.InetAddress;
303304
import java.net.InetSocketAddress;
304305
import java.nio.charset.StandardCharsets;
@@ -319,6 +320,9 @@
319320
import java.util.Set;
320321
import java.util.concurrent.CountDownLatch;
321322
import java.util.concurrent.Executor;
323+
import java.util.concurrent.ForkJoinPool;
324+
import java.util.concurrent.ForkJoinTask;
325+
import java.util.concurrent.ForkJoinWorkerThread;
322326
import java.util.concurrent.TimeUnit;
323327
import java.util.concurrent.atomic.AtomicReference;
324328
import java.util.function.Function;
@@ -330,7 +334,6 @@
330334
import static org.opensearch.common.util.FeatureFlags.ARROW_STREAMS_SETTING;
331335
import static org.opensearch.common.util.FeatureFlags.BACKGROUND_TASK_EXECUTION_EXPERIMENTAL;
332336
import static org.opensearch.common.util.FeatureFlags.TELEMETRY;
333-
import static org.opensearch.env.NodeEnvironment.collectFileCacheDataPath;
334337
import static org.opensearch.index.ShardIndexingPressureSettings.SHARD_INDEXING_PRESSURE_ENABLED_ATTRIBUTE_KEY;
335338
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED;
336339
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteClusterStateConfigured;
@@ -2267,8 +2270,39 @@ private void initializeFileCache(Settings settings, CircuitBreaker circuitBreake
22672270

22682271
this.fileCache = FileCacheFactory.createConcurrentLRUFileCache(capacity, circuitBreaker);
22692272
fileCacheNodePath.fileCacheReservedSize = new ByteSizeValue(this.fileCache.capacity(), ByteSizeUnit.BYTES);
2270-
List<Path> fileCacheDataPaths = collectFileCacheDataPath(fileCacheNodePath, settings);
2271-
this.fileCache.restoreFromDirectory(fileCacheDataPaths);
2273+
ForkJoinPool loadFileCacheThreadpool = new ForkJoinPool(
2274+
Runtime.getRuntime().availableProcessors(),
2275+
Node.CustomForkJoinWorkerThread::new,
2276+
null,
2277+
false
2278+
);
2279+
SetOnce<UncheckedIOException> exception = new SetOnce<>();
2280+
ForkJoinTask<Void> fileCacheFilesLoadTask = loadFileCacheThreadpool.submit(
2281+
new FileCache.LoadTask(fileCacheNodePath.fileCachePath, this.fileCache, exception)
2282+
);
2283+
if (DiscoveryNode.isDedicatedWarmNode(settings)) {
2284+
ForkJoinTask<Void> indicesFilesLoadTask = loadFileCacheThreadpool.submit(
2285+
new FileCache.LoadTask(fileCacheNodePath.indicesPath, this.fileCache, exception)
2286+
);
2287+
indicesFilesLoadTask.join();
2288+
}
2289+
fileCacheFilesLoadTask.join();
2290+
loadFileCacheThreadpool.shutdown();
2291+
if (exception.get() != null) {
2292+
logger.error("File cache initialization failed.", exception.get());
2293+
throw new OpenSearchException(exception.get());
2294+
}
2295+
}
2296+
2297+
/**
2298+
* Custom ForkJoinWorkerThread that preserves the context ClassLoader of the creating thread
2299+
* to ensure proper resource loading in worker threads.
2300+
*/
2301+
public static class CustomForkJoinWorkerThread extends ForkJoinWorkerThread {
2302+
public CustomForkJoinWorkerThread(ForkJoinPool pool) {
2303+
super(pool);
2304+
setContextClassLoader(Thread.currentThread().getContextClassLoader());
2305+
}
22722306
}
22732307

22742308
private static long calculateFileCacheSize(String capacityRaw, long totalSpace) {

server/src/test/java/org/opensearch/index/store/remote/filecache/FileCacheTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.store.FSDirectory;
1414
import org.apache.lucene.store.IOContext;
1515
import org.apache.lucene.store.IndexInput;
16+
import org.opensearch.common.SetOnce;
1617
import org.opensearch.common.SuppressForbidden;
1718
import org.opensearch.common.breaker.TestCircuitBreaker;
1819
import org.opensearch.core.common.breaker.CircuitBreaker;
@@ -22,14 +23,18 @@
2223
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
2324
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
2425
import org.opensearch.index.store.remote.utils.FileTypeUtils;
26+
import org.opensearch.node.Node;
2527
import org.opensearch.test.OpenSearchTestCase;
2628
import org.junit.Before;
2729

2830
import java.io.IOException;
31+
import java.io.UncheckedIOException;
2932
import java.nio.file.Files;
3033
import java.nio.file.Path;
3134
import java.nio.file.attribute.PosixFilePermissions;
3235
import java.util.List;
36+
import java.util.concurrent.ForkJoinPool;
37+
import java.util.concurrent.ForkJoinTask;
3338

3439
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
3540
public class FileCacheTests extends OpenSearchTestCase {
@@ -507,6 +512,46 @@ private void putAndDecRef(FileCache cache, int path, long indexInputSize) {
507512
cache.decRef(key);
508513
}
509514

515+
public void testConcurrentRestore() throws IOException {
516+
String index = "test-index-";
517+
String warmIndex = "test-warm-index-";
518+
519+
for (int i = 1; i <= 100; i++) {
520+
for (int j = 0; j < 10; j++) {
521+
createFile(index + i, String.valueOf(j), "_" + j + "_block_" + j);
522+
createWarmIndexFile(warmIndex + i, String.valueOf(j), "_" + j + "_block_" + j);
523+
// Remove known extra files - "extra0" file is added by the ExtrasFS, which is part of Lucene's test framework
524+
Files.deleteIfExists(
525+
path.resolve(NodeEnvironment.CACHE_FOLDER)
526+
.resolve(index + i)
527+
.resolve(String.valueOf(j))
528+
.resolve(RemoteSnapshotDirectoryFactory.LOCAL_STORE_LOCATION)
529+
.resolve("extra0")
530+
);
531+
Files.deleteIfExists(
532+
path.resolve(NodeEnvironment.INDICES_FOLDER)
533+
.resolve(warmIndex + i)
534+
.resolve(String.valueOf(j))
535+
.resolve(FileTypeUtils.INDICES_FOLDER_IDENTIFIER)
536+
.resolve("extra0")
537+
);
538+
}
539+
}
540+
FileCache fileCache = createFileCache(MEGA_BYTES);
541+
assertEquals(0, fileCache.size());
542+
Path cachePath = path.resolve(NodeEnvironment.CACHE_FOLDER);
543+
Path indicesPath = path.resolve(NodeEnvironment.INDICES_FOLDER);
544+
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors(), Node.CustomForkJoinWorkerThread::new, null, false);
545+
SetOnce<UncheckedIOException> exception = new SetOnce<>();
546+
ForkJoinTask<Void> task1 = pool.submit(new FileCache.LoadTask(indicesPath, fileCache, exception));
547+
ForkJoinTask<Void> task2 = pool.submit(new FileCache.LoadTask(cachePath, fileCache, exception));
548+
task2.join();
549+
task1.join();
550+
pool.shutdown();
551+
logger.info(fileCache);
552+
assertEquals(2000, fileCache.size());
553+
}
554+
510555
public static class StubCachedIndexInput implements CachedIndexInput {
511556

512557
private final long length;

0 commit comments

Comments
 (0)