diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 3affbbd820774..e9810220de1b6 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @reta @anasalkouz @andrross @reta @Bukhtawar @CEHENKLE @dblock @setiah @kartg @kotwanikunal @mch2 @nknize @owaiskazi19 @adnapibar @Rishikesh1159 @ryanbogan @saratvemulapalli @shwetathareja @dreamer-89 @tlfeng @VachaShah @xuezhou25 +* @reta @anasalkouz @andrross @reta @Bukhtawar @CEHENKLE @dblock @gbbafna @setiah @kartg @kotwanikunal @mch2 @nknize @owaiskazi19 @adnapibar @Rishikesh1159 @ryanbogan @saratvemulapalli @shwetathareja @dreamer-89 @tlfeng @VachaShah @xuezhou25 diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ec79b5803f9b..36d6f5efbcd6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459)) - Support versioning for Weighted routing apis([#5255](https://github.com/opensearch-project/OpenSearch/pull/5255)) - Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668)) +- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959)) ### Dependencies - Bumps `log4j-core` from 2.18.0 to 2.19.0 @@ -107,6 +108,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Add support for refresh level durability ([#5253](https://github.com/opensearch-project/OpenSearch/pull/5253)) +- Integrate remote segment store in the failover flow ([#5579](https://github.com/opensearch-project/OpenSearch/pull/5579)) ### Deprecated - Refactor fuzziness interface on query builders ([#5433](https://github.com/opensearch-project/OpenSearch/pull/5433)) diff --git a/distribution/src/config/opensearch.yml b/distribution/src/config/opensearch.yml index 2188fbe600cbf..3d902026f37f9 100644 --- a/distribution/src/config/opensearch.yml +++ b/distribution/src/config/opensearch.yml @@ -86,3 +86,26 @@ ${path.logs} # Require explicit names when deleting indices: # #action.destructive_requires_name: true +# +# ---------------------------------- Experimental Features ----------------------------------- +# +# Gates the visibility of the index setting that allows changing of replication type. +# Once the feature is ready for production release, this feature flag can be removed. +# +#opensearch.experimental.feature.replication_type.enabled: false +# +# +# Gates the visibility of the index setting that allows persisting data to remote store along with local disk. +# Once the feature is ready for production release, this feature flag can be removed. +# +#opensearch.experimental.feature.remote_store.enabled: false +# +# +# Gates the functionality of a new parameter to the snapshot restore API +# that allows for creation of a new index type that searches a snapshot +# directly in a remote repository without restoring all index data to disk +# ahead of time. 
+# +#opensearch.experimental.feature.searchable_snapshot.enabled: false +# +# diff --git a/qa/remote-clusters/src/test/java/org/opensearch/cluster/remote/test/RemoteClustersIT.java b/qa/remote-clusters/src/test/java/org/opensearch/cluster/remote/test/RemoteClustersIT.java index ef2b8916819fe..dbea8db1a12fa 100644 --- a/qa/remote-clusters/src/test/java/org/opensearch/cluster/remote/test/RemoteClustersIT.java +++ b/qa/remote-clusters/src/test/java/org/opensearch/cluster/remote/test/RemoteClustersIT.java @@ -31,6 +31,7 @@ package org.opensearch.cluster.remote.test; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.index.IndexRequest; @@ -122,6 +123,9 @@ public void testHAProxyModeConnectionWorks() throws IOException { RemoteConnectionInfo rci = cluster1Client().cluster().remoteInfo(new RemoteInfoRequest(), RequestOptions.DEFAULT).getInfos().get(0); logger.info("Connection info: {}", rci); + if (!rci.isConnected()) { + logger.info("Cluster health: {}", cluster1Client().cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT)); + } assertTrue(rci.isConnected()); assertEquals(2L, cluster1Client().search( diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 5ab1fc79fa68a..54bd9d7d0b444 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -9,7 +9,6 @@ package org.opensearch.indices.replication; import com.carrotsearch.randomizedtesting.RandomizedTest; -import org.junit.BeforeClass; import org.opensearch.OpenSearchCorruptionException; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.indices.segments.IndexShardSegments; @@ -70,11 +69,6 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase { private static final int SHARD_COUNT = 1; private static final int REPLICA_COUNT = 1; - @BeforeClass - public static void assumeFeatureFlag() { - assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); - } - @Override protected Collection> nodePlugins() { return Arrays.asList(MockTransportService.TestPlugin.class); @@ -96,11 +90,16 @@ protected boolean addMockInternalEngine() { return false; } + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); + } + public void testPrimaryStopped_ReplicaPromoted() throws Exception { - final String primary = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); @@ -127,7 +126,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { 
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); // start another node, index another doc and replicate. - String nodeC = internalCluster().startNode(); + String nodeC = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get(); refresh(INDEX_NAME); @@ -138,10 +137,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception { } public void testRestartPrimary() throws Exception { - final String primary = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); assertEquals(getNodeContainingPrimaryShard().getName(), primary); @@ -167,10 +166,10 @@ public void testRestartPrimary() throws Exception { public void testCancelPrimaryAllocation() throws Exception { // this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica. - final String primary = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureYellowAndNoInitializingShards(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); final int initialDocCount = 1; @@ -201,10 +200,13 @@ public void testCancelPrimaryAllocation() throws Exception { /** * This test verifies that a replica shard is not added to the cluster when a round of segment replication fails during peer recovery. + * + * TODO: Ignoring this test as it is flaky and needs a separate fix */ + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testAddNewReplicaFailure() throws Exception { logger.info("--> starting [Primary Node] ..."); - final String primaryNode = internalCluster().startNode(); + final String primaryNode = internalCluster().startNode(featureFlagSettings()); logger.info("--> creating test index ..."); prepareCreate( @@ -227,7 +229,7 @@ public void testAddNewReplicaFailure() throws Exception { assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); logger.info("--> start empty node to add replica shard"); - final String replicaNode = internalCluster().startNode(); + final String replicaNode = internalCluster().startNode(featureFlagSettings()); // Mock transport service to add behaviour of throwing corruption exception during segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance( @@ -268,9 +270,10 @@ public void testAddNewReplicaFailure() throws Exception { assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME))); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -310,8 +313,8 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { } public void testIndexReopenClose() throws Exception { - final String primary = internalCluster().startNode(); - final String replica = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); + final String replica = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -355,8 +358,8 @@ public void testMultipleShards() throws Exception { .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, indexSettings); ensureGreen(INDEX_NAME); @@ -396,8 +399,8 @@ public void testMultipleShards() throws Exception { } public void testReplicationAfterForceMerge() throws Exception { - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -441,11 +444,11 @@ public void testReplicationAfterForceMerge() throws Exception { } public void testCancellation() throws Exception { - final String primaryNode = internalCluster().startNode(); + final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()); ensureYellow(INDEX_NAME); - final String replicaNode = internalCluster().startNode(); + final String replicaNode = internalCluster().startNode(featureFlagSettings()); final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance( SegmentReplicationSourceService.class, @@ -500,7 +503,7 @@ public void testCancellation() throws Exception { } public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { - final String primaryNode = internalCluster().startNode(); + final String primaryNode = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()); ensureGreen(INDEX_NAME); @@ -523,7 +526,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { .prepareUpdateSettings(INDEX_NAME) .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) ); - final String 
replicaNode = internalCluster().startNode(); + final String replicaNode = internalCluster().startNode(featureFlagSettings()); ensureGreen(INDEX_NAME); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); @@ -538,8 +541,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { } public void testDeleteOperations() throws Exception { - final String nodeA = internalCluster().startNode(); - final String nodeB = internalCluster().startNode(); + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); @@ -601,10 +604,10 @@ public void testDeleteOperations() throws Exception { } public void testUpdateOperations() throws Exception { - final String primary = internalCluster().startNode(); + final String primary = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureYellow(INDEX_NAME); - final String replica = internalCluster().startNode(); + final String replica = internalCluster().startNode(featureFlagSettings()); final int initialDocCount = scaledRandomIntBetween(0, 200); try ( @@ -705,10 +708,10 @@ public void testDropPrimaryDuringReplication() throws Exception { .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); - final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY); + final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(featureFlagSettings()); + final String primaryNode = internalCluster().startDataOnlyNode(featureFlagSettings()); createIndex(INDEX_NAME, settings); - internalCluster().startDataOnlyNodes(6); + internalCluster().startDataOnlyNodes(6, featureFlagSettings()); ensureGreen(INDEX_NAME); int initialDocCount = scaledRandomIntBetween(100, 200); @@ -731,7 +734,7 @@ public void testDropPrimaryDuringReplication() throws Exception { ensureYellow(INDEX_NAME); // start another replica. - internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(featureFlagSettings()); ensureGreen(INDEX_NAME); // index another doc and refresh - without this the new replica won't catch up. 
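The hunks above make one mechanical change throughout SegmentReplicationIT: the @BeforeClass assumption on a JVM system property is replaced by an overridden featureFlagSettings() whose result is passed to every node the test starts. For readers unfamiliar with the pattern, here is a minimal sketch of a test that opts into an experimental flag this way (the class name, test body, and index name are illustrative only, not part of this change):

```java
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;

// Hypothetical integration test following the pattern introduced above.
public class MyExperimentalFeatureIT extends OpenSearchIntegTestCase {

    @Override
    protected Settings featureFlagSettings() {
        // Merge this test's flag on top of any flags the base class enables.
        return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
    }

    public void testWithFlagEnabled() throws Exception {
        // Feature flags are node-scoped settings now, so every node the test
        // starts must be handed featureFlagSettings() explicitly.
        final String node = internalCluster().startNode(featureFlagSettings());
        createIndex("test-index");
        ensureGreen("test-index");
    }
}
```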
diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java index d92f2af3f4bfd..a741797012602 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/SegmentReplicationSnapshotIT.java @@ -9,7 +9,6 @@ package org.opensearch.snapshots; import com.carrotsearch.randomizedtesting.RandomizedTest; -import org.junit.BeforeClass; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder; import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; @@ -46,9 +45,9 @@ public class SegmentReplicationSnapshotIT extends AbstractSnapshotIntegTestCase private static final String REPOSITORY_NAME = "test-segrep-repo"; private static final String SNAPSHOT_NAME = "test-segrep-snapshot"; - @BeforeClass - public static void assumeFeatureFlag() { - assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE))); + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build(); } public Settings segRepEnableIndexSettings() { @@ -100,11 +99,11 @@ public void ingestData(int docCount, String indexName) throws Exception { // Start cluster with provided settings and return the node names as list public List startClusterWithSettings(Settings indexSettings, int replicaCount) throws Exception { // Start primary - final String primaryNode = internalCluster().startNode(); + final String primaryNode = internalCluster().startNode(featureFlagSettings()); List nodeNames = new ArrayList<>(); nodeNames.add(primaryNode); for (int i = 0; i < replicaCount; i++) { - nodeNames.add(internalCluster().startNode()); + nodeNames.add(internalCluster().startNode(featureFlagSettings())); } createIndex(INDEX_NAME, indexSettings); ensureGreen(INDEX_NAME); @@ -166,6 +165,7 @@ public void testRestoreOnSegRep() throws Exception { assertHitCount(resp, DOC_COUNT); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Exception { startClusterWithSettings(segRepEnableIndexSettings(), 1); createSnapshot(); @@ -266,7 +266,7 @@ public void testRestoreOnReplicaNode() throws Exception { // Assertions assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED)); - internalCluster().startNode(); + internalCluster().startNode(featureFlagSettings()); ensureGreen(RESTORED_INDEX_NAME); GetSettingsResponse settingsResponse = client().admin() .indices() diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java new file mode 100644 index 0000000000000..8fb6cd115f24b --- /dev/null +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.common.settings; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.util.FeatureFlags; + +/** + * Encapsulates all valid feature flag level settings. + * + * @opensearch.internal + */ +public class FeatureFlagSettings extends AbstractScopedSettings { + + protected FeatureFlagSettings( + Settings settings, + Set<Setting<?>> settingsSet, + Set<SettingUpgrader<?>> settingUpgraders, + Property scope + ) { + super(settings, settingsSet, settingUpgraders, scope); + } + + public static final Set<Setting<?>> BUILT_IN_FEATURE_FLAGS = Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + FeatureFlags.REPLICATION_TYPE_SETTING, + FeatureFlags.REMOTE_STORE_SETTING, + FeatureFlags.SEARCHABLE_SNAPSHOT_SETTING, + FeatureFlags.EXTENSIONS_SETTING + ) + ) + ); +} diff --git a/server/src/main/java/org/opensearch/common/settings/SettingsModule.java b/server/src/main/java/org/opensearch/common/settings/SettingsModule.java index df16c5a499ebe..8000cde6f81c1 100644 --- a/server/src/main/java/org/opensearch/common/settings/SettingsModule.java +++ b/server/src/main/java/org/opensearch/common/settings/SettingsModule.java @@ -87,6 +87,9 @@ public SettingsModule( for (Setting<?> setting : IndexScopedSettings.BUILT_IN_INDEX_SETTINGS) { registerSetting(setting); } + for (Setting<?> setting : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) { + registerSetting(setting); + } for (Map.Entry<String, List<Setting>> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) { if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) { diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index 31dd621f678ad..704bd78d4b556 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -8,6 +8,10 @@ package org.opensearch.common.util; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.Settings; + /** * Utility class to manage feature flags. Feature flags are system properties that must be set on the JVM. * These are used to gate the visibility/availability of incomplete features. For more information, see * @@ -43,12 +47,39 @@ public class FeatureFlags { */ public static final String EXTENSIONS = "opensearch.experimental.feature.extensions.enabled"; + /** + * Stores the settings from opensearch.yml. + */ + private static Settings settings; + + /** + * This method maps the settings from opensearch.yml to the locally stored + * settings value, which is used by the existing isEnabled method. + * + * @param openSearchSettings The settings stored in opensearch.yml. + */ + public static void initializeFeatureFlags(Settings openSearchSettings) { + settings = openSearchSettings; + } + /** * Used to test feature flags whose values are expected to be booleans. * This method returns true if the value is "true" (case-insensitive), * and false otherwise.
*/ public static boolean isEnabled(String featureFlagName) { - return "true".equalsIgnoreCase(System.getProperty(featureFlagName)); + if ("true".equalsIgnoreCase(System.getProperty(featureFlagName))) { + // TODO: Remove the if condition once FeatureFlags are only supported via opensearch.yml + return true; + } + return settings != null && settings.getAsBoolean(featureFlagName, false); } + + public static final Setting<Boolean> REPLICATION_TYPE_SETTING = Setting.boolSetting(REPLICATION_TYPE, false, Property.NodeScope); + + public static final Setting<Boolean> REMOTE_STORE_SETTING = Setting.boolSetting(REMOTE_STORE, false, Property.NodeScope); + + public static final Setting<Boolean> SEARCHABLE_SNAPSHOT_SETTING = Setting.boolSetting(SEARCHABLE_SNAPSHOT, false, Property.NodeScope); + + public static final Setting<Boolean> EXTENSIONS_SETTING = Setting.boolSetting(EXTENSIONS, false, Property.NodeScope); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 3a3c4b19a02f6..ed27ad7edd7d2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -35,6 +35,7 @@ import com.carrotsearch.hppc.ObjectLongMap; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.FilterDirectoryReader; @@ -48,6 +49,12 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.UsageTrackingQueryCachingPolicy; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexInput; import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.ThreadInterruptedException; import org.opensearch.Assertions; @@ -88,6 +95,7 @@ import org.opensearch.common.util.concurrent.AsyncIOProcessor; import org.opensearch.common.util.concurrent.RunOnce; import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.common.util.set.Sets; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.gateway.WriteStateException; @@ -142,6 +150,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.index.similarity.SimilarityService; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.store.Store; import org.opensearch.index.store.Store.MetadataSnapshot; import org.opensearch.index.store.StoreFileMetadata; @@ -171,10 +180,12 @@ import org.opensearch.threadpool.ThreadPool; import java.io.Closeable; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; +import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -202,8 +213,10 @@ import java.util.stream.StreamSupport; import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; +import static
org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX; /** * An OpenSearch index shard @@ -2032,6 +2045,9 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t synchronized (engineMutex) { assert currentEngineReference.get() == null : "engine is running"; verifyNotClosed(); + if (indexSettings.isRemoteStoreEnabled()) { + syncSegmentsFromRemoteSegmentStore(false); + } // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = engineFactory.newReadWriteEngine(config); onNewEngine(newEngine); @@ -2303,10 +2319,10 @@ public void recoverFromStore(ActionListener listener) { storeRecovery.recoverFromStore(this, listener); } - public void restoreFromRemoteStore(ActionListener listener) { + public void restoreFromRemoteStore(Repository repository, ActionListener listener) { assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromRemoteStore(this, listener); + storeRecovery.recoverFromRemoteStore(this, repository, listener); } public void restoreFromRepository(Repository repository, ActionListener listener) { @@ -3082,7 +3098,14 @@ public void startRecovery( executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore); break; case REMOTE_STORE: - executeRecovery("from remote store", recoveryState, recoveryListener, this::restoreFromRemoteStore); + final Repository remoteTranslogRepo; + final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository(); + if (remoteTranslogRepoName != null) { + remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName); + } else { + remoteTranslogRepo = null; + } + executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l)); break; case PEER: try { @@ -4118,6 +4141,9 @@ public void close() throws IOException { } }; IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); + if (indexSettings.isRemoteStoreEnabled()) { + syncSegmentsFromRemoteSegmentStore(false); + } newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); } @@ -4145,6 +4171,90 @@ public void close() throws IOException { onSettingsChanged(); } + /** + * Downloads segments from remote segment store. 
+ * @param overrideLocal flag to override local segment files with those in remote store + * @throws IOException if an exception occurs while reading segments from remote store + */ + public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException { + assert indexSettings.isRemoteStoreEnabled(); + logger.info("Downloading segments from remote segment store"); + assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; + FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); + assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory + : "Store.directory is not enclosing an instance of FilterDirectory"; + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); + // We need to call RemoteSegmentStoreDirectory.init() in order to get the latest metadata of the files that + // are uploaded to the remote segment store. + assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory"; + ((RemoteSegmentStoreDirectory) remoteDirectory).init(); + Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory) + .getSegmentsUploadedToRemoteStore(); + final Directory storeDirectory = store.directory(); + store.incRef(); + remoteStore.incRef(); + List<String> downloadedSegments = new ArrayList<>(); + List<String> skippedSegments = new ArrayList<>(); + try { + String segmentInfosSnapshotFilename = null; + Set<String> localSegmentFiles = Sets.newHashSet(storeDirectory.listAll()); + for (String file : uploadedSegments.keySet()) { + long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum()); + if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) { + if (localSegmentFiles.contains(file)) { + storeDirectory.deleteFile(file); + } + storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); + downloadedSegments.add(file); + if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) { + assert segmentInfosSnapshotFilename == null : "There should be only one SegmentInfosSnapshot file"; + segmentInfosSnapshotFilename = file; + } + } else { + skippedSegments.add(file); + } + } + if (segmentInfosSnapshotFilename != null) { + try ( + ChecksumIndexInput indexInput = new BufferedChecksumIndexInput( + storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT) + ) + ) { + SegmentInfos infosSnapshot = SegmentInfos.readCommit( + store.directory(), + indexInput, + Long.parseLong(segmentInfosSnapshotFilename.split("__")[1]) + ); + long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); + store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); + } + } + } catch (IOException e) { + throw new IndexShardRecoveryException(shardId, "Exception while copying segment files from remote segment store", e); + } finally { + logger.info("Downloaded segments: {}", downloadedSegments); + logger.info("Skipped download for segments: {}", skippedSegments); + store.decRef(); + remoteStore.decRef(); + } + } + + private boolean localDirectoryContains(Directory localDirectory, String file, long checksum) { + try (IndexInput indexInput = localDirectory.openInput(file, IOContext.DEFAULT)) { + if (checksum == CodecUtil.retrieveChecksum(indexInput)) { + return true; + } else { + logger.warn("Checksum
mismatch between local and remote segment file: {}, will override local file", file); + } + } catch (NoSuchFileException | FileNotFoundException e) { + logger.debug("File {} does not exist in local FS, downloading from remote store", file); + } catch (IOException e) { + logger.warn("Exception while reading checksum of file: {}, this can happen if file is corrupted", file); + } + return false; + } + /** * Returns the maximum sequence number of either update or delete operations have been processed in this shard * or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index d8e28d630c2f4..47d10513f4aaa 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -38,8 +38,6 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.Sort; -import org.apache.lucene.store.BufferedChecksumIndexInput; -import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; @@ -62,11 +60,16 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.snapshots.IndexShardRestoreFailedException; import org.opensearch.index.store.Store; +import org.opensearch.index.translog.RemoteFsTranslog; import org.opensearch.index.translog.Translog; +import org.opensearch.index.translog.transfer.FileTransferTracker; +import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Arrays; @@ -78,8 +81,6 @@ import java.util.stream.Collectors; import static org.opensearch.common.unit.TimeValue.timeValueMillis; -import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; -import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX; /** * This package private utility class encapsulates the logic to recover an index shard from either an existing index on @@ -118,13 +119,13 @@ void recoverFromStore(final IndexShard indexShard, ActionListener liste } } - void recoverFromRemoteStore(final IndexShard indexShard, ActionListener listener) { + void recoverFromRemoteStore(final IndexShard indexShard, Repository repository, ActionListener listener) { if (canRecover(indexShard)) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType; ActionListener.completeWith(recoveryListener(indexShard, listener), () -> { logger.debug("starting recovery from remote store ..."); - recoverFromRemoteStore(indexShard); + recoverFromRemoteStore(indexShard, repository); return true; }); } else { @@ -439,7 +440,7 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi }); } - private void recoverFromRemoteStore(IndexShard indexShard) throws 
IndexShardRecoveryException { + private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException { final Store remoteStore = indexShard.remoteStore(); if (remoteStore == null) { throw new IndexShardRecoveryException( @@ -450,57 +451,26 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco } indexShard.preRecovery(); indexShard.prepareForIndexRecovery(); - assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; - FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); - assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory - : "Store.directory is not enclosing an instance of FilterDirectory"; - FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); - final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); final Store store = indexShard.store(); - final Directory storeDirectory = store.directory(); store.incRef(); remoteStore.incRef(); try { - // Cleaning up local directory before copying file from remote directory. - // This is done to make sure we start with clean slate. - // ToDo: Check if we can copy only missing files - for (String file : storeDirectory.listAll()) { - storeDirectory.deleteFile(file); - } - String segmentInfosSnapshotFilename = null; - for (String file : remoteDirectory.listAll()) { - storeDirectory.copyFrom(remoteDirectory, file, file, IOContext.DEFAULT); - if (file.startsWith(SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX)) { - segmentInfosSnapshotFilename = file; - } - } + // Download segments from remote segment store + indexShard.syncSegmentsFromRemoteSegmentStore(true); - if (segmentInfosSnapshotFilename != null) { - try ( - ChecksumIndexInput indexInput = new BufferedChecksumIndexInput( - storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT) - ) - ) { - SegmentInfos infosSnapshot = SegmentInfos.readCommit( - store.directory(), - indexInput, - Long.parseLong(segmentInfosSnapshotFilename.split("__")[1]) - ); - long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); - store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); - } + if (repository != null) { + syncTranslogFilesFromRemoteTranslog(indexShard, repository); + } else { + bootstrap(indexShard, store); } - // This creates empty trans-log for now - // ToDo: Add code to restore from remote trans-log - bootstrap(indexShard, store); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; indexShard.recoveryState().getIndex().setFileDetailsComplete(); indexShard.openEngineAndRecoverFromTranslog(); indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); indexShard.finalizeRecovery(); indexShard.postRecovery("post recovery from remote_store"); - } catch (IOException e) { + } catch (IOException | IndexShardRecoveryException e) { throw new IndexShardRecoveryException(indexShard.shardId, "Exception while recovering from remote store", e); } finally { store.decRef(); @@ -508,6 +478,19 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco } } + private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException { + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobStoreRepository 
blobStoreRepository = (BlobStoreRepository) repository; + FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); + TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager( + blobStoreRepository, + indexShard.getThreadPool().executor(ThreadPool.Names.TRANSLOG_TRANSFER), + shardId, + fileTransferTracker + ); + RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog()); + } + /** * Recovers the state of the shard from the store. */ diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 6dba4809c515c..3b36d3777a712 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -154,6 +154,10 @@ public String toString() { return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum); } + public String getChecksum() { + return this.checksum; + } + public static UploadedSegmentMetadata fromString(String uploadedFilename) { String[] values = uploadedFilename.split(SEPARATOR); return new UploadedSegmentMetadata(values[0], values[1], values[2]); diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 2af41367d860b..2a8031f99ac14 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -8,19 +8,25 @@ package org.opensearch.index.translog; +import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.index.translog.transfer.FileTransferTracker; import org.opensearch.index.translog.transfer.TransferSnapshot; import org.opensearch.index.translog.transfer.TranslogCheckpointTransferSnapshot; import org.opensearch.index.translog.transfer.TranslogTransferManager; +import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.LongConsumer; @@ -54,15 +60,11 @@ public RemoteFsTranslog( super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); this.blobStoreRepository = blobStoreRepository; fileTransferTracker = new FileTransferTracker(shardId); - this.translogTransferManager = new TranslogTransferManager( - new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService), - blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())), - fileTransferTracker, - fileTransferTracker::exclusionFilter - ); + this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, executorService, shardId, fileTransferTracker); try { - final 
Checkpoint checkpoint = readCheckpoint(location); + download(translogTransferManager, location); + Checkpoint checkpoint = readCheckpoint(location); this.readers.addAll(recoverFromFiles(checkpoint)); if (readers.isEmpty()) { throw new IllegalStateException("at least one reader must be recovered"); @@ -94,6 +96,45 @@ public RemoteFsTranslog( } } + public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException { + + TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(); + if (translogMetadata != null) { + if (Files.notExists(location)) { + Files.createDirectories(location); + } + // Delete translog files on local before downloading from remote + for (Path file : FileSystemUtils.files(location)) { + Files.delete(file); + } + Map generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper(); + for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) { + String generation = Long.toString(i); + translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location); + } + // We copy the latest generation .ckp file to translog.ckp so that flows that depend on + // existence of translog.ckp file work in the same way + Files.copy( + location.resolve(Translog.getCommitCheckpointFileName(translogMetadata.getGeneration())), + location.resolve(Translog.CHECKPOINT_FILE_NAME) + ); + } + } + + public static TranslogTransferManager buildTranslogTransferManager( + BlobStoreRepository blobStoreRepository, + ExecutorService executorService, + ShardId shardId, + FileTransferTracker fileTransferTracker + ) { + return new TranslogTransferManager( + new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService), + blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())), + fileTransferTracker, + fileTransferTracker::exclusionFilter + ); + } + @Override public boolean ensureSynced(Location location) throws IOException { try (ReleasableLock ignored = writeLock.acquire()) { diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 07cb2805ce1a6..29cc69ad1da44 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -11,13 +11,19 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.store.IndexInput; import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.transfer.listener.FileTransferListener; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -128,6 +134,50 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans } } + public boolean downloadTranslog(String primaryTerm, String generation, Path 
location) throws IOException { + logger.info( + "Downloading translog files with: Primary Term = {}, Generation = {}, Location = {}", + primaryTerm, + generation, + location + ); + // Download Checkpoint file from remote to local FS + String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); + downloadToFS(ckpFileName, location, primaryTerm); + // Download translog file from remote to local FS + String translogFilename = Translog.getFilename(Long.parseLong(generation)); + downloadToFS(translogFilename, location, primaryTerm); + return true; + } + + private void downloadToFS(String fileName, Path location, String primaryTerm) throws IOException { + Path filePath = location.resolve(fileName); + // Here, we always override the existing file if present. + // We need to change this logic when we introduce incremental download + if (Files.exists(filePath)) { + Files.delete(filePath); + } + try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) { + Files.copy(inputStream, filePath); + } + } + + public TranslogTransferMetadata readMetadata() throws IOException { + return transferService.listAll(remoteMetadaTransferPath) + .stream() + .max(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR) + .map(filename -> { + try (InputStream inputStream = transferService.downloadBlob(remoteMetadaTransferPath, filename);) { + IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes()); + return new TranslogTransferMetadata(indexInput); + } catch (IOException e) { + logger.error(() -> new ParameterizedMessage("Exception while reading metadata file: {}", filename), e); + return null; + } + }) + .orElse(null); + } + private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) throws IOException { Map<String, String> generationPrimaryTermMap = transferSnapshot.getTranslogFileSnapshots().stream().map(s -> { assert s instanceof TranslogFileSnapshot; diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java index 0aae773f593fd..243294d85c97d 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferMetadata.java @@ -10,6 +10,7 @@ import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.OutputStreamIndexOutput; import org.apache.lucene.util.SetOnce; import org.opensearch.common.bytes.BytesReference; @@ -17,6 +18,8 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -37,7 +40,7 @@ public class TranslogTransferMetadata { private final long timeStamp; - private final int count; + private int count; private final SetOnce<Map<String, String>> generationToPrimaryTermMapper = new SetOnce<>(); @@ -49,6 +52,8 @@ public class TranslogTransferMetadata { private static final String METADATA_CODEC = "md"; + public static final Comparator<String> METADATA_FILENAME_COMPARATOR = new MetadataFilenameComparator(); + public TranslogTransferMetadata(long primaryTerm, long generation,
long minTrans this.count = count; } + public TranslogTransferMetadata(IndexInput indexInput) throws IOException { + CodecUtil.checksumEntireFile(indexInput); + CodecUtil.checkHeader(indexInput, METADATA_CODEC, CURRENT_VERSION, CURRENT_VERSION); + this.primaryTerm = indexInput.readLong(); + this.generation = indexInput.readLong(); + this.minTranslogGeneration = indexInput.readLong(); + this.timeStamp = indexInput.readLong(); + this.generationToPrimaryTermMapper.set(indexInput.readMapOfStrings()); + } + public long getPrimaryTerm() { return primaryTerm; } @@ -77,6 +92,10 @@ public void setGenerationToPrimaryTermMapper(Map generationToPri generationToPrimaryTermMapper.set(generationToPrimaryTermMap); } + public Map<String, String> getGenerationToPrimaryTermMapper() { + return generationToPrimaryTermMapper.get(); + } + public String getFileName() { return String.join( METADATA_SEPARATOR, @@ -122,6 +141,29 @@ private void write(DataOutput out) throws IOException { out.writeLong(generation); out.writeLong(minTranslogGeneration); out.writeLong(timeStamp); - out.writeMapOfStrings(generationToPrimaryTermMapper.get()); + if (generationToPrimaryTermMapper.get() != null) { + out.writeMapOfStrings(generationToPrimaryTermMapper.get()); + } else { + out.writeMapOfStrings(new HashMap<>()); + } + } + + private static class MetadataFilenameComparator implements Comparator<String> { + @Override + public int compare(String first, String second) { + // Format of metadata filename is <primary term>__<generation>__<timestamp> + String[] filenameTokens1 = first.split(METADATA_SEPARATOR); + String[] filenameTokens2 = second.split(METADATA_SEPARATOR); + // Here, we are comparing only primary term and generation. + // Timestamp is not a good measure of comparison in case primary term and generation are the same. + for (int i = 0; i < filenameTokens1.length - 1; i++) { + if (filenameTokens1[i].equals(filenameTokens2[i]) == false) { + return Long.compare(Long.parseLong(filenameTokens1[i]), Long.parseLong(filenameTokens2[i])); + } + } + throw new IllegalArgumentException( + "TranslogTransferMetadata files " + first + " and " + second + " have same primary term and generation" + ); + } } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index ed3256d499520..eec05f778a8ca 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -436,6 +436,9 @@ protected Node( final Settings settings = pluginsService.updatedSettings(); + // Initialize Feature Flags via the settings from opensearch.yml + FeatureFlags.initializeFeatureFlags(settings); + final Set<DiscoveryNodeRole> additionalRoles = pluginsService.filterPlugins(Plugin.class) .stream() .map(Plugin::getRoles) diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index c547f1cf3cf0a..66a361008b578 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2812,7 +2812,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null)); final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture(); - target.restoreFromRemoteStore(future); + target.restoreFromRemoteStore(null, future);
target.remoteStore().decRef(); assertTrue(future.actionGet()); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index aaaf95af85191..a5c514c9bb4c7 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -385,6 +385,95 @@ public void testReplicaRestarts() throws Exception { } } + public void testNRTReplicaWithRemoteStorePromotedAsPrimaryRefreshRefresh() throws Exception { + testNRTReplicaWithRemoteStorePromotedAsPrimary(false, false); + } + + public void testNRTReplicaWithRemoteStorePromotedAsPrimaryRefreshCommit() throws Exception { + testNRTReplicaWithRemoteStorePromotedAsPrimary(false, true); + } + + public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitRefresh() throws Exception { + testNRTReplicaWithRemoteStorePromotedAsPrimary(true, false); + } + + public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitCommit() throws Exception { + testNRTReplicaWithRemoteStorePromotedAsPrimary(true, true); + } + + private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlushFirst, boolean performFlushSecond) throws Exception { + Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .build(); + + try (ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir())) { + shards.startAll(); + IndexShard oldPrimary = shards.getPrimary(); + final IndexShard nextPrimary = shards.getReplicas().get(0); + + // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. + final int numDocs = shards.indexDocs(randomInt(10)); + + // refresh but do not copy the segments over. + if (performFlushFirst) { + flushShard(oldPrimary, true); + } else { + oldPrimary.refresh("Test"); + } + // replicateSegments(primary, shards.getReplicas()); + + // at this point both shards should have numDocs persisted and searchable. + assertDocCounts(oldPrimary, numDocs, numDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, numDocs, 0); + } + + // 2. Create ops that are in the replica's xlog, not in the index. + // index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs + // persisted. + final int additonalDocs = shards.indexDocs(randomInt(10)); + final int totalDocs = numDocs + additonalDocs; + + if (performFlushSecond) { + flushShard(oldPrimary, true); + } else { + oldPrimary.refresh("Test"); + } + assertDocCounts(oldPrimary, totalDocs, totalDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, totalDocs, 0); + } + assertTrue(nextPrimary.translogStats().estimatedNumberOfOperations() >= additonalDocs); + assertTrue(nextPrimary.translogStats().getUncommittedOperations() >= additonalDocs); + + // promote the replica + shards.promoteReplicaToPrimary(nextPrimary).get(); + + // close oldPrimary. 
+ oldPrimary.close("demoted", false); + oldPrimary.store().close(); + + assertEquals(InternalEngine.class, nextPrimary.getEngine().getClass()); + assertDocCounts(nextPrimary, totalDocs, totalDocs); + + // As we are downloading segments from remote segment store on failover, there should not be + // any operations replayed from translog + assertEquals(0, nextPrimary.translogStats().estimatedNumberOfOperations()); + + // refresh and push segments to our other replica. + nextPrimary.refresh("test"); + + for (IndexShard shard : shards) { + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + final List docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary()); + for (IndexShard shard : shards.getReplicas()) { + assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery)); + } + } + } + public void testNRTReplicaPromotedAsPrimary() throws Exception { try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { shards.startAll(); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 36753c1559a7f..bd9a608e6d85e 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -28,6 +28,7 @@ import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.ReleasableBytesReference; +import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.ByteSizeUnit; @@ -147,11 +148,15 @@ public void tearDown() throws Exception { } private RemoteFsTranslog create(Path path) throws IOException { + final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); + return create(path, createRepository(), translogUUID); + } + + private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { + this.repository = repository; globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final TranslogConfig translogConfig = getTranslogConfig(path); final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); - final String translogUUID = Translog.createEmptyTranslog(path, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get()); - repository = createRepository(); threadPool = new TestThreadPool(getClass().getName()); blobStoreTransferService = new BlobStoreTransferService( repository.blobStore(), @@ -296,6 +301,45 @@ public void testReadLocation() throws IOException { assertNull(translog.readOperation(new Translog.Location(100, 0, 0))); } + public void testReadLocationDownload() throws IOException { + ArrayList ops = new ArrayList<>(); + ArrayList locs = new ArrayList<>(); + locs.add(addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 0, primaryTerm.get(), new byte[] { 1 }))); + locs.add(addToTranslogAndListAndUpload(translog, ops, new Translog.Index("2", 1, primaryTerm.get(), new byte[] { 1 }))); + locs.add(addToTranslogAndListAndUpload(translog, ops, new Translog.Index("3", 2, primaryTerm.get(), new byte[] { 1 }))); + translog.sync(); + int i = 0; + for (Translog.Operation op : ops) { + assertEquals(op, 
+        }
+
+        String translogUUID = translog.translogUUID;
+        try {
+            translog.getDeletionPolicy().assertNoOpenTranslogRefs();
+            translog.close();
+        } finally {
+            terminate(threadPool);
+        }
+
+        // Delete translog files to test the download flow
+        for (Path file : FileSystemUtils.files(translogDir)) {
+            Files.delete(file);
+        }
+
+        // Create a RemoteFsTranslog at the same location
+        Translog newTranslog = create(translogDir, repository, translogUUID);
+        i = 0;
+        for (Translog.Operation op : ops) {
+            assertEquals(op, newTranslog.readOperation(locs.get(i++)));
+        }
+        try {
+            newTranslog.close();
+        } catch (Exception e) {
+            // Ignoring this exception for now. Once the download flow populates FileTracker,
+            // we can remove this try-catch block
+        }
+    }
+
     public void testSnapshotWithNewTranslog() throws IOException {
         List<Closeable> toClose = new ArrayList<>();
         try {
diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java
index 60b7029f18fa6..14207c896c733 100644
--- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java
+++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java
@@ -12,6 +12,7 @@
 import org.mockito.Mockito;
 import org.opensearch.action.ActionListener;
 import org.opensearch.common.blobstore.BlobPath;
+import org.opensearch.common.util.set.Sets;
 import org.opensearch.index.translog.Translog;
 import org.opensearch.index.translog.transfer.listener.FileTransferListener;
 import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;
@@ -20,14 +21,21 @@
 import org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
 import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.eq;
 
 @LuceneTestCase.SuppressFileSystems("*")
 public class TranslogTransferManagerTests extends OpenSearchTestCase {
@@ -147,4 +155,138 @@ public TranslogTransferMetadata getTranslogTransferMetadata() {
             }
         };
     }
+
+    public void testReadMetadataNoFile() throws IOException {
+        TranslogTransferManager translogTransferManager = new TranslogTransferManager(
+            transferService,
+            remoteBaseTransferPath,
+            null,
+            r -> r
+        );
+
+        when(transferService.listAll(remoteBaseTransferPath)).thenReturn(Sets.newHashSet());
+        assertNull(translogTransferManager.readMetadata());
+    }
+
+    public void testReadMetadataSingleFile() throws IOException {
+        TranslogTransferManager translogTransferManager = new TranslogTransferManager(
+            transferService,
+            remoteBaseTransferPath,
+            null,
+            r -> r
+        );
+
+        // BlobPath does not have an equals method, so we can't use the instance directly in when
+        when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234__123456789"));
+
+        TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata();
+        when(transferService.downloadBlob(any(BlobPath.class), eq("12__234__123456789"))).thenReturn(
+            new ByteArrayInputStream(metadata.createMetadataBytes())
+        );
+
+        assertEquals(metadata, translogTransferManager.readMetadata());
+    }
+
+    public void testReadMetadataMultipleFiles() throws IOException {
+        TranslogTransferManager translogTransferManager = new TranslogTransferManager(
+            transferService,
+            remoteBaseTransferPath,
+            null,
+            r -> r
+        );
+
+        when(transferService.listAll(any(BlobPath.class))).thenReturn(
+            Sets.newHashSet("12__234__56789", "12__235__56823", "12__233__56700")
+        );
+
+        TranslogTransferMetadata metadata = createTransferSnapshot().getTranslogTransferMetadata();
+        when(transferService.downloadBlob(any(BlobPath.class), eq("12__235__56823"))).thenReturn(
+            new ByteArrayInputStream(metadata.createMetadataBytes())
+        );
+
+        assertEquals(metadata, translogTransferManager.readMetadata());
+    }
+
+    public void testReadMetadataException() throws IOException {
+        TranslogTransferManager translogTransferManager = new TranslogTransferManager(
+            transferService,
+            remoteBaseTransferPath,
+            null,
+            r -> r
+        );
+
+        when(transferService.listAll(any(BlobPath.class))).thenReturn(
+            Sets.newHashSet("12__234__56789", "12__235__56823", "12__233__56700")
+        );
+
+        when(transferService.downloadBlob(any(BlobPath.class), eq("12__235__56823"))).thenThrow(new IOException("Something went wrong"));
+
+        assertNull(translogTransferManager.readMetadata());
+    }
+
+    public void testReadMetadataSamePrimaryTermGeneration() throws IOException {
+        TranslogTransferManager translogTransferManager = new TranslogTransferManager(
+            transferService,
+            remoteBaseTransferPath,
+            null,
+            r -> r
+        );
+
+        when(transferService.listAll(any(BlobPath.class))).thenReturn(
+            Sets.newHashSet("12__234__56789", "12__235__56823", "12__234__56700")
+        );
+
+        assertThrows(IllegalArgumentException.class, translogTransferManager::readMetadata);
+    }
+
+    public void testDownloadTranslog() throws IOException {
+        Path location = createTempDir();
+        TranslogTransferManager translogTransferManager = new TranslogTransferManager(
+            transferService,
+            remoteBaseTransferPath,
+            null,
+            r -> r
+        );
+
+        when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn(
+            new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8))
+        );
+
+        when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.ckp"))).thenReturn(
+            new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8))
+        );
+
+        assertFalse(Files.exists(location.resolve("translog-23.tlog")));
+        assertFalse(Files.exists(location.resolve("translog-23.ckp")));
+        translogTransferManager.downloadTranslog("12", "23", location);
+        assertTrue(Files.exists(location.resolve("translog-23.tlog")));
+        assertTrue(Files.exists(location.resolve("translog-23.ckp")));
+    }
+
+    public void testDownloadTranslogAlreadyExists() throws IOException {
+        Path location = createTempDir();
+        Files.createFile(location.resolve("translog-23.tlog"));
+        Files.createFile(location.resolve("translog-23.ckp"));
+
+        TranslogTransferManager translogTransferManager = new TranslogTransferManager(
+            transferService,
+            remoteBaseTransferPath,
+            null,
+            r -> r
+        );
+
+        when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn(
+            new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8))
+        );
+        when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.ckp"))).thenReturn(
+            new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8))
+        );
eq("translog-23.ckp"))).thenReturn( + new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8)) + ); + + translogTransferManager.downloadTranslog("12", "23", location); + + verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.tlog")); + verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.ckp")); + assertTrue(Files.exists(location.resolve("translog-23.tlog"))); + assertTrue(Files.exists(location.resolve("translog-23.ckp"))); + } } diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index f4babda725057..20fe47c1d4cc0 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -98,6 +98,7 @@ import org.opensearch.index.shard.PrimaryReplicaSyncer; import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardPath; +import org.opensearch.index.store.Store; import org.opensearch.index.translog.Translog; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; @@ -106,6 +107,7 @@ import org.opensearch.threadpool.ThreadPool.Names; import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -146,9 +148,14 @@ protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFa } protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory) + throws IOException { + return createGroup(replicas, settings, mappings, engineFactory, null); + } + + protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory, Path remotePath) throws IOException { IndexMetadata metadata = buildIndexMetadata(replicas, settings, mappings); - return new ReplicationGroup(metadata) { + return new ReplicationGroup(metadata, remotePath) { @Override protected EngineFactory getEngineFactory(ShardRouting routing) { return engineFactory; @@ -235,13 +242,29 @@ protected class ReplicationGroup implements AutoCloseable, Iterable ); protected ReplicationGroup(final IndexMetadata indexMetadata) throws IOException { + this(indexMetadata, null); + } + + protected ReplicationGroup(final IndexMetadata indexMetadata, Path remotePath) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); - primary = newShard(primaryRouting, indexMetadata, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer, null); + Store remoteStore = null; + if (remotePath != null) { + remoteStore = createRemoteStore(remotePath, primaryRouting, indexMetadata); + } + primary = newShard( + primaryRouting, + indexMetadata, + null, + getEngineFactory(primaryRouting), + () -> {}, + retentionLeaseSyncer, + remoteStore + ); replicas = new CopyOnWriteArrayList<>(); this.indexMetadata = indexMetadata; updateAllocationIDsOnPrimary(); for (int i = 0; i < indexMetadata.getNumberOfReplicas(); i++) { - addReplica(); + addReplica(remotePath); } } @@ -356,7 +379,15 @@ public void startPrimary() throws IOException { } public IndexShard addReplica() throws IOException { + return addReplica((Path) null); + } + + public IndexShard addReplica(Path 
@@ -364,7 +395,7 @@ public IndexShard addReplica() throws IOException {
                 getEngineFactory(replicaRouting),
                 () -> {},
                 retentionLeaseSyncer,
-                null
+                remoteStore
             );
             addReplica(replica);
             return replica;
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index 11b3ce1dd05d4..1f563d188c693 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -545,14 +545,7 @@ protected IndexShard newShard(
             clusterSettings
         );
         if (remoteStore == null && indexSettings.isRemoteStoreEnabled()) {
-            ShardId shardId = shardPath.getShardId();
-            NodeEnvironment.NodePath remoteNodePath = new NodeEnvironment.NodePath(createTempDir());
-            ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId);
-            RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex());
-            RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex());
-            RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory);
-            storeProvider = is -> createStore(shardId, is, remoteSegmentStoreDirectory);
-            remoteStore = storeProvider.apply(indexSettings);
+            remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata);
         }
         indexShard = new IndexShard(
             routing,
@@ -589,6 +582,18 @@
         return indexShard;
     }
 
+    protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMetadata metadata) throws IOException {
+        Settings nodeSettings = Settings.builder().put("node.name", shardRouting.currentNodeId()).build();
+
+        ShardId shardId = new ShardId("index", "_na_", 0);
+        NodeEnvironment.NodePath remoteNodePath = new NodeEnvironment.NodePath(path);
+        ShardPath remoteShardPath = new ShardPath(false, remoteNodePath.resolve(shardId), remoteNodePath.resolve(shardId), shardId);
+        RemoteDirectory dataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex());
+        RemoteDirectory metadataDirectory = newRemoteDirectory(remoteShardPath.resolveIndex());
+        RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = new RemoteSegmentStoreDirectory(dataDirectory, metadataDirectory);
+        return createStore(shardId, new IndexSettings(metadata, nodeSettings), remoteSegmentStoreDirectory);
+    }
+
     private RemoteDirectory newRemoteDirectory(Path f) throws IOException {
         FsBlobStore fsBlobStore = new FsBlobStore(1024, f, false);
         BlobPath blobPath = new BlobPath();
diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java
index a100aa1c9fe42..8bb54e2df4766 100644
--- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java
@@ -102,6 +102,7 @@
 import org.opensearch.common.network.NetworkAddress;
 import org.opensearch.common.network.NetworkModule;
 import org.opensearch.common.regex.Regex;
+import org.opensearch.common.settings.FeatureFlagSettings;
 import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Setting.Property;
 import org.opensearch.common.settings.Settings;
@@ -763,6 +764,20 @@ public Settings indexSettings() {
         return builder.build();
     }
 
+    /**
+     * Sets every built-in feature flag setting at the base IT level, so that individual
+     * IT classes can override them.
+     *
+     * @return Feature flag settings.
+     */
+    protected Settings featureFlagSettings() {
+        Settings.Builder featureSettings = Settings.builder();
+        for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
+            featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
+        }
+        return featureSettings.build();
+    }
+
     /**
      * Creates one or more indices and asserts that the indices are acknowledged. If one of the indices
      * already exists this method will fail and wipe all the indices created so far.
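For illustration only, not part of the patch above: a minimal sketch of how an individual IT class might override the new featureFlagSettings() hook to force one experimental flag on. The class name and flag key below are hypothetical.

    // Hypothetical subclass; only the override pattern is prescribed by the patch above.
    public class ExampleFeatureFlagIT extends OpenSearchIntegTestCase {
        @Override
        protected Settings featureFlagSettings() {
            return Settings.builder()
                .put(super.featureFlagSettings()) // start from the defaults of every built-in flag
                .put("opensearch.experimental.feature.example.enabled", "true") // illustrative flag key
                .build();
        }
    }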