Skip to content

Commit 26e11d3

Browse files
committed
Sync retention leases on expiration (#37902)
This commit introduces a sync of retention leases when a retention lease expires. Because expiration of retention leases is lazy, it is handled only when getting the current retention leases from the replication tracker. At that point, we call back to the full retention lease sync to sync and flush the remaining retention leases on all shard copies. With this change, replicas no longer manage expiration of retention leases locally; instead, expiration is done only on the primary.
1 parent 5f491ff commit 26e11d3

File tree

4 files changed

+284
-50
lines changed

4 files changed

+284
-50
lines changed

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
147147
private final LongSupplier currentTimeMillisSupplier;
148148

149149
/**
150-
* A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
151-
* retention leases to replicas.
150+
* A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the
151+
* retention lease sync action, to sync retention leases to replicas.
152152
*/
153-
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease;
153+
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
154154

155155
/**
156156
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
@@ -171,21 +171,45 @@ private Collection<RetentionLease> copyRetentionLeases() {
171171
}
172172

173173
/**
174-
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned.
174+
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only
175+
* the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas.
175176
*
176177
* @return the retention leases
177178
*/
178-
public synchronized Collection<RetentionLease> getRetentionLeases() {
179-
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
180-
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
181-
final Collection<RetentionLease> nonExpiredRetentionLeases = retentionLeases
182-
.values()
183-
.stream()
184-
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() <= retentionLeaseMillis)
185-
.collect(Collectors.toList());
186-
retentionLeases.clear();
187-
retentionLeases.putAll(nonExpiredRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, lease -> lease)));
188-
return Collections.unmodifiableCollection(nonExpiredRetentionLeases);
179+
public Collection<RetentionLease> getRetentionLeases() {
180+
final boolean wasPrimaryMode;
181+
final Collection<RetentionLease> nonExpiredRetentionLeases;
182+
synchronized (this) {
183+
if (primaryMode) {
184+
// the primary calculates the non-expired retention leases and syncs them to replicas
185+
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
186+
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
187+
final Collection<RetentionLease> expiredRetentionLeases = retentionLeases
188+
.values()
189+
.stream()
190+
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() > retentionLeaseMillis)
191+
.collect(Collectors.toList());
192+
if (expiredRetentionLeases.isEmpty()) {
193+
// early out as no retention leases have expired
194+
return copyRetentionLeases();
195+
}
196+
// clean up the expired retention leases
197+
for (final RetentionLease expiredRetentionLease : expiredRetentionLeases) {
198+
retentionLeases.remove(expiredRetentionLease.id());
199+
}
200+
}
201+
/*
202+
* At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or
203+
* we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the
204+
* non-expired retention leases, instead receiving them on syncs from the primary.
205+
*/
206+
wasPrimaryMode = primaryMode;
207+
nonExpiredRetentionLeases = copyRetentionLeases();
208+
}
209+
if (wasPrimaryMode) {
210+
onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {}));
211+
}
212+
return nonExpiredRetentionLeases;
189213
}
190214

191215
/**
@@ -215,7 +239,7 @@ public RetentionLease addRetentionLease(
215239
retentionLeases.put(id, retentionLease);
216240
currentRetentionLeases = copyRetentionLeases();
217241
}
218-
onNewRetentionLease.accept(currentRetentionLeases, listener);
242+
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
219243
return retentionLease;
220244
}
221245

@@ -500,11 +524,11 @@ private static long inSyncCheckpointStates(
500524
* Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or
501525
* {@link SequenceNumbers#UNASSIGNED_SEQ_NO}.
502526
*
503-
* @param shardId the shard ID
504-
* @param allocationId the allocation ID
505-
* @param indexSettings the index settings
506-
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
507-
* @param onNewRetentionLease a callback when a new retention lease is created
527+
* @param shardId the shard ID
528+
* @param allocationId the allocation ID
529+
* @param indexSettings the index settings
530+
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
531+
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
508532
*/
509533
public ReplicationTracker(
510534
final ShardId shardId,
@@ -513,7 +537,7 @@ public ReplicationTracker(
513537
final long globalCheckpoint,
514538
final LongConsumer onGlobalCheckpointUpdated,
515539
final LongSupplier currentTimeMillisSupplier,
516-
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease) {
540+
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
517541
super(shardId, indexSettings);
518542
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
519543
this.shardAllocationId = allocationId;
@@ -524,7 +548,7 @@ public ReplicationTracker(
524548
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
525549
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
526550
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
527-
this.onNewRetentionLease = Objects.requireNonNull(onNewRetentionLease);
551+
this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases);
528552
this.pendingInSync = new HashSet<>();
529553
this.routingTable = null;
530554
this.replicationGroup = null;

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

Lines changed: 122 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.action.ActionListener;
2323
import org.elasticsearch.cluster.routing.AllocationId;
24+
import org.elasticsearch.common.collect.Tuple;
2425
import org.elasticsearch.common.settings.Settings;
2526
import org.elasticsearch.common.unit.TimeValue;
2627
import org.elasticsearch.index.IndexSettings;
@@ -30,6 +31,7 @@
3031
import java.util.Collection;
3132
import java.util.Collections;
3233
import java.util.HashMap;
34+
import java.util.List;
3335
import java.util.Map;
3436
import java.util.concurrent.atomic.AtomicBoolean;
3537
import java.util.concurrent.atomic.AtomicLong;
@@ -67,17 +69,17 @@ public void testAddOrRenewRetentionLease() {
6769
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
6870
replicationTracker.addRetentionLease(
6971
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
70-
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L);
72+
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true);
7173
}
7274

7375
for (int i = 0; i < length; i++) {
7476
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
7577
replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
76-
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L);
78+
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, true);
7779
}
7880
}
7981

80-
public void testOnNewRetentionLease() {
82+
public void testAddRetentionLeaseCausesRetentionLeaseSync() {
8183
final AllocationId allocationId = AllocationId.newInitializing();
8284
final Map<String, Long> retentionLeases = new HashMap<>();
8385
final AtomicBoolean invoked = new AtomicBoolean();
@@ -113,14 +115,23 @@ public void testOnNewRetentionLease() {
113115
replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
114116
// assert that the new retention lease callback was invoked
115117
assertTrue(invoked.get());
118+
116119
// reset the invocation marker so that we can assert the callback was not invoked when renewing the lease
117120
invoked.set(false);
118121
replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test");
119122
assertFalse(invoked.get());
120123
}
121124
}
122125

123-
public void testExpiration() {
126+
public void testExpirationOnPrimary() {
127+
runExpirationTest(true);
128+
}
129+
130+
public void testExpirationOnReplica() {
131+
runExpirationTest(false);
132+
}
133+
134+
private void runExpirationTest(final boolean primaryMode) {
124135
final AllocationId allocationId = AllocationId.newInitializing();
125136
final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
126137
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
@@ -141,42 +152,136 @@ public void testExpiration() {
141152
Collections.singleton(allocationId.getId()),
142153
routingTable(Collections.emptySet(), allocationId),
143154
Collections.emptySet());
144-
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
155+
if (primaryMode) {
156+
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
157+
}
145158
final long[] retainingSequenceNumbers = new long[1];
146159
retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
147-
replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
160+
if (primaryMode) {
161+
replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
162+
} else {
163+
replicationTracker.updateRetentionLeasesOnReplica(
164+
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
165+
}
148166

149167
{
150168
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
151169
assertThat(retentionLeases, hasSize(1));
152170
final RetentionLease retentionLease = retentionLeases.iterator().next();
153171
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
154-
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
172+
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode);
155173
}
156174

157175
// renew the lease
158176
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024));
159177
retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE);
160-
replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
178+
if (primaryMode) {
179+
replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
180+
} else {
181+
replicationTracker.updateRetentionLeasesOnReplica(
182+
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
183+
}
161184

162185
{
163186
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
164187
assertThat(retentionLeases, hasSize(1));
165188
final RetentionLease retentionLease = retentionLeases.iterator().next();
166189
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
167-
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
190+
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode);
168191
}
169192

170193
// now force the lease to expire
171194
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
172-
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get);
195+
if (primaryMode) {
196+
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, true);
197+
} else {
198+
// leases do not expire on replicas until synced from the primary
199+
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, false);
200+
}
201+
}
202+
203+
public void testRetentionLeaseExpirationCausesRetentionLeaseSync() {
204+
final AllocationId allocationId = AllocationId.newInitializing();
205+
final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
206+
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
207+
final Settings settings = Settings
208+
.builder()
209+
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
210+
.build();
211+
final Map<String, Tuple<Long, Long>> retentionLeases = new HashMap<>();
212+
final AtomicBoolean invoked = new AtomicBoolean();
213+
final AtomicReference<ReplicationTracker> reference = new AtomicReference<>();
214+
final ReplicationTracker replicationTracker = new ReplicationTracker(
215+
new ShardId("test", "_na", 0),
216+
allocationId.getId(),
217+
IndexSettingsModule.newIndexSettings("test", settings),
218+
UNASSIGNED_SEQ_NO,
219+
value -> {},
220+
currentTimeMillis::get,
221+
(leases, listener) -> {
222+
// we do not want to hold a lock on the replication tracker in the callback!
223+
assertFalse(Thread.holdsLock(reference.get()));
224+
invoked.set(true);
225+
assertThat(
226+
leases.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
227+
equalTo(retentionLeases));
228+
});
229+
reference.set(replicationTracker);
230+
replicationTracker.updateFromMaster(
231+
randomNonNegativeLong(),
232+
Collections.singleton(allocationId.getId()),
233+
routingTable(Collections.emptySet(), allocationId),
234+
Collections.emptySet());
235+
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
236+
237+
final int length = randomIntBetween(0, 8);
238+
for (int i = 0; i < length; i++) {
239+
final String id = randomAlphaOfLength(8);
240+
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
241+
retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
242+
replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
243+
// assert that the new retention lease callback was invoked
244+
assertTrue(invoked.get());
245+
246+
// reset the invocation marker so that we can assert the callback was not invoked when renewing the lease
247+
invoked.set(false);
248+
currentTimeMillis.set(1 + currentTimeMillis.get());
249+
retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
250+
replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test");
251+
252+
// renewing a lease must not invoke the callback; the marker stays unset so that we can assert below that it is invoked only if leases expire
253+
assertFalse(invoked.get());
254+
// randomly expire some leases
255+
final long currentTimeMillisIncrement = randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get());
256+
// calculate the expired leases and update our tracking map
257+
final List<String> expiredIds = retentionLeases.entrySet()
258+
.stream()
259+
.filter(r -> currentTimeMillis.get() + currentTimeMillisIncrement > r.getValue().v2() + retentionLeaseMillis)
260+
.map(Map.Entry::getKey)
261+
.collect(Collectors.toList());
262+
expiredIds.forEach(retentionLeases::remove);
263+
currentTimeMillis.set(currentTimeMillis.get() + currentTimeMillisIncrement);
264+
// getting the leases has the side effect of calculating which leases are expired and invoking the sync callback
265+
final Collection<RetentionLease> current = replicationTracker.getRetentionLeases();
266+
// the current leases should equal our tracking map
267+
assertThat(
268+
current.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
269+
equalTo(retentionLeases));
270+
// the callback should only be invoked if there were expired leases
271+
assertThat(invoked.get(), equalTo(expiredIds.isEmpty() == false));
272+
}
273+
}
274+
275+
private static Tuple<Long, Long> toTuple(final RetentionLease retentionLease) {
276+
return Tuple.tuple(retentionLease.retainingSequenceNumber(), retentionLease.timestamp());
173277
}
174278

175279
private void assertRetentionLeases(
176280
final ReplicationTracker replicationTracker,
177281
final int size,
178282
final long[] minimumRetainingSequenceNumbers,
179-
final LongSupplier currentTimeMillisSupplier) {
283+
final LongSupplier currentTimeMillisSupplier,
284+
final boolean primaryMode) {
180285
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
181286
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
182287
for (final RetentionLease retentionLease : retentionLeases) {
@@ -188,9 +293,12 @@ private void assertRetentionLeases(
188293
assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
189294
final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
190295
assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
191-
assertThat(
192-
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
193-
lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis()));
296+
if (primaryMode) {
297+
// replicas keep leases past their expiration time until the primary syncs, so we can only assert the age bound on primaries here
298+
assertThat(
299+
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
300+
lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis()));
301+
}
194302
assertThat(retentionLease.source(), equalTo("test-" + i));
195303
}
196304
}

0 commit comments

Comments
 (0)