diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 0ffa5ab23e0b6..d226d0d757638 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -62,22 +62,36 @@ private Settings defaultIndexSettings() { .build(); } - protected Settings remoteStoreIndexSettings(int numberOfReplicas) { + protected Settings remoteStoreIndexSettings(int numberOfReplicas, int numberOfShards) { return Settings.builder() .put(defaultIndexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) .build(); } - protected Settings remoteTranslogIndexSettings(int numberOfReplicas) { + protected Settings remoteStoreIndexSettings(int numberOfReplicas) { + return remoteStoreIndexSettings(numberOfReplicas, 1); + } + + protected Settings remoteTranslogIndexSettings(int numberOfReplicas, int numberOfShards) { return Settings.builder() - .put(remoteStoreIndexSettings(numberOfReplicas)) + .put(remoteStoreIndexSettings(numberOfReplicas, numberOfShards)) .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME) .build(); } + protected Settings remoteTranslogIndexSettings(int numberOfReplicas) { + return remoteTranslogIndexSettings(numberOfReplicas, 1); + } + + protected void putRepository(Path path) { + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", path)) + ); + } + @Before public void setup() { internalCluster().startClusterManagerOnlyNode(); diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java index 3c5853f9a64e9..0ea87d106c14e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsIT.java @@ -9,18 +9,21 @@ package org.opensearch.remotestore; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats; +import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsRequestBuilder; import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.UUIDs; import org.opensearch.index.remote.RemoteRefreshSegmentTracker; +import org.opensearch.test.OpenSearchIntegTestCase; import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.stream.Collectors; +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 3) public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase { private static final String INDEX_NAME = "remote-store-test-idx-1"; @@ -29,7 +32,6 @@ public void testStatsResponseFromAllNodes() { // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes // during this time frame. This ensures that the segment upload has started. - internalCluster().startDataOnlyNodes(3); if (randomBoolean()) { createIndex(INDEX_NAME, remoteTranslogIndexSettings(0)); } else { @@ -38,18 +40,7 @@ public void testStatsResponseFromAllNodes() { ensureYellowAndNoInitializingShards(INDEX_NAME); ensureGreen(INDEX_NAME); - // Indexing documents along with refreshes and flushes. - for (int i = 0; i < randomIntBetween(5, 10); i++) { - if (randomBoolean()) { - flush(INDEX_NAME); - } else { - refresh(INDEX_NAME); - } - int numberOfOperations = randomIntBetween(20, 50); - for (int j = 0; j < numberOfOperations; j++) { - indexSingleDoc(); - } - } + indexDocs(); // Step 2 - We find all the nodes that are present in the cluster. We make the remote store stats api call from // each of the node in the cluster and check that the response is coming as expected. @@ -66,23 +57,93 @@ public void testStatsResponseFromAllNodes() { .collect(Collectors.toList()); assertEquals(1, matches.size()); RemoteRefreshSegmentTracker.Stats stats = matches.get(0).getStats(); - assertEquals(0, stats.refreshTimeLagMs); - assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber); - assertTrue(stats.uploadBytesStarted > 0); - assertEquals(0, stats.uploadBytesFailed); - assertTrue(stats.uploadBytesSucceeded > 0); - assertTrue(stats.totalUploadsStarted > 0); - assertEquals(0, stats.totalUploadsFailed); - assertTrue(stats.totalUploadsSucceeded > 0); - assertEquals(0, stats.rejectionCount); - assertEquals(0, stats.consecutiveFailuresCount); - assertEquals(0, stats.bytesLag); - assertTrue(stats.uploadBytesMovingAverage > 0); - assertTrue(stats.uploadBytesPerSecMovingAverage > 0); - assertTrue(stats.uploadTimeMovingAverage > 0); + assertResponseStats(stats); + } + } + + public void testStatsResponseAllShards() { + + // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes + // during this time frame. This ensures that the segment upload has started. + createIndex(INDEX_NAME, remoteTranslogIndexSettings(0, 3)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + indexDocs(); + + // Step 2 - We find all the nodes that are present in the cluster. We make the remote store stats api call from + // each of the node in the cluster and check that the response is coming as expected. + ClusterState state = getClusterState(); + String node = state.nodes().getDataNodes().values().stream().map(DiscoveryNode::getName).findFirst().get(); + RemoteStoreStatsRequestBuilder remoteStoreStatsRequestBuilder = client(node).admin() + .cluster() + .prepareRemoteStoreStats(INDEX_NAME, null); + RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get(); + assertTrue(response.getSuccessfulShards() == 3); + assertTrue(response.getShards() != null && response.getShards().length == 3); + RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats(); + assertResponseStats(stats); + } + + public void testStatsResponseFromLocalNode() { + + // Step 1 - We create cluster, create an index, and then index documents into. We also do multiple refreshes/flushes + // during this time frame. This ensures that the segment upload has started. + createIndex(INDEX_NAME, remoteTranslogIndexSettings(0, 3)); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + indexDocs(); + + // Step 2 - We find a data node in the cluster. We make the remote store stats api call from + // each of the data node in the cluster and check that only local shards are returned. + ClusterState state = getClusterState(); + List nodes = state.nodes().getDataNodes().values().stream().map(DiscoveryNode::getName).collect(Collectors.toList()); + for (String node : nodes) { + RemoteStoreStatsRequestBuilder remoteStoreStatsRequestBuilder = client(node).admin() + .cluster() + .prepareRemoteStoreStats(INDEX_NAME, null); + remoteStoreStatsRequestBuilder.setLocal(true); + RemoteStoreStatsResponse response = remoteStoreStatsRequestBuilder.get(); + assertTrue(response.getSuccessfulShards() == 1); + assertTrue(response.getShards() != null && response.getShards().length == 1); + RemoteRefreshSegmentTracker.Stats stats = response.getShards()[0].getStats(); + assertResponseStats(stats); + } + } + + private void indexDocs() { + // Indexing documents along with refreshes and flushes. + for (int i = 0; i < randomIntBetween(5, 10); i++) { + if (randomBoolean()) { + flush(INDEX_NAME); + } else { + refresh(INDEX_NAME); + } + int numberOfOperations = randomIntBetween(20, 50); + for (int j = 0; j < numberOfOperations; j++) { + indexSingleDoc(); + } } } + private void assertResponseStats(RemoteRefreshSegmentTracker.Stats stats) { + assertEquals(0, stats.refreshTimeLagMs); + assertEquals(stats.localRefreshNumber, stats.remoteRefreshNumber); + assertTrue(stats.uploadBytesStarted > 0); + assertEquals(0, stats.uploadBytesFailed); + assertTrue(stats.uploadBytesSucceeded > 0); + assertTrue(stats.totalUploadsStarted > 0); + assertEquals(0, stats.totalUploadsFailed); + assertTrue(stats.totalUploadsSucceeded > 0); + assertEquals(0, stats.rejectionCount); + assertEquals(0, stats.consecutiveFailuresCount); + assertEquals(0, stats.bytesLag); + assertTrue(stats.uploadBytesMovingAverage > 0); + assertTrue(stats.uploadBytesPerSecMovingAverage > 0); + assertTrue(stats.uploadTimeMovingAverage > 0); + } + private IndexResponse indexSingleDoc() { return client().prepareIndex(INDEX_NAME) .setId(UUIDs.randomBase64UUID())