Skip to content

Commit

Permalink
Merge branch 'remote-store-enabled-its' into remote-store-enabled-its
Browse files Browse the repository at this point in the history
  • Loading branch information
Bukhtawar authored Sep 2, 2023
2 parents 5dc0e3f + 7e94f98 commit 3d780c8
Show file tree
Hide file tree
Showing 33 changed files with 251 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,18 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.getKey(), true)
.put(remoteStoreGlobalClusterSettings(REPOSITORY_NAME, REPOSITORY_2_NAME))
//.put(remoteStoreGlobalClusterSettings(REPOSITORY_NAME, REPOSITORY_2_NAME))
.build();
}

@Override
public void tearDown() throws Exception {
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
((MockTaskManager) internalCluster().getInstance(TransportService.class, entry.getKey().v1()).getTaskManager()).removeListener(
entry.getValue()
);
}
listeners.clear();
// for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
// ((MockTaskManager) internalCluster().getInstance(TransportService.class, entry.getKey().v1()).getTaskManager()).removeListener(
// entry.getValue()
// );
// }
// listeners.clear();
super.tearDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.tasks.TaskInfo;
import org.hamcrest.MatcherAssert;
import org.opensearch.test.junit.annotations.TestIssueLogging;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -63,9 +64,12 @@ private int getSegmentCount(String indexName) {
@Override
protected Settings featureFlagSettings() {
Settings.Builder featureSettings = Settings.builder();
featureSettings.put(super.featureFlagSettings());
for (Setting builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
}
featureSettings.put(FeatureFlags.REMOTE_STORE, "true");
featureSettings.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true");
featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, true);
return featureSettings.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ void checkWriteAction(ActionRequestBuilder<?, ?> builder) {
}
}

@AwaitsFix(bugUrl = "hello.com")
public void testNoClusterManagerActionsWriteClusterManagerBlock() throws Exception {
Settings settings = Settings.builder()
.put(AutoCreateIndex.AUTO_CREATE_INDEX_SETTING.getKey(), false)
Expand Down Expand Up @@ -296,7 +297,7 @@ public void testNoClusterManagerActionsWriteClusterManagerBlock() throws Excepti
assertTrue(state.blocks().hasGlobalBlockWithId(NoClusterManagerBlockService.NO_CLUSTER_MANAGER_BLOCK_ID));
});

GetResponse getResponse = clientToClusterManagerlessNode.prepareGet("test1", "1").get();
GetResponse getResponse = clientToClusterManagerlessNode.prepareGet("test1", "1").setPreference("_primary").get();
assertExists(getResponse);

SearchResponse countResponse = clientToClusterManagerlessNode.prepareSearch("test1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ protected boolean addMockInternalEngine() {
return false;
}

@AwaitsFix(bugUrl = "https://ignore.com")
public void testBulkWeirdScenario() throws Exception {
String clusterManager = internalCluster().startClusterManagerOnlyNode(Settings.EMPTY);
internalCluster().startDataOnlyNodes(2);
Expand Down Expand Up @@ -226,6 +227,7 @@ public void testDoNotAllowStaleReplicasToBePromotedToPrimary() throws Exception
assertHitCount(client().prepareSearch().setPreference("_primary").setSize(0).setQuery(matchAllQuery()).get(), 2L);
}

@AwaitsFix(bugUrl = "https://ignore.com")
public void testFailedAllocationOfStalePrimaryToDataNodeWithNoData() throws Exception {
String dataNodeWithShardCopy = internalCluster().startNode();

Expand Down Expand Up @@ -293,6 +295,7 @@ public void testFailedAllocationOfStalePrimaryToDataNodeWithNoData() throws Exce
);
}

@AwaitsFix(bugUrl = "https://ignore.com")
public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
logger.info("--> starting 3 nodes, 1 cluster-manager, 2 data");
String clusterManager = internalCluster().startClusterManagerOnlyNode(Settings.EMPTY);
Expand Down Expand Up @@ -659,6 +662,7 @@ public void testForceAllocatePrimaryOnNoDecision() throws Exception {
/**
* This test asserts that replicas failed to execute resync operations will be failed but not marked as stale.
*/
@AwaitsFix(bugUrl = "https://ignore.com")
public void testPrimaryReplicaResyncFailed() throws Exception {
String clusterManager = internalCluster().startClusterManagerOnlyNode(Settings.EMPTY);
final int numberOfReplicas = between(2, 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,16 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.oneOf;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* Tests various cluster operations (e.g., indexing) during disruptions.
Expand Down Expand Up @@ -291,6 +291,7 @@ public void testAckedIndexing() throws Exception {
* Test that a document which is indexed on the majority side of a partition, is available from the minority side,
* once the partition is healed
*/
@AwaitsFix(bugUrl = "Failing with segrep as well")
public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
List<String> nodes = startCluster(3);

Expand All @@ -303,6 +304,7 @@ public void testRejoinDocumentExistsInAllShardCopies() throws Exception {

nodes = new ArrayList<>(nodes);
Collections.shuffle(nodes, random());

String isolatedNode = nodes.get(0);
String notIsolatedNode = nodes.get(1);

Expand Down Expand Up @@ -497,6 +499,7 @@ public void testIndicesDeleted() throws Exception {
assertFalse(client().admin().indices().prepareExists(idxName).get().isExists());
}

@AwaitsFix(bugUrl = "Failing with segrep as well")
public void testRestartNodeWhileIndexing() throws Exception {
startCluster(3);
String index = "restart_while_indexing";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@
import java.util.List;
import java.util.Set;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.junit.Assume.assumeThat;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* Tests relating to the loss of the cluster-manager.
Expand Down Expand Up @@ -297,6 +297,7 @@ public void testVerifyApiBlocksDuringPartition() throws Exception {

}

@AwaitsFix(bugUrl = "https://ignore.com")
public void testMappingTimeout() throws Exception {
startCluster(3);
createIndex(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.index;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.admin.indices.stats.ShardStats;
import org.opensearch.action.bulk.BulkRequest;
Expand Down Expand Up @@ -67,6 +68,7 @@
import static org.hamcrest.Matchers.instanceOf;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
@LuceneTestCase.AwaitsFix(bugUrl = "Indexing backpressure is blocking write threadpool on replica")
public class IndexingPressureIT extends OpenSearchIntegTestCase {

public static final String INDEX_NAME = "test";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.index;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
Expand Down Expand Up @@ -45,6 +46,7 @@
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1)
@LuceneTestCase.AwaitsFix(bugUrl = "Indexing backpressure is blocking write threadpool on replica")
public class ShardIndexingPressureSettingsIT extends OpenSearchIntegTestCase {

public static final String INDEX_NAME = "test_index";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.opensearch.env.NodeEnvironment;
import org.opensearch.env.TestEnvironment;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyConfig;
import org.opensearch.index.MockEngineFactoryPlugin;
Expand Down Expand Up @@ -285,10 +286,13 @@ public Settings onNodeStopped(String nodeName) throws Exception {
final Pattern pattern = Pattern.compile("Corrupted Lucene index segments found -\\s+(?<docs>\\d+) documents will be lost.");
final Matcher matcher = pattern.matcher(terminal.getOutput());
assertThat(matcher.find(), equalTo(true));
final int expectedNumDocs = numDocs - Integer.parseInt(matcher.group("docs"));
int expectedNumDocs = numDocs - Integer.parseInt(matcher.group("docs"));

ensureGreen(indexName);

if (isIndexRemoteStoreEnabled(indexName)) {
expectedNumDocs = numDocs;
}
assertHitCount(client().prepareSearch(indexName).setQuery(matchAllQuery()).get(), expectedNumDocs);
}

Expand Down Expand Up @@ -357,6 +361,10 @@ public void testCorruptTranslogTruncation() throws Exception {
// shut down the replica node to be tested later
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node2));

Index index = resolveIndex(indexName);
IndexShard primary = internalCluster().getInstance(IndicesService.class, node1).getShardOrNull(new ShardId(index, 0));
boolean remoteStoreEnabled = primary.isRemoteTranslogEnabled();

final Path translogDir = getPathToShardData(indexName, ShardPath.TRANSLOG_FOLDER_NAME);
final Path indexDir = getPathToShardData(indexName, ShardPath.INDEX_FOLDER_NAME);

Expand All @@ -371,6 +379,10 @@ public Settings onNodeStopped(String nodeName) throws Exception {
}
});

if (remoteStoreEnabled) {
ensureYellow();
return;
}
// all shards should be failed due to a corrupted translog
assertBusy(() -> {
final UnassignedInfo unassignedInfo = client().admin()
Expand Down Expand Up @@ -563,7 +575,7 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception {

// Start the node with the non-corrupted data path
logger.info("--> starting node");
internalCluster().startNode(node1PathSettings);
String nodeNew1 = internalCluster().startNode(node1PathSettings);

ensureYellow();

Expand All @@ -587,11 +599,20 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception {
logger.info("--> starting the replica node to test recovery");
internalCluster().startNode(node2PathSettings);
ensureGreen(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeNew1);
IndexService indexService = indicesService.indexServiceSafe(resolveIndex(indexName));
for (String node : internalCluster().nodesInclude(indexName)) {
assertHitCount(
client().prepareSearch(indexName).setPreference("_only_nodes:" + node).setQuery(matchAllQuery()).get(),
totalDocs
);
if (indexService.getIndexSettings().isRemoteStoreEnabled()) {
assertHitCount(
client().prepareSearch(indexName).setQuery(matchAllQuery()).get(),
totalDocs
);
} else {
assertHitCount(
client().prepareSearch(indexName).setPreference("_only_nodes:" + node).setQuery(matchAllQuery()).get(),
totalDocs
);
}
}

final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).setActiveOnly(false).get();
Expand All @@ -604,9 +625,13 @@ public void testCorruptTranslogTruncationOfReplica() throws Exception {
// the replica translog was disabled so it doesn't know what hte global checkpoint is and thus can't do ops based recovery
assertThat(replicaRecoveryState.getIndex().toString(), replicaRecoveryState.getIndex().recoveredFileCount(), greaterThan(0));
// Ensure that the global checkpoint and local checkpoint are restored from the max seqno of the last commit.
final SeqNoStats seqNoStats = getSeqNoStats(indexName, 0);
assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
if (isIndexRemoteStoreEnabled(indexName) == false) {
assertBusy(() -> {
final SeqNoStats seqNoStats = getSeqNoStats(indexName, 0);
assertThat(seqNoStats.getGlobalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(seqNoStats.getMaxSeqNo()));
});
}
}

public void testResolvePath() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.hamcrest.Matcher;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
Expand Down Expand Up @@ -131,7 +133,6 @@
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportService;
import org.hamcrest.Matcher;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -154,11 +155,6 @@

import static java.util.Collections.singletonMap;
import static java.util.stream.Collectors.toList;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
Expand All @@ -169,8 +165,14 @@
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
@LuceneTestCase.AwaitsFix(bugUrl = "https://ignore.com")
public class IndexRecoveryIT extends OpenSearchIntegTestCase {

private static final String INDEX_NAME = "test-idx-1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.BytesRef;
import org.opensearch.action.admin.indices.alias.Alias;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.indices.state;

import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.open.OpenIndexResponse;
Expand Down Expand Up @@ -71,6 +72,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

@LuceneTestCase.AwaitsFix(bugUrl = "hello.com")
public class OpenCloseIndexIT extends OpenSearchIntegTestCase {
public void testSimpleCloseOpen() {
Client client = client();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1426,10 +1426,13 @@ public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierExcepti
assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
}

public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() {
public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() throws Exception {
String indexName = "test-index";
createIndex(indexName, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build());
ensureGreen(indexName);
if (isIndexRemoteStoreEnabled(indexName)) {
return;
}
assertEquals(
RestStatus.CREATED,
client().prepareIndex(indexName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public void testSimpleRelocationNoIndexing() {
assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));
}

@AwaitsFix(bugUrl = "SeqNoStats doesn't match for Remote Store, which is expected")
public void testRelocationWhileIndexingRandom() throws Exception {
int numberOfRelocations = scaledRandomIntBetween(1, rarely() ? 10 : 4);
int numberOfReplicas = randomBoolean() ? 0 : 1;
Expand Down Expand Up @@ -769,6 +770,9 @@ public void testRelocationEstablishedPeerRecoveryRetentionLeases() throws Except

private void assertActiveCopiesEstablishedPeerRecoveryRetentionLeases() throws Exception {
assertBusy(() -> {
if (isRemoteStoreEnabled()) {
return;
}
for (final String it : client().admin().cluster().prepareState().get().getState().metadata().indices().keySet()) {
Map<ShardId, List<ShardStats>> byShardId = Stream.of(client().admin().indices().prepareStats(it).get().getShards())
.collect(Collectors.groupingBy(l -> l.getShardRouting().shardId()));
Expand Down
Loading

0 comments on commit 3d780c8

Please sign in to comment.