Skip to content

Commit

Permalink
[Remote Store] Fix couple of Remote Store flaky test and use bulk api…
Browse files Browse the repository at this point in the history
… for ingestion (#9190)

---------

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna authored Aug 10, 2023
1 parent 61f865d commit d06926c
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
package org.opensearch.remotestore;

import org.junit.After;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.UUIDs;
Expand All @@ -31,10 +35,10 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_REPLICATION_TYPE_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -74,13 +78,18 @@ protected Map<String, Long> indexData(int numberOfIterations, boolean invokeFlus
indexingStats.put(MAX_SEQ_NO_REFRESHED_OR_FLUSHED + "-shard-" + shardId, maxSeqNoRefreshedOrFlushed);
refreshedOrFlushedOperations = totalOperations;
int numberOfOperations = randomIntBetween(20, 50);
for (int j = 0; j < numberOfOperations; j++) {
IndexResponse response = indexSingleDoc(index);
maxSeqNo = response.getSeqNo();
shardId = response.getShardId().id();
indexingStats.put(MAX_SEQ_NO_TOTAL + "-shard-" + shardId, maxSeqNo);
int numberOfBulk = randomIntBetween(1, 5);
for (int j = 0; j < numberOfBulk; j++) {
BulkResponse res = indexBulk(index, numberOfOperations);
for (BulkItemResponse singleResp : res.getItems()) {
indexingStats.put(
MAX_SEQ_NO_TOTAL + "-shard-" + singleResp.getResponse().getShardId().id(),
singleResp.getResponse().getSeqNo()
);
maxSeqNo = singleResp.getResponse().getSeqNo();
}
totalOperations += numberOfOperations;
}
totalOperations += numberOfOperations;
}

indexingStats.put(TOTAL_OPERATIONS, totalOperations);
Expand Down Expand Up @@ -123,6 +132,18 @@ protected IndexResponse indexSingleDoc(String indexName) {
.get();
}

protected BulkResponse indexBulk(String indexName, int numDocs) {
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numDocs; i++) {
final IndexRequest request = client().prepareIndex(indexName)
.setId(UUIDs.randomBase64UUID())
.setSource(documentKeys.get(randomIntBetween(0, documentKeys.size() - 1)), randomAlphaOfLength(5))
.request();
bulkRequest.add(request);
}
return client().bulk(bulkRequest).actionGet();
}

public static Settings remoteStoreClusterSettings(String segmentRepoName) {
return remoteStoreClusterSettings(segmentRepoName, segmentRepoName);
}
Expand Down Expand Up @@ -170,10 +191,11 @@ protected Settings remoteStoreIndexSettings(int numberOfReplicas) {
return remoteStoreIndexSettings(numberOfReplicas, 1);
}

protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit) {
protected Settings remoteStoreIndexSettings(int numberOfReplicas, long totalFieldLimit, int refresh) {
return Settings.builder()
.put(remoteStoreIndexSettings(numberOfReplicas))
.put(MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.getKey(), totalFieldLimit)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), String.valueOf(refresh))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.RemoteStoreRefreshListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -29,12 +28,10 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.comparesEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -148,10 +145,9 @@ public void testRemoteTranslogCleanup() throws Exception {
verifyRemoteStoreCleanup();
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658")
public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
internalCluster().startDataOnlyNodes(3);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
Expand All @@ -163,20 +159,22 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
// Delete is async.
assertBusy(() -> {
int actualFileCount = getFileCount(indexPath);
if (numberOfIterations <= RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP) {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations, numberOfIterations + 1)));
if (numberOfIterations <= LAST_N_METADATA_FILES_TO_KEEP) {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
} else {
// As delete is async its possible that the file gets created before the deletion or after
// deletion.
MatcherAssert.assertThat(actualFileCount, is(oneOf(10, 11)));
MatcherAssert.assertThat(
actualFileCount,
is(oneOf(LAST_N_METADATA_FILES_TO_KEEP - 1, LAST_N_METADATA_FILES_TO_KEEP, LAST_N_METADATA_FILES_TO_KEEP + 1))
);
}
}, 30, TimeUnit.SECONDS);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8658")
public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
internalCluster().startDataOnlyNodes(3);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l));
internalCluster().startDataOnlyNodes(1);
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false, INDEX_NAME);
String indexUUID = client().admin()
Expand All @@ -187,6 +185,6 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata");
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations, numberOfIterations + 1)));
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public void testPromoteReplicaToPrimary() throws Exception {
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), numOfDocs);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9130")
public void testFailoverWhileIndexing() throws Exception {
internalCluster().startNode();
internalCluster().startNode();
Expand All @@ -143,7 +142,7 @@ public void testFailoverWhileIndexing() throws Exception {
.setSource("field", numAutoGenDocs.get())
.get();

if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.ACCEPTED) {
if (indexResponse.status() == RestStatus.CREATED || indexResponse.status() == RestStatus.OK) {
numAutoGenDocs.incrementAndGet();
if (numAutoGenDocs.get() == docCount / 2) {
if (random().nextInt(3) == 0) {
Expand Down

0 comments on commit d06926c

Please sign in to comment.