Skip to content
4 changes: 3 additions & 1 deletion docs/reference/indices/flush.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ which returns something similar to:
"max_seq_no" : "-1",
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
"max_unsafe_auto_id_timestamp" : "-1",
"min_retained_seq_no": "0"
"min_retained_seq_no" : "0",
"retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"
},
"num_docs" : 0
}
Expand All @@ -117,6 +118,7 @@ which returns something similar to:
// TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/]
// TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/]
// TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/]
// TESTRESPONSE[s/"retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/]
<1> the `sync id` marker

[float]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";
public static final String HISTORY_UUID_KEY = "history_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
public static final String RETENTION_LEASES = "retention_leases";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";

protected final ShardId shardId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
Expand All @@ -74,6 +75,7 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
Expand Down Expand Up @@ -2336,7 +2338,13 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
if (softDeleteEnabled) {
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
/*
* We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
* retained sequence number, and the retention leases.
*/
final Tuple<Long, Collection<RetentionLease>> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
commitData.put(Engine.RETENTION_LEASES, RetentionLease.encodeRetentionLeases(retentionPolicy.v2()));
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.RetentionLease;
Expand All @@ -45,6 +46,7 @@ final class SoftDeletesPolicy {
private long retentionOperations;
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
private long minRetainedSeqNo;
private Collection<RetentionLease> retentionLeases;
// provides the retention leases used to calculate the minimum sequence number to retain
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;

Expand All @@ -57,6 +59,7 @@ final class SoftDeletesPolicy {
this.retentionOperations = retentionOperations;
this.minRetainedSeqNo = minRetainedSeqNo;
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
retentionLeases = retentionLeasesSupplier.get();
this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
this.retentionLockCount = 0;
}
Expand Down Expand Up @@ -106,7 +109,11 @@ private synchronized void releaseRetentionLock() {
* Operations whose seq# is least this value should exist in the Lucene index.
*/
synchronized long getMinRetainedSeqNo() {
// Do not advance if the retention lock is held
return getRetentionPolicy().v1();
}

public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy() {
// do not advance if the retention lock is held
if (retentionLockCount == 0) {
/*
* This policy retains operations for two purposes: peer-recovery and querying changes history.
Expand All @@ -119,8 +126,8 @@ synchronized long getMinRetainedSeqNo() {
*/

// calculate the minimum sequence number to retain based on retention leases
final long minimumRetainingSequenceNumber = retentionLeasesSupplier
.get()
retentionLeases = retentionLeasesSupplier.get();
final long minimumRetainingSequenceNumber = retentionLeases
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.min()
Expand All @@ -139,7 +146,7 @@ synchronized long getMinRetainedSeqNo() {
*/
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
}
return minRetainedSeqNo;
return Tuple.tuple(minRetainedSeqNo, retentionLeases);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,17 @@ public synchronized void addOrUpdateRetentionLease(final String id, final long r
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source));
}

/**
* Updates retention leases on a replica.
*
* @param retentionLeases the retention leases
*/
public synchronized void updateRetentionLeasesOnReplica(final Collection<RetentionLease> retentionLeases) {
assert primaryMode == false;
this.retentionLeases.clear();
this.retentionLeases.putAll(retentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())));
}

public static class CheckpointState implements Writeable {

/**
Expand Down
107 changes: 107 additions & 0 deletions server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@

package org.elasticsearch.index.seqno;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such
* that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could
Expand Down Expand Up @@ -81,18 +88,118 @@ public String source() {
* @param source the source of the retention lease
*/
public RetentionLease(final String id, final long retainingSequenceNumber, final long timestamp, final String source) {
Objects.requireNonNull(id);
if (id.isEmpty()) {
throw new IllegalArgumentException("retention lease ID can not be empty");
}
if (id.contains(":") || id.contains(";") || id.contains(",")) {
// retention lease IDs can not contain these characters because they are used in encoding retention leases
throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]");
}
if (retainingSequenceNumber < SequenceNumbers.UNASSIGNED_SEQ_NO) {
throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range");
}
if (timestamp < 0) {
throw new IllegalArgumentException("retention lease timestamp [" + timestamp + "] out of range");
}
Objects.requireNonNull(source);
if (source.isEmpty()) {
throw new IllegalArgumentException("retention lease source can not be empty");
}
if (source.contains(":") || source.contains(";") || source.contains(",")) {
// retention lease sources can not contain these characters because they are used in encoding retention leases
throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]");
}
this.id = id;
this.retainingSequenceNumber = retainingSequenceNumber;
this.timestamp = timestamp;
this.source = source;
}

/**
* Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is
* encoded in the format <code>id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}</code>.
*
* @param retentionLease the retention lease
* @return the encoding of the retention lease
*/
static String encodeRetentionLease(final RetentionLease retentionLease) {
Objects.requireNonNull(retentionLease);
return String.format(
Locale.ROOT,
"id:%s;retaining_seq_no:%d;timestamp:%d;source:%s",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should use JSON, with the upside of being more extensible and will allow us to use our parsing infra (and the validation features it has). WDYT?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not particularly concerned about this needing to be extensible. I can be convinced that JSON is worth it but I am doubting.

retentionLease.id(),
retentionLease.retainingSequenceNumber(),
retentionLease.timestamp(),
retentionLease.source());
}

/**
* Encodes a collection of retention leases as a string. This encoding can be decoed by {@link #decodeRetentionLeases(String)}. The
* encoding is a comma-separated encoding of each retention lease as encoded by {@link #encodeRetentionLease(RetentionLease)}.
*
* @param retentionLeases the retention leases
* @return the encoding of the retention leases
*/
public static String encodeRetentionLeases(final Collection<RetentionLease> retentionLeases) {
Objects.requireNonNull(retentionLeases);
return retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(","));
}

/**
* Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}.
*
* @param encodedRetentionLease an encoded retention lease
* @return the decoded retention lease
*/
static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
Objects.requireNonNull(encodedRetentionLease);
final String[] fields = encodedRetentionLease.split(";");
assert fields.length == 4 : Arrays.toString(fields);
assert fields[0].matches("id:[^:;,]+") : fields[0];
final String id = fields[0].substring("id:".length());
assert fields[1].matches("retaining_seq_no:\\d+") : fields[1];
final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length()));
assert fields[2].matches("timestamp:\\d+") : fields[2];
final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length()));
assert fields[3].matches("source:[^:;,]+") : fields[3];
final String source = fields[3].substring("source:".length());
return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
}

/**
* Decodes retention leases encoded by {@link #encodeRetentionLeases(Collection)}.
*
* @param encodedRetentionLeases an encoded collection of retention leases
* @return the decoded retention leases
*/
public static Collection<RetentionLease> decodeRetentionLeases(final String encodedRetentionLeases) {
Objects.requireNonNull(encodedRetentionLeases);
if (encodedRetentionLeases.isEmpty()) {
return Collections.emptyList();
}
assert Arrays.stream(encodedRetentionLeases.split(","))
.allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+"))
: encodedRetentionLeases;
return Arrays.stream(encodedRetentionLeases.split(",")).map(RetentionLease::decodeRetentionLease).collect(Collectors.toList());
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final RetentionLease that = (RetentionLease) o;
return Objects.equals(id, that.id) &&
retainingSequenceNumber == that.retainingSequenceNumber &&
timestamp == that.timestamp &&
Objects.equals(source, that.source);
}

@Override
public int hashCode() {
return Objects.hash(id, retainingSequenceNumber, timestamp, source);
}

@Override
public String toString() {
return "RetentionLease{" +
Expand Down
11 changes: 11 additions & 0 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
Expand Down Expand Up @@ -140,6 +141,7 @@
import java.nio.channels.ClosedByInterruptException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
Expand Down Expand Up @@ -1416,6 +1418,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint");
replicationTracker.updateRetentionLeasesOnReplica(getRetentionLeases(store.readLastCommittedSegmentsInfo()));
trimUnsafeCommits();
synchronized (mutex) {
verifyNotClosed();
Expand All @@ -1435,6 +1438,14 @@ private void innerOpenEngineAndTranslog() throws IOException {
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}

static Collection<RetentionLease> getRetentionLeases(final SegmentInfos segmentInfos) {
final String committedRetentionLeases = segmentInfos.getUserData().get(Engine.RETENTION_LEASES);
if (committedRetentionLeases == null) {
return Collections.emptyList();
}
return RetentionLease.decodeRetentionLeases(committedRetentionLeases);
}

private void trimUnsafeCommits() throws IOException {
assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running";
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
Expand All @@ -140,6 +141,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -5241,13 +5243,14 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final AtomicReference<Collection<RetentionLease>> leasesHolder = new AtomicReference<>(Collections.emptyList());
final List<Engine.Operation> operations = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "2");
Randomness.shuffle(operations);
Set<Long> existingSeqNos = new HashSet<>();
store = createStore();
engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null,
globalCheckpoint::get));
engine = createEngine(
config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, leasesHolder::get));
assertThat(engine.getMinRetainedSeqNo(), equalTo(0L));
long lastMinRetainedSeqNo = engine.getMinRetainedSeqNo();
for (Engine.Operation op : operations) {
Expand All @@ -5261,6 +5264,18 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
if (randomBoolean()) {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint()));
}
if (randomBoolean()) {
final int length = randomIntBetween(0, 8);
final List<RetentionLease> leases = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomLongBetween(0L, Math.max(0L, globalCheckpoint.get()));
final long timestamp = randomLongBetween(0L, Long.MAX_VALUE);
final String source = randomAlphaOfLength(8);
leases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source));
}
leasesHolder.set(leases);
}
if (rarely()) {
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10));
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
Expand All @@ -5273,6 +5288,14 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
engine.flush(true, true);
assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)),
equalTo(engine.getMinRetainedSeqNo()));
final Collection<RetentionLease> leases = leasesHolder.get();
if (leases.isEmpty()) {
assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo(""));
} else {
assertThat(
engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES),
equalTo(RetentionLease.encodeRetentionLeases(leases)));
}
}
if (rarely()) {
engine.forceMerge(randomBoolean());
Expand Down
Loading