Skip to content

Commit

Permalink
Fix flaky test testSendCorruptBytesToReplica
Browse files Browse the repository at this point in the history
This test is failing with WindowsFS.
This change moves SegRep corruption tests to a separate IT class and skips running on WindowsFS.

Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Oct 24, 2023
1 parent 5bd413c commit 91bd1e9
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 136 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;

/**
* These tests simulate corruption cases during replication. They are skipped on WindowsFS simulation where file renaming
* can fail with an access denied IOException because deletion is not permitted.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@LuceneTestCase.SuppressFileSystems("WindowsFS")
public class SegmentReplicationDisruptionIT extends SegmentReplicationBaseIT {
@Before
private void setup() {
internalCluster().startClusterManagerOnlyNode();
}

public void testSendCorruptBytesToReplica() throws Exception {
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.refresh_interval", -1)
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
primaryTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK) && failed.getAndSet(true) == false) {
FileChunkRequest req = (FileChunkRequest) request;
TransportRequest corrupt = new FileChunkRequest(
req.recoveryId(),
((FileChunkRequest) request).requestSeqNo(),
((FileChunkRequest) request).shardId(),
((FileChunkRequest) request).metadata(),
((FileChunkRequest) request).position(),
new BytesArray("test"),
false,
0,
0L
);
connection.sendRequest(requestId, action, corrupt, options);
latch.countDown();
} else {
connection.sendRequest(requestId, action, request, options);
}
}
);
for (int i = 0; i < 100; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);
assertNotEquals(originalRecoveryTime, 0);
refresh(INDEX_NAME);
latch.await();
assertTrue(failed.get());
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
// reset checkIndex to ensure our original shard doesn't throw
resetCheckIndexStatus();
waitForSearchableDocs(100, primaryNode, replicaNode);
}

public void testWipeSegmentBetweenSyncs() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.refresh_interval", -1)
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
ensureGreen(INDEX_NAME);
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);

final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
waitForSearchableDocs(INDEX_NAME, 10, List.of(replicaNode));
indexShard.store().directory().deleteFile("_0.si");

for (int i = 11; i < 21; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
resetCheckIndexStatus();
waitForSearchableDocs(20, primaryNode, replicaNode);
}

private void waitForNewPeerRecovery(String replicaNode, long originalRecoveryTime) throws Exception {
assertBusy(() -> {
// assert we have a peer recovery after the original
final long time = getRecoveryStopTime(replicaNode);
assertNotEquals(time, 0);
assertNotEquals(originalRecoveryTime, time);

}, 1, TimeUnit.MINUTES);
}

private long getRecoveryStopTime(String nodeName) {
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(INDEX_NAME).get();
final List<RecoveryState> recoveryStates = recoveryResponse.shardRecoveryStates().get(INDEX_NAME);
for (RecoveryState recoveryState : recoveryStates) {
if (recoveryState.getTargetNode().getName().equals(nodeName)) {
return recoveryState.getTimer().stopTime();
}
}
return 0L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.get.GetResponse;
Expand Down Expand Up @@ -59,7 +58,6 @@
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.xcontent.XContentBuilder;
Expand All @@ -73,7 +71,6 @@
import org.opensearch.index.engine.NRTReplicationReaderManager;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.node.NodeClosedException;
Expand All @@ -85,7 +82,6 @@
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
import org.junit.Before;

Expand All @@ -98,7 +94,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -1781,135 +1776,4 @@ public void testRealtimeTermVectorRequestsUnSuccessful() throws IOException {
assertThat(response.getIndex(), equalTo(INDEX_NAME));

}

public void testSendCorruptBytesToReplica() throws Exception {
// this test stubs transport calls specific to node-node replication.
assumeFalse(
"Skipping the test as its not compatible with segment replication with remote store.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.refresh_interval", -1)
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

MockTransportService primaryTransportService = ((MockTransportService) internalCluster().getInstance(
TransportService.class,
primaryNode
));
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
primaryTransportService.addSendBehavior(
internalCluster().getInstance(TransportService.class, replicaNode),
(connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationTargetService.Actions.FILE_CHUNK) && failed.getAndSet(true) == false) {
FileChunkRequest req = (FileChunkRequest) request;
logger.info("SENDING CORRUPT file chunk [{}] lastChunk: {}", req, req.lastChunk());
TransportRequest corrupt = new FileChunkRequest(
req.recoveryId(),
((FileChunkRequest) request).requestSeqNo(),
((FileChunkRequest) request).shardId(),
((FileChunkRequest) request).metadata(),
((FileChunkRequest) request).position(),
new BytesArray("test"),
false,
0,
0L
);
connection.sendRequest(requestId, action, corrupt, options);
latch.countDown();
} else {
connection.sendRequest(requestId, action, request, options);
}
}
);
for (int i = 0; i < 100; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);
assertNotEquals(originalRecoveryTime, 0);
refresh(INDEX_NAME);
latch.await();
assertTrue(failed.get());
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
// reset checkIndex to ensure our original shard doesn't throw
resetCheckIndexStatus();
waitForSearchableDocs(100, primaryNode, replicaNode);
}

public void testWipeSegmentBetweenSyncs() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put("index.refresh_interval", -1)
.build()
);
ensureYellow(INDEX_NAME);
final String replicaNode = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
ensureGreen(INDEX_NAME);
final long originalRecoveryTime = getRecoveryStopTime(replicaNode);

final IndexShard indexShard = getIndexShard(replicaNode, INDEX_NAME);
waitForSearchableDocs(INDEX_NAME, 10, List.of(replicaNode));
indexShard.store().directory().deleteFile("_0.si");

for (int i = 11; i < 21; i++) {
client().prepareIndex(INDEX_NAME)
.setId(String.valueOf(i))
.setSource(jsonBuilder().startObject().field("field", i).endObject())
.get();
}
refresh(INDEX_NAME);
waitForNewPeerRecovery(replicaNode, originalRecoveryTime);
resetCheckIndexStatus();
waitForSearchableDocs(20, primaryNode, replicaNode);
}

private void waitForNewPeerRecovery(String replicaNode, long originalRecoveryTime) throws Exception {
assertBusy(() -> {
// assert we have a peer recovery after the original
final long time = getRecoveryStopTime(replicaNode);
assertNotEquals(time, 0);
assertNotEquals(originalRecoveryTime, time);

}, 1, TimeUnit.MINUTES);
}

private long getRecoveryStopTime(String nodeName) {
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(INDEX_NAME).get();
final List<RecoveryState> recoveryStates = recoveryResponse.shardRecoveryStates().get(INDEX_NAME);
logger.info("Recovery states {}", recoveryResponse);
for (RecoveryState recoveryState : recoveryStates) {
if (recoveryState.getTargetNode().getName().equals(nodeName)) {
return recoveryState.getTimer().stopTime();
}
}
return 0L;
}
}

0 comments on commit 91bd1e9

Please sign in to comment.