diff --git a/qa/mixed-cluster/build.gradle b/qa/mixed-cluster/build.gradle index 55f900c52f2c2..d64bf245dbf8f 100644 --- a/qa/mixed-cluster/build.gradle +++ b/qa/mixed-cluster/build.gradle @@ -55,6 +55,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { } String baseName = "v${bwcVersion}" + String bwcVersionStr = "${bwcVersion}" /* This project runs the core REST tests against a 4 node cluster where two of the nodes has a different minor. */ @@ -86,6 +87,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) { nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}") nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}") } + systemProperty 'tests.upgrade_from_version', bwcVersionStr systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${baseName}" onlyIf { project.bwc_tests_enabled } } diff --git a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java index a6675a6d0ddb5..b867b90af333c 100644 --- a/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java +++ b/qa/mixed-cluster/src/test/java/org/opensearch/backwards/IndexingIT.java @@ -66,6 +66,9 @@ public class IndexingIT extends OpenSearchRestTestCase { + protected static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version")); + + private int indexDocs(String index, final int idStart, final int numDocs) throws IOException { for (int i = 0; i < numDocs; i++) { final int id = idStart + i; @@ -114,12 +117,16 @@ private void printClusterRouting() throws IOException, ParseException { * @throws Exception */ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { + if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) { + logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION); + return; + } Nodes nodes = buildNodeAndVersions(); assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); logger.info("cluster discovered:\n {}", nodes.toString()); final List bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList()); final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(",")); - // Exclude bwc nodes from allocation so that primaries gets allocated on current version + // Update allocation settings so that primaries gets allocated only on nodes running on older version Settings.Builder settings = Settings.builder() .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) @@ -133,7 +140,7 @@ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { try (RestClient nodeClient = buildClient(restClientSettings(), nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { - logger.info("allowing replica shards assignment on bwc nodes"); + logger.info("Remove allocation include settings so that shards can be allocated on current version nodes"); updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name")); // Add replicas so that it can be assigned on higher OS version nodes. updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2)); @@ -154,13 +161,17 @@ public void testIndexingWithPrimaryOnBwcNodes() throws Exception { /** - * This test creates a cluster with primary on older version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider}; + * This test creates a cluster with primary on higher version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider}; * replica shard allocation on lower OpenSearch version is prevented. Thus, this test though cover the use case where * primary shard containing nodes are running on higher OS version while replicas are unassigned. * * @throws Exception */ public void testIndexingWithReplicaOnBwcNodes() throws Exception { + if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) { + logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION); + return; + } Nodes nodes = buildNodeAndVersions(); assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty()); logger.info("cluster discovered:\n {}", nodes.toString()); diff --git a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java index a758b8e4ccd72..173aa9f6557d2 100644 --- a/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java +++ b/qa/rolling-upgrade/src/test/java/org/opensearch/upgrades/IndexingIT.java @@ -38,13 +38,17 @@ import org.opensearch.client.Response; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.Booleans; +import org.opensearch.common.io.Streams; import org.opensearch.common.settings.Settings; +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.engine.EngineConfig; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.rest.yaml.ObjectPath; import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -90,6 +94,7 @@ private void waitForSearchableDocs(String index, int shardCount) throws Exceptio waitForClusterHealthWithNoShardMigration(index, "green"); logger.info("--> _cat/shards before search \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); + verifySegmentStats(index); Request request = new Request("GET", index + "/_stats"); request.addParameter("level", "shards"); Response response = client().performRequest(request); @@ -109,14 +114,12 @@ private void waitForSearchableDocs(String index, int shardCount) throws Exceptio logger.info("--> replicaShardToNodeIDMap {}", replicaShardToNodeIDMap); for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) { - logger.info("--> Verify doc count for shard number {}", shardNumber); Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search"); searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true"); searchTestIndexRequest.addParameter("filter_path", "hits.total"); searchTestIndexRequest.addParameter("preference", "_shards:" + shardNumber + "|_only_nodes:" + primaryShardToNodeIDMap.get(shardNumber)); Response searchTestIndexResponse = client().performRequest(searchTestIndexRequest); final int primaryHits = ObjectPath.createFromResponse(searchTestIndexResponse).evaluate("hits.total"); - logger.info("--> primaryHits {}", primaryHits); final int shardNum = shardNumber; // Verify replica shard doc count only when available. if (replicaShardToNodeIDMap.get(shardNum) != null) { @@ -127,8 +130,7 @@ private void waitForSearchableDocs(String index, int shardCount) throws Exceptio replicaRequest.addParameter("preference", "_shards:" + shardNum + "|_only_nodes:" + replicaShardToNodeIDMap.get(shardNum)); Response replicaResponse = client().performRequest(replicaRequest); int replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits.total"); - logger.info("--> ReplicaHits {}", replicaHits); - assertEquals(primaryHits, replicaHits); + assertEquals("Doc count mismatch for shard " + shardNum + ". Primary hits " + primaryHits + " Replica hits " + replicaHits, primaryHits, replicaHits); }, 1, TimeUnit.MINUTES); } } @@ -145,6 +147,18 @@ private void waitForClusterHealthWithNoShardMigration(String indexName, String s client().performRequest(waitForStatus); } + private void verifySegmentStats(String indexName) throws Exception { + assertBusy(() -> { + Request segrepStatsRequest = new Request("GET", "/_cat/segment_replication/" + indexName); + segrepStatsRequest.addParameter("h", "shardId,target_node,checkpoints_behind"); + Response segrepStatsResponse = client().performRequest(segrepStatsRequest); + for (String statLine : Streams.readAllLines(segrepStatsResponse.getEntity().getContent())) { + String[] elements = statLine.split(" +"); + assertEquals("Replica shard " + elements[0] + "not upto date with primary ", 0, Integer.parseInt(elements[2])); + } + }); + } + public void testIndexing() throws IOException, ParseException { switch (CLUSTER_TYPE) { case OLD: @@ -239,9 +253,13 @@ public void testIndexing() throws IOException, ParseException { * @throws Exception */ public void testIndexingWithSegRep() throws Exception { + if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) { + logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION); + return; + } final String indexName = "test-index-segrep"; final int shardCount = 3; - final int replicaCount = 1; + final int replicaCount = 2; logger.info("--> Case {}", CLUSTER_TYPE); printClusterNodes(); logger.info("--> _cat/shards before test execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity())); @@ -251,6 +269,10 @@ public void testIndexingWithSegRep() throws Exception { .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount) .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put( + EngineConfig.INDEX_CODEC_SETTING.getKey(), + randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC, CodecService.LUCENE_DEFAULT_CODEC) + ) .put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms"); createIndex(indexName, settings.build()); waitForClusterHealthWithNoShardMigration(indexName, "green");