Skip to content

Commit

Permalink
Extend existing IndexRecoveryIT for remote indexes
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Jul 6, 2023
1 parent 8bebf5b commit 444731c
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.index.IndexCommit;
import org.hamcrest.Matcher;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
Expand Down Expand Up @@ -101,8 +102,8 @@
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.NodeIndicesStats;
import org.opensearch.indices.analysis.AnalysisModule;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.recovery.RecoveryState.Stage;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.node.NodeClosedException;
import org.opensearch.node.RecoverySettingsChunkSizePlugin;
import org.opensearch.plugins.AnalysisPlugin;
Expand Down Expand Up @@ -231,7 +232,7 @@ private void assertRecoveryState(
int shardId,
RecoverySource type,
boolean primary,
Stage stage,
RecoveryState.Stage stage,
String sourceNode,
String targetNode
) {
Expand Down Expand Up @@ -287,6 +288,7 @@ private void restoreRecoverySpeed() {
public void testGatewayRecovery() throws Exception {
logger.info("--> start nodes");
String node = internalCluster().startNode();
afterFirstStartNode();

createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);

Expand All @@ -309,9 +311,14 @@ public void testGatewayRecovery() throws Exception {
validateIndexRecoveryState(recoveryState.getIndex());
}

/**
 * Hook that the tests in this class call right after starting their initial node(s).
 * The base implementation does nothing; the method is {@code protected} so that a
 * subclass can override it to perform additional setup at that point.
 */
protected void afterFirstStartNode() {
// Intentionally empty: the base suite performs no extra setup here.
}

public void testGatewayRecoveryTestActiveOnly() throws Exception {
logger.info("--> start nodes");
internalCluster().startNode();
afterFirstStartNode();

createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT);

Expand All @@ -328,6 +335,7 @@ public void testGatewayRecoveryTestActiveOnly() throws Exception {

public void testReplicaRecovery() throws Exception {
final String nodeA = internalCluster().startNode();
afterFirstStartNode();
createIndex(
INDEX_NAME,
Settings.builder()
Expand Down Expand Up @@ -399,6 +407,7 @@ public void testReplicaRecovery() throws Exception {
public void testCancelNewShardRecoveryAndUsesExistingShardCopy() throws Exception {
logger.info("--> start node A");
final String nodeA = internalCluster().startNode();
afterFirstStartNode();

logger.info("--> create index on node: {}", nodeA);
createIndex(
Expand Down Expand Up @@ -497,6 +506,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
public void testRerouteRecovery() throws Exception {
logger.info("--> start node A");
final String nodeA = internalCluster().startNode();
afterFirstStartNode();

logger.info("--> create index on node: {}", nodeA);
ByteSizeValue shardSize = createAndPopulateIndex(INDEX_NAME, 1, SHARD_COUNT, REPLICA_COUNT).getShards()[0].getStats()
Expand Down Expand Up @@ -584,14 +594,14 @@ public void testRerouteRecovery() throws Exception {
assertThat(
"node A throttling should increase",
recoveryStats.throttleTime().millis(),
greaterThan(finalNodeAThrottling)
getMatcherForThrottling(finalNodeAThrottling)
);
}
if (nodeStats.getNode().getName().equals(nodeB)) {
assertThat(
"node B throttling should increase",
recoveryStats.throttleTime().millis(),
greaterThan(finalNodeBThrottling)
getMatcherForThrottling(finalNodeBThrottling)
);
}
}
Expand Down Expand Up @@ -623,7 +633,7 @@ public void testRerouteRecovery() throws Exception {
final RecoveryStats recoveryStats = nodeStats.getIndices().getRecoveryStats();
assertThat(recoveryStats.currentAsSource(), equalTo(0));
assertThat(recoveryStats.currentAsTarget(), equalTo(0));
assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), greaterThan(0L));
assertThat(nodeName + " throttling should be >0", recoveryStats.throttleTime().millis(), getMatcherForThrottling(0));
};
// we have to use assertBusy as recovery counters are decremented only when the last reference to the RecoveryTarget
// is decremented, which may happen after the recovery was done.
Expand Down Expand Up @@ -722,9 +732,14 @@ public void testRerouteRecovery() throws Exception {
validateIndexRecoveryState(nodeCRecoveryStates.get(0).getIndex());
}

/**
 * Builds the matcher that the throttling assertions in this class apply to the
 * recovery throttle time (in milliseconds).
 *
 * @param value the reference throttle time to compare against
 * @return a matcher that accepts only values strictly greater than {@code value}
 */
protected Matcher<Long> getMatcherForThrottling(long value) {
    final Matcher<Long> strictlyGreater = greaterThan(value);
    return strictlyGreater;
}

public void testSnapshotRecovery() throws Exception {
logger.info("--> start node A");
String nodeA = internalCluster().startNode();
afterFirstStartNode();

logger.info("--> create repository");
assertAcked(
Expand Down Expand Up @@ -824,7 +839,7 @@ private IndicesStatsResponse createAndPopulateIndex(String name, int nodeCount,
ensureGreen();

logger.info("--> indexing sample data");
final int numDocs = between(MIN_DOC_COUNT, MAX_DOC_COUNT);
final int numDocs = numDocs();
final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs];

for (int i = 0; i < numDocs; i++) {
Expand All @@ -846,6 +861,10 @@ private void validateIndexRecoveryState(ReplicationLuceneIndex indexState) {
assertThat(indexState.recoveredBytesPercent(), lessThanOrEqualTo(100.0f));
}

/**
 * Picks how many sample documents {@code createAndPopulateIndex} indexes.
 *
 * @return a value obtained from {@code between(MIN_DOC_COUNT, MAX_DOC_COUNT)}
 */
protected int numDocs() {
    final int docCount = between(MIN_DOC_COUNT, MAX_DOC_COUNT);
    return docCount;
}

public void testTransientErrorsDuringRecoveryAreRetried() throws Exception {
final String indexName = "test";
final Settings nodeSettings = Settings.builder()
Expand All @@ -855,6 +874,7 @@ public void testTransientErrorsDuringRecoveryAreRetried() throws Exception {
.build();
// start a cluster-manager node
internalCluster().startNode(nodeSettings);
afterFirstStartNode();

final String blueNodeName = internalCluster().startNode(
Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()
Expand Down Expand Up @@ -1057,6 +1077,7 @@ public void testDisconnectsWhileRecovering() throws Exception {
.build();
// start a cluster-manager node
internalCluster().startNode(nodeSettings);
afterFirstStartNode();

final String blueNodeName = internalCluster().startNode(
Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()
Expand Down Expand Up @@ -1214,6 +1235,7 @@ public void testDisconnectsDuringRecovery() throws Exception {
TimeValue disconnectAfterDelay = TimeValue.timeValueMillis(randomIntBetween(0, 100));
// start a cluster-manager node
String clusterManagerNodeName = internalCluster().startClusterManagerOnlyNode(nodeSettings);
afterFirstStartNode();

final String blueNodeName = internalCluster().startNode(
Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()
Expand Down Expand Up @@ -1359,6 +1381,7 @@ public void sendRequest(

public void testHistoryRetention() throws Exception {
internalCluster().startNodes(3);
afterFirstStartNode();

final String indexName = "test";
client().admin()
Expand Down Expand Up @@ -1425,6 +1448,7 @@ public void testHistoryRetention() throws Exception {

public void testDoNotInfinitelyWaitForMapping() {
internalCluster().ensureAtLeastNumDataNodes(3);
afterFirstStartNode();
createIndex(
"test",
Settings.builder()
Expand Down Expand Up @@ -1471,6 +1495,7 @@ public void testDoNotInfinitelyWaitForMapping() {
public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception {
String indexName = "test";
internalCluster().startNodes(2);
afterFirstStartNode();
String nodeWithPrimary = internalCluster().startDataOnlyNode();
assertAcked(
client().admin()
Expand Down Expand Up @@ -1535,6 +1560,7 @@ public void testOngoingRecoveryAndClusterManagerFailOver() throws Exception {

public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
afterFirstStartNode();
List<String> nodes = randomSubsetOf(
2,
StreamSupport.stream(Spliterators.spliterator(clusterService().state().nodes().getDataNodes().values(), 0), false)
Expand Down Expand Up @@ -1642,6 +1668,7 @@ public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {

public void testUsesFileBasedRecoveryIfRetentionLeaseMissing() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
afterFirstStartNode();

String indexName = "test-index";
createIndex(
Expand Down Expand Up @@ -1713,6 +1740,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {

public void testUsesFileBasedRecoveryIfRetentionLeaseAheadOfGlobalCheckpoint() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
afterFirstStartNode();

String indexName = "test-index";
createIndex(
Expand Down Expand Up @@ -1798,6 +1826,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {

public void testUsesFileBasedRecoveryIfOperationsBasedRecoveryWouldBeUnreasonable() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
afterFirstStartNode();

String indexName = "test-index";
final Settings.Builder settings = Settings.builder()
Expand Down Expand Up @@ -1943,6 +1972,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {

public void testDoesNotCopyOperationsInSafeCommit() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
afterFirstStartNode();

String indexName = "test-index";
createIndex(
Expand Down Expand Up @@ -2027,6 +2057,7 @@ public TokenStream create(TokenStream tokenStream) {

public void testRepeatedRecovery() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
afterFirstStartNode();

// Ensures that you can remove a replica and then add it back again without any ill effects, even if it's allocated back to the
// node that held it previously, in case that node hasn't completely cleared it up.
Expand Down Expand Up @@ -2091,6 +2122,7 @@ public void testRepeatedRecovery() throws Exception {
public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception {
internalCluster().startClusterManagerOnlyNode(Settings.EMPTY);
final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
afterFirstStartNode();
final Settings randomNodeDataPathSettings = internalCluster().dataPathSettings(randomFrom(dataNodes));
final String indexName = "test";
assertAcked(
Expand Down Expand Up @@ -2131,6 +2163,7 @@ public void testAllocateEmptyPrimaryResetsGlobalCheckpoint() throws Exception {

public void testPeerRecoveryTrimsLocalTranslog() throws Exception {
internalCluster().startNode();
afterFirstStartNode();
List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
String indexName = "test-index";
createIndex(
Expand Down Expand Up @@ -2190,6 +2223,7 @@ public void testPeerRecoveryTrimsLocalTranslog() throws Exception {

public void testCancelRecoveryWithAutoExpandReplicas() throws Exception {
internalCluster().startClusterManagerOnlyNode();
afterFirstStartNode();
assertAcked(
client().admin()
.indices()
Expand All @@ -2210,6 +2244,7 @@ public void testCancelRecoveryWithAutoExpandReplicas() throws Exception {

public void testReservesBytesDuringPeerRecoveryPhaseOne() throws Exception {
internalCluster().startNode();
afterFirstStartNode();
List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
String indexName = "test-index";
createIndex(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.indices.recovery.IndexRecoveryIT;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteIndexRecoveryIT extends IndexRecoveryIT {

protected static final String REPOSITORY_NAME = "test-remore-store-repo";

protected Path absolutePath;

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build();
}

@Override
protected void afterFirstStartNode() {
absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "300s")
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

@Override
protected Matcher<Long> getMatcherForThrottling(long value) {
return Matchers.greaterThanOrEqualTo(value);
}

@Override
protected int numDocs() {
return randomIntBetween(100, 200);
}
}
11 changes: 6 additions & 5 deletions server/src/main/java/org/opensearch/index/engine/NoOpEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,12 @@ public void trimUnreferencedTranslogFiles() throws TranslogException {
translog.trimUnreferencedReaders();
// refresh the translog stats
translogStats = translog.stats();
assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+ " current gen "
+ translog.currentFileGeneration()
+ " != min gen "
+ translog.getMinFileGeneration();
assert engineConfig.getIndexSettings().isRemoteTranslogStoreEnabled()
|| translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+ " current gen "
+ translog.currentFileGeneration()
+ " != min gen "
+ translog.getMinFileGeneration();
}
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1524,11 +1524,13 @@ public void assertSeqNos() throws Exception {
}
assertThat(replicaShardRouting + " seq_no_stats mismatch", seqNoStats, equalTo(primarySeqNoStats));
// the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard
assertThat(
replicaShardRouting + " global checkpoint syncs mismatch",
seqNoStats.getGlobalCheckpoint(),
equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId()))
);
if (primaryShard.isRemoteTranslogEnabled() == false) {
assertThat(
replicaShardRouting + " global checkpoint syncs mismatch",
seqNoStats.getGlobalCheckpoint(),
equalTo(syncGlobalCheckpoints.get(replicaShardRouting.allocationId().getId()))
);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2481,5 +2481,4 @@ protected String replicaNodeName(String indexName) {
/**
 * Fetches the cluster state through a client obtained for the node whose name
 * {@code internalCluster().getClusterManagerName()} returns.
 *
 * @return the state carried by the cluster-state response
 */
protected ClusterState getClusterState() {
    final String clusterManagerNode = internalCluster().getClusterManagerName();
    return client(clusterManagerNode).admin().cluster().prepareState().get().getState();
}

}

0 comments on commit 444731c

Please sign in to comment.