Commit bd51436

Copy retention leases when trim unsafe commits (#37995)
When a primary shard is recovered from its store, we trim the last commit when it is unsafe. If that primary crashes before the recovery completes, we lose the committed retention leases because they are baked into that last commit. With this change, we copy the retention leases from the last commit to the safe commit when trimming unsafe commits. Relates #37165
1 parent bc9ed93 commit bd51436
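
In short, the fix carries the retention-leases entry of the last (unsafe) commit's user data over to the re-commit that is made on top of the safe commit. Below is a minimal standalone sketch of that copy step using plain Lucene APIs; the key name "retention_leases", the class name, and the helper signature are illustrative assumptions rather than the exact Elasticsearch code (the real change lives in Store#trimUnsafeCommits, shown in the first diff further down).

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

final class RetentionLeaseCopySketch {

    // Illustrative stand-in for the Engine.RETENTION_LEASES commit-data key used in the diff below.
    private static final String RETENTION_LEASES_KEY = "retention_leases";

    // Re-commits on top of the safe commit while carrying the retention-leases user-data
    // entry over from the (discarded) last commit, which is the core idea of this change.
    static void recommitWithRetentionLeases(Directory directory, IndexCommit safeCommit, IndexCommit lastCommit)
            throws IOException {
        // Start from the safe commit's user data, then overlay the leases committed last.
        final Map<String, String> userData = new HashMap<>(safeCommit.getUserData());
        userData.put(RETENTION_LEASES_KEY, lastCommit.getUserData().getOrDefault(RETENTION_LEASES_KEY, ""));

        final IndexWriterConfig config = new IndexWriterConfig(null)
            .setOpenMode(IndexWriterConfig.OpenMode.APPEND)
            .setIndexCommit(safeCommit)       // append on top of the safe commit, not the unsafe last one
            .setCommitOnClose(false);
        try (IndexWriter writer = new IndexWriter(directory, config)) {
            // The new commit reuses the safe commit's segments but would otherwise default to the
            // last commit's user data, so set the merged user data explicitly before committing.
            writer.setLiveCommitData(userData.entrySet());
            writer.commit();
        }
    }
}

This is roughly what the newAppendingIndexWriter helper referenced in the diff sets up; the real implementation additionally validates the translog UUID of the chosen commit before re-committing, as visible in the hunk context.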

2 files changed: 91 additions & 3 deletions
server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 11 additions & 3 deletions

@@ -1521,7 +1521,8 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
             if (existingCommits.isEmpty()) {
                 throw new IllegalArgumentException("No index found to trim");
             }
-            final String translogUUID = existingCommits.get(existingCommits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);
+            final IndexCommit lastIndexCommitCommit = existingCommits.get(existingCommits.size() - 1);
+            final String translogUUID = lastIndexCommitCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY);
             final IndexCommit startingIndexCommit;
             // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog
             // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
@@ -1546,7 +1547,14 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
                     + startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY) + "] is not equal to last commit's translog uuid ["
                     + translogUUID + "]");
             }
-            if (startingIndexCommit.equals(existingCommits.get(existingCommits.size() - 1)) == false) {
+            if (startingIndexCommit.equals(lastIndexCommitCommit) == false) {
+                /*
+                 * Unlike other commit tags, the retention-leases tag is not restored when an engine is
+                 * recovered from translog. We need to manually copy it from the last commit to the safe commit;
+                 * otherwise we might lose the latest committed retention leases when re-opening an engine.
+                 */
+                final Map<String, String> userData = new HashMap<>(startingIndexCommit.getUserData());
+                userData.put(Engine.RETENTION_LEASES, lastIndexCommitCommit.getUserData().getOrDefault(Engine.RETENTION_LEASES, ""));
                 try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) {
                     // this achieves two things:
                     // - by committing a new commit based on the starting commit, it make sure the starting commit will be opened
@@ -1557,7 +1565,7 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long

                     // The new commit will use segment files from the starting commit but userData from the last commit by default.
                     // Thus, we need to manually set the userData from the starting commit to the new commit.
-                    writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet());
+                    writer.setLiveCommitData(userData.entrySet());
                     writer.commit();
                 }
             }
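
The block comment in the diff above is the heart of the change: translog replay restores most engine state, but it does not rebuild the retention-leases commit tag, so the value has to be carried over explicitly when the unsafe commit is discarded. As a rough way to see the effect, the hedged sketch below reads the entry back from the newest commit in a directory after such a trim; "retention_leases" is again an assumed stand-in for Engine.RETENTION_LEASES, and committedRetentionLeases is a hypothetical helper, not part of Elasticsearch.

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.Directory;

import java.io.IOException;
import java.util.List;

final class RetentionLeaseReadBackSketch {

    // Same assumed key as in the earlier sketch; the real constant is Engine.RETENTION_LEASES.
    private static final String RETENTION_LEASES_KEY = "retention_leases";

    // Returns the encoded retention leases stored in the newest commit of the directory,
    // or an empty string if none were committed.
    static String committedRetentionLeases(Directory directory) throws IOException {
        final List<IndexCommit> commits = DirectoryReader.listCommits(directory); // sorted oldest to newest
        final IndexCommit newest = commits.get(commits.size() - 1);
        return newest.getUserData().getOrDefault(RETENTION_LEASES_KEY, "");
    }
}

After the copy, the value read back here should match what the discarded unsafe commit carried, which is what the new test below exercises at the shard level.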

server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java

Lines changed: 80 additions & 0 deletions

@@ -20,32 +20,42 @@
 package org.elasticsearch.index.shard;

 import org.apache.lucene.index.SegmentInfos;
+import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.RetentionLeaseStats;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.threadpool.ThreadPool;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;

+import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
@@ -294,6 +304,76 @@ public void testRetentionLeaseStats() throws IOException {
         }
     }

+    public void testRecoverFromStoreReserveRetentionLeases() throws Exception {
+        final AtomicBoolean throwDuringRecoverFromTranslog = new AtomicBoolean();
+        final IndexShard shard = newStartedShard(false, Settings.builder().put("index.soft_deletes.enabled", true).build(),
+            config -> new InternalEngine(config) {
+                @Override
+                public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
+                                                          long recoverUpToSeqNo) throws IOException {
+                    if (throwDuringRecoverFromTranslog.get()) {
+                        throw new RuntimeException("crashed before recover from translog is completed");
+                    }
+                    return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
+                }
+            });
+        final List<RetentionLease> leases = new ArrayList<>();
+        long version = randomLongBetween(0, 100);
+        long primaryTerm = randomLongBetween(1, 100);
+        final int iterations = randomIntBetween(1, 10);
+        for (int i = 0; i < iterations; i++) {
+            if (randomBoolean()) {
+                indexDoc(shard, "_doc", Integer.toString(i));
+            } else {
+                leases.add(new RetentionLease(Integer.toString(i), randomNonNegativeLong(),
+                    randomLongBetween(Integer.MAX_VALUE, Long.MAX_VALUE), "test"));
+            }
+            if (randomBoolean()) {
+                if (randomBoolean()) {
+                    version += randomLongBetween(1, 100);
+                    primaryTerm += randomLongBetween(0, 100);
+                    shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
+                    shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
+                }
+            }
+            if (randomBoolean()) {
+                shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()), "test");
+                flushShard(shard);
+            }
+        }
+        version += randomLongBetween(1, 100);
+        primaryTerm += randomLongBetween(0, 100);
+        shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
+        shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
+        closeShard(shard, false);
+
+        final IndexShard failedShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
+            shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
+            RecoverySource.ExistingStoreRecoverySource.INSTANCE));
+        final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
+            Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
+        failedShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
+        throwDuringRecoverFromTranslog.set(true);
+        expectThrows(IndexShardRecoveryException.class, failedShard::recoverFromStore);
+        closeShards(failedShard);
+
+        final IndexShard newShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
+            shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
+            RecoverySource.ExistingStoreRecoverySource.INSTANCE));
+        newShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
+        throwDuringRecoverFromTranslog.set(false);
+        assertTrue(newShard.recoverFromStore());
+        final RetentionLeases retentionLeases = newShard.getRetentionLeases();
+        assertThat(retentionLeases.version(), equalTo(version));
+        assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
+        if (leases.isEmpty()) {
+            assertThat(retentionLeases.leases(), empty());
+        } else {
+            assertThat(retentionLeases.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0])));
+        }
+        closeShards(newShard);
+    }
+
     private void assertRetentionLeases(
         final IndexShard indexShard,
         final int size,
