Skip to content

Commit

Permalink
Introduce retention leases version (#38193)
Browse files Browse the repository at this point in the history
Because concurrent sync requests from a primary to its replicas could be
in flight, it can be the case that an older retention leases collection
arrives and is processed on the replica after a newer retention leases
collection has arrived and been processed. Without a defense, in this
case the replica would overwrite the newer retention leases with the
older retention leases. This commit addresses this issue by introducing
a versioning scheme to retention leases. This versioning scheme is used
to resolve out-of-order processing on the replica. We persist this
version into Lucene and restore it on recovery. The encoding of
retention leases is starting to get a little ugly. We can consider
addressing this in a follow-up.
  • Loading branch information
jasontedor authored Feb 1, 2019
1 parent 16f715f commit 13cc7f1
Show file tree
Hide file tree
Showing 26 changed files with 730 additions and 280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -81,15 +80,15 @@ public final class EngineConfig {
@Nullable
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;

/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
* soft deleted should be retained.
*
* @return a supplier of outstanding retention leases
*/
public Supplier<Collection<RetentionLease>> retentionLeasesSupplier() {
public Supplier<RetentionLeases> retentionLeasesSupplier() {
return retentionLeasesSupplier;
}

Expand Down Expand Up @@ -141,7 +140,7 @@ public EngineConfig(ShardId shardId, String allocationId, ThreadPool threadPool,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
Supplier<Collection<RetentionLease>> retentionLeasesSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier) {
this.shardId = shardId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
Expand Down Expand Up @@ -2429,9 +2429,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
* retained sequence number, and the retention leases.
*/
final Tuple<Long, Collection<RetentionLease>> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
final Tuple<Long, RetentionLeases> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
commitData.put(Engine.RETENTION_LEASES, RetentionLease.encodeRetentionLeases(retentionPolicy.v2()));
commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2()));
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;

import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;
Expand All @@ -47,13 +47,13 @@ final class SoftDeletesPolicy {
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
private long minRetainedSeqNo;
// provides the retention leases used to calculate the minimum sequence number to retain
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;

SoftDeletesPolicy(
final LongSupplier globalCheckpointSupplier,
final long minRetainedSeqNo,
final long retentionOperations,
final Supplier<Collection<RetentionLease>> retentionLeasesSupplier) {
final Supplier<RetentionLeases> retentionLeasesSupplier) {
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.retentionOperations = retentionOperations;
this.minRetainedSeqNo = minRetainedSeqNo;
Expand Down Expand Up @@ -110,12 +110,12 @@ synchronized long getMinRetainedSeqNo() {
return getRetentionPolicy().v1();
}

public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy() {
public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
/*
* When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
* locked for peer recovery.
*/
final Collection<RetentionLease> retentionLeases = retentionLeasesSupplier.get();
final RetentionLeases retentionLeases = retentionLeasesSupplier.get();
// do not advance if the retention lock is held
if (retentionLockCount == 0) {
/*
Expand All @@ -130,6 +130,7 @@ public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy()

// calculate the minimum sequence number to retain based on retention leases
final long minimumRetainingSequenceNumber = retentionLeases
.leases()
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.min()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
Expand All @@ -54,6 +54,7 @@
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;

/**
* This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints).
Expand Down Expand Up @@ -157,7 +158,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
* A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the
* retention lease sync action, to sync retention leases to replicas.
*/
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;

/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
Expand All @@ -170,48 +171,44 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
volatile ReplicationGroup replicationGroup;

private final Map<String, RetentionLease> retentionLeases = new HashMap<>();

private Collection<RetentionLease> copyRetentionLeases() {
assert Thread.holdsLock(this);
return Collections.unmodifiableCollection(new ArrayList<>(retentionLeases.values()));
}
/**
* The current retention leases.
*/
private RetentionLeases retentionLeases = RetentionLeases.EMPTY;

/**
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only
* the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas.
*
* @return the retention leases
*/
public Collection<RetentionLease> getRetentionLeases() {
public RetentionLeases getRetentionLeases() {
final boolean wasPrimaryMode;
final Collection<RetentionLease> nonExpiredRetentionLeases;
final RetentionLeases nonExpiredRetentionLeases;
synchronized (this) {
if (primaryMode) {
// the primary calculates the non-expired retention leases and syncs them to replicas
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
final Collection<RetentionLease> expiredRetentionLeases = retentionLeases
.values()
final Map<Boolean, List<RetentionLease>> partitionByExpiration = retentionLeases
.leases()
.stream()
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() > retentionLeaseMillis)
.collect(Collectors.toList());
if (expiredRetentionLeases.isEmpty()) {
.collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
if (partitionByExpiration.get(true) == null) {
// early out as no retention leases have expired
return copyRetentionLeases();
}
// clean up the expired retention leases
for (final RetentionLease expiredRetentionLease : expiredRetentionLeases) {
retentionLeases.remove(expiredRetentionLease.id());
return retentionLeases;
}
final Collection<RetentionLease> nonExpiredLeases =
partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList();
retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases);
}
/*
* At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or
* we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the
* non-expired retention leases, instead receiving them on syncs from the primary.
*/
wasPrimaryMode = primaryMode;
nonExpiredRetentionLeases = copyRetentionLeases();
nonExpiredRetentionLeases = retentionLeases;
}
if (wasPrimaryMode) {
onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {}));
Expand All @@ -236,15 +233,18 @@ public RetentionLease addRetentionLease(
final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(listener);
final RetentionLease retentionLease;
final Collection<RetentionLease> currentRetentionLeases;
final RetentionLeases currentRetentionLeases;
synchronized (this) {
assert primaryMode;
if (retentionLeases.containsKey(id)) {
if (retentionLeases.contains(id)) {
throw new IllegalArgumentException("retention lease with ID [" + id + "] already exists");
}
retentionLease = new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
retentionLeases.put(id, retentionLease);
currentRetentionLeases = copyRetentionLeases();
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
currentRetentionLeases = retentionLeases;
}
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
return retentionLease;
Expand All @@ -261,18 +261,25 @@ public RetentionLease addRetentionLease(
*/
public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert primaryMode;
if (retentionLeases.containsKey(id) == false) {
if (retentionLeases.contains(id) == false) {
throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist");
}
final RetentionLease retentionLease =
new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
final RetentionLease existingRetentionLease = retentionLeases.put(id, retentionLease);
final RetentionLease existingRetentionLease = retentionLeases.get(id);
assert existingRetentionLease != null;
assert existingRetentionLease.retainingSequenceNumber() <= retentionLease.retainingSequenceNumber() :
"retention lease renewal for [" + id + "]"
+ " from [" + source + "]"
+ " renewed a lower retaining sequence number [" + retentionLease.retainingSequenceNumber() + "]"
+ " than the current lease retaining sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]";
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
Stream.concat(
retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false),
Stream.of(retentionLease))
.collect(Collectors.toList()));
return retentionLease;
}

Expand All @@ -281,10 +288,11 @@ public synchronized RetentionLease renewRetentionLease(final String id, final lo
*
* @param retentionLeases the retention leases
*/
public synchronized void updateRetentionLeasesOnReplica(final Collection<RetentionLease> retentionLeases) {
public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases) {
assert primaryMode == false;
this.retentionLeases.clear();
this.retentionLeases.putAll(retentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())));
if (retentionLeases.supersedes(this.retentionLeases)) {
this.retentionLeases = retentionLeases;
}
}

public static class CheckpointState implements Writeable {
Expand Down Expand Up @@ -565,7 +573,7 @@ public ReplicationTracker(
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,8 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such
Expand Down Expand Up @@ -162,22 +157,10 @@ static String encodeRetentionLease(final RetentionLease retentionLease) {
return String.format(
Locale.ROOT,
"id:%s;retaining_seq_no:%d;timestamp:%d;source:%s",
retentionLease.id(),
retentionLease.retainingSequenceNumber(),
retentionLease.timestamp(),
retentionLease.source());
}

/**
* Encodes a collection of retention leases as a string. This encoding can be decoed by {@link #decodeRetentionLeases(String)}. The
* encoding is a comma-separated encoding of each retention lease as encoded by {@link #encodeRetentionLease(RetentionLease)}.
*
* @param retentionLeases the retention leases
* @return the encoding of the retention leases
*/
public static String encodeRetentionLeases(final Collection<RetentionLease> retentionLeases) {
Objects.requireNonNull(retentionLeases);
return retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(","));
retentionLease.id,
retentionLease.retainingSequenceNumber,
retentionLease.timestamp,
retentionLease.source);
}

/**
Expand All @@ -201,23 +184,6 @@ static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
}

/**
* Decodes retention leases encoded by {@link #encodeRetentionLeases(Collection)}.
*
* @param encodedRetentionLeases an encoded collection of retention leases
* @return the decoded retention leases
*/
public static Collection<RetentionLease> decodeRetentionLeases(final String encodedRetentionLeases) {
Objects.requireNonNull(encodedRetentionLeases);
if (encodedRetentionLeases.isEmpty()) {
return Collections.emptyList();
}
assert Arrays.stream(encodedRetentionLeases.split(","))
.allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+"))
: encodedRetentionLeases;
return Arrays.stream(encodedRetentionLeases.split(",")).map(RetentionLease::decodeRetentionLease).collect(Collectors.toList());
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
Expand All @@ -244,14 +210,4 @@ public String toString() {
'}';
}

/**
* A utility method to convert a collection of retention leases to a map from retention lease ID to retention lease.
*
* @param leases the leases
* @return the map from retention lease ID to retention lease
*/
static Map<String, RetentionLease> toMap(final Collection<RetentionLease> leases) {
return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity()));
}

}
Loading

0 comments on commit 13cc7f1

Please sign in to comment.