Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Update bwc test to rely on segments for verification #8267

Merged
merged 2 commits into from
Jun 28, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,12 @@
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
import static org.opensearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;

/**
* Basic test that indexed documents survive the rolling restart. See
Expand Down Expand Up @@ -88,52 +84,51 @@ private void printClusterNodes() throws IOException, ParseException, URISyntaxEx
}

// Verifies that for each shard copy holds same document count across all containing nodes.
private void waitForSearchableDocs(String index, int shardCount) throws Exception {
Map<Integer,String> primaryShardToNodeIDMap = new HashMap<>();
Map<Integer,String> replicaShardToNodeIDMap = new HashMap<>();
private void waitForSearchableDocs(String index, int shardCount, int replicaCount) throws Exception {
assertTrue(shardCount > 0);
assertTrue(replicaCount > 0);
waitForClusterHealthWithNoShardMigration(index, "green");
logger.info("--> _cat/shards before search \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));

// Verify segment replication stats
verifySegmentStats(index);
Request request = new Request("GET", index + "/_stats");
request.addParameter("level", "shards");
Response response = client().performRequest(request);
for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) {
List<Object> shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards." + shardNumber);
for (Object shard : shardStats) {
final String nodeId = ObjectPath.evaluate(shard, "routing.node");
final Boolean primary = ObjectPath.evaluate(shard, "routing.primary");
if (primary) {
primaryShardToNodeIDMap.putIfAbsent(shardNumber, nodeId);
} else {
replicaShardToNodeIDMap.putIfAbsent(shardNumber, nodeId);

// Verify segment store
assertBusy(() -> {
/**
* Use default tabular output and sort response based on shard,segment,primaryOrReplica columns to allow line by
* line parsing where records related to a segment (e.g. _0) are chunked together with first record belonging
* to primary while remaining *replicaCount* records belongs to replica copies
* */
Request segrepStatsRequest = new Request("GET", "/_cat/segments/" + index + "?s=shard,segment,primaryOrReplica");
segrepStatsRequest.addParameter("h", "index,shard,primaryOrReplica,segment,docs.count");
Response segrepStatsResponse = client().performRequest(segrepStatsRequest);
logger.info("--> _cat/segments response\n {}", EntityUtils.toString(segrepStatsResponse.getEntity()));
List<String> responseList = Streams.readAllLines(segrepStatsResponse.getEntity().getContent());
for (int segmentsIndex=0; segmentsIndex < responseList.size();) {
String[] primaryRow = responseList.get(segmentsIndex++).split(" +");
kotwanikunal marked this conversation as resolved.
Show resolved Hide resolved
String shardId = primaryRow[0] + primaryRow[1];
assertTrue(primaryRow[2].equals("p"));
kotwanikunal marked this conversation as resolved.
Show resolved Hide resolved
for(int replicaIndex = 1; replicaIndex <= replicaCount; replicaIndex++) {
String[] replicaRow = responseList.get(segmentsIndex).split(" +");
String replicaShardId = replicaRow[0] + replicaRow[1];
// When segment has 0 doc count, not all replica copies posses that segment. Skip to next segment
if (replicaRow[2].equals("p")) {
assertTrue(primaryRow[4].equals("0"));
break;
}
// verify same shard id
assertTrue(replicaShardId.equals(shardId));
// verify replica row
assertTrue(replicaRow[2].equals("r"));
// Verify segment name matches e.g. _0
assertTrue(replicaRow[3].equals(primaryRow[3]));
// Verify doc count matches
assertTrue(replicaRow[4].equals(primaryRow[4]));
segmentsIndex++;
}
}
}
logger.info("--> primaryShardToNodeIDMap {}", primaryShardToNodeIDMap);
logger.info("--> replicaShardToNodeIDMap {}", replicaShardToNodeIDMap);

for (int shardNumber = 0; shardNumber < shardCount; shardNumber++) {
Request searchTestIndexRequest = new Request("POST", "/" + index + "/_search");
searchTestIndexRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
searchTestIndexRequest.addParameter("filter_path", "hits.total");
searchTestIndexRequest.addParameter("preference", "_shards:" + shardNumber + "|_only_nodes:" + primaryShardToNodeIDMap.get(shardNumber));
Response searchTestIndexResponse = client().performRequest(searchTestIndexRequest);
final int primaryHits = ObjectPath.createFromResponse(searchTestIndexResponse).evaluate("hits.total");
final int shardNum = shardNumber;
// Verify replica shard doc count only when available.
if (replicaShardToNodeIDMap.get(shardNum) != null) {
assertBusy(() -> {
Request replicaRequest = new Request("POST", "/" + index + "/_search");
replicaRequest.addParameter(TOTAL_HITS_AS_INT_PARAM, "true");
replicaRequest.addParameter("filter_path", "hits.total");
replicaRequest.addParameter("preference", "_shards:" + shardNum + "|_only_nodes:" + replicaShardToNodeIDMap.get(shardNum));
Response replicaResponse = client().performRequest(replicaRequest);
int replicaHits = ObjectPath.createFromResponse(replicaResponse).evaluate("hits.total");
assertEquals("Doc count mismatch for shard " + shardNum + ". Primary hits " + primaryHits + " Replica hits " + replicaHits, primaryHits, replicaHits);
}, 1, TimeUnit.MINUTES);
}
}
}, 1, TimeUnit.MINUTES);
}

private void waitForClusterHealthWithNoShardMigration(String indexName, String status) throws IOException {
Expand All @@ -156,7 +151,7 @@ private void verifySegmentStats(String indexName) throws Exception {
String[] elements = statLine.split(" +");
assertEquals("Replica shard " + elements[0] + "not upto date with primary ", 0, Integer.parseInt(elements[2]));
}
});
}, 1, TimeUnit.MINUTES);
}

public void testIndexing() throws IOException, ParseException {
Expand Down Expand Up @@ -307,7 +302,7 @@ public void testIndexingWithSegRep() throws Exception {
throw new UnsupportedOperationException("Unknown cluster type [" + CLUSTER_TYPE + "]");
}

waitForSearchableDocs(indexName, shardCount);
waitForSearchableDocs(indexName, shardCount, replicaCount);
assertCount(indexName, expectedCount);

if (CLUSTER_TYPE != ClusterType.OLD) {
Expand All @@ -318,17 +313,16 @@ public void testIndexingWithSegRep() throws Exception {
toBeDeleted.addParameter("refresh", "true");
toBeDeleted.setJsonEntity("{\"f1\": \"delete-me\"}");
client().performRequest(toBeDeleted);
waitForSearchableDocs(indexName, shardCount);
waitForSearchableDocs(indexName, shardCount, replicaCount);
assertCount(indexName, expectedCount + 6);

logger.info("--> Delete previously added doc and verify doc count");
Request delete = new Request("DELETE", "/" + indexName + "/_doc/to_be_deleted");
delete.addParameter("refresh", "true");
client().performRequest(delete);
waitForSearchableDocs(indexName, shardCount);
waitForSearchableDocs(indexName, shardCount, replicaCount);
assertCount(indexName, expectedCount + 5);
}
logger.info("--> _cat/shards post execution \n{}", EntityUtils.toString(client().performRequest(new Request("GET", "/_cat/shards?v")).getEntity()));
}

public void testAutoIdWithOpTypeCreate() throws IOException {
Expand Down