Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Added mixed and rolling upgrade bwc test #7537

Merged
merged 8 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
package org.opensearch.backwards;

import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.client.Request;
Expand All @@ -45,6 +47,7 @@
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.support.XContentMapValues;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.opensearch.test.rest.yaml.ObjectPath;
Expand Down Expand Up @@ -98,6 +101,112 @@ private int indexDocWithConcurrentUpdates(String index, final int docId, int nUp
return nUpdates + 1;
}

private void printClusterRouting() throws IOException, ParseException {
Request clusterStateRequest = new Request("GET", "_cluster/state/routing_nodes?pretty");
String clusterState = EntityUtils.toString(client().performRequest(clusterStateRequest).getEntity()).trim();
logger.info("cluster nodes: {}", clusterState);
}

private void printIndexSettings(String index) throws IOException, ParseException {
Request indexSettings = new Request("GET", index + "/_settings?pretty");
String idxSettings = EntityUtils.toString(client().performRequest(indexSettings).getEntity()).trim();
logger.info("idxSettings : {}", idxSettings);
}

/**
* This test verifies that segment replication does not break when primary shards are on lower OS version. It does this
* by verifying replica shards contains same number of documents as primary's.
*
* @throws Exception
*/
public void testIndexingWithPrimaryOnBwcNodes() throws Exception {
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
logger.info("cluster discovered:\n {}", nodes.toString());
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
// Exclude bwc nodes from allocation so that primaries gets allocated on current version
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.routing.allocation.include._name", bwcNames);
final String index = "test-index";
createIndex(index, settings.build());
ensureYellow(index);
printIndexSettings(index);
mch2 marked this conversation as resolved.
Show resolved Hide resolved

int docCount = 200;
try (RestClient nodeClient = buildClient(restClientSettings(),
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {

logger.info("allowing replica shards assignment on bwc nodes");
updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.include._name"));
// Add replicas so that it can be assigned on higher OS version nodes.
updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2));

printClusterRouting();
printIndexSettings(index);
ensureGreen(index);

// Index docs
indexDocs(index, 0, docCount);

// perform a refresh
assertOK(client().performRequest(new Request("POST", index + "/_flush")));

// verify replica catch up with primary
assertSeqNoOnShards(index, nodes, docCount, nodeClient);
}
}


/**
* This test creates a cluster with primary on older version but due to {@link org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider};
* replica shard allocation on lower OpenSearch version is prevented. Thus, this test though cover the use case where
* primary shard containing nodes are running on higher OS version while replicas are unassigned.
*
* @throws Exception
*/
public void testIndexingWithReplicaOnBwcNodes() throws Exception {
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
logger.info("cluster discovered:\n {}", nodes.toString());
final List<String> bwcNamesList = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.toList());
final String bwcNames = bwcNamesList.stream().collect(Collectors.joining(","));
// Exclude bwc nodes from allocation so that primaries gets allocated on current/higher version
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("index.routing.allocation.exclude._name", bwcNames);
final String index = "test-index";
createIndex(index, settings.build());
ensureYellow(index);
printClusterRouting();
printIndexSettings(index);

int docCount = 200;
try (RestClient nodeClient = buildClient(restClientSettings(),
nodes.values().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {

logger.info("allowing replica shards assignment on bwc nodes");
updateIndexSettings(index, Settings.builder().putNull("index.routing.allocation.exclude._name"));
// Add replicas so that it can be assigned on lower OS version nodes, but it doesn't work as called out in test overview
updateIndexSettings(index, Settings.builder().put("index.number_of_replicas", 2));
printClusterRouting();

// Index docs
indexDocs(index, 0, docCount);

// perform a refresh
assertOK(client().performRequest(new Request("POST", index + "/_flush")));

// verify replica catch up with primary
assertSeqNoOnShards(index, nodes, docCount, nodeClient);
}
}

public void testIndexVersionPropagation() throws Exception {
Nodes nodes = buildNodeAndVersions();
assumeFalse("new nodes is empty", nodes.getNewNodes().isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,34 @@

package org.opensearch.upgrades;

import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Booleans;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.rest.action.document.RestBulkAction;
import org.opensearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
Expand All @@ -62,6 +74,81 @@
*/
public class IndexingIT extends AbstractRollingTestCase {

private void printClusterNodes() throws IOException, ParseException, URISyntaxException {
Request clusterStateRequest = new Request("GET", "_nodes");
Response response = client().performRequest(clusterStateRequest);

ObjectPath objectPath = ObjectPath.createFromResponse(response);
Map<String, Object> nodesAsMap = objectPath.evaluate("nodes");
for (String id : nodesAsMap.keySet()) {
logger.info("--> {} {} {}",
id,
objectPath.evaluate("nodes." + id + ".name"),
Version.fromString(objectPath.evaluate("nodes." + id + ".version")));
}
response = client().performRequest(new Request("GET", "_cluster/state"));
String cm = ObjectPath.createFromResponse(response).evaluate("master_node");
logger.info("--> Cluster manager {}", cm);
}

// Verifies that for each shard copy holds same document count across all containing nodes.
private void waitForSearchableDocs(String index, int shardCount) throws Exception {
Map<Integer,String> primaryShardToNodeIDMap = new HashMap<>();
Map<Integer,String> replicaShardToNodeIDMap = new HashMap<>();
logger.info("--> _cat/shards \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));

Request request = new Request("GET", index + "/_stats");
request.addParameter("level", "shards");
Response response = client().performRequest(request);
for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) {
List<Object> shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards." + shardNumber);
for (Object shard : shardStats) {
final String nodeId = ObjectPath.evaluate(shard, "routing.node");
final Boolean primary = ObjectPath.evaluate(shard, "routing.primary");
if (primary) {
primaryShardToNodeIDMap.putIfAbsent(shardNumber, nodeId);
} else {
replicaShardToNodeIDMap.putIfAbsent(shardNumber, nodeId);
}
}
}
logger.info("--> primaryShardToNodeIDMap {}", primaryShardToNodeIDMap);
logger.info("--> replicaShardToNodeIDMap {}", replicaShardToNodeIDMap);

for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) {
logger.info("--> Verify doc count for shard number {}", shardNumber);
Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search");
searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
searchTestIndexRequest.addParameter("filter_path", "hits.total");
searchTestIndexRequest.addParameter("preference", "_shards:" + shardNumber + "|_only_nodes:" + primaryShardToNodeIDMap.get(shardNumber));
Response searchTestIndexResponse = client().performRequest(searchTestIndexRequest);
final int primaryHits = ObjectPath.createFromResponse(searchTestIndexResponse).evaluate("hits.total");
logger.info("--> primaryHits {}", primaryHits);
final int shardNum = shardNumber;
assertBusy(() -> {
Request replicaRequest = new Request("POST", "/" + index + "/_search");
replicaRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
replicaRequest.addParameter("filter_path", "hits.total");
replicaRequest.addParameter("preference", "_shards:" + shardNum + "|_only_nodes:" + replicaShardToNodeIDMap.get(shardNum));
Response replicaResponse = client().performRequest(replicaRequest);
int replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits.total");
logger.info("--> ReplicaHits {}", replicaHits);
assertEquals(primaryHits, replicaHits);
}, 1, TimeUnit.MINUTES);
}
}

private void waitForClusterHealthWithNoShardMigration(String indexName, String status) throws IOException {
Request waitForStatus = new Request("GET", "/_cluster/health/" + indexName);
waitForStatus.addParameter("wait_for_status", status);
// wait for long enough that we give delayed unassigned shards to stop being delayed
waitForStatus.addParameter("timeout", "70s");
waitForStatus.addParameter("level", "shards");
waitForStatus.addParameter("wait_for_no_initializing_shards", "true");
waitForStatus.addParameter("wait_for_no_relocating_shards", "true");
client().performRequest(waitForStatus);
}

public void testIndexing() throws IOException, ParseException {
switch (CLUSTER_TYPE) {
case OLD:
Expand Down Expand Up @@ -148,6 +235,83 @@ public void testIndexing() throws IOException, ParseException {
}
}


/**
* This test verifies that during rolling upgrades the segment replication does not break when replica shards can
* be running on older codec versions.
*
* @throws Exception
*/
public void testIndexingWithSegRep() throws Exception {
final String indexName = "test-index-segrep";
final int shardCount = 3;
final int replicaCount = 1;
logger.info("--> Case {}", CLUSTER_TYPE);
printClusterNodes();
logger.info("--> _cat/shards before test execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));
switch (CLUSTER_TYPE) {
case OLD:
Settings.Builder settings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), shardCount)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), replicaCount)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms");
createIndex(indexName, settings.build());
waitForClusterHealthWithNoShardMigration(indexName, "green");
bulk(indexName, "_OLD", 5);
break;
case MIXED:
waitForClusterHealthWithNoShardMigration(indexName, "yellow");
break;
case UPGRADED:
waitForClusterHealthWithNoShardMigration(indexName, "green");
break;
default:
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
}

int expectedCount;
switch (CLUSTER_TYPE) {
case OLD:
expectedCount = 5;
break;
case MIXED:
if (Booleans.parseBoolean(System.getProperty("tests.first_round"))) {
expectedCount = 5;
} else {
expectedCount = 10;
}
break;
case UPGRADED:
expectedCount = 15;
break;
default:
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
}

waitForSearchableDocs(indexName, shardCount);
assertCount(indexName, expectedCount);

if (CLUSTER_TYPE != ClusterType.OLD) {
logger.info("--> Index one doc (to be deleted next) and verify doc count");
bulk(indexName, "_" + CLUSTER_TYPE, 5);
Request toBeDeleted = new Request("PUT", "/" + indexName + "/_doc/to_be_deleted");
toBeDeleted.addParameter("refresh", "true");
toBeDeleted.setJsonEntity("{\"f1\": \"delete-me\"}");
client().performRequest(toBeDeleted);
waitForSearchableDocs(indexName, shardCount);
assertCount(indexName, expectedCount + 6);

logger.info("--> Delete previously added doc and verify doc count");
Request delete = new Request("DELETE", "/" + indexName + "/_doc/to_be_deleted");
delete.addParameter("refresh", "true");
client().performRequest(delete);
waitForSearchableDocs(indexName, shardCount);
assertCount(indexName, expectedCount + 5);
}
logger.info("--> _cat/shards post execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));
}

public void testAutoIdWithOpTypeCreate() throws IOException {
final String indexName = "auto_id_and_op_type_create_index";
StringBuilder b = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,20 @@ public static void ensureGreen(String index) throws IOException {
});
}

/**
* Checks that the specific index is yellow.
*
* @param index index to test for
**/
public static void ensureYellow(String index) throws IOException {
ensureHealth(index, (request) -> {
request.addParameter("wait_for_status", "yellow");
request.addParameter("wait_for_no_relocating_shards", "true");
request.addParameter("timeout", "70s");
request.addParameter("level", "shards");
});
}

protected static void ensureHealth(Consumer<Request> requestConsumer) throws IOException {
ensureHealth("", requestConsumer);
}
Expand Down