diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index ef5fb954170c9..ae5ea432855af 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -724,7 +724,7 @@ private void maybeTrimTranslog() { } } - private void syncGlobalCheckpoints() { + private void maybeSyncGlobalCheckpoints() { for (final IndexShard shard : this.shards.values()) { if (shard.routingEntry().active() && shard.routingEntry().primary()) { switch (shard.state()) { @@ -950,7 +950,7 @@ final class AsyncGlobalCheckpointTask extends BaseAsyncTask { @Override protected void runInternal() { - indexService.syncGlobalCheckpoints(); + indexService.maybeSyncGlobalCheckpoints(); } @Override diff --git a/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index 38c9bcb72459f..0445c8ce3439b 100644 --- a/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/core/src/test/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -92,10 +92,10 @@ public void testAckedIndexing() throws Exception { final List nodes = startCluster(rarely() ? 5 : 3); assertAcked(prepareCreate("test") - .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) - )); + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1 + randomInt(2)) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(2)) + )); ensureGreen(); ServiceDisruptionScheme disruptionScheme = addRandomDisruptionScheme(); @@ -142,8 +142,8 @@ public void testAckedIndexing() throws Exception { exceptedExceptions.add(e); final String docId = id; logger.trace( - (Supplier) - () -> new ParameterizedMessage("[{}] failed id [{}] through node [{}]", name, docId, node), e); + (Supplier) + () -> new ParameterizedMessage("[{}] failed id [{}] through node [{}]", name, docId, node), e); } finally { countDownLatchRef.get().countDown(); logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount()); @@ -190,12 +190,12 @@ public void testAckedIndexing() throws Exception { disruptionScheme.stopDisrupting(); for (String node : internalCluster().getNodeNames()) { ensureStableCluster(nodes.size(), TimeValue.timeValueMillis(disruptionScheme.expectedTimeToHeal().millis() + - DISRUPTION_HEALING_OVERHEAD.millis()), true, node); + DISRUPTION_HEALING_OVERHEAD.millis()), true, node); } // in case of a bridge partition, shard allocation can fail "index.allocation.max_retries" times if the master // is the super-connected node and recovery source and target are on opposite sides of the bridge if (disruptionScheme instanceof NetworkDisruption && - ((NetworkDisruption) disruptionScheme).getDisruptedLinks() instanceof Bridge) { + ((NetworkDisruption) disruptionScheme).getDisruptedLinks() instanceof Bridge) { assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true)); } ensureGreen("test"); @@ -207,7 +207,7 @@ public void testAckedIndexing() throws Exception { logger.debug("validating through node [{}] ([{}] acked docs)", node, ackedDocs.size()); for (String id : ackedDocs.keySet()) { assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found", - client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists()); + client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists()); } } catch (AssertionError | NoShardAvailableActionException e) { throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e); @@ -215,6 +215,8 @@ public void testAckedIndexing() throws Exception { } }, 30, TimeUnit.SECONDS); + assertSeqNos(); + logger.info("done validating (iteration [{}])", iter); } } finally { diff --git a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 15617ab876db8..1c7032fa02e87 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -20,22 +20,16 @@ package org.elasticsearch.recovery; import com.carrotsearch.hppc.IntHashSet; -import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.procedures.IntProcedure; import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.util.English; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.indices.stats.IndexShardStats; -import org.elasticsearch.action.admin.indices.stats.IndexStats; -import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; @@ -47,13 +41,10 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.seqno.SeqNoStats; -import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.plugins.Plugin; @@ -82,7 +73,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; @@ -111,42 +101,7 @@ protected Collection> nodePlugins() { @Override protected void beforeIndexDeletion() throws Exception { super.beforeIndexDeletion(); - assertBusy(() -> { - IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); - for (IndexStats indexStats : stats.getIndices().values()) { - for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { - Optional maybePrimary = Stream.of(indexShardStats.getShards()) - .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) - .findFirst(); - if (maybePrimary.isPresent() == false) { - continue; - } - ShardStats primary = maybePrimary.get(); - final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); - final ShardRouting primaryShardRouting = primary.getShardRouting(); - assertThat(primaryShardRouting + " should have set the global checkpoint", - primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO))); - final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId()); - final IndicesService indicesService = - internalCluster().getInstance(IndicesService.class, node.getName()); - final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId()); - final ObjectLongMap globalCheckpoints = indexShard.getInSyncGlobalCheckpoints(); - for (ShardStats shardStats : indexShardStats) { - final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); - assertThat(shardStats.getShardRouting() + " local checkpoint mismatch", - seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint())); - assertThat(shardStats.getShardRouting() + " global checkpoint mismatch", - seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); - assertThat(shardStats.getShardRouting() + " max seq no mismatch", - seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo())); - // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard - assertThat( - seqNoStats.getGlobalCheckpoint(), - equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId()))); - } - } - } - }); + assertSeqNos(); } public void testSimpleRelocationNoIndexing() { diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 2753e4013c181..ffa3cd6bed082 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -19,6 +19,7 @@ package org.elasticsearch.test; +import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.randomizedtesting.RandomizedContext; import com.carrotsearch.randomizedtesting.annotations.TestGroup; import com.carrotsearch.randomizedtesting.generators.RandomNumbers; @@ -49,6 +50,10 @@ import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; import org.elasticsearch.action.admin.indices.segments.ShardSegments; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -69,6 +74,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -114,6 +120,9 @@ import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.IndicesRequestCache; @@ -161,6 +170,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; @@ -191,6 +201,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.startsWith; @@ -2194,4 +2205,44 @@ public static Index resolveIndex(String index) { String uuid = getIndexResponse.getSettings().get(index).get(IndexMetaData.SETTING_INDEX_UUID); return new Index(index, uuid); } + + protected void assertSeqNos() throws Exception { + assertBusy(() -> { + IndicesStatsResponse stats = client().admin().indices().prepareStats().clear().get(); + for (IndexStats indexStats : stats.getIndices().values()) { + for (IndexShardStats indexShardStats : indexStats.getIndexShards().values()) { + Optional maybePrimary = Stream.of(indexShardStats.getShards()) + .filter(s -> s.getShardRouting().active() && s.getShardRouting().primary()) + .findFirst(); + if (maybePrimary.isPresent() == false) { + continue; + } + ShardStats primary = maybePrimary.get(); + final SeqNoStats primarySeqNoStats = primary.getSeqNoStats(); + final ShardRouting primaryShardRouting = primary.getShardRouting(); + assertThat(primaryShardRouting + " should have set the global checkpoint", + primarySeqNoStats.getGlobalCheckpoint(), not(equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO))); + final DiscoveryNode node = clusterService().state().nodes().get(primaryShardRouting.currentNodeId()); + final IndicesService indicesService = + internalCluster().getInstance(IndicesService.class, node.getName()); + final IndexShard indexShard = indicesService.getShardOrNull(primaryShardRouting.shardId()); + final ObjectLongMap globalCheckpoints = indexShard.getInSyncGlobalCheckpoints(); + for (ShardStats shardStats : indexShardStats) { + final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); + assertThat(shardStats.getShardRouting() + " local checkpoint mismatch", + seqNoStats.getLocalCheckpoint(), equalTo(primarySeqNoStats.getLocalCheckpoint())); + assertThat(shardStats.getShardRouting() + " global checkpoint mismatch", + seqNoStats.getGlobalCheckpoint(), equalTo(primarySeqNoStats.getGlobalCheckpoint())); + assertThat(shardStats.getShardRouting() + " max seq no mismatch", + seqNoStats.getMaxSeqNo(), equalTo(primarySeqNoStats.getMaxSeqNo())); + // the local knowledge on the primary of the global checkpoint equals the global checkpoint on the shard + assertThat( + seqNoStats.getGlobalCheckpoint(), + equalTo(globalCheckpoints.get(shardStats.getShardRouting().allocationId().getId()))); + } + } + } + }); + } + }