From 775f6e27d4ab4fa81ef4502fb869539797ec65e9 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 10 Jan 2019 15:02:30 +0100 Subject: [PATCH] [CCR] Make shard follow tasks more resilient for restarts (#37239) If a running shard follow task needs to be restarted and the remote connection seeds have changed then a shard follow task currently fails with a fatal error. The change creates the remote client lazily and adjusts the errors a shard follow task should retry. This issue was found in test failures in the recently added ccr rolling upgrade tests. The reason why this issue occurs more frequently in the rolling upgrade test is because ccr is setup in local mode (so remote connection seed will become stale) and all nodes are restarted, which forces the shard follow tasks to get restarted at some point during the test. Note that these tests cannot be enabled yet, because this change will need to be backported to 6.x first. (otherwise the issue still occurs on non upgraded nodes) I also changed the RestartIndexFollowingIT to setup remote cluster via persistent settings and to also restart the leader cluster. This way what happens during the ccr rolling upgrade qa tests, also happens in this test. Relates to #37231 --- .../xpack/ccr/action/ShardFollowNodeTask.java | 12 ++++---- .../ccr/action/ShardFollowTasksExecutor.java | 28 +++++++++++-------- .../xpack/ccr/RestartIndexFollowingIT.java | 28 ++++++++++++++++++- 3 files changed, 51 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 13b12d4b96f2b..b1d6467168c90 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -29,8 +29,7 @@ import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.transport.NodeDisconnectedException; -import org.elasticsearch.transport.NodeNotConnectedException; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -448,7 +447,10 @@ static boolean shouldRetry(String remoteCluster, Exception e) { return true; } + // This is thrown when using a Client and its remote cluster alias went MIA String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster; + // This is thrown when creating a Client and the remote cluster does not exist: + String unknownClusterMessage = "unknown cluster alias [" + remoteCluster + "]"; final Throwable actual = ExceptionsHelper.unwrapCause(e); return actual instanceof ShardNotFoundException || actual instanceof IllegalIndexShardStateException || @@ -458,11 +460,11 @@ static boolean shouldRetry(String remoteCluster, Exception e) { actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges actual instanceof ClusterBlockException || // If leader index is closed or no elected master actual instanceof IndexClosedException || // If follow index is closed - actual instanceof NodeDisconnectedException || - actual instanceof NodeNotConnectedException || + actual instanceof ConnectTransportException || actual instanceof NodeClosedException || (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) || - (actual instanceof IllegalArgumentException && noSuchRemoteClusterMessage.equals(actual.getMessage())); + (actual instanceof IllegalArgumentException && (noSuchRemoteClusterMessage.equals(actual.getMessage()) || + unknownClusterMessage.equals(actual.getMessage()))); } // These methods are protected for testing purposes: diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 63fd80e6d48e6..c8a71e0e91a88 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -95,12 +95,6 @@ protected AllocatedPersistentTask createTask(long id, String type, String action PersistentTasksCustomMetaData.PersistentTask taskInProgress, Map headers) { ShardFollowTask params = taskInProgress.getParams(); - final Client remoteClient; - if (params.getRemoteCluster() != null) { - remoteClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders()); - } else { - remoteClient = wrapClient(client, params.getHeaders()); - } Client followerClient = wrapClient(client, params.getHeaders()); BiConsumer scheduler = (delay, command) -> { try { @@ -127,8 +121,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro clusterStateRequest.clear(); clusterStateRequest.metaData(true); clusterStateRequest.indices(leaderIndex.getName()); - - remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { + CheckedConsumer onResponse = clusterStateResponse -> { IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); if (indexMetaData.getMappings().isEmpty()) { assert indexMetaData.getMappingVersion() == 1; @@ -146,7 +139,12 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), errorHandler)); - }, errorHandler)); + }; + try { + remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); + } catch (Exception e) { + errorHandler.accept(e); + } } @Override @@ -190,7 +188,11 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum } } }; - remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); + try { + remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); + } catch (Exception e) { + errorHandler.accept(e); + } } private void closeIndexUpdateSettingsAndOpenIndex(String followIndex, @@ -245,7 +247,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co request.setMaxBatchSize(params.getMaxReadRequestSize()); request.setPollTimeout(params.getReadPollTimeout()); try { - remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); + remoteClient(params).execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); } catch (Exception e) { errorHandler.accept(e); } @@ -260,6 +262,10 @@ private String getLeaderShardHistoryUUID(ShardFollowTask params) { return recordedLeaderShardHistoryUUIDs[params.getLeaderShardId().id()]; } + private Client remoteClient(ShardFollowTask params) { + return wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders()); + } + interface FollowerStatsInfoHandler { void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java index 49fbe15ddabae..af666b045953e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java @@ -6,8 +6,11 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; @@ -27,9 +30,9 @@ protected int numberOfNodesPerCluster() { public void testFollowIndex() throws Exception { final String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); - singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderGreen("index1"); + setupRemoteCluster(); final PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -57,6 +60,29 @@ public void testFollowIndex() throws Exception { assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs + secondBatchNumDocs)); }); + + getLeaderCluster().fullRestart(); + ensureLeaderGreen("index1"); + // Remote connection needs to be re-configured, because all the nodes in leader cluster have been restarted: + setupRemoteCluster(); + + final long thirdBatchNumDocs = randomIntBetween(2, 64); + for (int i = 0; i < thirdBatchNumDocs; i++) { + leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + } + + assertBusy(() -> { + assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(), + equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs)); + }); + } + + private void setupRemoteCluster() { + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + String masterNode = getLeaderCluster().getMasterName(); + String address = getLeaderCluster().getInstance(TransportService.class, masterNode).boundAddress().publishAddress().toString(); + updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address)); + assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } }