From d0cbfaa2239c3c86d5823518b54772319a5fbb75 Mon Sep 17 00:00:00 2001 From: satyajitg28 <95424630+satyajitg28@users.noreply.github.com> Date: Wed, 3 Aug 2022 19:53:48 +0530 Subject: [PATCH] [Backport 2.x] Changes to encapsulate Translog into TranslogManager (#4095) * Changes to encapsulate Translog into TranslogManager. Signed-off-by: Satyajit Ganguly Co-authored-by: Bukhtawar Khan (cherry-picked from commit: 98dfdf5eab081194487bccacbfb45cecf319e974 of 2.x branch) --- .../index/engine/InternalEngine.java | 86 ++++------- .../index/engine/NRTReplicationEngine.java | 95 ++----------- .../translog/InternalTranslogManager.java | 133 ++++++++++++++++-- .../index/translog/NoOpTranslogManager.java | 19 ++- .../index/translog/TranslogManager.java | 16 ++- .../index/engine/InternalEngineTests.java | 126 +++++++++++------ .../engine/NRTReplicationEngineTests.java | 30 ++-- .../index/engine/NoOpEngineTests.java | 2 +- .../InternalTranslogManagerTests.java | 34 ++--- .../index/engine/EngineTestCase.java | 36 +++-- 10 files changed, 336 insertions(+), 241 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 256470f6841be..fcef1afab56c3 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -105,7 +105,6 @@ import org.opensearch.index.shard.OpenSearchMergePolicy; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogCorruptedException; import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogStats; @@ -134,8 +133,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.BiFunction; -import java.util.function.LongConsumer; -import java.util.function.LongSupplier; import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -300,7 +297,7 @@ public void onFailure(String reason, Exception ex) { logger, translogDeletionPolicy, softDeletesPolicy, - translogManager.getTranslog()::getLastSyncedGlobalCheckpoint + translogManager::getLastSyncedGlobalCheckpoint ); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); writer = createWriter(); @@ -334,7 +331,7 @@ public void onFailure(String reason, Exception ex) { this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getProcessedCheckpoint()); this.internalReaderManager.addListener(lastRefreshedCheckpointListener); maxSeqNoOfUpdatesOrDeletes = new AtomicLong( - SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getTranslog().getMaxSeqNo()) + SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translogManager.getMaxSeqNo()) ); if (localCheckpointTracker.getPersistedCheckpoint() < localCheckpointTracker.getMaxSeqNo()) { try (Searcher searcher = acquireSearcher("restore_version_map_and_checkpoint_tracker", SearcherScope.INTERNAL)) { @@ -352,11 +349,7 @@ public void onFailure(String reason, Exception ex) { success = true; } finally { if (success == false) { - Translog translog = null; - if (translogManagerRef != null) { - translog = translogManagerRef.getTranslog(); - } - IOUtils.closeWhileHandlingException(writer, translog, internalReaderManager, externalReaderManager, scheduler); + IOUtils.closeWhileHandlingException(writer, translogManagerRef, internalReaderManager, externalReaderManager, scheduler); if (isClosed.get() == false) { // failure we need to dec the store reference store.decRef(); @@ -389,7 +382,7 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1; } return new SoftDeletesPolicy( - translogManager.getTranslog()::getLastSyncedGlobalCheckpoint, + translogManager::getLastSyncedGlobalCheckpoint, lastMinRetainedSeqNo, engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(), engineConfig.retentionLeasesSupplier() @@ -543,15 +536,15 @@ private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) { public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - if (translogManager().getPendingTranslogRecovery().get() == false) { + if (translogManager.getPendingTranslogRecovery().get() == false) { throw new IllegalStateException("Engine has already been recovered"); } try { recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo); } catch (Exception e) { try { - translogManager().getPendingTranslogRecovery().set(true); // just play safe and never allow commits on this see - // #ensureCanFlush + translogManager.getPendingTranslogRecovery().set(true); // just play safe and never allow commits on this see + // #ensureCanFlush failEngine("failed to recover from translog", e); } catch (Exception inner) { e.addSuppressed(inner); @@ -571,7 +564,7 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery final int opsRecovered; final long localCheckpoint = getProcessedLocalCheckpoint(); if (localCheckpoint < recoverUpToSeqNo) { - try (Translog.Snapshot snapshot = translogManager().getTranslog().newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) { + try (Translog.Snapshot snapshot = translogManager.getTranslog().newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) { opsRecovered = translogRecoveryRunner.run(this, snapshot); } catch (Exception e) { throw new EngineException(shardId, "failed to recover from translog", e); @@ -581,38 +574,17 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery } // flush if we recovered something or if we have references to older translogs // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length. - assert translogManager().getPendingTranslogRecovery().get() : "translogRecovery is not pending but should be"; - translogManager().getPendingTranslogRecovery().set(false); // we are good - now we can commit + assert translogManager.getPendingTranslogRecovery().get() : "translogRecovery is not pending but should be"; + translogManager.getPendingTranslogRecovery().set(false); // we are good - now we can commit logger.trace( () -> new ParameterizedMessage( "flushing post recovery from translog: ops recovered [{}], current translog generation [{}]", opsRecovered, - translogManager().getTranslog().currentFileGeneration() + translogManager.getTranslog().currentFileGeneration() ) ); flush(false, true); - translogManager().getTranslog().trimUnreferencedReaders(); - } - - private Translog openTranslog( - EngineConfig engineConfig, - TranslogDeletionPolicy translogDeletionPolicy, - LongSupplier globalCheckpointSupplier, - LongConsumer persistedSequenceNumberConsumer - ) throws IOException { - - final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); - final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); - // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! - return new Translog( - translogConfig, - translogUUID, - translogDeletionPolicy, - globalCheckpointSupplier, - engineConfig.getPrimaryTermSupplier(), - persistedSequenceNumberConsumer - ); + translogManager.trimUnreferencedReaders(); } // Package private for testing purposes only @@ -622,32 +594,27 @@ boolean hasSnapshottedCommits() { @Override public boolean isTranslogSyncNeeded() { - return translogManager().getTranslog().syncNeeded(); + return translogManager.getTranslog().syncNeeded(); } @Override public boolean ensureTranslogSynced(Stream locations) throws IOException { - final boolean synced = translogManager().getTranslog().ensureSynced(locations); - if (synced) { - revisitIndexDeletionPolicyOnTranslogSynced(); - } - return synced; + return translogManager.ensureTranslogSynced(locations); } @Override public void syncTranslog() throws IOException { - translogManager().getTranslog().sync(); - revisitIndexDeletionPolicyOnTranslogSynced(); + translogManager.syncTranslog(); } @Override public TranslogStats getTranslogStats() { - return translogManager().getTranslog().stats(); + return translogManager.getTranslog().stats(); } @Override public Translog.Location getTranslogLastWriteLocation() { - return translogManager().getTranslog().getLastWriteLocation(); + return translogManager.getTranslogLastWriteLocation(); } private void revisitIndexDeletionPolicyOnTranslogSynced() { @@ -655,7 +622,7 @@ private void revisitIndexDeletionPolicyOnTranslogSynced() { if (combinedDeletionPolicy.hasUnreferencedCommits()) { indexWriter.deleteUnusedFiles(); } - translogManager.getTranslog().trimUnreferencedReaders(); + translogManager.trimUnreferencedReaders(); } catch (IOException ex) { throw new TranslogException(shardId, "Failed to execute index deletion policy on translog synced", ex); } @@ -747,7 +714,7 @@ public GetResult get(Get get, BiFunction // the update call doesn't need the consistency since it's source only + _parent but parent can go away in 7.0 if (versionValue.getLocation() != null) { try { - Translog.Operation operation = translogManager.getTranslog().readOperation(versionValue.getLocation()); + Translog.Operation operation = translogManager.readOperation(versionValue.getLocation()); if (operation != null) { // in the case of a already pruned translog generation we might get null here - yet very unlikely final Translog.Index index = (Translog.Index) operation; @@ -1052,7 +1019,7 @@ public IndexResult index(Index index) throws IOException { if (index.origin().isFromTranslog() == false) { final Translog.Location location; if (indexResult.getResultType() == Result.Type.SUCCESS) { - location = translogManager.getTranslog().add(new Translog.Index(index, indexResult)); + location = translogManager.add(new Translog.Index(index, indexResult)); } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no final NoOp noOp = new NoOp( @@ -1492,7 +1459,7 @@ public DeleteResult delete(Delete delete) throws IOException { } } if (delete.origin().isFromTranslog() == false && deleteResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult)); + final Translog.Location location = translogManager.add(new Translog.Delete(delete, deleteResult)); deleteResult.setTranslogLocation(location); } localCheckpointTracker.markSeqNoAsProcessed(deleteResult.getSeqNo()); @@ -1819,8 +1786,9 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { } noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo()); if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) { - final Translog.Location location = translogManager.getTranslog() - .add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); + final Translog.Location location = translogManager.add( + new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()) + ); noOpResult.setTranslogLocation(location); } } @@ -1999,7 +1967,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { // we need to refresh in order to clear older version values refresh("version_table_flush", SearcherScope.INTERNAL, true); - translogManager.getTranslog().trimUnreferencedReaders(); + translogManager.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { failOnTragicEvent(e); throw e; @@ -2723,7 +2691,7 @@ public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue tran // the setting will be re-interpreted if it's set to true updateAutoIdTimestamp(Long.MAX_VALUE, true); } - final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy(); + final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getDeletionPolicy(); translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); softDeletesPolicy.setRetentionOperations(softDeletesRetentionOps); @@ -2739,7 +2707,7 @@ LocalCheckpointTracker getLocalCheckpointTracker() { @Override public long getLastSyncedGlobalCheckpoint() { - return translogManager.getTranslog().getLastSyncedGlobalCheckpoint(); + return translogManager.getLastSyncedGlobalCheckpoint(); } public long getProcessedLocalCheckpoint() { diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 3b9da1bff5c79..414426921201f 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -18,13 +18,11 @@ import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogDeletionPolicy; import org.opensearch.index.translog.TranslogException; import org.opensearch.index.translog.TranslogManager; @@ -41,8 +39,6 @@ import java.util.Objects; import java.util.concurrent.CountDownLatch; import java.util.function.BiFunction; -import java.util.function.LongConsumer; -import java.util.function.LongSupplier; import java.util.stream.Stream; /** @@ -58,13 +54,13 @@ public class NRTReplicationEngine extends Engine implements LifecycleAware { private final NRTReplicationReaderManager readerManager; private final CompletionStatsCache completionStatsCache; private final LocalCheckpointTracker localCheckpointTracker; - private final TranslogManager translogManager; + private final WriteOnlyTranslogManager translogManager; public NRTReplicationEngine(EngineConfig engineConfig) { super(engineConfig); store.incRef(); NRTReplicationReaderManager readerManager = null; - TranslogManager translogManagerRef = null; + WriteOnlyTranslogManager translogManagerRef = null; try { lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId)); @@ -95,7 +91,7 @@ public void onFailure(String reason, Exception ex) { @Override public void onAfterTranslogSync() { try { - translogManager.getTranslog().trimUnreferencedReaders(); + translogManager.trimUnreferencedReaders(); } catch (IOException ex) { throw new TranslogException(shardId, "failed to trim unreferenced translog readers", ex); } @@ -105,11 +101,7 @@ public void onAfterTranslogSync() { ); this.translogManager = translogManagerRef; } catch (IOException e) { - Translog translog = null; - if (translogManagerRef != null) { - translog = translogManagerRef.getTranslog(); - } - IOUtils.closeWhileHandlingException(store::decRef, readerManager, translog); + IOUtils.closeWhileHandlingException(store::decRef, readerManager, translogManagerRef); throw new EngineCreationFailureException(shardId, "failed to create engine", e); } } @@ -158,24 +150,14 @@ public boolean isThrottled() { @Override public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException { - try (ReleasableLock lock = readLock.acquire()) { - ensureOpen(); - translogManager.getTranslog().trimOperations(belowTerm, aboveSeqNo); - } catch (Exception e) { - try { - failEngine("translog operations trimming failed", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw new EngineException(shardId, "failed to trim translog operations", e); - } + translogManager.trimOperationsFromTranslog(belowTerm, aboveSeqNo); } @Override public IndexResult index(Index index) throws IOException { ensureOpen(); IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false); - final Translog.Location location = translogManager.getTranslog().add(new Translog.Index(index, indexResult)); + final Translog.Location location = translogManager.add(new Translog.Index(index, indexResult)); indexResult.setTranslogLocation(location); indexResult.setTook(System.nanoTime() - index.startTime()); indexResult.freeze(); @@ -187,7 +169,7 @@ public IndexResult index(Index index) throws IOException { public DeleteResult delete(Delete delete) throws IOException { ensureOpen(); DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true); - final Translog.Location location = translogManager.getTranslog().add(new Translog.Delete(delete, deleteResult)); + final Translog.Location location = translogManager.add(new Translog.Delete(delete, deleteResult)); deleteResult.setTranslogLocation(location); deleteResult.setTook(System.nanoTime() - delete.startTime()); deleteResult.freeze(); @@ -199,8 +181,7 @@ public DeleteResult delete(Delete delete) throws IOException { public NoOpResult noOp(NoOp noOp) throws IOException { ensureOpen(); NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo()); - final Translog.Location location = translogManager.getTranslog() - .add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); + final Translog.Location location = translogManager.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); noOpResult.setTranslogLocation(location); noOpResult.setTook(System.nanoTime() - noOp.startTime()); noOpResult.freeze(); @@ -225,17 +206,12 @@ public boolean isTranslogSyncNeeded() { @Override public boolean ensureTranslogSynced(Stream locations) throws IOException { - boolean synced = translogManager.getTranslog().ensureSynced(locations); - if (synced) { - translogManager.getTranslog().trimUnreferencedReaders(); - } - return synced; + return translogManager.ensureTranslogSynced(locations); } @Override public void syncTranslog() throws IOException { - translogManager.getTranslog().sync(); - translogManager.getTranslog().trimUnreferencedReaders(); + translogManager.syncTranslog(); } @Override @@ -302,7 +278,7 @@ public SeqNoStats getSeqNoStats(long globalCheckpoint) { @Override public long getLastSyncedGlobalCheckpoint() { - return translogManager.getTranslog().getLastSyncedGlobalCheckpoint(); + return translogManager.getLastSyncedGlobalCheckpoint(); } @Override @@ -336,17 +312,7 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { @Override public void trimUnreferencedTranslogFiles() throws EngineException { - try (ReleasableLock lock = readLock.acquire()) { - ensureOpen(); - translogManager.getTranslog().trimUnreferencedReaders(); - } catch (Exception e) { - try { - failEngine("translog trimming failed", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw new EngineException(shardId, "failed to trim translog", e); - } + translogManager.trimUnreferencedTranslogFiles(); } @Override @@ -356,18 +322,7 @@ public boolean shouldRollTranslogGeneration() { @Override public void rollTranslogGeneration() throws EngineException { - try (ReleasableLock ignored = readLock.acquire()) { - ensureOpen(); - translogManager.getTranslog().rollGeneration(); - translogManager.getTranslog().trimUnreferencedReaders(); - } catch (Exception e) { - try { - failEngine("translog trimming failed", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw new EngineException(shardId, "failed to roll translog", e); - } + translogManager.rollTranslogGeneration(); } @Override @@ -406,7 +361,7 @@ protected final void closeNoLock(String reason, CountDownLatch closedLatch) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { - IOUtils.close(readerManager, translogManager().getTranslog(), store::decRef); + IOUtils.close(readerManager, translogManager.getTranslog(), store::decRef); } catch (Exception e) { logger.warn("failed to close engine", e); } finally { @@ -462,7 +417,7 @@ public Translog getTranslog() { @Override public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) { - final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getTranslog().getDeletionPolicy(); + final TranslogDeletionPolicy translogDeletionPolicy = translogManager.getDeletionPolicy(); translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis()); translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes()); } @@ -485,24 +440,4 @@ private DirectoryReader getDirectoryReader() throws IOException { // for segment replication: replicas should create the reader from store, we don't want an open IW on replicas. return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD); } - - private Translog openTranslog( - EngineConfig engineConfig, - TranslogDeletionPolicy translogDeletionPolicy, - LongSupplier globalCheckpointSupplier, - LongConsumer persistedSequenceNumberConsumer - ) throws IOException { - final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - final Map userData = lastCommittedSegmentInfos.getUserData(); - final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); - // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! - return new Translog( - translogConfig, - translogUUID, - translogDeletionPolicy, - globalCheckpointSupplier, - engineConfig.getPrimaryTermSupplier(), - persistedSequenceNumberConsumer - ); - } } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 6f5d57ad388bf..cdd4d08579312 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -13,11 +13,13 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.engine.LifecycleAware; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.listener.TranslogEventListener; +import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongConsumer; @@ -31,7 +33,7 @@ * * @opensearch.internal */ -public class InternalTranslogManager implements TranslogManager { +public class InternalTranslogManager implements TranslogManager, Closeable { private final ReleasableLock readLock; private final LifecycleAware engineLifeCycleAware; @@ -39,6 +41,7 @@ public class InternalTranslogManager implements TranslogManager { private final Translog translog; private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); private final TranslogEventListener translogEventListener; + private final Supplier localCheckpointTrackerSupplier; private static final Logger logger = LogManager.getLogger(InternalTranslogManager.class); public AtomicBoolean getPendingTranslogRecovery() { @@ -61,6 +64,7 @@ public InternalTranslogManager( this.readLock = readLock; this.engineLifeCycleAware = engineLifeCycleAware; this.translogEventListener = translogEventListener; + this.localCheckpointTrackerSupplier = localCheckpointTrackerSupplier; Translog translog = openTranslog(translogConfig, primaryTermSupplier, translogDeletionPolicy, globalCheckpointSupplier, seqNo -> { final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.get(); assert tracker != null || getTranslog(true).isOpen() == false; @@ -283,6 +287,28 @@ public void ensureCanFlush() { } } + /** + * Reads operations from the translog + * @param location + * @return the translog operation + * @throws IOException + */ + @Override + public Translog.Operation readOperation(Translog.Location location) throws IOException { + return translog.readOperation(location); + } + + /** + * Adds an operation to the translog + * @param operation + * @return the location in the translog + * @throws IOException + */ + @Override + public Translog.Location add(Translog.Operation operation) throws IOException { + return translog.add(operation); + } + /** * Do not replay translog operations, but make the engine be ready. */ @@ -292,7 +318,20 @@ public void skipTranslogRecovery() { pendingTranslogRecovery.set(false); // we are good - now we can commit } - private Translog openTranslog( + // visible for testing + // TODO refactor tests to remove public access to translog + public Translog getTranslog() { + return translog; + } + + private Translog getTranslog(boolean ensureOpen) { + if (ensureOpen) { + this.engineLifeCycleAware.ensureOpen(); + } + return translog; + } + + protected Translog openTranslog( TranslogConfig translogConfig, LongSupplier primaryTermSupplier, TranslogDeletionPolicy translogDeletionPolicy, @@ -312,18 +351,90 @@ private Translog openTranslog( } /** - * Returns the the translog instance - * @return the {@link Translog} instance + * Retrieves last synced global checkpoint + * @return last synced global checkpoint */ - @Override - public Translog getTranslog() { - return translog; + public long getLastSyncedGlobalCheckpoint() { + return translog.getLastSyncedGlobalCheckpoint(); } - private Translog getTranslog(boolean ensureOpen) { - if (ensureOpen) { - this.engineLifeCycleAware.ensureOpen(); + /** + * Retrieves the max seq no + * @return max seq no + */ + public long getMaxSeqNo() { + return translog.getMaxSeqNo(); + } + + /** + * Trims unreferenced translog generations by asking {@link TranslogDeletionPolicy} for the minimum + * required generation + */ + public void trimUnreferencedReaders() throws IOException { + translog.trimUnreferencedReaders(); + } + + /** + * Retrieves the translog deletion policy + * @return TranslogDeletionPolicy + */ + public TranslogDeletionPolicy getDeletionPolicy() { + return translog.getDeletionPolicy(); + } + + /** + * Retrieves the underlying translog tragic exception + * @return the tragic exception + */ + public Exception getTragicExceptionIfClosed() { + return translog.isOpen() == false ? translog.getTragicException() : null; + } + + /** + * Retrieves the translog unique identifier + * @return the uuid of the translog + */ + public String getTranslogUUID() { + return translog.getTranslogUUID(); + } + + /** + * + * @param localCheckpointOfLastCommit + * @param flushThreshold + * @return if the translog should be flushed + */ + public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) { + final long translogGenerationOfLastCommit = translog.getMinGenerationForSeqNo( + localCheckpointOfLastCommit + 1 + ).translogFileGeneration; + if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { + return false; } - return translog; + /* + * We flush to reduce the size of uncommitted translog but strictly speaking the uncommitted size won't always be + * below the flush-threshold after a flush. To avoid getting into an endless loop of flushing, we only enable the + * periodically flush condition if this condition is disabled after a flush. The condition will change if the new + * commit points to the later generation the last commit's(eg. gen-of-last-commit < gen-of-new-commit)[1]. + * + * When the local checkpoint equals to max_seqno, and translog-gen of the last commit equals to translog-gen of + * the new commit, we know that the last generation must contain operations because its size is above the flush + * threshold and the flush-threshold is guaranteed to be higher than an empty translog by the setting validation. + * This guarantees that the new commit will point to the newly rolled generation. In fact, this scenario only + * happens when the generation-threshold is close to or above the flush-threshold; otherwise we have rolled + * generations as the generation-threshold was reached, then the first condition (eg. [1]) is already satisfied. + * + * This method is to maintain translog only, thus IndexWriter#hasUncommittedChanges condition is not considered. + */ + final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo( + localCheckpointTrackerSupplier.get().getProcessedCheckpoint() + 1 + ).translogFileGeneration; + return translogGenerationOfLastCommit < translogGenerationOfNewCommit + || localCheckpointTrackerSupplier.get().getProcessedCheckpoint() == localCheckpointTrackerSupplier.get().getMaxSeqNo(); + } + + @Override + public void close() throws IOException { + IOUtils.closeWhileHandlingException(translog); } } diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index 88e6ce97b2784..328edad51b5b7 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -47,7 +47,7 @@ public void rollTranslogGeneration() throws TranslogException {} @Override public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long localCheckpoint, long recoverUpToSeqNo) throws IOException { - try (ReleasableLock lock = readLock.acquire()) { + try (ReleasableLock ignored = readLock.acquire()) { ensureOpen.run(); try (Translog.Snapshot snapshot = emptyTranslogSnapshot) { translogRecoveryRunner.run(snapshot); @@ -64,7 +64,7 @@ public boolean isTranslogSyncNeeded() { } @Override - public boolean ensureTranslogSynced(Stream locations) throws IOException { + public boolean ensureTranslogSynced(Stream locations) { return false; } @@ -92,11 +92,6 @@ public boolean shouldRollTranslogGeneration() { @Override public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException {} - @Override - public Translog getTranslog() { - return null; - } - @Override public void ensureCanFlush() {} @@ -107,4 +102,14 @@ public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRec @Override public void skipTranslogRecovery() {} + + @Override + public Translog.Operation readOperation(Translog.Location location) throws IOException { + return null; + } + + @Override + public Translog.Location add(Translog.Operation operation) throws IOException { + return new Translog.Location(0, 0, 0); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index dc2c2e20015b0..f82434f40b06c 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -95,10 +95,20 @@ public interface TranslogManager { void skipTranslogRecovery(); /** - * Returns the instance of the translog with a precondition - * @return the translog instance + * Reads operations for the translog + * @param location the location in the translog + * @return the translog operation + * @throws IOException */ - Translog getTranslog(); + Translog.Operation readOperation(Translog.Location location) throws IOException; + + /** + * Adds an operation to the translog + * @param operation + * @return the location in the translog + * @throws IOException + */ + Translog.Location add(Translog.Operation operation) throws IOException; /** * Checks if the translog has a pending recovery diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 345adfe7a8891..eb05dd967344d 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -1299,24 +1299,33 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { engine.flush(); engine.ensureOpen(); - assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + assertThat(assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat( + assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getMinFileGeneration(), + equalTo(inSync ? 3L : 2L) + ); engine.flush(); engine.ensureOpen(); - assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + assertThat(assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat( + assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getMinFileGeneration(), + equalTo(inSync ? 3L : 2L) + ); engine.flush(true, true); engine.ensureOpen(); - assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); + assertThat(assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat( + assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getMinFileGeneration(), + equalTo(inSync ? 3L : 2L) + ); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.flush(true, true); engine.ensureOpen(); - assertThat(engine.translogManager().getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.translogManager().getTranslog().getMinFileGeneration(), equalTo(3L)); + assertThat(assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().currentFileGeneration(), equalTo(3L)); + assertThat(assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getMinFileGeneration(), equalTo(3L)); } public void testSyncTranslogConcurrently() throws Exception { @@ -2821,10 +2830,13 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException { equalTo(localCheckpoint) ); initialEngine.ensureOpen(); - initialEngine.translogManager().getTranslog().sync(); // to guarantee the global checkpoint is written to the translog - // checkpoint + getTranslog(initialEngine).sync(); // to guarantee the global checkpoint is written to the translog + // checkpoint initialEngine.ensureOpen(); - assertThat(initialEngine.translogManager().getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); + assertThat( + assertAndGetInternalTranslogManager(initialEngine.translogManager()).getLastSyncedGlobalCheckpoint(), + equalTo(globalCheckpoint) + ); assertThat(Long.parseLong(initialEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), equalTo(maxSeqNo)); } finally { @@ -2842,7 +2854,10 @@ public void testSeqNoAndCheckpoints() throws IOException, InterruptedException { equalTo(primarySeqNo) ); recoveringEngine.ensureOpen(); - assertThat(recoveringEngine.translogManager().getTranslog().getLastSyncedGlobalCheckpoint(), equalTo(globalCheckpoint)); + assertThat( + assertAndGetInternalTranslogManager(recoveringEngine.translogManager()).getLastSyncedGlobalCheckpoint(), + equalTo(globalCheckpoint) + ); assertThat( Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.MAX_SEQ_NO)), // after recovering from translog, all docs have been flushed to Lucene segments, so here we will assert @@ -3256,7 +3271,10 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { ); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); engine.ensureOpen(); - assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + assertEquals( + assertAndGetInternalTranslogManager(engine.translogManager()).getTranslogUUID(), + userData.get(Translog.TRANSLOG_UUID_KEY) + ); } } // open and recover tlog @@ -3266,12 +3284,18 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { expectThrows(IllegalStateException.class, engine.translogManager()::ensureCanFlush); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); engine.ensureOpen(); - assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + assertEquals( + assertAndGetInternalTranslogManager(engine.translogManager()).getTranslogUUID(), + userData.get(Translog.TRANSLOG_UUID_KEY) + ); TranslogHandler translogHandler = createTranslogHandler(config.getIndexSettings(), engine); engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); engine.ensureOpen(); - assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + assertEquals( + assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getTranslogUUID(), + userData.get(Translog.TRANSLOG_UUID_KEY) + ); } } } @@ -3287,11 +3311,14 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); engine.ensureOpen(); - assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + assertEquals( + assertAndGetInternalTranslogManager(engine.translogManager()).getTranslogUUID(), + userData.get(Translog.TRANSLOG_UUID_KEY) + ); engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); engine.ensureOpen(); - assertEquals(2, engine.translogManager().getTranslog().currentFileGeneration()); - assertEquals(0L, engine.translogManager().getTranslog().stats().getUncommittedOperations()); + assertEquals(2, assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().currentFileGeneration()); + assertEquals(0L, engine.translogManager().getTranslogStats().getUncommittedOperations()); } } @@ -3301,11 +3328,17 @@ public void testCurrentTranslogUUIIDIsCommitted() throws IOException { try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); engine.ensureOpen(); - assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + assertEquals( + assertAndGetInternalTranslogManager(engine.translogManager()).getTranslogUUID(), + userData.get(Translog.TRANSLOG_UUID_KEY) + ); engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); engine.ensureOpen(); - assertEquals(engine.translogManager().getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + assertEquals( + assertAndGetInternalTranslogManager(engine.translogManager()).getTranslogUUID(), + userData.get(Translog.TRANSLOG_UUID_KEY) + ); } } } @@ -3447,8 +3480,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog) throws I engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY) ); engine.ensureOpen(); - final long committedGen = engine.translogManager() - .getTranslog() + final long committedGen = assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog() .getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; for (int gen = 1; gen < committedGen; gen++) { final Path genFile = translogPath.resolve(Translog.getFilename(gen)); @@ -3635,7 +3667,8 @@ public void testRecoverFromForeignTranslog() throws IOException { } assertVisibleCount(engine, numDocs); engine.ensureOpen(); - Translog.TranslogGeneration generation = engine.translogManager().getTranslog().getGeneration(); + Translog.TranslogGeneration generation = assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog() + .getGeneration(); engine.close(); final Path badTranslogLog = createTempDir(); @@ -3732,7 +3765,9 @@ public CustomTranslogDeletionPolicy(IndexSettings indexSettings, Supplier { long localCheckpoint = Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); return translog.totalOperationsByMinGen(translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); }; - final long extraTranslogSizeInNewEngine = engine.translogManager().getTranslog().stats().getUncommittedSizeInBytes() + final long extraTranslogSizeInNewEngine = engine.translogManager().getTranslogStats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); for (int id = 0; id < numDocs; id++) { @@ -6009,7 +6048,7 @@ public void testShouldPeriodicallyFlush() throws Exception { long flushThreshold = RandomNumbers.randomLongBetween( random(), 120, - engine.translogManager().getTranslog().stats().getUncommittedSizeInBytes() - extraTranslogSizeInNewEngine + engine.translogManager().getTranslogStats().getUncommittedSizeInBytes() - extraTranslogSizeInNewEngine ); final IndexSettings indexSettings = engine.config().getIndexSettings(); final IndexMetadata indexMetadata = IndexMetadata.builder(indexSettings.getIndexMetadata()) @@ -6026,7 +6065,7 @@ public void testShouldPeriodicallyFlush() throws Exception { indexSettings.getSoftDeleteRetentionOperations() ); engine.ensureOpen(); - assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); + assertThat(engine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0)); @@ -6083,12 +6122,12 @@ public void testShouldPeriodicallyFlushAfterMerge() throws Exception { indexSettings.getSoftDeleteRetentionOperations() ); engine.ensureOpen(); - assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(1)); + assertThat(engine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(1)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); doc = testParsedDocument(Integer.toString(1), null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc)); engine.ensureOpen(); - assertThat(engine.translogManager().getTranslog().stats().getUncommittedOperations(), equalTo(2)); + assertThat(engine.translogManager().getTranslogStats().getUncommittedOperations(), equalTo(2)); engine.refresh("test"); engine.forceMerge(false, 1, false, false, false, UUIDs.randomBase64UUID()); assertBusy(() -> { @@ -6124,7 +6163,7 @@ public void testStressShouldPeriodicallyFlush() throws Exception { final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); engine.ensureOpen(); - if (rarely() && engine.translogManager().getTranslog().shouldRollGeneration()) { + if (rarely() && engine.translogManager().shouldRollTranslogGeneration()) { engine.translogManager().rollTranslogGeneration(); } if (rarely() || engine.shouldPeriodicallyFlush()) { @@ -6348,7 +6387,7 @@ public void testTrimUnsafeCommits() throws Exception { globalCheckpoint.set(randomInt(maxSeqNo)); engine.translogManager().syncTranslog(); engine.ensureOpen(); - minTranslogGen = engine.translogManager().getTranslog().getMinFileGeneration(); + minTranslogGen = assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getMinFileGeneration(); } store.trimUnsafeCommits(config.getTranslogConfig().getTranslogPath()); @@ -6901,7 +6940,8 @@ public void testMaxSeqNoInCommitUserData() throws Exception { AtomicBoolean running = new AtomicBoolean(true); Thread rollTranslog = new Thread(() -> { engine.ensureOpen(); - while (running.get() && engine.translogManager().getTranslog().currentFileGeneration() < 500) { + while (running.get() + && assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().currentFileGeneration() < 500) { engine.translogManager().rollTranslogGeneration(); // make adding operations to translog slower } }); @@ -7071,8 +7111,8 @@ public void testRecoverFromLocalTranslog() throws Exception { engine.ensureOpen(); assertThat( "engine should trim all unreferenced translog after recovery", - engine.translogManager().getTranslog().getMinFileGeneration(), - equalTo(engine.translogManager().getTranslog().currentFileGeneration()) + assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getMinFileGeneration(), + equalTo(assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().currentFileGeneration()) ); } } diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 6d3e54e0648f1..29fff69a433a4 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -38,7 +38,7 @@ public void testCreateEngine() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( final Store nrtEngineStore = createStore(); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos(); final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos(); @@ -62,7 +62,7 @@ public void testEngineWritesOpsToTranslog() throws Exception { try ( final Store nrtEngineStore = createStore(); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { List operations = generateHistoryOnReplica( between(1, 500), @@ -88,7 +88,7 @@ public void testEngineWritesOpsToTranslog() throws Exception { engine.translogManager().recoverFromTranslog(translogHandler, engine.getProcessedLocalCheckpoint(), Long.MAX_VALUE); assertEquals(getDocIds(engine, true), docs); } - assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog()); + assertEngineCleanedUp(nrtEngine, assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getDeletionPolicy()); } } @@ -97,7 +97,7 @@ public void testUpdateSegments() throws Exception { try ( final Store nrtEngineStore = createStore(); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { // add docs to the primary engine. List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()) @@ -132,7 +132,10 @@ public void testUpdateSegments() throws Exception { Set seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet()); - try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) { + nrtEngine.ensureOpen(); + try ( + Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().newSnapshot() + ) { assertThat(snapshot.totalOperations(), equalTo(operations.size())); assertThat( TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), @@ -144,11 +147,13 @@ public void testUpdateSegments() throws Exception { assertMatchingSegmentsAndCheckpoints(nrtEngine); assertEquals( - nrtEngine.getTranslog().getGeneration().translogFileGeneration, - engine.translogManager().getTranslog().getGeneration().translogFileGeneration + assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().getGeneration().translogFileGeneration, + assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getGeneration().translogFileGeneration ); - try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) { + try ( + Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().newSnapshot() + ) { assertThat(snapshot.totalOperations(), equalTo(operations.size())); assertThat( TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), @@ -162,7 +167,7 @@ public void testUpdateSegments() throws Exception { expectedDocCount = test.count(Queries.newMatchAllQuery()); assertSearcherHits(nrtEngine, expectedDocCount); } - assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog()); + assertEngineCleanedUp(nrtEngine, assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getDeletionPolicy()); } } @@ -171,7 +176,7 @@ public void testTrimTranslogOps() throws Exception { try ( final Store nrtEngineStore = createStore(); - final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { List operations = generateHistoryOnReplica( between(1, 100), @@ -181,7 +186,10 @@ public void testTrimTranslogOps() throws Exception { ); applyOperations(nrtEngine, operations); Set seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet()); - try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) { + nrtEngine.ensureOpen(); + try ( + Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().newSnapshot() + ) { assertThat(snapshot.totalOperations(), equalTo(operations.size())); assertThat( TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), diff --git a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java index 1ab7e3d7bc093..e53a3b09a5eb6 100644 --- a/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NoOpEngineTests.java @@ -222,7 +222,7 @@ public void testTrimUnreferencedTranslogFiles() throws Exception { } // prevent translog from trimming so we can test trimUnreferencedFiles in NoOpEngine. engine.ensureOpen(); - final Translog.Snapshot snapshot = engine.translogManager().getTranslog().newSnapshot(); + final Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().newSnapshot(); engine.flush(true, true); engine.close(); diff --git a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java index 9cc58e250d74e..5171f0dfa1d18 100644 --- a/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/InternalTranslogManagerTests.java @@ -33,7 +33,7 @@ public void testRecoveryFromTranslog() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final AtomicBoolean beginTranslogRecoveryInvoked = new AtomicBoolean(false); final AtomicBoolean onTranslogRecoveryInvoked = new AtomicBoolean(false); - TranslogManager translogManager = null; + InternalTranslogManager translogManager = null; LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); try { @@ -56,7 +56,7 @@ public void testRecoveryFromTranslog() throws IOException { Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); tracker.markSeqNoAsProcessed(i); - translogManager.getTranslog().add(new Translog.Index(index, indexResult)); + translogManager.add(new Translog.Index(index, indexResult)); translogManager.rollTranslogGeneration(); } long maxSeqNo = tracker.getMaxSeqNo(); @@ -64,7 +64,7 @@ public void testRecoveryFromTranslog() throws IOException { assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); translogManager.syncTranslog(); - translogManager.getTranslog().close(); + translogManager.close(); translogManager = new InternalTranslogManager( new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), primaryTerm, @@ -103,13 +103,13 @@ public void onBeginTranslogRecovery() { assertTrue(onTranslogRecoveryInvoked.get()); } finally { - translogManager.getTranslog().close(); + translogManager.close(); } } public void testTranslogRollsGeneration() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - TranslogManager translogManager = null; + InternalTranslogManager translogManager = null; LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); try { translogManager = new InternalTranslogManager( @@ -131,7 +131,7 @@ public void testTranslogRollsGeneration() throws IOException { Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); tracker.markSeqNoAsProcessed(i); - translogManager.getTranslog().add(new Translog.Index(index, indexResult)); + translogManager.add(new Translog.Index(index, indexResult)); translogManager.rollTranslogGeneration(); } long maxSeqNo = tracker.getMaxSeqNo(); @@ -139,7 +139,7 @@ public void testTranslogRollsGeneration() throws IOException { assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().estimatedNumberOfOperations()); translogManager.syncTranslog(); - translogManager.getTranslog().close(); + translogManager.close(); translogManager = new InternalTranslogManager( new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), primaryTerm, @@ -164,13 +164,13 @@ public void testTranslogRollsGeneration() throws IOException { assertEquals(maxSeqNo + 1, opsRecovered.get()); assertEquals(maxSeqNo + 1, opsRecoveredFromTranslog); } finally { - translogManager.getTranslog().close(); + translogManager.close(); } } public void testTrimOperationsFromTranslog() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - TranslogManager translogManager = null; + InternalTranslogManager translogManager = null; LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED); try { translogManager = new InternalTranslogManager( @@ -192,7 +192,7 @@ public void testTrimOperationsFromTranslog() throws IOException { Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), i, true); tracker.markSeqNoAsProcessed(i); - translogManager.getTranslog().add(new Translog.Index(index, indexResult)); + translogManager.add(new Translog.Index(index, indexResult)); } long maxSeqNo = tracker.getMaxSeqNo(); assertEquals(maxSeqNo + 1, translogManager.getTranslogStats().getUncommittedOperations()); @@ -202,7 +202,7 @@ public void testTrimOperationsFromTranslog() throws IOException { translogManager.rollTranslogGeneration(); translogManager.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog - translogManager.getTranslog().close(); + translogManager.close(); translogManager = new InternalTranslogManager( new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), primaryTerm, @@ -227,19 +227,19 @@ public void testTrimOperationsFromTranslog() throws IOException { assertEquals(0, opsRecovered.get()); assertEquals(0, opsRecoveredFromTranslog); } finally { - translogManager.getTranslog().close(); + translogManager.close(); } } public void testTranslogSync() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); AtomicBoolean syncListenerInvoked = new AtomicBoolean(); - TranslogManager translogManager = null; + InternalTranslogManager translogManager = null; final AtomicInteger maxSeqNo = new AtomicInteger(randomIntBetween(0, 128)); final AtomicInteger localCheckpoint = new AtomicInteger(randomIntBetween(0, maxSeqNo.get())); try { ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); - AtomicReference translogManagerAtomicReference = new AtomicReference<>(); + AtomicReference translogManagerAtomicReference = new AtomicReference<>(); translogManager = new InternalTranslogManager( new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE), primaryTerm, @@ -253,7 +253,7 @@ public void testTranslogSync() throws IOException { @Override public void onAfterTranslogSync() { try { - translogManagerAtomicReference.get().getTranslog().trimUnreferencedReaders(); + translogManagerAtomicReference.get().trimUnreferencedReaders(); syncListenerInvoked.set(true); } catch (IOException ex) { fail("Failed due to " + ex); @@ -265,7 +265,7 @@ public void onAfterTranslogSync() { translogManagerAtomicReference.set(translogManager); Engine.Index index = indexForDoc(doc); Engine.IndexResult indexResult = new Engine.IndexResult(index.version(), index.primaryTerm(), 1, false); - translogManager.getTranslog().add(new Translog.Index(index, indexResult)); + translogManager.add(new Translog.Index(index, indexResult)); translogManager.syncTranslog(); @@ -273,7 +273,7 @@ public void onAfterTranslogSync() { assertThat(translogManager.getTranslog().getMinFileGeneration(), equalTo(2L)); assertTrue(syncListenerInvoked.get()); } finally { - translogManager.getTranslog().close(); + translogManager.close(); } } } diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index a8b5b03e03877..174747d306ff5 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -111,8 +111,11 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; +import org.opensearch.index.translog.InternalTranslogManager; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; +import org.opensearch.index.translog.TranslogDeletionPolicy; +import org.opensearch.index.translog.TranslogManager; import org.opensearch.index.translog.listener.TranslogEventListener; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.NoneCircuitBreakerService; @@ -149,6 +152,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.instanceOf; import static org.opensearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY; import static org.opensearch.index.engine.Engine.Operation.Origin.PRIMARY; import static org.opensearch.index.engine.Engine.Operation.Origin.REPLICA; @@ -329,23 +333,33 @@ public void tearDown() throws Exception { try { if (engine != null && engine.isClosed.get() == false) { engine.ensureOpen(); - assertEngineCleanedUp(engine, engine.translogManager().getTranslog()); + assertEngineCleanedUp(engine, assertAndGetInternalTranslogManager(engine.translogManager()).getDeletionPolicy()); } if (replicaEngine != null && replicaEngine.isClosed.get() == false) { replicaEngine.ensureOpen(); - assertEngineCleanedUp(replicaEngine, replicaEngine.translogManager().getTranslog()); + assertEngineCleanedUp( + replicaEngine, + assertAndGetInternalTranslogManager(replicaEngine.translogManager()).getDeletionPolicy() + ); } } finally { IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool)); } } - protected void assertEngineCleanedUp(Engine engine, Translog translog) throws Exception { - translog.getDeletionPolicy().assertNoOpenTranslogRefs(); - assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); - assertNoInFlightDocuments(engine); - assertMaxSeqNoInCommitUserData(engine); - assertAtMostOneLuceneDocumentPerSequenceNumber(engine); + protected InternalTranslogManager assertAndGetInternalTranslogManager(final TranslogManager translogManager) { + assertThat(translogManager, instanceOf(InternalTranslogManager.class)); + return (InternalTranslogManager) translogManager; + } + + protected void assertEngineCleanedUp(Engine engine, TranslogDeletionPolicy translogDeletionPolicy) throws Exception { + if (engine.isClosed.get() == false) { + translogDeletionPolicy.assertNoOpenTranslogRefs(); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine); + assertNoInFlightDocuments(engine); + assertMaxSeqNoInCommitUserData(engine); + assertAtMostOneLuceneDocumentPerSequenceNumber(engine); + } } protected static ParseContext.Document testDocumentWithTextField() { @@ -1481,7 +1495,11 @@ public static Translog getTranslog(Engine engine) { assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass(); InternalEngine internalEngine = (InternalEngine) engine; internalEngine.ensureOpen(); - return internalEngine.translogManager().getTranslog(); + TranslogManager translogManager = internalEngine.translogManager(); + assert translogManager instanceof InternalTranslogManager : "only InternalTranslogManager have translogs, got: " + + engine.getClass(); + InternalTranslogManager internalTranslogManager = (InternalTranslogManager) translogManager; + return internalTranslogManager.getTranslog(); } /**