Skip to content

Commit

Permalink
Fix flakyness in RemoteStoreRefreshListenerIT (opensearch-project#8547)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 authored and dzane17 committed Jul 12, 2023
1 parent 9236ecd commit c99f6ba
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase {

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7703")
public void testRemoteRefreshRetryOnFailure() throws Exception {

Path location = randomRepoPath().toAbsolutePath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -96,9 +95,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
private long primaryTerm;

/**
* Semaphore that ensures there is only 1 retry scheduled at any time.
* This boolean is used to ensure that there is only 1 retry scheduled/running at any time.
*/
private final Semaphore SCHEDULE_RETRY_PERMITS = new Semaphore(1);
private final AtomicBoolean retryScheduled = new AtomicBoolean(false);

private volatile Iterator<TimeValue> backoffDelayIterator;

Expand Down Expand Up @@ -321,6 +320,9 @@ private void onSuccessfulSegmentsSync(
private void cancelAndResetScheduledCancellableRetry() {
if (scheduledCancellableRetry != null && scheduledCancellableRetry.getDelay(TimeUnit.NANOSECONDS) > 0) {
scheduledCancellableRetry.cancel();
// Since we are cancelling the retry attempt as an internal/external refresh happened already before the retry job could be
// started and the current run successfully uploaded the segments.
retryScheduled.set(false);
}
scheduledCancellableRetry = null;
}
Expand All @@ -333,14 +335,14 @@ private void resetBackOffDelayIterator() {
}

private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) {
// If this was a retry attempt, then we release the semaphore at the end so that further retries can be scheduled
// If this was a retry attempt, then we set the retryScheduled to false so that the next retry (if needed) can be scheduled
if (isRetry) {
SCHEDULE_RETRY_PERMITS.release();
retryScheduled.set(false);
}

// If there are failures in uploading segments, then we should retry as search idle can lead to
// refresh not occurring until write happens.
if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && SCHEDULE_RETRY_PERMITS.tryAcquire()) {
if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retryScheduled.compareAndSet(false, true)) {
scheduledCancellableRetry = indexShard.getThreadPool()
.schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH);
}
Expand Down

0 comments on commit c99f6ba

Please sign in to comment.