diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index e8a2bf0aa58d3..ece6b2582eae4 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -36,6 +36,7 @@ import java.util.Comparator; import java.util.List; import java.util.Objects; +import java.util.function.Consumer; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -395,4 +396,8 @@ public Comparator getLeafSorter() { public LongSupplier getRelativeTimeInNanosSupplier() { return relativeTimeInNanosSupplier; } + + public MayHaveBeenIndexedBefore buildMayHaveBeenIndexedBefore(Consumer assertPrimaryCanOptimizeAddDocument) { + return indexSettings.getMode().idFieldMapperWithoutFieldData().buildMayHaveBeenIndexedBefore(assertPrimaryCanOptimizeAddDocument); + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a3cd492918ffc..8d6ce5c1f1366 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -41,7 +41,6 @@ import org.elasticsearch.Assertions; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.lucene.LoggerInfoStream; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; @@ -145,8 +144,7 @@ public class InternalEngine extends Engine { // incoming indexing ops to a single thread: private final AtomicInteger throttleRequestCount = new AtomicInteger(); private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); - private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); - private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); + private final MayHaveBeenIndexedBefore mayHaveBeenIndexedBefore; // max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine. // An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. // The value of this marker never goes backwards, and is tracked/updated differently on primary and replica. @@ -198,6 +196,7 @@ public InternalEngine(EngineConfig engineConfig) { super(engineConfig); this.maxDocs = maxDocs; this.relativeTimeInNanosSupplier = config().getRelativeTimeInNanosSupplier(); + this.mayHaveBeenIndexedBefore = engineConfig.buildMayHaveBeenIndexedBefore(this::assertPrimaryCanOptimizeAddDocument); final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(); store.incRef(); IndexWriter writer = null; @@ -230,7 +229,7 @@ public InternalEngine(EngineConfig engineConfig) { ); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); writer = createWriter(); - bootstrapAppendOnlyInfoFromWriter(writer); + mayHaveBeenIndexedBefore.bootstrap(writer.getLiveCommitData()); final Map commitData = commitDataAsMap(writer); historyUUID = loadHistoryUUID(commitData); forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY); @@ -453,16 +452,6 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException { } } - private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { - for (Map.Entry entry : writer.getLiveCommitData()) { - if (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID.equals(entry.getKey())) { - assert maxUnsafeAutoIdTimestamp.get() == -1 - : "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]"; - updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true); - } - } - } - @Override public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { try (ReleasableLock lock = readLock.acquire()) { @@ -840,33 +829,9 @@ private VersionValue getVersionFromMap(BytesRef id) { return versionMap.getUnderLock(id); } - private boolean canOptimizeAddDocument(Index index) { - if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) { - assert index.getAutoGeneratedIdTimestamp() >= 0 - : "autoGeneratedIdTimestamp must be positive but was: " + index.getAutoGeneratedIdTimestamp(); - return switch (index.origin()) { - case PRIMARY -> { - assert assertPrimaryCanOptimizeAddDocument(index); - yield true; - } - case PEER_RECOVERY, REPLICA -> { - assert index.version() == 1 && index.versionType() == null - : "version: " + index.version() + " type: " + index.versionType(); - yield true; - } - case LOCAL_TRANSLOG_RECOVERY, LOCAL_RESET -> { - assert index.isRetry(); - yield true; // allow to optimize in order to update the max safe time stamp - } - }; - } - return false; - } - - protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) { + protected void assertPrimaryCanOptimizeAddDocument(Index index) { assert (index.version() == Versions.MATCH_DELETED || index.version() == Versions.MATCH_ANY) && index.versionType() == VersionType.INTERNAL : "version: " + index.version() + " type: " + index.versionType(); - return true; } private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) { @@ -1052,10 +1017,7 @@ public IndexResult index(Index index) throws IOException { protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException { assert assertNonPrimaryOrigin(index); - // needs to maintain the auto_id timestamp in case this replica becomes primary - if (canOptimizeAddDocument(index)) { - mayHaveBeenIndexedBefore(index); - } + mayHaveBeenIndexedBefore.handleNonPrimary(index); final IndexingStrategy plan; // unlike the primary, replicas don't really care to about creation status of documents // this allows to ignore the case where a document was found in the live version maps in @@ -1100,8 +1062,8 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { final int reservingDocs = index.parsedDoc().docs().size(); final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay - final boolean canOptimizeAddDocument = canOptimizeAddDocument(index); - if (canOptimizeAddDocument && mayHaveBeenIndexedBefore(index) == false) { + final boolean canOptimizeAddDocument = mayHaveBeenIndexedBefore.canOptimizeAddDocument(index); + if (canOptimizeAddDocument && mayHaveBeenIndexedBefore.mayHaveBeenIndexedBefore(index) == false) { final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs); if (reserveError != null) { plan = IndexingStrategy.failAsTooManyDocs(reserveError, index.id()); @@ -1183,7 +1145,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I updateDocs(index.uid(), index.docs(), indexWriter); } else { // document does not exists, we can optimize for create, but double check if assertions are running - assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false); + assert assertDocDoesNotExist(index, mayHaveBeenIndexedBefore.canOptimizeAddDocument(index) == false); addDocs(index.docs(), indexWriter); } return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted, index.id()); @@ -1223,27 +1185,6 @@ private static boolean treatDocumentFailureAsTragicError(Index index) { || index.origin() == Operation.Origin.LOCAL_RESET; } - /** - * returns true if the indexing operation may have already be processed by this engine. - * Note that it is OK to rarely return true even if this is not the case. However a `false` - * return value must always be correct. - * - */ - private boolean mayHaveBeenIndexedBefore(Index index) { - assert canOptimizeAddDocument(index); - final boolean mayHaveBeenIndexBefore; - if (index.isRetry()) { - mayHaveBeenIndexBefore = true; - updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), true); - assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); - } else { - // in this case we force - mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); - updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), false); - } - return mayHaveBeenIndexBefore; - } - private void addDocs(final List docs, final IndexWriter indexWriter) throws IOException { if (docs.size() > 1) { indexWriter.addDocuments(docs); @@ -2279,7 +2220,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() { protected final void writerSegmentStats(SegmentsStats stats) { stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed()); stats.addIndexWriterMemoryInBytes(indexWriter.ramBytesUsed()); - stats.updateMaxUnsafeAutoIdTimestamp(maxUnsafeAutoIdTimestamp.get()); + mayHaveBeenIndexedBefore.writerSegmentStats(stats); } @Override @@ -2600,7 +2541,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID()); commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); - commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); + mayHaveBeenIndexedBefore.writeCommitData(commitData); commitData.put(HISTORY_UUID_KEY, historyUUID); final String currentForceMergeUUID = forceMergeUUID; if (currentForceMergeUUID != null) { @@ -2921,21 +2862,12 @@ void updateRefreshedCheckpoint(long checkpoint) { @Override public final long getMaxSeenAutoIdTimestamp() { - return maxSeenAutoIdTimestamp.get(); + return mayHaveBeenIndexedBefore.getMaxSeenAutoIdTimestamp(); } @Override public final void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { - updateAutoIdTimestamp(newTimestamp, true); - } - - private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) { - assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]"; - maxSeenAutoIdTimestamp.accumulateAndGet(newTimestamp, Math::max); - if (unsafe) { - maxUnsafeAutoIdTimestamp.accumulateAndGet(newTimestamp, Math::max); - } - assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get(); + mayHaveBeenIndexedBefore.updateAutoIdTimestamp(newTimestamp); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/MayHaveBeenIndexedBefore.java b/server/src/main/java/org/elasticsearch/index/engine/MayHaveBeenIndexedBefore.java new file mode 100644 index 0000000000000..57f76e84a53f8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/MayHaveBeenIndexedBefore.java @@ -0,0 +1,279 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.util.ByteUtils; +import org.elasticsearch.index.engine.Engine.Index; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; + +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +/** + * Quickly detects cases where {@link Engine.Index} operations must be unique + * so we can add them to the index without performing slow uniqueness queries. + */ +public interface MayHaveBeenIndexedBefore { + /** + * Bootstrap state from commit data. + */ + void bootstrap(Iterable> liveCommitData); + + long getMaxSeenAutoIdTimestamp(); + + /** + * {@code true} if it's valid to call {@link #mayHaveBeenIndexedBefore} + * on the provided {@link Index}, false otherwise. This should be fast + * an only rely on state from the {@link Index} and not rely on any + * internal state. + */ + boolean canOptimizeAddDocument(Index index); + + /** + * Returns {@code true} if the indexing operation may have already be + * processed by the engine. Note that it is OK to rarely return true even + * if this is not the case. However a {@code false} return value must + * always be correct. + *

+ * This relies on state internal to the implementation and may modify + * that state. + */ + boolean mayHaveBeenIndexedBefore(Index index); + + void updateAutoIdTimestamp(long newTimestamp); + + void handleNonPrimary(Index index); + + void writerSegmentStats(SegmentsStats stats); + + void writeCommitData(Map commitData); + + class Standard implements MayHaveBeenIndexedBefore { + /** + * Updated on bootstrap, recovery, and retry. + */ + private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); + private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); + private final Consumer assertPrimaryCanOptimizeAddDocument; + + public Standard(Consumer assertPrimaryCanOptimizeAddDocument) { + this.assertPrimaryCanOptimizeAddDocument = assertPrimaryCanOptimizeAddDocument; + } + + @Override + public boolean canOptimizeAddDocument(Index index) { + if (index.getAutoGeneratedIdTimestamp() == IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) { + return false; + } + assert index.getAutoGeneratedIdTimestamp() >= 0 + : "autoGeneratedIdTimestamp must be positive but was: " + index.getAutoGeneratedIdTimestamp(); + switch (index.origin()) { + case PRIMARY: + assertPrimaryCanOptimizeAddDocument.accept(index); + break; + case PEER_RECOVERY, REPLICA: + assert index.version() == 1 && index.versionType() == null + : "version: " + index.version() + " type: " + index.versionType(); + break; + case LOCAL_TRANSLOG_RECOVERY, LOCAL_RESET: + assert index.isRetry(); + break; + } + + return true; + } + + @Override + public void bootstrap(Iterable> liveCommitData) { + for (Map.Entry entry : liveCommitData) { + if (Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID.equals(entry.getKey())) { + assert maxUnsafeAutoIdTimestamp.get() == -1 + : "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]"; + updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true); + } + } + } + + @Override + public boolean mayHaveBeenIndexedBefore(Index index) { + assert canOptimizeAddDocument(index); + final boolean mayHaveBeenIndexBefore; + if (index.isRetry()) { + mayHaveBeenIndexBefore = true; + updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), true); + assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); + } else { + // in this case we force + mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); + updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), false); + } + return mayHaveBeenIndexBefore; + } + + @Override + public long getMaxSeenAutoIdTimestamp() { + return maxSeenAutoIdTimestamp.get(); + } + + @Override + public void handleNonPrimary(Index index) { + // needs to maintain the auto_id timestamp in case this replica becomes primary + if (canOptimizeAddDocument(index)) { + mayHaveBeenIndexedBefore(index); + } + } + + @Override + public void updateAutoIdTimestamp(long newTimestamp) { + updateAutoIdTimestamp(newTimestamp, true); + } + + private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) { + assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]"; + maxSeenAutoIdTimestamp.accumulateAndGet(newTimestamp, Math::max); + if (unsafe) { + maxUnsafeAutoIdTimestamp.accumulateAndGet(newTimestamp, Math::max); + } + assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get(); + } + + @Override + public void writerSegmentStats(SegmentsStats stats) { + stats.updateMaxUnsafeAutoIdTimestamp(maxUnsafeAutoIdTimestamp.get()); + } + + @Override + public void writeCommitData(Map commitData) { + commitData.put(Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); + } + } + + class TimeSeries implements MayHaveBeenIndexedBefore { + private final int TIMESTAMPS_SIZE = 1 << 12; + private final int LOW_BIT_MASK = TIMESTAMPS_SIZE - 1; + private final AtomicLong[] maxTimestamps = new AtomicLong[TIMESTAMPS_SIZE]; + private Consumer assertPrimaryCanOptimizeAddDocument; + + public TimeSeries(Consumer assertPrimaryCanOptimizeAddDocument) { + for (int i = 0; i < maxTimestamps.length; i++) { + maxTimestamps[i] = new AtomicLong(-1); + } + this.assertPrimaryCanOptimizeAddDocument = assertPrimaryCanOptimizeAddDocument; + } + + @Override + public boolean canOptimizeAddDocument(Index index) { + assert index.getAutoGeneratedIdTimestamp() == IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + // NOCOMMIT this switch comes from append only logic - probably half of it should assert and the other half should return + switch (index.origin()) { + case PRIMARY: + assertPrimaryCanOptimizeAddDocument.accept(index); + break; + case PEER_RECOVERY, REPLICA: + assert index.version() == 1 && index.versionType() == null + : "version: " + index.version() + " type: " + index.versionType(); + break; + case LOCAL_TRANSLOG_RECOVERY, LOCAL_RESET: + assert index.isRetry(); + break; + } + + return true; + } + + @Override + public void bootstrap(Iterable> liveCommitData) { + for (Map.Entry entry : liveCommitData) { + if (Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID.equals(entry.getKey())) { + assert maxTimestamp() == -1; + updateAutoIdTimestamp(Long.parseLong(entry.getValue())); + logger.error("SADFADSFAF bootstrap {}", entry.getValue()); + } + } + } + + private long maxTimestamp() { + long timestamp = -1; + for (int i = 0; i < maxTimestamps.length; i++) { + timestamp = Math.max(timestamp, maxTimestamps[i].get()); + } + return timestamp; + } + + private static final Logger logger = LogManager.getLogger(MayHaveBeenIndexedBefore.TimeSeries.class); + + @Override + public boolean mayHaveBeenIndexedBefore(Index index) { + assert canOptimizeAddDocument(index); + byte[] decoded = Base64.getUrlDecoder().decode(index.id()); + long hashPart = ByteUtils.readLongBE(decoded, decoded.length - 16); + long timestamp = ByteUtils.readLongBE(decoded, decoded.length - 8); + int lowBits = (int) (hashPart & LOW_BIT_MASK); + long prev = maxTimestamps[lowBits].getAndUpdate(curr -> Math.max(curr, timestamp)); +// logger.error( +// "SADFADSFAF {} {} {}/{} {} {} {}", +// Long.toHexString(hashPart), +// Integer.toHexString(lowBits), +// timestampsUsed(), +// maxTimestamps.length, +// prev, +// timestamp, +// prev - timestamp +// ); + return prev >= timestamp; + } + + private int timestampsUsed() { + int used = 0; + for (AtomicLong timestamp : maxTimestamps) { + if (timestamp.get() != -1) { + used++; + } + } + return used; + } + + @Override + public long getMaxSeenAutoIdTimestamp() { + return maxTimestamp(); + } + + @Override + public void handleNonPrimary(Index index) { + // needs to maintain the auto_id timestamp in case this replica becomes primary + if (canOptimizeAddDocument(index)) { + mayHaveBeenIndexedBefore(index); + } + } + + @Override + public void updateAutoIdTimestamp(long newTimestamp) { + logger.error("SADFADSFAF update auto {}", newTimestamp); + assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]"; + for (int i = 0; i < maxTimestamps.length; i++) { + maxTimestamps[i].updateAndGet(curr -> Math.max(curr, newTimestamp)); + } + } + + @Override + public void writerSegmentStats(SegmentsStats stats) { + stats.updateMaxUnsafeAutoIdTimestamp(maxTimestamp()); + } + + @Override + public void writeCommitData(Map commitData) { + commitData.put(Engine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxTimestamp())); + } + + } +} diff --git a/server/src/main/java/org/elasticsearch/index/mapper/IdFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/IdFieldMapper.java index 29866ca4b56b2..fe0a8e4ad0303 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/IdFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/IdFieldMapper.java @@ -11,8 +11,11 @@ import org.apache.lucene.document.Field; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.index.analysis.NamedAnalyzer; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.MayHaveBeenIndexedBefore; import java.util.Map; +import java.util.function.Consumer; /** * A mapper for the _id field. @@ -71,4 +74,6 @@ public final SourceLoader.SyntheticFieldLoader syntheticFieldLoader() { public static Field standardIdField(String id) { return new Field(NAME, Uid.encodeId(id), ProvidedIdFieldMapper.Defaults.FIELD_TYPE); } + + public abstract MayHaveBeenIndexedBefore buildMayHaveBeenIndexedBefore(Consumer assertPrimaryCanOptimizeAddDocument); } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/ProvidedIdFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/ProvidedIdFieldMapper.java index fe5421fb56756..5a630885eaf26 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/ProvidedIdFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/ProvidedIdFieldMapper.java @@ -19,6 +19,8 @@ import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.MayHaveBeenIndexedBefore; import org.elasticsearch.index.fielddata.FieldData; import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.IndexFieldData; @@ -46,6 +48,7 @@ import java.util.Collection; import java.util.Collections; import java.util.function.BooleanSupplier; +import java.util.function.Consumer; /** * A mapper for the {@code _id} field that reads the from the @@ -287,4 +290,9 @@ public String documentDescription(ParsedDocument parsedDocument) { public String reindexId(String id) { return id; } + + @Override + public MayHaveBeenIndexedBefore buildMayHaveBeenIndexedBefore(Consumer assertPrimaryCanOptimizeAddDocument) { + return new MayHaveBeenIndexedBefore.Standard(assertPrimaryCanOptimizeAddDocument); + } } diff --git a/server/src/main/java/org/elasticsearch/index/mapper/TsidExtractingIdFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/TsidExtractingIdFieldMapper.java index d5e74a432cc17..f008099f3986d 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/TsidExtractingIdFieldMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/TsidExtractingIdFieldMapper.java @@ -20,6 +20,8 @@ import org.elasticsearch.common.hash.MurmurHash3; import org.elasticsearch.common.hash.MurmurHash3.Hash128; import org.elasticsearch.common.util.ByteUtils; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.MayHaveBeenIndexedBefore; import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.query.SearchExecutionContext; @@ -28,6 +30,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Locale; +import java.util.function.Consumer; /** * A mapper for the {@code _id} field that builds the {@code _id} from the @@ -211,4 +214,9 @@ public String reindexId(String id) { // null the _id so we recalculate it on write return null; } + + @Override + public MayHaveBeenIndexedBefore buildMayHaveBeenIndexedBefore(Consumer assertPrimaryCanOptimizeAddDocument) { + return new MayHaveBeenIndexedBefore.TimeSeries(assertPrimaryCanOptimizeAddDocument); + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index fbf5b90370f7a..d48ab4b268780 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -46,6 +46,12 @@ public class FollowingEngine extends InternalEngine { super(validateEngineConfig(engineConfig)); } + @Override + protected void assertPrimaryCanOptimizeAddDocument(Index index) { + assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL + : "version [" + index.version() + "], type [" + index.versionType() + "]"; + } + private static EngineConfig validateEngineConfig(final EngineConfig engineConfig) { if (CcrSettings.CCR_FOLLOWING_INDEX_SETTING.get(engineConfig.getIndexSettings().getSettings()) == false) { throw new IllegalArgumentException("a following engine can not be constructed for a non-following index"); @@ -170,13 +176,6 @@ protected boolean assertNonPrimaryOrigin(final Operation operation) { return true; } - @Override - protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) { - assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL - : "version [" + index.version() + "], type [" + index.versionType() + "]"; - return true; - } - private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException { // Don't need to look up term for operations before the global checkpoint for they were processed on every copies already. if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) {