Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use CloseableRetryableRefreshListener to drain ongoing after refresh tasks on relocation #8683

Merged
merged 7 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
*
* @opensearch.internal
*/
public class CheckpointRefreshListener implements ReferenceManager.RefreshListener {
public class CheckpointRefreshListener extends CloseableRetryableRefreshListener {

protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class);

Expand All @@ -39,12 +39,18 @@ public void beforeRefresh() throws IOException {
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
if (didRefresh
&& shard.state() == IndexShardState.STARTED
&& shard.getReplicationTracker().isPrimaryMode()
&& !shard.indexSettings.isSegRepWithRemoteEnabled()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
return true;
}

@Override
protected Logger getLogger() {
return logger;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* RefreshListener that runs afterRefresh method if and only if there is a permit available. Once the listener
* is closed, all the permits are acquired and there are no available permits to afterRefresh. This abstract class provides
* necessary abstract methods to schedule retry.
*/
public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener, Closeable {

/**
* Total permits = 1 ensures that there is only single instance of performAfterRefresh that is running at a time.
* In case there are use cases where concurrency is required, the total permit variable can be put inside the ctor.
*/
private static final int TOTAL_PERMITS = 1;

private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);

private final ThreadPool threadPool;

/**
* This boolean is used to ensure that there is only 1 retry scheduled/running at any time.
*/
private final AtomicBoolean retryScheduled = new AtomicBoolean(false);

public CloseableRetryableRefreshListener() {
this.threadPool = null;
}

public CloseableRetryableRefreshListener(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public final void afterRefresh(boolean didRefresh) throws IOException {
boolean successful;
boolean permitAcquired = semaphore.tryAcquire();
try {
successful = permitAcquired && performAfterRefresh(didRefresh, false);
} finally {
if (permitAcquired) {
semaphore.release();
}
}
scheduleRetry(successful, didRefresh, permitAcquired);
}

protected String getRetryThreadPoolName() {
return null;
}

protected TimeValue getNextRetryInterval() {
return null;
}
ashking94 marked this conversation as resolved.
Show resolved Hide resolved

private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boolean didRefresh, boolean isRetry) {
if (this.threadPool == null
|| interval == null
|| retryThreadPoolName == null
|| ThreadPool.THREAD_POOL_TYPES.containsKey(retryThreadPoolName) == false
|| interval == TimeValue.MINUS_ONE
|| retryScheduled.compareAndSet(false, true) == false) {
sachinpkale marked this conversation as resolved.
Show resolved Hide resolved
return;
}
boolean scheduled = false;
try {
this.threadPool.schedule(() -> {
boolean successful;
boolean permitAcquired = semaphore.tryAcquire();
try {
successful = permitAcquired && performAfterRefresh(didRefresh, isRetry);
} finally {
if (permitAcquired) {
semaphore.release();
}
retryScheduled.set(false);
}
scheduleRetry(successful, didRefresh, isRetry || permitAcquired);
}, interval, retryThreadPoolName);
scheduled = true;
getLogger().info("Scheduled retry with didRefresh={} isRetry={}", didRefresh, isRetry);
} finally {
if (scheduled == false) {
retryScheduled.set(false);
}
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Schedules the retry based on the {@code afterRefreshSuccessful} value.
*
* @param afterRefreshSuccessful is sent true if the performAfterRefresh(..) is successful.
* @param didRefresh if the refresh did open a new reference then didRefresh will be true
* @param isRetry if this is a failure or permit was not acquired.
*/
private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh, boolean isRetry) {
if (afterRefreshSuccessful == false) {
scheduleRetry(getNextRetryInterval(), getRetryThreadPoolName(), didRefresh, isRetry);
}
}

/**
* This method needs to be overridden and be provided with what needs to be run on after refresh.
*
* @param didRefresh true if the refresh opened a new reference
* @param isRetry true if this is a retry attempt
* @return true if a retry is needed else false.
*/
protected abstract boolean performAfterRefresh(boolean didRefresh, boolean isRetry);

@Override
public final void close() throws IOException {
try {
if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) {
assert semaphore.availablePermits() == 0;
} else {
throw new RuntimeException("timeout while closing gated refresh listener");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

protected abstract Logger getLogger();
}
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,8 @@ Runnable getGlobalCheckpointSyncer() {
private final boolean isTimeSeriesIndex;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
Expand Down Expand Up @@ -813,6 +815,13 @@ public void relocated(
if (syncTranslog) {
maybeSync();
}

// Ensures all in-flight remote store operations drain, before we perform the handoff.
internalRefreshListener.stream()
.filter(refreshListener -> refreshListener instanceof Closeable)
.map(refreshListener -> (Closeable) refreshListener)
.close();

// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "in-flight operations in progress while moving shard state to relocated";
Expand Down Expand Up @@ -3660,7 +3669,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
}
};

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.clear();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
Expand Down
Loading