Skip to content

Commit

Permalink
Tests corrupt index bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
itiyamas committed May 30, 2021
1 parent 8915c6d commit 1267b43
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.SnapshotState;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.CorruptionUtils;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.InternalSettingsPlugin;
Expand Down Expand Up @@ -148,8 +149,85 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
}

/**
 * Tests that we cannot recover from a corruption on the primary that is actively indexing,
 * even if replica copies exist: the corrupted shard is expected to end up RED.
 */
public void testCorruptFileDuringIndexing() throws Exception {
    // A single flushed doc is enough to produce segment files that can be corrupted;
    // the BackgroundIndexer below supplies the concurrent indexing load.
    final int numDocs = 1;
    // have enough space for 3 copies
    internalCluster().ensureAtLeastNumDataNodes(3);
    assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(3));

    assertAcked(prepareCreate("test").setSettings(Settings.builder()
        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
        .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
        // no checkindex - we corrupt shards on purpose
        .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
        // no translog based flush - it might change the .liv / segments.N files
        .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB))
    ));
    ensureGreen();
    // keep shards where they are while we corrupt the primary on disk
    disableAllocation("test");
    IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
    for (int i = 0; i < builders.length; i++) {
        builders[i] = client().prepareIndex("test", "type").setSource("field", "value");
    }
    indexRandom(true, builders);
    ensureGreen();
    // double flush to create safe commit in case of async durability
    assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).get());
    assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).get());
    // we have to flush at least once here since we don't corrupt the translog
    SearchResponse countResponse = client().prepareSearch().setSize(0).get();
    assertHitCount(countResponse, numDocs);

    ShardRouting corruptedShardRouting = corruptRandomPrimaryFile();
    logger.info("--> {} corrupted", corruptedShardRouting);

    logger.info("--> creating repository");
    assertAcked(client().admin().cluster().preparePutRepository("test-repo")
        .setType("fs").setSettings(Settings.builder()
            .put("location", randomRepoPath().toAbsolutePath())
            .put("compress", randomBoolean())
            .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

    enableAllocation("test");

    // Index concurrently while snapshotting so the corrupted primary is under active write load.
    try (BackgroundIndexer indexer = new BackgroundIndexer("test", "_doc", client(), 200)) {
        waitForDocs(20, indexer);
        logger.info("--> snapshot");
        final CreateSnapshotResponse createSnapshotResponse =
            client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
                .setWaitForCompletion(true)
                .setIndices("test")
                .get();
        final SnapshotState snapshotState = createSnapshotResponse.getSnapshotInfo().state();
        logger.info("--> snapshot terminated with state " + snapshotState);
        // NOTE(review): removed the Thread.sleep(60000) debug artifact that was here;
        // the 5m health-check timeout below already gives the cluster time to react
        // to the corruption. Confirm the RED state still reproduces without it.
    }

    logger.info("--> Checking RED status");
    ClusterHealthResponse health = client().admin().cluster()
        .health(Requests.clusterHealthRequest("test")
            .timeout("5m") // sometimes due to cluster rebalancing and random settings default timeout is just not enough.
            .waitForNoRelocatingShards(true)).actionGet();
    assertThat(health.getStatus(), equalTo(ClusterHealthStatus.RED));
    logger.info("--> Confirmed RED status");
}

public void testCorruptFileAndRecover() throws ExecutionException, InterruptedException, IOException {
int numDocs = scaledRandomIntBetween(100, 1000);
// have enough space for 3 copies
Expand Down Expand Up @@ -188,7 +266,7 @@ public void testCorruptFileAndRecover() throws ExecutionException, InterruptedEx
ShardRouting corruptedShardRouting = corruptRandomPrimaryFile();
logger.info("--> {} corrupted", corruptedShardRouting);
enableAllocation("test");
/*
/*
* we corrupted the primary shard - now lets make sure we never recover from it successfully
*/
Settings build = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "2").build();
Expand All @@ -203,6 +281,8 @@ public void testCorruptFileAndRecover() throws ExecutionException, InterruptedEx
assertThat("timed out waiting for green state", health.isTimedOut(), equalTo(false));
}
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.GREEN));

ensureGreen("test");
final int numIterations = scaledRandomIntBetween(5, 20);
for (int i = 0; i < numIterations; i++) {
SearchResponse response = client().prepareSearch().setSize(numDocs).get();
Expand Down Expand Up @@ -246,7 +326,7 @@ public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard,
};

for (MockIndexEventListener.TestEventListener eventListener :
internalCluster().getDataNodeInstances(MockIndexEventListener.TestEventListener.class)) {
internalCluster().getDataNodeInstances(MockIndexEventListener.TestEventListener.class)) {
eventListener.setNewDelegate(listener);
}
try {
Expand All @@ -255,7 +335,7 @@ public void afterIndexShardClosed(ShardId sid, @Nullable IndexShard indexShard,
assertThat(exception, empty());
} finally {
for (MockIndexEventListener.TestEventListener eventListener :
internalCluster().getDataNodeInstances(MockIndexEventListener.TestEventListener.class)) {
internalCluster().getDataNodeInstances(MockIndexEventListener.TestEventListener.class)) {
eventListener.setNewDelegate(null);
}
}
Expand Down Expand Up @@ -682,7 +762,7 @@ private ShardRouting corruptRandomPrimaryFile(final boolean includePerCommitFile
continue;
}
for (String commitFile : commitInfo.files()) {
if (includePerCommitFiles || isPerSegmentFile(commitFile)) {
if (commitFile.endsWith(".cfe")) {
files.add(file.resolve(commitFile));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,11 @@ public boolean shouldRetry(Exception e) {
};

pendingReplicationActions.addPendingAction(allocationId, replicationAction);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
replicationAction.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public class BackgroundIndexer implements AutoCloseable {
* @param numOfDocs number of document to index before pausing. Set to -1 to have no limit.
*/
/**
 * Creates a background indexer for the given index/type with a randomized number
 * of concurrent writer threads.
 *
 * @param index     the index to write to
 * @param type      the document type to write
 * @param client    the client to use for indexing
 * @param numOfDocs number of documents to index before pausing. Set to -1 to have no limit.
 */
public BackgroundIndexer(String index, String type, Client client, int numOfDocs) {
    // scaledRandomIntBetween(5, 5) was a degenerate range that pinned the writer
    // count to a constant (leftover debug change); restore the randomized range
    // so test runs exercise varying concurrency levels.
    this(index, type, client, numOfDocs, RandomizedTest.scaledRandomIntBetween(2, 5));
}

/**
Expand Down

0 comments on commit 1267b43

Please sign in to comment.