Skip to content

Commit

Permalink
Adjust translog operation assertions for synthetic source (#119330) (#…
Browse files Browse the repository at this point in the history
…119559)

When synthetic source is enabled, translog operations received via peer
recovery may differ from those created through replication. This change
relaxes the translog operation assertion to account for synthetic
source, allowing such operations to be considered equivalent.

Closes #119191
  • Loading branch information
dnhatn authored Jan 4, 2025
1 parent ba4b1b2 commit 4f7ea81
Show file tree
Hide file tree
Showing 14 changed files with 428 additions and 114 deletions.
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,6 @@ tests:
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
method: test {p0=synonyms/90_synonyms_reloading_for_synset/Reload analyzers for specific synonym set}
issue: https://github.com/elastic/elasticsearch/issues/116777
- class: org.elasticsearch.smoketest.SmokeTestMultiNodeClientYamlTestSuiteIT
issue: https://github.com/elastic/elasticsearch/issues/119191
method: test {yaml=indices.create/20_synthetic_source/create index with use_synthetic_source}
- class: org.elasticsearch.xpack.ml.integration.InferenceIngestInputConfigIT
method: testIngestWithInputFields
issue: https://github.com/elastic/elasticsearch/issues/118092
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,8 @@ private Translog openTranslog(
translogDeletionPolicy,
globalCheckpointSupplier,
engineConfig.getPrimaryTermSupplier(),
persistedSequenceNumberConsumer
persistedSequenceNumberConsumer,
TranslogOperationAsserter.withEngineConfig(engineConfig)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ public void trimUnreferencedTranslogFiles() {
translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(),
engineConfig.getPrimaryTermSupplier(),
seqNo -> {}
seqNo -> {},
TranslogOperationAsserter.DEFAULT
)
) {
translog.trimUnreferencedReaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
translogDeletionPolicy,
config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(),
seqNo -> {}
seqNo -> {},
TranslogOperationAsserter.DEFAULT
)
) {
return translog.stats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ private static UnsupportedOperationException unsupported() {
return new UnsupportedOperationException();
}

/** Returns the single translog-backed leaf reader wrapped by this directory reader. */
public TranslogLeafReader getLeafReader() {
return leafReader;
}

@Override
protected DirectoryReader doOpenIfChanged() {
throw unsupported();
Expand Down Expand Up @@ -142,6 +138,45 @@ public CacheHelper getReaderCacheHelper() {
return leafReader.getReaderCacheHelper();
}

/**
 * Parses the source of the given translog index operation and indexes the resulting
 * document into a fresh in-memory Lucene segment in {@code directory}, returning a
 * {@link DirectoryReader} over that single-document segment.
 * <p>
 * The seqNo, primary term and version of the parsed document are overwritten with the
 * values carried by the translog operation so the indexed document matches the
 * operation exactly.
 *
 * @param shardId        shard the operation belongs to (used for error reporting only)
 * @param engineConfig   supplies the analyzer and codec so the document is indexed
 *                       exactly as the main engine would index it
 * @param directory      destination directory; the index is created with
 *                       {@code OpenMode.CREATE}, replacing any existing content
 * @param documentParser parser used to turn the operation source into Lucene fields
 * @param mappingLookup  current mappings used while parsing
 * @param operation      the translog index operation to materialize
 * @return a reader containing exactly one segment with exactly one document
 * @throws IllegalStateException if indexing unexpectedly produced more than one
 *                               segment or document
 * @throws EngineException       if an I/O error occurs while writing the segment
 */
static DirectoryReader createInMemoryReader(
    ShardId shardId,
    EngineConfig engineConfig,
    Directory directory,
    DocumentParser documentParser,
    MappingLookup mappingLookup,
    Translog.Index operation
) {
    final ParsedDocument parsedDocs = documentParser.parseDocument(
        new SourceToParse(operation.id(), operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()),
        mappingLookup
    );

    // Stamp the parsed document with the operation's metadata so the in-memory copy
    // is indistinguishable from the replicated document.
    parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm());
    parsedDocs.version().setLongValue(operation.version());
    // To guarantee indexability, we configure the analyzer and codec using the main engine configuration
    final IndexWriterConfig writeConfig = new IndexWriterConfig(engineConfig.getAnalyzer()).setOpenMode(
        IndexWriterConfig.OpenMode.CREATE
    ).setCodec(engineConfig.getCodec());
    try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
        writer.addDocument(parsedDocs.rootDoc());
        final DirectoryReader reader = open(writer);
        if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != 1) {
            // Close eagerly: we are about to throw, and the caller never sees the reader.
            reader.close();
            throw new IllegalStateException(
                "Expected a single document segment; "
                    + "but ["
                    + reader.leaves().size()
                    + " segments with "
                    + reader.leaves().get(0).reader().numDocs()
                    + " documents]" // fixed: close the bracket opened above so the message is balanced
            );
        }
        return reader;
    } catch (IOException e) {
        throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e);
    }
}

private static class TranslogLeafReader extends LeafReader {

private static final FieldInfo FAKE_SOURCE_FIELD = new FieldInfo(
Expand Down Expand Up @@ -240,7 +275,8 @@ private LeafReader getDelegate() {
ensureOpen();
reader = delegate.get();
if (reader == null) {
reader = createInMemoryLeafReader();
var indexReader = createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, operation);
reader = indexReader.leaves().get(0).reader();
final LeafReader existing = delegate.getAndSet(reader);
assert existing == null;
onSegmentCreated.run();
Expand All @@ -250,39 +286,6 @@ private LeafReader getDelegate() {
return reader;
}

/**
 * Lazily materializes the translog operation as a real single-document Lucene
 * segment and returns its sole leaf reader. Caller must hold this object's
 * monitor (see assertion below) so only one segment is ever created.
 */
private LeafReader createInMemoryLeafReader() {
assert Thread.holdsLock(this);
final ParsedDocument parsedDocs = documentParser.parseDocument(
new SourceToParse(operation.id(), operation.source(), XContentHelper.xContentType(operation.source()), operation.routing()),
mappingLookup
);

// Overwrite seqNo/term/version with the operation's values so the indexed
// document matches the translog operation exactly.
parsedDocs.updateSeqID(operation.seqNo(), operation.primaryTerm());
parsedDocs.version().setLongValue(operation.version());
// To guarantee indexability, we configure the analyzer and codec using the main engine configuration
final IndexWriterConfig writeConfig = new IndexWriterConfig(engineConfig.getAnalyzer()).setOpenMode(
IndexWriterConfig.OpenMode.CREATE
).setCodec(engineConfig.getCodec());
try (IndexWriter writer = new IndexWriter(directory, writeConfig)) {
writer.addDocument(parsedDocs.rootDoc());
final DirectoryReader reader = open(writer);
if (reader.leaves().size() != 1 || reader.leaves().get(0).reader().numDocs() != 1) {
// We are about to throw; close the reader so the in-memory segment is released.
reader.close();
throw new IllegalStateException(
"Expected a single document segment; "
+ "but ["
+ reader.leaves().size()
+ " segments with "
+ reader.leaves().get(0).reader().numDocs()
+ " documents"
);
}
return reader.leaves().get(0).reader();
} catch (IOException e) {
throw new EngineException(shardId, "failed to create an in-memory segment for get [" + operation.id() + "]", e);
}
}

@Override
public CacheHelper getCoreCacheHelper() {
return getDelegate().getCoreCacheHelper();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
import org.elasticsearch.index.mapper.DocumentParser;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;

/**
 * A utility class to assert that translog operations with the same sequence number
 * in the same generation are either identical or equivalent when synthetic source is used.
 */
public abstract class TranslogOperationAsserter {
    // Strict asserter: operations must be equal field-by-field (ignoring the
    // auto-generated-id timestamp). Used where no engine config is available.
    public static final TranslogOperationAsserter DEFAULT = new TranslogOperationAsserter() {
    };

    private TranslogOperationAsserter() {

    }

    /**
     * Returns an asserter that, when the index uses synthetic recovery source,
     * additionally accepts two operations as equivalent if either one, after
     * re-synthesizing its source from an in-memory index, equals the other.
     */
    public static TranslogOperationAsserter withEngineConfig(EngineConfig engineConfig) {
        return new TranslogOperationAsserter() {
            @Override
            public boolean assertSameIndexOperation(Translog.Index o1, Translog.Index o2) throws IOException {
                if (super.assertSameIndexOperation(o1, o2)) {
                    return true;
                }
                if (engineConfig.getIndexSettings().isRecoverySourceSyntheticEnabled()) {
                    // Synthetic source may normalize the stored source (e.g. field order),
                    // so compare in both directions with one side re-synthesized.
                    return super.assertSameIndexOperation(synthesizeSource(engineConfig, o1), o2)
                        || super.assertSameIndexOperation(o1, synthesizeSource(engineConfig, o2));
                }
                return false;
            }
        };
    }

    /**
     * Re-indexes {@code op} into a throwaway in-memory segment and reads it back
     * through a synthetic-source snapshot, yielding the operation as it would look
     * after a synthetic-source round trip.
     */
    static Translog.Index synthesizeSource(EngineConfig engineConfig, Translog.Index op) throws IOException {
        final ShardId shardId = engineConfig.getShardId();
        final MappingLookup mappingLookup = engineConfig.getMapperService().mappingLookup();
        final DocumentParser documentParser = engineConfig.getMapperService().documentParser();
        try (
            var directory = new ByteBuffersDirectory();
            var reader = TranslogDirectoryReader.createInMemoryReader(shardId, engineConfig, directory, documentParser, mappingLookup, op)
        ) {
            final Engine.Searcher searcher = new Engine.Searcher(
                "assert_translog",
                reader,
                new BM25Similarity(),
                null,
                TrivialQueryCachingPolicy.NEVER,
                () -> {}
            );
            try (
                LuceneSyntheticSourceChangesSnapshot snapshot = new LuceneSyntheticSourceChangesSnapshot(
                    mappingLookup,
                    searcher,
                    LuceneSyntheticSourceChangesSnapshot.DEFAULT_BATCH_SIZE,
                    Integer.MAX_VALUE,
                    // The in-memory index holds exactly one document with this seqNo,
                    // so the snapshot range is [op.seqNo(), op.seqNo()].
                    op.seqNo(),
                    op.seqNo(),
                    true,
                    false,
                    engineConfig.getIndexSettings().getIndexVersionCreated()
                )
            ) {
                final Translog.Operation normalized = snapshot.next();
                assert normalized != null : "expected one operation; got zero";
                return (Translog.Index) normalized;
            }
        }
    }

    /**
     * Returns true if the two index operations are equal ignoring the
     * auto-generated-id timestamp. Subclasses may accept a wider notion
     * of equivalence (see {@link #withEngineConfig}).
     */
    public boolean assertSameIndexOperation(Translog.Index o1, Translog.Index o2) throws IOException {
        return Translog.Index.equalsWithoutAutoGeneratedTimestamp(o1, o2);
    }
}
30 changes: 18 additions & 12 deletions server/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.core.Releasable;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.TranslogOperationAsserter;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid;
Expand Down Expand Up @@ -123,6 +124,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final TranslogDeletionPolicy deletionPolicy;
private final LongConsumer persistedSequenceNumberConsumer;
private final OperationListener operationListener;
private final TranslogOperationAsserter operationAsserter;

/**
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
Expand Down Expand Up @@ -150,14 +152,16 @@ public Translog(
TranslogDeletionPolicy deletionPolicy,
final LongSupplier globalCheckpointSupplier,
final LongSupplier primaryTermSupplier,
final LongConsumer persistedSequenceNumberConsumer
final LongConsumer persistedSequenceNumberConsumer,
final TranslogOperationAsserter operationAsserter
) throws IOException {
super(config.getShardId(), config.getIndexSettings());
this.config = config;
this.globalCheckpointSupplier = globalCheckpointSupplier;
this.primaryTermSupplier = primaryTermSupplier;
this.persistedSequenceNumberConsumer = persistedSequenceNumberConsumer;
this.operationListener = config.getOperationListener();
this.operationAsserter = operationAsserter;
this.deletionPolicy = deletionPolicy;
this.translogUUID = translogUUID;
this.bigArrays = config.getBigArrays();
Expand Down Expand Up @@ -582,6 +586,7 @@ TranslogWriter createWriter(
bigArrays,
diskIoBufferPool,
operationListener,
operationAsserter,
config.fsync()
);
} catch (final IOException e) {
Expand Down Expand Up @@ -1265,17 +1270,8 @@ public boolean equals(Object o) {
return false;
}

Index index = (Index) o;

if (version != index.version
|| seqNo != index.seqNo
|| primaryTerm != index.primaryTerm
|| id.equals(index.id) == false
|| autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp
|| source.equals(index.source) == false) {
return false;
}
return Objects.equals(routing, index.routing);
Index other = (Index) o;
return autoGeneratedIdTimestamp == other.autoGeneratedIdTimestamp && equalsWithoutAutoGeneratedTimestamp(this, other);
}

@Override
Expand Down Expand Up @@ -1311,6 +1307,15 @@ public long getAutoGeneratedIdTimestamp() {
return autoGeneratedIdTimestamp;
}

/**
 * Compares two index operations on every field except the auto-generated-id
 * timestamp: version, seqNo, primary term, id, source and routing.
 */
public static boolean equalsWithoutAutoGeneratedTimestamp(Translog.Index o1, Translog.Index o2) {
    // Cheap primitive comparisons first; bail out early on any mismatch.
    if (o1.version != o2.version || o1.seqNo != o2.seqNo || o1.primaryTerm != o2.primaryTerm) {
        return false;
    }
    return o1.id.equals(o2.id) && o1.source.equals(o2.source) && Objects.equals(o1.routing, o2.routing);
}

}

public static final class Delete extends Operation {
Expand Down Expand Up @@ -1958,6 +1963,7 @@ public static String createEmptyTranslog(
BigArrays.NON_RECYCLING_INSTANCE,
DiskIoBufferPool.INSTANCE,
TranslogConfig.NOOP_OPERATION_LISTENER,
TranslogOperationAsserter.DEFAULT,
true
);
writer.close();
Expand Down
Loading

0 comments on commit 4f7ea81

Please sign in to comment.