From 28eaa98dea788d146a46215855fa010c839ff31e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 19 Jun 2017 23:31:17 -0400 Subject: [PATCH 1/4] Initialize primary term for shrunk indices Today when an index is shrunk, the primary terms for its shards start from one. Yet, this is a problem as the index will already contain assigned sequence numbers across primary terms. To ensure document-level sequence number semantics, the primary terms of the target shards must start from the maximum of all the shards in the source index. This commit causes this to be the case. --- .../metadata/MetaDataCreateIndexService.java | 44 +++++++++++--- .../admin/indices/create/ShrinkIndexIT.java | 60 +++++++++++++++++-- 2 files changed, 90 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 43c13087dd01b..59bbd8c15cb80 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -91,6 +91,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Predicate; +import java.util.stream.IntStream; import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; @@ -340,19 +341,39 @@ public ClusterState execute(ClusterState currentState) throws Exception { indexSettingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName()); indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); final Index shrinkFromIndex = request.shrinkFrom(); - int routingNumShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettingsBuilder.build());; - if (shrinkFromIndex != null) { - prepareShrinkIndexSettings(currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex, - request.index()); - IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex); + final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index()); + + final int routingNumShards; + if (shrinkFromIndex == null) { + routingNumShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettingsBuilder.build()); + } else { + final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex); routingNumShards = sourceMetaData.getRoutingNumShards(); } + tmpImdBuilder.setRoutingNumShards(routingNumShards); + + if (shrinkFromIndex != null) { + prepareShrinkIndexSettings( + currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex, request.index()); + } + final Settings actualIndexSettings = indexSettingsBuilder.build(); + tmpImdBuilder.settings(actualIndexSettings); + + if (shrinkFromIndex != null) { + final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex); + final long primaryTerm = + IntStream + .range(0, sourceMetaData.getNumberOfShards()) + .mapToLong(sourceMetaData::primaryTerm) + .max() + .getAsLong(); + for (int shardId = 0; shardId < tmpImdBuilder.numberOfShards(); shardId++) { + tmpImdBuilder.primaryTerm(shardId, primaryTerm); + } + } - Settings actualIndexSettings = indexSettingsBuilder.build(); - IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index()) - .setRoutingNumShards(routingNumShards); // Set up everything, now locally create the index to see that things are ok, and apply - final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build(); + final IndexMetaData tmpImd = tmpImdBuilder.build(); ActiveShardCount waitForActiveShards = request.waitForActiveShards(); if (waitForActiveShards == ActiveShardCount.DEFAULT) { waitForActiveShards = tmpImd.getWaitForActiveShards(); @@ -408,6 +429,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()) .settings(actualIndexSettings) .setRoutingNumShards(routingNumShards); + + for (int shardId = 0; shardId < tmpImd.getNumberOfShards(); shardId++) { + indexMetaDataBuilder.primaryTerm(shardId, tmpImd.primaryTerm(shardId)); + } + for (MappingMetaData mappingMd : mappingsMetaData.values()) { indexMetaDataBuilder.putMapping(mappingMd); } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java index 36c7da7894b50..df98079213f3e 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java @@ -25,15 +25,14 @@ import org.apache.lucene.search.SortedSetSortField; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.admin.indices.segments.IndexSegments; -import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; -import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; -import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.InternalClusterInfoService; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.UnassignedInfo; @@ -41,7 +40,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; @@ -50,10 +48,13 @@ import java.util.Arrays; import java.util.Collection; +import java.util.List; +import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; public class ShrinkIndexIT extends ESIntegTestCase { @@ -135,6 +136,55 @@ public void testCreateShrinkIndexToN() { assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); } + public void testShrinkIndexPrimaryTerm() throws Exception { + final List factors = Arrays.asList(2, 2, 2, 2, 2, 2, 2, 2, 2, 2); + final List numberOfShardsFactors = randomSubsetOf(randomIntBetween(1, factors.size()), factors); + final int numberOfShards = numberOfShardsFactors.stream().reduce(1, (x, y) -> x * y); + final int numberOfTargetShards = randomSubsetOf(numberOfShardsFactors).stream().reduce(1, (x, y) -> x * y); + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", numberOfShards)).get(); + + ImmutableOpenMap dataNodes = client().admin().cluster().prepareState().get().getState().nodes() + .getDataNodes(); + assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); + DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); + String mergeNode = discoveryNodes[0].getName(); + ensureGreen(); + + // restart a random data node several times to force the primary term for some shards to increase + for (int i = 0; i < randomIntBetween(0, 16); i++) { + internalCluster().restartRandomDataNode(); + ensureGreen(); + } + // relocate all shards to one node such that we can merge it. + client().admin().indices().prepareUpdateSettings("source") + .setSettings(Settings.builder() + .put("index.routing.allocation.require._name", mergeNode) + .put("index.blocks.write", true)).get(); + ensureGreen(); + + final IndexMetaData indexMetaData = indexMetaData(client(), "source"); + final long beforeShrinkPrimaryTerm = IntStream.range(0, numberOfShards).mapToLong(indexMetaData::primaryTerm).max().getAsLong(); + + // now merge source into target + assertAcked(client().admin().indices().prepareShrinkIndex("source", "target") + .setSettings(Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", numberOfTargetShards).build()).get()); + + ensureGreen(); + + final IndexMetaData afterShrinkIndexMetaData = indexMetaData(client(), "target"); + for (int shardId = 0; shardId < numberOfTargetShards; shardId++) { + assertThat(afterShrinkIndexMetaData.primaryTerm(shardId), equalTo(beforeShrinkPrimaryTerm + 1)); + } + } + + private static IndexMetaData indexMetaData(final Client client, final String index) { + final ClusterStateResponse clusterStateResponse = client.admin().cluster().state(new ClusterStateRequest()).actionGet(); + return clusterStateResponse.getState().metaData().index(index); + } + public void testCreateShrinkIndex() { internalCluster().ensureAtLeastNumDataNodes(2); Version version = VersionUtils.randomVersion(random()); From 993943d6cd5e8245a7c33344b2265370250fead7 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 19 Jun 2017 23:47:06 -0400 Subject: [PATCH 2/4] Formatting --- .../admin/indices/create/ShrinkIndexIT.java | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java index df98079213f3e..b8eff61de6f8b 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java @@ -55,6 +55,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class ShrinkIndexIT extends ESIntegTestCase { @@ -137,40 +138,38 @@ public void testCreateShrinkIndexToN() { } public void testShrinkIndexPrimaryTerm() throws Exception { - final List factors = Arrays.asList(2, 2, 2, 2, 2, 2, 2, 2, 2, 2); + final List factors = Arrays.asList(2, 3, 5, 7); final List numberOfShardsFactors = randomSubsetOf(randomIntBetween(1, factors.size()), factors); final int numberOfShards = numberOfShardsFactors.stream().reduce(1, (x, y) -> x * y); final int numberOfTargetShards = randomSubsetOf(numberOfShardsFactors).stream().reduce(1, (x, y) -> x * y); internalCluster().ensureAtLeastNumDataNodes(2); prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", numberOfShards)).get(); - ImmutableOpenMap dataNodes = client().admin().cluster().prepareState().get().getState().nodes() - .getDataNodes(); - assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); - DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); - String mergeNode = discoveryNodes[0].getName(); + final ImmutableOpenMap dataNodes = + client().admin().cluster().prepareState().get().getState().nodes().getDataNodes(); + assertThat(dataNodes.size(), greaterThanOrEqualTo(2)); + final DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); + final String mergeNode = discoveryNodes[0].getName(); ensureGreen(); - // restart a random data node several times to force the primary term for some shards to increase + // restart random data nodes to force the primary term for some shards to increase for (int i = 0; i < randomIntBetween(0, 16); i++) { internalCluster().restartRandomDataNode(); ensureGreen(); } // relocate all shards to one node such that we can merge it. - client().admin().indices().prepareUpdateSettings("source") - .setSettings(Settings.builder() - .put("index.routing.allocation.require._name", mergeNode) - .put("index.blocks.write", true)).get(); + final Settings.Builder prepareShrinkSettings = + Settings.builder().put("index.routing.allocation.require._name", mergeNode).put("index.blocks.write", true); + client().admin().indices().prepareUpdateSettings("source").setSettings(prepareShrinkSettings).get(); ensureGreen(); final IndexMetaData indexMetaData = indexMetaData(client(), "source"); final long beforeShrinkPrimaryTerm = IntStream.range(0, numberOfShards).mapToLong(indexMetaData::primaryTerm).max().getAsLong(); // now merge source into target - assertAcked(client().admin().indices().prepareShrinkIndex("source", "target") - .setSettings(Settings.builder() - .put("index.number_of_replicas", 0) - .put("index.number_of_shards", numberOfTargetShards).build()).get()); + final Settings shrinkSettings = + Settings.builder().put("index.number_of_replicas", 0).put("index.number_of_shards", numberOfTargetShards).build(); + assertAcked(client().admin().indices().prepareShrinkIndex("source", "target").setSettings(shrinkSettings).get()); ensureGreen(); From f4b69c2f854b864320b553de27f5df5fe1a8755e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 20 Jun 2017 10:52:40 -0400 Subject: [PATCH 3/4] Fail instead of restart --- .../admin/indices/create/ShrinkIndexIT.java | 52 +++++++++++++++++-- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java index b8eff61de6f8b..ec20817aa2ab9 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.create; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedSetSelector; @@ -28,19 +29,28 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; @@ -48,11 +58,15 @@ import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -139,7 +153,7 @@ public void testCreateShrinkIndexToN() { public void testShrinkIndexPrimaryTerm() throws Exception { final List factors = Arrays.asList(2, 3, 5, 7); - final List numberOfShardsFactors = randomSubsetOf(randomIntBetween(1, factors.size()), factors); + final List numberOfShardsFactors = randomSubsetOf(scaledRandomIntBetween(1, factors.size()), factors); final int numberOfShards = numberOfShardsFactors.stream().reduce(1, (x, y) -> x * y); final int numberOfTargetShards = randomSubsetOf(numberOfShardsFactors).stream().reduce(1, (x, y) -> x * y); internalCluster().ensureAtLeastNumDataNodes(2); @@ -152,11 +166,39 @@ public void testShrinkIndexPrimaryTerm() throws Exception { final String mergeNode = discoveryNodes[0].getName(); ensureGreen(); - // restart random data nodes to force the primary term for some shards to increase - for (int i = 0; i < randomIntBetween(0, 16); i++) { - internalCluster().restartRandomDataNode(); - ensureGreen(); + // fail random primary shards to force primary terms to increase + final Index source = resolveIndex("source"); + final int iterations = scaledRandomIntBetween(0, 16); + for (int i = 0; i < iterations; i++) { + final String node = randomSubsetOf(1, internalCluster().nodesInclude("source")).get(0); + final IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node); + final IndexService indexShards = indexServices.indexServiceSafe(source); + for (final Integer shardId : indexShards.shardIds()) { + final IndexShard shard = indexShards.getShard(shardId); + if (shard.routingEntry().primary() && randomBoolean()) { + disableAllocation("source"); + shard.failShard("test", new Exception("test")); + // this can not succeed until the shard is failed and a replica is promoted + int id = 0; + while (true) { + // find an ID that routes to the right shard, we will only index to the shard that saw a primary failure + final String s = Integer.toString(id); + final int hash = Math.floorMod(Murmur3HashFunction.hash(s), numberOfShards); + if (hash == shardId) { + final IndexRequest request = + new IndexRequest("source", "type", s).source("{ \"f\": \"" + s + "\"}", XContentType.JSON); + client().index(request).get(); + break; + } else { + id++; + } + } + enableAllocation("source"); + ensureGreen(); + } + } } + // relocate all shards to one node such that we can merge it. final Settings.Builder prepareShrinkSettings = Settings.builder().put("index.routing.allocation.require._name", mergeNode).put("index.blocks.write", true); From 73b34429519230a97d098b7bef2e38dd5302f50d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 20 Jun 2017 11:32:07 -0400 Subject: [PATCH 4/4] Comment --- .../cluster/metadata/MetaDataCreateIndexService.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 59bbd8c15cb80..0a2830e55fce8 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -360,6 +360,11 @@ public ClusterState execute(ClusterState currentState) throws Exception { tmpImdBuilder.settings(actualIndexSettings); if (shrinkFromIndex != null) { + /* + * We need to arrange that the primary term on all the shards in the shrunken index is at least as large as + * the maximum primary term on all the shards in the source index. This ensures that we have correct + * document-level semantics regarding sequence numbers in the shrunken index. + */ final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex); final long primaryTerm = IntStream