Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract append-only optimization from Engine #84771

Closed
wants to merge 12 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

Expand Down Expand Up @@ -395,4 +396,8 @@ public Comparator<LeafReader> getLeafSorter() {
    /**
     * Returns the supplier used to read a relative time value in nanoseconds
     * (stored in this config at construction; see the field assignment elsewhere in this class).
     */
    public LongSupplier getRelativeTimeInNanosSupplier() {
        return relativeTimeInNanosSupplier;
    }

    /**
     * Builds the {@link MayHaveBeenIndexedBefore} append-only dedup tracker by delegating to the
     * id-field mapper of this index's mode ({@code indexSettings.getMode().idFieldMapperWithoutFieldData()}).
     *
     * @param assertPrimaryCanOptimizeAddDocument callback invoked (under assertions) to validate
     *        that a primary-origin index operation is eligible for the append-only optimization
     * @return the tracker implementation chosen by the index mode
     */
    public MayHaveBeenIndexedBefore buildMayHaveBeenIndexedBefore(Consumer<Engine.Index> assertPrimaryCanOptimizeAddDocument) {
        return indexSettings.getMode().idFieldMapperWithoutFieldData().buildMayHaveBeenIndexedBefore(assertPrimaryCanOptimizeAddDocument);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.Assertions;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
Expand Down Expand Up @@ -145,8 +144,7 @@ public class InternalEngine extends Engine {
// incoming indexing ops to a single thread:
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1);
private final MayHaveBeenIndexedBefore mayHaveBeenIndexedBefore;
// max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine.
// An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index.
// The value of this marker never goes backwards, and is tracked/updated differently on primary and replica.
Expand Down Expand Up @@ -198,6 +196,7 @@ public InternalEngine(EngineConfig engineConfig) {
super(engineConfig);
this.maxDocs = maxDocs;
this.relativeTimeInNanosSupplier = config().getRelativeTimeInNanosSupplier();
this.mayHaveBeenIndexedBefore = engineConfig.buildMayHaveBeenIndexedBefore(this::assertPrimaryCanOptimizeAddDocument);
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
store.incRef();
IndexWriter writer = null;
Expand Down Expand Up @@ -230,7 +229,7 @@ public InternalEngine(EngineConfig engineConfig) {
);
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
writer = createWriter();
bootstrapAppendOnlyInfoFromWriter(writer);
mayHaveBeenIndexedBefore.bootstrap(writer.getLiveCommitData());
final Map<String, String> commitData = commitDataAsMap(writer);
historyUUID = loadHistoryUUID(commitData);
forceMergeUUID = commitData.get(FORCE_MERGE_UUID_KEY);
Expand Down Expand Up @@ -453,16 +452,6 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
}
}

private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
if (MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID.equals(entry.getKey())) {
assert maxUnsafeAutoIdTimestamp.get() == -1
: "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]";
updateAutoIdTimestamp(Long.parseLong(entry.getValue()), true);
}
}
}

@Override
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
Expand Down Expand Up @@ -840,33 +829,9 @@ private VersionValue getVersionFromMap(BytesRef id) {
return versionMap.getUnderLock(id);
}

private boolean canOptimizeAddDocument(Index index) {
if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
assert index.getAutoGeneratedIdTimestamp() >= 0
: "autoGeneratedIdTimestamp must be positive but was: " + index.getAutoGeneratedIdTimestamp();
return switch (index.origin()) {
case PRIMARY -> {
assert assertPrimaryCanOptimizeAddDocument(index);
yield true;
}
case PEER_RECOVERY, REPLICA -> {
assert index.version() == 1 && index.versionType() == null
: "version: " + index.version() + " type: " + index.versionType();
yield true;
}
case LOCAL_TRANSLOG_RECOVERY, LOCAL_RESET -> {
assert index.isRetry();
yield true; // allow to optimize in order to update the max safe time stamp
}
};
}
return false;
}

protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) {
protected void assertPrimaryCanOptimizeAddDocument(Index index) {
assert (index.version() == Versions.MATCH_DELETED || index.version() == Versions.MATCH_ANY)
&& index.versionType() == VersionType.INTERNAL : "version: " + index.version() + " type: " + index.versionType();
return true;
}

private boolean assertIncomingSequenceNumber(final Engine.Operation.Origin origin, final long seqNo) {
Expand Down Expand Up @@ -1052,10 +1017,7 @@ public IndexResult index(Index index) throws IOException {

protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
assert assertNonPrimaryOrigin(index);
// needs to maintain the auto_id timestamp in case this replica becomes primary
if (canOptimizeAddDocument(index)) {
mayHaveBeenIndexedBefore(index);
}
mayHaveBeenIndexedBefore.handleNonPrimary(index);
final IndexingStrategy plan;
// unlike the primary, replicas don't really care to about creation status of documents
// this allows to ignore the case where a document was found in the live version maps in
Expand Down Expand Up @@ -1100,8 +1062,8 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
final int reservingDocs = index.parsedDoc().docs().size();
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
if (canOptimizeAddDocument && mayHaveBeenIndexedBefore(index) == false) {
final boolean canOptimizeAddDocument = mayHaveBeenIndexedBefore.canOptimizeAddDocument(index);
if (canOptimizeAddDocument && mayHaveBeenIndexedBefore.mayHaveBeenIndexedBefore(index) == false) {
final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs);
if (reserveError != null) {
plan = IndexingStrategy.failAsTooManyDocs(reserveError, index.id());
Expand Down Expand Up @@ -1183,7 +1145,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I
updateDocs(index.uid(), index.docs(), indexWriter);
} else {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
assert assertDocDoesNotExist(index, mayHaveBeenIndexedBefore.canOptimizeAddDocument(index) == false);
addDocs(index.docs(), indexWriter);
}
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted, index.id());
Expand Down Expand Up @@ -1223,27 +1185,6 @@ private static boolean treatDocumentFailureAsTragicError(Index index) {
|| index.origin() == Operation.Origin.LOCAL_RESET;
}

/**
* returns true if the indexing operation may have already be processed by this engine.
* Note that it is OK to rarely return true even if this is not the case. However a `false`
* return value must always be correct.
*
*/
private boolean mayHaveBeenIndexedBefore(Index index) {
assert canOptimizeAddDocument(index);
final boolean mayHaveBeenIndexBefore;
if (index.isRetry()) {
mayHaveBeenIndexBefore = true;
updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), true);
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
} else {
// in this case we force
mayHaveBeenIndexBefore = maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
updateAutoIdTimestamp(index.getAutoGeneratedIdTimestamp(), false);
}
return mayHaveBeenIndexBefore;
}

private void addDocs(final List<LuceneDocument> docs, final IndexWriter indexWriter) throws IOException {
if (docs.size() > 1) {
indexWriter.addDocuments(docs);
Expand Down Expand Up @@ -2279,7 +2220,7 @@ protected SegmentInfos getLastCommittedSegmentInfos() {
protected final void writerSegmentStats(SegmentsStats stats) {
stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
stats.addIndexWriterMemoryInBytes(indexWriter.ramBytesUsed());
stats.updateMaxUnsafeAutoIdTimestamp(maxUnsafeAutoIdTimestamp.get());
mayHaveBeenIndexedBefore.writerSegmentStats(stats);
}

@Override
Expand Down Expand Up @@ -2600,7 +2541,7 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
mayHaveBeenIndexedBefore.writeCommitData(commitData);
commitData.put(HISTORY_UUID_KEY, historyUUID);
final String currentForceMergeUUID = forceMergeUUID;
if (currentForceMergeUUID != null) {
Expand Down Expand Up @@ -2921,21 +2862,12 @@ void updateRefreshedCheckpoint(long checkpoint) {

@Override
public final long getMaxSeenAutoIdTimestamp() {
return maxSeenAutoIdTimestamp.get();
return mayHaveBeenIndexedBefore.getMaxSeenAutoIdTimestamp();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used for safety after recovery. I wonder whether you would need something similar for a TSDB-specific optimized append?

}

    @Override
    public final void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
        // delegate with unsafe=true: raises the "unsafe" maximum (and, per the helper's
        // contract elsewhere in this class, the "seen" maximum alongside it)
        updateAutoIdTimestamp(newTimestamp, true);
    }

private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
assert newTimestamp >= -1 : "invalid timestamp [" + newTimestamp + "]";
maxSeenAutoIdTimestamp.accumulateAndGet(newTimestamp, Math::max);
if (unsafe) {
maxUnsafeAutoIdTimestamp.accumulateAndGet(newTimestamp, Math::max);
}
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
mayHaveBeenIndexedBefore.updateAutoIdTimestamp(newTimestamp);
}

@Override
Expand Down
Loading