Skip to content

Commit

Permalink
Introduce retention lease state file (#39004)
Browse files Browse the repository at this point in the history
This commit moves retention leases from being persisted in the Lucene
commit point to being persisted in a dedicated state file.
  • Loading branch information
jasontedor committed Feb 18, 2019
1 parent d43ac8f commit 2d8f6b6
Show file tree
Hide file tree
Showing 24 changed files with 485 additions and 358 deletions.
4 changes: 1 addition & 3 deletions docs/reference/indices/flush.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ which returns something similar to:
"max_seq_no" : "-1",
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
"max_unsafe_auto_id_timestamp" : "-1",
"min_retained_seq_no" : "0",
"retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"
"min_retained_seq_no" : "0"
},
"num_docs" : 0
}
Expand All @@ -119,7 +118,6 @@ which returns something similar to:
// TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/]
// TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/]
// TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/]
// TESTRESPONSE[s/"retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/]
<1> the `sync id` marker

[float]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";
public static final String HISTORY_UUID_KEY = "history_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
public static final String RETENTION_LEASES = "retention_leases";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";

protected final ShardId shardId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
Expand All @@ -75,7 +74,6 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
Expand Down Expand Up @@ -2347,13 +2345,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
if (softDeleteEnabled) {
/*
* We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
* retained sequence number, and the retention leases.
*/
final Tuple<Long, RetentionLeases> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2()));
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.RetentionLease;
Expand Down Expand Up @@ -107,10 +106,6 @@ private synchronized void releaseRetentionLock() {
* Operations whose seq# is at least this value should exist in the Lucene index.
*/
synchronized long getMinRetainedSeqNo() {
return getRetentionPolicy().v1();
}

public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
/*
* When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
* locked for peer recovery.
Expand Down Expand Up @@ -151,7 +146,7 @@ public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
*/
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
}
return Tuple.tuple(minRetainedSeqNo, retentionLeases);
return minRetainedSeqNo;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -318,6 +321,40 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re
}
}

/**
 * Reads the most recent retention leases from the dedicated state file under the given directory.
 *
 * @param path the path to the directory containing the state file
 * @return the loaded retention leases, or {@link RetentionLeases#EMPTY} when the load yields {@code null}
 * @throws IOException if an I/O exception occurs reading the retention leases
 */
public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
    final RetentionLeases loaded = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
    // callers always receive a usable instance: a null load result is replaced by RetentionLeases.EMPTY
    return loaded != null ? loaded : RetentionLeases.EMPTY;
}

// Dedicated monitor for writing the retention lease state file. It is separate from this object's own monitor, which
// persistRetentionLeases only holds while reading the in-memory retention leases.
private final Object retentionLeasePersistenceLock = new Object();

/**
* Persists the current retention leases to their dedicated state file. The whole operation runs under
* {@code retentionLeasePersistenceLock}; this object's monitor is held only while the current leases are read.
*
* @param path the path to the directory containing the state file
* @throws WriteStateException if an exception occurs writing the state file
*/
public void persistRetentionLeases(final Path path) throws WriteStateException {
// only one thread at a time may proceed to write the state file
synchronized (retentionLeasePersistenceLock) {
final RetentionLeases currentRetentionLeases;
// capture the current leases reference under this object's monitor, releasing it again before the write below
synchronized (this) {
currentRetentionLeases = retentionLeases;
}
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
}
}

public static class CheckpointState implements Writeable {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@

package org.elasticsearch.index.seqno;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;

/**
Expand All @@ -34,7 +37,7 @@
* otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
* number, the timestamp of when the lease was created or renewed, and the source of the retention lease (e.g., "ccr").
*/
public final class RetentionLease implements Writeable {
public final class RetentionLease implements ToXContent, Writeable {

private final String id;

Expand Down Expand Up @@ -94,10 +97,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final
if (id.isEmpty()) {
throw new IllegalArgumentException("retention lease ID can not be empty");
}
if (id.contains(":") || id.contains(";") || id.contains(",")) {
// retention lease IDs can not contain these characters because they are used in encoding retention leases
throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]");
}
if (retainingSequenceNumber < 0) {
throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range");
}
Expand All @@ -108,10 +107,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final
if (source.isEmpty()) {
throw new IllegalArgumentException("retention lease source can not be empty");
}
if (source.contains(":") || source.contains(";") || source.contains(",")) {
// retention lease sources can not contain these characters because they are used in encoding retention leases
throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]");
}
this.id = id;
this.retainingSequenceNumber = retainingSequenceNumber;
this.timestamp = timestamp;
Expand Down Expand Up @@ -145,43 +140,49 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeString(source);
}

/**
* Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is
* encoded in the format <code>id:{id};retaining_seq_no:{retainingSequenceNumber};timestamp:{timestamp};source:{source}</code>.
*
* @param retentionLease the retention lease
* @return the encoding of the retention lease
*/
static String encodeRetentionLease(final RetentionLease retentionLease) {
Objects.requireNonNull(retentionLease);
return String.format(
Locale.ROOT,
"id:%s;retaining_seq_no:%d;timestamp:%d;source:%s",
retentionLease.id,
retentionLease.retainingSequenceNumber,
retentionLease.timestamp,
retentionLease.source);
// field names used both when rendering a retention lease via toXContent and when parsing it back via PARSER
private static final ParseField ID_FIELD = new ParseField("id");
private static final ParseField RETAINING_SEQUENCE_NUMBER_FIELD = new ParseField("retaining_sequence_number");
private static final ParseField TIMESTAMP_FIELD = new ParseField("timestamp");
private static final ParseField SOURCE_FIELD = new ParseField("source");

/*
 * Parser that rebuilds a retention lease from its four fields. The lambda reads a[0]..a[3] as (id, retaining sequence number,
 * timestamp, source), which matches the order of the constructor-argument declarations in the static initializer below; the two
 * must be kept in sync. The field is final because the parser is configured once here and must never be reassigned.
 */
private static final ConstructingObjectParser<RetentionLease, Void> PARSER = new ConstructingObjectParser<>(
    "retention_leases",
    (a) -> new RetentionLease((String) a[0], (Long) a[1], (Long) a[2], (String) a[3]));

static {
    PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
    PARSER.declareLong(ConstructingObjectParser.constructorArg(), RETAINING_SEQUENCE_NUMBER_FIELD);
    PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD);
    PARSER.declareString(ConstructingObjectParser.constructorArg(), SOURCE_FIELD);
}

/**
 * Renders this retention lease as a single object holding its ID, retaining sequence number, timestamp, and source.
 *
 * @param builder the builder to write to
 * @param params  unused by this implementation
 * @return the builder that was passed in
 * @throws IOException if an I/O exception occurs writing to the builder
 */
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
    builder.startObject()
        .field(ID_FIELD.getPreferredName(), id)
        .field(RETAINING_SEQUENCE_NUMBER_FIELD.getPreferredName(), retainingSequenceNumber)
        .field(TIMESTAMP_FIELD.getPreferredName(), timestamp)
        .field(SOURCE_FIELD.getPreferredName(), source)
        .endObject();
    return builder;
}

@Override
public boolean isFragment() {
// NOTE(review): presumably false because toXContent writes its own enclosing object (startObject/endObject) — confirm
// against the ToXContent contract
return false;
}

/**
* Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}.
* Parses a retention lease from {@link org.elasticsearch.common.xcontent.XContent}. This method assumes that the retention lease was
* converted to {@link org.elasticsearch.common.xcontent.XContent} via {@link #toXContent(XContentBuilder, Params)}.
*
* @param encodedRetentionLease an encoded retention lease
* @return the decoded retention lease
* @param parser the parser
* @return a retention lease
*/
static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
Objects.requireNonNull(encodedRetentionLease);
final String[] fields = encodedRetentionLease.split(";");
assert fields.length == 4 : Arrays.toString(fields);
assert fields[0].matches("id:[^:;,]+") : fields[0];
final String id = fields[0].substring("id:".length());
assert fields[1].matches("retaining_seq_no:\\d+") : fields[1];
final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length()));
assert fields[2].matches("timestamp:\\d+") : fields[2];
final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length()));
assert fields[3].matches("source:[^:;,]+") : fields[3];
final String source = fields[3].substring("source:".length());
return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
public static RetentionLease fromXContent(final XContentParser parser) {
// PARSER is declared with a Void context type, so there is no context object to supply here
return PARSER.apply(parser, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -119,19 +120,21 @@ public void backgroundSync(
}

@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(final Request request, final IndexShard primary) {
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request,
final IndexShard primary) throws WriteStateException {
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
primary.afterWriteOperation();
primary.persistRetentionLeases();
return new PrimaryResult<>(request, new ReplicationResponse());
}

@Override
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica){
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) throws WriteStateException {
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
replica.afterWriteOperation();
replica.persistRetentionLeases();
return new ReplicaResult();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
Expand All @@ -39,6 +38,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -121,31 +121,26 @@ public void sync(
}

@Override
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(final Request request, final IndexShard primary) {
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
final Request request,
final IndexShard primary) throws WriteStateException {
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
// we flush to ensure that retention leases are committed
flush(primary);
primary.persistRetentionLeases();
return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger);
}

@Override
protected WriteReplicaResult<Request> shardOperationOnReplica(final Request request, final IndexShard replica) {
protected WriteReplicaResult<Request> shardOperationOnReplica(
final Request request,
final IndexShard replica) throws WriteStateException {
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
// we flush to ensure that retention leases are committed
flush(replica);
replica.persistRetentionLeases();
return new WriteReplicaResult<>(request, null, null, replica, logger);
}

private void flush(final IndexShard indexShard) {
final FlushRequest flushRequest = new FlushRequest();
flushRequest.force(true);
flushRequest.waitIfOngoing(true);
indexShard.flush(flushRequest);
}

public static final class Request extends ReplicatedWriteRequest<Request> {

private RetentionLeases retentionLeases;
Expand Down
Loading

0 comments on commit 2d8f6b6

Please sign in to comment.