Introduce retention lease state file (#39004)
This commit moves retention leases from being persisted in the Lucene
commit point to being persisted in a dedicated state file.
jasontedor committed Feb 18, 2019
1 parent 9dc8975 commit 8142760
Showing 23 changed files with 479 additions and 355 deletions.
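As context for the hunks below: before this change, retention leases were encoded into the Lucene commit point's user-data map alongside the minimum retained sequence number; after it, only the minimum retained sequence number stays in the commit data, and the leases are written to their own state file. A minimal, JDK-only sketch of that shift (not the commit's actual code; all names and the state-file name are illustrative):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

final class RetentionLeasePersistenceSketch {

    // Before: the leases rode along in the commit point's user data as one encoded string.
    static Map<String, String> commitUserDataBefore(final long minRetainedSeqNo, final String encodedLeases) {
        final Map<String, String> commitData = new HashMap<>();
        commitData.put("min_retained_seq_no", Long.toString(minRetainedSeqNo));
        commitData.put("retention_leases", encodedLeases); // this key is removed by the commit
        return commitData;
    }

    // After: the commit point keeps only the minimum retained sequence number.
    static Map<String, String> commitUserDataAfter(final long minRetainedSeqNo) {
        final Map<String, String> commitData = new HashMap<>();
        commitData.put("min_retained_seq_no", Long.toString(minRetainedSeqNo));
        return commitData;
    }

    // The leases themselves go to a dedicated file under the shard's state directory;
    // a stand-in for the RetentionLeases.FORMAT.write(...) call introduced later in this diff.
    static void writeLeaseStateFile(final Path shardStatePath, final String leasesAsXContent) throws IOException {
        Files.write(shardStatePath.resolve("retention-leases.state"), leasesAsXContent.getBytes(StandardCharsets.UTF_8));
    }
}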
@@ -114,7 +114,6 @@ public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";
public static final String HISTORY_UUID_KEY = "history_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
public static final String RETENTION_LEASES = "retention_leases";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";

protected final ShardId shardId;
@@ -51,7 +51,6 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
@@ -76,7 +75,6 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
@@ -2433,13 +2431,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
if (softDeleteEnabled) {
/*
* We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
* retained sequence number, and the retention leases.
*/
final Tuple<Long, RetentionLeases> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2()));
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
@@ -21,7 +21,6 @@

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.RetentionLease;
@@ -107,10 +106,6 @@ private synchronized void releaseRetentionLock() {
* Operations whose seq# is least this value should exist in the Lucene index.
*/
synchronized long getMinRetainedSeqNo() {
return getRetentionPolicy().v1();
}

public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
/*
* When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
* locked for peer recovery.
@@ -151,7 +146,7 @@ public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
*/
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
}
return Tuple.tuple(minRetainedSeqNo, retentionLeases);
return minRetainedSeqNo;
}

/**
@@ -32,13 +32,15 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -318,6 +320,40 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re
}
}

/**
* Loads the latest retention leases from their dedicated state file.
*
* @param path the path to the directory containing the state file
* @return the retention leases
* @throws IOException if an I/O exception occurs reading the retention leases
*/
public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
if (retentionLeases == null) {
return RetentionLeases.EMPTY;
}
return retentionLeases;
}

private final Object retentionLeasePersistenceLock = new Object();

/**
* Persists the current retention leases to their dedicated state file.
*
* @param path the path to the directory containing the state file
* @throws IOException if an exception occurs writing the state file
*/
public void persistRetentionLeases(final Path path) throws IOException {
synchronized (retentionLeasePersistenceLock) {
final RetentionLeases currentRetentionLeases;
synchronized (this) {
currentRetentionLeases = retentionLeases;
}
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
RetentionLeases.FORMAT.write(currentRetentionLeases, path);
}
}

public static class CheckpointState implements Writeable {

/**
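The persistRetentionLeases method added above takes two locks: a dedicated persistence lock so that only one writer touches the state file at a time, and a brief synchronized (this) block to snapshot the current leases before doing any I/O outside the tracker's monitor. A self-contained sketch of that pattern (hypothetical names, not Elasticsearch API):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class StateFileWriterSketch {

    private final Object persistenceLock = new Object(); // serializes writers of the state file
    private String currentState = "";                     // stands in for the current retention lease collection

    synchronized void update(final String newState) {     // mutators take only the object monitor
        currentState = newState;
    }

    void persist(final Path directory) throws IOException {
        synchronized (persistenceLock) {                   // at most one thread writes the file at a time
            final String snapshot;
            synchronized (this) {                          // sample a consistent view of the current state
                snapshot = currentState;
            }
            // the write happens outside the object monitor, so updates are not blocked on disk I/O
            Files.write(directory.resolve("state-file"), snapshot.getBytes(StandardCharsets.UTF_8));
        }
    }
}

The intent appears to be that lease updates are only briefly blocked while the snapshot is taken, never for the duration of the disk write.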
@@ -19,13 +19,16 @@

package org.elasticsearch.index.seqno;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;

/**
@@ -34,7 +37,7 @@
* otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
* number, the timestamp of when the lease was created or renewed, and the source of the retention lease (e.g., "ccr").
*/
public final class RetentionLease implements Writeable {
public final class RetentionLease implements ToXContent, Writeable {

private final String id;

@@ -94,10 +97,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final
if (id.isEmpty()) {
throw new IllegalArgumentException("retention lease ID can not be empty");
}
if (id.contains(":") || id.contains(";") || id.contains(",")) {
// retention lease IDs can not contain these characters because they are used in encoding retention leases
throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]");
}
if (retainingSequenceNumber < 0) {
throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range");
}
@@ -108,10 +107,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final
if (source.isEmpty()) {
throw new IllegalArgumentException("retention lease source can not be empty");
}
if (source.contains(":") || source.contains(";") || source.contains(",")) {
// retention lease sources can not contain these characters because they are used in encoding retention leases
throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]");
}
this.id = id;
this.retainingSequenceNumber = retainingSequenceNumber;
this.timestamp = timestamp;
@@ -145,43 +140,49 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeString(source);
}

/**
* Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is
* encoded in the format <code>id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}</code>.
*
* @param retentionLease the retention lease
* @return the encoding of the retention lease
*/
static String encodeRetentionLease(final RetentionLease retentionLease) {
Objects.requireNonNull(retentionLease);
return String.format(
Locale.ROOT,
"id:%s;retaining_seq_no:%d;timestamp:%d;source:%s",
retentionLease.id,
retentionLease.retainingSequenceNumber,
retentionLease.timestamp,
retentionLease.source);
private static final ParseField ID_FIELD = new ParseField("id");
private static final ParseField RETAINING_SEQUENCE_NUMBER_FIELD = new ParseField("retaining_sequence_number");
private static final ParseField TIMESTAMP_FIELD = new ParseField("timestamp");
private static final ParseField SOURCE_FIELD = new ParseField("source");

private static ConstructingObjectParser<RetentionLease, Void> PARSER = new ConstructingObjectParser<>(
"retention_leases",
(a) -> new RetentionLease((String) a[0], (Long) a[1], (Long) a[2], (String) a[3]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), RETAINING_SEQUENCE_NUMBER_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), SOURCE_FIELD);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
{
builder.field(ID_FIELD.getPreferredName(), id);
builder.field(RETAINING_SEQUENCE_NUMBER_FIELD.getPreferredName(), retainingSequenceNumber);
builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp);
builder.field(SOURCE_FIELD.getPreferredName(), source);
}
builder.endObject();
return builder;
}

@Override
public boolean isFragment() {
return false;
}

/**
* Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}.
* Parses a retention lease from {@link org.elasticsearch.common.xcontent.XContent}. This method assumes that the retention lease was
* converted to {@link org.elasticsearch.common.xcontent.XContent} via {@link #toXContent(XContentBuilder, Params)}.
*
* @param encodedRetentionLease an encoded retention lease
* @return the decoded retention lease
* @param parser the parser
* @return a retention lease
*/
static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
Objects.requireNonNull(encodedRetentionLease);
final String[] fields = encodedRetentionLease.split(";");
assert fields.length == 4 : Arrays.toString(fields);
assert fields[0].matches("id:[^:;,]+") : fields[0];
final String id = fields[0].substring("id:".length());
assert fields[1].matches("retaining_seq_no:\\d+") : fields[1];
final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length()));
assert fields[2].matches("timestamp:\\d+") : fields[2];
final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length()));
assert fields[3].matches("source:[^:;,]+") : fields[3];
final String source = fields[3].substring("source:".length());
return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
public static RetentionLease fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
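With the custom string encoding gone, a retention lease now round-trips through x-content via the toXContent and fromXContent methods added above. A rough usage sketch, not part of the commit: the class name and values are illustrative, JSON is used here only for readability (the real state file is written by RetentionLeases.FORMAT), and it assumes the usual Elasticsearch x-content helpers are on the classpath.

import java.io.IOException;

import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.seqno.RetentionLease;

public class RetentionLeaseXContentRoundTrip {

    public static void main(final String[] args) throws IOException {
        final RetentionLease lease = new RetentionLease("peer-recovery-0", 42, System.currentTimeMillis(), "ccr");

        // serialize: yields {"id":"peer-recovery-0","retaining_sequence_number":42,"timestamp":...,"source":"ccr"}
        final XContentBuilder builder = XContentFactory.jsonBuilder();
        lease.toXContent(builder, ToXContent.EMPTY_PARAMS);
        final String json = Strings.toString(builder);

        // parse it back through the ConstructingObjectParser behind fromXContent
        try (XContentParser parser = JsonXContent.jsonXContent.createParser(
                NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
            final RetentionLease parsed = RetentionLease.fromXContent(parser);
            System.out.println(json);
            System.out.println(parsed);
        }
    }
}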
@@ -119,19 +119,21 @@ public void backgroundSync(
}

@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(final Request request, final IndexShard primary) {
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request,
final IndexShard primary) throws IOException {
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
primary.afterWriteOperation();
primary.persistRetentionLeases();
return new PrimaryResult<>(request, new ReplicationResponse());
}

@Override
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica){
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) throws IOException {
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
replica.afterWriteOperation();
replica.persistRetentionLeases();
return new ReplicaResult();
}

@@ -25,7 +25,6 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
@@ -121,31 +120,26 @@ public void sync(
}

@Override
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(final Request request, final IndexShard primary) {
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
final Request request,
final IndexShard primary) throws IOException {
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
// we flush to ensure that retention leases are committed
flush(primary);
primary.persistRetentionLeases();
return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger);
}

@Override
protected WriteReplicaResult<Request> shardOperationOnReplica(final Request request, final IndexShard replica) {
protected WriteReplicaResult<Request> shardOperationOnReplica(
final Request request,
final IndexShard replica) throws IOException {
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
// we flush to ensure that retention leases are committed
flush(replica);
replica.persistRetentionLeases();
return new WriteReplicaResult<>(request, null, null, replica, logger);
}

private void flush(final IndexShard indexShard) {
final FlushRequest flushRequest = new FlushRequest();
flushRequest.force(true);
flushRequest.waitIfOngoing(true);
indexShard.flush(flushRequest);
}

public static final class Request extends ReplicatedWriteRequest<Request> {

private RetentionLeases retentionLeases;