Use CloseableRetryableRefreshListener to drain ongoing after refresh tasks on relocation #8683

Merged · 7 commits · Jul 18, 2023
Changes from 5 commits
CheckpointRefreshListener.java
@@ -21,14 +21,15 @@
*
* @opensearch.internal
*/
public class CheckpointRefreshListener implements ReferenceManager.RefreshListener {
public class CheckpointRefreshListener extends CloseableRetryableRefreshListener {

protected static Logger logger = LogManager.getLogger(CheckpointRefreshListener.class);

private final IndexShard shard;
private final SegmentReplicationCheckpointPublisher publisher;

public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointPublisher publisher) {
super(null);
this.shard = shard;
this.publisher = publisher;
}
@@ -39,12 +40,13 @@ public void beforeRefresh() throws IOException {
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
if (didRefresh
&& shard.state() == IndexShardState.STARTED
&& shard.getReplicationTracker().isPrimaryMode()
&& !shard.indexSettings.isSegRepWithRemoteEnabled()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
return true;
}
}
CloseableRetryableRefreshListener.java (new file)
@@ -0,0 +1,136 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.shard;

import org.apache.lucene.search.ReferenceManager;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* RefreshListener that runs the afterRefresh method only if a permit is available. Once the listener
* is closed, all permits are acquired and stay acquired, so afterRefresh can no longer run. This abstract class
* also provides the hook methods needed to schedule a retry.
*/
public abstract class CloseableRetryableRefreshListener implements ReferenceManager.RefreshListener, Closeable {

/**
* Total permits = 1 ensures that only a single instance of performAfterRefresh runs at a time.
* If concurrent runs are ever required, the permit count can be made a constructor parameter.
*/
private static final int TOTAL_PERMITS = 1;

private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);

private final ThreadPool threadPool;

/**
* This boolean is used to ensure that there is only 1 retry scheduled/running at any time.
*/
private final AtomicBoolean retryScheduled = new AtomicBoolean(false);

public CloseableRetryableRefreshListener(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public final void afterRefresh(boolean didRefresh) throws IOException {
boolean successful;
boolean permitAcquired = semaphore.tryAcquire();
try {
successful = permitAcquired && performAfterRefresh(didRefresh, false);
} finally {
if (permitAcquired) {
semaphore.release();
}
}
scheduleRetry(successful, didRefresh, permitAcquired == false);
}

protected String getRetryThreadPoolName() {
return null;
}

protected TimeValue getNextRetryInterval() {
return null;
}

private void scheduleRetry(TimeValue interval, String retryThreadPoolName, boolean didRefresh, boolean permitsUnavailable) {
if (this.threadPool == null
|| interval == null
|| retryThreadPoolName == null
|| ThreadPool.THREAD_POOL_TYPES.containsKey(retryThreadPoolName) == false
|| interval == TimeValue.MINUS_ONE
|| retryScheduled.compareAndSet(false, true) == false) {
return;
}
boolean finalDidRefresh = didRefresh && permitsUnavailable;
boolean scheduled = false;
try {
this.threadPool.schedule(() -> {
boolean successful;
boolean permitAcquired = semaphore.tryAcquire();
try {
successful = permitAcquired && performAfterRefresh(finalDidRefresh, true);
} finally {
if (permitAcquired) {
semaphore.release();
}
retryScheduled.set(false);
}
scheduleRetry(successful, finalDidRefresh, permitAcquired == false);
}, interval, retryThreadPoolName);
scheduled = true;
} finally {
if (scheduled == false) {
retryScheduled.set(false);
}
}
}

/**
* Schedules the retry based on the {@code afterRefreshSuccessful} value.
*
* @param afterRefreshSuccessful true if performAfterRefresh(..) completed successfully
* @param didRefresh true if the refresh opened a new reference
* @param permitsUnavailable true if the permit could not be acquired while running performAfterRefresh or scheduling the retry
*/
private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh, boolean permitsUnavailable) {
if (afterRefreshSuccessful == false) {
scheduleRetry(getNextRetryInterval(), getRetryThreadPoolName(), didRefresh, permitsUnavailable);
}
}

/**
* Subclasses override this method to provide the work that must run after a refresh.
*
* @param didRefresh true if the refresh opened a new reference
* @param isRetry true if this is a retry attempt
* @return true if the after-refresh work completed successfully and no retry is needed, false to schedule a retry.
*/
protected abstract boolean performAfterRefresh(boolean didRefresh, boolean isRetry);

@Override
public final void close() throws IOException {
try {
if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) {
assert semaphore.availablePermits() == 0;
} else {
throw new RuntimeException("timeout while closing gated refresh listener");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
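
To make the contract concrete, here is a minimal sketch of a hypothetical subclass (not part of this PR). It shows that returning false from performAfterRefresh is what makes the base class schedule a retry on the configured thread pool, while passing a null thread pool (as CheckpointRefreshListener does via super(null)) disables retries entirely. The class name ExampleUploadRefreshListener, the tryUpload helper, and the retry interval are all made up for illustration.

```java
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.shard.CloseableRetryableRefreshListener;
import org.opensearch.threadpool.ThreadPool;

// Hypothetical subclass, shown only to illustrate the CloseableRetryableRefreshListener contract.
public class ExampleUploadRefreshListener extends CloseableRetryableRefreshListener {

    public ExampleUploadRefreshListener(ThreadPool threadPool) {
        super(threadPool); // pass null and retries are never scheduled
    }

    @Override
    public void beforeRefresh() {
        // nothing to prepare before the refresh in this example
    }

    @Override
    protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
        if (didRefresh == false && isRetry == false) {
            return true; // nothing new to process, so no retry is needed
        }
        // Returning false tells the base class to schedule a retry after getNextRetryInterval()
        // on the thread pool named by getRetryThreadPoolName().
        return tryUpload();
    }

    @Override
    protected TimeValue getNextRetryInterval() {
        return TimeValue.timeValueSeconds(1); // fixed retry delay, purely illustrative
    }

    @Override
    protected String getRetryThreadPoolName() {
        return ThreadPool.Names.GENERIC;
    }

    private boolean tryUpload() {
        return true; // placeholder for the real after-refresh work
    }
}
```

Once such a listener is closed, all permits stay acquired, so afterRefresh becomes a no-op and a pending retry finds no permit; this is what the relocation hand-off below relies on.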
IndexShard.java
@@ -339,6 +339,8 @@ Runnable getGlobalCheckpointSyncer() {
private final boolean isTimeSeriesIndex;
private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService;

private final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
@@ -813,6 +815,13 @@ public void relocated(
if (syncTranslog) {
maybeSync();
}

// Ensures all in-flight remote store operations drain before we perform the handoff.
internalRefreshListener.stream()
.filter(refreshListener -> refreshListener instanceof Closeable)
.map(refreshListener -> (Closeable) refreshListener)
.forEach(closeable -> {
try {
closeable.close();
} catch (IOException e) {
logger.warn("failed to close refresh listener before relocation hand-off", e);
}
});

// no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
: "in-flight operations in progress while moving shard state to relocated";
@@ -3660,7 +3669,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
}
};

final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
internalRefreshListener.clear();
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) {
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));
RemoteStoreRefreshListener.java
@@ -14,7 +14,6 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
@@ -37,7 +36,6 @@
import org.opensearch.index.translog.Translog;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
@@ -51,8 +49,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

@@ -63,7 +59,7 @@
*
* @opensearch.internal
*/
public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshListener {

private final Logger logger;

@@ -98,16 +94,8 @@ public final class RemoteStoreRefreshListener implements ReferenceManager.Refres
private final RemoteRefreshSegmentTracker segmentTracker;
private final Map<String, String> localSegmentChecksumMap;
private long primaryTerm;

/**
* This boolean is used to ensure that there is only 1 retry scheduled/running at any time.
*/
private final AtomicBoolean retryScheduled = new AtomicBoolean(false);

private volatile Iterator<TimeValue> backoffDelayIterator;

private volatile Scheduler.ScheduledCancellable scheduledCancellableRetry;

/**
* Keeps track of segment files and their size in bytes which are part of the most recent refresh.
*/
@@ -122,6 +110,7 @@ public RemoteStoreRefreshListener(
SegmentReplicationCheckpointPublisher checkpointPublisher,
RemoteRefreshSegmentTracker segmentTracker
) {
super(indexShard.getThreadPool());
logger = Loggers.getLogger(getClass(), indexShard.shardId());
this.indexShard = indexShard;
this.storeDirectory = indexShard.store().directory();
@@ -170,34 +159,42 @@ public void beforeRefresh() throws IOException {}
/**
* Upload new segment files created as part of the last refresh to the remote segment store.
* This method also uploads the remote_segments_metadata file, which contains metadata of each uploaded segment file.
*
* @param didRefresh true if the refresh opened a new reference
* @param isRetry true if this is a retry attempt
* @return true if the segment upload completed successfully and no retry is needed.
*/
@Override
public void afterRefresh(boolean didRefresh) {
protected boolean performAfterRefresh(boolean didRefresh, boolean isRetry) {
if (didRefresh) {
updateLocalRefreshTimeAndSeqNo();
}
boolean successful;
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()
|| didRefresh
|| isRetry
|| remoteDirectory.getSegmentsUploadedToRemoteStore().isEmpty()) {
updateLocalRefreshTimeAndSeqNo();
try {
indexShard.getThreadPool().executor(ThreadPool.Names.REMOTE_REFRESH).submit(() -> syncSegments(false)).get();
} catch (InterruptedException | ExecutionException e) {
logger.info("Exception occurred while scheduling syncSegments", e);
}
successful = syncSegments(false);
} else {
successful = true;
}
return successful;
}

private synchronized void syncSegments(boolean isRetry) {
if (indexShard.getReplicationTracker().isPrimaryMode() == false) {
return;
private synchronized boolean syncSegments(boolean isRetry) {
if (indexShard.getReplicationTracker().isPrimaryMode() == false || indexShard.state() == IndexShardState.CLOSED) {
logger.info(
"Skipped syncing segments with primaryMode={} indexShardState={}",
indexShard.getReplicationTracker().isPrimaryMode(),
indexShard.state()
);
return true;
}
ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint();
indexShard.onCheckpointPublished(checkpoint);
beforeSegmentsSync(isRetry);
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
long bytesBeforeUpload = segmentTracker.getUploadBytesSucceeded(), startTimeInNS = System.nanoTime();
final AtomicBoolean shouldRetry = new AtomicBoolean(true);
final AtomicBoolean successful = new AtomicBoolean(false);

try {
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
@@ -265,7 +262,7 @@ public void onResponse(Void unused) {
);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
shouldRetry.set(false);
successful.set(true);
} catch (Exception e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// in the next refresh. This should not affect durability of the indexed data after remote trans-log
@@ -295,8 +292,10 @@ public void onFailure(Exception e) {
} catch (Throwable t) {
logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t);
}
updateFinalStatusInSegmentTracker(shouldRetry.get() == false, bytesBeforeUpload, startTimeInNS);
afterSegmentsSync(isRetry, shouldRetry.get());
updateFinalStatusInSegmentTracker(successful.get(), bytesBeforeUpload, startTimeInNS);
// If segment uploads failed, we should retry because a search-idle shard may not refresh again
// until a write happens.
return successful.get();
}

/**
@@ -333,47 +332,27 @@ private void onSuccessfulSegmentsSync(
updateRemoteRefreshTimeAndSeqNo(refreshTimeMs, refreshClockTimeMs, refreshSeqNo);
// Reset the backoffDelayIterator for the future failures
resetBackOffDelayIterator();
// Cancel the scheduled cancellable retry if possible and set it to null
cancelAndResetScheduledCancellableRetry();
// Set the minimum sequence number for keeping translog
indexShard.getEngine().translogManager().setMinSeqNoToKeep(lastRefreshedCheckpoint + 1);
// Publishing the new checkpoint which is used for remote store + segrep indexes
checkpointPublisher.publish(indexShard, checkpoint);
}

/**
* Cancels the scheduled retry if there is one scheduled, and it has not started yet. Clears the reference as the
* schedule retry has been cancelled, or it was null in the first place, or it is running/ran already.
*/
private void cancelAndResetScheduledCancellableRetry() {
if (scheduledCancellableRetry != null && scheduledCancellableRetry.getDelay(TimeUnit.NANOSECONDS) > 0) {
scheduledCancellableRetry.cancel();
// Since we are cancelling the retry attempt as an internal/external refresh happened already before the retry job could be
// started and the current run successfully uploaded the segments.
retryScheduled.set(false);
}
scheduledCancellableRetry = null;
}

/**
* Resets the backoff delay iterator so that the next set of failures starts with the base delay and goes up to the max delay.
*/
private void resetBackOffDelayIterator() {
backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator();
}

private void afterSegmentsSync(boolean isRetry, boolean shouldRetry) {
// If this was a retry attempt, then we set the retryScheduled to false so that the next retry (if needed) can be scheduled
if (isRetry) {
retryScheduled.set(false);
}
@Override
protected TimeValue getNextRetryInterval() {
return backoffDelayIterator.next();
}

// If there are failures in uploading segments, then we should retry as search idle can lead to
// refresh not occurring until write happens.
if (shouldRetry && indexShard.state() != IndexShardState.CLOSED && retryScheduled.compareAndSet(false, true)) {
scheduledCancellableRetry = indexShard.getThreadPool()
.schedule(() -> this.syncSegments(true), backoffDelayIterator.next(), ThreadPool.Names.REMOTE_REFRESH);
}
@Override
protected String getRetryThreadPoolName() {
return ThreadPool.Names.REMOTE_REFRESH_RETRY;
}

private boolean isRefreshAfterCommit() throws IOException {
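
For context on the retry cadence behind getNextRetryInterval(), here is an illustrative sketch of an exponential backoff iterator: each consecutive failure yields a larger delay up to a cap, and resetBackOffDelayIterator() simply recreates the iterator so a later run of failures starts again from the base delay. The class name and delay values are hypothetical, not the policy actually configured in RemoteStoreRefreshListener.

```java
import java.util.Iterator;

import org.opensearch.common.unit.TimeValue;

// Illustrative exponential backoff iterator; base and max delays are supplied by the caller.
final class ExampleBackoffDelayIterator implements Iterator<TimeValue> {

    private final long maxDelayMillis;
    private long nextDelayMillis;

    ExampleBackoffDelayIterator(TimeValue baseDelay, TimeValue maxDelay) {
        this.nextDelayMillis = baseDelay.getMillis();
        this.maxDelayMillis = maxDelay.getMillis();
    }

    @Override
    public boolean hasNext() {
        return true; // unbounded: the delay simply saturates at the maximum
    }

    @Override
    public TimeValue next() {
        long current = nextDelayMillis;
        // Double the delay for the next failure, capped at the maximum.
        nextDelayMillis = Math.min(current * 2, maxDelayMillis);
        return TimeValue.timeValueMillis(current);
    }
}
```

A fresh iterator after each successful sync means a single transient failure is retried quickly, while repeated failures back off toward the maximum delay.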