Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add integ tests for remote store stats api #8135

Merged
merged 1 commit into from
Jun 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,36 @@ private Settings defaultIndexSettings() {
.build();
}

protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
protected Settings remoteStoreIndexSettings(int numberOfReplicas, int numberOfShards) {
return Settings.builder()
.put(defaultIndexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
.build();
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas) {
protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return remoteStoreIndexSettings(numberOfReplicas, 1);
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas, int numberOfShards) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas))
.put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards))
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
}

protected Settings remoteTranslogIndexSettings(int numberOfReplicas) {
return remoteTranslogIndexSettings(numberOfReplicas, 1);
}

protected void putRepository(Path path) {
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", path))
);
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequestBuilder;
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.UUIDs;
import org.opensearch.index.remote.RemoteRefreshSegmentTracker;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3)
public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";
Expand All @@ -29,7 +32,6 @@ public void testStatsResponseFromAllNodes() {

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
internalCluster().startDataOnlyNodes(3);
if (randomBoolean()) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
} else {
Expand All @@ -38,18 +40,7 @@ public void testStatsResponseFromAllNodes() {
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

// Indexing documents along with refreshes and flushes.
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
}
}
indexDocs();

// Step 2 - We find all the nodes that are present in the cluster. We make the remote store stats api call from
// each of the node in the cluster and check that the response is coming as expected.
Expand All @@ -66,23 +57,93 @@ public void testStatsResponseFromAllNodes() {
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats();
assertEquals(0, stats.refreshTimeLagMs);
assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber);
assertTrue(stats.uploadBytesStarted > 0);
assertEquals(0, stats.uploadBytesFailed);
assertTrue(stats.uploadBytesSucceeded > 0);
assertTrue(stats.totalUploadsStarted > 0);
assertEquals(0, stats.totalUploadsFailed);
assertTrue(stats.totalUploadsSucceeded > 0);
assertEquals(0, stats.rejectionCount);
assertEquals(0, stats.consecutiveFailuresCount);
assertEquals(0, stats.bytesLag);
assertTrue(stats.uploadBytesMovingAverage > 0);
assertTrue(stats.uploadBytesPerSecMovingAverage > 0);
assertTrue(stats.uploadTimeMovingAverage > 0);
assertResponseStats(stats);
}
}

public void testStatsResponseAllShards() {

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0, 3));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

indexDocs();

// Step 2 - We find all the nodes that are present in the cluster. We make the remote store stats api call from
// each of the node in the cluster and check that the response is coming as expected.
ClusterState state = getClusterState();
String node = state.nodes().getDataNodes().values().stream().map(DiscoveryNode::getName).findFirst().get();
RemoteStoreStatsRequestBuilder remoteStoreStatsRequestBuilder = client(node).admin()
.cluster()
.prepareRemoteStoreStats(INDEX_NAME, null);
RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get();
assertTrue(response.getSuccessfulShards() == 3);
assertTrue(response.getShards() != null && response.getShards().length == 3);
RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats();
assertResponseStats(stats);
}

public void testStatsResponseFromLocalNode() {

// Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0, 3));
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

indexDocs();

// Step 2 - We find a data node in the cluster. We make the remote store stats api call from
// each of the data node in the cluster and check that only local shards are returned.
ClusterState state = getClusterState();
List<String> nodes = state.nodes().getDataNodes().values().stream().map(DiscoveryNode::getName).collect(Collectors.toList());
for (String node : nodes) {
RemoteStoreStatsRequestBuilder remoteStoreStatsRequestBuilder = client(node).admin()
.cluster()
.prepareRemoteStoreStats(INDEX_NAME, null);
remoteStoreStatsRequestBuilder.setLocal(true);
RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get();
assertTrue(response.getSuccessfulShards() == 1);
assertTrue(response.getShards() != null && response.getShards().length == 1);
RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats();
assertResponseStats(stats);
}
}

private void indexDocs() {
// Indexing documents along with refreshes and flushes.
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
flush(INDEX_NAME);
} else {
refresh(INDEX_NAME);
}
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
}
}
}

private void assertResponseStats(RemoteRefreshSegmentTracker.Stats stats) {
assertEquals(0, stats.refreshTimeLagMs);
assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber);
assertTrue(stats.uploadBytesStarted > 0);
assertEquals(0, stats.uploadBytesFailed);
assertTrue(stats.uploadBytesSucceeded > 0);
assertTrue(stats.totalUploadsStarted > 0);
assertEquals(0, stats.totalUploadsFailed);
assertTrue(stats.totalUploadsSucceeded > 0);
assertEquals(0, stats.rejectionCount);
assertEquals(0, stats.consecutiveFailuresCount);
assertEquals(0, stats.bytesLag);
assertTrue(stats.uploadBytesMovingAverage > 0);
assertTrue(stats.uploadBytesPerSecMovingAverage > 0);
assertTrue(stats.uploadTimeMovingAverage > 0);
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
Expand Down