diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 13b12d4b96f2b..b1d6467168c90 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -29,8 +29,7 @@ import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.transport.NodeDisconnectedException; -import org.elasticsearch.transport.NodeNotConnectedException; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -448,7 +447,10 @@ static boolean shouldRetry(String remoteCluster, Exception e) { return true; } + // This is thrown when using a Client and its remote cluster alias went MIA String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster; + // This is thrown when creating a Client and the remote cluster does not exist: + String unknownClusterMessage = "unknown cluster alias [" + remoteCluster + "]"; final Throwable actual = ExceptionsHelper.unwrapCause(e); return actual instanceof ShardNotFoundException || actual instanceof IllegalIndexShardStateException || @@ -458,11 +460,11 @@ static boolean shouldRetry(String remoteCluster, Exception e) { actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges actual instanceof ClusterBlockException || // If leader index is closed or no elected master actual instanceof IndexClosedException || // If follow index is closed - actual instanceof NodeDisconnectedException || - actual instanceof NodeNotConnectedException || + actual instanceof ConnectTransportException || actual instanceof NodeClosedException || (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) || - (actual instanceof IllegalArgumentException && noSuchRemoteClusterMessage.equals(actual.getMessage())); + (actual instanceof IllegalArgumentException && (noSuchRemoteClusterMessage.equals(actual.getMessage()) || + unknownClusterMessage.equals(actual.getMessage()))); } // These methods are protected for testing purposes: diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 63fd80e6d48e6..c8a71e0e91a88 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -95,12 +95,6 @@ protected AllocatedPersistentTask createTask(long id, String type, String action PersistentTasksCustomMetaData.PersistentTask taskInProgress, Map headers) { ShardFollowTask params = taskInProgress.getParams(); - final Client remoteClient; - if (params.getRemoteCluster() != null) { - remoteClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders()); - } else { - remoteClient = wrapClient(client, params.getHeaders()); - } Client followerClient = wrapClient(client, params.getHeaders()); BiConsumer scheduler = (delay, command) -> { try { @@ -127,8 +121,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro clusterStateRequest.clear(); clusterStateRequest.metaData(true); clusterStateRequest.indices(leaderIndex.getName()); - - remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { + CheckedConsumer onResponse = clusterStateResponse -> { IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); if (indexMetaData.getMappings().isEmpty()) { assert indexMetaData.getMappingVersion() == 1; @@ -146,7 +139,12 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), errorHandler)); - }, errorHandler)); + }; + try { + remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); + } catch (Exception e) { + errorHandler.accept(e); + } } @Override @@ -190,7 +188,11 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum } } }; - remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); + try { + remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); + } catch (Exception e) { + errorHandler.accept(e); + } } private void closeIndexUpdateSettingsAndOpenIndex(String followIndex, @@ -245,7 +247,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co request.setMaxBatchSize(params.getMaxReadRequestSize()); request.setPollTimeout(params.getReadPollTimeout()); try { - remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); + remoteClient(params).execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); } catch (Exception e) { errorHandler.accept(e); } @@ -260,6 +262,10 @@ private String getLeaderShardHistoryUUID(ShardFollowTask params) { return recordedLeaderShardHistoryUUIDs[params.getLeaderShardId().id()]; } + private Client remoteClient(ShardFollowTask params) { + return wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders()); + } + interface FollowerStatsInfoHandler { void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java index 49fbe15ddabae..af666b045953e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java @@ -6,8 +6,11 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; @@ -27,9 +30,9 @@ protected int numberOfNodesPerCluster() { public void testFollowIndex() throws Exception { final String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); - singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); ensureLeaderGreen("index1"); + setupRemoteCluster(); final PutFollowAction.Request followRequest = putFollow("index1", "index2"); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); @@ -57,6 +60,29 @@ public void testFollowIndex() throws Exception { assertThat(followerClient().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs + secondBatchNumDocs)); }); + + getLeaderCluster().fullRestart(); + ensureLeaderGreen("index1"); + // Remote connection needs to be re-configured, because all the nodes in leader cluster have been restarted: + setupRemoteCluster(); + + final long thirdBatchNumDocs = randomIntBetween(2, 64); + for (int i = 0; i < thirdBatchNumDocs; i++) { + leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + } + + assertBusy(() -> { + assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(), + equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs)); + }); + } + + private void setupRemoteCluster() { + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + String masterNode = getLeaderCluster().getMasterName(); + String address = getLeaderCluster().getInstance(TransportService.class, masterNode).boundAddress().publishAddress().toString(); + updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address)); + assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } }