From 6adedc1d0d8114a03db301fb3bc9b1eedd27d9c2 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 23 Feb 2019 13:41:19 -0500 Subject: [PATCH 01/12] Renew retention leases while following This commit is the final piece of the integration of CCR with retention leases. Namely, we periodically renew retention leases and advance the retaining sequence number while following. --- .../ESIndexLevelReplicationTestCase.java | 8 +- .../xpack/ccr/CcrRetentionLeases.java | 60 ++- .../xpack/ccr/action/ShardFollowNodeTask.java | 25 +- .../ccr/action/ShardFollowTasksExecutor.java | 93 +++++ .../xpack/ccr/repository/CcrRepository.java | 21 +- .../elasticsearch/xpack/CcrIntegTestCase.java | 19 +- .../xpack/ccr/CcrRetentionLeaseIT.java | 380 +++++++++++++++--- .../ShardFollowNodeTaskRandomTests.java | 19 + .../ccr/action/ShardFollowNodeTaskTests.java | 74 ++++ .../ShardFollowTaskReplicationTests.java | 57 ++- 10 files changed, 659 insertions(+), 97 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 85e69de8824c9..d20d57aed64a9 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -540,12 +540,16 @@ protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, Acti new SyncRetentionLeases(request, ReplicationGroup.this, wrappedListener).execute(); } - public RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source, + public synchronized RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source, ActionListener listener) { return getPrimary().addRetentionLease(id, retainingSequenceNumber, source, listener); } - public void removeRetentionLease(String id, ActionListener listener) { + public synchronized RetentionLease renewRetentionLease(String id, long retainingSequenceNumber, String source) { + return getPrimary().renewRetentionLease(id, retainingSequenceNumber, source); + } + + public synchronized void removeRetentionLease(String id, ActionListener listener) { getPrimary().removeRetentionLease(id, listener); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java index 6afef8c42aa8b..a65069dc51809 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRetentionLeases.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.seqno.RetentionLeaseActions; @@ -18,11 +19,18 @@ import java.util.Locale; import java.util.Optional; - -import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; +import java.util.concurrent.TimeUnit; public class CcrRetentionLeases { + // this setting is intentionally not registered, it is only used in tests + public static final Setting RETENTION_LEASE_RENEW_INTERVAL_SETTING = + Setting.timeSetting( + "index.ccr.retention_lease.renew_interval", + new TimeValue(5, TimeUnit.MINUTES), + new TimeValue(0, TimeUnit.MILLISECONDS), + Setting.Property.NodeScope); + /** * The retention lease ID used by followers. * @@ -52,20 +60,22 @@ public static String retentionLeaseId( * Synchronously requests to add a retention lease with the specified retention lease ID on the specified leader shard using the given * remote client. Note that this method will block up to the specified timeout. * - * @param leaderShardId the leader shard ID - * @param retentionLeaseId the retention lease ID - * @param remoteClient the remote client on which to execute this request - * @param timeout the timeout + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param retainingSequenceNumber the retaining sequence number + * @param remoteClient the remote client on which to execute this request + * @param timeout the timeout * @return an optional exception indicating whether or not the retention lease already exists */ public static Optional syncAddRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, + final long retainingSequenceNumber, final Client remoteClient, final TimeValue timeout) { try { final PlainActionFuture response = new PlainActionFuture<>(); - asyncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); + asyncAddRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response); response.actionGet(timeout); return Optional.empty(); } catch (final RetentionLeaseAlreadyExistsException e) { @@ -78,18 +88,20 @@ public static Optional syncAddRetentionLea * remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a response * or failure. * - * @param leaderShardId the leader shard ID - * @param retentionLeaseId the retention lease ID - * @param remoteClient the remote client on which to execute this request - * @param listener the listener + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param retainingSequenceNumber the retaining sequence number + * @param remoteClient the remote client on which to execute this request + * @param listener the listener */ public static void asyncAddRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, + final long retainingSequenceNumber, final Client remoteClient, final ActionListener listener) { final RetentionLeaseActions.AddRequest request = - new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); + new RetentionLeaseActions.AddRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr"); remoteClient.execute(RetentionLeaseActions.Add.INSTANCE, request, listener); } @@ -97,20 +109,22 @@ public static void asyncAddRetentionLease( * Synchronously requests to renew a retention lease with the specified retention lease ID on the specified leader shard using the given * remote client. Note that this method will block up to the specified timeout. * - * @param leaderShardId the leader shard ID - * @param retentionLeaseId the retention lease ID - * @param remoteClient the remote client on which to execute this request - * @param timeout the timeout + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param retainingSequenceNumber the retaining sequence number + * @param remoteClient the remote client on which to execute this request + * @param timeout the timeout * @return an optional exception indicating whether or not the retention lease already exists */ public static Optional syncRenewRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, + final long retainingSequenceNumber, final Client remoteClient, final TimeValue timeout) { try { final PlainActionFuture response = new PlainActionFuture<>(); - asyncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, response); + asyncRenewRetentionLease(leaderShardId, retentionLeaseId, retainingSequenceNumber, remoteClient, response); response.actionGet(timeout); return Optional.empty(); } catch (final RetentionLeaseNotFoundException e) { @@ -123,18 +137,20 @@ public static Optional syncRenewRetentionLease( * given remote client. Note that this method will return immediately, with the specified listener callback invoked to indicate a * response or failure. * - * @param leaderShardId the leader shard ID - * @param retentionLeaseId the retention lease ID - * @param remoteClient the remote client on which to execute this request - * @param listener the listener + * @param leaderShardId the leader shard ID + * @param retentionLeaseId the retention lease ID + * @param retainingSequenceNumber the retaining sequence number + * @param remoteClient the remote client on which to execute this request + * @param listener the listener */ public static void asyncRenewRetentionLease( final ShardId leaderShardId, final String retentionLeaseId, + final long retainingSequenceNumber, final Client remoteClient, final ActionListener listener) { final RetentionLeaseActions.RenewRequest request = - new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, RETAIN_ALL, "ccr"); + new RetentionLeaseActions.RenewRequest(leaderShardId, retentionLeaseId, retainingSequenceNumber, "ccr"); remoteClient.execute(RetentionLeaseActions.Renew.INSTANCE, request, listener); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 3918b815e9150..cf7046193c8a4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -30,9 +30,10 @@ import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.transport.NoSuchRemoteClusterException; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -94,6 +95,12 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private volatile ElasticsearchException fatalException; + private Scheduler.Cancellable renewable; + + Scheduler.Cancellable getRenewable() { + return renewable; + } + ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map headers, ShardFollowTask params, BiConsumer scheduler, final LongSupplier relativeTimeProvider) { super(id, type, action, description, parentTask, headers); @@ -151,6 +158,14 @@ void start( coordinateReads(); }); }); + + synchronized (this) { + renewable = scheduleBackgroundRetentionLeaseRenewal(() -> { + synchronized (ShardFollowNodeTask.this) { + return this.followerGlobalCheckpoint; + } + }); + } } synchronized void coordinateReads() { @@ -507,8 +522,16 @@ protected abstract void innerSendBulkShardOperationsRequest(String followerHisto protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, Consumer errorHandler); + protected abstract Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(LongSupplier followerGlobalCheckpoint); + @Override protected void onCancelled() { + synchronized (this) { + if (renewable != null) { + renewable.cancel(); + renewable = null; + } + } markAsCompleted(); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 46b3c6e54f576..568f6cb75d726 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -9,6 +9,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -32,10 +34,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -45,9 +51,11 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.NoSuchRemoteClusterException; import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrRetentionLeases; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; @@ -60,6 +68,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import java.util.function.LongSupplier; import java.util.function.Supplier; import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient; @@ -73,6 +82,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor this.waitForMetadataTimeOut = newVal); @@ -245,6 +256,88 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co errorHandler.accept(e); } } + + @Override + protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) { + final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId( + clusterService.getClusterName().value(), + params.getFollowShardId().getIndex(), + params.getRemoteCluster(), + params.getLeaderShardId().getIndex()); + + /* + * We are going to attempt to renew the retention lease. If this fails it is either because the retention lease does not + * lease does not exist, or something else happened. If the retention lease does not exist, we will attempt + * to add the retention lease again. If that fails, it had better not be because the retention lease already + * exists. Either way, we will attempt to renew again on the next scheduled execution. We will start by + * creating listeners + */ + final ActionListener listener = ActionListener.wrap( + r -> {}, + e -> { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + logRetentionLeaseFailure(retentionLeaseId, cause); + // noinspection StatementWithEmptyBody + if (cause instanceof RetentionLeaseNotFoundException) { + logger.trace( + "{} background adding retention lease [{}] while following", + params.getFollowShardId(), + retentionLeaseId); + CcrRetentionLeases.asyncAddRetentionLease( + params.getLeaderShardId(), + retentionLeaseId, + followerGlobalCheckpoint.getAsLong(), + remoteClient(params), + ActionListener.wrap( + r -> {}, + inner -> { + /* + * If this fails that the retention lease already exists, something highly unusual is + * going on. Log it, and renew again after another renew interval has passed. + */ + final Throwable innerCause = ExceptionsHelper.unwrapCause(inner); + assert innerCause instanceof RetentionLeaseAlreadyExistsException == false; + logRetentionLeaseFailure(retentionLeaseId, innerCause); + })); + } else { + /* + * If something else happened, we will attempt to renew again after another renew interval + * has passed. + */ + } + }); + + return threadPool.scheduleWithFixedDelay( + () -> { + final ThreadContext threadContext = threadPool.getThreadContext(); + try (ThreadContext.StoredContext ignore = threadContext.stashContext()) { + // we have to execute under the system context so that if security is enabled the management is authorized + threadContext.markAsSystemContext(); + logger.trace( + "{} background renewing retention lease [{}] while following", + params.getFollowShardId(), + retentionLeaseId); + CcrRetentionLeases.asyncRenewRetentionLease( + params.getLeaderShardId(), + retentionLeaseId, + followerGlobalCheckpoint.getAsLong(), + remoteClient(params), + listener); + } + }, + retentionLeaseRenewInterval, + Ccr.CCR_THREAD_POOL_NAME); + } + + private void logRetentionLeaseFailure(final String retentionLeaseId, final Throwable cause) { + assert cause instanceof ElasticsearchSecurityException == false : cause; + logger.warn(new ParameterizedMessage( + "{} background management of retention lease [{}] failed while following", + params.getFollowShardId(), + retentionLeaseId), + cause); + } + }; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 9c580bc245406..0b445a3eb01ef 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -35,7 +35,6 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; @@ -89,11 +88,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongConsumer; import java.util.function.Supplier; +import static org.elasticsearch.index.seqno.RetentionLeaseActions.RETAIN_ALL; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId; import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.syncAddRetentionLease; @@ -330,6 +329,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v CcrRetentionLeases.asyncRenewRetentionLease( leaderShardId, retentionLeaseId, + RETAIN_ALL, remoteClient, ActionListener.wrap( r -> {}, @@ -343,7 +343,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v })); } }, - RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getSettings()), + CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(indexShard.indexSettings().getNodeSettings()), Ccr.CCR_THREAD_POOL_NAME); // TODO: There should be some local timeout. And if the remote cluster returns an unknown session @@ -380,7 +380,7 @@ void acquireRetentionLeaseOnLeader( () -> new ParameterizedMessage("{} requesting leader to add retention lease [{}]", shardId, retentionLeaseId)); final TimeValue timeout = ccrSettings.getRecoveryActionTimeout(); final Optional maybeAddAlready = - syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); + syncAddRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout); maybeAddAlready.ifPresent(addAlready -> { logger.trace(() -> new ParameterizedMessage( "{} retention lease [{}] already exists, requesting a renewal", @@ -388,7 +388,7 @@ void acquireRetentionLeaseOnLeader( retentionLeaseId), addAlready); final Optional maybeRenewNotFound = - syncRenewRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); + syncRenewRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout); maybeRenewNotFound.ifPresent(renewNotFound -> { logger.trace(() -> new ParameterizedMessage( "{} retention lease [{}] not found while attempting to renew, requesting a final add", @@ -396,7 +396,7 @@ void acquireRetentionLeaseOnLeader( retentionLeaseId), renewNotFound); final Optional maybeFallbackAddAlready = - syncAddRetentionLease(leaderShardId, retentionLeaseId, remoteClient, timeout); + syncAddRetentionLease(leaderShardId, retentionLeaseId, RETAIN_ALL, remoteClient, timeout); maybeFallbackAddAlready.ifPresent(fallbackAddAlready -> { /* * At this point we tried to add the lease and the retention lease already existed. By the time we tried to renew the @@ -409,15 +409,6 @@ void acquireRetentionLeaseOnLeader( }); } - // this setting is intentionally not registered, it is only used in tests - public static final Setting RETENTION_LEASE_RENEW_INTERVAL_SETTING = - Setting.timeSetting( - "index.ccr.retention_lease.renew_interval", - new TimeValue(5, TimeUnit.MINUTES), - new TimeValue(0, TimeUnit.MILLISECONDS), - Setting.Property.Dynamic, - Setting.Property.IndexScope); - @Override public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index d219ddefa066b..fd84725e4bd6e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -131,6 +131,14 @@ protected Collection> nodePlugins() { return Collections.emptyList(); } + protected Settings leaderClusterSettings() { + return Settings.EMPTY; + } + + protected Settings followerClusterSettings() { + return Settings.EMPTY; + } + @Before public final void startClusters() throws Exception { if (clusterGroup != null && reuseClusters()) { @@ -145,7 +153,7 @@ public final void startClusters() throws Exception { MockNioTransportPlugin.class, InternalSettingsPlugin.class); InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), - numberOfNodesPerCluster(), "leader_cluster", createNodeConfigurationSource(null), 0, "leader", mockPlugins, + numberOfNodesPerCluster(), "leader_cluster", createNodeConfigurationSource(null, true), 0, "leader", mockPlugins, Function.identity()); leaderCluster.beforeTest(random(), 0.0D); leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster()); @@ -156,7 +164,7 @@ public final void startClusters() throws Exception { String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), - numberOfNodesPerCluster(), "follower_cluster", createNodeConfigurationSource(address), 0, "follower", + numberOfNodesPerCluster(), "follower_cluster", createNodeConfigurationSource(address, false), 0, "follower", mockPlugins, Function.identity()); clusterGroup = new ClusterGroup(leaderCluster, followerCluster); @@ -203,7 +211,7 @@ public void afterTest() throws Exception { } } - private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) { + private NodeConfigurationSource createNodeConfigurationSource(final String leaderSeedAddress, final boolean leaderCluster) { Settings.Builder builder = Settings.builder(); builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE); // Default the watermarks to absurdly low to prevent the tests @@ -225,6 +233,11 @@ private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedA builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); // Let cluster state api return quickly in order to speed up auto follow tests: builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); + if (leaderCluster) { + builder.put(leaderClusterSettings()); + } else { + builder.put(followerClusterSettings()); + } if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) { builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index eb25bd8c34288..8f96f43a37be6 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -21,6 +21,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -48,6 +49,7 @@ import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.repository.CcrRepository; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.io.IOException; @@ -83,7 +85,7 @@ public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugi @Override public List> getSettings() { - return Collections.singletonList(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING); + return Collections.singletonList(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING); } } @@ -105,6 +107,13 @@ protected Collection> nodePlugins() { .collect(Collectors.toList()); } + @Override + protected Settings followerClusterSettings() { + return Settings.builder() + .put(CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200)) + .build(); + } + private final IndicesOptions indicesOptions = IndicesOptions.strictSingleIndexNoExpandForbidClosed(); private RestoreSnapshotRequest setUpRestoreSnapshotRequest( @@ -140,7 +149,6 @@ private RestoreSnapshotRequest setUpRestoreSnapshotRequest( final Settings.Builder settingsBuilder = Settings.builder() .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) - .put(CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING.getKey(), TimeValue.timeValueMillis(200)) .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); return new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST) .indexSettings(settingsBuilder) @@ -227,42 +235,7 @@ public void testRetentionLeaseIsRenewedDuringRecovery() throws Exception { restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); try { - // ensure that a retention lease has been put in place on each shard, and grab a copy of them - final List retentionLeases = new ArrayList<>(); - assertBusy(() -> { - retentionLeases.clear(); - final IndicesStatsResponse stats = - leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); - assertNotNull(stats.getShards()); - assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardsStats = getShardsStats(stats); - for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(currentRetentionLeases.leases(), hasSize(1)); - final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); - assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); - retentionLeases.add(currentRetentionLeases); - } - }); - - // now ensure that the retention leases are being renewed - assertBusy(() -> { - final IndicesStatsResponse stats = - leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); - assertNotNull(stats.getShards()); - assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); - final List shardsStats = getShardsStats(stats); - for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { - final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); - assertThat(currentRetentionLeases.leases(), hasSize(1)); - final RetentionLease retentionLease = - currentRetentionLeases.leases().iterator().next(); - assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); - // we assert that retention leases are being renewed by an increase in the timestamp - assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp())); - } - }); + assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); latch.countDown(); } finally { for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { @@ -354,15 +327,7 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws * After we wake up, it should be the case that the retention leases are the same (same timestamp) as that indicates that they were * not renewed while we were sleeping. */ - final TimeValue renewIntervalSetting = CcrRepository.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get( - followerClient() - .admin() - .indices() - .prepareGetSettings(followerIndex) - .get() - .getIndexToSettings() - .get(followerIndex)); - + final TimeValue renewIntervalSetting = CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(followerClusterSettings()); final long renewEnd = System.nanoTime(); Thread.sleep(Math.max(0, randomIntBetween(2, 4) * renewIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(renewEnd - start))); @@ -469,9 +434,6 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { }); } - - - pauseFollow(followerIndex); followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); @@ -560,6 +522,324 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception { } } + public void testRetentionLeaseRenewedWhileFollowing() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final int numberOfReplicas = randomIntBetween(0, 1); + final Map additionalIndexSettings = new HashMap<>(); + additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); + } + + public void testRetentionLeaseAdvancesWhileFollowing() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final int numberOfReplicas = randomIntBetween(0, 1); + final Map additionalIndexSettings = new HashMap<>(); + additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + final int numberOfDocuments = randomIntBetween(128, 2048); + logger.debug("indexing [{}] docs", numberOfDocuments); + for (int i = 0; i < numberOfDocuments; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex(leaderIndex, "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + if (rarely()) { + leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); + } + } + + // wait until the follower global checkpoints have caught up to the leader + assertIndexFullyReplicatedToFollower(leaderIndex, followerIndex); + + final List leaderShardsStats = getShardsStats(leaderClient().admin().indices().prepareStats(leaderIndex).get()); + final Map leaderGlobalCheckpoints = new HashMap<>(); + for (final ShardStats leaderShardStats : leaderShardsStats) { + final ShardRouting routing = leaderShardStats.getShardRouting(); + if (routing.primary() == false) { + continue; + } + leaderGlobalCheckpoints.put(routing.id(), leaderShardStats.getSeqNoStats().getGlobalCheckpoint()); + } + + // now assert that the retention leases have advanced to the global checkpoints + assertBusy(() -> { + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardsStats = getShardsStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); + // we assert that retention leases are being advanced + assertThat( + retentionLease.retainingSequenceNumber(), + equalTo(leaderGlobalCheckpoints.get(shardsStats.get(i).getShardRouting().id()))); + } + }); + } + + public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final int numberOfReplicas = randomIntBetween(0, 1); + final Map additionalIndexSettings = new HashMap<>(); + additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + final long start = System.nanoTime(); + pauseFollow(followerIndex); + + /* + * We want to ensure that the retention leases have been synced to all shard copies, as otherwise they might sync between the two + * times that we sample the retention leases, which would cause our check to fail. + */ + final TimeValue syncIntervalSetting = IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.get( + leaderClient() + .admin() + .indices() + .prepareGetSettings(leaderIndex) + .get() + .getIndexToSettings() + .get(leaderIndex)); + final long syncEnd = System.nanoTime(); + Thread.sleep(Math.max(0, randomIntBetween(2, 4) * syncIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(syncEnd - start))); + + final ClusterStateResponse leaderIndexClusterState = + leaderClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(leaderIndex).get(); + final String leaderUUID = leaderIndexClusterState.getState().metaData().index(leaderIndex).getIndexUUID(); + + // sample the leases after pausing + final List retentionLeases = new ArrayList<>(); + assertBusy(() -> { + retentionLeases.clear(); + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardsStats = getShardsStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + final String expectedRetentionLeaseId = retentionLeaseId( + getFollowerCluster().getClusterName(), + new Index(followerIndex, followerUUID), + getLeaderCluster().getClusterName(), + new Index(leaderIndex, leaderUUID)); + assertThat(retentionLease.id(), equalTo(expectedRetentionLeaseId)); + retentionLeases.add(currentRetentionLeases); + } + }); + + /* + * We want to ensure that the background renewal is cancelled after pausing. To do this, we will sleep a small multiple of the renew + * interval. If the renews are not cancelled, we expect that a renewal would have been sent while we were sleeping. After we wake + * up, it should be the case that the retention leases are the same (same timestamp) as that indicates that they were not renewed + * while we were sleeping. + */ + final TimeValue renewIntervalSetting = CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get(followerClusterSettings()); + final long renewEnd = System.nanoTime(); + Thread.sleep(Math.max(0, randomIntBetween(2, 4) * renewIntervalSetting.millis() - TimeUnit.NANOSECONDS.toMillis(renewEnd - start))); + + // now ensure that the retention leases are the same + assertBusy(() -> { + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardsStats = getShardsStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + if (shardsStats.get(i).getShardRouting().primary() == false) { + continue; + } + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, followerUUID, leaderIndex, leaderUUID))); + // we assert that retention leases are not being renewed by an unchanged timestamp + assertThat(retentionLease.timestamp(), equalTo(retentionLeases.get(i).leases().iterator().next().timestamp())); + } + }); + } + + public void testRetentionLeaseRenewalIsResumedWhenFollowingIsResumed() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final int numberOfReplicas = randomIntBetween(0, 1); + final Map additionalIndexSettings = new HashMap<>(); + additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + pauseFollow(followerIndex); + + followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow(followerIndex)).actionGet(); + + ensureFollowerGreen(true, followerIndex); + + assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); + } + + public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = 1; + final int numberOfReplicas = 1; + final Map additionalIndexSettings = new HashMap<>(); + additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + final CountDownLatch latch = new CountDownLatch(1); + + final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.addSendBehavior( + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) + || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { + senderTransportService.clearAllRules(); + final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request; + final String primaryShardNodeId = + getLeaderCluster() + .clusterService() + .state() + .routingTable() + .index(leaderIndex) + .shard(renewRequest.getShardId().id()) + .primaryShard() + .currentNodeId(); + final String primaryShardNodeName = + getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = + getLeaderCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(renewRequest.getShardId()); + final CountDownLatch innerLatch = new CountDownLatch(1); + // this forces the background renewal from following to face a retention lease not found exception + primary.removeRetentionLease( + getRetentionLeaseId(followerIndex, leaderIndex), + ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString()))); + + try { + innerLatch.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + fail(e.toString()); + } + + latch.countDown(); + } + connection.sendRequest(requestId, action, request, options); + }); + } + + latch.await(); + + assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); + } + + private void assertRetentionLeaseRenewal( + final int numberOfShards, + final int numberOfReplicas, + final String followerIndex, + final String leaderIndex) throws Exception { + // ensure that a retention lease has been put in place on each shard, and grab a copy of them + final List retentionLeases = new ArrayList<>(); + assertBusy(() -> { + retentionLeases.clear(); + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardsStats = getShardsStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); + retentionLeases.add(currentRetentionLeases); + } + }); + + // now ensure that the retention leases are being renewed + assertBusy(() -> { + final IndicesStatsResponse stats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + assertNotNull(stats.getShards()); + assertThat(stats.getShards(), arrayWithSize(numberOfShards * (1 + numberOfReplicas))); + final List shardsStats = getShardsStats(stats); + for (int i = 0; i < numberOfShards * (1 + numberOfReplicas); i++) { + final RetentionLeases currentRetentionLeases = shardsStats.get(i).getRetentionLeaseStats().retentionLeases(); + assertThat(currentRetentionLeases.leases(), hasSize(1)); + final RetentionLease retentionLease = + currentRetentionLeases.leases().iterator().next(); + assertThat(retentionLease.id(), equalTo(getRetentionLeaseId(followerIndex, leaderIndex))); + // we assert that retention leases are being renewed by an increase in the timestamp + assertThat(retentionLease.timestamp(), greaterThan(retentionLeases.get(i).leases().iterator().next().timestamp())); + } + }); + } + /** * Extract the shard stats from an indices stats response, with the stats ordered by shard ID with primaries first. This is to have a * consistent ordering when comparing two responses. diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 88412aa8fd3b4..46c7c51586c53 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; @@ -32,6 +33,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; @@ -177,6 +179,23 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co threadPool.generic().execute(task); } + @Override + protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) { + return new Scheduler.Cancellable() { + + @Override + public boolean cancel() { + return true; + } + + @Override + public boolean isCancelled() { + return true; + } + + }; + } + @Override protected boolean isStopped() { return stopped.get(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 178f09c86835a..09d00dc6a33ac 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -8,13 +8,16 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -27,12 +30,17 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import java.util.function.LongSupplier; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.contains; @@ -53,6 +61,9 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private Consumer beforeSendShardChangesRequest = status -> {}; + private AtomicBoolean scheduleRetentionLeaseRenewal = new AtomicBoolean(); + private LongConsumer retentionLeaseRenewal = followerGlobalCheckpoint -> {}; + private AtomicBoolean simulateResponse = new AtomicBoolean(); private Queue readFailures; @@ -936,6 +947,28 @@ public void testComputeDelay() { assertThat(ShardFollowNodeTask.computeDelay(1024, maxDelayInMillis), allOf(greaterThanOrEqualTo(0L), lessThanOrEqualTo(1000L))); } + public void testRetentionLeaseRenewal() throws InterruptedException { + scheduleRetentionLeaseRenewal.set(true); + final CountDownLatch latch = new CountDownLatch(1); + final long expectedFollowerGlobalChekcpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + retentionLeaseRenewal = followerGlobalCheckpoint -> { + assertThat(followerGlobalCheckpoint, equalTo(expectedFollowerGlobalChekcpoint)); + latch.countDown(); + }; + + final ShardFollowTaskParams params = new ShardFollowTaskParams(); + final ShardFollowNodeTask task = createShardFollowTask(params); + + try { + startTask(task, randomLongBetween(expectedFollowerGlobalChekcpoint, Long.MAX_VALUE), expectedFollowerGlobalChekcpoint); + latch.await(); + } finally { + task.onCancelled(); + scheduleRetentionLeaseRenewal.set(false); + } + } + + static final class ShardFollowTaskParams { private String remoteCluster = null; private ShardId followShardId = new ShardId("follow_index", "", 0); @@ -1063,6 +1096,47 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con } } + @Override + protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) { + if (scheduleRetentionLeaseRenewal.get()) { + final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledFuture future = scheduler.scheduleWithFixedDelay( + () -> retentionLeaseRenewal.accept(followerGlobalCheckpoint.getAsLong()), + 0, + TimeValue.timeValueMillis(200).millis(), + TimeUnit.MILLISECONDS); + return new Scheduler.Cancellable() { + + @Override + public boolean cancel() { + final boolean cancel = future.cancel(true); + scheduler.shutdown(); + return cancel; + } + + @Override + public boolean isCancelled() { + return future.isCancelled(); + } + + }; + } else { + return new Scheduler.Cancellable() { + + @Override + public boolean cancel() { + return true; + } + + @Override + public boolean isCancelled() { + return true; + } + + }; + } + } + @Override protected boolean isStopped() { return super.isStopped() || stopped.get(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 1f5f487580bee..dd495ee5ca587 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -47,7 +48,9 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ccr.CcrRetentionLeases; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; @@ -69,6 +72,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import java.util.function.LongSupplier; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; @@ -390,6 +394,28 @@ public void testSimpleRemoteRecovery() throws Exception { } } + public void testRetentionLeaseManagement() throws Exception { + try (ReplicationGroup leader = createLeaderGroup(0)) { + leader.startAll(); + try (ReplicationGroup follower = createFollowGroup(leader, 0)) { + follower.startAll(); + final ShardFollowNodeTask task = createShardFollowTask(leader, follower); + task.start( + follower.getPrimary().getHistoryUUID(), + leader.getPrimary().getGlobalCheckpoint(), + leader.getPrimary().seqNoStats().getMaxSeqNo(), + follower.getPrimary().getGlobalCheckpoint(), + follower.getPrimary().seqNoStats().getMaxSeqNo()); + final Scheduler.Cancellable renewable = task.getRenewable(); + assertNotNull(renewable); + assertFalse(renewable.isCancelled()); + task.onCancelled(); + assertTrue(renewable.isCancelled()); + assertNull(task.getRenewable()); + } + } + } + private ReplicationGroup createLeaderGroup(int replicas) throws IOException { Settings settings = Settings.builder() .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) @@ -399,10 +425,12 @@ private ReplicationGroup createLeaderGroup(int replicas) throws IOException { } private ReplicationGroup createFollowGroup(ReplicationGroup leaderGroup, int replicas) throws IOException { - Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB)) - .build(); + final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put( + IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), + new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB)) + .build(); IndexMetaData indexMetaData = buildIndexMetaData(replicas, settings, indexMapping); return new ReplicationGroup(indexMetaData) { @Override @@ -543,6 +571,27 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co threadPool.executor(ThreadPool.Names.GENERIC).execute(task); } + @Override + protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) { + final String retentionLeaseId = CcrRetentionLeases.retentionLeaseId( + "follower", + followerGroup.getPrimary().routingEntry().index(), + "remote", + leaderGroup.getPrimary().routingEntry().index()); + final PlainActionFuture response = new PlainActionFuture<>(); + leaderGroup.addRetentionLease( + retentionLeaseId, + followerGlobalCheckpoint.getAsLong(), + "ccr", + ActionListener.wrap(response::onResponse, e -> fail(e.toString()))); + response.actionGet(); + return threadPool.scheduleWithFixedDelay( + () -> leaderGroup.renewRetentionLease(retentionLeaseId, followerGlobalCheckpoint.getAsLong(), "ccr"), + CcrRetentionLeases.RETENTION_LEASE_RENEW_INTERVAL_SETTING.get( + followerGroup.getPrimary().indexSettings().getSettings()), + ThreadPool.Names.GENERIC); + } + @Override protected boolean isStopped() { return super.isStopped() || stopped.get(); From c1351b155532769850703456577070119875ae9b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 24 Feb 2019 11:48:52 -0500 Subject: [PATCH 02/12] Guard against race --- .../ccr/action/ShardFollowTasksExecutor.java | 3 + .../xpack/ccr/CcrRetentionLeaseIT.java | 101 +++++++++++++++++- 2 files changed, 103 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 568f6cb75d726..092d46ebd4c38 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -275,6 +275,9 @@ protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final Lo final ActionListener listener = ActionListener.wrap( r -> {}, e -> { + if (isCancelled() || isCompleted()) { + return; + } final Throwable cause = ExceptionsHelper.unwrapCause(e); logRetentionLeaseFailure(retentionLeaseId, cause); // noinspection StatementWithEmptyBody diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 8f96f43a37be6..304b16ee150dc 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseActions; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -43,7 +44,10 @@ import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; +import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; @@ -435,7 +439,7 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { } pauseFollow(followerIndex); - followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); + assertAcked(followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet()); assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); final IndicesStatsResponse afterUnfollowStats = @@ -797,6 +801,101 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); } + /** + * This test is fairly evil. This test is to ensure that we are protected against a race condition when unfollowing and a background + * renewal fires. The action of unfollowing will remove retention leases from the leader. If a background renewal is firing at that + * times, it means that we will be met with a retention lease not found exception. That will in turn trigger behavior to attempt to + * re-add the retention lease, which means we are left in a situation where we have unfollowed, but the retention lease still remains + * on the leader. However, we have a guard against this in the callback after the retention lease not found exception is thrown, which + * checks if the shard follow node task is cancelled or completed. + * + * To test this this behavior is correct, we capture the call to renew the retention lease. Then, we will step in between and execute + * an unfollow request. This will remove the retention lease on the leader. At this point, we can unlatch the renew call, which will + * now be met with a retention lease not found exception. We will cheat and wait for that response to come back, and then synchronously + * trigger the listener which will check to see if the shard follow node task is cancelled or completed, and if not, add the retention + * lease back. After that listener returns, we can check to see if a retention lease exists on the leader. + * + * Note, this done mean that listener will fire twice, once in our onResponseReceived hook, and once after our onResponseReceived + * callback returns. 🤷‍♀️ + * + * @throws Exception if an exception occurs in the main test thread + */ + public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = 1; + final int numberOfReplicas = 1; + final Map additionalIndexSettings = new HashMap<>(); + additionalIndexSettings.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), Boolean.toString(true)); + additionalIndexSettings.put( + IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), + TimeValue.timeValueMillis(200).getStringRep()); + final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + final CountDownLatch removeLeaseLatch = new CountDownLatch(1); + final CountDownLatch unfollowLatch = new CountDownLatch(1); + final CountDownLatch responseLatch = new CountDownLatch(1); + + final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.addSendBehavior( + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) + || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { + senderTransportService.clearAllRules(); + final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex); + try { + //innerLatch.await(); + removeLeaseLatch.countDown(); + unfollowLatch.await(); + + senderTransportService.transport().addMessageListener(new TransportMessageListener() { + + @Override + public void onResponseReceived(final long responseRequestId, final Transport.ResponseContext context) { + if (requestId == responseRequestId) { + final RetentionLeaseNotFoundException e = new RetentionLeaseNotFoundException(retentionLeaseId); + context.handler().handleException(new RemoteTransportException(e.getMessage(), e)); + responseLatch.countDown(); + } + } + + }); + + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + fail(e.toString()); + } + } + connection.sendRequest(requestId, action, request, options); + }); + } + + removeLeaseLatch.await(); + + pauseFollow(followerIndex); + assertAcked(followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet()); + assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); + + unfollowLatch.countDown(); + + responseLatch.await(); + + final IndicesStatsResponse afterUnfollowStats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + final List afterUnfollowShardsStats = getShardsStats(afterUnfollowStats); + for (final ShardStats shardStats : afterUnfollowShardsStats) { + assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); + } + } + private void assertRetentionLeaseRenewal( final int numberOfShards, final int numberOfReplicas, From 285ce8703d041780b1b70a08d58fdcb02b954808 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 24 Feb 2019 11:49:33 -0500 Subject: [PATCH 03/12] Fix comment --- .../xpack/ccr/action/ShardFollowTasksExecutor.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 092d46ebd4c38..5c8bbbe227bb4 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -267,10 +267,9 @@ protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final Lo /* * We are going to attempt to renew the retention lease. If this fails it is either because the retention lease does not - * lease does not exist, or something else happened. If the retention lease does not exist, we will attempt - * to add the retention lease again. If that fails, it had better not be because the retention lease already - * exists. Either way, we will attempt to renew again on the next scheduled execution. We will start by - * creating listeners + * exist, or something else happened. If the retention lease does not exist, we will attempt to add the retention lease + * again. If that fails, it had better not be because the retention lease already exists. Either way, we will attempt to + * renew again on the next scheduled execution. */ final ActionListener listener = ActionListener.wrap( r -> {}, From fbb1fcf217c3f42eae93f7746618bd71c84e8b77 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 24 Feb 2019 11:50:07 -0500 Subject: [PATCH 04/12] Mark synchronized --- .../org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index cf7046193c8a4..0b7a600989ad1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -97,7 +97,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private Scheduler.Cancellable renewable; - Scheduler.Cancellable getRenewable() { + synchronized Scheduler.Cancellable getRenewable() { return renewable; } From 88e4bccf3619e66d9c2225260da5a6662ae7a978 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 24 Feb 2019 12:08:41 -0500 Subject: [PATCH 05/12] Fix typo in comment --- .../java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 304b16ee150dc..303ef2810ac46 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -804,7 +804,7 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep /** * This test is fairly evil. This test is to ensure that we are protected against a race condition when unfollowing and a background * renewal fires. The action of unfollowing will remove retention leases from the leader. If a background renewal is firing at that - * times, it means that we will be met with a retention lease not found exception. That will in turn trigger behavior to attempt to + * time, it means that we will be met with a retention lease not found exception. That will in turn trigger behavior to attempt to * re-add the retention lease, which means we are left in a situation where we have unfollowed, but the retention lease still remains * on the leader. However, we have a guard against this in the callback after the retention lease not found exception is thrown, which * checks if the shard follow node task is cancelled or completed. From 8901b5b02ffdc5ab0f8cfd7d2c524bc171497feb Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sun, 24 Feb 2019 12:20:01 -0500 Subject: [PATCH 06/12] Fix compilation --- .../java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 303ef2810ac46..8996f92b9ec52 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -858,6 +858,7 @@ public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Ex senderTransportService.transport().addMessageListener(new TransportMessageListener() { + @SuppressWarnings("rawtypes") @Override public void onResponseReceived(final long responseRequestId, final Transport.ResponseContext context) { if (requestId == responseRequestId) { From 47aa6fb4ac19e8e3f6d817f1a964d31963df9a8c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 25 Feb 2019 06:07:29 -0500 Subject: [PATCH 07/12] Cleanup comment --- .../xpack/ccr/action/ShardFollowTasksExecutor.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 5c8bbbe227bb4..be23396dd612c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -302,10 +302,7 @@ protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final Lo logRetentionLeaseFailure(retentionLeaseId, innerCause); })); } else { - /* - * If something else happened, we will attempt to renew again after another renew interval - * has passed. - */ + // if something else happened, we will attempt to renew again after another renew interval has passed } }); From 130105072f6453bb7cc7af6940a676b86a475670 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 25 Feb 2019 06:10:15 -0500 Subject: [PATCH 08/12] Add clarifying comment --- .../xpack/ccr/action/ShardFollowTasksExecutor.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index be23396dd612c..dc5d4a75e1cfb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -274,6 +274,14 @@ protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final Lo final ActionListener listener = ActionListener.wrap( r -> {}, e -> { + /* + * We have to guard against the possibility that the shard follow node task has been stopped and the retention + * lease deliberately removed via the act of unfollowing. Note that the order of operations is important in + * TransportUnfollowAction. There, we first stop the shard follow node task, and then remove the retention + * leases on the leader. This means that if we end up here with the retention lease not existing because of an + * unfollow action, then we know that the unfollow action has already stopped the shard follow node task and + * there is no race condition with the unfollow action. + */ if (isCancelled() || isCompleted()) { return; } From 79f38b4e1c71239ba8158c937e3a89324b1da1e2 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 25 Feb 2019 06:11:11 -0500 Subject: [PATCH 09/12] Use existing lock --- .../xpack/ccr/action/ShardFollowNodeTask.java | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 0b7a600989ad1..0ee86a6058c63 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -128,7 +128,8 @@ void start( final long followerMaxSeqNo) { /* * While this should only ever be called once and before any other threads can touch these fields, we use synchronization here to - * avoid the need to declare these fields as volatile. That is, we are ensuring thesefields are always accessed under the same lock. + * avoid the need to declare these fields as volatile. That is, we are ensuring these fields are always accessed under the same + * lock. */ synchronized (this) { this.followerHistoryUUID = followerHistoryUUID; @@ -137,6 +138,11 @@ void start( this.followerGlobalCheckpoint = followerGlobalCheckpoint; this.followerMaxSeqNo = followerMaxSeqNo; this.lastRequestedSeqNo = followerGlobalCheckpoint; + renewable = scheduleBackgroundRetentionLeaseRenewal(() -> { + synchronized (ShardFollowNodeTask.this) { + return this.followerGlobalCheckpoint; + } + }); } // updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical @@ -158,14 +164,6 @@ void start( coordinateReads(); }); }); - - synchronized (this) { - renewable = scheduleBackgroundRetentionLeaseRenewal(() -> { - synchronized (ShardFollowNodeTask.this) { - return this.followerGlobalCheckpoint; - } - }); - } } synchronized void coordinateReads() { From 542238675b1ee6fa61b8a977c5ccca0b8b21f362 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 25 Feb 2019 06:40:34 -0500 Subject: [PATCH 10/12] Add clarifying comment --- .../elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index dc5d4a75e1cfb..81d8750d07c6d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -289,6 +289,7 @@ protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final Lo logRetentionLeaseFailure(retentionLeaseId, cause); // noinspection StatementWithEmptyBody if (cause instanceof RetentionLeaseNotFoundException) { + // note that we do not need to mark as system context here as that is restored from the original renew logger.trace( "{} background adding retention lease [{}] while following", params.getFollowShardId(), From 87386679696101028832201e130a1f8e9a3fb6b4 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 25 Feb 2019 10:49:59 -0500 Subject: [PATCH 11/12] Address test flakiness --- .../org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 8996f92b9ec52..6454fbcdf8bfb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -373,6 +373,7 @@ public void testUnfollowRemovesRetentionLeases() throws Exception { final String leaderIndexSettings = getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -464,6 +465,7 @@ public void testUnfollowFailsToRemoveRetentionLeases() throws Exception { final String leaderIndexSettings = getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -538,6 +540,7 @@ public void testRetentionLeaseRenewedWhileFollowing() throws Exception { TimeValue.timeValueMillis(200).getStringRep()); final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -557,6 +560,7 @@ public void testRetentionLeaseAdvancesWhileFollowing() throws Exception { TimeValue.timeValueMillis(200).getStringRep()); final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -618,6 +622,7 @@ public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws E TimeValue.timeValueMillis(200).getStringRep()); final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -719,6 +724,7 @@ public void testRetentionLeaseRenewalIsResumedWhenFollowingIsResumed() throws Ex TimeValue.timeValueMillis(200).getStringRep()); final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -745,6 +751,7 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep TimeValue.timeValueMillis(200).getStringRep()); final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -832,6 +839,7 @@ public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Ex TimeValue.timeValueMillis(200).getStringRep()); final String leaderIndexSettings = getIndexSettings(numberOfShards, numberOfReplicas, additionalIndexSettings); assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -865,6 +873,7 @@ public void onResponseReceived(final long responseRequestId, final Transport.Res final RetentionLeaseNotFoundException e = new RetentionLeaseNotFoundException(retentionLeaseId); context.handler().handleException(new RemoteTransportException(e.getMessage(), e)); responseLatch.countDown(); + senderTransportService.transport().removeMessageListener(this); } } From 220fc67cc2baaec6e44af78f2ed15bbbefb8564c Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 25 Feb 2019 15:10:07 -0500 Subject: [PATCH 12/12] Cleanup after test to fix interaction between two tests --- .../xpack/ccr/CcrRetentionLeaseIT.java | 100 ++++++++++-------- 1 file changed, 56 insertions(+), 44 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 6454fbcdf8bfb..44b38ddbadfa0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -42,6 +42,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.RemoteTransportException; @@ -610,6 +611,7 @@ public void testRetentionLeaseAdvancesWhileFollowing() throws Exception { }); } + @TestLogging(value = "org.elasticsearch.xpack.ccr:trace") public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws Exception { final String leaderIndex = "leader"; final String followerIndex = "follower"; @@ -850,59 +852,69 @@ public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Ex final CountDownLatch responseLatch = new CountDownLatch(1); final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); - for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { - final MockTransportService senderTransportService = - (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); - senderTransportService.addSendBehavior( - (connection, requestId, action, request, options) -> { - if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) - || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { - senderTransportService.clearAllRules(); - final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex); - try { - //innerLatch.await(); - removeLeaseLatch.countDown(); - unfollowLatch.await(); - - senderTransportService.transport().addMessageListener(new TransportMessageListener() { - - @SuppressWarnings("rawtypes") - @Override - public void onResponseReceived(final long responseRequestId, final Transport.ResponseContext context) { - if (requestId == responseRequestId) { - final RetentionLeaseNotFoundException e = new RetentionLeaseNotFoundException(retentionLeaseId); - context.handler().handleException(new RemoteTransportException(e.getMessage(), e)); - responseLatch.countDown(); - senderTransportService.transport().removeMessageListener(this); + + try { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.addSendBehavior( + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) + || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { + final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex); + try { + removeLeaseLatch.countDown(); + unfollowLatch.await(); + + senderTransportService.transport().addMessageListener(new TransportMessageListener() { + + @SuppressWarnings("rawtypes") + @Override + public void onResponseReceived( + final long responseRequestId, + final Transport.ResponseContext context) { + if (requestId == responseRequestId) { + final RetentionLeaseNotFoundException e = + new RetentionLeaseNotFoundException(retentionLeaseId); + context.handler().handleException(new RemoteTransportException(e.getMessage(), e)); + responseLatch.countDown(); + senderTransportService.transport().removeMessageListener(this); + } } - } - }); + }); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - fail(e.toString()); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + fail(e.toString()); + } } - } - connection.sendRequest(requestId, action, request, options); - }); - } + connection.sendRequest(requestId, action, request, options); + }); + } - removeLeaseLatch.await(); + removeLeaseLatch.await(); - pauseFollow(followerIndex); - assertAcked(followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet()); - assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); + pauseFollow(followerIndex); + assertAcked(followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet()); + assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); - unfollowLatch.countDown(); + unfollowLatch.countDown(); - responseLatch.await(); + responseLatch.await(); - final IndicesStatsResponse afterUnfollowStats = - leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); - final List afterUnfollowShardsStats = getShardsStats(afterUnfollowStats); - for (final ShardStats shardStats : afterUnfollowShardsStats) { - assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); + final IndicesStatsResponse afterUnfollowStats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + final List afterUnfollowShardsStats = getShardsStats(afterUnfollowStats); + for (final ShardStats shardStats : afterUnfollowShardsStats) { + assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); + } + } finally { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.clearAllRules(); + } } }