From 626601ea95ed3043de212d1d43c883ddd5c293bd Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 10 Jan 2019 00:30:41 -0800 Subject: [PATCH 01/14] Introduce retention lease persistence This commit introduces the persistence of retention leases by persisting them in index commits and recovering them when recovering a shard from store. --- .../elasticsearch/index/engine/Engine.java | 1 + .../index/engine/InternalEngine.java | 19 ++- .../index/engine/SoftDeletesPolicy.java | 14 ++- .../index/seqno/ReplicationTracker.java | 11 ++ .../index/seqno/RetentionLease.java | 28 +++++ .../elasticsearch/index/shard/IndexShard.java | 36 ++++++ .../index/engine/InternalEngineTests.java | 36 +++++- .../index/seqno/RetentionLeaseTests.java | 20 ++++ .../shard/IndexShardRetentionLeaseTests.java | 53 +++++++++ .../index/engine/EngineTestCase.java | 108 ++++++++++++++---- 10 files changed, 299 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 008b85331030d..e1df104d338df 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -113,6 +113,7 @@ public abstract class Engine implements Closeable { public static final String SYNC_COMMIT_ID = "sync_id"; public static final String HISTORY_UUID_KEY = "history_uuid"; public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no"; + public static final String RETENTION_LEASES = "retention_leases"; public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; protected final ShardId shardId; diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d0e55fc13eeda..1d0d2e9aefb27 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -51,6 +51,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; @@ -74,6 +75,7 @@ import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; @@ -91,6 +93,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -104,6 +107,7 @@ import java.util.function.BiFunction; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; public class InternalEngine extends Engine { @@ -2336,7 +2340,20 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); if (softDeleteEnabled) { - commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(softDeletesPolicy.getMinRetainedSeqNo())); + final Tuple> retentionPolicy = softDeletesPolicy.getRetentionPolicy(); + commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1())); + final Collection retentionLeases = retentionPolicy.v2(); + final String encodedRetentionLeases = retentionLeases + .stream() + .map(retentionLease -> String.format( + Locale.ROOT, + "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s", + retentionLease.id(), + retentionLease.retainingSequenceNumber(), + retentionLease.timestamp(), + retentionLease.source())) + .collect(Collectors.joining(",")); + commitData.put(Engine.RETENTION_LEASES, encodedRetentionLeases); } logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index c957902d8df77..bdcf4529e576b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -21,6 +21,7 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.search.Query; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.RetentionLease; @@ -45,6 +46,7 @@ final class SoftDeletesPolicy { private long retentionOperations; // The min seq_no value that is retained - ops after this seq# should exist in the Lucene index. private long minRetainedSeqNo; + private Collection retentionLeases; // provides the retention leases used to calculate the minimum sequence number to retain private final Supplier> retentionLeasesSupplier; @@ -106,7 +108,11 @@ private synchronized void releaseRetentionLock() { * Operations whose seq# is least this value should exist in the Lucene index. */ synchronized long getMinRetainedSeqNo() { - // Do not advance if the retention lock is held + return getRetentionPolicy().v1(); + } + + public synchronized Tuple> getRetentionPolicy() { + // do not advance if the retention lock is held if (retentionLockCount == 0) { /* * This policy retains operations for two purposes: peer-recovery and querying changes history. @@ -119,8 +125,8 @@ synchronized long getMinRetainedSeqNo() { */ // calculate the minimum sequence number to retain based on retention leases - final long minimumRetainingSequenceNumber = retentionLeasesSupplier - .get() + retentionLeases = retentionLeasesSupplier.get(); + final long minimumRetainingSequenceNumber = retentionLeases .stream() .mapToLong(RetentionLease::retainingSequenceNumber) .min() @@ -139,7 +145,7 @@ synchronized long getMinRetainedSeqNo() { */ minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain); } - return minRetainedSeqNo; + return Tuple.tuple(minRetainedSeqNo, retentionLeases); } /** diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 4298e5712bfc6..f309512ec98b6 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -185,6 +185,17 @@ public synchronized void addOrUpdateRetentionLease(final String id, final long r retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source)); } + /** + * Updates retention leases on a replica. + * + * @param retentionLeases the retention leases + */ + public synchronized void updateRetentionLeasesOnReplica(final Collection retentionLeases) { + assert primaryMode == false; + this.retentionLeases.clear(); + this.retentionLeases.putAll(retentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity()))); + } + public static class CheckpointState implements Writeable { /** diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index 076b707a5df42..890d8bc750893 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -19,6 +19,8 @@ package org.elasticsearch.index.seqno; +import java.util.Objects; + /** * A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such * that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could @@ -81,18 +83,44 @@ public String source() { * @param source the source of the retention lease */ public RetentionLease(final String id, final long retainingSequenceNumber, final long timestamp, final String source) { + Objects.requireNonNull(id); + if (id.contains(":") || id.contains(";") || id.contains(",")) { + // retention lease IDs can not contain these characters because they are used as separators in index commits + throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]"); + } if (retainingSequenceNumber < SequenceNumbers.UNASSIGNED_SEQ_NO) { throw new IllegalArgumentException("retention lease retaining sequence number [" + retainingSequenceNumber + "] out of range"); } if (timestamp < 0) { throw new IllegalArgumentException("retention lease timestamp [" + timestamp + "] out of range"); } + Objects.requireNonNull(source); + if (source.contains(":") || source.contains(";") || source.contains(",")) { + // retention lease sources can not contain these characters because they are used as separators in index commits + throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]"); + } this.id = id; this.retainingSequenceNumber = retainingSequenceNumber; this.timestamp = timestamp; this.source = source; } + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final RetentionLease that = (RetentionLease) o; + return Objects.equals(id, that.id) && + retainingSequenceNumber == that.retainingSequenceNumber && + timestamp == that.timestamp && + Objects.equals(source, that.source); + } + + @Override + public int hashCode() { + return Objects.hash(id, retainingSequenceNumber, timestamp, source); + } + @Override public String toString() { return "RetentionLease{" + diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 66b8e607b5c76..eb30e92f74bdf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -106,6 +106,7 @@ import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; @@ -140,6 +141,8 @@ import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.List; @@ -1416,6 +1419,7 @@ private void innerOpenEngineAndTranslog() throws IOException { final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); final long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID); replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "read from translog checkpoint"); + replicationTracker.updateRetentionLeasesOnReplica(getRetentionLeases(store.readLastCommittedSegmentsInfo())); trimUnsafeCommits(); synchronized (mutex) { verifyNotClosed(); @@ -1435,6 +1439,32 @@ private void innerOpenEngineAndTranslog() throws IOException { assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } + static Collection getRetentionLeases(final SegmentInfos segmentInfos) { + final String committedRetentionLeases = segmentInfos.getUserData().get(Engine.RETENTION_LEASES); + if (committedRetentionLeases == null) { + return Collections.emptyList(); + } + final String[] encodedRetentionLeases = committedRetentionLeases.split(","); + assert Arrays.stream(encodedRetentionLeases) + .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+")) + : Arrays.toString(encodedRetentionLeases); + final List retentionLeases = new ArrayList<>(); + for (final String encodedRetentionLease : encodedRetentionLeases) { + final String[] fields = encodedRetentionLease.split(";"); + assert fields.length == 4 : Arrays.toString(fields); + assert fields[0].matches("id:[^:;,]+") : fields[0]; + final String id = fields[0].substring("id:".length()); + assert fields[1].matches("retaining_seq_no:\\d+") : fields[1]; + final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length())); + assert fields[2].matches("timestamp:\\d+") : fields[2]; + final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length())); + assert fields[3].matches("source:[^:;,]+") : fields[3]; + final String source = fields[3].substring("source:".length()); + retentionLeases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source)); + } + return retentionLeases; + } + private void trimUnsafeCommits() throws IOException { assert currentEngineReference.get() == null || currentEngineReference.get() instanceof ReadOnlyEngine : "a write engine is running"; final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); @@ -1881,6 +1911,12 @@ public void addGlobalCheckpointListener( void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) { assert assertPrimaryMode(); verifyNotClosed(); + if (id.contains(",") || id.contains(":") || id.contains(";")) { + throw new IllegalArgumentException("retention lease ID can not contain any of [,:;] but was [" + id + "]"); + } + if (source.contains(",") || source.contains(":") || source.contains(";")) { + throw new IllegalArgumentException("retention lease source can not contain any of [,:;] but was [" + source + "]"); + } replicationTracker.addOrUpdateRetentionLease(id, retainingSequenceNumber, source); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f88aaedd6991f..667cd14b0b679 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -116,6 +116,7 @@ import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexSearcherWrapper; @@ -140,6 +141,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -147,6 +149,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Queue; import java.util.Set; @@ -5241,13 +5244,14 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final AtomicReference> leasesHolder = new AtomicReference<>(Collections.emptyList()); final List operations = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "2"); Randomness.shuffle(operations); Set existingSeqNos = new HashSet<>(); store = createStore(); - engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, - globalCheckpoint::get)); + engine = createEngine( + config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get, leasesHolder::get)); assertThat(engine.getMinRetainedSeqNo(), equalTo(0L)); long lastMinRetainedSeqNo = engine.getMinRetainedSeqNo(); for (Engine.Operation op : operations) { @@ -5261,6 +5265,18 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { if (randomBoolean()) { globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint())); } + if (randomBoolean()) { + final int length = randomIntBetween(0, 8); + final List leases = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomLongBetween(0L, Math.max(0L, globalCheckpoint.get())); + final long timestamp = randomLongBetween(0L, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + leases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source)); + } + leasesHolder.set(leases); + } if (rarely()) { settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), randomLongBetween(0, 10)); indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build()); @@ -5273,6 +5289,22 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { engine.flush(true, true); assertThat(Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(Engine.MIN_RETAINED_SEQNO)), equalTo(engine.getMinRetainedSeqNo())); + final Collection leases = leasesHolder.get(); + if (leases.isEmpty()) { + assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo("")); + } else { + final String expected = leases + .stream() + .map(lease -> String.format( + Locale.ROOT, + "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s", + lease.id(), + lease.retainingSequenceNumber(), + lease.timestamp(), + lease.source())) + .collect(Collectors.joining(",")); + assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo(expected)); + } } if (rarely()) { engine.forceMerge(randomBoolean()); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java index a5e4af5d0e6a3..1cbbc49808cbc 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java @@ -27,6 +27,16 @@ public class RetentionLeaseTests extends ESTestCase { + public void testInvalidId() { + final String id = "id" + randomFrom(":", ";", ","); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new RetentionLease(id, randomNonNegativeLong(), randomNonNegativeLong(), "source")); + assertThat( + e, + hasToString(containsString("retention lease ID can not contain any of [:;,] but was [" + id + "]"))); + } + public void testRetainingSequenceNumberOutOfRange() { final long retainingSequenceNumber = randomLongBetween(Long.MIN_VALUE, UNASSIGNED_SEQ_NO - 1); final IllegalArgumentException e = expectThrows( @@ -47,4 +57,14 @@ public void testTimestampOutOfRange() { hasToString(containsString("retention lease timestamp [" + timestamp + "] out of range"))); } + public void testInvalidSource() { + final String source = "source" + randomFrom(":", ";", ","); + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new RetentionLease("id", randomNonNegativeLong(), randomNonNegativeLong(), source)); + assertThat( + e, + hasToString(containsString("retention lease source can not contain any of [:;,] but was [" + source + "]"))); + } + } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index bd2a33617eecf..e95b52280ca2e 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -19,9 +19,14 @@ package org.elasticsearch.index.shard; +import org.apache.lucene.index.SegmentInfos; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -33,9 +38,11 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; @@ -127,6 +134,52 @@ public void testExpiration() throws IOException { } } + public void testCommit() throws IOException { + final Settings settings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), Long.MAX_VALUE, TimeUnit.NANOSECONDS) + .build(); + final IndexShard indexShard = newStartedShard( + true, + settings, + new InternalEngineFactory()); + try { + final int length = randomIntBetween(0, 8); + final long[] minimumRetainingSequenceNumbers = new long[length]; + for (int i = 0; i < length; i++) { + minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(randomNonNegativeLong())); + indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); + } + + currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE)); + + // force a commit + indexShard.flush(new FlushRequest().force(true)); + + // the committed retention leases should equal our current retention leases + final SegmentInfos segmentCommitInfos = indexShard.store().readLastCommittedSegmentsInfo(); + assertTrue(segmentCommitInfos.getUserData().containsKey(Engine.RETENTION_LEASES)); + final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); + assertThat(IndexShard.getRetentionLeases(segmentCommitInfos), contains(retentionLeases.toArray(new RetentionLease[0]))); + + // when we recover, we should recover the retention leases + final IndexShard recoveredShard = reinitShard( + indexShard, + ShardRoutingHelper.initWithSameId(indexShard.routingEntry(), RecoverySource.ExistingStoreRecoverySource.INSTANCE)); + try { + recoverShardFromStore(recoveredShard); + assertThat( + recoveredShard.getEngine().config().retentionLeasesSupplier().get(), + contains(retentionLeases.toArray(new RetentionLease[0]))); + } finally { + closeShards(recoveredShard); + } + } finally { + closeShards(indexShard); + } + } + private void assertRetentionLeases( final IndexShard indexShard, final int size, diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 58059cd30e382..8b463f33b9081 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -578,44 +578,112 @@ public EngineConfig config(IndexSettings indexSettings, Store store, Path transl public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) { - return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier, - new NoneCircuitBreakerService()); + return config( + indexSettings, + store, + translogPath, + mergePolicy, + refreshListener, + indexSort, + globalCheckpointSupplier, + globalCheckpointSupplier == null ? null : Collections::emptyList); + } + + public EngineConfig config( + final IndexSettings indexSettings, + final Store store, + final Path translogPath, + final MergePolicy mergePolicy, + final ReferenceManager.RefreshListener refreshListener, + final Sort indexSort, + final LongSupplier globalCheckpointSupplier, + final Supplier> retentionLeasesSupplier) { + return config( + indexSettings, + store, + translogPath, + mergePolicy, + refreshListener, + null, + indexSort, + globalCheckpointSupplier, + retentionLeasesSupplier, + new NoneCircuitBreakerService()); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener externalRefreshListener, ReferenceManager.RefreshListener internalRefreshListener, - Sort indexSort, @Nullable final LongSupplier maybeGlobalCheckpointSupplier, + Sort indexSort, @Nullable LongSupplier maybeGlobalCheckpointSupplier, CircuitBreakerService breakerService) { - IndexWriterConfig iwc = newIndexWriterConfig(); - TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); - Engine.EventListener listener = new Engine.EventListener() { - @Override - public void onFailedEngine(String reason, @Nullable Exception e) { - // we don't need to notify anybody in this test - } - }; + return config( + indexSettings, + store, + translogPath, + mergePolicy, + externalRefreshListener, + internalRefreshListener, + indexSort, + maybeGlobalCheckpointSupplier, + maybeGlobalCheckpointSupplier == null ? null : Collections::emptyList, + breakerService); + } + + public EngineConfig config( + final IndexSettings indexSettings, + final Store store, + final Path translogPath, + final MergePolicy mergePolicy, + final ReferenceManager.RefreshListener externalRefreshListener, + final ReferenceManager.RefreshListener internalRefreshListener, + final Sort indexSort, + final @Nullable LongSupplier maybeGlobalCheckpointSupplier, + final @Nullable Supplier> maybeRetentionLeasesSupplier, + final CircuitBreakerService breakerService) { + final IndexWriterConfig iwc = newIndexWriterConfig(); + final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); + final Engine.EventListener listener = new Engine.EventListener() {}; // we don't need to notify anybody in this test final List extRefreshListenerList = - externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); + externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); final List intRefreshListenerList = - internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener); + internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener); final LongSupplier globalCheckpointSupplier; final Supplier> retentionLeasesSupplier; if (maybeGlobalCheckpointSupplier == null) { + assert maybeRetentionLeasesSupplier == null; final ReplicationTracker replicationTracker = new ReplicationTracker( shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}, () -> 0L); globalCheckpointSupplier = replicationTracker; retentionLeasesSupplier = replicationTracker::getRetentionLeases; } else { + assert maybeRetentionLeasesSupplier != null; globalCheckpointSupplier = maybeGlobalCheckpointSupplier; - retentionLeasesSupplier = Collections::emptyList; + retentionLeasesSupplier = maybeRetentionLeasesSupplier; } - EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store, - mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, - IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort, - breakerService, globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm::get, tombstoneDocSupplier()); - return config; + return new EngineConfig( + shardId, + allocationId.getId(), + threadPool, + indexSettings, + null, + store, + mergePolicy, + iwc.getAnalyzer(), + iwc.getSimilarity(), + new CodecService(null, logger), + listener, + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + translogConfig, + TimeValue.timeValueMinutes(5), + extRefreshListenerList, + intRefreshListenerList, + indexSort, + breakerService, + globalCheckpointSupplier, + retentionLeasesSupplier, + primaryTerm::get, + tombstoneDocSupplier()); } protected static final BytesReference B_1 = new BytesArray(new byte[]{1}); From 40c9a68879a317ce90c20c28f5339edbe336ee0f Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 11 Jan 2019 10:49:51 -0800 Subject: [PATCH 02/14] Remove duplicate check --- .../main/java/org/elasticsearch/index/shard/IndexShard.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index eb30e92f74bdf..76c8a4439216c 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1911,12 +1911,6 @@ public void addGlobalCheckpointListener( void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) { assert assertPrimaryMode(); verifyNotClosed(); - if (id.contains(",") || id.contains(":") || id.contains(";")) { - throw new IllegalArgumentException("retention lease ID can not contain any of [,:;] but was [" + id + "]"); - } - if (source.contains(",") || source.contains(":") || source.contains(";")) { - throw new IllegalArgumentException("retention lease source can not contain any of [,:;] but was [" + source + "]"); - } replicationTracker.addOrUpdateRetentionLease(id, retainingSequenceNumber, source); } From 33abce27f1896f6a0bc9d45de2ea4651e445e53e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 11 Jan 2019 10:57:33 -0800 Subject: [PATCH 03/14] Relocate logic --- .../index/engine/InternalEngine.java | 12 ++------ .../index/seqno/RetentionLease.java | 30 +++++++++++++++++-- .../elasticsearch/index/shard/IndexShard.java | 16 +--------- 3 files changed, 31 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1d0d2e9aefb27..aafbb2fc05ff0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2343,16 +2343,8 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl final Tuple> retentionPolicy = softDeletesPolicy.getRetentionPolicy(); commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1())); final Collection retentionLeases = retentionPolicy.v2(); - final String encodedRetentionLeases = retentionLeases - .stream() - .map(retentionLease -> String.format( - Locale.ROOT, - "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s", - retentionLease.id(), - retentionLease.retainingSequenceNumber(), - retentionLease.timestamp(), - retentionLease.source())) - .collect(Collectors.joining(",")); + final String encodedRetentionLeases = + retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")); commitData.put(Engine.RETENTION_LEASES, encodedRetentionLeases); } logger.trace("committing writer with commit data [{}]", commitData); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index 890d8bc750893..db009bf30e152 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -19,6 +19,8 @@ package org.elasticsearch.index.seqno; +import java.util.Arrays; +import java.util.Locale; import java.util.Objects; /** @@ -85,7 +87,7 @@ public String source() { public RetentionLease(final String id, final long retainingSequenceNumber, final long timestamp, final String source) { Objects.requireNonNull(id); if (id.contains(":") || id.contains(";") || id.contains(",")) { - // retention lease IDs can not contain these characters because they are used as separators in index commits + // retention lease IDs can not contain these characters because they are used in encoding retention leases throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]"); } if (retainingSequenceNumber < SequenceNumbers.UNASSIGNED_SEQ_NO) { @@ -96,7 +98,7 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final } Objects.requireNonNull(source); if (source.contains(":") || source.contains(";") || source.contains(",")) { - // retention lease sources can not contain these characters because they are used as separators in index commits + // retention lease sources can not contain these characters because they are used in encoding retention leases throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]"); } this.id = id; @@ -105,6 +107,30 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final this.source = source; } + public static String encodeRetentionLease(final RetentionLease retentionLease) { + return String.format( + Locale.ROOT, + "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s", + retentionLease.id(), + retentionLease.retainingSequenceNumber(), + retentionLease.timestamp(), + retentionLease.source()); + } + + public static RetentionLease decodeRetentionLease(final String encodedRetentionLease) { + final String[] fields = encodedRetentionLease.split(";"); + assert fields.length == 4 : Arrays.toString(fields); + assert fields[0].matches("id:[^:;,]+") : fields[0]; + final String id = fields[0].substring("id:".length()); + assert fields[1].matches("retaining_seq_no:\\d+") : fields[1]; + final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length())); + assert fields[2].matches("timestamp:\\d+") : fields[2]; + final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length())); + assert fields[3].matches("source:[^:;,]+") : fields[3]; + final String source = fields[3].substring("source:".length()); + return new RetentionLease(id, retainingSequenceNumber, timestamp, source); + } + @Override public boolean equals(final Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 76c8a4439216c..5ae74f69ff3a1 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1448,21 +1448,7 @@ static Collection getRetentionLeases(final SegmentInfos segmentI assert Arrays.stream(encodedRetentionLeases) .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+")) : Arrays.toString(encodedRetentionLeases); - final List retentionLeases = new ArrayList<>(); - for (final String encodedRetentionLease : encodedRetentionLeases) { - final String[] fields = encodedRetentionLease.split(";"); - assert fields.length == 4 : Arrays.toString(fields); - assert fields[0].matches("id:[^:;,]+") : fields[0]; - final String id = fields[0].substring("id:".length()); - assert fields[1].matches("retaining_seq_no:\\d+") : fields[1]; - final long retainingSequenceNumber = Long.parseLong(fields[1].substring("retaining_seq_no:".length())); - assert fields[2].matches("timestamp:\\d+") : fields[2]; - final long timestamp = Long.parseLong(fields[2].substring("timestamp:".length())); - assert fields[3].matches("source:[^:;,]+") : fields[3]; - final String source = fields[3].substring("source:".length()); - retentionLeases.add(new RetentionLease(id, retainingSequenceNumber, timestamp, source)); - } - return retentionLeases; + return Arrays.stream(encodedRetentionLeases).map(RetentionLease::decodeRetentionLease).collect(Collectors.toList()); } private void trimUnsafeCommits() throws IOException { From 7aea54473774da9787c3a654e5daf2f11a43bc85 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 11 Jan 2019 11:01:16 -0800 Subject: [PATCH 04/14] Add test --- .../index/engine/InternalEngineTests.java | 11 +---------- .../index/seqno/RetentionLeaseTests.java | 10 ++++++++++ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 667cd14b0b679..4b68e19340759 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5293,16 +5293,7 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { if (leases.isEmpty()) { assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo("")); } else { - final String expected = leases - .stream() - .map(lease -> String.format( - Locale.ROOT, - "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s", - lease.id(), - lease.retainingSequenceNumber(), - lease.timestamp(), - lease.source())) - .collect(Collectors.joining(",")); + final String expected = leases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")); assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo(expected)); } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java index 1cbbc49808cbc..c10c5988fd78b 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java @@ -23,6 +23,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasToString; public class RetentionLeaseTests extends ESTestCase { @@ -67,4 +68,13 @@ public void testInvalidSource() { hasToString(containsString("retention lease source can not contain any of [:;,] but was [" + source + "]"))); } + public void testRetentionLeaseEncoding() { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomNonNegativeLong(); + final long timestamp = randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source); + assertThat(RetentionLease.decodeRetentionLease(RetentionLease.encodeRetentionLease(retentionLease)), equalTo(retentionLease)); + } + } From ed43526630884c68499deed405e5f27236b16e85 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 11 Jan 2019 11:01:36 -0800 Subject: [PATCH 05/14] Imports --- .../java/org/elasticsearch/index/engine/InternalEngineTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 4b68e19340759..53783a0c630de 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -149,7 +149,6 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Queue; import java.util.Set; From dde8e1b2511e467b95294cc5a0e4650cfae39a6e Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 11 Jan 2019 11:06:15 -0800 Subject: [PATCH 06/14] Javadocs --- .../elasticsearch/index/seqno/RetentionLease.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index db009bf30e152..7854e2b9e2d91 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -107,6 +107,13 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final this.source = source; } + /** + * Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is + * encoded in the format id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}. + * + * @param retentionLease an encoding of the retention lease + * @return the encoded retention lease + */ public static String encodeRetentionLease(final RetentionLease retentionLease) { return String.format( Locale.ROOT, @@ -117,6 +124,12 @@ public static String encodeRetentionLease(final RetentionLease retentionLease) { retentionLease.source()); } + /** + * Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}. + * + * @param encodedRetentionLease an encoded retention lease + * @return the decoded retention lease + */ public static RetentionLease decodeRetentionLease(final String encodedRetentionLease) { final String[] fields = encodedRetentionLease.split(";"); assert fields.length == 4 : Arrays.toString(fields); From df9f35659a13d095e102bc84b23d54bf1db7d5ca Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 11 Jan 2019 11:27:47 -0800 Subject: [PATCH 07/14] More encapsulation --- .../index/engine/InternalEngine.java | 5 +-- .../index/seqno/RetentionLease.java | 38 +++++++++++++++++-- .../elasticsearch/index/shard/IndexShard.java | 6 +-- .../index/engine/InternalEngineTests.java | 5 ++- .../index/seqno/RetentionLeaseTests.java | 27 +++++++++++++ 5 files changed, 66 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index aafbb2fc05ff0..1892b3e5d1abb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2342,10 +2342,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl if (softDeleteEnabled) { final Tuple> retentionPolicy = softDeletesPolicy.getRetentionPolicy(); commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1())); - final Collection retentionLeases = retentionPolicy.v2(); - final String encodedRetentionLeases = - retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")); - commitData.put(Engine.RETENTION_LEASES, encodedRetentionLeases); + commitData.put(Engine.RETENTION_LEASES, RetentionLease.encodeRetentionLeases(retentionPolicy.v2())); } logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index 7854e2b9e2d91..f9dc370eba712 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -20,8 +20,11 @@ package org.elasticsearch.index.seqno; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.Locale; import java.util.Objects; +import java.util.stream.Collectors; /** * A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such @@ -111,10 +114,10 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final * Encodes a retention lease as a string. This encoding can be decoded by {@link #decodeRetentionLease(String)}. The retention lease is * encoded in the format id:{id};retaining_seq_no:{retainingSequenecNumber};timestamp:{timestamp};source:{source}. * - * @param retentionLease an encoding of the retention lease - * @return the encoded retention lease + * @param retentionLease the retention lease + * @return the encoding of the retention lease */ - public static String encodeRetentionLease(final RetentionLease retentionLease) { + static String encodeRetentionLease(final RetentionLease retentionLease) { return String.format( Locale.ROOT, "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s", @@ -124,13 +127,24 @@ public static String encodeRetentionLease(final RetentionLease retentionLease) { retentionLease.source()); } + /** + * Encodes a collection of retention leases as a string. This encoding can be decoed by {@link #decodeRetentionLeases(String)}. The + * encoding is a comma-separated encoding of each retention lease as encoded by {@link #encodeRetentionLease(RetentionLease)}. + * + * @param retentionLeases the retention leases + * @return the encoding of the retention leases + */ + public static String encodeRetentionLeases(final Collection retentionLeases) { + return retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")); + } + /** * Decodes a retention lease encoded by {@link #encodeRetentionLease(RetentionLease)}. * * @param encodedRetentionLease an encoded retention lease * @return the decoded retention lease */ - public static RetentionLease decodeRetentionLease(final String encodedRetentionLease) { + static RetentionLease decodeRetentionLease(final String encodedRetentionLease) { final String[] fields = encodedRetentionLease.split(";"); assert fields.length == 4 : Arrays.toString(fields); assert fields[0].matches("id:[^:;,]+") : fields[0]; @@ -144,6 +158,22 @@ public static RetentionLease decodeRetentionLease(final String encodedRetentionL return new RetentionLease(id, retainingSequenceNumber, timestamp, source); } + /** + * Decodes retention leases encoded by {@link #encodeRetentionLeases(Collection)}. + * + * @param encodedRetentionLeases an encoded collection of retention leases + * @return the decoded retention leases + */ + public static Collection decodeRetentionLeases(final String encodedRetentionLeases) { + if (encodedRetentionLeases.isEmpty()) { + return Collections.emptyList(); + } + assert Arrays.stream(encodedRetentionLeases.split(",")) + .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+")) + : encodedRetentionLeases; + return Arrays.stream(encodedRetentionLeases.split(",")).map(RetentionLease::decodeRetentionLease).collect(Collectors.toList()); + } + @Override public boolean equals(final Object o) { if (this == o) return true; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5ae74f69ff3a1..5c0cfa4918706 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1444,11 +1444,7 @@ static Collection getRetentionLeases(final SegmentInfos segmentI if (committedRetentionLeases == null) { return Collections.emptyList(); } - final String[] encodedRetentionLeases = committedRetentionLeases.split(","); - assert Arrays.stream(encodedRetentionLeases) - .allMatch(s -> s.matches("id:[^:;,]+;retaining_seq_no:\\d+;timestamp:\\d+;source:[^:;,]+")) - : Arrays.toString(encodedRetentionLeases); - return Arrays.stream(encodedRetentionLeases).map(RetentionLease::decodeRetentionLease).collect(Collectors.toList()); + return RetentionLease.decodeRetentionLeases(committedRetentionLeases); } private void trimUnsafeCommits() throws IOException { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 53783a0c630de..796d7eb0c60ec 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5292,8 +5292,9 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException { if (leases.isEmpty()) { assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo("")); } else { - final String expected = leases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")); - assertThat(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), equalTo(expected)); + assertThat( + engine.getLastCommittedSegmentInfos().getUserData().get(Engine.RETENTION_LEASES), + equalTo(RetentionLease.encodeRetentionLeases(leases))); } } if (rarely()) { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java index c10c5988fd78b..8f8c4e0bb85ae 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java @@ -21,9 +21,16 @@ import org.elasticsearch.test.ESTestCase; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; public class RetentionLeaseTests extends ESTestCase { @@ -77,4 +84,24 @@ public void testRetentionLeaseEncoding() { assertThat(RetentionLease.decodeRetentionLease(RetentionLease.encodeRetentionLease(retentionLease)), equalTo(retentionLease)); } + public void testRetentionLeasesEncoding() { + final int length = randomIntBetween(0, 8); + final List retentionLeases = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomNonNegativeLong(); + final long timestamp = randomNonNegativeLong(); + final String source = randomAlphaOfLength(8); + final RetentionLease retentionLease = new RetentionLease(id, retainingSequenceNumber, timestamp, source); + retentionLeases.add(retentionLease); + } + final Collection decodedRetentionLeases = + RetentionLease.decodeRetentionLeases(RetentionLease.encodeRetentionLeases(retentionLeases)); + if (length == 0) { + assertThat(decodedRetentionLeases, empty()); + } else { + assertThat(decodedRetentionLeases, contains(retentionLeases.toArray(new RetentionLease[0]))); + } + } + } From c1331040f461227983e7aa8c87dfa2dc10d05901 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 11 Jan 2019 11:28:36 -0800 Subject: [PATCH 08/14] Fix imports --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 2 -- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 1 - .../java/org/elasticsearch/index/seqno/RetentionLeaseTests.java | 1 - 3 files changed, 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1892b3e5d1abb..4a5262375e571 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -93,7 +93,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -107,7 +106,6 @@ import java.util.function.BiFunction; import java.util.function.LongSupplier; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.Stream; public class InternalEngine extends Engine { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5c0cfa4918706..270ce20819102 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -141,7 +141,6 @@ import java.nio.channels.ClosedByInterruptException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java index 8f8c4e0bb85ae..346e4a899e352 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java @@ -30,7 +30,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasToString; public class RetentionLeaseTests extends ESTestCase { From b73c7e13ab942cc46243b58fd348eae6e457e2aa Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 11 Jan 2019 11:42:02 -0800 Subject: [PATCH 09/14] Fix docs --- docs/reference/indices/flush.asciidoc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index c6bf60182fd76..491f799856d3b 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -102,7 +102,8 @@ which returns something similar to: "max_seq_no" : "-1", "sync_id" : "AVvFY-071siAOuFGEO9P", <1> "max_unsafe_auto_id_timestamp" : "-1", - "min_retained_seq_no": "0" + "min_retained_seq_no" : "0", + "retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica" }, "num_docs" : 0 } @@ -117,6 +118,7 @@ which returns something similar to: // TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/] // TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/] // TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/] +// TESTRESPONSE[s/"retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica-0"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/] <1> the `sync id` marker [float] From 4f31b23c59632bf0f4d1cfc9a037256450b07257 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 11 Jan 2019 11:42:41 -0800 Subject: [PATCH 10/14] Fix copy/paste error --- docs/reference/indices/flush.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index 491f799856d3b..a0027756ab57a 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -118,7 +118,7 @@ which returns something similar to: // TESTRESPONSE[s/"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA"/"translog_uuid": $body.indices.twitter.shards.0.0.commit.user_data.translog_uuid/] // TESTRESPONSE[s/"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ"/"history_uuid": $body.indices.twitter.shards.0.0.commit.user_data.history_uuid/] // TESTRESPONSE[s/"sync_id" : "AVvFY-071siAOuFGEO9P"/"sync_id": $body.indices.twitter.shards.0.0.commit.user_data.sync_id/] -// TESTRESPONSE[s/"retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica-0"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/] +// TESTRESPONSE[s/"retention_leases" : "id:replica-0;retaining_seq_no:0;timestamp:1547235588;source:replica"/"retention_leases": $body.indices.twitter.shards.0.0.commit.user_data.retention_leases/] <1> the `sync id` marker [float] From 444f483f0b2caa9f4303a13a6054257255035539 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 12 Jan 2019 08:54:45 -0800 Subject: [PATCH 11/14] Add more validation --- .../index/seqno/RetentionLease.java | 10 +++++++ .../index/seqno/RetentionLeaseTests.java | 26 ++++++++++++------- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java index f9dc370eba712..f763759261385 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -89,6 +89,9 @@ public String source() { */ public RetentionLease(final String id, final long retainingSequenceNumber, final long timestamp, final String source) { Objects.requireNonNull(id); + if (id.isEmpty()) { + throw new IllegalArgumentException("retention lease ID can not be empty"); + } if (id.contains(":") || id.contains(";") || id.contains(",")) { // retention lease IDs can not contain these characters because they are used in encoding retention leases throw new IllegalArgumentException("retention lease ID can not contain any of [:;,] but was [" + id + "]"); @@ -100,6 +103,9 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final throw new IllegalArgumentException("retention lease timestamp [" + timestamp + "] out of range"); } Objects.requireNonNull(source); + if (source.isEmpty()) { + throw new IllegalArgumentException("retention lease source can not be empty"); + } if (source.contains(":") || source.contains(";") || source.contains(",")) { // retention lease sources can not contain these characters because they are used in encoding retention leases throw new IllegalArgumentException("retention lease source can not contain any of [:;,] but was [" + source + "]"); @@ -118,6 +124,7 @@ public RetentionLease(final String id, final long retainingSequenceNumber, final * @return the encoding of the retention lease */ static String encodeRetentionLease(final RetentionLease retentionLease) { + Objects.requireNonNull(retentionLease); return String.format( Locale.ROOT, "id:%s;retaining_seq_no:%d;timestamp:%d;source:%s", @@ -135,6 +142,7 @@ static String encodeRetentionLease(final RetentionLease retentionLease) { * @return the encoding of the retention leases */ public static String encodeRetentionLeases(final Collection retentionLeases) { + Objects.requireNonNull(retentionLeases); return retentionLeases.stream().map(RetentionLease::encodeRetentionLease).collect(Collectors.joining(",")); } @@ -145,6 +153,7 @@ public static String encodeRetentionLeases(final Collection rete * @return the decoded retention lease */ static RetentionLease decodeRetentionLease(final String encodedRetentionLease) { + Objects.requireNonNull(encodedRetentionLease); final String[] fields = encodedRetentionLease.split(";"); assert fields.length == 4 : Arrays.toString(fields); assert fields[0].matches("id:[^:;,]+") : fields[0]; @@ -165,6 +174,7 @@ static RetentionLease decodeRetentionLease(final String encodedRetentionLease) { * @return the decoded retention leases */ public static Collection decodeRetentionLeases(final String encodedRetentionLeases) { + Objects.requireNonNull(encodedRetentionLeases); if (encodedRetentionLeases.isEmpty()) { return Collections.emptyList(); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java index 346e4a899e352..c4340a381ce25 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseTests.java @@ -39,9 +39,14 @@ public void testInvalidId() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, () -> new RetentionLease(id, randomNonNegativeLong(), randomNonNegativeLong(), "source")); - assertThat( - e, - hasToString(containsString("retention lease ID can not contain any of [:;,] but was [" + id + "]"))); + assertThat(e, hasToString(containsString("retention lease ID can not contain any of [:;,] but was [" + id + "]"))); + } + + public void testEmptyId() { + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new RetentionLease("", randomNonNegativeLong(), randomNonNegativeLong(), "source")); + assertThat(e, hasToString(containsString("retention lease ID can not be empty"))); } public void testRetainingSequenceNumberOutOfRange() { @@ -59,9 +64,7 @@ public void testTimestampOutOfRange() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, () -> new RetentionLease("id", randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE), timestamp, "source")); - assertThat( - e, - hasToString(containsString("retention lease timestamp [" + timestamp + "] out of range"))); + assertThat(e, hasToString(containsString("retention lease timestamp [" + timestamp + "] out of range"))); } public void testInvalidSource() { @@ -69,9 +72,14 @@ public void testInvalidSource() { final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, () -> new RetentionLease("id", randomNonNegativeLong(), randomNonNegativeLong(), source)); - assertThat( - e, - hasToString(containsString("retention lease source can not contain any of [:;,] but was [" + source + "]"))); + assertThat(e, hasToString(containsString("retention lease source can not contain any of [:;,] but was [" + source + "]"))); + } + + public void testEmptySource() { + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> new RetentionLease("id", randomNonNegativeLong(), randomNonNegativeLong(), "")); + assertThat(e, hasToString(containsString("retention lease source can not be empty"))); } public void testRetentionLeaseEncoding() { From 5c226d9b9ae852da9e4724783c106afbaa7a68e2 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 12 Jan 2019 08:56:29 -0800 Subject: [PATCH 12/14] Add comment --- .../java/org/elasticsearch/index/engine/InternalEngine.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 4a5262375e571..df52c6bc0213f 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2338,6 +2338,10 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); if (softDeleteEnabled) { + /* + * We sample these from the policy (which occurs under a lock) to ensure that we have a consistent view of the minimum + * retained sequence number, and the retention leases. + */ final Tuple> retentionPolicy = softDeletesPolicy.getRetentionPolicy(); commitData.put(Engine.MIN_RETAINED_SEQNO, Long.toString(retentionPolicy.v1())); commitData.put(Engine.RETENTION_LEASES, RetentionLease.encodeRetentionLeases(retentionPolicy.v2())); From cd6a091131f0374ed4994da17bbf9bec741a9418 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 12 Jan 2019 12:55:04 -0800 Subject: [PATCH 13/14] Fix possible NPE --- .../java/org/elasticsearch/index/engine/SoftDeletesPolicy.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index bdcf4529e576b..adabd64278df3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -29,6 +29,7 @@ import org.elasticsearch.index.translog.Translog; import java.util.Collection; +import java.util.Collections; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; @@ -59,6 +60,7 @@ final class SoftDeletesPolicy { this.retentionOperations = retentionOperations; this.minRetainedSeqNo = minRetainedSeqNo; this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); + retentionLeases = retentionLeasesSupplier.get(); this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; this.retentionLockCount = 0; } From 628adf8af0788c11bdfab7299e61f5853873ecf2 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 12 Jan 2019 12:55:30 -0800 Subject: [PATCH 14/14] Fix import --- .../java/org/elasticsearch/index/engine/SoftDeletesPolicy.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index adabd64278df3..a2452d4b53eb9 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -29,7 +29,6 @@ import org.elasticsearch.index.translog.Translog; import java.util.Collection; -import java.util.Collections; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier;