From 6a4e8f675f1cbc0172cf9a89fed7f4e7e0a46077 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 11 May 2023 14:39:52 -0700 Subject: [PATCH 1/8] [Segment Replication] Added mixed cluster bwc test Signed-off-by: Suraj Singh --- qa/mixed-cluster/build.gradle | 30 +++++ .../org/opensearch/backwards/IndexingIT.java | 111 ++++++++++++++++++ .../MixedClusterClientYamlTestSuiteIT.java | 1 + .../test/rest/OpenSearchRestTestCase.java | 14 +++ 4 files changed, 156 insertions(+) diff --git a/qa/mixed-cluster/build.gradle b/qa/mixed-cluster/build.gradle index 90aeb8faadf80..1c1f18d2a6b45 100644 --- a/qa/mixed-cluster/build.gradle +++ b/qa/mixed-cluster/build.gradle @@ -55,6 +55,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { } String baseName = "v${bwcVersion}" + String segRepCluster = "${baseName}segrep" /* This project runs the core REST tests against a 4 node cluster where two of the nodes has a different minor. */ @@ -65,6 +66,12 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" } + + "$segRepCluster" { + versions = [bwcVersion.toString(), project.version] + numberOfNodes = 2 + setting 'path.repo', "${buildDir}/cluster/shared/repo/${segRepCluster}" + } } tasks.register("${baseName}#mixedClusterTest", StandaloneRestIntegTestTask) { @@ -89,7 +96,30 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { onlyIf { project.bwc_tests_enabled } } + /** + * Defines a new task for segment replication where tests utilizes system property to set boolean param, consumed + * in tests. + */ + tasks.register("${baseName}#segrepMixedClusterTest", StandaloneRestIntegTestTask) { + useCluster testClusters."${segRepCluster}" + doFirst { + delete("${buildDir}/cluster/shared/repo/${segRepCluster}") + // Getting the endpoints causes a wait for the cluster + println "Test cluster endpoints are: ${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}" + println "Upgrading one node to create a mixed cluster" + testClusters."${segRepCluster}".nextNodeToNextVersion() + // Getting the endpoints causes a wait for the cluster + println "Upgrade complete, endpoints are: ${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}" + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${segRepCluster}".getName()}") + } + systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${segRepCluster}" + systemProperty 'tests.segrep_enabled', true + onlyIf { project.bwc_tests_enabled } + } + tasks.register(bwcTaskName(bwcVersion)) { dependsOn "${baseName}#mixedClusterTest" + dependsOn "${baseName}#segrepMixedClusterTest" } } diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java index 2e36a352c75dd..fc63a3602df2e 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java @@ -32,6 +32,8 @@ package org.opensearch.backwards; import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.client.Request; @@ -40,11 +42,13 @@ import org.opensearch.client.ResponseException; import org.opensearch.client.RestClient; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.common.Strings; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.common.xcontent.support.XContentMapValues; import org.opensearch.index.seqno.SeqNoStats; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.rest.RestStatus; import org.opensearch.test.rest.OpenSearchRestTestCase; import org.opensearch.test.rest.yaml.ObjectPath; @@ -63,6 +67,9 @@ public class IndexingIT extends OpenSearchRestTestCase { + protected static final Boolean SEGREP_ENABLED = Boolean.parseBoolean(System.getProperty("tests.segrep_enabled")); + + private int indexDocs(String index, final int idStart, final int numDocs) throws IOException { for (int i = 0; i < numDocs; i++) { final int id = idStart + i; @@ -98,6 +105,110 @@ private int indexDocWithConcurrentUpdates(String index, final int docId, int nUp return nUpdates + 1; } + private void printClusterRouting() throws IOException, ParseException { + Request clusterStateRequest = new Request("GET", "_cluster/state/routing_nodes?pretty"); + String clusterState = EntityUtils.toString(client().performRequest(clusterStateRequest).getEntity()).trim(); + logger.info("cluster nodes: {}", clusterState); + } + + private void printIndexSettings(String index) throws IOException, ParseException { + Request indexSettings = new Request("GET", index + "/_settings?pretty"); + String idxSettings = EntityUtils.toString(client().performRequest(indexSettings).getEntity()).trim(); + logger.info("idxSettings : {}", idxSettings); + } + + /** + * This test verifies that segment replication does not break when primary shards are on higher version. + * + * @throws Exception + */ + public void testIndexingWithPrimaryOnBwcNodes() throws Exception { + if (SEGREP_ENABLED == false) return; + Nodes nodes = buildNodeAndVersions(); + assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); + logger.info("cluster discovered:\n {}", nodes.toString()); + final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); + logger.info("--> bwc nodes {}", bwcNamesList); + final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); + // Exclude bwc nodes from allocation so that primaries gets allocated on current version + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.routing.allocation.include._name", bwcNames); + final String index = "test-index"; + createIndex(index, settings.build()); + ensureYellow(index); + printClusterRouting(); + + printIndexSettings(index); + + int docCount = 200; + try (RestClient nodeClient = buildClient(restClientSettings(), + nodes.values().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + + logger.info("allowing replica shards assignment on bwc nodes"); + updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name")); + printClusterRouting(); + printIndexSettings(index); + ensureGreen(index); + + // Index docs + indexDocs(index, 0, docCount); + + // perform a refresh + assertOK(client().performRequest(new Request("POST", index + "/_flush"))); + + // verify replica catch up with primary + assertSeqNoOnShards(index, nodes, docCount, nodeClient); + } + } + + + /** + * This test creates a cluster with primary on older version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider} which prevents replica shard allocation on lower OpenSearch version + * + * @throws Exception + */ + public void testIndexingWithReplicaOnBwcNodes() throws Exception { + if (SEGREP_ENABLED == false) return; + Nodes nodes = buildNodeAndVersions(); + assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); + logger.info("cluster discovered:\n {}", nodes.toString()); + final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); + logger.info("--> bwc nodes {}", bwcNamesList); + final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); + // Exclude bwc nodes from allocation so that primaries gets allocated on current version + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put("index.routing.allocation.exclude._name", bwcNames); + final String index = "test-index"; + createIndex(index, settings.build()); + ensureYellow(index); + printClusterRouting(); + printIndexSettings(index); + + int docCount = 200; + try (RestClient nodeClient = buildClient(restClientSettings(), + nodes.values().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + + logger.info("allowing replica shards assignment on bwc nodes"); + updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.exclude._name")); + printClusterRouting(); + + // Index docs + indexDocs(index, 0, docCount); + + // perform a refresh + assertOK(client().performRequest(new Request("POST", index + "/_flush"))); + + // verify replica catch up with primary + assertSeqNoOnShards(index, nodes, docCount, nodeClient); + } + } + public void testIndexVersionPropagation() throws Exception { Nodes nodes = buildNodeAndVersions(); assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/MixedClusterClientYamlTestSuiteIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/MixedClusterClientYamlTestSuiteIT.java index 3860b845d6046..66b31c6045315 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/MixedClusterClientYamlTestSuiteIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/MixedClusterClientYamlTestSuiteIT.java @@ -34,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; +import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.tests.util.TimeUnits; import org.opensearch.test.rest.yaml.ClientYamlTestCandidate; import org.opensearch.test.rest.yaml.OpenSearchClientYamlSuiteTestCase; diff --git a/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java b/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java index a5d2e6793f397..11df944f1644a 100644 --- a/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java @@ -918,6 +918,20 @@ public static void ensureGreen(String index) throws IOException { }); } + /** + * Checks that the specific index is yellow. + * + * @param index index to test for + **/ + public static void ensureYellow(String index) throws IOException { + ensureHealth(index, (request) -> { + request.addParameter("wait_for_status", "yellow"); + request.addParameter("wait_for_no_relocating_shards", "true"); + request.addParameter("timeout", "70s"); + request.addParameter("level", "shards"); + }); + } + protected static void ensureHealth(Consumer requestConsumer) throws IOException { ensureHealth("", requestConsumer); } From 688561d7d0dd254b741b02da053357908f28cee8 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 11 May 2023 15:58:26 -0700 Subject: [PATCH 2/8] Remove unnecessary gradle task for segrep Signed-off-by: Suraj Singh --- qa/mixed-cluster/build.gradle | 30 ----------------- .../org/opensearch/backwards/IndexingIT.java | 33 ++++++++++--------- 2 files changed, 17 insertions(+), 46 deletions(-) diff --git a/qa/mixed-cluster/build.gradle b/qa/mixed-cluster/build.gradle index 1c1f18d2a6b45..90aeb8faadf80 100644 --- a/qa/mixed-cluster/build.gradle +++ b/qa/mixed-cluster/build.gradle @@ -55,7 +55,6 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { } String baseName = "v${bwcVersion}" - String segRepCluster = "${baseName}segrep" /* This project runs the core REST tests against a 4 node cluster where two of the nodes has a different minor. */ @@ -66,12 +65,6 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" } - - "$segRepCluster" { - versions = [bwcVersion.toString(), project.version] - numberOfNodes = 2 - setting 'path.repo', "${buildDir}/cluster/shared/repo/${segRepCluster}" - } } tasks.register("${baseName}#mixedClusterTest", StandaloneRestIntegTestTask) { @@ -96,30 +89,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { onlyIf { project.bwc_tests_enabled } } - /** - * Defines a new task for segment replication where tests utilizes system property to set boolean param, consumed - * in tests. - */ - tasks.register("${baseName}#segrepMixedClusterTest", StandaloneRestIntegTestTask) { - useCluster testClusters."${segRepCluster}" - doFirst { - delete("${buildDir}/cluster/shared/repo/${segRepCluster}") - // Getting the endpoints causes a wait for the cluster - println "Test cluster endpoints are: ${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}" - println "Upgrading one node to create a mixed cluster" - testClusters."${segRepCluster}".nextNodeToNextVersion() - // Getting the endpoints causes a wait for the cluster - println "Upgrade complete, endpoints are: ${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}" - nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${segRepCluster}".allHttpSocketURI.join(",")}") - nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${segRepCluster}".getName()}") - } - systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${segRepCluster}" - systemProperty 'tests.segrep_enabled', true - onlyIf { project.bwc_tests_enabled } - } - tasks.register(bwcTaskName(bwcVersion)) { dependsOn "${baseName}#mixedClusterTest" - dependsOn "${baseName}#segrepMixedClusterTest" } } diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java index fc63a3602df2e..0ab8adbd4a78a 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java @@ -67,8 +67,6 @@ public class IndexingIT extends OpenSearchRestTestCase { - protected static final Boolean SEGREP_ENABLED = Boolean.parseBoolean(System.getProperty("tests.segrep_enabled")); - private int indexDocs(String index, final int idStart, final int numDocs) throws IOException { for (int i = 0; i < numDocs; i++) { @@ -118,37 +116,37 @@ private void printIndexSettings(String index) throws IOException, ParseException } /** - * This test verifies that segment replication does not break when primary shards are on higher version. + * This test verifies that segment replication does not break when primary shards are on lower OS version. It does this + * by verifying replica shards contains same number of documents as primary's. * * @throws Exception */ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { - if (SEGREP_ENABLED == false) return; Nodes nodes = buildNodeAndVersions(); assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); logger.info("cluster discovered:\n {}", nodes.toString()); final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); - logger.info("--> bwc nodes {}", bwcNamesList); final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); // Exclude bwc nodes from allocation so that primaries gets allocated on current version Settings.Builder settings = Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) - .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put("index.routing.allocation.include._name", bwcNames); final String index = "test-index"; createIndex(index, settings.build()); ensureYellow(index); - printClusterRouting(); - printIndexSettings(index); - int docCount = 200; + int docCount = 20; try (RestClient nodeClient = buildClient(restClientSettings(), - nodes.values().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { logger.info("allowing replica shards assignment on bwc nodes"); updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name")); + // Add replicas so that it can be assigned on higher OS version nodes. + updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2)); + printClusterRouting(); printIndexSettings(index); ensureGreen(index); @@ -166,22 +164,22 @@ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { /** - * This test creates a cluster with primary on older version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider} which prevents replica shard allocation on lower OpenSearch version + * This test creates a cluster with primary on older version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider}; + * replica shard allocation on lower OpenSearch version is prevented. Thus, this test though cover the use case where + * primary shard containing nodes are running on higher OS version while replicas are unassigned. * * @throws Exception */ public void testIndexingWithReplicaOnBwcNodes() throws Exception { - if (SEGREP_ENABLED == false) return; Nodes nodes = buildNodeAndVersions(); assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); logger.info("cluster discovered:\n {}", nodes.toString()); final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); - logger.info("--> bwc nodes {}", bwcNamesList); final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); - // Exclude bwc nodes from allocation so that primaries gets allocated on current version + // Exclude bwc nodes from allocation so that primaries gets allocated on current/higher version Settings.Builder settings = Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) - .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put("index.routing.allocation.exclude._name", bwcNames); final String index = "test-index"; @@ -196,6 +194,8 @@ public void testIndexingWithReplicaOnBwcNodes() throws Exception { logger.info("allowing replica shards assignment on bwc nodes"); updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.exclude._name")); + // Add replicas so that it can be assigned on lower OS version nodes, but it doesn't work as called out in test overview + updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2)); printClusterRouting(); // Index docs @@ -507,10 +507,11 @@ private void assertSeqNoOnShards(String index, Nodes nodes, int numDocs, RestCli }); } - private List buildShards(String index, Nodes nodes, RestClient client) throws IOException { + private List buildShards(String index, Nodes nodes, RestClient client) throws IOException, ParseException { Request request = new Request("GET", index + "/_stats"); request.addParameter("level", "shards"); Response response = client.performRequest(request); + logger.info("_stats response --> {}", EntityUtils.toString(client().performRequest(request).getEntity()).trim()); List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0"); ArrayList shards = new ArrayList<>(); for (Object shard : shardStats) { From 3e872e0ef712e77a05469738a07afbba31ea3487 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 11 May 2023 16:04:12 -0700 Subject: [PATCH 3/8] Spotless fix Signed-off-by: Suraj Singh --- .../src/test/java/org/opensearch/backwards/IndexingIT.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java index 0ab8adbd4a78a..9808aea4f79e9 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java @@ -42,7 +42,6 @@ import org.opensearch.client.ResponseException; import org.opensearch.client.RestClient; import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.common.Strings; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.json.JsonXContent; @@ -67,7 +66,6 @@ public class IndexingIT extends OpenSearchRestTestCase { - private int indexDocs(String index, final int idStart, final int numDocs) throws IOException { for (int i = 0; i < numDocs; i++) { final int id = idStart + i; @@ -138,7 +136,7 @@ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { ensureYellow(index); printIndexSettings(index); - int docCount = 20; + int docCount = 200; try (RestClient nodeClient = buildClient(restClientSettings(), nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { @@ -507,11 +505,10 @@ private void assertSeqNoOnShards(String index, Nodes nodes, int numDocs, RestCli }); } - private List buildShards(String index, Nodes nodes, RestClient client) throws IOException, ParseException { + private List buildShards(String index, Nodes nodes, RestClient client) throws IOException { Request request = new Request("GET", index + "/_stats"); request.addParameter("level", "shards"); Response response = client.performRequest(request); - logger.info("_stats response --> {}", EntityUtils.toString(client().performRequest(request).getEntity()).trim()); List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0"); ArrayList shards = new ArrayList<>(); for (Object shard : shardStats) { From a37cdc33c9d063d423cc09a50a35c14c321a8bad Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 11 May 2023 16:05:11 -0700 Subject: [PATCH 4/8] Spotless fix Signed-off-by: Suraj Singh --- .../opensearch/backwards/MixedClusterClientYamlTestSuiteIT.java | 1 - 1 file changed, 1 deletion(-) diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/MixedClusterClientYamlTestSuiteIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/MixedClusterClientYamlTestSuiteIT.java index 66b31c6045315..3860b845d6046 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/MixedClusterClientYamlTestSuiteIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/MixedClusterClientYamlTestSuiteIT.java @@ -34,7 +34,6 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; -import org.apache.lucene.tests.util.LuceneTestCase; import org.apache.lucene.tests.util.TimeUnits; import org.opensearch.test.rest.yaml.ClientYamlTestCandidate; import org.opensearch.test.rest.yaml.OpenSearchClientYamlSuiteTestCase; From ec6776969288835c4b7ede8cb327d0db76778445 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Mon, 15 May 2023 15:58:33 -0700 Subject: [PATCH 5/8] [Segment Replication] Rolling upgrade test Signed-off-by: Suraj Singh --- .../org/opensearch/upgrades/IndexingIT.java | 164 ++++++++++++++++++ 1 file changed, 164 insertions(+) diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index ed4bf11041c88..4d965ce4ff8fe 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -31,22 +31,34 @@ package org.opensearch.upgrades; +import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.ParseException; import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.LegacyESVersion; import org.opensearch.Version; +import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Request; import org.opensearch.client.Response; import org.opensearch.client.ResponseException; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.Booleans; import org.opensearch.common.settings.Settings; +import org.opensearch.index.seqno.SeqNoStats; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.rest.action.document.RestBulkAction; +import org.opensearch.test.rest.yaml.ObjectPath; import java.io.IOException; +import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; import static org.opensearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; @@ -62,6 +74,81 @@ */ public class IndexingIT extends AbstractRollingTestCase { + private void printClusterNodes() throws IOException, ParseException, URISyntaxException { + Request clusterStateRequest = new Request("GET", "_nodes"); + Response response = client().performRequest(clusterStateRequest); + + ObjectPath objectPath = ObjectPath.createFromResponse(response); + Map nodesAsMap = objectPath.evaluate("nodes"); + for (String id : nodesAsMap.keySet()) { + logger.info("--> {} {} {}", + id, + objectPath.evaluate("nodes." + id + ".name"), + Version.fromString(objectPath.evaluate("nodes." + id + ".version"))); + } + response = client().performRequest(new Request("GET", "_cluster/state")); + String cm = ObjectPath.createFromResponse(response).evaluate("master_node"); + logger.info("--> Cluster manager {}", cm); + } + + // Verifies that for each shard copy holds same document count across all containing nodes. + private void waitForSearchableDocs(String index, int shardCount) throws Exception { + Map primaryShardToNodeIDMap = new HashMap<>(); + Map replicaShardToNodeIDMap = new HashMap<>(); + logger.info("--> _cat/shards \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); + + Request request = new Request("GET", index + "/_stats"); + request.addParameter("level", "shards"); + Response response = client().performRequest(request); + for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) { + List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards." + shardNumber); + for (Object shard : shardStats) { + final String nodeId = ObjectPath.evaluate(shard, "routing.node"); + final Boolean primary = ObjectPath.evaluate(shard, "routing.primary"); + if (primary) { + primaryShardToNodeIDMap.putIfAbsent(shardNumber, nodeId); + } else { + replicaShardToNodeIDMap.putIfAbsent(shardNumber, nodeId); + } + } + } + logger.info("--> primaryShardToNodeIDMap {}", primaryShardToNodeIDMap); + logger.info("--> replicaShardToNodeIDMap {}", replicaShardToNodeIDMap); + + for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) { + logger.info("--> Verify doc count for shard number {}", shardNumber); + Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search"); + searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); + searchTestIndexRequest.addParameter("filter_path", "hits.total"); + searchTestIndexRequest.addParameter("preference", "_shards:" + shardNumber + "|_only_nodes:" + primaryShardToNodeIDMap.get(shardNumber)); + Response searchTestIndexResponse = client().performRequest(searchTestIndexRequest); + final int primaryHits = ObjectPath.createFromResponse(searchTestIndexResponse).evaluate("hits.total"); + logger.info("--> primaryHits {}", primaryHits); + final int shardNum = shardNumber; + assertBusy(() -> { + Request replicaRequest = new Request("POST", "/" + index + "/_search"); + replicaRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); + replicaRequest.addParameter("filter_path", "hits.total"); + replicaRequest.addParameter("preference", "_shards:" + shardNum + "|_only_nodes:" + replicaShardToNodeIDMap.get(shardNum)); + Response replicaResponse = client().performRequest(replicaRequest); + int replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits.total"); + logger.info("--> ReplicaHits {}", replicaHits); + assertEquals(primaryHits, replicaHits); + }, 1, TimeUnit.MINUTES); + } + } + + private void waitForClusterHealthWithNoShardMigration(String indexName, String status) throws IOException { + Request waitForStatus = new Request("GET", "/_cluster/health/" + indexName); + waitForStatus.addParameter("wait_for_status", status); + // wait for long enough that we give delayed unassigned shards to stop being delayed + waitForStatus.addParameter("timeout", "70s"); + waitForStatus.addParameter("level", "shards"); + waitForStatus.addParameter("wait_for_no_initializing_shards", "true"); + waitForStatus.addParameter("wait_for_no_relocating_shards", "true"); + client().performRequest(waitForStatus); + } + public void testIndexing() throws IOException, ParseException { switch (CLUSTER_TYPE) { case OLD: @@ -148,6 +235,83 @@ public void testIndexing() throws IOException, ParseException { } } + + /** + * This test verifies that during rolling upgrades the segment replication does not break when replica shards can + * be running on older codec versions. + * + * @throws Exception + */ + public void testIndexingWithSegRep() throws Exception { + final String indexName = "test-index-segrep"; + final int shardCount = 3; + final int replicaCount = 1; + logger.info("--> Case {}", CLUSTER_TYPE); + printClusterNodes(); + logger.info("--> _cat/shards before test execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); + switch (CLUSTER_TYPE) { + case OLD: + Settings.Builder settings = Settings.builder() + .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount) + .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms"); + createIndex(indexName, settings.build()); + waitForClusterHealthWithNoShardMigration(indexName, "green"); + bulk(indexName, "_OLD", 5); + break; + case MIXED: + waitForClusterHealthWithNoShardMigration(indexName, "yellow"); + break; + case UPGRADED: + waitForClusterHealthWithNoShardMigration(indexName, "green"); + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + + int expectedCount; + switch (CLUSTER_TYPE) { + case OLD: + expectedCount = 5; + break; + case MIXED: + if (Booleans.parseBoolean(System.getProperty("tests.first_round"))) { + expectedCount = 5; + } else { + expectedCount = 10; + } + break; + case UPGRADED: + expectedCount = 15; + break; + default: + throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]"); + } + + waitForSearchableDocs(indexName, shardCount); + assertCount(indexName, expectedCount); + + if (CLUSTER_TYPE != ClusterType.OLD) { + logger.info("--> Index one doc (to be deleted next) and verify doc count"); + bulk(indexName, "_" + CLUSTER_TYPE, 5); + Request toBeDeleted = new Request("PUT", "/" + indexName + "/_doc/to_be_deleted"); + toBeDeleted.addParameter("refresh", "true"); + toBeDeleted.setJsonEntity("{\"f1\": \"delete-me\"}"); + client().performRequest(toBeDeleted); + waitForSearchableDocs(indexName, shardCount); + assertCount(indexName, expectedCount + 6); + + logger.info("--> Delete previously added doc and verify doc count"); + Request delete = new Request("DELETE", "/" + indexName + "/_doc/to_be_deleted"); + delete.addParameter("refresh", "true"); + client().performRequest(delete); + waitForSearchableDocs(indexName, shardCount); + assertCount(indexName, expectedCount + 5); + } + logger.info("--> _cat/shards post execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); + } + public void testAutoIdWithOpTypeCreate() throws IOException { final String indexName = "auto_id_and_op_type_create_index"; StringBuilder b = new StringBuilder(); From 5e76b2fe3008c262d065d587459d32f53e448bdf Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 17 May 2023 14:23:13 -0700 Subject: [PATCH 6/8] PR feedback and cleanup Signed-off-by: Suraj Singh --- .../test/java/org/opensearch/backwards/IndexingIT.java | 9 --------- .../test/java/org/opensearch/upgrades/IndexingIT.java | 3 ++- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java index 9808aea4f79e9..048024bebcdd7 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java @@ -107,12 +107,6 @@ private void printClusterRouting() throws IOException, ParseException { logger.info("cluster nodes: {}", clusterState); } - private void printIndexSettings(String index) throws IOException, ParseException { - Request indexSettings = new Request("GET", index + "/_settings?pretty"); - String idxSettings = EntityUtils.toString(client().performRequest(indexSettings).getEntity()).trim(); - logger.info("idxSettings : {}", idxSettings); - } - /** * This test verifies that segment replication does not break when primary shards are on lower OS version. It does this * by verifying replica shards contains same number of documents as primary's. @@ -134,7 +128,6 @@ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { final String index = "test-index"; createIndex(index, settings.build()); ensureYellow(index); - printIndexSettings(index); int docCount = 200; try (RestClient nodeClient = buildClient(restClientSettings(), @@ -146,7 +139,6 @@ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2)); printClusterRouting(); - printIndexSettings(index); ensureGreen(index); // Index docs @@ -184,7 +176,6 @@ public void testIndexingWithReplicaOnBwcNodes() throws Exception { createIndex(index, settings.build()); ensureYellow(index); printClusterRouting(); - printIndexSettings(index); int docCount = 200; try (RestClient nodeClient = buildClient(restClientSettings(), diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index 4d965ce4ff8fe..689c9acd8fb73 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -293,8 +293,9 @@ public void testIndexingWithSegRep() throws Exception { assertCount(indexName, expectedCount); if (CLUSTER_TYPE != ClusterType.OLD) { - logger.info("--> Index one doc (to be deleted next) and verify doc count"); + logger.info("--> Bulk index 5 documents"); bulk(indexName, "_" + CLUSTER_TYPE, 5); + logger.info("--> Index one doc (to be deleted next) and verify doc count"); Request toBeDeleted = new Request("PUT", "/" + indexName + "/_doc/to_be_deleted"); toBeDeleted.addParameter("refresh", "true"); toBeDeleted.setJsonEntity("{\"f1\": \"delete-me\"}"); From 85f2f11a2bd04c06ca6e704947b6b573087d2f4b Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Wed, 17 May 2023 17:39:24 -0700 Subject: [PATCH 7/8] Verify replica doc count only when it is assigned Signed-off-by: Suraj Singh --- .../org/opensearch/upgrades/IndexingIT.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index 689c9acd8fb73..cec43159ff116 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -125,16 +125,19 @@ private void waitForSearchableDocs(String index, int shardCount) throws Exceptio final int primaryHits = ObjectPath.createFromResponse(searchTestIndexResponse).evaluate("hits.total"); logger.info("--> primaryHits {}", primaryHits); final int shardNum = shardNumber; - assertBusy(() -> { - Request replicaRequest = new Request("POST", "/" + index + "/_search"); - replicaRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); - replicaRequest.addParameter("filter_path", "hits.total"); - replicaRequest.addParameter("preference", "_shards:" + shardNum + "|_only_nodes:" + replicaShardToNodeIDMap.get(shardNum)); - Response replicaResponse = client().performRequest(replicaRequest); - int replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits.total"); - logger.info("--> ReplicaHits {}", replicaHits); - assertEquals(primaryHits, replicaHits); - }, 1, TimeUnit.MINUTES); + // Verify replica shard doc count only when available. + if (replicaShardToNodeIDMap.get(shardNum) != null) { + assertBusy(() -> { + Request replicaRequest = new Request("POST", "/" + index + "/_search"); + replicaRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); + replicaRequest.addParameter("filter_path", "hits.total"); + replicaRequest.addParameter("preference", "_shards:" + shardNum + "|_only_nodes:" + replicaShardToNodeIDMap.get(shardNum)); + Response replicaResponse = client().performRequest(replicaRequest); + int replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits.total"); + logger.info("--> ReplicaHits {}", replicaHits); + assertEquals(primaryHits, replicaHits); + }, 1, TimeUnit.MINUTES); + } } } From 2eb217410fc412c74c176f5a893c458355293e62 Mon Sep 17 00:00:00 2001 From: Suraj Singh Date: Thu, 18 May 2023 09:02:00 -0700 Subject: [PATCH 8/8] Remove wait for yellow cluster Signed-off-by: Suraj Singh --- .../java/org/opensearch/backwards/IndexingIT.java | 4 ++-- .../test/rest/OpenSearchRestTestCase.java | 14 -------------- 2 files changed, 2 insertions(+), 16 deletions(-) diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java index 048024bebcdd7..a6675a6d0ddb5 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java @@ -127,7 +127,7 @@ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { .put("index.routing.allocation.include._name", bwcNames); final String index = "test-index"; createIndex(index, settings.build()); - ensureYellow(index); + ensureNoInitializingShards(); // wait for all other shard activity to finish int docCount = 200; try (RestClient nodeClient = buildClient(restClientSettings(), @@ -174,7 +174,7 @@ public void testIndexingWithReplicaOnBwcNodes() throws Exception { .put("index.routing.allocation.exclude._name", bwcNames); final String index = "test-index"; createIndex(index, settings.build()); - ensureYellow(index); + ensureNoInitializingShards(); // wait for all other shard activity to finish printClusterRouting(); int docCount = 200; diff --git a/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java b/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java index 11df944f1644a..a5d2e6793f397 100644 --- a/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java @@ -918,20 +918,6 @@ public static void ensureGreen(String index) throws IOException { }); } - /** - * Checks that the specific index is yellow. - * - * @param index index to test for - **/ - public static void ensureYellow(String index) throws IOException { - ensureHealth(index, (request) -> { - request.addParameter("wait_for_status", "yellow"); - request.addParameter("wait_for_no_relocating_shards", "true"); - request.addParameter("timeout", "70s"); - request.addParameter("level", "shards"); - }); - } - protected static void ensureHealth(Consumer requestConsumer) throws IOException { ensureHealth("", requestConsumer); }