Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Update segrep bwc tests to verify replica checkpoints and skip tests for 1.x bwc versions #8203

Merged
merged 8 commits into from
Jun 24, 2023
2 changes: 2 additions & 0 deletions qa/mixed-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
}

String baseName = "v${bwcVersion}"
String bwcVersionStr = "${bwcVersion}"

/* This project runs the core REST tests against a 4 node cluster where two of
the nodes has a different minor. */
Expand Down Expand Up @@ -86,6 +87,7 @@ for (Version bwcVersion : BuildParams.bwcVersions.wireCompatible) {
nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}".allHttpSocketURI.join(",")}")
nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}".getName()}")
}
systemProperty 'tests.upgrade_from_version', bwcVersionStr
systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
onlyIf { project.bwc_tests_enabled }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@

public class IndexingIT extends OpenSearchRestTestCase {

protected static final Version UPGRADE_FROM_VERSION = Version.fromString(System.getProperty("tests.upgrade_from_version"));


private int indexDocs(String index, final int idStart, final int numDocs) throws IOException {
for (int i = 0; i < numDocs; i++) {
final int id = idStart + i;
Expand Down Expand Up @@ -114,12 +117,16 @@ private void printClusterRouting() throws IOException, ParseException {
* @throws Exception
*/
public void testIndexingWithPrimaryOnBwcNodes() throws Exception {
if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
return;
}
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
logger.info("cluster discovered:\n {}", nodes.toString());
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
// Exclude bwc nodes from allocation so that primaries gets allocated on current version
// Update allocation settings so that primaries gets allocated only on nodes running on older version
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
Expand All @@ -133,7 +140,7 @@ public void testIndexingWithPrimaryOnBwcNodes() throws Exception {
try (RestClient nodeClient = buildClient(restClientSettings(),
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {

logger.info("allowing replica shards assignment on bwc nodes");
logger.info("Remove allocation include settings so that shards can be allocated on current version nodes");
updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name"));
// Add replicas so that it can be assigned on higher OS version nodes.
updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2));
Expand All @@ -154,13 +161,17 @@ public void testIndexingWithPrimaryOnBwcNodes() throws Exception {


/**
* This test creates a cluster with primary on older version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider};
* This test creates a cluster with primary on higher version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider};
* replica shard allocation on lower OpenSearch version is prevented. Thus, this test though cover the use case where
* primary shard containing nodes are running on higher OS version while replicas are unassigned.
*
* @throws Exception
*/
public void testIndexingWithReplicaOnBwcNodes() throws Exception {
if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
return;
}
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
logger.info("cluster discovered:\n {}", nodes.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@
import org.opensearch.client.Response;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Booleans;
import org.opensearch.common.io.Streams;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.codec.CodecService;
import org.opensearch.index.engine.EngineConfig;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -90,6 +94,7 @@ private void waitForSearchableDocs(String index, int shardCount) throws Exceptio
waitForClusterHealthWithNoShardMigration(index, "green");
logger.info("--> _cat/shards before search \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));

verifySegmentStats(index);
Request request = new Request("GET", index + "/_stats");
request.addParameter("level", "shards");
Response response = client().performRequest(request);
Expand All @@ -109,14 +114,12 @@ private void waitForSearchableDocs(String index, int shardCount) throws Exceptio
logger.info("--> replicaShardToNodeIDMap {}", replicaShardToNodeIDMap);

for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) {
logger.info("--> Verify doc count for shard number {}", shardNumber);
Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search");
searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
searchTestIndexRequest.addParameter("filter_path", "hits.total");
searchTestIndexRequest.addParameter("preference", "_shards:" + shardNumber + "|_only_nodes:" + primaryShardToNodeIDMap.get(shardNumber));
Response searchTestIndexResponse = client().performRequest(searchTestIndexRequest);
final int primaryHits = ObjectPath.createFromResponse(searchTestIndexResponse).evaluate("hits.total");
logger.info("--> primaryHits {}", primaryHits);
final int shardNum = shardNumber;
// Verify replica shard doc count only when available.
if (replicaShardToNodeIDMap.get(shardNum) != null) {
Expand All @@ -127,8 +130,7 @@ private void waitForSearchableDocs(String index, int shardCount) throws Exceptio
replicaRequest.addParameter("preference", "_shards:" + shardNum + "|_only_nodes:" + replicaShardToNodeIDMap.get(shardNum));
Response replicaResponse = client().performRequest(replicaRequest);
int replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits.total");
logger.info("--> ReplicaHits {}", replicaHits);
assertEquals(primaryHits, replicaHits);
assertEquals("Doc count mismatch for shard " + shardNum + ". Primary hits " + primaryHits + " Replica hits " + replicaHits, primaryHits, replicaHits);
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
}, 1, TimeUnit.MINUTES);
}
}
Expand All @@ -145,6 +147,18 @@ private void waitForClusterHealthWithNoShardMigration(String indexName, String s
client().performRequest(waitForStatus);
}

private void verifySegmentStats(String indexName) throws Exception {
assertBusy(() -> {
Request segrepStatsRequest = new Request("GET", "/_cat/segment_replication/" + indexName);
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
segrepStatsRequest.addParameter("h", "shardId,target_node,checkpoints_behind");
Response segrepStatsResponse = client().performRequest(segrepStatsRequest);
for (String statLine : Streams.readAllLines(segrepStatsResponse.getEntity().getContent())) {
String[] elements = statLine.split(" +");
assertEquals("Replica shard " + elements[0] + "not upto date with primary ", 0, Integer.parseInt(elements[2]));
}
});
}

public void testIndexing() throws IOException, ParseException {
switch (CLUSTER_TYPE) {
case OLD:
Expand Down Expand Up @@ -239,9 +253,13 @@ public void testIndexing() throws IOException, ParseException {
* @throws Exception
*/
public void testIndexingWithSegRep() throws Exception {
if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
return;
}
final String indexName = "test-index-segrep";
final int shardCount = 3;
final int replicaCount = 1;
final int replicaCount = 2;
logger.info("--> Case {}", CLUSTER_TYPE);
printClusterNodes();
logger.info("--> _cat/shards before test execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));
Expand All @@ -251,6 +269,10 @@ public void testIndexingWithSegRep() throws Exception {
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(
EngineConfig.INDEX_CODEC_SETTING.getKey(),
randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC, CodecService.LUCENE_DEFAULT_CODEC)
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms");
createIndex(indexName, settings.build());
waitForClusterHealthWithNoShardMigration(indexName, "green");
Expand Down