diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index c6bf60182fd76..a0027756ab57a 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -102,7 +102,8 @@ which returns something similar to: "max_seq_no" : "-1", "sync_id" : "AVvFY-071siAOuFGEO9P", <1> "max_unsafe_auto_id_timestamp" : "-1", - "min_retained_seq_no": "0" + "min_retained_seq_no" : "0", + "retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica" }, "num_docs" : 0 } @@ -117,6 +118,7 @@ which returns something similar to: // TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/] // TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/] // TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/] +// TESTRESPONSE[s/"retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/] <1> the `sync id` marker [float] diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 008b85331030d..e1df104d338df 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -113,6 +113,7 @@ public abstract class Engine implements Closeable { public static final String SYNC_COMMIT_ID = "sync_id"; public static final String HISTORY_UUID_KEY = "history_uuid"; public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no"; + public static final String RETENTION_LEASES = "retention_leases"; public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; protected final ShardId shardId; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d0e55fc13eeda..df52c6bc0213f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -51,6 +51,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; @@ -74,6 +75,7 @@ import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; @@ -2336,7 +2338,13 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); if (softDeleteEnabled) { - commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); + /* + * We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum + * retained sequence number, and the retention leases. + */ + final Tuple> retentionPolicy = softDeletesPolicy.getRetentionPolicy(); + commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1())); + commitData.put(Engine.RETENTION_LEASES, RetentionLease.encodeRetentionLeases(retentionPolicy.v2())); } logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index c957902d8df77..a2452d4b53eb9 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -21,6 +21,7 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.search.Query; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.RetentionLease; @@ -45,6 +46,7 @@ final class SoftDeletesPolicy { private long retentionOperations; // The min seq_no value that is retained - ops after this seq# should exist in the Lucene index. private long minRetainedSeqNo; + private Collection retentionLeases; // provides the retention leases used to calculate the minimum sequence number to retain private final Supplier> retentionLeasesSupplier; @@ -57,6 +59,7 @@ final class SoftDeletesPolicy { this.retentionOperations = retentionOperations; this.minRetainedSeqNo = minRetainedSeqNo; this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); + retentionLeases = retentionLeasesSupplier.get(); this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; this.retentionLockCount = 0; } @@ -106,7 +109,11 @@ private synchronized void releaseRetentionLock() { * Operations whose seq# is least this value should exist in the Lucene index. */ synchronized long getMinRetainedSeqNo() { - // Do not advance if the retention lock is held + return getRetentionPolicy().v1(); + } + + public synchronized Tuple> getRetentionPolicy() { + // do not advance if the retention lock is held if (retentionLockCount == 0) { /* * This policy retains operations for two purposes: peer-recovery and querying changes history. @@ -119,8 +126,8 @@ synchronized long getMinRetainedSeqNo() { */ // calculate the minimum sequence number to retain based on retention leases - final long minimumRetainingSequenceNumber = retentionLeasesSupplier - .get() + retentionLeases = retentionLeasesSupplier.get(); + final long minimumRetainingSequenceNumber = retentionLeases .stream() .mapToLong(RetentionLease::retainingSequenceNumber) .min() @@ -139,7 +146,7 @@ synchronized long getMinRetainedSeqNo() { */ minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain); } - return minRetainedSeqNo; + return Tuple.tuple(minRetainedSeqNo, retentionLeases); } /** diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 4298e5712bfc6..f309512ec98b6 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -185,6 +185,17 @@ public synchronized void addOrUpdateRetentionLease(final String id, final long r retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source)); } + /** + * Updates retention leases on a replica. + * + * @param retentionLeases the retention leases + */ + public synchronized void updateRetentionLeasesOnReplica(final Collection retentionLeases) { + assert primaryMode == false; + this.retentionLeases.clear(); + this.retentionLeases.putAll(retentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity()))); + } + public static class CheckpointState implements Writeable { /** diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index 076b707a5df42..f763759261385 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -19,6 +19,13 @@ package org.elasticsearch.index.seqno; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Locale; +import java.util.Objects; +import java.util.stream.Collectors; + /** * A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such * that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could @@ -81,18 +88,118 @@ public String source() { * @param source the source of the retention lease */ public RetentionLease(final String id, final long retainingSequenceNumber, final long timestamp, final String source) { + Objects.requireNonNull(id); + if (id.isEmpty()) { + throw new IllegalArgumentException("retention lease ID can not be empty"); + } + if (id.contains(":") || id.contains(";") || id.contains(",")) { + // retention lease IDs can not contain these characters because they are used in encoding retention leases + throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]"); + } if (retainingSequenceNumber < SequenceNumbers.UNASSIGNED_SEQ_NO) { throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range"); } if (timestamp < 0) { throw new IllegalArgumentException("retention lease timestamp [" + timestamp + "] out of range"); } + Objects.requireNonNull(source); + if (source.isEmpty()) { + throw new IllegalArgumentException("retention lease source can not be empty"); + } + if (source.contains(":") || source.contains(";") || source.contains(",")) { + // retention lease sources can not contain these characters because they are used in encoding retention leases + throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]"); + } this.id = id; this.retainingSequenceNumber = retainingSequenceNumber; this.timestamp = timestamp; this.source = source; } + /** + * Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is + * encoded in the format id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}. + * + * @param retentionLease the retention lease + * @return the encoding of the retention lease + */ + static String encodeRetentionLease(final RetentionLease retentionLease) { + Objects.requireNonNull(retentionLease); + return String.format( + Locale.ROOT, + "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s", + retentionLease.id(), + retentionLease.retainingSequenceNumber(), + retentionLease.timestamp(), + retentionLease.source()); + } + + /** + * Encodes a collection of retention leases as a string. This encoding can be decoed by {@link #decodeRetentionLeases(String)}. The + * encoding is a comma-separated encoding of each retention lease as encoded by {@link #encodeRetentionLease(RetentionLease)}. + * + * @param retentionLeases the retention leases + * @return the encoding of the retention leases + */ + public static String encodeRetentionLeases(final Collection retentionLeases) { + Objects.requireNonNull(retentionLeases); + return retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")); + } + + /** + * Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}. + * + * @param encodedRetentionLease an encoded retention lease + * @return the decoded retention lease + */ + static RetentionLease decodeRetentionLease(final String encodedRetentionLease) { + Objects.requireNonNull(encodedRetentionLease); + final String[] fields = encodedRetentionLease.split(";"); + assert fields.length == 4 : Arrays.toString(fields); + assert fields[0].matches("id:[^:;,]+") : fields[0]; + final String id = fields[0].substring("id:".length()); + assert fields[1].matches("retaining_seq_no:\\d+") : fields[1]; + final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length())); + assert fields[2].matches("timestamp:\\d+") : fields[2]; + final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length())); + assert fields[3].matches("source:[^:;,]+") : fields[3]; + final String source = fields[3].substring("source:".length()); + return new RetentionLease(id, retainingSequenceNumber, timestamp, source); + } + + /** + * Decodes retention leases encoded by {@link #encodeRetentionLeases(Collection)}. + * + * @param encodedRetentionLeases an encoded collection of retention leases + * @return the decoded retention leases + */ + public static Collection decodeRetentionLeases(final String encodedRetentionLeases) { + Objects.requireNonNull(encodedRetentionLeases); + if (encodedRetentionLeases.isEmpty()) { + return Collections.emptyList(); + } + assert Arrays.stream(encodedRetentionLeases.split(",")) + .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+")) + : encodedRetentionLeases; + return Arrays.stream(encodedRetentionLeases.split(",")).map(RetentionLease::decodeRetentionLease).collect(Collectors.toList()); + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final RetentionLease that = (RetentionLease) o; + return Objects.equals(id, that.id) && + retainingSequenceNumber == that.retainingSequenceNumber && + timestamp == that.timestamp && + Objects.equals(source, that.source); + } + + @Override + public int hashCode() { + return Objects.hash(id, retainingSequenceNumber, timestamp, source); + } + @Override public String toString() { return "RetentionLease{" + diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 66b8e607b5c76..270ce20819102 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -106,6 +106,7 @@ import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; @@ -140,6 +141,7 @@ import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -1416,6 +1418,7 @@ private void innerOpenEngineAndTranslog() throws IOException { final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); + replicationTracker.updateRetentionLeasesOnReplica(getRetentionLeases(store.readLastCommittedSegmentsInfo())); trimUnsafeCommits(); synchronized (mutex) { verifyNotClosed(); @@ -1435,6 +1438,14 @@ private void innerOpenEngineAndTranslog() throws IOException { assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } + static Collection getRetentionLeases(final SegmentInfos segmentInfos) { + final String committedRetentionLeases = segmentInfos.getUserData().get(Engine.RETENTION_LEASES); + if (committedRetentionLeases == null) { + return Collections.emptyList(); + } + return RetentionLease.decodeRetentionLeases(committedRetentionLeases); + } + private void trimUnsafeCommits() throws IOException { assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running"; final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f88aaedd6991f..796d7eb0c60ec 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -116,6 +116,7 @@ import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexSearcherWrapper; @@ -140,6 +141,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -5241,13 +5243,14 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final AtomicReference> leasesHolder = new AtomicReference<>(Collections.emptyList()); final List operations = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "2"); Randomness.shuffle(operations); Set existingSeqNos = new HashSet<>(); store = createStore(); - engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, - globalCheckpoint::get)); + engine = createEngine( + config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, leasesHolder::get)); assertThat(engine.getMinRetainedSeqNo(), equalTo(0L)); long lastMinRetainedSeqNo = engine.getMinRetainedSeqNo(); for (Engine.Operation op : operations) { @@ -5261,6 +5264,18 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { if (randomBoolean()) { globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint())); } + if (randomBoolean()) { + final int length = randomIntBetween(0, 8); + final List leases = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomLongBetween(0L, Math.max(0L, globalCheckpoint.get())); + final long timestamp = randomLongBetween(0L, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + leases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source)); + } + leasesHolder.set(leases); + } if (rarely()) { settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10)); indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); @@ -5273,6 +5288,14 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { engine.flush(true, true); assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)), equalTo(engine.getMinRetainedSeqNo())); + final Collection leases = leasesHolder.get(); + if (leases.isEmpty()) { + assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo("")); + } else { + assertThat( + engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), + equalTo(RetentionLease.encodeRetentionLeases(leases))); + } } if (rarely()) { engine.forceMerge(randomBoolean()); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java index a5e4af5d0e6a3..c4340a381ce25 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java @@ -21,12 +21,34 @@ import org.elasticsearch.test.ESTestCase; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; public class RetentionLeaseTests extends ESTestCase { + public void testInvalidId() { + final String id = "id" + randomFrom(":", ";", ","); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new RetentionLease(id, randomNonNegativeLong(), randomNonNegativeLong(), "source")); + assertThat(e, hasToString(containsString("retention lease ID can not contain any of [:;,] but was [" + id + "]"))); + } + + public void testEmptyId() { + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new RetentionLease("", randomNonNegativeLong(), randomNonNegativeLong(), "source")); + assertThat(e, hasToString(containsString("retention lease ID can not be empty"))); + } + public void testRetainingSequenceNumberOutOfRange() { final long retainingSequenceNumber = randomLongBetween(Long.MIN_VALUE, UNASSIGNED_SEQ_NO - 1); final IllegalArgumentException e = expectThrows( @@ -42,9 +64,51 @@ public void testTimestampOutOfRange() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, () -> new RetentionLease("id", randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE), timestamp, "source")); - assertThat( - e, - hasToString(containsString("retention lease timestamp [" + timestamp + "] out of range"))); + assertThat(e, hasToString(containsString("retention lease timestamp [" + timestamp + "] out of range"))); + } + + public void testInvalidSource() { + final String source = "source" + randomFrom(":", ";", ","); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new RetentionLease("id", randomNonNegativeLong(), randomNonNegativeLong(), source)); + assertThat(e, hasToString(containsString("retention lease source can not contain any of [:;,] but was [" + source + "]"))); + } + + public void testEmptySource() { + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new RetentionLease("id", randomNonNegativeLong(), randomNonNegativeLong(), "")); + assertThat(e, hasToString(containsString("retention lease source can not be empty"))); + } + + public void testRetentionLeaseEncoding() { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomNonNegativeLong(); + final long timestamp = randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source); + assertThat(RetentionLease.decodeRetentionLease(RetentionLease.encodeRetentionLease(retentionLease)), equalTo(retentionLease)); + } + + public void testRetentionLeasesEncoding() { + final int length = randomIntBetween(0, 8); + final List retentionLeases = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomNonNegativeLong(); + final long timestamp = randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source); + retentionLeases.add(retentionLease); + } + final Collection decodedRetentionLeases = + RetentionLease.decodeRetentionLeases(RetentionLease.encodeRetentionLeases(retentionLeases)); + if (length == 0) { + assertThat(decodedRetentionLeases, empty()); + } else { + assertThat(decodedRetentionLeases, contains(retentionLeases.toArray(new RetentionLease[0]))); + } } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index bd2a33617eecf..e95b52280ca2e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -19,9 +19,14 @@ package org.elasticsearch.index.shard; +import org.apache.lucene.index.SegmentInfos; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -33,9 +38,11 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; @@ -127,6 +134,52 @@ public void testExpiration() throws IOException { } } + public void testCommit() throws IOException { + final Settings settings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), Long.MAX_VALUE, TimeUnit.NANOSECONDS) + .build(); + final IndexShard indexShard = newStartedShard( + true, + settings, + new InternalEngineFactory()); + try { + final int length = randomIntBetween(0, 8); + final long[] minimumRetainingSequenceNumbers = new long[length]; + for (int i = 0; i < length; i++) { + minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(randomNonNegativeLong())); + indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); + } + + currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE)); + + // force a commit + indexShard.flush(new FlushRequest().force(true)); + + // the committed retention leases should equal our current retention leases + final SegmentInfos segmentCommitInfos = indexShard.store().readLastCommittedSegmentsInfo(); + assertTrue(segmentCommitInfos.getUserData().containsKey(Engine.RETENTION_LEASES)); + final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); + assertThat(IndexShard.getRetentionLeases(segmentCommitInfos), contains(retentionLeases.toArray(new RetentionLease[0]))); + + // when we recover, we should recover the retention leases + final IndexShard recoveredShard = reinitShard( + indexShard, + ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), RecoverySource.ExistingStoreRecoverySource.INSTANCE)); + try { + recoverShardFromStore(recoveredShard); + assertThat( + recoveredShard.getEngine().config().retentionLeasesSupplier().get(), + contains(retentionLeases.toArray(new RetentionLease[0]))); + } finally { + closeShards(recoveredShard); + } + } finally { + closeShards(indexShard); + } + } + private void assertRetentionLeases( final IndexShard indexShard, final int size, diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 58059cd30e382..8b463f33b9081 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -578,44 +578,112 @@ public EngineConfig config(IndexSettings indexSettings, Store store, Path transl public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) { - return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier, - new NoneCircuitBreakerService()); + return config( + indexSettings, + store, + translogPath, + mergePolicy, + refreshListener, + indexSort, + globalCheckpointSupplier, + globalCheckpointSupplier == null ? null : Collections::emptyList); + } + + public EngineConfig config( + final IndexSettings indexSettings, + final Store store, + final Path translogPath, + final MergePolicy mergePolicy, + final ReferenceManager.RefreshListener refreshListener, + final Sort indexSort, + final LongSupplier globalCheckpointSupplier, + final Supplier> retentionLeasesSupplier) { + return config( + indexSettings, + store, + translogPath, + mergePolicy, + refreshListener, + null, + indexSort, + globalCheckpointSupplier, + retentionLeasesSupplier, + new NoneCircuitBreakerService()); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener externalRefreshListener, ReferenceManager.RefreshListener internalRefreshListener, - Sort indexSort, @Nullable final LongSupplier maybeGlobalCheckpointSupplier, + Sort indexSort, @Nullable LongSupplier maybeGlobalCheckpointSupplier, CircuitBreakerService breakerService) { - IndexWriterConfig iwc = newIndexWriterConfig(); - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); - Engine.EventListener listener = new Engine.EventListener() { - @Override - public void onFailedEngine(String reason, @Nullable Exception e) { - // we don't need to notify anybody in this test - } - }; + return config( + indexSettings, + store, + translogPath, + mergePolicy, + externalRefreshListener, + internalRefreshListener, + indexSort, + maybeGlobalCheckpointSupplier, + maybeGlobalCheckpointSupplier == null ? null : Collections::emptyList, + breakerService); + } + + public EngineConfig config( + final IndexSettings indexSettings, + final Store store, + final Path translogPath, + final MergePolicy mergePolicy, + final ReferenceManager.RefreshListener externalRefreshListener, + final ReferenceManager.RefreshListener internalRefreshListener, + final Sort indexSort, + final @Nullable LongSupplier maybeGlobalCheckpointSupplier, + final @Nullable Supplier> maybeRetentionLeasesSupplier, + final CircuitBreakerService breakerService) { + final IndexWriterConfig iwc = newIndexWriterConfig(); + final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + final Engine.EventListener listener = new Engine.EventListener() {}; // we don't need to notify anybody in this test final List extRefreshListenerList = - externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); + externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); final List intRefreshListenerList = - internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener); + internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener); final LongSupplier globalCheckpointSupplier; final Supplier> retentionLeasesSupplier; if (maybeGlobalCheckpointSupplier == null) { + assert maybeRetentionLeasesSupplier == null; final ReplicationTracker replicationTracker = new ReplicationTracker( shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}, () -> 0L); globalCheckpointSupplier = replicationTracker; retentionLeasesSupplier = replicationTracker::getRetentionLeases; } else { + assert maybeRetentionLeasesSupplier != null; globalCheckpointSupplier = maybeGlobalCheckpointSupplier; - retentionLeasesSupplier = Collections::emptyList; + retentionLeasesSupplier = maybeRetentionLeasesSupplier; } - EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store, - mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, - IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort, - breakerService, globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm::get, tombstoneDocSupplier()); - return config; + return new EngineConfig( + shardId, + allocationId.getId(), + threadPool, + indexSettings, + null, + store, + mergePolicy, + iwc.getAnalyzer(), + iwc.getSimilarity(), + new CodecService(null, logger), + listener, + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + translogConfig, + TimeValue.timeValueMinutes(5), + extRefreshListenerList, + intRefreshListenerList, + indexSort, + breakerService, + globalCheckpointSupplier, + retentionLeasesSupplier, + primaryTerm::get, + tombstoneDocSupplier()); } protected static final BytesReference B_1 = new BytesArray(new byte[]{1});