Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Segments] Retry segment uploads to remote store on failure #7400

Merged
merged 10 commits into from
May 9, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -60,23 +60,21 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
/**
* The initial retry interval at which the retry job gets scheduled after a failure.
*/
private static final int REMOTE_REFRESH_RETRY_BASE_INTERVAL_SECONDS = 1;
private static final int REMOTE_REFRESH_RETRY_BASE_INTERVAL_MILLIS = 1_000;

/**
* In an exponential back off setup, the maximum retry interval after the retry interval increases exponentially.
*/
private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_SECONDS = 30;
private static final int REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS = 30_000;

/**
* Exponential back off policy with max retry interval.
*/
private static final BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialEqualJitterBackoff(
REMOTE_REFRESH_RETRY_BASE_INTERVAL_SECONDS,
REMOTE_REFRESH_RETRY_MAX_INTERVAL_SECONDS
REMOTE_REFRESH_RETRY_BASE_INTERVAL_MILLIS,
REMOTE_REFRESH_RETRY_MAX_INTERVAL_MILLIS
);

private static final int MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES = 1;

// Visible for testing
static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
// Visible for testing
Expand All @@ -92,7 +90,7 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
/**
* Semaphore that ensures there is only 1 retry scheduled at any time.
*/
private final Semaphore retrySemaphore = new Semaphore(MAX_CONCURRENT_SCHEDULED_REMOTE_REFRESH_RETRIES);
private final Semaphore SCHEDULE_RETRY_PERMITS = new Semaphore(1);

private volatile Iterator<TimeValue> backoffDelayIterator;

Expand All @@ -105,7 +103,7 @@ public RemoteStoreRefreshListener(IndexShard indexShard) {
.getDelegate()).getDelegate();
this.primaryTerm = indexShard.getOperationPrimaryTerm();
localSegmentChecksumMap = new HashMap<>();
if (indexShard.shardRouting.primary()) {
if (indexShard.routingEntry().primary()) {
try {
this.remoteDirectory.init();
} catch (IOException e) {
Expand Down Expand Up @@ -259,12 +257,12 @@ private void resetBackOffDelayIterator() {
private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) {
// If this was a retry attempt, then we release the semaphore at the end so that further retries can be scheduled
if (isRetry) {
retrySemaphore.release();
SCHEDULE_RETRY_PERMITS.release();
}

// If there are failures in uploading segments, then we should retry as search idle can lead to
// refresh not occurring until write happens.
if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retrySemaphore.tryAcquire()) {
if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && SCHEDULE_RETRY_PERMITS.tryAcquire()) {
scheduledCancellableRetry = indexShard.getThreadPool()
.schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.index.shard.RemoteStoreRefreshListener.SEGMENT_INFO_SNAPSHOT_FILENAME_PREFIX;

public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
Expand Down Expand Up @@ -58,9 +62,11 @@ private void indexDocs(int startDocId, int numberOfDocs) throws IOException {

@After
public void tearDown() throws Exception {
Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
closeShards(indexShard);
if (indexShard != null) {
Directory storeDirectory = ((FilterDirectory) ((FilterDirectory) indexShard.store().directory()).getDelegate()).getDelegate();
((BaseDirectoryWrapper) storeDirectory).setCheckIndexOnClose(false);
closeShards(indexShard);
}
super.tearDown();
}

Expand Down Expand Up @@ -204,6 +210,120 @@ public void onFailure(Exception e) {
verifyUploadedSegments(remoteSegmentStoreDirectory);
}

public void testRefreshSuccessOnFirstAttempt() throws Exception {
// This is the case of isRetry=false, shouldRetry=false
// Succeed on 1st attempt
int succeedOnAttempt = 1;
// We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation.
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
CountDownLatch successLatch = new CountDownLatch(2);
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch);
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
assertBusy(() -> assertEquals(0, successLatch.getCount()));
}

public void testRefreshSuccessOnSecondAttempt() throws Exception {
// This covers 2 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false
// Succeed on 2nd attempt
int succeedOnAttempt = 2;
// We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation.
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
CountDownLatch successLatch = new CountDownLatch(2);
mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch);
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
assertBusy(() -> assertEquals(0, successLatch.getCount()));
}

public void testRefreshSuccessOnThirdAttemptAttempt() throws Exception {
// This covers 3 cases - 1) isRetry=false, shouldRetry=true 2) isRetry=true, shouldRetry=false 3) isRetry=True, shouldRetry=true
// Succeed on 3rd attempt
int succeedOnAttempt = 3;
// We spy on IndexShard.getReplicationTracker() to validate that we have tried running remote time as per the expectation.
CountDownLatch refreshCountLatch = new CountDownLatch(succeedOnAttempt);
// We spy on IndexShard.getEngine() to validate that we have successfully hit the terminal code for ascertaining successful upload.
CountDownLatch successLatch = new CountDownLatch(2);
mockIndexShardWithRetryAndScheduleRefresh(succeedOnAttempt, refreshCountLatch, successLatch);
assertBusy(() -> assertEquals(0, refreshCountLatch.getCount()));
assertBusy(() -> assertEquals(0, successLatch.getCount()));
}

private void mockIndexShardWithRetryAndScheduleRefresh(
int SucceedOnAttempt,
CountDownLatch refreshCountLatch,
CountDownLatch successLatch
) throws IOException {
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
indexShard = newStartedShard(
true,
Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(),
new InternalEngineFactory()
);

indexDocs(1, randomIntBetween(1, 100));

// Mock indexShard.store().directory()
IndexShard shard = mock(IndexShard.class);
Store store = mock(Store.class);
when(shard.store()).thenReturn(store);
when(store.directory()).thenReturn(indexShard.store().directory());

// Mock (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
Store remoteStore = mock(Store.class);
when(shard.remoteStore()).thenReturn(remoteStore);
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
(RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate())
.getDelegate();
FilterDirectory remoteStoreFilterDirectory = new TestFilterDirectory(new TestFilterDirectory(remoteSegmentStoreDirectory));
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);

// Mock indexShard.getOperationPrimaryTerm()
when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm());

// Mock indexShard.routingEntry().primary()
when(shard.routingEntry()).thenReturn(indexShard.routingEntry());

// Mock threadpool
when(shard.getThreadPool()).thenReturn(threadPool);

// Mock indexShard.getReplicationTracker().isPrimaryMode()
doAnswer(invocation -> {
refreshCountLatch.countDown();
return indexShard.getReplicationTracker();
}).when(shard).getReplicationTracker();

AtomicLong counter = new AtomicLong();
// Mock indexShard.getSegmentInfosSnapshot()
doAnswer(invocation -> {
if (counter.incrementAndGet() <= SucceedOnAttempt - 1) {
throw new RuntimeException("Inducing failure in upload");
}
return indexShard.getSegmentInfosSnapshot();
}).when(shard).getSegmentInfosSnapshot();

when(shard.getEngine()).thenReturn(indexShard.getEngine());
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
doAnswer(invocation -> {
successLatch.countDown();
return indexShard.getEngine();
}).when(shard).getEngine();

RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard);
refreshListener.afterRefresh(false);
}

private static class TestFilterDirectory extends FilterDirectory {

/**
* Sole constructor, typically called from sub-classes.
*
* @param in
*/
protected TestFilterDirectory(Directory in) {
super(in);
}
}

private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentStoreDirectory) throws IOException {
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = remoteSegmentStoreDirectory
.getSegmentsUploadedToRemoteStore();
Expand Down