From 8142760ee4477892df4557debb860039332eb24b Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 18 Feb 2019 16:52:51 -0500 Subject: [PATCH] Introduce retention lease state file (#39004) This commit moves retention leases from being persisted in the Lucene commit point to being persisted in a dedicated state file. --- .../elasticsearch/index/engine/Engine.java | 1 - .../index/engine/InternalEngine.java | 10 +- .../index/engine/SoftDeletesPolicy.java | 7 +- .../index/seqno/ReplicationTracker.java | 36 ++++++ .../index/seqno/RetentionLease.java | 87 +++++++------- .../RetentionLeaseBackgroundSyncAction.java | 10 +- .../index/seqno/RetentionLeaseSyncAction.java | 22 ++-- .../index/seqno/RetentionLeases.java | 107 +++++++++++------- .../elasticsearch/index/shard/IndexShard.java | 31 +++-- .../org/elasticsearch/index/store/Store.java | 9 +- .../indices/recovery/RecoveryTarget.java | 1 + .../index/engine/InternalEngineTests.java | 10 -- .../index/engine/SoftDeletesPolicyTests.java | 26 ----- ...ReplicationTrackerRetentionLeaseTests.java | 107 ++++++++++++++++++ ...tentionLeaseBackgroundSyncActionTests.java | 13 ++- .../index/seqno/RetentionLeaseIT.java | 29 ++--- .../seqno/RetentionLeaseSyncActionTests.java | 20 ++-- .../index/seqno/RetentionLeaseTests.java | 25 ---- .../seqno/RetentionLeaseXContentTests.java | 48 ++++++++ .../index/seqno/RetentionLeasesTests.java | 75 ++++++++---- .../seqno/RetentionLeasesXContentTests.java | 58 ++++++++++ .../shard/IndexShardRetentionLeaseTests.java | 101 ++--------------- .../indexlifecycle/CCRIndexLifecycleIT.java | 1 - 23 files changed, 479 insertions(+), 355 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseXContentTests.java create mode 100644 server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesXContentTests.java diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 62bd1f1b2f189..36e76012027a5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -114,7 +114,6 @@ public abstract class Engine implements Closeable { public static final String SYNC_COMMIT_ID = "sync_id"; public static final String HISTORY_UUID_KEY = "history_uuid"; public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no"; - public static final String RETENTION_LEASES = "retention_leases"; public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; protected final ShardId shardId; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 60b37a45d76b8..09df4a6a23e19 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -51,7 +51,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; @@ -76,7 +75,6 @@ import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.LocalCheckpointTracker; -import 
org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; @@ -2433,13 +2431,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); if (softDeleteEnabled) { - /* - * We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum - * retained sequence number, and the retention leases. - */ - final Tuple retentionPolicy = softDeletesPolicy.getRetentionPolicy(); - commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1())); - commitData.put(Engine.RETENTION_LEASES, RetentionLeases.encodeRetentionLeases(retentionPolicy.v2())); + commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); } logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index 9a9c7bd0ee869..4c9ee0be92f46 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -21,7 +21,6 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.search.Query; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.RetentionLease; @@ -107,10 +106,6 @@ private synchronized void releaseRetentionLock() { * Operations whose seq# is least this value should exist in the Lucene index. */ synchronized long getMinRetainedSeqNo() { - return getRetentionPolicy().v1(); - } - - public synchronized Tuple getRetentionPolicy() { /* * When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is * locked for peer recovery. 
@@ -151,7 +146,7 @@ public synchronized Tuple getRetentionPolicy() { */ minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain); } - return Tuple.tuple(minRetainedSeqNo, retentionLeases); + return minRetainedSeqNo; } /** diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 3e4e83d365ec1..b0aa8c4dc5524 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; @@ -39,6 +40,7 @@ import org.elasticsearch.index.shard.ShardId; import java.io.IOException; +import java.nio.file.Path; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -318,6 +320,40 @@ public synchronized void updateRetentionLeasesOnReplica(final RetentionLeases re } } + /** + * Loads the latest retention leases from their dedicated state file. + * + * @param path the path to the directory containing the state file + * @return the retention leases + * @throws IOException if an I/O exception occurs reading the retention leases + */ + public RetentionLeases loadRetentionLeases(final Path path) throws IOException { + final RetentionLeases retentionLeases = RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path); + if (retentionLeases == null) { + return RetentionLeases.EMPTY; + } + return retentionLeases; + } + + private final Object retentionLeasePersistenceLock = new Object(); + + /** + * Persists the current retention leases to their dedicated state file. 
+ * + * @param path the path to the directory containing the state file + * @throws IOException if an exception occurs writing the state file + */ + public void persistRetentionLeases(final Path path) throws IOException { + synchronized (retentionLeasePersistenceLock) { + final RetentionLeases currentRetentionLeases; + synchronized (this) { + currentRetentionLeases = retentionLeases; + } + logger.trace("persisting retention leases [{}]", currentRetentionLeases); + RetentionLeases.FORMAT.write(currentRetentionLeases, path); + } + } + public static class CheckpointState implements Writeable { /** diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index e1d362d98764a..e6d6ed3fe825f 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -19,13 +19,16 @@ package org.elasticsearch.index.seqno; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; -import java.util.Arrays; -import java.util.Locale; import java.util.Objects; /** @@ -34,7 +37,7 @@ * otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence * number, the timestamp of when the lease was created or renewed, and the source of the retention lease (e.g., "ccr"). */ -public final class RetentionLease implements Writeable { +public final class RetentionLease implements ToXContent, Writeable { private final String id; @@ -94,10 +97,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final if (id.isEmpty()) { throw new IllegalArgumentException("retention lease ID can not be empty"); } - if (id.contains(":") || id.contains(";") || id.contains(",")) { - // retention lease IDs can not contain these characters because they are used in encoding retention leases - throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]"); - } if (retainingSequenceNumber < 0) { throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range"); } @@ -108,10 +107,6 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final if (source.isEmpty()) { throw new IllegalArgumentException("retention lease source can not be empty"); } - if (source.contains(":") || source.contains(";") || source.contains(",")) { - // retention lease sources can not contain these characters because they are used in encoding retention leases - throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]"); - } this.id = id; this.retainingSequenceNumber = retainingSequenceNumber; this.timestamp = timestamp; @@ -145,43 +140,49 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeString(source); } - /** - * Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. 
The retention lease is - * encoded in the format id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}. - * - * @param retentionLease the retention lease - * @return the encoding of the retention lease - */ - static String encodeRetentionLease(final RetentionLease retentionLease) { - Objects.requireNonNull(retentionLease); - return String.format( - Locale.ROOT, - "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s", - retentionLease.id, - retentionLease.retainingSequenceNumber, - retentionLease.timestamp, - retentionLease.source); + private static final ParseField ID_FIELD = new ParseField("id"); + private static final ParseField RETAINING_SEQUENCE_NUMBER_FIELD = new ParseField("retaining_sequence_number"); + private static final ParseField TIMESTAMP_FIELD = new ParseField("timestamp"); + private static final ParseField SOURCE_FIELD = new ParseField("source"); + + private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "retention_leases", + (a) -> new RetentionLease((String) a[0], (Long) a[1], (Long) a[2], (String) a[3])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), RETAINING_SEQUENCE_NUMBER_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), SOURCE_FIELD); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + { + builder.field(ID_FIELD.getPreferredName(), id); + builder.field(RETAINING_SEQUENCE_NUMBER_FIELD.getPreferredName(), retainingSequenceNumber); + builder.field(TIMESTAMP_FIELD.getPreferredName(), timestamp); + builder.field(SOURCE_FIELD.getPreferredName(), source); + } + builder.endObject(); + return builder; + } + + @Override + public boolean isFragment() { + return false; } /** - * Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}. + * Parses a retention lease from {@link org.elasticsearch.common.xcontent.XContent}. This method assumes that the retention lease was + * converted to {@link org.elasticsearch.common.xcontent.XContent} via {@link #toXContent(XContentBuilder, Params)}. 
* - * @param encodedRetentionLease an encoded retention lease - * @return the decoded retention lease + * @param parser the parser + * @return a retention lease */ - static RetentionLease decodeRetentionLease(final String encodedRetentionLease) { - Objects.requireNonNull(encodedRetentionLease); - final String[] fields = encodedRetentionLease.split(";"); - assert fields.length == 4 : Arrays.toString(fields); - assert fields[0].matches("id:[^:;,]+") : fields[0]; - final String id = fields[0].substring("id:".length()); - assert fields[1].matches("retaining_seq_no:\\d+") : fields[1]; - final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length())); - assert fields[2].matches("timestamp:\\d+") : fields[2]; - final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length())); - assert fields[3].matches("source:[^:;,]+") : fields[3]; - final String source = fields[3].substring("source:".length()); - return new RetentionLease(id, retainingSequenceNumber, timestamp, source); + public static RetentionLease fromXContent(final XContentParser parser) { + return PARSER.apply(parser, null); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 906b505dad7e3..a8465790132b1 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -119,19 +119,21 @@ public void backgroundSync( } @Override - protected PrimaryResult shardOperationOnPrimary(final Request request, final IndexShard primary) { + protected PrimaryResult shardOperationOnPrimary( + final Request request, + final IndexShard primary) throws IOException { Objects.requireNonNull(request); Objects.requireNonNull(primary); - primary.afterWriteOperation(); + primary.persistRetentionLeases(); return new PrimaryResult<>(request, new ReplicationResponse()); } @Override - protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica){ + protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) throws IOException { Objects.requireNonNull(request); Objects.requireNonNull(replica); replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); - replica.afterWriteOperation(); + replica.persistRetentionLeases(); return new ReplicaResult(); } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index 9be7ab046eb8b..ac0bf6be987af 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -25,7 +25,6 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.WriteResponse; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; @@ -121,31 +120,26 @@ public void sync( } @Override - protected WritePrimaryResult shardOperationOnPrimary(final Request request, final IndexShard primary) { + protected WritePrimaryResult 
shardOperationOnPrimary( + final Request request, + final IndexShard primary) throws IOException { Objects.requireNonNull(request); Objects.requireNonNull(primary); - // we flush to ensure that retention leases are committed - flush(primary); + primary.persistRetentionLeases(); return new WritePrimaryResult<>(request, new Response(), null, null, primary, logger); } @Override - protected WriteReplicaResult shardOperationOnReplica(final Request request, final IndexShard replica) { + protected WriteReplicaResult shardOperationOnReplica( + final Request request, + final IndexShard replica) throws IOException { Objects.requireNonNull(request); Objects.requireNonNull(replica); replica.updateRetentionLeasesOnReplica(request.getRetentionLeases()); - // we flush to ensure that retention leases are committed - flush(replica); + replica.persistRetentionLeases(); return new WriteReplicaResult<>(request, null, null, replica, logger); } - private void flush(final IndexShard indexShard) { - final FlushRequest flushRequest = new FlushRequest(); - flushRequest.force(true); - flushRequest.waitIfOngoing(true); - indexShard.flush(flushRequest); - } - public static final class Request extends ReplicatedWriteRequest { private RetentionLeases retentionLeases; diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java index 5a9d9e333b27b..3bad887282502 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java @@ -19,15 +19,20 @@ package org.elasticsearch.index.seqno; +import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.gateway.MetaDataStateFormat; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Locale; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; import java.util.function.Function; @@ -37,7 +42,7 @@ * Represents a versioned collection of retention leases. We version the collection of retention leases to ensure that sync requests that * arrive out of order on the replica, using the version to ensure that older sync requests are rejected. */ -public class RetentionLeases implements Writeable { +public class RetentionLeases implements ToXContent, Writeable { private final long primaryTerm; @@ -157,54 +162,59 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeCollection(leases.values()); } - /** - * Encodes a retention lease collection as a string. This encoding can be decoded by - * {@link RetentionLeases#decodeRetentionLeases(String)}. The encoding is a comma-separated encoding of each retention lease as encoded - * by {@link RetentionLease#encodeRetentionLease(RetentionLease)}, prefixed by the version of the retention lease collection. 
- * - * @param retentionLeases the retention lease collection - * @return the encoding of the retention lease collection - */ - public static String encodeRetentionLeases(final RetentionLeases retentionLeases) { - Objects.requireNonNull(retentionLeases); - return String.format( - Locale.ROOT, - "primary_term:%d;version:%d;%s", - retentionLeases.primaryTerm, - retentionLeases.version, - retentionLeases.leases.values().stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(","))); + private static final ParseField PRIMARY_TERM_FIELD = new ParseField("primary_term"); + private static final ParseField VERSION_FIELD = new ParseField("version"); + private static final ParseField LEASES_FIELD = new ParseField("leases"); + + @SuppressWarnings("unchecked") + private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "retention_leases", + (a) -> new RetentionLeases((Long) a[0], (Long) a[1], (Collection) a[2])); + + static { + PARSER.declareLong(ConstructingObjectParser.constructorArg(), PRIMARY_TERM_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), VERSION_FIELD); + PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> RetentionLease.fromXContent(p), LEASES_FIELD); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.field(PRIMARY_TERM_FIELD.getPreferredName(), primaryTerm); + builder.field(VERSION_FIELD.getPreferredName(), version); + builder.startArray(LEASES_FIELD.getPreferredName()); + { + for (final RetentionLease retentionLease : leases.values()) { + retentionLease.toXContent(builder, params); + } + } + builder.endArray(); + return builder; } /** - * Decodes retention leases encoded by {@link #encodeRetentionLeases(RetentionLeases)}. + * Parses a retention leases collection from {@link org.elasticsearch.common.xcontent.XContent}. This method assumes that the retention + * leases were converted to {@link org.elasticsearch.common.xcontent.XContent} via {@link #toXContent(XContentBuilder, Params)}. 
* - * @param encodedRetentionLeases an encoded retention lease collection - * @return the decoded retention lease collection + * @param parser the parser + * @return a retention leases collection */ - public static RetentionLeases decodeRetentionLeases(final String encodedRetentionLeases) { - Objects.requireNonNull(encodedRetentionLeases); - if (encodedRetentionLeases.isEmpty()) { - return EMPTY; + public static RetentionLeases fromXContent(final XContentParser parser) { + return PARSER.apply(parser, null); + } + + static final MetaDataStateFormat FORMAT = new MetaDataStateFormat("retention-leases-") { + + @Override + public void toXContent(final XContentBuilder builder, final RetentionLeases retentionLeases) throws IOException { + retentionLeases.toXContent(builder, ToXContent.EMPTY_PARAMS); } - assert encodedRetentionLeases.matches("primary_term:\\d+;version:\\d+;.*") : encodedRetentionLeases; - final int firstSemicolon = encodedRetentionLeases.indexOf(";"); - final long primaryTerm = Long.parseLong(encodedRetentionLeases.substring("primary_term:".length(), firstSemicolon)); - final int secondSemicolon = encodedRetentionLeases.indexOf(";", firstSemicolon + 1); - final long version = Long.parseLong(encodedRetentionLeases.substring(firstSemicolon + 1 + "version:".length(), secondSemicolon)); - final Collection leases; - if (secondSemicolon + 1 == encodedRetentionLeases.length()) { - leases = Collections.emptyList(); - } else { - assert Arrays.stream(encodedRetentionLeases.substring(secondSemicolon + 1).split(",")) - .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+")) - : encodedRetentionLeases; - leases = Arrays.stream(encodedRetentionLeases.substring(secondSemicolon + 1).split(",")) - .map(RetentionLease::decodeRetentionLease) - .collect(Collectors.toList()); + + @Override + public RetentionLeases fromXContent(final XContentParser parser) { + return RetentionLeases.fromXContent(parser); } - return new RetentionLeases(primaryTerm, version, leases); - } + }; @Override public boolean equals(Object o) { @@ -237,7 +247,16 @@ public String toString() { * @return the map from retention lease ID to retention lease */ private static Map toMap(final Collection leases) { - return leases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())); + // use a linked hash map to preserve order + return leases.stream() + .collect(Collectors.toMap( + RetentionLease::id, + Function.identity(), + (left, right) -> { + assert left.id().equals(right.id()) : "expected [" + left.id() + "] to equal [" + right.id() + "]"; + throw new IllegalStateException("duplicate retention lease ID [" + left.id() + "]"); + }, + LinkedHashMap::new)); } /** diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a8109034f4b80..2cb4eac5b7658 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1464,7 +1464,7 @@ private void innerOpenEngineAndTranslog() throws IOException { final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); - 
replicationTracker.updateRetentionLeasesOnReplica(getRetentionLeases(store.readLastCommittedSegmentsInfo())); + updateRetentionLeasesOnReplica(loadRetentionLeases()); trimUnsafeCommits(); synchronized (mutex) { verifyNotClosed(); @@ -1484,14 +1484,6 @@ private void innerOpenEngineAndTranslog() throws IOException { assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } - static RetentionLeases getRetentionLeases(final SegmentInfos segmentInfos) { - final String committedRetentionLeases = segmentInfos.getUserData().get(Engine.RETENTION_LEASES); - if (committedRetentionLeases == null) { - return RetentionLeases.EMPTY; - } - return RetentionLeases.decodeRetentionLeases(committedRetentionLeases); - } - private void trimUnsafeCommits() throws IOException { assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running"; final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); @@ -2040,6 +2032,27 @@ public void updateRetentionLeasesOnReplica(final RetentionLeases retentionLeases replicationTracker.updateRetentionLeasesOnReplica(retentionLeases); } + /** + * Loads the latest retention leases from their dedicated state file. + * + * @return the retention leases + * @throws IOException if an I/O exception occurs reading the retention leases + */ + public RetentionLeases loadRetentionLeases() throws IOException { + verifyNotClosed(); + return replicationTracker.loadRetentionLeases(path.getShardStatePath()); + } + + /** + * Persists the current retention leases to their dedicated state file. + * + * @throws IOException if an exception occurs writing the state file + */ + public void persistRetentionLeases() throws IOException { + verifyNotClosed(); + replicationTracker.persistRetentionLeases(path.getShardStatePath()); + } + /** * Syncs the current retention leases to all replicas. */ diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index c53f719ddb177..5fa097373f5b3 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1581,13 +1581,6 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long } logger.debug("starting index commit [{}]", startingIndexCommit.getUserData()); if (startingIndexCommit.equals(lastIndexCommitCommit) == false) { - /* - * Unlike other commit tags, the retention-leases tag is not restored when an engine is - * recovered from translog. We need to manually copy it from the last commit to the safe commit; - * otherwise we might lose the latest committed retention leases when re-opening an engine. - */ - final Map userData = new HashMap<>(startingIndexCommit.getUserData()); - userData.put(Engine.RETENTION_LEASES, lastIndexCommitCommit.getUserData().getOrDefault(Engine.RETENTION_LEASES, "")); try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, startingIndexCommit)) { // this achieves two things: // - by committing a new commit based on the starting commit, it make sure the starting commit will be opened @@ -1598,7 +1591,7 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long // The new commit will use segment files from the starting commit but userData from the last commit by default. 
// Thus, we need to manually set the userData from the starting commit to the new commit. - writer.setLiveCommitData(userData.entrySet()); + writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet()); writer.commit(); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 1b2a4caebc3a9..55dd4b242b495 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -310,6 +310,7 @@ public void finalizeRecovery(final long globalCheckpoint, ActionListener l indexShard.updateGlobalCheckpointOnReplica(globalCheckpoint, "finalizing recovery"); // Persist the global checkpoint. indexShard.sync(); + indexShard.persistRetentionLeases(); indexShard.finalizeRecovery(); return null; }); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 9cf199debeda0..ecede189ce67c 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5385,16 +5385,6 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { engine.flush(true, true); assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)), equalTo(engine.getMinRetainedSeqNo())); - final RetentionLeases leases = retentionLeasesHolder.get(); - if (leases.leases().isEmpty()) { - assertThat( - engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), - equalTo("primary_term:" + primaryTerm + ";version:" + retentionLeasesVersion.get() + ";")); - } else { - assertThat( - engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), - equalTo(RetentionLeases.encodeRetentionLeases(leases))); - } } if (rarely()) { engine.forceMerge(randomBoolean()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java index e4da636deaf6d..3c71e4fede3d5 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java @@ -37,8 +37,6 @@ import java.util.function.Supplier; import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -121,30 +119,6 @@ public void testSoftDeletesRetentionLock() { assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo)); } - public void testAlwaysFetchLatestRetentionLeases() { - final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED); - final Collection leases = new ArrayList<>(); - final int numLeases = randomIntBetween(0, 10); - for (int i = 0; i < numLeases; i++) { - leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(0, 1000), randomNonNegativeLong(), "test")); - } - final Supplier leasesSupplier = - () -> new RetentionLeases( - randomNonNegativeLong(), - randomNonNegativeLong(), - Collections.unmodifiableCollection(new ArrayList<>(leases))); - final SoftDeletesPolicy policy = - new 
SoftDeletesPolicy(globalCheckpoint::get, randomIntBetween(1, 1000), randomIntBetween(0, 1000), leasesSupplier); - if (randomBoolean()) { - policy.acquireRetentionLock(); - } - if (numLeases == 0) { - assertThat(policy.getRetentionPolicy().v2().leases(), empty()); - } else { - assertThat(policy.getRetentionPolicy().v2().leases(), contains(leases.toArray(new RetentionLease[0]))); - } - } - public void testWhenGlobalCheckpointDictatesThePolicy() { final int retentionOperations = randomIntBetween(0, 1024); final AtomicLong globalCheckpoint = new AtomicLong(randomLongBetween(0, Long.MAX_VALUE - 2)); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index a9aae80db6ca4..f8781d0e78d40 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -28,12 +28,16 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.IndexSettingsModule; +import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -460,6 +464,109 @@ public void testReplicaIgnoresOlderRetentionLeasesVersion() { } } + public void testLoadAndPersistRetentionLeases() throws IOException { + final AllocationId allocationId = AllocationId.newInitializing(); + long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + primaryTerm, + UNASSIGNED_SEQ_NO, + value -> {}, + () -> 0L, + (leases, listener) -> {}); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId), + Collections.emptySet()); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + final int length = randomIntBetween(0, 8); + for (int i = 0; i < length; i++) { + if (rarely() && primaryTerm < Long.MAX_VALUE) { + primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE); + replicationTracker.setOperationPrimaryTerm(primaryTerm); + } + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + replicationTracker.addRetentionLease( + Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {})); + } + + final Path path = createTempDir(); + replicationTracker.persistRetentionLeases(path); + assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases())); + } + + /** + * Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}. + * This test can fail without the synchronization block in that method. 
+ * + * @throws IOException if an I/O exception occurs loading the retention lease state file + */ + public void testPersistRetentionLeasesUnderConcurrency() throws IOException { + final AllocationId allocationId = AllocationId.newInitializing(); + long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + primaryTerm, + UNASSIGNED_SEQ_NO, + value -> {}, + () -> 0L, + (leases, listener) -> {}); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId), + Collections.emptySet()); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + final int length = randomIntBetween(0, 8); + for (int i = 0; i < length; i++) { + if (rarely() && primaryTerm < Long.MAX_VALUE) { + primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE); + replicationTracker.setOperationPrimaryTerm(primaryTerm); + } + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + replicationTracker.addRetentionLease( + Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {})); + } + + final Path path = createTempDir(); + final int numberOfThreads = randomIntBetween(1, 2 * Runtime.getRuntime().availableProcessors()); + final CyclicBarrier barrier = new CyclicBarrier(1 + numberOfThreads); + final Thread[] threads = new Thread[numberOfThreads]; + for (int i = 0; i < numberOfThreads; i++) { + final String id = Integer.toString(length + i); + threads[i] = new Thread(() -> { + try { + barrier.await(); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test-" + id, ActionListener.wrap(() -> {})); + replicationTracker.persistRetentionLeases(path); + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException | IOException e) { + throw new AssertionError(e); + } + }); + threads[i].start(); + } + + try { + // synchronize the threads invoking ReplicationTracker#persistRetentionLeases(Path path) + barrier.await(); + // wait for all the threads to finish + barrier.await(); + for (int i = 0; i < numberOfThreads; i++) { + threads[i].join(); + } + } catch (final BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); + } + assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases())); + } + private void assertRetentionLeases( final ReplicationTracker replicationTracker, final int size, diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index ff4beea808915..58c4d4b7b4eeb 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.transport.TransportService; import org.mockito.ArgumentCaptor; +import java.io.IOException; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; @@ -91,7 +92,7 @@ public void tearDown() throws Exception { super.tearDown(); } - 
public void testRetentionLeaseBackgroundSyncActionOnPrimary() { + public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws IOException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -120,13 +121,13 @@ public void testRetentionLeaseBackgroundSyncActionOnPrimary() { final ReplicationOperation.PrimaryResult result = action.shardOperationOnPrimary(request, indexShard); - // the retention leases on the shard should be periodically flushed - verify(indexShard).afterWriteOperation(); + // the retention leases on the shard should be persisted + verify(indexShard).persistRetentionLeases(); // we should forward the request containing the current retention leases to the replica assertThat(result.replicaRequest(), sameInstance(request)); } - public void testRetentionLeaseBackgroundSyncActionOnReplica() { + public void testRetentionLeaseBackgroundSyncActionOnReplica() throws IOException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -156,8 +157,8 @@ public void testRetentionLeaseBackgroundSyncActionOnReplica() { final TransportReplicationAction.ReplicaResult result = action.shardOperationOnReplica(request, indexShard); // the retention leases on the shard should be updated verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); - // the retention leases on the shard should be periodically flushed - verify(indexShard).afterWriteOperation(); + // the retention leases on the shard should be persisted + verify(indexShard).persistRetentionLeases(); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString()))); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java index 65854040e7893..863013bdd64ee 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseIT.java @@ -28,7 +28,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -104,10 +103,8 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { latch.await(); retentionLock.close(); - // check retention leases have been committed on the primary - final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases( - primary.commitStats().getUserData().get(Engine.RETENTION_LEASES)); - assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases))); + // check retention leases have been written on the primary + assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primary.loadRetentionLeases()))); // check current retention leases have been synced to all replicas for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { @@ -119,10 +116,8 @@ public void testRetentionLeasesSyncedOnAdd() throws Exception { final Map retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, 
equalTo(currentRetentionLeases)); - // check retention leases have been committed on the replica - final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases( - replica.commitStats().getUserData().get(Engine.RETENTION_LEASES)); - assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases))); + // check retention leases have been written on the replica + assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases()))); } } } @@ -167,10 +162,8 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { latch.await(); retentionLock.close(); - // check retention leases have been committed on the primary - final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases( - primary.commitStats().getUserData().get(Engine.RETENTION_LEASES)); - assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases))); + // check retention leases have been written on the primary + assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primary.loadRetentionLeases()))); // check current retention leases have been synced to all replicas for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { @@ -182,10 +175,8 @@ public void testRetentionLeaseSyncedOnRemove() throws Exception { final Map retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); - // check retention leases have been committed on the replica - final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases( - replica.commitStats().getUserData().get(Engine.RETENTION_LEASES)); - assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases))); + // check retention leases have been written on the replica + assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases()))); } } } @@ -326,7 +317,6 @@ public void testBackgroundRetentionLeaseSync() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38588") public void testRetentionLeasesSyncOnRecovery() throws Exception { final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); @@ -383,6 +373,9 @@ public void testRetentionLeasesSyncOnRecovery() throws Exception { .getShardOrNull(new ShardId(resolveIndex("index"), 0)); final Map retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases()); assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases)); + + // check retention leases have been written on the replica; see RecoveryTarget#finalizeRecovery + assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replica.loadRetentionLeases()))); } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index c0b13d32dcb9d..cce1ab0e955c3 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -21,7 +21,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import 
org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -43,6 +42,7 @@ import org.elasticsearch.transport.TransportService; import org.mockito.ArgumentCaptor; +import java.io.IOException; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; @@ -90,7 +90,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testRetentionLeaseSyncActionOnPrimary() { + public void testRetentionLeaseSyncActionOnPrimary() throws IOException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -118,18 +118,15 @@ public void testRetentionLeaseSyncActionOnPrimary() { final TransportWriteAction.WritePrimaryResult result = action.shardOperationOnPrimary(request, indexShard); - // the retention leases on the shard should be flushed - final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); - verify(indexShard).flush(flushRequest.capture()); - assertTrue(flushRequest.getValue().force()); - assertTrue(flushRequest.getValue().waitIfOngoing()); + // the retention leases on the shard should be persisted + verify(indexShard).persistRetentionLeases(); // we should forward the request containing the current retention leases to the replica assertThat(result.replicaRequest(), sameInstance(request)); // we should start with an empty replication response assertNull(result.finalResponseIfSuccessful.getShardInfo()); } - public void testRetentionLeaseSyncActionOnReplica() { + public void testRetentionLeaseSyncActionOnReplica() throws IOException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -159,11 +156,8 @@ public void testRetentionLeaseSyncActionOnReplica() { action.shardOperationOnReplica(request, indexShard); // the retention leases on the shard should be updated verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); - // the retention leases on the shard should be flushed - final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); - verify(indexShard).flush(flushRequest.capture()); - assertTrue(flushRequest.getValue().force()); - assertTrue(flushRequest.getValue().waitIfOngoing()); + // the retention leases on the shard should be persisted + verify(indexShard).persistRetentionLeases(); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); result.respond(ActionListener.wrap(r -> success.set(true), e -> fail(e.toString()))); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java index bd2dee78b05ed..f38a806bd7b95 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java @@ -31,14 +31,6 @@ public class RetentionLeaseTests extends ESTestCase { - public void testInvalidId() { final String id = "id" + randomFrom(":", ";", ","); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, () -> new RetentionLease(id, randomNonNegativeLong(), randomNonNegativeLong(), "source")); assertThat(e, hasToString(containsString("retention lease ID can not contain any
of [:;,] but was [" + id + "]"))); - } - public void testEmptyId() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, @@ -64,14 +56,6 @@ public void testTimestampOutOfRange() { assertThat(e, hasToString(containsString("retention lease timestamp [" + timestamp + "] out of range"))); } - public void testInvalidSource() { - final String source = "source" + randomFrom(":", ";", ","); - final IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> new RetentionLease("id", randomNonNegativeLong(), randomNonNegativeLong(), source)); - assertThat(e, hasToString(containsString("retention lease source can not contain any of [:;,] but was [" + source + "]"))); - } - public void testEmptySource() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, @@ -93,13 +77,4 @@ public void testRetentionLeaseSerialization() throws IOException { } } - public void testRetentionLeaseEncoding() { - final String id = randomAlphaOfLength(8); - final long retainingSequenceNumber = randomNonNegativeLong(); - final long timestamp = randomNonNegativeLong(); - final String source = randomAlphaOfLength(8); - final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source); - assertThat(RetentionLease.decodeRetentionLease(RetentionLease.encodeRetentionLease(retentionLease)), equalTo(retentionLease)); - } - } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseXContentTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseXContentTests.java new file mode 100644 index 0000000000000..159e85b572b98 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseXContentTests.java @@ -0,0 +1,48 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class RetentionLeaseXContentTests extends AbstractXContentTestCase { + + @Override + protected RetentionLease createTestInstance() { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomNonNegativeLong(); + final long timestamp = randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + return new RetentionLease(id, retainingSequenceNumber, timestamp, source); + } + + @Override + protected RetentionLease doParseInstance(final XContentParser parser) throws IOException { + return RetentionLease.fromXContent(parser); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java index 33cc83f602860..bf11f610f107b 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java @@ -19,13 +19,18 @@ package org.elasticsearch.index.seqno; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; -import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -49,30 +54,6 @@ public void testVersionOutOfRange() { assertThat(e, hasToString(containsString("version must be non-negative but was [" + version + "]"))); } - public void testRetentionLeasesEncoding() { - final long primaryTerm = randomNonNegativeLong(); - final long version = randomNonNegativeLong(); - final int length = randomIntBetween(0, 8); - final List retentionLeases = new ArrayList<>(length); - for (int i = 0; i < length; i++) { - final String id = randomAlphaOfLength(8); - final long retainingSequenceNumber = randomNonNegativeLong(); - final long timestamp = randomNonNegativeLong(); - final String source = randomAlphaOfLength(8); - final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source); - retentionLeases.add(retentionLease); - } - final RetentionLeases decodedRetentionLeases = - RetentionLeases.decodeRetentionLeases( - RetentionLeases.encodeRetentionLeases(new RetentionLeases(primaryTerm, version, retentionLeases))); - assertThat(decodedRetentionLeases.version(), equalTo(version)); - if (length == 0) { - assertThat(decodedRetentionLeases.leases(), empty()); - } else { - assertThat(decodedRetentionLeases.leases(), containsInAnyOrder(retentionLeases.toArray(new RetentionLease[0]))); - } - } - public void testSupersedesByPrimaryTerm() { final long lowerPrimaryTerm = randomLongBetween(1, Long.MAX_VALUE); final RetentionLeases left = new RetentionLeases(lowerPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList()); @@ -92,4 +73,48 @@ public void testSupersedesByVersion() { assertFalse(left.supersedes(right)); } + public void testRetentionLeasesRejectsDuplicates() { + final RetentionLeases 
+        final RetentionLease retentionLease = randomFrom(retentionLeases.leases());
+        final IllegalStateException e = expectThrows(
+                IllegalStateException.class,
+                () -> new RetentionLeases(
+                        retentionLeases.primaryTerm(),
+                        retentionLeases.version(),
+                        Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList())));
+        assertThat(e, hasToString(containsString("duplicate retention lease ID [" + retentionLease.id() + "]")));
+    }
+
+    public void testLeasesPreservesIterationOrder() {
+        final RetentionLeases retentionLeases = randomRetentionLeases(true);
+        if (retentionLeases.leases().isEmpty()) {
+            assertThat(retentionLeases.leases(), empty());
+        } else {
+            assertThat(retentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0])));
+        }
+    }
+
+    public void testRetentionLeasesMetaDataStateFormat() throws IOException {
+        final Path path = createTempDir();
+        final RetentionLeases retentionLeases = randomRetentionLeases(true);
+        RetentionLeases.FORMAT.write(retentionLeases, path);
+        assertThat(RetentionLeases.FORMAT.loadLatestState(logger, NamedXContentRegistry.EMPTY, path), equalTo(retentionLeases));
+    }
+
+    private RetentionLeases randomRetentionLeases(boolean allowEmpty) {
+        final long primaryTerm = randomNonNegativeLong();
+        final long version = randomNonNegativeLong();
+        final int length = randomIntBetween(allowEmpty ? 0 : 1, 8);
+        final List<RetentionLease> leases = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            final String id = randomAlphaOfLength(8);
+            final long retainingSequenceNumber = randomNonNegativeLong();
+            final long timestamp = randomNonNegativeLong();
+            final String source = randomAlphaOfLength(8);
+            final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
+            leases.add(retentionLease);
+        }
+        return new RetentionLeases(primaryTerm, version, leases);
+    }
+
 }
diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesXContentTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesXContentTests.java
new file mode 100644
index 0000000000000..5fc2ace16ee94
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesXContentTests.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.seqno;
+
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.test.AbstractXContentTestCase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RetentionLeasesXContentTests extends AbstractXContentTestCase<RetentionLeases> {
+
+    @Override
+    protected RetentionLeases createTestInstance() {
+        final long primaryTerm = randomNonNegativeLong();
+        final long version = randomNonNegativeLong();
+        final int length = randomIntBetween(0, 8);
+        final List<RetentionLease> leases = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            final String id = randomAlphaOfLength(8);
+            final long retainingSequenceNumber = randomNonNegativeLong();
+            final long timestamp = randomNonNegativeLong();
+            final String source = randomAlphaOfLength(8);
+            final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source);
+            leases.add(retentionLease);
+        }
+        return new RetentionLeases(primaryTerm, version, leases);
+    }
+
+    @Override
+    protected RetentionLeases doParseInstance(final XContentParser parser) throws IOException {
+        return RetentionLeases.fromXContent(parser);
+    }
+
+    @Override
+    protected boolean supportsUnknownFields() {
+        return false;
+    }
+
+}
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
index 5f103d484f8c1..566d1feaf007d 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java
@@ -19,43 +19,30 @@
 package org.elasticsearch.index.shard;
 
-import org.apache.lucene.index.SegmentInfos;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.admin.indices.flush.FlushRequest;
-import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
-import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexSettings;
-import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.index.seqno.RetentionLease;
 import org.elasticsearch.index.seqno.RetentionLeaseStats;
 import org.elasticsearch.index.seqno.RetentionLeases;
 import org.elasticsearch.index.seqno.SequenceNumbers;
-import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
@@ -221,7 +208,7 @@ private void runExpirationTest(final boolean primary) throws IOException {
         }
     }
 
-    public void testCommit() throws IOException {
+    public void testPersistence() throws IOException {
         final Settings settings = Settings.builder()
                 .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
                 .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), Long.MAX_VALUE, TimeUnit.NANOSECONDS)
@@ -242,19 +229,17 @@ public void testCommit() throws IOException {
         currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE));
 
-        // force a commit
-        indexShard.flush(new FlushRequest().force(true));
+        // force the retention leases to persist
+        indexShard.persistRetentionLeases();
 
-        // the committed retention leases should equal our current retention leases
-        final SegmentInfos segmentCommitInfos = indexShard.store().readLastCommittedSegmentsInfo();
-        assertTrue(segmentCommitInfos.getUserData().containsKey(Engine.RETENTION_LEASES));
+        // the written retention leases should equal our current retention leases
         final RetentionLeases retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
-        final RetentionLeases committedRetentionLeases = IndexShard.getRetentionLeases(segmentCommitInfos);
+        final RetentionLeases writtenRetentionLeases = indexShard.loadRetentionLeases();
         if (retentionLeases.leases().isEmpty()) {
-            assertThat(committedRetentionLeases.version(), equalTo(0L));
-            assertThat(committedRetentionLeases.leases(), empty());
+            assertThat(writtenRetentionLeases.version(), equalTo(0L));
+            assertThat(writtenRetentionLeases.leases(), empty());
         } else {
-            assertThat(committedRetentionLeases.version(), equalTo((long) length));
+            assertThat(writtenRetentionLeases.version(), equalTo((long) length));
             assertThat(retentionLeases.leases(), contains(retentionLeases.leases().toArray(new RetentionLease[0])));
         }
 
@@ -304,76 +289,6 @@ public void testRetentionLeaseStats() throws IOException {
         }
     }
 
-    public void testRecoverFromStoreReserveRetentionLeases() throws Exception {
-        final AtomicBoolean throwDuringRecoverFromTranslog = new AtomicBoolean();
-        final IndexShard shard = newStartedShard(false, Settings.builder().put("index.soft_deletes.enabled", true).build(),
-            config -> new InternalEngine(config) {
-                @Override
-                public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
-                                                          long recoverUpToSeqNo) throws IOException {
-                    if (throwDuringRecoverFromTranslog.get()) {
-                        throw new RuntimeException("crashed before recover from translog is completed");
-                    }
-                    return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
-                }
-            });
-        final List<RetentionLease> leases = new ArrayList<>();
-        long version = randomLongBetween(0, 100);
-        long primaryTerm = randomLongBetween(1, 100);
-        final int iterations = randomIntBetween(1, 10);
-        for (int i = 0; i < iterations; i++) {
-            if (randomBoolean()) {
-                indexDoc(shard, "_doc", Integer.toString(i));
-            } else {
-                leases.add(new RetentionLease(Integer.toString(i), randomNonNegativeLong(),
-                    randomLongBetween(Integer.MAX_VALUE, Long.MAX_VALUE), "test"));
-            }
-            if (randomBoolean()) {
-                if (randomBoolean()) {
-                    version += randomLongBetween(1, 100);
-                    primaryTerm += randomLongBetween(0, 100);
-                    shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
-                    shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
-                }
-            }
-            if (randomBoolean()) {
-                shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()), "test");
-                flushShard(shard);
-            }
-        }
-        version += randomLongBetween(1, 100);
-        primaryTerm += randomLongBetween(0, 100);
-        shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
-        shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
-        closeShard(shard, false);
-
-        final IndexShard failedShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
-            shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
-            RecoverySource.ExistingStoreRecoverySource.INSTANCE));
-        final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
-            Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
-        failedShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
-        throwDuringRecoverFromTranslog.set(true);
-        expectThrows(IndexShardRecoveryException.class, failedShard::recoverFromStore);
-        closeShards(failedShard);
-
-        final IndexShard newShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
-            shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
-            RecoverySource.ExistingStoreRecoverySource.INSTANCE));
-        newShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
-        throwDuringRecoverFromTranslog.set(false);
-        assertTrue(newShard.recoverFromStore());
-        final RetentionLeases retentionLeases = newShard.getRetentionLeases();
-        assertThat(retentionLeases.version(), equalTo(version));
-        assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
-        if (leases.isEmpty()) {
-            assertThat(retentionLeases.leases(), empty());
-        } else {
-            assertThat(retentionLeases.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0])));
-        }
-        closeShards(newShard);
-    }
-
     private void assertRetentionLeases(
             final IndexShard indexShard,
             final int size,
diff --git a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java
index f9119f1dfeb39..01f0eb4c7d0d4 100644
--- a/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java
+++ b/x-pack/plugin/ilm/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/indexlifecycle/CCRIndexLifecycleIT.java
@@ -278,7 +278,6 @@ public void testCcrAndIlmWithRollover() throws Exception {
         }
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37165")
     public void testUnfollowInjectedBeforeShrink() throws Exception {
         final String indexName = "shrink-test";
         final String shrunkenIndexName = "shrink-" + indexName;