diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 85e69de8824c9..d20d57aed64a9 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -540,12 +540,16 @@ protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, Acti new SyncRetentionLeases(request, ReplicationGroup.this, wrappedListener).execute(); } - public RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source, + public synchronized RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source, ActionListener listener) { return getPrimary().addRetentionLease(id, retainingSequenceNumber, source, listener); } - public void removeRetentionLease(String id, ActionListener listener) { + public synchronized RetentionLease renewRetentionLease(String id, long retainingSequenceNumber, String source) { + return getPrimary().renewRetentionLease(id, retainingSequenceNumber, source); + } + + public synchronized void removeRetentionLease(String id, ActionListener listener) { getPrimary().removeRetentionLease(id, listener); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java index 6afef8c42aa8b..a65069dc51809 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.seqno.RetentionLeaseActions; @@ -18,11 +19,18 @@ import java.util.Locale; import java.util.Optional; - -import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; +import java.util.concurrent.TimeUnit; public class CcrRetentionLeases { + // this setting is intentionally not registered, it is only used in tests + public static final Setting RETENTION_LEASE_RENEW_INTERVAL_SETTING = + Setting.timeSetting( + "index.ccr.retention_lease.renew_interval", + new TimeValue(5, TimeUnit.MINUTES), + new TimeValue(0, TimeUnit.MILLISECONDS), + Setting.Property.NodeScope); + /** * The retention lease ID used by followers. * @@ -52,20 +60,22 @@ public static String retentionLeaseId( * Synchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given * remote client. Note that this method will block up to the specified timeout. 
* - * @param leaderShardId the leader shard ID - * @param retentionLeaseId the retention lease ID - * @param remoteClient the remote client on which to execute this request - * @param timeout the timeout + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param retainingSequenceNumber the retaining sequence number + * @param remoteClient the remote client on which to execute this request + * @param timeout the timeout * @return an optional exception indicating whether or not the retention lease already exists */ public static Optional syncAddRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, + final long retainingSequenceNumber, final Client remoteClient, final TimeValue timeout) { try { final PlainActionFuture response = new PlainActionFuture<>(); - asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); + asyncAddRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response); response.actionGet(timeout); return Optional.empty(); } catch (final RetentionLeaseAlreadyExistsException e) { @@ -78,18 +88,20 @@ public static Optional syncAddRetentionLea * remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a response * or failure. * - * @param leaderShardId the leader shard ID - * @param retentionLeaseId the retention lease ID - * @param remoteClient the remote client on which to execute this request - * @param listener the listener + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param retainingSequenceNumber the retaining sequence number + * @param remoteClient the remote client on which to execute this request + * @param listener the listener */ public static void asyncAddRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, + final long retainingSequenceNumber, final Client remoteClient, final ActionListener listener) { final RetentionLeaseActions.AddRequest request = - new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); + new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr"); remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener); } @@ -97,20 +109,22 @@ public static void asyncAddRetentionLease( * Synchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the given * remote client. Note that this method will block up to the specified timeout. 
* - * @param leaderShardId the leader shard ID - * @param retentionLeaseId the retention lease ID - * @param remoteClient the remote client on which to execute this request - * @param timeout the timeout + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param retainingSequenceNumber the retaining sequence number + * @param remoteClient the remote client on which to execute this request + * @param timeout the timeout * @return an optional exception indicating whether or not the retention lease already exists */ public static Optional syncRenewRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, + final long retainingSequenceNumber, final Client remoteClient, final TimeValue timeout) { try { final PlainActionFuture response = new PlainActionFuture<>(); - asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); + asyncRenewRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response); response.actionGet(timeout); return Optional.empty(); } catch (final RetentionLeaseNotFoundException e) { @@ -123,18 +137,20 @@ public static Optional syncRenewRetentionLease( * given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a * response or failure. * - * @param leaderShardId the leader shard ID - * @param retentionLeaseId the retention lease ID - * @param remoteClient the remote client on which to execute this request - * @param listener the listener + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param retainingSequenceNumber the retaining sequence number + * @param remoteClient the remote client on which to execute this request + * @param listener the listener */ public static void asyncRenewRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, + final long retainingSequenceNumber, final Client remoteClient, final ActionListener listener) { final RetentionLeaseActions.RenewRequest request = - new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); + new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr"); remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 3918b815e9150..0ee86a6058c63 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -30,9 +30,10 @@ import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.transport.NoSuchRemoteClusterException; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -94,6 +95,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private volatile ElasticsearchException fatalException; + private 
Scheduler.Cancellable renewable; + + synchronized Scheduler.Cancellable getRenewable() { + return renewable; + } + ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers, ShardFollowTask params, BiConsumer scheduler, final LongSupplier relativeTimeProvider) { super(id, type, action, description, parentTask, headers); @@ -121,7 +128,8 @@ void start( final long followerMaxSeqNo) { /* * While this should only ever be called once and before any other threads can touch these fields, we use synchronization here to - * avoid the need to declare these fields as volatile. That is, we are ensuring thesefields are always accessed under the same lock. + * avoid the need to declare these fields as volatile. That is, we are ensuring these fields are always accessed under the same + * lock. */ synchronized (this) { this.followerHistoryUUID = followerHistoryUUID; @@ -130,6 +138,11 @@ void start( this.followerGlobalCheckpoint = followerGlobalCheckpoint; this.followerMaxSeqNo = followerMaxSeqNo; this.lastRequestedSeqNo = followerGlobalCheckpoint; + renewable = scheduleBackgroundRetentionLeaseRenewal(() -> { + synchronized (ShardFollowNodeTask.this) { + return this.followerGlobalCheckpoint; + } + }); } // updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical @@ -507,8 +520,16 @@ protected abstract void innerSendBulkShardOperationsRequest(String followerHisto protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, Consumer errorHandler); + protected abstract Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(LongSupplier followerGlobalCheckpoint); + @Override protected void onCancelled() { + synchronized (this) { + if (renewable != null) { + renewable.cancel(); + renewable = null; + } + } markAsCompleted(); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 46b3c6e54f576..81d8750d07c6d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -32,10 +34,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; import org.elasticsearch.index.seqno.SeqNoStats; import 
org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -45,9 +51,11 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrRetentionLeases; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; @@ -60,6 +68,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import java.util.function.LongSupplier; import java.util.function.Supplier; import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient; @@ -73,6 +82,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor this.waitForMetadataTimeOut = newVal); @@ -245,6 +256,96 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co errorHandler.accept(e); } } + + @Override + protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) { + final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId( + clusterService.getClusterName().value(), + params.getFollowShardId().getIndex(), + params.getRemoteCluster(), + params.getLeaderShardId().getIndex()); + + /* + * We are going to attempt to renew the retention lease. If this fails it is either because the retention lease does not + * exist, or something else happened. If the retention lease does not exist, we will attempt to add the retention lease + * again. If that fails, it had better not be because the retention lease already exists. Either way, we will attempt to + * renew again on the next scheduled execution. + */ + final ActionListener listener = ActionListener.wrap( + r -> {}, + e -> { + /* + * We have to guard against the possibility that the shard follow node task has been stopped and the retention + * lease deliberately removed via the act of unfollowing. Note that the order of operations is important in + * TransportUnfollowAction. There, we first stop the shard follow node task, and then remove the retention + * leases on the leader. This means that if we end up here with the retention lease not existing because of an + * unfollow action, then we know that the unfollow action has already stopped the shard follow node task and + * there is no race condition with the unfollow action. 
+ */ + if (isCancelled() || isCompleted()) { + return; + } + final Throwable cause = ExceptionsHelper.unwrapCause(e); + logRetentionLeaseFailure(retentionLeaseId, cause); + // noinspection StatementWithEmptyBody + if (cause instanceof RetentionLeaseNotFoundException) { + // note that we do not need to mark as system context here as that is restored from the original renew + logger.trace( + "{} background adding retention lease [{}] while following", + params.getFollowShardId(), + retentionLeaseId); + CcrRetentionLeases.asyncAddRetentionLease( + params.getLeaderShardId(), + retentionLeaseId, + followerGlobalCheckpoint.getAsLong(), + remoteClient(params), + ActionListener.wrap( + r -> {}, + inner -> { + /* + * If this fails because the retention lease already exists, something highly unusual is + * going on. Log it, and renew again after another renew interval has passed. + */ + final Throwable innerCause = ExceptionsHelper.unwrapCause(inner); + assert innerCause instanceof RetentionLeaseAlreadyExistsException == false; + logRetentionLeaseFailure(retentionLeaseId, innerCause); + })); + } else { + // if something else happened, we will attempt to renew again after another renew interval has passed + } + }); + + return threadPool.scheduleWithFixedDelay( + () -> { + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we have to execute under the system context so that if security is enabled the management is authorized + threadContext.markAsSystemContext(); + logger.trace( + "{} background renewing retention lease [{}] while following", + params.getFollowShardId(), + retentionLeaseId); + CcrRetentionLeases.asyncRenewRetentionLease( + params.getLeaderShardId(), + retentionLeaseId, + followerGlobalCheckpoint.getAsLong(), + remoteClient(params), + listener); + } + }, + retentionLeaseRenewInterval, + Ccr.CCR_THREAD_POOL_NAME); + } + + private void logRetentionLeaseFailure(final String retentionLeaseId, final Throwable cause) { + assert cause instanceof ElasticsearchSecurityException == false : cause; + logger.warn(new ParameterizedMessage( + "{} background management of retention lease [{}] failed while following", + params.getFollowShardId(), + retentionLeaseId), + cause); + } + }; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 9c580bc245406..0b445a3eb01ef 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -89,11 +88,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongConsumer; import java.util.function.Supplier; +import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import 
static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId; import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease; @@ -330,6 +329,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v CcrRetentionLeases.asyncRenewRetentionLease( leaderShardId, retentionLeaseId, + RETAIN_ALL, remoteClient, ActionListener.wrap( r -> {}, @@ -343,7 +343,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v })); } }, - RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getSettings()), + CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getNodeSettings()), Ccr.CCR_THREAD_POOL_NAME); // TODO: There should be some local timeout. And if the remote cluster returns an unknown session @@ -380,7 +380,7 @@ void acquireRetentionLeaseOnLeader( () -> new ParameterizedMessage("{} requesting leader to add retention lease [{}]", shardId, retentionLeaseId)); final TimeValue timeout = ccrSettings.getRecoveryActionTimeout(); final Optional maybeAddAlready = - syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); + syncAddRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout); maybeAddAlready.ifPresent(addAlready -> { logger.trace(() -> new ParameterizedMessage( "{} retention lease [{}] already exists, requesting a renewal", @@ -388,7 +388,7 @@ void acquireRetentionLeaseOnLeader( retentionLeaseId), addAlready); final Optional maybeRenewNotFound = - syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); + syncRenewRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout); maybeRenewNotFound.ifPresent(renewNotFound -> { logger.trace(() -> new ParameterizedMessage( "{} retention lease [{}] not found while attempting to renew, requesting a final add", @@ -396,7 +396,7 @@ void acquireRetentionLeaseOnLeader( retentionLeaseId), renewNotFound); final Optional maybeFallbackAddAlready = - syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); + syncAddRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout); maybeFallbackAddAlready.ifPresent(fallbackAddAlready -> { /* * At this point we tried to add the lease and the retention lease already existed. 
By the time we tried to renew the @@ -409,15 +409,6 @@ void acquireRetentionLeaseOnLeader( }); } - // this setting is intentionally not registered, it is only used in tests - public static final Setting RETENTION_LEASE_RENEW_INTERVAL_SETTING = - Setting.timeSetting( - "index.ccr.retention_lease.renew_interval", - new TimeValue(5, TimeUnit.MINUTES), - new TimeValue(0, TimeUnit.MILLISECONDS), - Setting.Property.Dynamic, - Setting.Property.IndexScope); - @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index d219ddefa066b..fd84725e4bd6e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -131,6 +131,14 @@ protected Collection> nodePlugins() { return Collections.emptyList(); } + protected Settings leaderClusterSettings() { + return Settings.EMPTY; + } + + protected Settings followerClusterSettings() { + return Settings.EMPTY; + } + @Before public final void startClusters() throws Exception { if (clusterGroup != null && reuseClusters()) { @@ -145,7 +153,7 @@ public final void startClusters() throws Exception { MockNioTransportPlugin.class, InternalSettingsPlugin.class); InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), - numberOfNodesPerCluster(), "leader_cluster", createNodeConfigurationSource(null), 0, "leader", mockPlugins, + numberOfNodesPerCluster(), "leader_cluster", createNodeConfigurationSource(null, true), 0, "leader", mockPlugins, Function.identity()); leaderCluster.beforeTest(random(), 0.0D); leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster()); @@ -156,7 +164,7 @@ public final void startClusters() throws Exception { String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), - numberOfNodesPerCluster(), "follower_cluster", createNodeConfigurationSource(address), 0, "follower", + numberOfNodesPerCluster(), "follower_cluster", createNodeConfigurationSource(address, false), 0, "follower", mockPlugins, Function.identity()); clusterGroup = new ClusterGroup(leaderCluster, followerCluster); @@ -203,7 +211,7 @@ public void afterTest() throws Exception { } } - private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) { + private NodeConfigurationSource createNodeConfigurationSource(final String leaderSeedAddress, final boolean leaderCluster) { Settings.Builder builder = Settings.builder(); builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE); // Default the watermarks to absurdly low to prevent the tests @@ -225,6 +233,11 @@ private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedA builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); // Let cluster state api return quickly in order to speed up auto follow tests: builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); + if (leaderCluster) { 
+ builder.put(leaderClusterSettings()); + } else { + builder.put(followerClusterSettings()); + } if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) { builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index eb25bd8c34288..44b38ddbadfa0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -33,6 +34,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -40,14 +42,19 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.repository.CcrRepository; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.io.IOException; @@ -83,7 +90,7 @@ public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugi @Override public List> getSettings() { - return Collections.singletonList(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING); + return Collections.singletonList(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING); } } @@ -105,6 +112,13 @@ protected Collection> nodePlugins() { .collect(Collectors.toList()); } + @Override + protected Settings followerClusterSettings() { + return Settings.builder() + .put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200)) + .build(); + } + private final IndicesOptions indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed(); private RestoreSnapshotRequest setUpRestoreSnapshotRequest( @@ -140,7 +154,6 @@ private RestoreSnapshotRequest setUpRestoreSnapshotRequest( final Settings.Builder settingsBuilder = Settings.builder() .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) - 
.put(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200)) .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); return new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST) .indexSettings(settingsBuilder) @@ -227,42 +240,7 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception { restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); try { - // ensure that a retention lease has been put in place on each shard, and grab a copy of them - final List retentionLeases = new ArrayList<>(); - assertBusy(() -> { - retentionLeases.clear(); - final IndicesStatsResponse stats = - leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); - assertNotNull(stats.getShards()); - assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardsStats = getShardsStats(stats); - for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(currentRetentionLeases.leases(), hasSize(1)); - final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); - assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); - retentionLeases.add(currentRetentionLeases); - } - }); - - // now ensure that the retention leases are being renewed - assertBusy(() -> { - final IndicesStatsResponse stats = - leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); - assertNotNull(stats.getShards()); - assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardsStats = getShardsStats(stats); - for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(currentRetentionLeases.leases(), hasSize(1)); - final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); - assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); - // we assert that retention leases are being renewed by an increase in the timestamp - assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp())); - } - }); + assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); latch.countDown(); } finally { for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { @@ -354,15 +332,7 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws * After we wake up, it should be the case that the retention leases are the same (same timestamp) as that indicates that they were * not renewed while we were sleeping. 
*/ - final TimeValue renewIntervalSetting = CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get( - followerClient() - .admin() - .indices() - .prepareGetSettings(followerIndex) - .get() - .getIndexToSettings() - .get(followerIndex)); - + final TimeValue renewIntervalSetting = CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(followerClusterSettings()); final long renewEnd = System.nanoTime(); Thread.sleep(Math.max(0, randomIntBetween(2, 4) * renewIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(renewEnd - start))); @@ -404,6 +374,7 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { final String leaderIndexSettings = getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -469,11 +440,8 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { }); } - - - pauseFollow(followerIndex); - followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); + assertAcked(followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet()); assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); final IndicesStatsResponse afterUnfollowStats = @@ -498,6 +466,7 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception { final String leaderIndexSettings = getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -560,6 +529,438 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception { } } + public void testRetentionLeaseRenewedWhileFollowing() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final int numberOfReplicas = randomIntBetween(0, 1); + final Map additionalIndexSettings = new HashMap<>(); + additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); + } + + public void testRetentionLeaseAdvancesWhileFollowing() throws Exception { + final String leaderIndex = "leader"; + final String 
followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final int numberOfReplicas = randomIntBetween(0, 1); + final Map additionalIndexSettings = new HashMap<>(); + additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + final int numberOfDocuments = randomIntBetween(128, 2048); + logger.debug("indexing [{}] docs", numberOfDocuments); + for (int i = 0; i < numberOfDocuments; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex(leaderIndex, "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + if (rarely()) { + leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); + } + } + + // wait until the follower global checkpoints have caught up to the leader + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + + final List leaderShardsStats = getShardsStats(leaderClient().admin().indices().prepareStats(leaderIndex).get()); + final Map leaderGlobalCheckpoints = new HashMap<>(); + for (final ShardStats leaderShardStats : leaderShardsStats) { + final ShardRouting routing = leaderShardStats.getShardRouting(); + if (routing.primary() == false) { + continue; + } + leaderGlobalCheckpoints.put(routing.id(), leaderShardStats.getSeqNoStats().getGlobalCheckpoint()); + } + + // now assert that the retention leases have advanced to the global checkpoints + assertBusy(() -> { + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardsStats = getShardsStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); + // we assert that retention leases are being advanced + assertThat( + retentionLease.retainingSequenceNumber(), + equalTo(leaderGlobalCheckpoints.get(shardsStats.get(i).getShardRouting().id()))); + } + }); + } + + @TestLogging(value = "org.elasticsearch.xpack.ccr:trace") + public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final int numberOfReplicas = randomIntBetween(0, 1); + final Map additionalIndexSettings = new HashMap<>(); + 
additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + final long start = System.nanoTime(); + pauseFollow(followerIndex); + + /* + * We want to ensure that the retention leases have been synced to all shard copies, as otherwise they might sync between the two + * times that we sample the retention leases, which would cause our check to fail. + */ + final TimeValue syncIntervalSetting = IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.get( + leaderClient() + .admin() + .indices() + .prepareGetSettings(leaderIndex) + .get() + .getIndexToSettings() + .get(leaderIndex)); + final long syncEnd = System.nanoTime(); + Thread.sleep(Math.max(0, randomIntBetween(2, 4) * syncIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(syncEnd - start))); + + final ClusterStateResponse leaderIndexClusterState = + leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); + final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); + + // sample the leases after pausing + final List retentionLeases = new ArrayList<>(); + assertBusy(() -> { + retentionLeases.clear(); + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardsStats = getShardsStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + final String expectedRetentionLeaseId = retentionLeaseId( + getFollowerCluster().getClusterName(), + new Index(followerIndex, followerUUID), + getLeaderCluster().getClusterName(), + new Index(leaderIndex, leaderUUID)); + assertThat(retentionLease.id(), equalTo(expectedRetentionLeaseId)); + retentionLeases.add(currentRetentionLeases); + } + }); + + /* + * We want to ensure that the background renewal is cancelled after pausing. To do this, we will sleep a small multiple of the renew + * interval. If the renews are not cancelled, we expect that a renewal would have been sent while we were sleeping. 
After we wake + * up, it should be the case that the retention leases are the same (same timestamp) as that indicates that they were not renewed + * while we were sleeping. + */ + final TimeValue renewIntervalSetting = CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(followerClusterSettings()); + final long renewEnd = System.nanoTime(); + Thread.sleep(Math.max(0, randomIntBetween(2, 4) * renewIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(renewEnd - start))); + + // now ensure that the retention leases are the same + assertBusy(() -> { + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardsStats = getShardsStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + if (shardsStats.get(i).getShardRouting().primary() == false) { + continue; + } + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + // we assert that retention leases are not being renewed by an unchanged timestamp + assertThat(retentionLease.timestamp(), equalTo(retentionLeases.get(i).leases().iterator().next().timestamp())); + } + }); + } + + public void testRetentionLeaseRenewalIsResumedWhenFollowingIsResumed() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final int numberOfReplicas = randomIntBetween(0, 1); + final Map additionalIndexSettings = new HashMap<>(); + additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + pauseFollow(followerIndex); + + followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow(followerIndex)).actionGet(); + + ensureFollowerGreen(true, followerIndex); + + assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); + } + + public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = 1; + final int numberOfReplicas = 1; + final Map additionalIndexSettings = new 
HashMap<>(); + additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + final CountDownLatch latch = new CountDownLatch(1); + + final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.addSendBehavior( + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) + || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { + senderTransportService.clearAllRules(); + final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request; + final String primaryShardNodeId = + getLeaderCluster() + .clusterService() + .state() + .routingTable() + .index(leaderIndex) + .shard(renewRequest.getShardId().id()) + .primaryShard() + .currentNodeId(); + final String primaryShardNodeName = + getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = + getLeaderCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(renewRequest.getShardId()); + final CountDownLatch innerLatch = new CountDownLatch(1); + // this forces the background renewal from following to face a retention lease not found exception + primary.removeRetentionLease( + getRetentionLeaseId(followerIndex, leaderIndex), + ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString()))); + + try { + innerLatch.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + fail(e.toString()); + } + + latch.countDown(); + } + connection.sendRequest(requestId, action, request, options); + }); + } + + latch.await(); + + assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); + } + + /** + * This test is fairly evil. This test is to ensure that we are protected against a race condition when unfollowing and a background + * renewal fires. The action of unfollowing will remove retention leases from the leader. If a background renewal is firing at that + * time, it means that we will be met with a retention lease not found exception. That will in turn trigger behavior to attempt to + * re-add the retention lease, which means we are left in a situation where we have unfollowed, but the retention lease still remains + * on the leader. However, we have a guard against this in the callback after the retention lease not found exception is thrown, which + * checks if the shard follow node task is cancelled or completed. 
+ * + * To test that this behavior is correct, we capture the call to renew the retention lease. Then, we will step in between and execute + * an unfollow request. This will remove the retention lease on the leader. At this point, we can unlatch the renew call, which will + * now be met with a retention lease not found exception. We will cheat and wait for that response to come back, and then synchronously + * trigger the listener which will check to see if the shard follow node task is cancelled or completed, and if not, add the retention + * lease back. After that listener returns, we can check to see if a retention lease exists on the leader. + * + * Note, this does mean that the listener will fire twice, once in our onResponseReceived hook, and once after our onResponseReceived + * callback returns. 🤷‍♀️ + * + * @throws Exception if an exception occurs in the main test thread + */ + public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = 1; + final int numberOfReplicas = 1; + final Map additionalIndexSettings = new HashMap<>(); + additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + final CountDownLatch removeLeaseLatch = new CountDownLatch(1); + final CountDownLatch unfollowLatch = new CountDownLatch(1); + final CountDownLatch responseLatch = new CountDownLatch(1); + + final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); + + try { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.addSendBehavior( + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) + || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { + final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex); + try { + removeLeaseLatch.countDown(); + unfollowLatch.await(); + + senderTransportService.transport().addMessageListener(new TransportMessageListener() { + + @SuppressWarnings("rawtypes") + @Override + public void onResponseReceived( + final long responseRequestId, + final Transport.ResponseContext context) { + if (requestId == responseRequestId) { + final RetentionLeaseNotFoundException e = + new RetentionLeaseNotFoundException(retentionLeaseId); + context.handler().handleException(new RemoteTransportException(e.getMessage(), e)); + responseLatch.countDown(); + senderTransportService.transport().removeMessageListener(this); + } + } + + }); + + } catch (final 
InterruptedException e) { + Thread.currentThread().interrupt(); + fail(e.toString()); + } + } + connection.sendRequest(requestId, action, request, options); + }); + } + + removeLeaseLatch.await(); + + pauseFollow(followerIndex); + assertAcked(followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet()); + assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); + + unfollowLatch.countDown(); + + responseLatch.await(); + + final IndicesStatsResponse afterUnfollowStats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + final List afterUnfollowShardsStats = getShardsStats(afterUnfollowStats); + for (final ShardStats shardStats : afterUnfollowShardsStats) { + assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); + } + } finally { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.clearAllRules(); + } + } + } + + private void assertRetentionLeaseRenewal( + final int numberOfShards, + final int numberOfReplicas, + final String followerIndex, + final String leaderIndex) throws Exception { + // ensure that a retention lease has been put in place on each shard, and grab a copy of them + final List retentionLeases = new ArrayList<>(); + assertBusy(() -> { + retentionLeases.clear(); + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardsStats = getShardsStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); + retentionLeases.add(currentRetentionLeases); + } + }); + + // now ensure that the retention leases are being renewed + assertBusy(() -> { + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardsStats = getShardsStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); + // we assert that retention leases are being renewed by an increase in the timestamp + assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp())); + } + }); + } + /** * Extract the shard stats from an indices stats response, with the 
stats ordered by shard ID with primaries first. This is to have a * consistent ordering when comparing two responses. diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 88412aa8fd3b4..46c7c51586c53 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; @@ -32,6 +33,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; @@ -177,6 +179,23 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co threadPool.generic().execute(task); } + @Override + protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) { + return new Scheduler.Cancellable() { + + @Override + public boolean cancel() { + return true; + } + + @Override + public boolean isCancelled() { + return true; + } + + }; + } + @Override protected boolean isStopped() { return stopped.get(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 178f09c86835a..09d00dc6a33ac 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -8,13 +8,16 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -27,12 +30,17 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import 
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java
index 178f09c86835a..09d00dc6a33ac 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java
@@ -8,13 +8,16 @@
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
 import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
 
@@ -27,12 +30,17 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongConsumer;
+import java.util.function.LongSupplier;
 
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.contains;
@@ -53,6 +61,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
 
     private Consumer<ShardFollowNodeTaskStatus> beforeSendShardChangesRequest = status -> {};
 
+    private AtomicBoolean scheduleRetentionLeaseRenewal = new AtomicBoolean();
+    private LongConsumer retentionLeaseRenewal = followerGlobalCheckpoint -> {};
+
     private AtomicBoolean simulateResponse = new AtomicBoolean();
 
     private Queue<Exception> readFailures;
@@ -936,6 +947,28 @@ public void testComputeDelay() {
         assertThat(ShardFollowNodeTask.computeDelay(1024, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(1000L)));
     }
 
+    public void testRetentionLeaseRenewal() throws InterruptedException {
+        scheduleRetentionLeaseRenewal.set(true);
+        final CountDownLatch latch = new CountDownLatch(1);
+        final long expectedFollowerGlobalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
+        retentionLeaseRenewal = followerGlobalCheckpoint -> {
+            assertThat(followerGlobalCheckpoint, equalTo(expectedFollowerGlobalCheckpoint));
+            latch.countDown();
+        };
+
+        final ShardFollowTaskParams params = new ShardFollowTaskParams();
+        final ShardFollowNodeTask task = createShardFollowTask(params);
+
+        try {
+            startTask(task, randomLongBetween(expectedFollowerGlobalCheckpoint, Long.MAX_VALUE), expectedFollowerGlobalCheckpoint);
+            latch.await();
+        } finally {
+            task.onCancelled();
+            scheduleRetentionLeaseRenewal.set(false);
+        }
+    }
+
     static final class ShardFollowTaskParams {
         private String remoteCluster = null;
         private ShardId followShardId = new ShardId("follow_index", "", 0);
@@ -1063,6 +1096,47 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con
             }
         }
 
+        @Override
+        protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
+            if (scheduleRetentionLeaseRenewal.get()) {
+                final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
+                final ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
+                        () -> retentionLeaseRenewal.accept(followerGlobalCheckpoint.getAsLong()),
+                        0,
+                        TimeValue.timeValueMillis(200).millis(),
+                        TimeUnit.MILLISECONDS);
+                return new Scheduler.Cancellable() {
+
+                    @Override
+                    public boolean cancel() {
+                        final boolean cancel = future.cancel(true);
+                        scheduler.shutdown();
+                        return cancel;
+                    }
+
+                    @Override
+                    public boolean isCancelled() {
+                        return future.isCancelled();
+                    }
+
+                };
+            } else {
+                return new Scheduler.Cancellable() {
+
+                    @Override
+                    public boolean cancel() {
+                        return true;
+                    }
+
+                    @Override
+                    public boolean isCancelled() {
+                        return true;
+                    }
+
+                };
+            }
+        }
+
         @Override
         protected boolean isStopped() {
             return super.isStopped() || stopped.get();
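One detail worth noting in the override above: because each renewal schedule spins up its own ScheduledThreadPoolExecutor via Scheduler.initScheduler, cancel() also has to shut that executor down or the test would leak its thread. An equivalent, slightly more defensive shape (an illustrative sketch, not the code in this diff) makes the cleanup unconditional:

    // Illustrative variant of cancel(): always release the dedicated scheduler, even if cancel()
    // is invoked more than once (ScheduledThreadPoolExecutor.shutdown() is idempotent).
    @Override
    public boolean cancel() {
        try {
            return future.cancel(true);
        } finally {
            scheduler.shutdown();
        }
    }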
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
index 1f5f487580bee..dd495ee5ca587 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java
@@ -16,6 +16,7 @@
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.replication.ReplicationResponse;
 import org.elasticsearch.action.support.replication.TransportWriteAction;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -47,7 +48,9 @@
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.snapshots.Snapshot;
 import org.elasticsearch.snapshots.SnapshotId;
+import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.ccr.CcrRetentionLeases;
 import org.elasticsearch.xpack.ccr.CcrSettings;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
@@ -69,6 +72,7 @@
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongConsumer;
+import java.util.function.LongSupplier;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyMap;
@@ -390,6 +394,28 @@ public void testSimpleRemoteRecovery() throws Exception {
         }
     }
 
+    public void testRetentionLeaseManagement() throws Exception {
+        try (ReplicationGroup leader = createLeaderGroup(0)) {
+            leader.startAll();
+            try (ReplicationGroup follower = createFollowGroup(leader, 0)) {
+                follower.startAll();
+                final ShardFollowNodeTask task = createShardFollowTask(leader, follower);
+                task.start(
+                        follower.getPrimary().getHistoryUUID(),
+                        leader.getPrimary().getGlobalCheckpoint(),
+                        leader.getPrimary().seqNoStats().getMaxSeqNo(),
+                        follower.getPrimary().getGlobalCheckpoint(),
+                        follower.getPrimary().seqNoStats().getMaxSeqNo());
+                final Scheduler.Cancellable renewable = task.getRenewable();
+                assertNotNull(renewable);
+                assertFalse(renewable.isCancelled());
+                task.onCancelled();
+                assertTrue(renewable.isCancelled());
+                assertNull(task.getRenewable());
+            }
+        }
+    }
+
     private ReplicationGroup createLeaderGroup(int replicas) throws IOException {
         Settings settings = Settings.builder()
             .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
@@ -399,10 +425,12 @@ private ReplicationGroup createLeaderGroup(int replicas) throws IOException {
     }
 
     private ReplicationGroup createFollowGroup(ReplicationGroup leaderGroup, int replicas) throws IOException {
-        Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
-            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
-            .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB))
-            .build();
+        final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
+            .put(
+                IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(),
+                new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB))
+            .build();
         IndexMetaData indexMetaData = buildIndexMetaData(replicas, settings, indexMapping);
         return new ReplicationGroup(indexMetaData) {
             @Override
@@ -543,6 +571,27 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
             threadPool.executor(ThreadPool.Names.GENERIC).execute(task);
         }
 
+        @Override
+        protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
+            final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId(
+                    "follower",
+                    followerGroup.getPrimary().routingEntry().index(),
+                    "remote",
+                    leaderGroup.getPrimary().routingEntry().index());
+            final PlainActionFuture<ReplicationResponse> response = new PlainActionFuture<>();
+            leaderGroup.addRetentionLease(
+                    retentionLeaseId,
+                    followerGlobalCheckpoint.getAsLong(),
+                    "ccr",
+                    ActionListener.wrap(response::onResponse, e -> fail(e.toString())));
+            response.actionGet();
+            return threadPool.scheduleWithFixedDelay(
+                    () -> leaderGroup.renewRetentionLease(retentionLeaseId, followerGlobalCheckpoint.getAsLong(), "ccr"),
+                    CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(
+                            followerGroup.getPrimary().indexSettings().getSettings()),
+                    ThreadPool.Names.GENERIC);
+        }
+
         @Override
         protected boolean isStopped() {
             return super.isStopped() || stopped.get();
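Because the renewal interval is read from the follower's index settings via CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING, a test that wants faster renewals can shorten it where the follower group's settings are assembled. A small sketch of that tweak (illustrative only, not part of this diff):

    // Hypothetical test tweak: renew every 200ms instead of the default interval so that
    // renewal-related assertions can use short assertBusy timeouts.
    final Settings settings = Settings.builder()
            .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
            .put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200))
            .build();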