From ce5b54c289c93874309c533d656386725f002c11 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Dec 2018 13:53:04 -0700 Subject: [PATCH 1/2] Update index mappings when ccr restore complete (#36879) This is related to #35975. When the shard restore process is complete, the index mappings need to be updated to ensure that the data in the files restores is compatible with the follower mappings. This commit implements a mapping update as the final step in a shard restore. --- .../xpack/ccr/action/CcrRequests.java | 31 ++++++++++ .../ccr/action/ShardFollowTasksExecutor.java | 15 +---- .../xpack/ccr/repository/CcrRepository.java | 48 +++++++++------ .../xpack/ccr/CcrRepositoryIT.java | 59 +++++++++++++++++++ 4 files changed, 122 insertions(+), 31 deletions(-) create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java new file mode 100644 index 0000000000000..12432c740a701 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/CcrRequests.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.xcontent.XContentType; + +public final class CcrRequests { + + private CcrRequests() {} + + public static ClusterStateRequest metaDataRequest(String leaderIndex) { + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(leaderIndex); + return clusterStateRequest; + } + + public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) { + PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex); + putMappingRequest.type(mappingMetaData.type()); + putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + return putMappingRequest; + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index bd22b85684ca4..0fed083bba9ac 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -31,7 +31,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.CommitStats; @@ -123,10 +122,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro Index leaderIndex = params.getLeaderShardId().getIndex(); Index followIndex = params.getFollowShardId().getIndex(); - ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex.getName()); + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> { IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); @@ -140,9 +136,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro indexMetaData.getMappings().size() + "]"; MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; - PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName()); - putMappingRequest.type(mappingMetaData.type()); - putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); + PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData); followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), errorHandler)); @@ -154,10 +148,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum final Index leaderIndex = params.getLeaderShardId().getIndex(); final Index followIndex = params.getFollowShardId().getIndex(); - final ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); - clusterStateRequest.clear(); - clusterStateRequest.metaData(true); - clusterStateRequest.indices(leaderIndex.getName()); + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); CheckedConsumer onResponse = clusterStateResponse -> { final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 17450bc05ad2b..e905cd08ce117 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -8,10 +8,13 @@ import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -21,6 +24,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRecoveryException; @@ -37,6 +41,7 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest; import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction; @@ -111,15 +116,10 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) { public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) { assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId"; Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - ClusterStateResponse response = remoteClient - .admin() - .cluster() - .prepareState() - .clear() - .setMetaData(true) - .setIndices("dummy_index_name") // We set a single dummy index name to avoid fetching all the index data - .get(); - return response.getState().metaData(); + // We set a single dummy index name to avoid fetching all the index data + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name"); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); + return clusterState.getState().metaData(); } @Override @@ -128,18 +128,12 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind String leaderIndex = index.getName(); Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); - ClusterStateResponse response = remoteClient - .admin() - .cluster() - .prepareState() - .clear() - .setMetaData(true) - .setIndices(leaderIndex) - .get(); + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); // Validates whether the leader cluster has been configured properly: PlainActionFuture future = PlainActionFuture.newFuture(); - IndexMetaData leaderIndexMetaData = response.getState().metaData().index(leaderIndex); + IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex); ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse); String[] leaderHistoryUUIDs = future.actionGet(); @@ -252,7 +246,8 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v Map ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); - ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId()); + Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID); + ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId()); Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); String sessionUUID = UUIDs.randomBase64UUID(); @@ -261,6 +256,7 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v String nodeId = response.getNodeId(); // TODO: Implement file restore closeSession(remoteClient, nodeId, sessionUUID); + maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings()); } @Override @@ -268,6 +264,20 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } + private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) { + ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); + ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet(); + IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex); + long leaderMappingVersion = leaderIndexMetadata.getMappingVersion(); + + if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) { + Index followerIndex = followerIndexSettings.getIndex(); + MappingMetaData mappingMetaData = leaderIndexMetadata.mapping(); + PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData); + localClient.admin().indices().putMapping(putMappingRequest).actionGet(); + } + } + private void closeSession(Client remoteClient, String nodeId, String sessionUUID) { ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId, new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID)); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index f711dd4303f2a..2d3ca857ff848 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -8,6 +8,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; @@ -15,12 +16,14 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.RepositoriesService; @@ -35,6 +38,7 @@ import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import java.io.IOException; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -42,6 +46,7 @@ import static java.util.Collections.singletonMap; import static org.elasticsearch.snapshots.RestoreService.restoreInProgress; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; // TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work // TODO: is completed. @@ -195,6 +200,60 @@ public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException { assertEquals(0, restoreInfo.failedShards()); } + public void testFollowerMappingIsUpdated() throws IOException { + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; + String leaderIndex = "index1"; + String followerIndex = "index2"; + + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderGreen(leaderIndex); + + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName, + CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions, + "^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false, + false, true, settingsBuilder.build(), new String[0], + "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]"); + + // TODO: Eventually when the file recovery work is complete, we should test updated mappings by + // indexing to the leader while the recovery is happening. However, into order to that test mappings + // are updated prior to that work, we index documents in the clear session callback. This will + // ensure a mapping change prior to the final mapping check on the follower side. + for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) { + restoreSourceService.addCloseSessionListener(s -> { + final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1); + leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get(); + }); + } + + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + RestoreInfo restoreInfo = future.actionGet(); + + assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards()); + assertEquals(0, restoreInfo.failedShards()); + + ClusterStateRequest clusterStateRequest = new ClusterStateRequest(); + clusterStateRequest.clear(); + clusterStateRequest.metaData(true); + clusterStateRequest.indices(followerIndex); + ClusterStateResponse clusterState = followerClient().admin().cluster().state(clusterStateRequest).actionGet(); + IndexMetaData followerIndexMetadata = clusterState.getState().metaData().index(followerIndex); + assertEquals(2, followerIndexMetadata.getMappingVersion()); + + MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings() + .get("index2").get("doc"); + assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long")); + } + private ActionListener waitForRestore(ClusterService clusterService, ActionListener listener) { return new ActionListener() { From 1b440d77659911a745b222a9a2e1f9f34af2607b Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Dec 2018 17:05:31 -0700 Subject: [PATCH 2/2] Fix --- .../org/elasticsearch/xpack/ccr/repository/CcrRepository.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index e905cd08ce117..1dcf2d51aea73 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -272,7 +272,9 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) { Index followerIndex = followerIndexSettings.getIndex(); - MappingMetaData mappingMetaData = leaderIndexMetadata.mapping(); + assert leaderIndexMetadata.getMappings().size() == 1 : "expected exactly one mapping, but got [" + + leaderIndexMetadata.getMappings().size() + "]"; + MappingMetaData mappingMetaData = leaderIndexMetadata.getMappings().iterator().next().value; PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData); localClient.admin().indices().putMapping(putMappingRequest).actionGet(); }