diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java index 87d913c337642..f039810ed940c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java @@ -6,11 +6,15 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.admin.indices.mapping.put.MappingRequestValidator; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.rest.RestStatus; @@ -18,6 +22,7 @@ import java.util.Arrays; import java.util.List; +import java.util.function.Supplier; import java.util.stream.Collectors; public final class CcrRequests { @@ -40,6 +45,39 @@ public static PutMappingRequest putMappingRequest(String followerIndex, MappingM return putMappingRequest; } + /** + * Gets an {@link IndexMetaData} of the given index. The mapping version and metadata version of the returned {@link IndexMetaData} + * must be at least the provided {@code mappingVersion} and {@code metadataVersion} respectively. + */ + public static void getIndexMetadata(Client client, Index index, long mappingVersion, long metadataVersion, + Supplier timeoutSupplier, ActionListener listener) { + final ClusterStateRequest request = CcrRequests.metaDataRequest(index.getName()); + if (metadataVersion > 0) { + request.waitForMetaDataVersion(metadataVersion).waitForTimeout(timeoutSupplier.get()); + } + client.admin().cluster().state(request, ActionListener.wrap( + response -> { + if (response.getState() == null) { + assert metadataVersion > 0 : metadataVersion; + throw new IllegalStateException("timeout to get cluster state with" + + " metadata version [" + metadataVersion + "], mapping version [" + mappingVersion + "]"); + } + final MetaData metaData = response.getState().metaData(); + final IndexMetaData indexMetaData = metaData.getIndexSafe(index); + if (indexMetaData.getMappingVersion() >= mappingVersion) { + listener.onResponse(indexMetaData); + return; + } + if (timeoutSupplier.get().nanos() < 0) { + throw new IllegalStateException("timeout to get cluster state with mapping version [" + mappingVersion + "]"); + } + // ask for the next version. + getIndexMetadata(client, index, mappingVersion, metaData.version() + 1, timeoutSupplier, listener); + }, + listener::onFailure + )); + } + public static final MappingRequestValidator CCR_PUT_MAPPING_REQUEST_VALIDATOR = (request, state, indices) -> { if (request.origin() == null) { return null; // a put-mapping-request on old versions does not have origin. diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 56538d395feda..c0e2d7f54b318 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; @@ -59,6 +58,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; +import java.util.function.Supplier; import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient; import static org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction.extractLeaderShardHistoryUUIDs; @@ -111,7 +111,9 @@ protected AllocatedPersistentTask createTask(long id, String type, String action @Override protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler) { final Index followerIndex = params.getFollowShardId().getIndex(); - getIndexMetadata(minRequiredMappingVersion, 0L, params, ActionListener.wrap( + final Index leaderIndex = params.getLeaderShardId().getIndex(); + final Supplier timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut; + CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap( indexMetaData -> { if (indexMetaData.getMappings().isEmpty()) { assert indexMetaData.getMappingVersion() == 1; @@ -246,39 +248,6 @@ private Client remoteClient(ShardFollowTask params) { return wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders()); } - private void getIndexMetadata(long minRequiredMappingVersion, long minRequiredMetadataVersion, - ShardFollowTask params, ActionListener listener) { - final Index leaderIndex = params.getLeaderShardId().getIndex(); - final ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); - if (minRequiredMetadataVersion > 0) { - clusterStateRequest.waitForMetaDataVersion(minRequiredMetadataVersion).waitForTimeout(waitForMetadataTimeOut); - } - try { - remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap( - r -> { - // if wait_for_metadata_version timeout, the response is empty - if (r.getState() == null) { - assert minRequiredMetadataVersion > 0; - getIndexMetadata(minRequiredMappingVersion, minRequiredMetadataVersion, params, listener); - return; - } - final MetaData metaData = r.getState().metaData(); - final IndexMetaData indexMetaData = metaData.getIndexSafe(leaderIndex); - if (indexMetaData.getMappingVersion() < minRequiredMappingVersion) { - // ask for the next version. - getIndexMetadata(minRequiredMappingVersion, metaData.version() + 1, params, listener); - } else { - assert metaData.version() >= minRequiredMetadataVersion : metaData.version() + " < " + minRequiredMetadataVersion; - listener.onResponse(indexMetaData); - } - }, - listener::onFailure - )); - } catch (Exception e) { - listener.onFailure(e); - } - } - interface FollowerStatsInfoHandler { void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index 07ee076135a1b..eceacc1d926d8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -72,7 +72,8 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques throw new ShardNotFoundException(shardId); } Store.MetadataSnapshot storeFileMetaData = ccrRestoreService.openSession(request.getSessionUUID(), indexShard); - return new PutCcrRestoreSessionResponse(clusterService.localNode(), storeFileMetaData); + long mappingVersion = indexShard.indexSettings().getIndexMetaData().getMappingVersion(); + return new PutCcrRestoreSessionResponse(clusterService.localNode(), storeFileMetaData, mappingVersion); } @Override @@ -97,19 +98,22 @@ public static class PutCcrRestoreSessionResponse extends ActionResponse { private DiscoveryNode node; private Store.MetadataSnapshot storeFileMetaData; + private long mappingVersion; PutCcrRestoreSessionResponse() { } - PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetaData) { + PutCcrRestoreSessionResponse(DiscoveryNode node, Store.MetadataSnapshot storeFileMetaData, long mappingVersion) { this.node = node; this.storeFileMetaData = storeFileMetaData; + this.mappingVersion = mappingVersion; } PutCcrRestoreSessionResponse(StreamInput in) throws IOException { super(in); node = new DiscoveryNode(in); storeFileMetaData = new Store.MetadataSnapshot(in); + mappingVersion = in.readVLong(); } @Override @@ -117,6 +121,7 @@ public void readFrom(StreamInput in) throws IOException { super.readFrom(in); node = new DiscoveryNode(in); storeFileMetaData = new Store.MetadataSnapshot(in); + mappingVersion = in.readVLong(); } @Override @@ -124,6 +129,7 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); node.writeTo(out); storeFileMetaData.writeTo(out); + out.writeVLong(mappingVersion); } public DiscoveryNode getNode() { @@ -133,5 +139,9 @@ public DiscoveryNode getNode() { public Store.MetadataSnapshot getStoreFileMetaData() { return storeFileMetaData; } + + public long getMappingVersion() { + return mappingVersion; + } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index bcf0e5f6dc6e9..baad95d5a94df 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -29,9 +29,9 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CombinedRateLimiter; import org.elasticsearch.index.Index; -import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRecoveryException; @@ -72,6 +72,8 @@ import java.util.Map; import java.util.Set; import java.util.function.LongConsumer; +import java.util.function.Supplier; + /** * This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to @@ -288,11 +290,10 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v String name = metadata.name(); try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) { restoreSession.restoreFiles(); + updateMappings(remoteClient, leaderIndex, restoreSession.mappingVersion, client, indexShard.routingEntry().index()); } catch (Exception e) { throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e); } - - maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings()); } @Override @@ -300,18 +301,20 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } - private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) { - ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); - ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest) - .actionGet(ccrSettings.getRecoveryActionTimeout()); - IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex); - long leaderMappingVersion = leaderIndexMetadata.getMappingVersion(); - - if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) { - Index followerIndex = followerIndexSettings.getIndex(); - MappingMetaData mappingMetaData = leaderIndexMetadata.mapping(); - PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData); - localClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout()); + private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion, + Client followerClient, Index followerIndex) { + final PlainActionFuture indexMetadataFuture = new PlainActionFuture<>(); + final long startTimeInNanos = System.nanoTime(); + final Supplier timeout = () -> { + final long elapsedInNanos = System.nanoTime() - startTimeInNanos; + return TimeValue.timeValueNanos(ccrSettings.getRecoveryActionTimeout().nanos() - elapsedInNanos); + }; + CcrRequests.getIndexMetadata(leaderClient, leaderIndex, leaderMappingVersion, 0L, timeout, indexMetadataFuture); + final IndexMetaData leaderIndexMetadata = indexMetadataFuture.actionGet(ccrSettings.getRecoveryActionTimeout()); + final MappingMetaData mappingMetaData = leaderIndexMetadata.mapping(); + if (mappingMetaData != null) { + final PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData); + followerClient.admin().indices().putMapping(putMappingRequest).actionGet(ccrSettings.getRecoveryActionTimeout()); } } @@ -321,7 +324,7 @@ private RestoreSession openSession(String repositoryName, Client remoteClient, S PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(ccrSettings.getRecoveryActionTimeout()); return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState, - response.getStoreFileMetaData(), ccrSettings, throttledTime::inc); + response.getStoreFileMetaData(), response.getMappingVersion(), ccrSettings, throttledTime::inc); } private static class RestoreSession extends FileRestoreContext implements Closeable { @@ -332,17 +335,19 @@ private static class RestoreSession extends FileRestoreContext implements Closea private final String sessionUUID; private final DiscoveryNode node; private final Store.MetadataSnapshot sourceMetaData; + private final long mappingVersion; private final CcrSettings ccrSettings; private final LongConsumer throttleListener; RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard, - RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CcrSettings ccrSettings, - LongConsumer throttleListener) { + RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion, + CcrSettings ccrSettings, LongConsumer throttleListener) { super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE); this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; this.sourceMetaData = sourceMetaData; + this.mappingVersion = mappingVersion; this.ccrSettings = ccrSettings; this.throttleListener = throttleListener; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index f34f73ef70592..d4d6d13f7a292 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -390,7 +390,6 @@ public void testIndividualActionsTimeout() throws Exception { assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37887") public void testFollowerMappingIsUpdated() throws IOException { String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; String leaderIndex = "index1"; @@ -413,16 +412,8 @@ public void testFollowerMappingIsUpdated() throws IOException { .renameReplacement(followerIndex).masterNodeTimeout(new TimeValue(1L, TimeUnit.HOURS)) .indexSettings(settingsBuilder); - // TODO: Eventually when the file recovery work is complete, we should test updated mappings by - // indexing to the leader while the recovery is happening. However, into order to that test mappings - // are updated prior to that work, we index documents in the clear session callback. This will - // ensure a mapping change prior to the final mapping check on the follower side. - for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) { - restoreSourceService.addCloseSessionListener(s -> { - final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1); - leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get(); - }); - } + final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1); + leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get(); PlainActionFuture future = PlainActionFuture.newFuture(); restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); @@ -435,10 +426,6 @@ public void testFollowerMappingIsUpdated() throws IOException { clusterStateRequest.clear(); clusterStateRequest.metaData(true); clusterStateRequest.indices(followerIndex); - ClusterStateResponse clusterState = followerClient().admin().cluster().state(clusterStateRequest).actionGet(); - IndexMetaData followerIndexMetadata = clusterState.getState().metaData().index(followerIndex); - assertEquals(2, followerIndexMetadata.getMappingVersion()); - MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings() .get("index2").get("doc"); assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));