Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce retention lease state file #39004

Merged
merged 27 commits into from
Feb 18, 2019
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
da6ca36
Introduce retention lease state file
jasontedor Feb 16, 2019
2aa63fd
Remove dead field
jasontedor Feb 16, 2019
0fe1cbc
Add Javadocs
jasontedor Feb 16, 2019
509c575
Merge branch 'master' into retention-lease-state-file
jasontedor Feb 16, 2019
0465ded
Remove awaits fix
jasontedor Feb 16, 2019
2ff5b27
Remove imports
jasontedor Feb 16, 2019
64a8cd4
Fix tests
jasontedor Feb 16, 2019
0fb1a72
Tweak message
jasontedor Feb 16, 2019
f906152
Add clarifying comment
jasontedor Feb 16, 2019
542bad6
Add test for ordering
jasontedor Feb 16, 2019
aca06cd
Fix docs tests
jasontedor Feb 16, 2019
b73fcf0
Fix checkstyle
jasontedor Feb 16, 2019
bf454a3
Add one more test
jasontedor Feb 16, 2019
9168a60
Fix test
jasontedor Feb 16, 2019
a66c097
Add additional test for duplicates
jasontedor Feb 16, 2019
539a982
Remove awaits fix
jasontedor Feb 16, 2019
4d9efa3
Fix imports
jasontedor Feb 16, 2019
f1c8ebc
Refactor loading
jasontedor Feb 17, 2019
9739244
Correct Javadoc
jasontedor Feb 17, 2019
162cf77
Add test for persistence after recovery
jasontedor Feb 18, 2019
8433480
Merge remote-tracking branch 'elastic/master' into retention-lease-st…
jasontedor Feb 18, 2019
ee4d548
Remove unnecessary line wrap
jasontedor Feb 18, 2019
1675507
Remove unneeded restriction
jasontedor Feb 18, 2019
0762eda
Dedicated lock, and add test
jasontedor Feb 18, 2019
5d727ab
Remove indirection in soft deletes policy
jasontedor Feb 18, 2019
cf247be
Fix imports
jasontedor Feb 18, 2019
e4dbf01
Ensure we always persist latest
jasontedor Feb 18, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions docs/reference/indices/flush.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ which returns something similar to:
"max_seq_no" : "-1",
"sync_id" : "AVvFY-071siAOuFGEO9P", <1>
"max_unsafe_auto_id_timestamp" : "-1",
"min_retained_seq_no" : "0",
"retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"
"min_retained_seq_no" : "0"
},
"num_docs" : 0
}
Expand All @@ -119,7 +118,6 @@ which returns something similar to:
// TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/]
// TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/]
// TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/]
// TESTRESPONSE[s/"retention_leases" : "primary_term:1;version:1;id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/]
<1> the `sync id` marker

[float]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ public abstract class Engine implements Closeable {
public static final String SYNC_COMMIT_ID = "sync_id";
public static final String HISTORY_UUID_KEY = "history_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
public static final String RETENTION_LEASES = "retention_leases";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";

protected final ShardId shardId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
Expand All @@ -75,7 +74,6 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
Expand Down Expand Up @@ -2344,13 +2342,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
if (softDeleteEnabled) {
/*
* We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum
* retained sequence number, and the retention leases.
*/
final Tuple<Long, RetentionLeases> retentionPolicy = softDeletesPolicy.getRetentionPolicy();
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1()));
commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2()));
commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo()));
}
logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.RetentionLease;
Expand Down Expand Up @@ -107,10 +106,6 @@ private synchronized void releaseRetentionLock() {
* Operations whose seq# is least this value should exist in the Lucene index.
*/
synchronized long getMinRetainedSeqNo() {
return getRetentionPolicy().v1();
}

public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
/*
* When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
* locked for peer recovery.
Expand Down Expand Up @@ -151,7 +146,7 @@ public synchronized Tuple<Long, RetentionLeases> getRetentionPolicy() {
*/
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
}
return Tuple.tuple(minRetainedSeqNo, retentionLeases);
return minRetainedSeqNo;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -318,6 +321,37 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re
}
}

/**
* Loads the latest retention leases from their dedicated state file.
*
* @param path the path to the directory containing the state file
* @return the retention leases
* @throws IOException if an I/O exception occurs reading the retention leases
*/
public RetentionLeases loadRetentionLeases(final Path path) throws IOException {
final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path);
if (retentionLeases == null) {
return RetentionLeases.EMPTY;
}
return retentionLeases;
}

private final Object retentionLeasePersistenceLock = new Object();

/**
* Persists the current retention leases to their dedicated state file.
*
* @param path the path to the directory containing the state file
* @throws WriteStateException if an exception occurs writing the state file
*/
public void persistRetentionLeases(final Path path) throws WriteStateException {
final RetentionLeases currentRetentionLeases = retentionLeases;
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
synchronized (retentionLeasePersistenceLock) {
logger.trace("persisting retention leases [{}]", currentRetentionLeases);
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
}
}

public static class CheckpointState implements Writeable {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@

package org.elasticsearch.index.seqno;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;

/**
Expand All @@ -34,7 +37,7 @@
* otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
* number, the timestamp of when the lease was created or renewed, and the source of the retention lease (e.g., "ccr").
*/
public final class RetentionLease implements Writeable {
public final class RetentionLease implements ToXContent, Writeable {

private final String id;

Expand Down Expand Up @@ -94,10 +97,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final
if (id.isEmpty()) {
throw new IllegalArgumentException("retention lease ID can not be empty");
}
if (id.contains(":") || id.contains(";") || id.contains(",")) {
// retention lease IDs can not contain these characters because they are used in encoding retention leases
throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]");
}
if (retainingSequenceNumber < 0) {
throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range");
}
Expand All @@ -108,10 +107,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final
if (source.isEmpty()) {
throw new IllegalArgumentException("retention lease source can not be empty");
}
if (source.contains(":") || source.contains(";") || source.contains(",")) {
// retention lease sources can not contain these characters because they are used in encoding retention leases
throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]");
}
this.id = id;
this.retainingSequenceNumber = retainingSequenceNumber;
this.timestamp = timestamp;
Expand Down Expand Up @@ -145,43 +140,49 @@ public void writeTo(final StreamOutput out) throws IOException {
out.writeString(source);
}

/**
* Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is
* encoded in the format <code>id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}</code>.
*
* @param retentionLease the retention lease
* @return the encoding of the retention lease
*/
static String encodeRetentionLease(final RetentionLease retentionLease) {
Objects.requireNonNull(retentionLease);
return String.format(
Locale.ROOT,
"id:%s;retaining_seq_no:%d;timestamp:%d;source:%s",
retentionLease.id,
retentionLease.retainingSequenceNumber,
retentionLease.timestamp,
retentionLease.source);
private static final ParseField ID_FIELD = new ParseField("id");
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
private static final ParseField RETAINING_SEQUENCE_NUMBER_FIELD = new ParseField("retaining_sequence_number");
private static final ParseField TIMESTAMP_FIELD = new ParseField("timestamp");
private static final ParseField SOURCE_FIELD = new ParseField("source");

private static ConstructingObjectParser<RetentionLease, Void> PARSER = new ConstructingObjectParser<>(
"retention_leases",
(a) -> new RetentionLease((String) a[0], (Long) a[1], (Long) a[2], (String) a[3]));

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), RETAINING_SEQUENCE_NUMBER_FIELD);
PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), SOURCE_FIELD);
}

@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
{
builder.field(ID_FIELD.getPreferredName(), id);
builder.field(RETAINING_SEQUENCE_NUMBER_FIELD.getPreferredName(), retainingSequenceNumber);
builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp);
builder.field(SOURCE_FIELD.getPreferredName(), source);
}
builder.endObject();
return builder;
}

@Override
public boolean isFragment() {
return false;
}

/**
* Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}.
* Parses a retention lease from {@link org.elasticsearch.common.xcontent.XContent}. This method assumes that the retention lease was
* converted to {@link org.elasticsearch.common.xcontent.XContent} via {@link #toXContent(XContentBuilder, Params)}.
*
* @param encodedRetentionLease an encoded retention lease
* @return the decoded retention lease
* @param parser the parser
* @return a retention lease
*/
static RetentionLease decodeRetentionLease(final String encodedRetentionLease) {
Objects.requireNonNull(encodedRetentionLease);
final String[] fields = encodedRetentionLease.split(";");
assert fields.length == 4 : Arrays.toString(fields);
assert fields[0].matches("id:[^:;,]+") : fields[0];
final String id = fields[0].substring("id:".length());
assert fields[1].matches("retaining_seq_no:\\d+") : fields[1];
final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length()));
assert fields[2].matches("timestamp:\\d+") : fields[2];
final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length()));
assert fields[3].matches("source:[^:;,]+") : fields[3];
final String source = fields[3].substring("source:".length());
return new RetentionLease(id, retainingSequenceNumber, timestamp, source);
public static RetentionLease fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -119,19 +120,21 @@ public void backgroundSync(
}

@Override
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(final Request request, final IndexShard primary) {
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
final Request request,
final IndexShard primary) throws WriteStateException {
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
primary.afterWriteOperation();
primary.persistRetentionLeases();
return new PrimaryResult<>(request, new ReplicationResponse());
}

@Override
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica){
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) throws WriteStateException {
dnhatn marked this conversation as resolved.
Show resolved Hide resolved
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
replica.afterWriteOperation();
replica.persistRetentionLeases();
return new ReplicaResult();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
Expand All @@ -39,6 +38,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -121,31 +121,26 @@ public void sync(
}

@Override
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(final Request request, final IndexShard primary) {
protected WritePrimaryResult<Request, Response> shardOperationOnPrimary(
final Request request,
final IndexShard primary) throws WriteStateException {
Objects.requireNonNull(request);
Objects.requireNonNull(primary);
// we flush to ensure that retention leases are committed
flush(primary);
primary.persistRetentionLeases();
return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger);
}

@Override
protected WriteReplicaResult<Request> shardOperationOnReplica(final Request request, final IndexShard replica) {
protected WriteReplicaResult<Request> shardOperationOnReplica(
final Request request,
final IndexShard replica) throws WriteStateException {
Objects.requireNonNull(request);
Objects.requireNonNull(replica);
replica.updateRetentionLeasesOnReplica(request.getRetentionLeases());
// we flush to ensure that retention leases are committed
flush(replica);
replica.persistRetentionLeases();
return new WriteReplicaResult<>(request, null, null, replica, logger);
}

private void flush(final IndexShard indexShard) {
final FlushRequest flushRequest = new FlushRequest();
flushRequest.force(true);
flushRequest.waitIfOngoing(true);
indexShard.flush(flushRequest);
}

public static final class Request extends ReplicatedWriteRequest<Request> {

private RetentionLeases retentionLeases;
Expand Down
Loading