Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Renew retention leases while following #39335

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -540,12 +540,16 @@ protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, Acti
new SyncRetentionLeases(request, ReplicationGroup.this, wrappedListener).execute();
}

public RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source,
/**
 * Adds a retention lease by delegating to the shard returned by {@code getPrimary()}. The call is made while holding this
 * instance's monitor.
 *
 * @param id                      the identifier of the retention lease
 * @param retainingSequenceNumber the retaining sequence number, passed through unchanged
 * @param source                  the source of the retention lease, passed through unchanged
 * @param listener                the listener forwarded to the primary's {@code addRetentionLease}
 * @return the retention lease returned by the primary
 */
public synchronized RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source,
ActionListener<ReplicationResponse> listener) {
return getPrimary().addRetentionLease(id, retainingSequenceNumber, source, listener);
}

public void removeRetentionLease(String id, ActionListener<ReplicationResponse> listener) {
/**
 * Renews a retention lease by delegating to the shard returned by {@code getPrimary()}. The call is made while holding this
 * instance's monitor.
 *
 * @param id                      the identifier of the retention lease
 * @param retainingSequenceNumber the retaining sequence number, passed through unchanged
 * @param source                  the source of the retention lease, passed through unchanged
 * @return the retention lease returned by the primary
 */
public synchronized RetentionLease renewRetentionLease(String id, long retainingSequenceNumber, String source) {
return getPrimary().renewRetentionLease(id, retainingSequenceNumber, source);
}

/**
 * Removes a retention lease by delegating to the shard returned by {@code getPrimary()}. The call is made while holding this
 * instance's monitor.
 *
 * @param id       the identifier of the retention lease
 * @param listener the listener forwarded to the primary's {@code removeRetentionLease}
 */
public synchronized void removeRetentionLease(String id, ActionListener<ReplicationResponse> listener) {
getPrimary().removeRetentionLease(id, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
Expand All @@ -18,11 +19,18 @@

import java.util.Locale;
import java.util.Optional;

import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
import java.util.concurrent.TimeUnit;

public class CcrRetentionLeases {

// this setting is intentionally not registered, it is only used in tests
// the value defaults to five minutes, has a lower bound of zero milliseconds, and is declared with node scope
public static final Setting<TimeValue> RETENTION_LEASE_RENEW_INTERVAL_SETTING =
Setting.timeSetting(
"index.ccr.retention_lease.renew_interval",
new TimeValue(5, TimeUnit.MINUTES),
new TimeValue(0, TimeUnit.MILLISECONDS),
Setting.Property.NodeScope);

/**
* The retention lease ID used by followers.
*
Expand Down Expand Up @@ -52,20 +60,22 @@ public static String retentionLeaseId(
* Synchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given
* remote client. Note that this method will block up to the specified timeout.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param retainingSequenceNumber the retaining sequence number
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @return an optional exception indicating whether or not the retention lease already exists
*/
public static Optional<RetentionLeaseAlreadyExistsException> syncAddRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final long retainingSequenceNumber,
final Client remoteClient,
final TimeValue timeout) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
asyncAddRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response);
response.actionGet(timeout);
return Optional.empty();
} catch (final RetentionLeaseAlreadyExistsException e) {
Expand All @@ -78,39 +88,43 @@ public static Optional<RetentionLeaseAlreadyExistsException> syncAddRetentionLea
* remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a response
* or failure.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param retainingSequenceNumber the retaining sequence number
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
*/
public static void asyncAddRetentionLease(
        final ShardId leaderShardId,
        final String retentionLeaseId,
        final long retainingSequenceNumber,
        final Client remoteClient,
        final ActionListener<RetentionLeaseActions.Response> listener) {
    // The request must carry the caller-supplied retaining sequence number. Building it with RETAIN_ALL would silently ignore the
    // retainingSequenceNumber argument, so exactly one request is constructed here and it is the one that is executed.
    final RetentionLeaseActions.AddRequest request =
            new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr");
    // returns immediately; the outcome (response or failure) is delivered to the listener by the remote client
    remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener);
}

/**
* Synchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the given
* remote client. Note that this method will block up to the specified timeout.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param retainingSequenceNumber the retaining sequence number
* @param remoteClient the remote client on which to execute this request
* @param timeout the timeout
* @return an optional exception indicating whether or not the retention lease was not found
*/
public static Optional<RetentionLeaseNotFoundException> syncRenewRetentionLease(
final ShardId leaderShardId,
final String retentionLeaseId,
final long retainingSequenceNumber,
final Client remoteClient,
final TimeValue timeout) {
try {
final PlainActionFuture<RetentionLeaseActions.Response> response = new PlainActionFuture<>();
asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response);
asyncRenewRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response);
response.actionGet(timeout);
return Optional.empty();
} catch (final RetentionLeaseNotFoundException e) {
Expand All @@ -123,18 +137,20 @@ public static Optional<RetentionLeaseNotFoundException> syncRenewRetentionLease(
* given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a
* response or failure.
*
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
* @param leaderShardId the leader shard ID
* @param retentionLeaseId the retention lease ID
* @param retainingSequenceNumber the retaining sequence number
* @param remoteClient the remote client on which to execute this request
* @param listener the listener
*/
public static void asyncRenewRetentionLease(
        final ShardId leaderShardId,
        final String retentionLeaseId,
        final long retainingSequenceNumber,
        final Client remoteClient,
        final ActionListener<RetentionLeaseActions.Response> listener) {
    // The renewal must carry the caller-supplied retaining sequence number. Building it with RETAIN_ALL would silently ignore the
    // retainingSequenceNumber argument, so exactly one request is constructed here and it is the one that is executed.
    final RetentionLeaseActions.RenewRequest request =
            new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr");
    // returns immediately; the outcome (response or failure) is delivered to the listener by the remote client
    remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;

Expand Down Expand Up @@ -94,6 +95,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {

private volatile ElasticsearchException fatalException;

// Handle to the scheduled background retention lease renewal. It is assigned in start() and cleared in onCancelled(), in both
// cases while holding this task's lock, so it may be null before the task starts or after it has been cancelled.
private Scheduler.Cancellable renewable;

/**
 * Returns the current renewal handle, read while holding this task's monitor.
 *
 * @return the cancellable renewal handle, or {@code null} if none is currently set
 */
Scheduler.Cancellable getRenewable() {
    synchronized (this) {
        return renewable;
    }
}

ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
super(id, type, action, description, parentTask, headers);
Expand Down Expand Up @@ -121,7 +128,8 @@ void start(
final long followerMaxSeqNo) {
/*
* While this should only ever be called once and before any other threads can touch these fields, we use synchronization here to
* avoid the need to declare these fields as volatile. That is, we are ensuring thesefields are always accessed under the same lock.
* avoid the need to declare these fields as volatile. That is, we are ensuring these fields are always accessed under the same
* lock.
*/
synchronized (this) {
this.followerHistoryUUID = followerHistoryUUID;
Expand All @@ -130,6 +138,11 @@ void start(
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
this.followerMaxSeqNo = followerMaxSeqNo;
this.lastRequestedSeqNo = followerGlobalCheckpoint;
renewable = scheduleBackgroundRetentionLeaseRenewal(() -> {
synchronized (ShardFollowNodeTask.this) {
return this.followerGlobalCheckpoint;
}
});
}

// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
Expand Down Expand Up @@ -507,8 +520,16 @@ protected abstract void innerSendBulkShardOperationsRequest(String followerHisto
/**
 * Sends a shard changes request; the concrete mechanism is supplied by the subclass.
 *
 * @param from              presumably the first sequence number to request -- confirm against the implementations
 * @param maxOperationCount presumably an upper bound on the number of operations requested -- confirm against the implementations
 * @param handler           consumer of the {@code ShardChangesAction.Response}
 * @param errorHandler      consumer of an exception; presumably invoked when the request fails -- confirm against the implementations
 */
protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
Consumer<Exception> errorHandler);

/**
 * Schedules the background renewal of the retention lease; the scheduling mechanism is supplied by the subclass. This is invoked
 * from {@code start} while holding this task's lock, and the returned handle is stored so that {@code onCancelled} can cancel it.
 *
 * @param followerGlobalCheckpoint supplies the follower global checkpoint; the supplier passed by {@code start} reads the field
 *                                 while holding this task's lock
 * @return a handle that can be used to cancel the scheduled renewal
 */
protected abstract Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(LongSupplier followerGlobalCheckpoint);

/**
 * Cancels the background retention lease renewal, if one is currently set, and then marks this task as completed.
 */
@Override
protected void onCancelled() {
    synchronized (this) {
        // snapshot the handle under the lock; it is only dropped after it has been cancelled
        final Scheduler.Cancellable scheduled = renewable;
        if (scheduled != null) {
            scheduled.cancel();
            renewable = null;
        }
    }
    markAsCompleted();
}

Expand Down
Loading