Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collapse retention lease integration tests (#38483) #38510

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -47,13 +48,14 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class RetentionLeaseSyncIT extends ESIntegTestCase {
public class RetentionLeaseIT extends ESIntegTestCase {

public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin {

Expand All @@ -68,7 +70,7 @@ public List<Setting<?>> getSettings() {
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(
super.nodePlugins().stream(),
Stream.of(RetentionLeaseBackgroundSyncIT.RetentionLeaseSyncIntervalSettingPlugin.class))
Stream.of(RetentionLeaseSyncIntervalSettingPlugin.class))
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -207,9 +209,63 @@ public void testRetentionLeasesSyncOnExpiration() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38487")
public void testBackgroundRetentionLeaseSync() throws Exception {
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "1s")
.build();
createIndex("index", settings);
ensureGreen("index");
final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary = internalCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
// we will add multiple retention leases and expect to see them synced to all replicas
final int length = randomIntBetween(1, 8);
final Map<String, RetentionLease> currentRetentionLeases = new HashMap<>(length);
final List<String> ids = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8));
ids.add(id);
final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
// put a new lease
currentRetentionLeases.put(
id,
primary.addRetentionLease(id, retainingSequenceNumber, source, ActionListener.wrap(latch::countDown)));
latch.await();
// now renew all existing leases; we expect to see these synced to the replicas
for (int j = 0; j <= i; j++) {
currentRetentionLeases.put(
ids.get(j),
primary.renewRetentionLease(
ids.get(j),
randomLongBetween(currentRetentionLeases.get(ids.get(j)).retainingSequenceNumber(), Long.MAX_VALUE),
source));
}
assertBusy(() -> {
// check all retention leases have been synced to all replicas
for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
final String replicaShardNodeId = replicaShard.currentNodeId();
final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName();
final IndexShard replica = internalCluster()
.getInstance(IndicesService.class, replicaShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
assertThat(replica.getRetentionLeases(), equalTo(primary.getRetentionLeases()));
}
});
}
}

public void testRetentionLeasesSyncOnRecovery() throws Exception {
final int numberOfReplicas = 1;
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
/*
* We effectively disable the background sync to ensure that the retention leases are not synced in the background so that the only
* source of retention leases on the replicas would be from the commit point and recovery.
Expand All @@ -220,10 +276,9 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueHours(24))
.build();
createIndex("index", settings);
// when we increase the number of replicas below we want to exclude the replicas from being allocated so that they do not recover
assertAcked(prepareCreate("index", 1).setSettings(settings));
ensureYellow("index");
// exclude the replicas from being allocated
allowNodes("index", 1);
final AcknowledgedResponse response = client().admin()
.indices()
.prepareUpdateSettings("index").setSettings(Settings.builder().put("index.number_of_replicas", numberOfReplicas).build())
Expand Down Expand Up @@ -264,11 +319,6 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception {
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));

// check retention leases have been committed on the replica
final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
replica.acquireLastIndexCommit(false).getIndexCommit().getUserData().get(Engine.RETENTION_LEASES));
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases)));
}
}

Expand Down