diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java index 13f76adc8a5a7..4005e6359a2f7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreRefreshListenerIT.java @@ -27,7 +27,6 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStoreRefreshListenerIT extends AbstractRemoteStoreMockRepositoryIntegTestCase { - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/7703") public void testRemoteRefreshRetryOnFailure() throws Exception { Path location = randomRepoPath().toAbsolutePath(); diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index cd3e7aa3b11a9..aaba74cd54341 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -47,7 +47,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -96,9 +95,9 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres private long primaryTerm; /** - * Semaphore that ensures there is only 1 retry scheduled at any time. + * This boolean is used to ensure that there is only 1 retry scheduled/running at any time. */ - private final Semaphore SCHEDULE_RETRY_PERMITS = new Semaphore(1); + private final AtomicBoolean retryScheduled = new AtomicBoolean(false); private volatile Iterator backoffDelayIterator; @@ -321,6 +320,9 @@ private void onSuccessfulSegmentsSync( private void cancelAndResetScheduledCancellableRetry() { if (scheduledCancellableRetry != null && scheduledCancellableRetry.getDelay(TimeUnit.NANOSECONDS) > 0) { scheduledCancellableRetry.cancel(); + // Since we are cancelling the retry attempt as an internal/external refresh happened already before the retry job could be + // started and the current run successfully uploaded the segments. + retryScheduled.set(false); } scheduledCancellableRetry = null; } @@ -333,14 +335,14 @@ private void resetBackOffDelayIterator() { } private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) { - // If this was a retry attempt, then we release the semaphore at the end so that further retries can be scheduled + // If this was a retry attempt, then we set the retryScheduled to false so that the next retry (if needed) can be scheduled if (isRetry) { - SCHEDULE_RETRY_PERMITS.release(); + retryScheduled.set(false); } // If there are failures in uploading segments, then we should retry as search idle can lead to // refresh not occurring until write happens. - if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && SCHEDULE_RETRY_PERMITS.tryAcquire()) { + if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retryScheduled.compareAndSet(false, true)) { scheduledCancellableRetry = indexShard.getThreadPool() .schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH); }