diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java
index 4254586f3d70e..357b6c2eaa456 100644
--- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java
@@ -21,7 +21,7 @@
  *
  * @opensearch.internal
  */
-public class CheckpointRefreshListener implements ReferenceManager.RefreshListener {
+public class CheckpointRefreshListener extends CloseableRetryableRefreshListener {
 
     protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class);
 
@@ -39,12 +39,18 @@ public void beforeRefresh() throws IOException {
     }
 
     @Override
-    public void afterRefresh(boolean didRefresh) throws IOException {
+    protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
         if (didRefresh
             && shard.state() == IndexShardState.STARTED
             && shard.getReplicationTracker().isPrimaryMode()
             && !shard.indexSettings.isSegRepWithRemoteEnabled()) {
             publisher.publish(shard, shard.getLatestReplicationCheckpoint());
         }
+        return true;
+    }
+
+    @Override
+    protected Logger getLogger() {
+        return logger;
     }
 }
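CheckpointRefreshListener now extends the CloseableRetryableRefreshListener base class introduced below, and because its performAfterRefresh unconditionally returns true, checkpoint publication never schedules a retry. For readers new to the base class, this is the minimal shape of a non-retrying subclass — a hedged sketch only; NoRetryRefreshListener is a hypothetical name, not part of this change:

```java
// Hypothetical example, not part of this patch: a subclass that never retries.
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

class NoRetryRefreshListener extends CloseableRetryableRefreshListener {

    private static final Logger logger = LogManager.getLogger(NoRetryRefreshListener.class);

    NoRetryRefreshListener() {
        super(); // no ThreadPool supplied, so scheduleRetry is always a no-op
    }

    @Override
    public void beforeRefresh() {}

    @Override
    protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
        // Post-refresh work goes here; returning true reports success,
        // so the base class never attempts to schedule a retry.
        return true;
    }

    @Override
    protected Logger getLogger() {
        return logger;
    }
}
```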
diff --git a/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java
new file mode 100644
index 0000000000000..10e3e04033da3
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/shard/CloseableRetryableRefreshListener.java
@@ -0,0 +1,143 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.shard;
+
+import org.apache.logging.log4j.Logger;
+import org.apache.lucene.search.ReferenceManager;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A {@link ReferenceManager.RefreshListener} that runs its afterRefresh work only if a permit is available. Once the
+ * listener is closed, all permits are permanently acquired and afterRefresh becomes a no-op. This abstract class also
+ * provides the hooks that subclasses use to schedule retries.
+ */
+public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener, Closeable {
+
+    /**
+     * Total permits = 1 ensures that only a single instance of performAfterRefresh runs at a time. Should a use case
+     * ever require concurrency, the permit count can be made a constructor parameter.
+     */
+    private static final int TOTAL_PERMITS = 1;
+
+    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);
+
+    private final ThreadPool threadPool;
+
+    /**
+     * Ensures that at most one retry is scheduled or running at any time.
+     */
+    private final AtomicBoolean retryScheduled = new AtomicBoolean(false);
+
+    public CloseableRetryableRefreshListener() {
+        this.threadPool = null;
+    }
+
+    public CloseableRetryableRefreshListener(ThreadPool threadPool) {
+        this.threadPool = threadPool;
+    }
+
+    @Override
+    public final void afterRefresh(boolean didRefresh) throws IOException {
+        boolean successful;
+        boolean permitAcquired = semaphore.tryAcquire();
+        try {
+            successful = permitAcquired && performAfterRefresh(didRefresh, false);
+        } finally {
+            if (permitAcquired) {
+                semaphore.release();
+            }
+        }
+        scheduleRetry(successful, didRefresh, permitAcquired);
+    }
+
+    protected String getRetryThreadPoolName() {
+        return null;
+    }
+
+    protected TimeValue getNextRetryInterval() {
+        return null;
+    }
+
+    private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boolean didRefresh, boolean isRetry) {
+        if (this.threadPool == null
+            || interval == null
+            || retryThreadPoolName == null
+            || ThreadPool.THREAD_POOL_TYPES.containsKey(retryThreadPoolName) == false
+            || interval == TimeValue.MINUS_ONE
+            || retryScheduled.compareAndSet(false, true) == false) {
+            return;
+        }
+        boolean scheduled = false;
+        try {
+            this.threadPool.schedule(() -> {
+                boolean successful;
+                boolean permitAcquired = semaphore.tryAcquire();
+                try {
+                    successful = permitAcquired && performAfterRefresh(didRefresh, isRetry);
+                } finally {
+                    if (permitAcquired) {
+                        semaphore.release();
+                    }
+                    retryScheduled.set(false);
+                }
+                scheduleRetry(successful, didRefresh, isRetry || permitAcquired);
+            }, interval, retryThreadPoolName);
+            scheduled = true;
+            getLogger().info("Scheduled retry with didRefresh={} isRetry={}", didRefresh, isRetry);
+        } finally {
+            if (scheduled == false) {
+                retryScheduled.set(false);
+            }
+        }
+    }
+
+    /**
+     * Schedules a retry based on the {@code afterRefreshSuccessful} value.
+     *
+     * @param afterRefreshSuccessful true if performAfterRefresh(..) completed successfully.
+     * @param didRefresh true if the refresh opened a new reference.
+     * @param isRetry true if the scheduled execution should be treated as a retry, i.e. a previous attempt actually
+     *                ran (acquired the permit) and failed.
+     */
+    private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh, boolean isRetry) {
+        if (afterRefreshSuccessful == false) {
+            scheduleRetry(getNextRetryInterval(), getRetryThreadPoolName(), didRefresh, isRetry);
+        }
+    }
+
+    /**
+     * Subclasses override this method with the work that must run after a refresh.
+     *
+     * @param didRefresh true if the refresh opened a new reference
+     * @param isRetry true if this is a retry attempt
+     * @return true if the operation succeeded and no retry is needed, false otherwise.
+     */
+    protected abstract boolean performAfterRefresh(boolean didRefresh, boolean isRetry);
+
+    @Override
+    public final void close() throws IOException {
+        try {
+            if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) {
+                assert semaphore.availablePermits() == 0;
+            } else {
+                throw new RuntimeException("timeout while closing gated refresh listener");
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected abstract Logger getLogger();
+}
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 8541a1f5e554b..726a1f0159f3e 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -339,6 +339,8 @@ Runnable getGlobalCheckpointSyncer() {
     private final boolean isTimeSeriesIndex;
     private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;
 
+    private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
+
     public IndexShard(
         final ShardRouting shardRouting,
         final IndexSettings indexSettings,
@@ -813,6 +815,13 @@ public void relocated(
             if (syncTranslog) {
                 maybeSync();
             }
+
+            // Ensures all in-flight remote store operations drain, before we perform the handoff.
+            // Note: Stream#close() does not close the stream's elements, so each listener is closed explicitly.
+            internalRefreshListener.stream()
+                .filter(refreshListener -> refreshListener instanceof Closeable)
+                .map(refreshListener -> (Closeable) refreshListener)
+                .forEach(closeable -> {
+                    try {
+                        closeable.close();
+                    } catch (IOException e) {
+                        throw new RuntimeException("failed to close refresh listener before relocation handoff", e);
+                    }
+                });
+
             // no shard operation permits are being held here, move state from started to relocated
             assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
                 : "in-flight operations in progress while moving shard state to relocated";
@@ -3660,7 +3669,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
             }
         };
 
-        final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
+        internalRefreshListener.clear();
         internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
         if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
             internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
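The relocation change relies on a property of the new base class: close() acquires the lone permit, so it blocks until any in-flight performAfterRefresh drains, and afterRefresh calls made after close can never acquire a permit again. A self-contained, JDK-only sketch of that gating behavior (class and method names here are illustrative, not from the patch):

```java
// Minimal sketch of the drain-on-close semantics assumed above; not patch code.
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class DrainOnCloseDemo {
    private static final int TOTAL_PERMITS = 1;
    private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);

    void afterRefresh() {
        if (semaphore.tryAcquire()) {       // runs only while not closed
            try {
                // ... post-refresh work ...
            } finally {
                semaphore.release();
            }
        }                                   // otherwise: silently a no-op
    }

    void close() throws InterruptedException {
        // Acquiring every permit waits for in-flight work to drain, then
        // permanently starves all future tryAcquire() calls above.
        if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES) == false) {
            throw new RuntimeException("timeout while closing gated refresh listener");
        }
    }
}
```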
diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
index 8f520c0e007bb..2d168839258dd 100644
--- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
+++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java
@@ -14,7 +14,6 @@
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentInfos;
-import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
@@ -37,7 +36,6 @@
 import org.opensearch.index.translog.Translog;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
 import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
-import org.opensearch.threadpool.Scheduler;
 import org.opensearch.threadpool.ThreadPool;
 
 import java.io.IOException;
@@ -51,8 +49,6 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -63,7 +59,7 @@
  *
  * @opensearch.internal
  */
-public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
+public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshListener {
 
     private final Logger logger;
 
@@ -98,16 +94,8 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
     private final RemoteRefreshSegmentTracker segmentTracker;
     private final Map<String, String> localSegmentChecksumMap;
     private long primaryTerm;
-
-    /**
-     * This boolean is used to ensure that there is only 1 retry scheduled/running at any time.
-     */
-    private final AtomicBoolean retryScheduled = new AtomicBoolean(false);
-
     private volatile Iterator<TimeValue> backoffDelayIterator;
 
-    private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry;
-
     /**
      * Keeps track of segment files and their size in bytes which are part of the most recent refresh.
     */
@@ -122,6 +110,7 @@ public RemoteStoreRefreshListener(
         SegmentReplicationCheckpointPublisher checkpointPublisher,
         RemoteRefreshSegmentTracker segmentTracker
     ) {
+        super(indexShard.getThreadPool());
         logger = Loggers.getLogger(getClass(), indexShard.shardId());
         this.indexShard = indexShard;
         this.storeDirectory = indexShard.store().directory();
@@ -172,32 +161,40 @@ public void beforeRefresh() throws IOException {}
      * This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded.
      *
      * @param didRefresh true if the refresh opened a new reference
+     * @return true if the method ran successfully (no retry needed).
      */
     @Override
-    public void afterRefresh(boolean didRefresh) {
+    protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
+        if (didRefresh && isRetry == false) {
+            updateLocalRefreshTimeAndSeqNo();
+        }
+        boolean successful;
         if (this.primaryTerm != indexShard.getOperationPrimaryTerm()
             || didRefresh
             || remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
-            updateLocalRefreshTimeAndSeqNo();
-            try {
-                indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get();
-            } catch (InterruptedException | ExecutionException e) {
-                logger.info("Exception occurred while scheduling syncSegments", e);
-            }
+            successful = syncSegments();
+        } else {
+            successful = true;
         }
+        return successful;
     }
 
-    private synchronized void syncSegments(boolean isRetry) {
-        if (indexShard.getReplicationTracker().isPrimaryMode() == false) {
-            return;
+    private synchronized boolean syncSegments() {
+        if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) {
+            logger.info(
+                "Skipped syncing segments with primaryMode={} indexShardState={}",
+                indexShard.getReplicationTracker().isPrimaryMode(),
+                indexShard.state()
+            );
+            return true;
         }
         ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
         indexShard.onCheckpointPublished(checkpoint);
-        beforeSegmentsSync(isRetry);
+        beforeSegmentsSync();
         long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
         long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
         long bytesBeforeUpload = segmentTracker.getUploadBytesSucceeded(), startTimeInNS = System.nanoTime();
-        final AtomicBoolean shouldRetry = new AtomicBoolean(true);
+        final AtomicBoolean successful = new AtomicBoolean(false);
 
         try {
             if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
@@ -265,7 +262,7 @@ public void onResponse(Void unused) {
                         );
                         // At this point since we have uploaded new segments, segment infos and segment metadata file,
                        // along with marking minSeqNoToKeep, upload has succeeded completely.
-                        shouldRetry.set(false);
+                        successful.set(true);
                     } catch (Exception e) {
                         // We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
                         // in the next refresh. This should not affect durability of the indexed data after remote trans-log
@@ -295,8 +292,10 @@ public void onFailure(Exception e) {
         } catch (Throwable t) {
             logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t);
         }
-        updateFinalStatusInSegmentTracker(shouldRetry.get() == false, bytesBeforeUpload, startTimeInNS);
-        afterSegmentsSync(isRetry, shouldRetry.get());
+        updateFinalStatusInSegmentTracker(successful.get(), bytesBeforeUpload, startTimeInNS);
+        // If there are failures in uploading segments, then we should retry as search idle can lead to
+        // refresh not occurring until write happens.
+        return successful.get();
     }
 
     /**
@@ -312,10 +311,7 @@ private void clearStaleFilesFromLocalSegmentChecksumMap(Collection<String> local
             .forEach(localSegmentChecksumMap::remove);
     }
 
-    private void beforeSegmentsSync(boolean isRetry) {
-        if (isRetry) {
-            logger.info("Retrying to sync the segments to remote store");
-        }
+    private void beforeSegmentsSync() {
         // Start tracking total uploads started
         segmentTracker.incrementTotalUploadsStarted();
     }
@@ -333,28 +329,12 @@ private void onSuccessfulSegmentsSync(
         updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshClockTimeMs, refreshSeqNo);
         // Reset the backoffDelayIterator for the future failures
         resetBackOffDelayIterator();
-        // Cancel the scheduled cancellable retry if possible and set it to null
-        cancelAndResetScheduledCancellableRetry();
         // Set the minimum sequence number for keeping translog
         indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
         // Publishing the new checkpoint which is used for remote store + segrep indexes
         checkpointPublisher.publish(indexShard, checkpoint);
     }
 
-    /**
-     * Cancels the scheduled retry if there is one scheduled, and it has not started yet. Clears the reference as the
-     * schedule retry has been cancelled, or it was null in the first place, or it is running/ran already.
-     */
-    private void cancelAndResetScheduledCancellableRetry() {
-        if (scheduledCancellableRetry != null && scheduledCancellableRetry.getDelay(TimeUnit.NANOSECONDS) > 0) {
-            scheduledCancellableRetry.cancel();
-            // Since we are cancelling the retry attempt as an internal/external refresh happened already before the retry job could be
-            // started and the current run successfully uploaded the segments.
-            retryScheduled.set(false);
-        }
-        scheduledCancellableRetry = null;
-    }
-
     /**
      * Resets the backoff delay iterator so that the next set of failures starts with the base delay and goes upto max delay.
      */
@@ -362,18 +342,14 @@ private void resetBackOffDelayIterator() {
         backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator();
     }
 
-    private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) {
-        // If this was a retry attempt, then we set the retryScheduled to false so that the next retry (if needed) can be scheduled
-        if (isRetry) {
-            retryScheduled.set(false);
-        }
+    @Override
+    protected TimeValue getNextRetryInterval() {
+        return backoffDelayIterator.next();
+    }
 
-        // If there are failures in uploading segments, then we should retry as search idle can lead to
-        // refresh not occurring until write happens.
-        if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retryScheduled.compareAndSet(false, true)) {
-            scheduledCancellableRetry = indexShard.getThreadPool()
-                .schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH);
-        }
+    @Override
+    protected String getRetryThreadPoolName() {
+        return ThreadPool.Names.REMOTE_REFRESH_RETRY;
     }
 
     private boolean isRefreshAfterCommit() throws IOException {
@@ -522,4 +498,9 @@ private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesB
             segmentTracker.incrementTotalUploadsFailed();
         }
     }
+
+    @Override
+    protected Logger getLogger() {
+        return logger;
+    }
 }
diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
index 99bb7f95a85e5..0851677bcb13a 100644
--- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
+++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java
@@ -112,7 +112,7 @@ public static class Names {
         public static final String TRANSLOG_TRANSFER = "translog_transfer";
         public static final String TRANSLOG_SYNC = "translog_sync";
         public static final String REMOTE_PURGE = "remote_purge";
-        public static final String REMOTE_REFRESH = "remote_refresh";
+        public static final String REMOTE_REFRESH_RETRY = "remote_refresh_retry";
         public static final String INDEX_SEARCHER = "index_searcher";
     }
 
@@ -181,7 +181,7 @@ public static ThreadPoolType fromType(String type) {
         map.put(Names.TRANSLOG_TRANSFER, ThreadPoolType.SCALING);
         map.put(Names.TRANSLOG_SYNC, ThreadPoolType.FIXED);
         map.put(Names.REMOTE_PURGE, ThreadPoolType.SCALING);
-        map.put(Names.REMOTE_REFRESH, ThreadPoolType.SCALING);
+        map.put(Names.REMOTE_REFRESH_RETRY, ThreadPoolType.SCALING);
         if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) {
             map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE);
         }
@@ -264,8 +264,8 @@ public ThreadPool(
         builders.put(Names.TRANSLOG_SYNC, new FixedExecutorBuilder(settings, Names.TRANSLOG_SYNC, allocatedProcessors * 4, 10000));
         builders.put(Names.REMOTE_PURGE, new ScalingExecutorBuilder(Names.REMOTE_PURGE, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
         builders.put(
-            Names.REMOTE_REFRESH,
-            new ScalingExecutorBuilder(Names.REMOTE_REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
+            Names.REMOTE_REFRESH_RETRY,
+            new ScalingExecutorBuilder(Names.REMOTE_REFRESH_RETRY, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5))
         );
         if (FeatureFlags.isEnabled(FeatureFlags.CONCURRENT_SEGMENT_SEARCH)) {
             builders.put(
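The getNextRetryInterval and getRetryThreadPoolName overrides shown above are all a subclass must supply to opt into retries; the base class handles scheduling, permits, and the single-retry guard. A hedged sketch of that wiring (RetryingListener and tryUpload are hypothetical names, and BackoffPolicy.exponentialBackoff stands in for the listener's own EXPONENTIAL_BACKOFF_POLICY):

```java
// Illustrative subclass showing the retry hooks; assumes OpenSearch's
// BackoffPolicy and the REMOTE_REFRESH_RETRY pool added by this patch.
import java.util.Iterator;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;

class RetryingListener extends CloseableRetryableRefreshListener {

    private static final Logger logger = LogManager.getLogger(RetryingListener.class);

    // Exponential backoff starting at 50ms; the iterator yields a bounded
    // number of delays, so production code must also guard against exhaustion.
    private volatile Iterator<TimeValue> backoffDelayIterator = BackoffPolicy.exponentialBackoff(
        TimeValue.timeValueMillis(50),
        8
    ).iterator();

    RetryingListener(ThreadPool threadPool) {
        super(threadPool); // a ThreadPool is required for retries to be schedulable
    }

    @Override
    public void beforeRefresh() {}

    @Override
    protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
        boolean successful = tryUpload(); // hypothetical unit of work
        if (successful) {
            // Reset so the next failure streak starts from the base delay again.
            backoffDelayIterator = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(50), 8).iterator();
        }
        return successful;
    }

    private boolean tryUpload() {
        return true; // placeholder for the real post-refresh work
    }

    @Override
    protected TimeValue getNextRetryInterval() {
        return backoffDelayIterator.next();
    }

    @Override
    protected String getRetryThreadPoolName() {
        return ThreadPool.Names.REMOTE_REFRESH_RETRY;
    }

    @Override
    protected Logger getLogger() {
        return logger;
    }
}
```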
diff --git a/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java
new file mode 100644
index 0000000000000..b9df9ed5a13d8
--- /dev/null
+++ b/server/src/test/java/org/opensearch/index/shard/CloseableRetryableRefreshListenerTests.java
@@ -0,0 +1,307 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.shard;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.test.OpenSearchTestCase;
+import org.opensearch.threadpool.TestThreadPool;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+public class CloseableRetryableRefreshListenerTests extends OpenSearchTestCase {
+
+    private static final Logger logger = LogManager.getLogger(CloseableRetryableRefreshListenerTests.class);
+
+    private ThreadPool threadPool;
+
+    @Before
+    public void init() {
+        threadPool = new TestThreadPool(getTestName());
+    }
+
+    /**
+     * Tests that performAfterRefresh is invoked whenever afterRefresh is called; the countDownLatch decrementing as
+     * expected validates each invocation.
+     */
+    public void testPerformAfterRefresh() throws IOException {
+
+        CountDownLatch countDownLatch = new CountDownLatch(2);
+        CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(null) {
+            @Override
+            protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
+                countDownLatch.countDown();
+                return false;
+            }
+
+            @Override
+            public void beforeRefresh() {}
+
+            @Override
+            protected Logger getLogger() {
+                return logger;
+            }
+        };
+
+        // First invocation of afterRefresh method
+        testRefreshListener.afterRefresh(true);
+        assertEquals(1, countDownLatch.getCount());
+
+        // Second invocation of afterRefresh method
+        testRefreshListener.afterRefresh(true);
+        assertEquals(0, countDownLatch.getCount());
+        testRefreshListener.close();
+    }
+
+    /**
+     * Tests that close() acquires all permits, so that any subsequent afterRefresh call is a no-op.
+     */
+    public void testCloseAfterRefresh() throws IOException {
+        final int initialCount = randomIntBetween(10, 100);
+        final CountDownLatch countDownLatch = new CountDownLatch(initialCount);
+        CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(null) {
+            @Override
+            protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
+                countDownLatch.countDown();
+                return false;
+            }
+
+            @Override
+            public void beforeRefresh() {}
+
+            @Override
+            protected Logger getLogger() {
+                return logger;
+            }
+        };
+
+        int refreshCount = randomIntBetween(1, initialCount);
+        for (int i = 0; i < refreshCount; i++) {
+            testRefreshListener.afterRefresh(true);
+        }
+        assertEquals(initialCount - refreshCount, countDownLatch.getCount());
+
+        // Closing the refresh listener so that no further afterRefreshes are executed going forward
+        testRefreshListener.close();
+
+        for (int i = 0; i < initialCount - refreshCount; i++) {
+            testRefreshListener.afterRefresh(true);
+        }
+        assertEquals(initialCount - refreshCount, countDownLatch.getCount());
+    }
+
+    /**
+     * Tests that no retry is triggered when the configuration or method overrides required for retries are missing.
+     */
+    public void testNoRetry() throws IOException {
+        int initialCount = randomIntBetween(10, 100);
+        final CountDownLatch countDownLatch = new CountDownLatch(initialCount);
+        CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(null) {
+            @Override
+            protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
+                countDownLatch.countDown();
+                return countDownLatch.getCount() == 0;
+            }
+
+            @Override
+            public void beforeRefresh() {}
+
+            @Override
+            protected Logger getLogger() {
+                return logger;
+            }
+        };
+        testRefreshListener.afterRefresh(true);
+        assertEquals(initialCount - 1, countDownLatch.getCount());
+        testRefreshListener.close();
+
+        testRefreshListener = new CloseableRetryableRefreshListener(threadPool) {
+            @Override
+            protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
+                countDownLatch.countDown();
+                return countDownLatch.getCount() == 0;
+            }
+
+            @Override
+            public void beforeRefresh() {}
+
+            @Override
+            protected Logger getLogger() {
+                return logger;
+            }
+        };
+        testRefreshListener.afterRefresh(true);
+        assertEquals(initialCount - 2, countDownLatch.getCount());
+        testRefreshListener.close();
+
+        testRefreshListener = new CloseableRetryableRefreshListener(threadPool) {
+            @Override
+            protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
+                countDownLatch.countDown();
+                return countDownLatch.getCount() == 0;
+            }
+
+            @Override
+            public void beforeRefresh() {}
+
+            @Override
+            protected String getRetryThreadPoolName() {
+                return ThreadPool.Names.REMOTE_REFRESH_RETRY;
+            }
+
+            @Override
+            protected Logger getLogger() {
+                return logger;
+            }
+        };
+        testRefreshListener.afterRefresh(true);
+        assertEquals(initialCount - 3, countDownLatch.getCount());
+        testRefreshListener.close();
+
+        testRefreshListener = new CloseableRetryableRefreshListener(threadPool) {
+            @Override
+            protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
+                countDownLatch.countDown();
+                return countDownLatch.getCount() == 0;
+            }
+
+            @Override
+            public void beforeRefresh() {}
+
+            @Override
+            protected TimeValue getNextRetryInterval() {
+                return TimeValue.timeValueMillis(100);
+            }
+
+            @Override
+            protected Logger getLogger() {
+                return logger;
+            }
+        };
+        testRefreshListener.afterRefresh(true);
+        assertEquals(initialCount - 4, countDownLatch.getCount());
+        testRefreshListener.close();
+    }
+
+    /**
+     * Tests that a retry is scheduled and executed when the required configuration and method overrides are in place.
+     */
+    public void testRetry() throws Exception {
+        int initialCount = randomIntBetween(10, 20);
+        final CountDownLatch countDownLatch = new CountDownLatch(initialCount);
+        CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) {
+            @Override
+            protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
+                countDownLatch.countDown();
+                return countDownLatch.getCount() == 0;
+            }
+
+            @Override
+            public void beforeRefresh() {}
+
+            @Override
+            protected String getRetryThreadPoolName() {
+                return ThreadPool.Names.REMOTE_REFRESH_RETRY;
+            }
+
+            @Override
+            protected TimeValue getNextRetryInterval() {
+                return TimeValue.timeValueMillis(100);
+            }
+
+            @Override
+            protected Logger getLogger() {
+                return logger;
+            }
+        };
+        testRefreshListener.afterRefresh(true);
+        assertBusy(() -> assertEquals(0, countDownLatch.getCount()));
+        testRefreshListener.close();
+    }
+
+    /**
+     * Tests that once close() has been invoked, even pending retries become no-ops.
+     */
+    public void testCloseWithRetryPending() throws IOException {
+        int initialCount = randomIntBetween(10, 20);
+        final CountDownLatch countDownLatch = new CountDownLatch(initialCount);
+        CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) {
+            @Override
+            protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
+                countDownLatch.countDown();
+                return countDownLatch.getCount() == 0;
+            }
+
+            @Override
+            public void beforeRefresh() {}
+
+            @Override
+            protected String getRetryThreadPoolName() {
+                return ThreadPool.Names.REMOTE_REFRESH_RETRY;
+            }
+
+            @Override
+            protected TimeValue getNextRetryInterval() {
+                return TimeValue.timeValueMillis(100);
+            }
+
+            @Override
+            protected Logger getLogger() {
+                return logger;
+            }
+        };
+        testRefreshListener.afterRefresh(randomBoolean());
+        testRefreshListener.close();
+        assertNotEquals(0, countDownLatch.getCount());
+    }
+
+    public void testCloseWaitsForAcquiringAllPermits() throws Exception {
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(threadPool) {
+            @Override
+            protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException e) {
+                    throw new AssertionError(e);
+                }
+                countDownLatch.countDown();
+                return false;
+            }
+
+            @Override
+            public void beforeRefresh() {}
+
+            @Override
+            protected Logger getLogger() {
+                return logger;
+            }
+        };
+        Thread thread = new Thread(() -> {
+            try {
+                testRefreshListener.afterRefresh(randomBoolean());
+            } catch (IOException e) {
+                throw new AssertionError(e);
+            }
+        });
+        thread.start();
+        assertBusy(() -> assertEquals(0, countDownLatch.getCount()));
+        testRefreshListener.close();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        terminate(threadPool);
+    }
+}
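These tests depend on the base class's guarantee that at most one retry is pending at a time, which is enforced with an AtomicBoolean compare-and-set around the schedule call. That guard can be shown in isolation with plain JDK scheduling (a hypothetical demo class, not patch code; scheduleWork here stands in for ThreadPool#schedule):

```java
// Sketch of the "at most one retry scheduled" guard used by the base class.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class SingleRetryGuardDemo {
    private final AtomicBoolean retryScheduled = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    void maybeScheduleRetry(Runnable retry, long delayMillis) {
        // compareAndSet wins for exactly one caller; everyone else bails out,
        // so at most one retry is ever pending.
        if (retryScheduled.compareAndSet(false, true) == false) {
            return;
        }
        boolean scheduled = false;
        try {
            scheduler.schedule(() -> {
                try {
                    retry.run();
                } finally {
                    retryScheduled.set(false); // allow the next retry to be scheduled
                }
            }, delayMillis, TimeUnit.MILLISECONDS);
            scheduled = true;
        } finally {
            if (scheduled == false) {
                retryScheduled.set(false);     // undo the reservation if schedule() threw
            }
        }
    }
}
```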
diff --git a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java
index 33ad845ea647e..92bd15d818bca 100644
--- a/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java
+++ b/server/src/test/java/org/opensearch/threadpool/ScalingThreadPoolTests.java
@@ -135,7 +135,7 @@ private int expectedSize(final String threadPoolName, final int numberOfProcesso
         sizes.put(ThreadPool.Names.TRANSLOG_TRANSFER, ThreadPool::halfAllocatedProcessorsMaxTen);
         sizes.put(ThreadPool.Names.TRANSLOG_SYNC, n -> 4 * n);
         sizes.put(ThreadPool.Names.REMOTE_PURGE, ThreadPool::halfAllocatedProcessorsMaxFive);
-        sizes.put(ThreadPool.Names.REMOTE_REFRESH, ThreadPool::halfAllocatedProcessorsMaxTen);
+        sizes.put(ThreadPool.Names.REMOTE_REFRESH_RETRY, ThreadPool::halfAllocatedProcessorsMaxTen);
         return sizes.get(threadPoolName).apply(numberOfProcessors);
     }
 
diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
index b23c28be14865..93b9742ada0da 100644
--- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java
@@ -42,7 +42,6 @@
 import org.opensearch.ExceptionsHelper;
 import org.opensearch.Version;
 import org.opensearch.action.ActionListener;
-import org.opensearch.action.LatchedActionListener;
 import org.opensearch.action.admin.indices.flush.FlushRequest;
 import org.opensearch.action.index.IndexRequest;
 import org.opensearch.action.support.PlainActionFuture;
@@ -651,7 +650,7 @@ protected RepositoriesService createRepositoriesService() {
         BlobStore blobStore = Mockito.mock(BlobStore.class);
         BlobContainer blobContainer = Mockito.mock(BlobContainer.class);
         doAnswer(invocation -> {
-            LatchedActionListener<List<BlobMetadata>> listener = invocation.getArgument(3);
+            ActionListener<List<BlobMetadata>> listener = invocation.getArgument(3);
             listener.onResponse(new ArrayList<>());
             return null;
         }).when(blobContainer)