Skip to content

Commit

Permalink
Refactor code
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed May 8, 2023
1 parent f80f4ac commit 329de5e
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.snapshots.AbstractSnapshotIntegTestCase;
import org.opensearch.test.FeatureFlagSetter;
Expand All @@ -31,6 +32,7 @@
import java.util.Collections;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -83,19 +85,45 @@ public void teardown() {
/**
 * Verifies that remote store segment uploads eventually succeed despite randomly injected
 * IOExceptions, relying on the refresh listener's exponential back off retries: after indexing,
 * the repository must eventually contain all local segment files.
 */
public void testRemoteRefreshRetryOnFailure() throws Exception {

    Path location = randomRepoPath().toAbsolutePath();
    // setup() logs the repository creation itself, so no separate log line is needed here.
    setup(location, randomDoubleBetween(0.1, 0.25, true), "metadata");

    // Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed
    // due to IOExceptions that are thrown while doing uploadBlobs.
    indexData(randomIntBetween(5, 10), randomBoolean());
    logger.info("--> Indexed data");

    // TODO - Once the segments stats api is available, we need to verify that there were failed upload attempts.
    IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get();
    assertEquals(1, response.getShards().length);

    String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
    Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid));
    String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);

    logger.info("--> Verify that the segment files are same on local and repository eventually");
    // This can take time as the retry interval is exponential and capped at a maximum interval
    assertBusy(() -> {
        Set<String> filesInLocal = getSegmentFiles(location.getRoot().resolve(segmentDataLocalPath));
        Set<String> filesInRepo = getSegmentFiles(segmentDataRepoPath);
        // The repo may contain stale files from failed attempts; it must cover everything local.
        assertTrue(filesInRepo.containsAll(filesInLocal));
    }, 60, TimeUnit.SECONDS);
}

private void setup(Path repoLocation, double ioFailureRate, String skipExceptionBlobList) {
logger.info("--> Creating repository={} at the path={}", REPOSITORY_NAME, repoLocation);
// The random_control_io_exception_rate setting ensures that the configured fraction of all operations to remote store
// result in IOException. The skip_exception_on_verification_file & skip_exception_on_list_blobs settings ensure that
// the repository creation can happen without failure.
createRepository(
REPOSITORY_NAME,
"mock",
Settings.builder()
.put("location", location)
.put("random_control_io_exception_rate", randomIntBetween(10, 25) / 100f)
.put("location", repoLocation)
.put("random_control_io_exception_rate", ioFailureRate)
.put("skip_exception_on_verification_file", true)
.put("skip_exception_on_list_blobs", true)
.put("max_failure_number", Long.MAX_VALUE)
);

internalCluster().startDataOnlyNodes(1);
Expand All @@ -105,24 +133,6 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
logger.info("--> Cluster is yellow with no initializing shards");
ensureGreen(INDEX_NAME);
logger.info("--> Cluster is green");

// Here we are having flush/refresh after each iteration of indexing. However, the refresh will not always succeed
// due to IOExceptions that are thrown while doing uploadBlobs.
indexData(randomIntBetween(5, 10), randomBoolean());
logger.info("--> Indexed data");

// TODO - Once the segments stats api is available, we need to verify that there were failed upload attempts.
IndicesStatsResponse response = client().admin().indices().stats(new IndicesStatsRequest()).get();
assertEquals(1, response.getShards().length);

String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
Path segmentDataRepoPath = location.resolve(String.format(Locale.ROOT, "%s/0/segments/data", indexUuid));
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);

logger.info("--> Verify that the segment files are same on local and repository eventually");
assertBusy(
() -> assertEquals(getSegmentFiles(location.getRoot().resolve(segmentDataLocalPath)), getSegmentFiles(segmentDataRepoPath))
);
}

/**
Expand All @@ -134,15 +144,20 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {
/**
 * Lists the segment files (file names starting with "_") under the given directory and maps each
 * remote file name to its local name via {@link #getLocalSegmentFilename(String)}.
 *
 * @param location directory to list
 * @return the set of local segment file names, or an empty set if listing the directory fails
 */
private Set<String> getSegmentFiles(Path location) {
    try {
        return Arrays.stream(FileSystemUtils.files(location))
            // Compare on the file name string: Path.startsWith matches whole path components,
            // not string prefixes, so it would never match the "_" prefix of a segment file.
            .filter(path -> path.getFileName().toString().startsWith("_"))
            .map(path -> path.getFileName().toString())
            .map(this::getLocalSegmentFilename)
            .collect(Collectors.toSet());
    } catch (IOException exception) {
        // Best effort: a listing failure is logged and treated as "no files seen yet" so that
        // callers polling inside assertBusy simply retry.
        logger.error("Exception occurred while getting segment files", exception);
    }
    return Collections.emptySet();
}

/**
 * Returns the portion of a remote segment file name that precedes the first
 * {@code RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR}, i.e. the local file name.
 */
private String getLocalSegmentFilename(String remoteFilename) {
    final String[] tokens = remoteFilename.split(RemoteSegmentStoreDirectory.SEGMENT_NAME_UUID_SEPARATOR);
    return tokens[0];
}

private IndexResponse indexSingleDoc() {
return client().prepareIndex(INDEX_NAME)
.setId(UUIDs.randomBase64UUID())
Expand All @@ -153,7 +168,7 @@ private IndexResponse indexSingleDoc() {
private void indexData(int numberOfIterations, boolean invokeFlush) {
logger.info("--> Indexing data for {} iterations with flush={}", numberOfIterations, invokeFlush);
for (int i = 0; i < numberOfIterations; i++) {
int numberOfOperations = randomIntBetween(20, 50);
int numberOfOperations = randomIntBetween(1, 5);
logger.info("--> Indexing {} operations in iteration #{}", numberOfOperations, i);
for (int j = 0; j < numberOfOperations; j++) {
indexSingleDoc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
/**
 * In an exponential back off setup, the retry interval grows exponentially but is capped at this maximum (in milliseconds).
 */
private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 30_000;
private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 10_000;

/**
* Exponential back off policy with max retry interval.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -251,7 +252,7 @@ public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception {
}

private void mockIndexShardWithRetryAndScheduleRefresh(
int SucceedOnAttempt,
int succeedOnAttempt,
CountDownLatch refreshCountLatch,
CountDownLatch successLatch
) throws IOException {
Expand Down Expand Up @@ -289,22 +290,27 @@ private void mockIndexShardWithRetryAndScheduleRefresh(
when(shard.getThreadPool()).thenReturn(threadPool);

// Mock indexShard.getReplicationTracker().isPrimaryMode()

doAnswer(invocation -> {
refreshCountLatch.countDown();
if (Objects.nonNull(refreshCountLatch)) {
refreshCountLatch.countDown();
}
return indexShard.getReplicationTracker();
}).when(shard).getReplicationTracker();

AtomicLong counter = new AtomicLong();
// Mock indexShard.getSegmentInfosSnapshot()
doAnswer(invocation -> {
if (counter.incrementAndGet() <= SucceedOnAttempt - 1) {
if (counter.incrementAndGet() <= succeedOnAttempt - 1) {
throw new RuntimeException("Inducing failure in upload");
}
return indexShard.getSegmentInfosSnapshot();
}).when(shard).getSegmentInfosSnapshot();

doAnswer(invocation -> {
successLatch.countDown();
if (Objects.nonNull(successLatch)) {
successLatch.countDown();
}
return indexShard.getEngine();
}).when(shard).getEngine();

Expand Down

0 comments on commit 329de5e

Please sign in to comment.