diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncIT.java deleted file mode 100644 index 7b89ef4703128..0000000000000 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncIT.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.seqno; - -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexService; -import org.elasticsearch.index.shard.IndexShard; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.test.ESIntegTestCase; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.hamcrest.Matchers.equalTo; - -public class RetentionLeaseBackgroundSyncIT extends ESIntegTestCase { - - public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin { - - @Override - public List> getSettings() { - return Collections.singletonList(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING); - } - - } - - @Override - protected Collection> nodePlugins() { - return Stream.concat( - super.nodePlugins().stream(), - Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class)) - .collect(Collectors.toList()); - } - - public void testBackgroundRetentionLeaseSync() throws Exception { - final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); - internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); - final Settings settings = Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", numberOfReplicas) - .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s") - .build(); - createIndex("index", settings); - ensureGreen("index"); - final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); - final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); - final IndexShard primary = internalCluster() - .getInstance(IndicesService.class, primaryShardNodeName) - .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - // we will add multiple retention leases and expect to see them synced to all replicas - final int length = randomIntBetween(1, 8); - final Map currentRetentionLeases = new HashMap<>(length); - final List ids = new ArrayList<>(length); - for (int i = 0; i < length; i++) { - final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8)); - ids.add(id); - final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); - final String source = randomAlphaOfLength(8); - final CountDownLatch latch = new CountDownLatch(1); - // put a new lease - currentRetentionLeases.put( - id, - primary.addRetentionLease(id, retainingSequenceNumber, source, ActionListener.wrap(latch::countDown))); - latch.await(); - // now renew all existing leases; we expect to see these synced to the replicas - for (int j = 0; j <= i; j++) { - currentRetentionLeases.put( - ids.get(j), - primary.renewRetentionLease( - ids.get(j), - randomLongBetween(currentRetentionLeases.get(ids.get(j)).retainingSequenceNumber(), Long.MAX_VALUE), - source)); - } - assertBusy(() -> { - // check all retention leases have been synced to all replicas - for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { - final String replicaShardNodeId = replicaShard.currentNodeId(); - final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName(); - final IndexShard replica = internalCluster() - .getInstance(IndicesService.class, replicaShardNodeName) - .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - assertThat(replica.getRetentionLeases(), equalTo(primary.getRetentionLeases())); - } - }); - } - } - -} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java similarity index 79% rename from server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java rename to server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index 0e2ac6273ea22..daf17b91961d5 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -37,6 +37,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -47,13 +48,14 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) -public class RetentionLeaseSyncIT extends ESIntegTestCase { +public class RetentionLeaseIT extends ESIntegTestCase { public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin { @@ -68,7 +70,7 @@ public List> getSettings() { protected Collection> nodePlugins() { return Stream.concat( super.nodePlugins().stream(), - Stream.of(RetentionLeaseBackgroundSyncIT.RetentionLeaseSyncIntervalSettingPlugin.class)) + Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class)) .collect(Collectors.toList()); } @@ -207,9 +209,63 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38487") + public void testBackgroundRetentionLeaseSync() throws Exception { + final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); + internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", numberOfReplicas) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s") + .build(); + createIndex("index", settings); + ensureGreen("index"); + final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); + final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = internalCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + // we will add multiple retention leases and expect to see them synced to all replicas + final int length = randomIntBetween(1, 8); + final Map currentRetentionLeases = new HashMap<>(length); + final List ids = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8)); + ids.add(id); + final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + // put a new lease + currentRetentionLeases.put( + id, + primary.addRetentionLease(id, retainingSequenceNumber, source, ActionListener.wrap(latch::countDown))); + latch.await(); + // now renew all existing leases; we expect to see these synced to the replicas + for (int j = 0; j <= i; j++) { + currentRetentionLeases.put( + ids.get(j), + primary.renewRetentionLease( + ids.get(j), + randomLongBetween(currentRetentionLeases.get(ids.get(j)).retainingSequenceNumber(), Long.MAX_VALUE), + source)); + } + assertBusy(() -> { + // check all retention leases have been synced to all replicas + for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { + final String replicaShardNodeId = replicaShard.currentNodeId(); + final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName(); + final IndexShard replica = internalCluster() + .getInstance(IndicesService.class, replicaShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + assertThat(replica.getRetentionLeases(), equalTo(primary.getRetentionLeases())); + } + }); + } + } + public void testRetentionLeasesSyncOnRecovery() throws Exception { - final int numberOfReplicas = 1; + final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); + internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); /* * We effectively disable the background sync to ensure that the retention leases are not synced in the background so that the only * source of retention leases on the replicas would be from the commit point and recovery. @@ -220,10 +276,9 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception { .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueHours(24)) .build(); - createIndex("index", settings); + // when we increase the number of replicas below we want to exclude the replicas from being allocated so that they do not recover + assertAcked(prepareCreate("index", 1).setSettings(settings)); ensureYellow("index"); - // exclude the replicas from being allocated - allowNodes("index", 1); final AcknowledgedResponse response = client().admin() .indices() .prepareUpdateSettings("index").setSettings(Settings.builder().put("index.number_of_replicas", numberOfReplicas).build()) @@ -264,11 +319,6 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception { .getShardOrNull(new ShardId(resolveIndex("index"), 0)); final Map retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); - - // check retention leases have been committed on the replica - final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases( - replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES)); - assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases))); } }