Skip to content

Commit

Permalink
[Writable Warm] Composite Directory implementation and integrating it…
Browse files Browse the repository at this point in the history
… with FileCache (opensearch-project#12782)

* Composite Directory POC

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Refactor TransferManager interface to RemoteStoreFileTrackerAdapter

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Implement block level fetch for Composite Directory

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Removed CACHE state from FileTracker

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Fixes after latest pull

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Add new setting for warm, remove store type setting, FileTracker and RemoteStoreFileTrackerAdapter, CompositeDirectoryFactory and update Composite Directory implementation

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Modify TransferManager - replace BlobContainer with Functional Interface to fetch an InputStream instead

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Reuse OnDemandBlockSnapshotIndexInput instead of OnDemandBlockCompositeIndexInput

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Modify constructors to avoid breaking public api contract and code review fixes

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Add experimental annotations for newly created classes and review comment fixes

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Use ref count as a temporary measure to prevent file from eviction until uploaded to Remote

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Remove method level locks

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Handle tmp file deletion

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Nit fixes

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Handle delete and close in Composite Directory, log current state of FileCache and correct it's clear method and modify unit and integration tests as per review comments

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Refactor usages of WRITEABLE_REMOTE_INDEX_SETTING to TIERED_REMOTE_INDEX_SETTING

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Add tests for FileCachedIndexInput and review comment fixes

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Add additional IT for feature flag disabled

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Move setting for Partial Locality type behind Feature Flag, fix bug for ref count via cloneMap in FullFileCachedIndexInput and other review fixes

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Minor test and nit fixes

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Add javadocs for FullFileCachedIndexInput

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

* Minor precommit fixes

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>

---------

Signed-off-by: Shreyansh Ray <rayshrey@amazon.com>
  • Loading branch information
rayshrey committed Jun 20, 2024
1 parent 010cd3b commit 5e797fc
Show file tree
Hide file tree
Showing 35 changed files with 1,900 additions and 208 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add fingerprint ingest processor ([#13724](https://github.com/opensearch-project/OpenSearch/pull/13724))
- [Remote Store] Rate limiter for remote store low priority uploads ([#14374](https://github.com/opensearch-project/OpenSearch/pull/14374/))
- Apply the date histogram rewrite optimization to range aggregation ([#13865](https://github.com/opensearch-project/OpenSearch/pull/13865))
- [Writable Warm] Add composite directory implementation and integrate it with FileCache ([12782](https://github.com/opensearch-project/OpenSearch/pull/12782))

### Dependencies
- Update to Apache Lucene 9.11.0 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.CompositeDirectory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.FileTypeUtils;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.Node;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, supportsDedicatedMasters = false)
// Uncomment the below line to enable trace level logs for this test for better debugging
// @TestLogging(reason = "Getting trace logs from composite directory package", value = "org.opensearch.index.store:TRACE")
public class WritableWarmIT extends RemoteStoreBaseIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
protected static final int NUM_DOCS_IN_BULK = 1000;

/*
Disabling MockFSIndexStore plugin as the MockFSDirectoryFactory wraps the FSDirectory over a OpenSearchMockDirectoryWrapper which extends FilterDirectory (whereas FSDirectory extends BaseDirectory)
As a result of this wrapping the local directory of Composite Directory does not satisfy the assertion that local directory must be of type FSDirectory
*/
@Override
protected boolean addMockIndexStorePlugin() {
return false;
}

@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);
return featureSettings.build();
}

public void testWritableWarmFeatureFlagDisabled() {
Settings clusterSettings = Settings.builder().put(super.nodeSettings(0)).put(FeatureFlags.TIERED_REMOTE_INDEX, false).build();
InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNode(clusterSettings);
internalTestCluster.startDataOnlyNode(clusterSettings);

Settings indexSettings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();

try {
prepareCreate(INDEX_NAME).setSettings(indexSettings).get();
fail("Should have thrown Exception as setting should not be registered if Feature Flag is Disabled");
} catch (SettingsException ex) {
assertEquals(
"unknown setting ["
+ IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()
+ "] please check that any required plugins are installed, or check the "
+ "breaking changes documentation for removed settings",
ex.getMessage()
);
}
}

public void testWritableWarmBasic() throws Exception {
InternalTestCluster internalTestCluster = internalCluster();
internalTestCluster.startClusterManagerOnlyNode();
internalTestCluster.startDataOnlyNode();
Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
.build();
assertAcked(client().admin().indices().prepareCreate(INDEX_NAME).setSettings(settings).get());

// Verify from the cluster settings if the data locality is partial
GetIndexResponse getIndexResponse = client().admin()
.indices()
.getIndex(new GetIndexRequest().indices(INDEX_NAME).includeDefaults(true))
.get();
Settings indexSettings = getIndexResponse.settings().get(INDEX_NAME);
assertEquals(IndexModule.DataLocalityType.PARTIAL.name(), indexSettings.get(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey()));

// Ingesting some docs
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME);

// ensuring cluster is green after performing force-merge
ensureGreen();

SearchResponse searchResponse = client().prepareSearch(INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
// Asserting that search returns same number of docs as ingested
assertHitCount(searchResponse, NUM_DOCS_IN_BULK);

// Ingesting docs again before force merge
indexBulk(INDEX_NAME, NUM_DOCS_IN_BULK);
flushAndRefresh(INDEX_NAME);

FileCache fileCache = internalTestCluster.getDataNodeInstance(Node.class).fileCache();
IndexShard shard = internalTestCluster.getDataNodeInstance(IndicesService.class)
.indexService(resolveIndex(INDEX_NAME))
.getShardOrNull(0);
Directory directory = (((FilterDirectory) (((FilterDirectory) (shard.store().directory())).getDelegate())).getDelegate());

// Force merging the index
Set<String> filesBeforeMerge = new HashSet<>(Arrays.asList(directory.listAll()));
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).get();
flushAndRefresh(INDEX_NAME);
Set<String> filesAfterMerge = new HashSet<>(Arrays.asList(directory.listAll()));

Set<String> filesFromPreviousGenStillPresent = filesBeforeMerge.stream()
.filter(filesAfterMerge::contains)
.filter(file -> !FileTypeUtils.isLockFile(file))
.filter(file -> !FileTypeUtils.isSegmentsFile(file))
.collect(Collectors.toUnmodifiableSet());

// Asserting that after merge all the files from previous gen are no more part of the directory
assertTrue(filesFromPreviousGenStillPresent.isEmpty());

// Asserting that files from previous gen are not present in File Cache as well
filesBeforeMerge.stream()
.filter(file -> !FileTypeUtils.isLockFile(file))
.filter(file -> !FileTypeUtils.isSegmentsFile(file))
.forEach(file -> assertNull(fileCache.get(((CompositeDirectory) directory).getFilePath(file))));

// Deleting the index (so that ref count drops to zero for all the files) and then pruning the cache to clear it to avoid any file
// leaks
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
fileCache.prune();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.IndexSortConfig;
Expand Down Expand Up @@ -260,7 +261,10 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
* is ready for production release, the feature flag can be removed, and the
* setting should be moved to {@link #BUILT_IN_INDEX_SETTINGS}.
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of();
public static final Map<String, List<Setting>> FEATURE_FLAGGED_INDEX_SETTINGS = Map.of(
FeatureFlags.TIERED_REMOTE_INDEX,
List.of(IndexModule.INDEX_STORE_LOCALITY_SETTING)
);

public static final IndexScopedSettings DEFAULT_SCOPED_SETTINGS = new IndexScopedSettings(Settings.EMPTY, BUILT_IN_INDEX_SETTINGS);

Expand Down
78 changes: 76 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@
import java.util.function.Function;
import java.util.function.Supplier;

import static org.apache.logging.log4j.util.Strings.toRootUpperCase;

/**
* IndexModule represents the central extension point for index level custom implementations like:
* <ul>
Expand Down Expand Up @@ -141,6 +143,17 @@ public final class IndexModule {
Property.NodeScope
);

/**
* Index setting which used to determine how the data is cached locally fully or partially
*/
public static final Setting<DataLocalityType> INDEX_STORE_LOCALITY_SETTING = new Setting<>(
"index.store.data_locality",
DataLocalityType.FULL.name(),
DataLocalityType::getValueOf,
Property.IndexScope,
Property.NodeScope
);

public static final Setting<String> INDEX_RECOVERY_TYPE_SETTING = new Setting<>(
"index.recovery.type",
"",
Expand Down Expand Up @@ -297,6 +310,7 @@ public Iterator<Setting<?>> settings() {
private final AtomicBoolean frozen = new AtomicBoolean(false);
private final BooleanSupplier allowExpensiveQueries;
private final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories;
private final FileCache fileCache;

/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
Expand All @@ -315,7 +329,8 @@ public IndexModule(
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
final FileCache fileCache
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
Expand All @@ -327,6 +342,30 @@ public IndexModule(
this.allowExpensiveQueries = allowExpensiveQueries;
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
this.fileCache = fileCache;
}

public IndexModule(
final IndexSettings indexSettings,
final AnalysisRegistry analysisRegistry,
final EngineFactory engineFactory,
final EngineConfigFactory engineConfigFactory,
final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories,
final BooleanSupplier allowExpensiveQueries,
final IndexNameExpressionResolver expressionResolver,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories
) {
this(
indexSettings,
analysisRegistry,
engineFactory,
engineConfigFactory,
directoryFactories,
allowExpensiveQueries,
expressionResolver,
recoveryStateFactories,
null
);
}

/**
Expand Down Expand Up @@ -577,6 +616,40 @@ public boolean match(Settings settings) {
}
}

/**
* Indicates the locality of the data - whether it will be cached fully or partially
*/
public enum DataLocalityType {
/**
* Indicates that all the data will be cached locally
*/
FULL,
/**
* Indicates that only a subset of the data will be cached locally
*/
PARTIAL;

private static final Map<String, DataLocalityType> LOCALITY_TYPES;

static {
final Map<String, DataLocalityType> localityTypes = new HashMap<>(values().length);
for (final DataLocalityType dataLocalityType : values()) {
localityTypes.put(dataLocalityType.name(), dataLocalityType);
}
LOCALITY_TYPES = Collections.unmodifiableMap(localityTypes);
}

public static DataLocalityType getValueOf(final String localityType) {
Objects.requireNonNull(localityType, "No locality type given.");
final String localityTypeName = toRootUpperCase(localityType.trim());
final DataLocalityType type = LOCALITY_TYPES.get(localityTypeName);
if (type != null) {
return type;
}
throw new IllegalArgumentException("Unknown locality type constant [" + localityType + "].");
}
}

public static Type defaultStoreType(final boolean allowMmap) {
if (allowMmap && Constants.JRE_IS_64BIT && MMapDirectory.UNMAP_SUPPORTED) {
return Type.HYBRIDFS;
Expand Down Expand Up @@ -665,7 +738,8 @@ public IndexService newIndexService(
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
recoverySettings,
remoteStoreSettings
remoteStoreSettings,
fileCache
);
success = true;
return indexService;
Expand Down
Loading

0 comments on commit 5e797fc

Please sign in to comment.