Skip to content

Commit

Permalink
Merge branch 'main' into feature/api-versioning
Browse files Browse the repository at this point in the history
  • Loading branch information
Anshu Agarwal committed Jan 5, 2023
2 parents d80408a + 4e2369d commit ffc638b
Show file tree
Hide file tree
Showing 23 changed files with 787 additions and 118 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @reta @anasalkouz @andrross @reta @Bukhtawar @CEHENKLE @dblock @setiah @kartg @kotwanikunal @mch2 @nknize @owaiskazi19 @adnapibar @Rishikesh1159 @ryanbogan @saratvemulapalli @shwetathareja @dreamer-89 @tlfeng @VachaShah @xuezhou25
* @reta @anasalkouz @andrross @reta @Bukhtawar @CEHENKLE @dblock @gbbafna @setiah @kartg @kotwanikunal @mch2 @nknize @owaiskazi19 @adnapibar @Rishikesh1159 @ryanbogan @saratvemulapalli @shwetathareja @dreamer-89 @tlfeng @VachaShah @xuezhou25
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for ppc64le architecture ([#5459](https://github.com/opensearch-project/OpenSearch/pull/5459))
- Support versioning for Weighted routing apis([#5255](https://github.com/opensearch-project/OpenSearch/pull/5255))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
Expand Down Expand Up @@ -107,6 +108,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Changed
- Add support for refresh level durability ([#5253](https://github.com/opensearch-project/OpenSearch/pull/5253))
- Integrate remote segment store in the failover flow ([#5579](https://github.com/opensearch-project/OpenSearch/pull/5579))

### Deprecated
- Refactor fuzziness interface on query builders ([#5433](https://github.com/opensearch-project/OpenSearch/pull/5433))
Expand Down
23 changes: 23 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,26 @@ ${path.logs}
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
#
# ---------------------------------- Experimental Features -----------------------------------
#
# Gates the visibility of the index setting that allows changing of replication type.
# Once the feature is ready for production release, this feature flag can be removed.
#
#opensearch.experimental.feature.replication_type.enabled: false
#
#
# Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
# Once the feature is ready for production release, this feature flag can be removed.
#
#opensearch.experimental.feature.remote_store.enabled: false
#
#
# Gates the functionality of a new parameter to the snapshot restore API
# that allows for creation of a new index type that searches a snapshot
# directly in a remote repository without restoring all index data to disk
# ahead of time.
#
#opensearch.experimental.feature.searchable_snapshot.enabled: false
#
#
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.cluster.remote.test;

import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.index.IndexRequest;
Expand Down Expand Up @@ -122,6 +123,9 @@ public void testHAProxyModeConnectionWorks() throws IOException {

RemoteConnectionInfo rci = cluster1Client().cluster().remoteInfo(new RemoteInfoRequest(), RequestOptions.DEFAULT).getInfos().get(0);
logger.info("Connection info: {}", rci);
if (!rci.isConnected()) {
logger.info("Cluster health: {}", cluster1Client().cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT));
}
assertTrue(rci.isConnected());

assertEquals(2L, cluster1Client().search(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
Expand Down Expand Up @@ -70,11 +69,6 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase {
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
Expand All @@ -96,11 +90,16 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
Expand All @@ -127,7 +126,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

// start another node, index another doc and replicate.
String nodeC = internalCluster().startNode();
String nodeC = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
refresh(INDEX_NAME);
Expand All @@ -138,10 +137,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
}

public void testRestartPrimary() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), primary);
Expand All @@ -167,10 +166,10 @@ public void testRestartPrimary() throws Exception {

public void testCancelPrimaryAllocation() throws Exception {
// this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

final int initialDocCount = 1;
Expand Down Expand Up @@ -201,10 +200,13 @@ public void testCancelPrimaryAllocation() throws Exception {

/**
* This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
*
* TODO: Ignoring this test as its flaky and needs separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());

logger.info("--> creating test index ...");
prepareCreate(
Expand All @@ -227,7 +229,7 @@ public void testAddNewReplicaFailure() throws Exception {
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());

// Mock transport service to add behaviour of throwing corruption exception during segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
Expand Down Expand Up @@ -268,9 +270,10 @@ public void testAddNewReplicaFailure() throws Exception {
assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -310,8 +313,8 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
}

public void testIndexReopenClose() throws Exception {
final String primary = internalCluster().startNode();
final String replica = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
final String replica = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -355,8 +358,8 @@ public void testMultipleShards() throws Exception {
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -396,8 +399,8 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -441,11 +444,11 @@ public void testReplicationAfterForceMerge() throws Exception {
}

public void testCancellation() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());

final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
SegmentReplicationSourceService.class,
Expand Down Expand Up @@ -500,7 +503,7 @@ public void testCancellation() throws Exception {
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

Expand All @@ -523,7 +526,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
Expand All @@ -538,8 +541,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
}

public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -601,10 +604,10 @@ public void testDeleteOperations() throws Exception {
}

public void testUpdateOperations() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellow(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (
Expand Down Expand Up @@ -705,10 +708,10 @@ public void testDropPrimaryDuringReplication() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(featureFlagSettings());
final String primaryNode = internalCluster().startDataOnlyNode(featureFlagSettings());
createIndex(INDEX_NAME, settings);
internalCluster().startDataOnlyNodes(6);
internalCluster().startDataOnlyNodes(6, featureFlagSettings());
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
Expand All @@ -731,7 +734,7 @@ public void testDropPrimaryDuringReplication() throws Exception {
ensureYellow(INDEX_NAME);

// start another replica.
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

// index another doc and refresh - without this the new replica won't catch up.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.snapshots;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequestBuilder;
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
Expand Down Expand Up @@ -46,9 +45,9 @@ public class SegmentReplicationSnapshotIT extends AbstractSnapshotIntegTestCase
private static final String REPOSITORY_NAME = "test-segrep-repo";
private static final String SNAPSHOT_NAME = "test-segrep-snapshot";

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

public Settings segRepEnableIndexSettings() {
Expand Down Expand Up @@ -100,11 +99,11 @@ public void ingestData(int docCount, String indexName) throws Exception {
// Start cluster with provided settings and return the node names as list
public List<String> startClusterWithSettings(Settings indexSettings, int replicaCount) throws Exception {
// Start primary
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
List<String> nodeNames = new ArrayList<>();
nodeNames.add(primaryNode);
for (int i = 0; i < replicaCount; i++) {
nodeNames.add(internalCluster().startNode());
nodeNames.add(internalCluster().startNode(featureFlagSettings()));
}
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -166,6 +165,7 @@ public void testRestoreOnSegRep() throws Exception {
assertHitCount(resp, DOC_COUNT);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testSnapshotOnSegRep_RestoreOnSegRepDuringIngestion() throws Exception {
startClusterWithSettings(segRepEnableIndexSettings(), 1);
createSnapshot();
Expand Down Expand Up @@ -266,7 +266,7 @@ public void testRestoreOnReplicaNode() throws Exception {

// Assertions
assertThat(restoreSnapshotResponse.status(), equalTo(RestStatus.ACCEPTED));
internalCluster().startNode();
internalCluster().startNode(featureFlagSettings());
ensureGreen(RESTORED_INDEX_NAME);
GetSettingsResponse settingsResponse = client().admin()
.indices()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.settings;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.util.FeatureFlags;

/**
* Encapsulates all valid feature flag level settings.
*
* @opensearch.internal
*/
public class FeatureFlagSettings extends AbstractScopedSettings {

protected FeatureFlagSettings(
Settings settings,
Set<Setting<?>> settingsSet,
Set<SettingUpgrader<?>> settingUpgraders,
Property scope
) {
super(settings, settingsSet, settingUpgraders, scope);
}

public static final Set<Setting<?>> BUILT_IN_FEATURE_FLAGS = Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
FeatureFlags.REPLICATION_TYPE_SETTING,
FeatureFlags.REMOTE_STORE_SETTING,
FeatureFlags.SEARCHABLE_SNAPSHOT_SETTING,
FeatureFlags.EXTENSIONS_SETTING
)
)
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ public SettingsModule(
for (Setting<?> setting : IndexScopedSettings.BUILT_IN_INDEX_SETTINGS) {
registerSetting(setting);
}
for (Setting<?> setting : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
registerSetting(setting);
}

for (Map.Entry<String, List<Setting>> featureFlaggedSetting : IndexScopedSettings.FEATURE_FLAGGED_INDEX_SETTINGS.entrySet()) {
if (FeatureFlags.isEnabled(featureFlaggedSetting.getKey())) {
Expand Down
Loading

0 comments on commit ffc638b

Please sign in to comment.