From a14d4bc353feea0b964129cf3feb35607b61f128 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 10 Feb 2020 08:26:01 -0500 Subject: [PATCH 1/8] Use local checkpoint to calculate min translog gen for recovery (#51905) Today we use the translog_generation of the safe commit as the minimum required translog generation for recovery. This approach has a limitation, where we won't be able to clean up translog unless we flush. Reopening an already recovered engine will create a new empty translog, and we leave it there until we force flush. This commit removes the translog_generation commit tag and uses the local checkpoint of the safe commit to calculate the minimum required translog generation for recovery instead. Closes #49970 --- .../test/indices.stats/20_translog.yml | 12 +- .../index/engine/CombinedDeletionPolicy.java | 12 +- .../index/engine/InternalEngine.java | 63 ++-- .../index/engine/NoOpEngine.java | 31 +- .../index/engine/ReadOnlyEngine.java | 5 +- .../org/elasticsearch/index/store/Store.java | 8 +- .../index/translog/Translog.java | 87 ++--- .../translog/TranslogDeletionPolicy.java | 47 +-- .../translog/TruncateTranslogAction.java | 17 +- .../indices/stats/IndicesStatsTests.java | 1 - .../index/IndexServiceTests.java | 16 +- .../engine/CombinedDeletionPolicyTests.java | 40 +-- .../index/engine/InternalEngineTests.java | 76 ++--- .../index/engine/NoOpEngineTests.java | 52 +-- .../RecoveryDuringReplicationTests.java | 2 +- .../index/shard/IndexShardIT.java | 14 +- .../index/shard/IndexShardTests.java | 15 +- .../shard/PrimaryReplicaSyncerTests.java | 9 +- .../elasticsearch/index/store/StoreTests.java | 4 - .../index/translog/TestTranslog.java | 19 +- .../translog/TranslogDeletionPolicyTests.java | 31 +- .../index/translog/TranslogTests.java | 317 ++++++++---------- .../PeerRecoveryTargetServiceTests.java | 9 +- .../indices/recovery/RecoveryTests.java | 4 - 24 files changed, 341 insertions(+), 550 deletions(-) diff --git 
a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml index 64581b61f87e0..9d49829f615df 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.stats/20_translog.yml @@ -88,6 +88,13 @@ cluster.health: wait_for_no_initializing_shards: true wait_for_events: languid + # Before 8.0, an empty shard had two empty translog files because we used the translog_generation commit tag as the minimum required + # translog generation for recovery. Here we force-flush to get consistent translog stats for both old and new indices. + - do: + indices.flush: + index: test + force: true + wait_if_ongoing: true - do: indices.stats: metric: [ translog ] @@ -115,10 +122,9 @@ - do: indices.stats: metric: [ translog ] - # after flushing we have one empty translog file while an empty index before flushing has two empty translog files. 
- - lt: { indices.test.primaries.translog.size_in_bytes: $creation_size } + - match: { indices.test.primaries.translog.size_in_bytes: $creation_size } - match: { indices.test.primaries.translog.operations: 0 } - - lt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size } + - match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size } - match: { indices.test.primaries.translog.uncommitted_operations: 0 } --- diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 70507fd18e7a3..5fbf9e69b2400 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -121,16 +121,10 @@ private void updateRetentionPolicy() throws IOException { assert Thread.holdsLock(this); logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit)); assert safeCommit.isDeleted() == false : "The safe commit must not be deleted"; - final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); assert lastCommit.isDeleted() == false : "The last commit must not be deleted"; - final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - - assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen"; - translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen); - translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen); - - softDeletesPolicy.setLocalCheckpointOfSafeCommit( - Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY))); + final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + 
softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); + translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); } protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 365980f7e0885..b76a23c014154 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -400,7 +400,7 @@ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecove try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) { + try (Translog.Snapshot snapshot = getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) { return translogRecoveryRunner.run(this, snapshot); } } @@ -473,23 +473,24 @@ public void skipTranslogRecovery() { } private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException { - Translog.TranslogGeneration translogGeneration = translog.getGeneration(); final int opsRecovered; - final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) { - opsRecovered = translogRecoveryRunner.run(this, snapshot); - } catch (Exception e) { - throw new EngineException(shardId, "failed to recover from translog", e); + final long localCheckpoint = getProcessedLocalCheckpoint(); + if (localCheckpoint < recoverUpToSeqNo) { + try (Translog.Snapshot snapshot = 
translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) { + opsRecovered = translogRecoveryRunner.run(this, snapshot); + } catch (Exception e) { + throw new EngineException(shardId, "failed to recover from translog", e); + } + } else { + opsRecovered = 0; } // flush if we recovered something or if we have references to older translogs // note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length. assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be"; pendingTranslogRecovery.set(false); // we are good - now we can commit if (opsRecovered > 0) { - logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]", - opsRecovered, translogGeneration == null ? null : - translogGeneration.translogFileGeneration, translog.currentFileGeneration()); + logger.trace("flushing post recovery from translog: ops recovered [{}], current translog generation [{}]", + opsRecovered, translog.currentFileGeneration()); commitIndexWriter(indexWriter, translog, null); refreshLastCommittedSegmentInfos(); refresh("translog_recovery"); @@ -501,7 +502,8 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException { final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - final String translogUUID = loadTranslogUUIDFromLastCommit(); + final Map userData = store.readLastCommittedSegmentsInfo().getUserData(); + final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY)); // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! 
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier(), persistedSequenceNumberConsumer); @@ -549,7 +551,7 @@ public Translog.Snapshot readHistoryOperations(String reason, HistorySource hist ensureSoftDeletesEnabled(); return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false); } else { - return getTranslog().newSnapshotFromMinSeqNo(startingSeqNo); + return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE); } } @@ -598,18 +600,6 @@ public long getWritingBytes() { return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes(); } - /** - * Reads the current stored translog ID from the last commit data. - */ - @Nullable - private String loadTranslogUUIDFromLastCommit() throws IOException { - final Map commitUserData = store.readLastCommittedSegmentsInfo().getUserData(); - if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { - throw new IllegalStateException("commit doesn't contain translog generation id"); - } - return commitUserData.get(Translog.TRANSLOG_UUID_KEY); - } - /** * Reads the current stored history ID from the IW commit data. 
*/ @@ -1688,8 +1678,9 @@ final boolean tryRenewSyncCommit() { ensureOpen(); ensureCanFlush(); String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID); - long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); - if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) { + long localCheckpointOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + if (syncId != null && indexWriter.hasUncommittedChanges() && + translog.estimateTotalOperationsFromMinSeq(localCheckpointOfLastCommit + 1) == 0) { logger.trace("start renewing sync commit [{}]", syncId); commitIndexWriter(indexWriter, translog, syncId); logger.debug("successfully sync committed. sync id [{}].", syncId); @@ -1714,8 +1705,10 @@ public boolean shouldPeriodicallyFlush() { if (shouldPeriodicallyFlushAfterBigMerge.get()) { return true; } + final long localCheckpointOfLastCommit = + Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); final long translogGenerationOfLastCommit = - Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY)); + translog.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration; final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes(); if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) { return false; @@ -2423,11 +2416,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl ensureCanFlush(); try { final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); - final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1); - final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration); - final String 
translogUUID = translogGeneration.translogUUID; - final String localCheckpointValue = Long.toString(localCheckpoint); - writer.setLiveCommitData(() -> { /* * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes @@ -2438,10 +2426,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene). */ - final Map commitData = new HashMap<>(8); - commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration); - commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID); - commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue); + final Map commitData = new HashMap<>(7); + commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID()); + commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); if (syncId != null) { commitData.put(Engine.SYNC_COMMIT_ID, syncId); } @@ -2657,7 +2644,7 @@ public boolean hasCompleteOperationHistory(String reason, HistorySource historyS return true; } final LocalCheckpointTracker tracker = new LocalCheckpointTracker(startingSeqNo, startingSeqNo - 1); - try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(startingSeqNo)) { + try (Translog.Snapshot snapshot = getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE)) { Translog.Operation operation; while ((operation = snapshot.next()) != null) { if (operation.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index 06520d3036c31..78f5a885def41 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ 
b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -28,6 +28,7 @@ import org.apache.lucene.store.Directory; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.concurrent.ReleasableLock; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; @@ -137,31 +138,23 @@ public void trimUnreferencedTranslogFiles() { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); final List commits = DirectoryReader.listCommits(store.directory()); - if (commits.size() == 1) { + if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) { final Map commitUserData = getLastCommittedSegmentInfos().getUserData(); final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY); if (translogUuid == null) { throw new IllegalStateException("commit doesn't contain translog unique id"); } - if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { - throw new IllegalStateException("commit doesn't contain translog generation id"); - } - final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY)); final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid); - - if (minTranslogGeneration < lastCommitGeneration) { - // a translog deletion policy that retains nothing but the last translog generation from safe commit - final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); - translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration); - translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration); - - try (Translog translog = new Translog(translogConfig, 
translogUuid, translogDeletionPolicy, - engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) { - translog.trimUnreferencedReaders(); - // refresh the translog stats - this.translogStats = translog.stats(); - } + final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); + translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); + try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, + engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) { + translog.trimUnreferencedReaders(); + // refresh the translog stats + this.translogStats = translog.stats(); + assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed " + + " current gen " + translog.currentFileGeneration() + " != min gen " + translog.getMinFileGeneration(); } } } catch (final Exception e) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 2ffdcb71425eb..06cb780431386 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -223,15 +223,14 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm if (translogUuid == null) { throw new IllegalStateException("commit doesn't contain translog unique id"); } - final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); final TranslogConfig translogConfig = config.getTranslogConfig(); final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( config.getIndexSettings().getTranslogRetentionSize().getBytes(), 
config.getIndexSettings().getTranslogRetentionAge().getMillis(), config.getIndexSettings().getTranslogRetentionTotalFiles() ); - translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit); - + final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), seqNo -> {}) ) { diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 16c5e636edb06..a43adaa3c49b5 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1453,10 +1453,7 @@ public void associateIndexWithNewTranslog(final String translogUUID) throws IOEx if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) { throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. 
got [" + translogUUID + "]"); } - final Map map = new HashMap<>(); - map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); - map.put(Translog.TRANSLOG_UUID_KEY, translogUUID); - updateCommitData(writer, map); + updateCommitData(writer, Collections.singletonMap(Translog.TRANSLOG_UUID_KEY, translogUUID)); } finally { metadataLock.writeLock().unlock(); } @@ -1517,7 +1514,8 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long if (indexVersionCreated.before(org.elasticsearch.Version.V_6_2_0)) { final List recoverableCommits = new ArrayList<>(); for (IndexCommit commit : existingCommits) { - if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) { + final String translogGeneration = commit.getUserData().get("translog_generation"); + if (translogGeneration == null || minRetainedTranslogGen <= Long.parseLong(translogGeneration)) { recoverableCommits.add(commit); } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index dabd6829f5401..87d228352164f 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -73,9 +73,7 @@ /** * A Translog is a per index shard component that records all non-committed index operations in a durable manner. - * In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}. The engine - * records the current translog generation {@link Translog#getGeneration()} in it's commit metadata using {@link #TRANSLOG_GENERATION_KEY} - * to reference the generation that contains all operations that have not yet successfully been committed to the engines lucene index. + * In Elasticsearch there is one Translog instance per {@link org.elasticsearch.index.engine.InternalEngine}. 
* Additionally, since Elasticsearch 2.0 the engine also records a {@link #TRANSLOG_UUID_KEY} with each commit to ensure a strong * association between the lucene index an the transaction log file. This UUID is used to prevent accidental recovery from a transaction * log that belongs to a @@ -106,7 +104,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * - we need to page align the last write before we sync, we can take advantage of ensureSynced for this since we might have already * fsynced far enough */ - public static final String TRANSLOG_GENERATION_KEY = "translog_generation"; public static final String TRANSLOG_UUID_KEY = "translog_uuid"; public static final String TRANSLOG_FILE_PREFIX = "translog-"; public static final String TRANSLOG_FILE_SUFFIX = ".tlog"; @@ -222,16 +219,7 @@ private ArrayList recoverFromFiles(Checkpoint checkpoint) throws ArrayList foundTranslogs = new ArrayList<>(); try (ReleasableLock ignored = writeLock.acquire()) { logger.debug("open uncommitted translog checkpoint {}", checkpoint); - - final long minGenerationToRecoverFrom; - if (checkpoint.minTranslogGeneration < 0) { - final Version indexVersionCreated = indexSettings().getIndexVersionCreated(); - assert indexVersionCreated.before(Version.V_6_0_0_beta1) : - "no minTranslogGeneration in checkpoint, but index was created with version [" + indexVersionCreated + "]"; - minGenerationToRecoverFrom = deletionPolicy.getMinTranslogGenerationForRecovery(); - } else { - minGenerationToRecoverFrom = checkpoint.minTranslogGeneration; - } + final long minGenerationToRecoverFrom = checkpoint.minTranslogGeneration; // we open files in reverse order in order to validate the translog uuid before we start traversing the translog based on // the generation id we found in the lucene commit. 
This gives for better error messages if the wrong @@ -608,33 +596,28 @@ final Checkpoint getLastSyncedCheckpoint() { } } - /** - * Snapshots the current transaction log allowing to safely iterate over the snapshot. - * Snapshots are fixed in time and will not be updated with future operations. - */ + // for testing public Snapshot newSnapshot() throws IOException { - try (ReleasableLock ignored = readLock.acquire()) { - return newSnapshotFromGen(new TranslogGeneration(translogUUID, getMinFileGeneration()), Long.MAX_VALUE); - } + return newSnapshot(0, Long.MAX_VALUE); } - public Snapshot newSnapshotFromGen(TranslogGeneration fromGeneration, long upToSeqNo) throws IOException { + /** + * Creates a new translog snapshot containing operations from the given range. + * + * @param fromSeqNo the lower bound of the range (inclusive) + * @param toSeqNo the upper bound of the range (inclusive) + * @return the new snapshot + */ + public Snapshot newSnapshot(long fromSeqNo, long toSeqNo) throws IOException { + assert fromSeqNo <= toSeqNo : fromSeqNo + " > " + toSeqNo; + assert fromSeqNo >= 0 : "from_seq_no must be non-negative " + fromSeqNo; try (ReleasableLock ignored = readLock.acquire()) { ensureOpen(); - final long fromFileGen = fromGeneration.translogFileGeneration; - if (fromFileGen < getMinFileGeneration()) { - throw new IllegalArgumentException("requested snapshot generation [" + fromFileGen + "] is not available. 
" + - "Min referenced generation is [" + getMinFileGeneration() + "]"); - } TranslogSnapshot[] snapshots = Stream.concat(readers.stream(), Stream.of(current)) - .filter(reader -> reader.getGeneration() >= fromFileGen && reader.getCheckpoint().minSeqNo <= upToSeqNo) + .filter(reader -> reader.getCheckpoint().minSeqNo <= toSeqNo && fromSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) .map(BaseTranslogReader::newSnapshot).toArray(TranslogSnapshot[]::new); final Snapshot snapshot = newMultiSnapshot(snapshots); - if (upToSeqNo == Long.MAX_VALUE) { - return snapshot; - } else { - return new SeqNoFilterSnapshot(snapshot, Long.MIN_VALUE, upToSeqNo); - } + return new SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo); } } @@ -668,15 +651,6 @@ public Operation readOperation(Location location) throws IOException { return null; } - public Snapshot newSnapshotFromMinSeqNo(long minSeqNo) throws IOException { - try (ReleasableLock ignored = readLock.acquire()) { - ensureOpen(); - TranslogSnapshot[] snapshots = readersAboveMinSeqNo(minSeqNo).map(BaseTranslogReader::newSnapshot) - .toArray(TranslogSnapshot[]::new); - return newMultiSnapshot(snapshots); - } - } - private Snapshot newMultiSnapshot(TranslogSnapshot[] snapshots) throws IOException { final Closeable onClose; if (snapshots.length == 0) { @@ -866,7 +840,7 @@ protected void closeOnTragicEvent(final Exception ex) { public TranslogStats stats() { // acquire lock to make the two numbers roughly consistent (no file change half way) try (ReleasableLock lock = readLock.acquire()) { - final long uncommittedGen = deletionPolicy.getTranslogGenerationOfLastCommit(); + long uncommittedGen = getMinGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1).translogFileGeneration; return new TranslogStats(totalOperations(), sizeInBytes(), totalOperationsByMinGen(uncommittedGen), sizeInBytesByMinGen(uncommittedGen), earliestLastModifiedAge()); } @@ -966,7 +940,7 @@ default int skippedOperations() { * shares the same 
underlying resources with the {@code delegate} snapshot, therefore we should not * use the {@code delegate} after passing it to this filtered snapshot. */ - static final class SeqNoFilterSnapshot implements Snapshot { + private static final class SeqNoFilterSnapshot implements Snapshot { private final Snapshot delegate; private int filteredOpsCount; private final long fromSeqNo; // inclusive @@ -1626,20 +1600,18 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl */ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { try (ReleasableLock ignored = readLock.acquire()) { - /* - * When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the - * local checkpoint. Immediately after flushing, there will be no such generation, so this minimum generation in this case will - * be the current translog generation as we do not need any prior generations to have a complete history up to the current local - * checkpoint. 
- */ - long minTranslogFileGeneration = this.currentFileGeneration(); - for (final TranslogReader reader : readers) { - if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) { - minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration()); - } + return new TranslogGeneration(translogUUID, minGenerationForSeqNo(seqNo, current, readers)); + } + } + + private static long minGenerationForSeqNo(long seqNo, TranslogWriter writer, List readers) { + long minGen = writer.generation; + for (final TranslogReader reader : readers) { + if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) { + minGen = Math.min(minGen, reader.getGeneration()); } - return new TranslogGeneration(translogUUID, minTranslogFileGeneration); } + return minGen; } /** @@ -1681,7 +1653,8 @@ public void trimUnreferencedReaders() throws IOException { // we're shutdown potentially on some tragic event, don't delete anything return; } - long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, current); + long minReferencedGen = Math.min(deletionPolicy.minTranslogGenRequired(readers, current), + minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, current, readers)); assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + getMinFileGeneration() + "]"; diff --git a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java index 8a553aad326b7..a26b2dc15e9ad 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java @@ -22,6 +22,7 @@ import org.apache.lucene.util.Counter; import org.elasticsearch.Assertions; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.index.seqno.SequenceNumbers; 
import java.io.IOException; import java.util.HashMap; @@ -47,17 +48,7 @@ public void assertNoOpenTranslogRefs() { * translog generation */ private final Map translogRefCounts = new HashMap<>(); - - /** - * the translog generation that is requires to properly recover from the oldest non deleted - * {@link org.apache.lucene.index.IndexCommit}. - */ - private long minTranslogGenerationForRecovery = 1; - - /** - * This translog generation is used to calculate the number of uncommitted operations since the last index commit. - */ - private long translogGenerationOfLastCommit = 1; + private long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; private long retentionSizeInBytes; @@ -76,23 +67,12 @@ public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMill } } - public synchronized void setMinTranslogGenerationForRecovery(long newGen) { - if (newGen < minTranslogGenerationForRecovery || newGen > translogGenerationOfLastCommit) { - throw new IllegalArgumentException("Invalid minTranslogGenerationForRecovery can't go backwards; new [" + newGen + "]," + - "current [" + minTranslogGenerationForRecovery + "], lastGen [" + translogGenerationOfLastCommit + "]"); + public synchronized void setLocalCheckpointOfSafeCommit(long newCheckpoint) { + if (newCheckpoint < this.localCheckpointOfSafeCommit) { + throw new IllegalArgumentException("local checkpoint of the safe commit can't go backwards: " + + "current [" + this.localCheckpointOfSafeCommit + "] new [" + newCheckpoint + "]"); } - minTranslogGenerationForRecovery = newGen; - } - - /** - * Sets the translog generation of the last index commit. 
- */ - public synchronized void setTranslogGenerationOfLastCommit(long lastGen) { - if (lastGen < translogGenerationOfLastCommit || lastGen < minTranslogGenerationForRecovery) { - throw new IllegalArgumentException("Invalid translogGenerationOfLastCommit; new [" + lastGen + "]," + - "current [" + translogGenerationOfLastCommit + "], minRequiredGen [" + minTranslogGenerationForRecovery + "]"); - } - translogGenerationOfLastCommit = lastGen; + this.localCheckpointOfSafeCommit = newCheckpoint; } public synchronized void setRetentionSizeInBytes(long bytes) { @@ -172,7 +152,7 @@ synchronized long minTranslogGenRequired(List readers, TranslogW minByAgeAndSize = Math.max(minByAge, minBySize); } long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles); - return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery)); + return Math.min(Math.max(minByAgeAndSize, minByNumFiles), minByLocks); } static long getMinTranslogGenBySize(List readers, TranslogWriter writer, long retentionSizeInBytes) { @@ -222,16 +202,11 @@ private long getMinTranslogGenRequiredByLocks() { return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE); } - /** returns the translog generation that will be used as a basis of a future store/peer recovery */ - public synchronized long getMinTranslogGenerationForRecovery() { - return minTranslogGenerationForRecovery; - } - /** - * Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit. + * Returns the local checkpoint of the safe commit. This value is used to calculate the min required generation for recovery. 
*/ - public synchronized long getTranslogGenerationOfLastCommit() { - return translogGenerationOfLastCommit; + public synchronized long getLocalCheckpointOfSafeCommit() { + return localCheckpointOfSafeCommit; } synchronized long getTranslogRefCount(long gen) { diff --git a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java index 9480ee3c1e1f3..42082e0629ac6 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java +++ b/server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogAction.java @@ -129,30 +129,25 @@ public void execute(Terminal terminal, ShardPath shardPath, Directory indexDirec // Retrieve the generation and UUID from the existing data commitData = commits.get(commits.size() - 1).getUserData(); - final String translogGeneration = commitData.get(Translog.TRANSLOG_GENERATION_KEY); final String translogUUID = commitData.get(Translog.TRANSLOG_UUID_KEY); - if (translogGeneration == null || translogUUID == null) { - throw new ElasticsearchException("shard must have a valid translog generation and UUID but got: [{}] and: [{}]", - translogGeneration, translogUUID); + if (translogUUID == null) { + throw new ElasticsearchException("shard must have a valid translog UUID"); } final long globalCheckpoint = commitData.containsKey(SequenceNumbers.MAX_SEQ_NO) ? 
Long.parseLong(commitData.get(SequenceNumbers.MAX_SEQ_NO)) : SequenceNumbers.UNASSIGNED_SEQ_NO; - terminal.println("Translog Generation: " + translogGeneration); terminal.println("Translog UUID : " + translogUUID); terminal.println("History UUID : " + historyUUID); Path tempEmptyCheckpoint = translogPath.resolve("temp-" + Translog.CHECKPOINT_FILE_NAME); Path realEmptyCheckpoint = translogPath.resolve(Translog.CHECKPOINT_FILE_NAME); - Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + - translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); - Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + - translogGeneration + Translog.TRANSLOG_FILE_SUFFIX); + final long gen = 1; + Path tempEmptyTranslog = translogPath.resolve("temp-" + Translog.TRANSLOG_FILE_PREFIX + gen + Translog.TRANSLOG_FILE_SUFFIX); + Path realEmptyTranslog = translogPath.resolve(Translog.TRANSLOG_FILE_PREFIX + gen + Translog.TRANSLOG_FILE_SUFFIX); // Write empty checkpoint and translog to empty files - long gen = Long.parseLong(translogGeneration); int translogLen = writeEmptyTranslog(tempEmptyTranslog, translogUUID); writeEmptyCheckpoint(tempEmptyCheckpoint, translogLen, gen, globalCheckpoint); @@ -192,7 +187,7 @@ long minTranslogGenRequired(List readers, TranslogWriter writer) }; try (Translog translog = new Translog(translogConfig, translogUUID, retainAllTranslogPolicy, () -> translogGlobalCheckpoint, () -> primaryTerm, seqNo -> {}); - Translog.Snapshot snapshot = translog.newSnapshot()) { + Translog.Snapshot snapshot = translog.newSnapshot(0, Long.MAX_VALUE)) { //noinspection StatementWithEmptyBody we are just checking that we can iterate through the whole snapshot while (snapshot.next() != null) { } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java index c7abf1172b0a5..10cdf95015aa2 100644 --- 
a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java @@ -123,7 +123,6 @@ public void testCommitStats() throws Exception { assertNotNull(commitStats); assertThat(commitStats.getGeneration(), greaterThan(0L)); assertThat(commitStats.getId(), notNullValue()); - assertThat(commitStats.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); assertThat(commitStats.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); } } diff --git a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java index 0bec748072251..a032e4b4d519c 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexServiceTests.java @@ -43,10 +43,8 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.nio.file.Path; import java.util.Collection; import java.util.Collections; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -393,8 +391,6 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { .build()); Translog translog = IndexShardTestCase.getTranslog(indexService.getShard(0)); - final Path translogPath = translog.getConfig().getTranslogPath(); - final String translogUuid = translog.getTranslogUUID(); int translogOps = 0; final int numDocs = scaledRandomIntBetween(10, 100); @@ -415,15 +411,9 @@ public void testAsyncTranslogTrimTaskOnClosedIndex() throws Exception { indexService = getInstanceFromNode(IndicesService.class).indexServiceSafe(indexService.index()); assertTrue(indexService.getTrimTranslogTask().mustReschedule()); - final long lastCommitedTranslogGeneration; - try (Engine.IndexCommitRef indexCommitRef = 
getEngine(indexService.getShard(0)).acquireLastIndexCommit(false)) { - Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); - lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); - } - assertBusy(() -> { - long minTranslogGen = Translog.readMinTranslogGeneration(translogPath, translogUuid); - assertThat(minTranslogGen, equalTo(lastCommitedTranslogGeneration)); - }); + final Engine readOnlyEngine = getEngine(indexService.getShard(0)); + assertBusy(() -> + assertThat(readOnlyEngine.getTranslogStats().getTranslogSizeInBytes(), equalTo((long) Translog.DEFAULT_HEADER_SIZE_IN_BYTES))); assertAcked(client().admin().indices().prepareOpen("test").setWaitForActiveShards(ActiveShardCount.DEFAULT)); diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index deb224f5b1895..a2508ee3ec8f0 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -58,20 +58,16 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); final LongArrayList maxSeqNoList = new LongArrayList(); - final LongArrayList translogGenList = new LongArrayList(); final List commitList = new ArrayList<>(); int totalCommits = between(2, 20); long lastMaxSeqNo = 0; long lastCheckpoint = lastMaxSeqNo; - long lastTranslogGen = 0; final UUID translogUUID = UUID.randomUUID(); for (int i = 0; i < totalCommits; i++) { lastMaxSeqNo += between(1, 10000); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - lastTranslogGen += between(1, 100); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); + 
commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID)); maxSeqNoList.add(lastMaxSeqNo); - translogGenList.add(lastTranslogGen); } int keptIndex = randomInt(commitList.size() - 1); @@ -88,8 +84,7 @@ public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { verify(commitList.get(i), never()).delete(); } } - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(translogGenList.get(keptIndex))); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(keptIndex)))); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo(Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(commitList.get(keptIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -105,7 +100,6 @@ public void testAcquireIndexCommit() throws Exception { CombinedDeletionPolicy indexPolicy = newCombinedDeletionPolicy(translogPolicy, softDeletesPolicy, globalCheckpoint); long lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); - long lastTranslogGen = between(1, 20); int safeIndex = 0; List commitList = new ArrayList<>(); List snapshottingCommits = new ArrayList<>(); @@ -115,8 +109,7 @@ public void testAcquireIndexCommit() throws Exception { for (int n = 0; n < newCommits; n++) { lastMaxSeqNo += between(1, 1000); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - lastTranslogGen += between(1, 20); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID)); } // Advance the global checkpoint to between [safeIndex, safeIndex + 1) safeIndex = randomIntBetween(safeIndex, commitList.size() - 1); @@ -155,10 +148,7 @@ public void testAcquireIndexCommit() throws Exception { // Snapshotting commits must not be deleted. 
snapshottingCommits.forEach(snapshot -> assertThat(snapshot.isDeleted(), equalTo(false))); // We don't need to retain translog for snapshotting commits. - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), - equalTo(Long.parseLong(commitList.get(safeIndex).getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), - equalTo(Long.parseLong(commitList.get(commitList.size() - 1).getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(safeIndex)))); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo( Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(commitList.get(safeIndex)) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -171,8 +161,7 @@ public void testAcquireIndexCommit() throws Exception { assertThat(commitList.get(i).isDeleted(), equalTo(true)); } assertThat(commitList.get(commitList.size() - 1).isDeleted(), equalTo(false)); - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); assertThat(softDeletesPolicy.getMinRetainedSeqNo(), equalTo( Math.max(NO_OPS_PERFORMED, Math.min(getLocalCheckpoint(safeCommit) + 1, globalCheckpoint.get() + 1 - extraRetainedOps)))); @@ -188,19 +177,17 @@ public void testDeleteInvalidCommits() throws Exception { final List commitList = new ArrayList<>(); for (int i = 0; i < invalidCommits; i++) { long maxSeqNo = randomNonNegativeLong(); - commitList.add(mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, UUID.randomUUID(), randomNonNegativeLong())); + 
commitList.add(mockIndexCommit(randomLongBetween(-1, maxSeqNo), maxSeqNo, UUID.randomUUID())); } final UUID expectedTranslogUUID = UUID.randomUUID(); - long lastTranslogGen = 0; final int validCommits = between(1, 10); long lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); for (int i = 0; i < validCommits; i++) { - lastTranslogGen += between(1, 1000); lastMaxSeqNo += between(1, 1000); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, expectedTranslogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, expectedTranslogUUID)); } // We should never keep invalid commits regardless of the value of the global checkpoint. @@ -222,12 +209,10 @@ public void testCheckUnreferencedCommits() throws Exception { int totalCommits = between(2, 20); long lastMaxSeqNo = between(1, 1000); long lastCheckpoint = randomLongBetween(-1, lastMaxSeqNo); - long lastTranslogGen = between(1, 50); for (int i = 0; i < totalCommits; i++) { lastMaxSeqNo += between(1, 10000); - lastTranslogGen += between(1, 100); lastCheckpoint = randomLongBetween(lastCheckpoint, lastMaxSeqNo); - commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID, lastTranslogGen)); + commitList.add(mockIndexCommit(lastCheckpoint, lastMaxSeqNo, translogUUID)); } int safeCommitIndex = randomIntBetween(0, commitList.size() - 1); globalCheckpoint.set(Long.parseLong(commitList.get(safeCommitIndex).getUserData().get(SequenceNumbers.MAX_SEQ_NO))); @@ -236,8 +221,7 @@ public void testCheckUnreferencedCommits() throws Exception { if (safeCommitIndex == commitList.size() - 1) { // Safe commit is the last commit - no need to clean up - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + 
assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); } else { // Advanced but not enough for any commit after the safe commit becomes safe @@ -254,8 +238,7 @@ public void testCheckUnreferencedCommits() throws Exception { commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); // Safe commit is the last commit - no need to clean up - assertThat(translogPolicy.getMinTranslogGenerationForRecovery(), equalTo(lastTranslogGen)); - assertThat(translogPolicy.getTranslogGenerationOfLastCommit(), equalTo(lastTranslogGen)); + assertThat(translogPolicy.getLocalCheckpointOfSafeCommit(), equalTo(getLocalCheckpoint(commitList.get(commitList.size() - 1)))); assertThat(indexPolicy.hasUnreferencedCommits(), equalTo(false)); } } @@ -271,12 +254,11 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) { }; } - IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID, long translogGen) throws IOException { + IndexCommit mockIndexCommit(long localCheckpoint, long maxSeqNo, UUID translogUUID) throws IOException { final Map userData = new HashMap<>(); userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); userData.put(Translog.TRANSLOG_UUID_KEY, translogUUID.toString()); - userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGen)); final IndexCommit commit = mock(IndexCommit.class); final Directory directory = mock(Directory.class); when(commit.getUserData()).thenReturn(userData); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 335629928396d..5e21db432cea0 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ 
b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -98,6 +98,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver; import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -176,6 +177,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.IntSupplier; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.function.ToLongBiFunction; @@ -697,7 +699,6 @@ public long getProcessedCheckpoint() { CommitStats stats1 = engine.commitStats(); assertThat(stats1.getGeneration(), greaterThan(0L)); assertThat(stats1.getId(), notNullValue()); - assertThat(stats1.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); assertThat(stats1.getUserData(), hasKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); assertThat( Long.parseLong(stats1.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), @@ -722,11 +723,7 @@ public long getProcessedCheckpoint() { assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration())); assertThat(stats2.getId(), notNullValue()); assertThat(stats2.getId(), not(equalTo(stats1.getId()))); - assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_GENERATION_KEY)); assertThat(stats2.getUserData(), hasKey(Translog.TRANSLOG_UUID_KEY)); - assertThat( - stats2.getUserData().get(Translog.TRANSLOG_GENERATION_KEY), - not(equalTo(stats1.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)))); assertThat(stats2.getUserData().get(Translog.TRANSLOG_UUID_KEY), equalTo(stats1.getUserData().get(Translog.TRANSLOG_UUID_KEY))); 
assertThat(Long.parseLong(stats2.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(localCheckpoint.get())); @@ -1157,6 +1154,7 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get(); engine = createEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier)); + engine.onSettingsChanged(TimeValue.MINUS_ONE, ByteSizeValue.ZERO, randomNonNegativeLong()); ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); engine.index(indexForDoc(doc)); boolean inSync = randomBoolean(); @@ -1167,24 +1165,20 @@ public void testCommitAdvancesMinTranslogForRecovery() throws IOException { engine.flush(); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(3L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 3L : 1L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(3L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 3L : 2L)); engine.flush(true, true); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(4L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 4L : 1L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(inSync ? 
4L : 2L)); globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); engine.flush(true, true); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(5L)); - assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(5L)); - assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(5L)); + assertThat(engine.getTranslog().getMinFileGeneration(), equalTo(5L)); } public void testSyncedFlush() throws IOException { @@ -2817,7 +2811,7 @@ public void testSettings() { assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); } - public void testCurrentTranslogIDisCommitted() throws IOException { + public void testCurrentTranslogUUIIDIsCommitted() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try (Store store = createStore()) { EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, @@ -2842,7 +2836,6 @@ public void testCurrentTranslogIDisCommitted() throws IOException { globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE)); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } @@ -2852,18 +2845,9 @@ public void testCurrentTranslogIDisCommitted() throws IOException { try (InternalEngine engine = new InternalEngine(config)) { expectThrows(IllegalStateException.class, engine::ensureCanFlush); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - if (i == 0) { - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } else { - // creating an empty index will create the first translog gen and commit it - // opening the 
empty index will make the second translog file but not commit it - // opening the engine again (i=0) will make the third translog file, which then be committed - assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } @@ -2876,7 +2860,6 @@ public void testCurrentTranslogIDisCommitted() throws IOException { store.associateIndexWithNewTranslog(translogUUID); try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(2, engine.getTranslog().currentFileGeneration()); @@ -2889,12 +2872,9 @@ public void testCurrentTranslogIDisCommitted() throws IOException { for (int i = 0; i < 2; i++) { try (InternalEngine engine = new InternalEngine(config)) { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("no changes - nothing to commit", "1", - userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); } } @@ -3010,8 +2990,9 @@ protected void commitIndexWriter(IndexWriter writer, 
Translog translog, String s globalCheckpointSupplier))) { engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, 1); - final long committedGen = Long.valueOf( - engine.getLastCommittedSegmentInfos().getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); + final long localCheckpoint = Long.parseLong( + engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + final long committedGen = engine.getTranslog().getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; for (int gen = 1; gen < committedGen; gen++) { final Path genFile = translogPath.resolve(Translog.getFilename(gen)); assertFalse(genFile + " wasn't cleaned up", Files.exists(genFile)); @@ -4518,7 +4499,6 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti for (final Map.Entry entry : threads.entrySet()) { final Map userData = finalActualEngine.commitStats().getUserData(); assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(3 * i))); - assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo(Long.toString(i + generation))); entry.getValue().countDown(); entry.getKey().join(); finalActualEngine.flush(); @@ -4579,6 +4559,7 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { final EngineConfig engineConfig; final SeqNoStats prevSeqNoStats; final List prevDocs; + final List existingTranslog; try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { engineConfig = engine.config(); for (final long seqNo : seqNos) { @@ -4597,6 +4578,9 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { engine.syncTranslog(); prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); prevDocs = getDocIds(engine, true); + try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) { + existingTranslog = TestTranslog.drainSnapshot(snapshot, false); + } } try (InternalEngine engine = new 
InternalEngine(engineConfig)) { final Translog.TranslogGeneration currrentTranslogGeneration = new Translog.TranslogGeneration( @@ -4607,8 +4591,10 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { SeqNoStats seqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); assertThat(seqNoStats.getLocalCheckpoint(), equalTo(prevSeqNoStats.getLocalCheckpoint())); assertThat(seqNoStats.getMaxSeqNo(), equalTo(prevSeqNoStats.getMaxSeqNo())); - try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshotFromGen(currrentTranslogGeneration, Long.MAX_VALUE)) { - assertThat("restore from local translog must not add operations to translog", snapshot, SnapshotMatchers.size(0)); + try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) { + assertThat("restore from local translog must not add operations to translog", + snapshot.totalOperations(), equalTo(existingTranslog.size())); + assertThat(TestTranslog.drainSnapshot(snapshot, false), equalTo(existingTranslog)); } } assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); @@ -5077,6 +5063,10 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat("Empty engine does not need flushing", engine.shouldPeriodicallyFlush(), equalTo(false)); // A new engine may have more than one empty translog files - the test should account this extra. 
final Translog translog = engine.getTranslog(); + final IntSupplier uncommittedTranslogOperationsSinceLastCommit = () -> { + long localCheckpoint = Long.parseLong(engine.getLastCommittedSegmentInfos().userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); + return translog.totalOperationsByMinGen(translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); + }; final long extraTranslogSizeInNewEngine = engine.getTranslog().stats().getUncommittedSizeInBytes() - Translog.DEFAULT_HEADER_SIZE_IN_BYTES; int numDocs = between(10, 100); @@ -5098,7 +5088,7 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); + assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0)); // Stale operations skipped by Lucene but added to translog - still able to flush for (int id = 0; id < numDocs; id++) { final ParsedDocument doc = @@ -5107,11 +5097,11 @@ public void testShouldPeriodicallyFlush() throws Exception { assertThat(result.isCreated(), equalTo(false)); } SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos(); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(numDocs)); + assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(numDocs)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(true)); engine.flush(false, false); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); - assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); + assertThat(uncommittedTranslogOperationsSinceLastCommit.getAsInt(), equalTo(0)); // If the new index commit still points to the same translog generation as the current index commit, // we should not enable the periodically flush condition; 
otherwise we can get into an infinite loop of flushes. generateNewSeqNo(engine); // create a gap here @@ -6039,11 +6029,21 @@ public void testRecoverFromLocalTranslog() throws Exception { engine.forceMerge(randomBoolean(), 1, false, false, false); } } + if (randomBoolean()) { + // engine is flushed properly before shutting down. + engine.syncTranslog(); + globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); + engine.flush(); + } docs = getDocIds(engine, true); } try (InternalEngine engine = new InternalEngine(config)) { engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(getDocIds(engine, randomBoolean()), equalTo(docs)); + if (engine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo() == globalCheckpoint.get()) { + assertThat("engine should trim all unreferenced translog after recovery", + engine.getTranslog().getMinFileGeneration(), equalTo(engine.getTranslog().currentFileGeneration())); + } } } } @@ -6098,12 +6098,12 @@ public void testAlwaysRecordReplicaOrPeerRecoveryOperationsToTranslog() throws E engine.rollTranslogGeneration(); engine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED); // trim everything in translog try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { - assertThat(snapshot.totalOperations(), equalTo(operations.size())); + assertThat(snapshot.totalOperations(), equalTo(0)); assertNull(snapshot.next()); } applyOperations(engine, operations); try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) { - assertThat(snapshot.totalOperations(), equalTo(operations.size() * 2)); + assertThat(snapshot.totalOperations(), equalTo(operations.size())); assertThat(TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), equalTo(seqNos)); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java index 623bbe0ec50db..08783dc4a1762 
100644 --- a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java @@ -28,6 +28,8 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.ParsedDocument; @@ -36,14 +38,12 @@ import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.index.translog.TranslogDeletionPolicy; import org.elasticsearch.test.IndexSettingsModule; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Path; import java.util.Collections; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import static org.hamcrest.Matchers.equalTo; @@ -55,7 +55,6 @@ public class NoOpEngineTests extends EngineTestCase { public void testNoopEngine() throws IOException { engine.close(); final NoOpEngine engine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir)); - expectThrows(UnsupportedOperationException.class, () -> engine.syncFlush(null, null)); assertThat(engine.refreshNeeded(), equalTo(false)); assertThat(engine.shouldPeriodicallyFlush(), equalTo(false)); engine.close(); @@ -123,7 +122,7 @@ public void testNoOpEngineStats() throws Exception { for (int i = 0; i < numDocs; i++) { if (randomBoolean()) { String delId = Integer.toString(i); - Engine.DeleteResult result = engine.delete(new Engine.Delete("test", delId, newUid(delId), primaryTerm.get())); + Engine.DeleteResult result = engine.delete(new Engine.Delete("_doc", delId, newUid(delId), primaryTerm.get())); 
assertTrue(result.isFound()); engine.syncTranslog(); // advance persisted local checkpoint globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); @@ -131,7 +130,7 @@ public void testNoOpEngineStats() throws Exception { } } engine.getLocalCheckpointTracker().waitForProcessedOpsToComplete(numDocs + deletions - 1); - flushAndTrimTranslog(engine); + engine.flush(true, true); } final DocsStats expectedDocStats; @@ -168,52 +167,33 @@ public void testTrimUnreferencedTranslogFiles() throws Exception { IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build(); tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table); tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); - - boolean softDeleteEnabled = engine.config().getIndexSettings().isSoftDeleteEnabled(); + engine.onSettingsChanged(TimeValue.MINUS_ONE, ByteSizeValue.ZERO, randomNonNegativeLong()); final int numDocs = scaledRandomIntBetween(10, 3000); + int totalTranslogOps = 0; for (int i = 0; i < numDocs; i++) { + totalTranslogOps++; engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null))); tracker.updateLocalCheckpoint(allocationId.getId(), i); if (rarely()) { + totalTranslogOps = 0; engine.flush(); } + if (randomBoolean()) { + engine.rollTranslogGeneration(); + } } + // prevent translog from trimming so we can test trimUnreferencedFiles in NoOpEngine. 
+ final Translog.Snapshot snapshot = engine.getTranslog().newSnapshot(); engine.flush(true, true); - - final String translogUuid = engine.getTranslog().getTranslogUUID(); - final long minFileGeneration = engine.getTranslog().getMinFileGeneration(); - final long currentFileGeneration = engine.getTranslog().currentFileGeneration(); engine.close(); final NoOpEngine noOpEngine = new NoOpEngine(noOpConfig(INDEX_SETTINGS, store, primaryTranslogDir, tracker)); - final Path translogPath = noOpEngine.config().getTranslogConfig().getTranslogPath(); - - final long lastCommitedTranslogGeneration; - try (Engine.IndexCommitRef indexCommitRef = noOpEngine.acquireLastIndexCommit(false)) { - Map lastCommittedUserData = indexCommitRef.getIndexCommit().getUserData(); - lastCommitedTranslogGeneration = Long.parseLong(lastCommittedUserData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertThat(lastCommitedTranslogGeneration, equalTo(currentFileGeneration)); - } - - assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(minFileGeneration)); - assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(softDeleteEnabled ? 
0 : numDocs)); - assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); - + assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(totalTranslogOps)); noOpEngine.trimUnreferencedTranslogFiles(); - - assertThat(Translog.readMinTranslogGeneration(translogPath, translogUuid), equalTo(lastCommitedTranslogGeneration)); assertThat(noOpEngine.getTranslogStats().estimatedNumberOfOperations(), equalTo(0)); assertThat(noOpEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); - + assertThat(noOpEngine.getTranslogStats().getTranslogSizeInBytes(), equalTo((long)Translog.DEFAULT_HEADER_SIZE_IN_BYTES)); + snapshot.close(); noOpEngine.close(); } - - private void flushAndTrimTranslog(final InternalEngine engine) { - engine.flush(true, true); - final TranslogDeletionPolicy deletionPolicy = engine.getTranslog().getDeletionPolicy(); - deletionPolicy.setRetentionSizeInBytes(-1); - deletionPolicy.setRetentionAgeInMillis(-1); - deletionPolicy.setMinTranslogGenerationForRecovery(engine.getTranslog().getGeneration().translogFileGeneration); - engine.flush(true, true); - } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 8df9046a4c685..43e76efee998e 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -803,7 +803,7 @@ public void testRollbackOnPromotion() throws Exception { shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback); done.set(true); thread.join(); - + shards.syncGlobalCheckpoint(); for (IndexShard shard : shards) { shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0)); diff --git 
a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 9c70accf7ef03..35b7af704dc47 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -352,17 +352,17 @@ public void testMaybeFlush() throws Exception { assertFalse(shard.shouldPeriodicallyFlush()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), - new ByteSizeValue(190 /* size of the operation + two generations header&footer*/, ByteSizeUnit.BYTES)).build()).get(); - client().prepareIndex("test", "test", "0") + new ByteSizeValue(135 /* size of the operation + one generation header&footer*/, ByteSizeUnit.BYTES)).build()).get(); + client().prepareIndex("test", "_doc").setId("0") .setSource("{}", XContentType.JSON).setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); assertFalse(shard.shouldPeriodicallyFlush()); shard.applyIndexOperationOnPrimary(Versions.MATCH_ANY, VersionType.INTERNAL, - new SourceToParse("test", "test", "1", new BytesArray("{}"), XContentType.JSON), + new SourceToParse("test", "_doc", "1", new BytesArray("{}"), XContentType.JSON), SequenceNumbers.UNASSIGNED_SEQ_NO, 0, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = getTranslog(shard); assertEquals(2, translog.stats().getUncommittedOperations()); - client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON) + client().prepareIndex("test", "_doc", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? 
IMMEDIATE : NONE).get(); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); @@ -376,7 +376,7 @@ public void testMaybeFlush() throws Exception { client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put( IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) .build()).get(); - client().prepareDelete("test", "test", "2").get(); + client().prepareDelete("test", "_doc", "2").get(); logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); assertBusy(() -> { // this is async @@ -434,8 +434,8 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { final boolean flush = randomBoolean(); final Settings settings; if (flush) { - // size of the operation plus two generations of overhead. - settings = Settings.builder().put("index.translog.flush_threshold_size", "180b").build(); + // size of the operation plus the overhead of one generation. 
+ settings = Settings.builder().put("index.translog.flush_threshold_size", "125b").build(); } else { // size of the operation plus header and footer settings = Settings.builder().put("index.translog.generation_threshold_size", "117b").build(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 64c8c2ef4a7e1..f34d0cbc10444 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -117,7 +117,6 @@ import org.elasticsearch.index.translog.TestTranslog; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogStats; -import org.elasticsearch.index.translog.TranslogTests; import org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -1405,7 +1404,7 @@ public void run() { latch.await(); for (int i = 0; i < 10000; i++) { semaphore.acquire(); - shard.sync(TranslogTests.randomTranslogLocation(), (ex) -> semaphore.release()); + shard.sync(new Translog.Location(randomLong(), randomLong(), randomInt()), (ex) -> semaphore.release()); } } catch (Exception ex) { throw new RuntimeException(ex); @@ -2033,17 +2032,20 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { shard.sync(); // advance local checkpoint final int translogOps; + final int replayedOps; if (randomBoolean()) { // Advance the global checkpoint to remove the 1st commit; this shard will recover the 2nd commit. shard.updateGlobalCheckpointOnReplica(3, "test"); logger.info("--> flushing shard"); shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); translogOps = 4; // delete #1 won't be replayed. 
- } else if (randomBoolean()) { - shard.getEngine().rollTranslogGeneration(); - translogOps = 5; + replayedOps = 3; } else { + if (randomBoolean()) { + shard.getEngine().rollTranslogGeneration(); + } translogOps = 5; + replayedOps = 5; } final ShardRouting replicaRouting = shard.routingEntry(); @@ -2053,10 +2055,9 @@ public void testRecoverFromStoreWithOutOfOrderDelete() throws IOException { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); assertTrue(recoverFromStore(newShard)); - assertEquals(translogOps, newShard.recoveryState().getTranslog().recoveredOperations()); + assertEquals(replayedOps, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperations()); assertEquals(translogOps, newShard.recoveryState().getTranslog().totalOperationsOnStart()); - assertEquals(100.0f, newShard.recoveryState().getTranslog().recoveredPercent(), 0.01f); updateRoutingEntry(newShard, ShardRoutingHelper.moveToStarted(newShard.routingEntry())); assertDocCount(newShard, 3); closeShards(newShard); diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index 8b0210113d825..ef51c066f7b87 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -126,15 +126,12 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { assertThat(resyncRequest.getMaxSeenAutoIdTimestampOnPrimary(), equalTo(shard.getMaxSeenAutoIdTimestamp())); } if (syncNeeded && globalCheckPoint < numDocs - 1) { + assertThat(resyncTask.getSkippedOperations(), equalTo(0)); + 
assertThat(resyncTask.getResyncedOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint))); if (shard.indexSettings.isSoftDeleteEnabled()) { - assertThat(resyncTask.getSkippedOperations(), equalTo(0)); - assertThat(resyncTask.getResyncedOperations(), equalTo(resyncTask.getTotalOperations())); assertThat(resyncTask.getTotalOperations(), equalTo(Math.toIntExact(numDocs - 1 - globalCheckPoint))); } else { - int skippedOps = Math.toIntExact(globalCheckPoint + 1); // everything up to global checkpoint included - assertThat(resyncTask.getSkippedOperations(), equalTo(skippedOps)); - assertThat(resyncTask.getResyncedOperations(), equalTo(numDocs - skippedOps)); - assertThat(resyncTask.getTotalOperations(), equalTo(globalCheckPoint == numDocs - 1 ? 0 : numDocs)); + assertThat(resyncTask.getTotalOperations(), equalTo(numDocs)); } } else { assertThat(resyncTask.getSkippedOperations(), equalTo(0)); diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 59feabb699d72..082c562257418 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -67,7 +67,6 @@ import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.ESTestCase; @@ -845,9 +844,7 @@ public void testUserDataRead() throws IOException { writer.addDocument(doc); Map commitData = new HashMap<>(2); String syncId = "a sync id"; - String translogId = "a translog id"; commitData.put(Engine.SYNC_COMMIT_ID, syncId); - commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogId); 
writer.setLiveCommitData(commitData.entrySet()); writer.commit(); writer.close(); @@ -856,7 +853,6 @@ public void testUserDataRead() throws IOException { assertFalse(metadata.asMap().isEmpty()); // do not check for correct files, we have enough tests for that above assertThat(metadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId)); - assertThat(metadata.getCommitUserData().get(Translog.TRANSLOG_GENERATION_KEY), equalTo(translogId)); TestUtil.checkIndex(store.directory()); assertDeleteContent(store, store.directory()); IOUtils.close(store); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java index d78a731674258..56de723ac50cf 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TestTranslog.java @@ -22,12 +22,8 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.apache.logging.log4j.Logger; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.index.engine.CombinedDeletionPolicy; import java.io.IOException; import java.nio.ByteBuffer; @@ -69,7 +65,7 @@ public class TestTranslog { * See {@link TestTranslog#corruptFile(Logger, Random, Path, boolean)} for details of the corruption applied. 
*/ public static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir) throws IOException { - corruptRandomTranslogFile(logger, random, translogDir, minTranslogGenUsedInRecovery(translogDir)); + corruptRandomTranslogFile(logger, random, translogDir, Translog.readCheckpoint(translogDir).minTranslogGeneration); } /** @@ -188,19 +184,6 @@ static void corruptFile(Logger logger, Random random, Path fileToCorrupt, boolea } } - /** - * Lists all existing commits in a given index path, then read the minimum translog generation that will be used in recoverFromTranslog. - */ - private static long minTranslogGenUsedInRecovery(Path translogPath) throws IOException { - try (NIOFSDirectory directory = new NIOFSDirectory(translogPath.getParent().resolve("index"))) { - List commits = DirectoryReader.listCommits(directory); - final String translogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY); - long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID); - IndexCommit recoveringCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint); - return Long.parseLong(recoveringCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); - } - } - /** * Returns the primary term associated with the current translog writer of the given translog. 
*/ diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java index 0d296af5f0c8d..873a210df0aef 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java @@ -36,6 +36,7 @@ import java.util.ArrayList; import java.util.List; +import static java.lang.Math.min; import static org.hamcrest.Matchers.equalTo; @@ -48,12 +49,7 @@ public void testNoRetention() throws IOException { allGens.add(readersAndWriter.v2()); try { TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, 0, 0, 0); - assertMinGenRequired(deletionPolicy, readersAndWriter, 1L); - final int committedReader = randomIntBetween(0, allGens.size() - 1); - final long committedGen = allGens.get(committedReader).generation; - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); - assertMinGenRequired(deletionPolicy, readersAndWriter, committedGen); + assertMinGenRequired(deletionPolicy, readersAndWriter, allGens.get(allGens.size() - 1).generation); } finally { IOUtils.close(readersAndWriter.v1()); IOUtils.close(readersAndWriter.v2()); @@ -127,8 +123,6 @@ public void testRetentionHierarchy() throws IOException { allGens.add(readersAndWriter.v2()); try { TranslogDeletionPolicy deletionPolicy = new MockDeletionPolicy(now, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE); - deletionPolicy.setTranslogGenerationOfLastCommit(Long.MAX_VALUE); - deletionPolicy.setMinTranslogGenerationForRecovery(Long.MAX_VALUE); int selectedReader = randomIntBetween(0, allGens.size() - 1); final long selectedGenerationByAge = allGens.get(selectedReader).generation; long maxAge = now - allGens.get(selectedReader).getLastModifiedTime(); @@ -145,31 +139,28 
@@ public void testRetentionHierarchy() throws IOException { max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)); // make a new policy as committed gen can't go backwards (for now) deletionPolicy = new MockDeletionPolicy(now, size, maxAge, totalFiles); - long committedGen = randomFrom(allGens).generation; - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(committedGen, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(committedGen); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, - max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles))); + assertMinGenRequired(deletionPolicy, readersAndWriter, + max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles)); long viewGen = randomFrom(allGens).generation; try (Releasable ignored = deletionPolicy.acquireTranslogGen(viewGen)) { assertMinGenRequired(deletionPolicy, readersAndWriter, - min3(committedGen, viewGen, max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles))); + min(viewGen, max3(selectedGenerationByAge, selectedGenerationBySize, selectedGenerationByTotalFiles))); // disable age deletionPolicy.setRetentionAgeInMillis(-1); assertMinGenRequired(deletionPolicy, readersAndWriter, - min3(committedGen, viewGen, Math.max(selectedGenerationBySize, selectedGenerationByTotalFiles))); + min(viewGen, Math.max(selectedGenerationBySize, selectedGenerationByTotalFiles))); // disable size deletionPolicy.setRetentionAgeInMillis(maxAge); deletionPolicy.setRetentionSizeInBytes(-1); assertMinGenRequired(deletionPolicy, readersAndWriter, - min3(committedGen, viewGen, Math.max(selectedGenerationByAge, selectedGenerationByTotalFiles))); + min(viewGen, Math.max(selectedGenerationByAge, selectedGenerationByTotalFiles))); // disable age and zie deletionPolicy.setRetentionAgeInMillis(-1); deletionPolicy.setRetentionSizeInBytes(-1); - 
assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); + assertMinGenRequired(deletionPolicy, readersAndWriter, viewGen); // disable total files deletionPolicy.setRetentionTotalFiles(0); - assertMinGenRequired(deletionPolicy, readersAndWriter, Math.min(committedGen, viewGen)); + assertMinGenRequired(deletionPolicy, readersAndWriter, viewGen); } } finally { IOUtils.close(readersAndWriter.v1()); @@ -232,8 +223,4 @@ protected long currentTime() { private static long max3(long x1, long x2, long x3) { return Math.max(Math.max(x1, x2), x3); } - - private static long min3(long x1, long x2, long x3) { - return Math.min(Math.min(x1, x2), x3); - } } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 616254a16f96d..9a64d09b3cef9 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -166,7 +166,7 @@ protected void afterIfSuccessful() throws Exception { if (translog.isOpen()) { if (translog.currentFileGeneration() > 1) { - markCurrentGenAsCommitted(translog); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(Long.MAX_VALUE); translog.trimUnreferencedReaders(); assertFileDeleted(translog, translog.currentFileGeneration() - 1); } @@ -201,28 +201,6 @@ protected Translog openTranslog(TranslogConfig config, String translogUUID) thro } - private void markCurrentGenAsCommitted(Translog translog) throws IOException { - long genToCommit = translog.currentFileGeneration(); - long genToRetain = randomLongBetween(translog.getDeletionPolicy().getMinTranslogGenerationForRecovery(), genToCommit); - commit(translog, genToRetain, genToCommit); - } - - private void rollAndCommit(Translog translog) throws IOException { - translog.rollGeneration(); - markCurrentGenAsCommitted(translog); - } - - private long commit(Translog 
translog, long genToRetain, long genToCommit) throws IOException { - final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); - deletionPolicy.setTranslogGenerationOfLastCommit(genToCommit); - deletionPolicy.setMinTranslogGenerationForRecovery(genToRetain); - long minGenRequired = deletionPolicy.minTranslogGenRequired(translog.getReaders(), translog.getCurrent()); - translog.trimUnreferencedReaders(); - assertThat(minGenRequired, equalTo(translog.getMinFileGeneration())); - assertFilePresences(translog); - return minGenRequired; - } - @Override @Before public void setUp() throws Exception { @@ -356,7 +334,7 @@ public void testSimpleOperations() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - final long seqNo = randomNonNegativeLong(); + final long seqNo = randomLongBetween(0, Integer.MAX_VALUE); final String reason = randomAlphaOfLength(16); final long noopTerm = randomLongBetween(1, primaryTerm.get()); addToTranslogAndList(translog, ops, new Translog.NoOp(seqNo, noopTerm, reason)); @@ -389,9 +367,7 @@ public void testSimpleOperations() throws IOException { assertThat(snapshot.totalOperations(), equalTo(ops.size())); } - markCurrentGenAsCommitted(translog); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), firstId + 1), randomNonNegativeLong())) { + try (Translog.Snapshot snapshot = translog.newSnapshot(seqNo + 1, randomLongBetween(seqNo + 1, Long.MAX_VALUE))) { assertThat(snapshot, SnapshotMatchers.size(0)); assertThat(snapshot.totalOperations(), equalTo(0)); } @@ -450,8 +426,8 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(1)); assertThat(stats.getTranslogSizeInBytes(), equalTo(162L)); assertThat(stats.getUncommittedOperations(), equalTo(1)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(162L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + 
assertThat(stats.getUncommittedSizeInBytes(), equalTo(107L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } translog.add(new Translog.Delete("test", "2", 1, primaryTerm.get(), newUid("2"))); @@ -460,8 +436,8 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(2)); assertThat(stats.getTranslogSizeInBytes(), equalTo(210L)); assertThat(stats.getUncommittedOperations(), equalTo(2)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(210L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(155L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } translog.add(new Translog.Delete("test", "3", 2, primaryTerm.get(), newUid("3"))); @@ -470,8 +446,8 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(3)); assertThat(stats.getTranslogSizeInBytes(), equalTo(258L)); assertThat(stats.getUncommittedOperations(), equalTo(3)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(258L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(203L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } translog.add(new Translog.NoOp(3, 1, randomAlphaOfLength(16))); @@ -480,19 +456,18 @@ public void testStats() throws IOException { assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); assertThat(stats.getTranslogSizeInBytes(), equalTo(300L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(300L)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(245L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } - final long expectedSizeInBytes = 355L; translog.rollGeneration(); { final TranslogStats stats = stats(); 
assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(355L)); assertThat(stats.getUncommittedOperations(), equalTo(4)); - assertThat(stats.getUncommittedSizeInBytes(), equalTo(expectedSizeInBytes)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getUncommittedSizeInBytes(), equalTo(300L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } { @@ -501,26 +476,26 @@ public void testStats() throws IOException { stats.writeTo(out); final TranslogStats copy = new TranslogStats(out.bytes().streamInput()); assertThat(copy.estimatedNumberOfOperations(), equalTo(4)); - assertThat(copy.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); + assertThat(copy.getTranslogSizeInBytes(), equalTo(355L)); try (XContentBuilder builder = XContentFactory.jsonBuilder()) { builder.startObject(); copy.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.endObject(); - assertThat(Strings.toString(builder), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + expectedSizeInBytes - + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + expectedSizeInBytes + assertThat(Strings.toString(builder), equalTo("{\"translog\":{\"operations\":4,\"size_in_bytes\":" + 355 + + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + 300 + ",\"earliest_last_modified_age\":" + stats.getEarliestLastModifiedAge() + "}}")); } } - - markCurrentGenAsCommitted(translog); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(3, Long.MAX_VALUE)); + translog.trimUnreferencedReaders(); { final TranslogStats stats = stats(); assertThat(stats.estimatedNumberOfOperations(), equalTo(4)); - assertThat(stats.getTranslogSizeInBytes(), equalTo(expectedSizeInBytes)); + assertThat(stats.getTranslogSizeInBytes(), equalTo(355L)); assertThat(stats.getUncommittedOperations(), 
equalTo(0)); assertThat(stats.getUncommittedSizeInBytes(), equalTo(firstOperationPosition)); - assertThat(stats.getEarliestLastModifiedAge(), greaterThan(1L)); + assertThat(stats.getEarliestLastModifiedAge(), greaterThan(0L)); } } @@ -542,7 +517,7 @@ public void testUncommittedOperations() throws Exception { } assertThat(translog.stats().getUncommittedOperations(), equalTo(uncommittedOps)); if (frequently()) { - markCurrentGenAsCommitted(translog); + deletionPolicy.setLocalCheckpointOfSafeCommit(i); assertThat(translog.stats().getUncommittedOperations(), equalTo(operationsInLastGen)); uncommittedOps = operationsInLastGen; } @@ -603,7 +578,7 @@ public void testOldestEntryInSeconds() { assertThat(e, hasToString(containsString("earliestLastModifiedAge must be >= 0"))); } - public void testSnapshot() throws IOException { + public void testBasicSnapshot() throws IOException { ArrayList ops = new ArrayList<>(); try (Translog.Snapshot snapshot = translog.newSnapshot()) { assertThat(snapshot, SnapshotMatchers.size(0)); @@ -611,13 +586,13 @@ public void testSnapshot() throws IOException { addToTranslogAndList(translog, ops, new Translog.Index("test", "1", 0, primaryTerm.get(), new byte[]{1})); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = translog.newSnapshot(0, Long.MAX_VALUE)) { assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot.totalOperations(), equalTo(1)); } - try (Translog.Snapshot snapshot = translog.newSnapshot(); - Translog.Snapshot snapshot1 = translog.newSnapshot()) { + try (Translog.Snapshot snapshot = translog.newSnapshot(0, randomIntBetween(0, 10)); + Translog.Snapshot snapshot1 = translog.newSnapshot(0, randomIntBetween(0, 10))) { assertThat(snapshot, SnapshotMatchers.equalsTo(ops)); assertThat(snapshot.totalOperations(), equalTo(1)); @@ -660,7 +635,7 @@ public void testSnapshotWithNewTranslog() throws IOException { Translog.Snapshot snapshot2 = translog.newSnapshot(); 
toClose.add(snapshot2); - markCurrentGenAsCommitted(translog); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(2); assertThat(snapshot2, containsOperationsInAnyOrder(ops)); assertThat(snapshot2.totalOperations(), equalTo(ops.size())); } finally { @@ -676,78 +651,62 @@ public void testSnapshotOnClosedTranslog() throws IOException { assertEquals(ex.getMessage(), "translog is already closed"); } - public void testSnapshotFromMinGen() throws Exception { - Map> operationsByGen = new HashMap<>(); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), 1), randomNonNegativeLong())) { - assertThat(snapshot, SnapshotMatchers.size(0)); - } - int iters = between(1, 10); - for (int i = 0; i < iters; i++) { - long currentGeneration = translog.currentFileGeneration(); - operationsByGen.putIfAbsent(currentGeneration, new ArrayList<>()); - int numOps = between(0, 20); - for (int op = 0; op < numOps; op++) { - long seqNo = randomLongBetween(0, 1000); - addToTranslogAndList(translog, operationsByGen.get(currentGeneration), new Translog.Index("test", - Long.toString(seqNo), seqNo, primaryTerm.get(), new byte[]{1})); - } - long minGen = randomLongBetween(translog.getMinFileGeneration(), translog.currentFileGeneration()); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), minGen), Long.MAX_VALUE)) { - List expectedOps = operationsByGen.entrySet().stream() - .filter(e -> e.getKey() >= minGen) - .flatMap(e -> e.getValue().stream()) - .collect(Collectors.toList()); - assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedOps)); - } - long upToSeqNo = randomLongBetween(0, 2000); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(translog.getTranslogUUID(), minGen), upToSeqNo)) { - List expectedOps = operationsByGen.entrySet().stream() - .filter(e -> e.getKey() >= 
minGen) - .flatMap(e -> e.getValue().stream().filter(op -> op.seqNo() <= upToSeqNo)) - .collect(Collectors.toList()); - assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedOps)); - } - translog.rollGeneration(); - } - } - - public void testSeqNoFilterSnapshot() throws Exception { + public void testRangeSnapshot() throws Exception { + long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; final int generations = between(2, 20); + Map> operationsByGen = new HashMap<>(); for (int gen = 0; gen < generations; gen++) { - List batch = LongStream.rangeClosed(0, between(0, 100)).boxed().collect(Collectors.toList()); - Randomness.shuffle(batch); - for (long seqNo : batch) { - Translog.Index op = - new Translog.Index("doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{1}); + Set seqNos = new HashSet<>(); + int numOps = randomIntBetween(1, 100); + for (int i = 0; i < numOps; i++) { + final long seqNo = randomValueOtherThanMany(seqNos::contains, () -> randomLongBetween(0, 1000)); + minSeqNo = SequenceNumbers.min(minSeqNo, seqNo); + maxSeqNo = SequenceNumbers.max(maxSeqNo, seqNo); + seqNos.add(seqNo); + } + List ops = new ArrayList<>(seqNos.size()); + for (long seqNo : seqNos) { + Translog.Index op = new Translog.Index("_doc", randomAlphaOfLength(10), seqNo, primaryTerm.get(), new byte[]{randomByte()}); translog.add(op); + ops.add(op); } + operationsByGen.put(translog.currentFileGeneration(), ops); translog.rollGeneration(); + if (rarely()) { + translog.rollGeneration(); // empty generation + } } - List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - Translog.Operation op; - while ((op = snapshot.next()) != null) { - operations.add(op); + + if (minSeqNo > 0) { + long fromSeqNo = randomLongBetween(0, minSeqNo - 1); + long toSeqNo = randomLongBetween(fromSeqNo, minSeqNo - 1); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, 
toSeqNo)) { + assertThat(snapshot.totalOperations(), equalTo(0)); + assertNull(snapshot.next()); } } - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - Translog.Snapshot filter = new Translog.SeqNoFilterSnapshot(snapshot, between(200, 300), between(300, 400)); // out range - assertThat(filter, SnapshotMatchers.size(0)); - assertThat(filter.totalOperations(), equalTo(snapshot.totalOperations())); - assertThat(filter.skippedOperations(), equalTo(snapshot.totalOperations())); + + long fromSeqNo = randomLongBetween(maxSeqNo + 1, Long.MAX_VALUE); + long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) { + assertThat(snapshot.totalOperations(), equalTo(0)); + assertNull(snapshot.next()); } - try (Translog.Snapshot snapshot = translog.newSnapshot()) { - int fromSeqNo = between(-2, 500); - int toSeqNo = between(fromSeqNo, 500); - List selectedOps = operations.stream() - .filter(op -> fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo).collect(Collectors.toList()); - Translog.Snapshot filter = new Translog.SeqNoFilterSnapshot(snapshot, fromSeqNo, toSeqNo); - assertThat(filter, SnapshotMatchers.containsOperationsInAnyOrder(selectedOps)); - assertThat(filter.totalOperations(), equalTo(snapshot.totalOperations())); - assertThat(filter.skippedOperations(), equalTo(snapshot.skippedOperations() + operations.size() - selectedOps.size())); + + fromSeqNo = randomLongBetween(0, 2000); + toSeqNo = randomLongBetween(fromSeqNo, 2000); + try (Translog.Snapshot snapshot = translog.newSnapshot(fromSeqNo, toSeqNo)) { + Set seenSeqNos = new HashSet<>(); + List expectedOps = new ArrayList<>(); + for (long gen = translog.currentFileGeneration(); gen > 0; gen--) { + for (Translog.Operation op : operationsByGen.getOrDefault(gen, Collections.emptyList())) { + if (fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && seenSeqNos.add(op.seqNo())) { + expectedOps.add(op); + } + } + } + 
assertThat(TestTranslog.drainSnapshot(snapshot, false), equalTo(expectedOps)); } } @@ -769,6 +728,7 @@ private void assertFilePresences(Translog translog) { for (long gen = 1; gen < translog.getMinFileGeneration(); gen++) { assertFileDeleted(translog, gen); } + } static class LocationOperation implements Comparable { @@ -950,8 +910,8 @@ public void testVerifyTranslogIsNotDeleted() throws IOException { assertThat(snapshot.totalOperations(), equalTo(1)); } translog.close(); - - assertFileIsPresent(translog, 1); + assertFileDeleted(translog, 1); + assertFileIsPresent(translog, 2); } /** @@ -1023,9 +983,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep translog.rollGeneration(); // expose the new checkpoint (simulating a commit), before we trim the translog lastCommittedLocalCheckpoint.set(localCheckpoint); - deletionPolicy.setTranslogGenerationOfLastCommit(translog.currentFileGeneration()); - deletionPolicy.setMinTranslogGenerationForRecovery( - translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); translog.trimUnreferencedReaders(); } } @@ -1094,7 +1052,7 @@ protected void doRun() throws Exception { // these are what we expect the snapshot to return (and potentially some more). 
Set expectedOps = new HashSet<>(writtenOps.keySet()); expectedOps.removeIf(op -> op.seqNo() <= committedLocalCheckpointAtView); - try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(committedLocalCheckpointAtView + 1L)) { + try (Translog.Snapshot snapshot = translog.newSnapshot(committedLocalCheckpointAtView + 1L, Long.MAX_VALUE)) { Translog.Operation op; while ((op = snapshot.next()) != null) { expectedOps.remove(op); @@ -1172,7 +1130,7 @@ public void testSyncUpTo() throws IOException { assertTrue("we only synced a previous operation yet", translog.syncNeeded()); } if (rarely()) { - rollAndCommit(translog); + translog.rollGeneration(); assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); } @@ -1192,7 +1150,7 @@ public void testSyncUpToStream() throws IOException { ArrayList locations = new ArrayList<>(); for (int op = 0; op < translogOperations; op++) { if (rarely()) { - rollAndCommit(translog); // do this first so that there is at least one pending tlog entry + translog.rollGeneration(); } final Translog.Location location = translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), @@ -1206,7 +1164,7 @@ public void testSyncUpToStream() throws IOException { // we are the last location so everything should be synced assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); } else if (rarely()) { - rollAndCommit(translog); + translog.rollGeneration(); // not syncing now assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); assertFalse("no sync needed since no operations in current translog", translog.syncNeeded()); @@ -1229,7 +1187,7 @@ public void testLocationComparison() throws IOException { translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), 
Integer.toString(++count).getBytes(Charset.forName("UTF-8"))))); if (rarely() && translogOperations > op + 1) { - rollAndCommit(translog); + translog.rollGeneration(); } } Collections.shuffle(locations, random()); @@ -1412,7 +1370,7 @@ public void testBasicRecovery() throws IOException { Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); final boolean commit = commitOften ? frequently() : rarely(); if (commit && op < translogOperations - 1) { - rollAndCommit(translog); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(op); minUncommittedOp = op + 1; translogGeneration = translog.getGeneration(); } @@ -1435,7 +1393,7 @@ public void testBasicRecovery() throws IOException { assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, translog.currentFileGeneration()); assertFalse(translog.syncNeeded()); - try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(translogGeneration, Long.MAX_VALUE)) { + try (Translog.Snapshot snapshot = translog.newSnapshot(minUncommittedOp, Long.MAX_VALUE)) { for (int i = minUncommittedOp; i < translogOperations; i++) { assertEquals("expected operation" + i + " to be in the previous translog but wasn't", translog.currentFileGeneration() - 1, locations.get(i).generation); @@ -1866,7 +1824,9 @@ public void testOpenForeignTranslog() throws IOException { locations.add(translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); if (randomBoolean()) { - rollAndCommit(translog); + translog.rollGeneration(); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(op); + translog.trimUnreferencedReaders(); firstUncommitted = op + 1; } } @@ -1887,7 +1847,7 @@ public void testOpenForeignTranslog() throws IOException { } this.translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); - try (Translog.Snapshot snapshot = 
this.translog.newSnapshotFromGen(translogGeneration, Long.MAX_VALUE)) { + try (Translog.Snapshot snapshot = this.translog.newSnapshot(randomLongBetween(0, firstUncommitted), Long.MAX_VALUE)) { for (int i = firstUncommitted; i < translogOperations; i++) { Translog.Operation next = snapshot.next(); assertNotNull("" + i, next); @@ -2068,7 +2028,7 @@ public void testFailFlush() throws IOException { } try { - rollAndCommit(translog); + translog.rollGeneration(); fail("already closed"); } catch (AlreadyClosedException ex) { assertNotNull(ex.getCause()); @@ -2246,28 +2206,29 @@ protected void afterAdd() throws IOException { */ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { int translogOperations = randomIntBetween(10, 100); - for (int op = 0; op < translogOperations / 2; op++) { - translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), + int op = 0; + for (; op < translogOperations / 2; op++) { + translog.add(new Translog.Index("_doc", Integer.toString(op), op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } } translog.rollGeneration(); - long comittedGeneration = randomLongBetween(2, translog.currentFileGeneration()); - for (int op = translogOperations / 2; op < translogOperations; op++) { - translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), + long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, op); + for (op = translogOperations / 2; op < translogOperations; op++) { + translog.add(new Translog.Index("test", Integer.toString(op), op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } } + long minRetainedGen = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; // engine blows up, after committing the above generation translog.close(); TranslogConfig config = translog.getConfig(); final TranslogDeletionPolicy 
deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); translog = new Translog(config, translog.getTranslogUUID(), deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); assertThat(translog.getMinFileGeneration(), equalTo(1L)); @@ -2276,7 +2237,7 @@ public void testRecoveryFromAFutureGenerationCleansUp() throws IOException { assertFileIsPresent(translog, gen); } translog.trimUnreferencedReaders(); - for (long gen = 1; gen < comittedGeneration; gen++) { + for (long gen = 1; gen < minRetainedGen; gen++) { assertFileDeleted(translog, gen); } } @@ -2290,8 +2251,9 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { final FailSwitch fail = new FailSwitch(); fail.failNever(); final TranslogConfig config = getTranslogConfig(tempDir); - final long comittedGeneration; + final long localCheckpoint; final String translogUUID; + long minGenForRecovery = 1L; try (Translog translog = getFailableTranslog(fail, config)) { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); // disable retention so we trim things @@ -2299,24 +2261,25 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { deletionPolicy.setRetentionAgeInMillis(-1); translogUUID = translog.getTranslogUUID(); int translogOperations = randomIntBetween(10, 100); - for (int op = 0; op < translogOperations / 2; op++) { - translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), + int op = 0; + for (; op < translogOperations / 2; op++) { + translog.add(new Translog.Index("test", Integer.toString(op), op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } } translog.rollGeneration(); - comittedGeneration = 
randomLongBetween(2, translog.currentFileGeneration()); - for (int op = translogOperations / 2; op < translogOperations; op++) { - translog.add(new Translog.Index("test", "" + op, op, primaryTerm.get(), + localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, op); + for (op = translogOperations / 2; op < translogOperations; op++) { + translog.add(new Translog.Index("test", Integer.toString(op), op, primaryTerm.get(), Integer.toString(op).getBytes(Charset.forName("UTF-8")))); if (rarely()) { translog.rollGeneration(); } } - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, translog.currentFileGeneration())); - deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); + minGenForRecovery = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; fail.failRandomly(); try { translog.trimUnreferencedReaders(); @@ -2325,16 +2288,16 @@ public void testRecoveryFromFailureOnTrimming() throws IOException { } } final TranslogDeletionPolicy deletionPolicy = new TranslogDeletionPolicy(-1, -1, 0); - deletionPolicy.setTranslogGenerationOfLastCommit(randomLongBetween(comittedGeneration, Long.MAX_VALUE)); - deletionPolicy.setMinTranslogGenerationForRecovery(comittedGeneration); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint); try (Translog translog = new Translog(config, translogUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {})) { // we don't know when things broke exactly assertThat(translog.getMinFileGeneration(), greaterThanOrEqualTo(1L)); - assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(comittedGeneration)); + assertThat(translog.getMinFileGeneration(), lessThanOrEqualTo(minGenForRecovery)); assertFilePresences(translog); + minGenForRecovery = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration; 
translog.trimUnreferencedReaders(); - assertThat(translog.getMinFileGeneration(), equalTo(comittedGeneration)); + assertThat(translog.getMinFileGeneration(), equalTo(minGenForRecovery)); assertFilePresences(translog); } } @@ -2642,7 +2605,7 @@ public void testWithRandomException() throws IOException { fail.failRandomly(); TranslogConfig config = getTranslogConfig(tempDir); final int numOps = randomIntBetween(100, 200); - long minGenForRecovery = 1; + long localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; List syncedDocs = new ArrayList<>(); List unsynced = new ArrayList<>(); if (randomBoolean()) { @@ -2672,8 +2635,8 @@ public void testWithRandomException() throws IOException { unsynced.clear(); failableTLog.rollGeneration(); committing = true; - failableTLog.getDeletionPolicy().setTranslogGenerationOfLastCommit(failableTLog.currentFileGeneration()); - failableTLog.getDeletionPolicy().setMinTranslogGenerationForRecovery(failableTLog.currentFileGeneration()); + failableTLog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(opsAdded); + syncedDocs.clear(); failableTLog.trimUnreferencedReaders(); committing = false; syncedDocs.clear(); @@ -2705,7 +2668,7 @@ public void testWithRandomException() throws IOException { assertThat(unsynced, empty()); } generationUUID = failableTLog.getTranslogUUID(); - minGenForRecovery = failableTLog.getDeletionPolicy().getMinTranslogGenerationForRecovery(); + localCheckpointOfSafeCommit = failableTLog.getDeletionPolicy().getLocalCheckpointOfSafeCommit(); IOUtils.closeWhileHandlingException(failableTLog); } } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { @@ -2717,8 +2680,7 @@ public void testWithRandomException() throws IOException { if (randomBoolean()) { try { TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); - deletionPolicy.setTranslogGenerationOfLastCommit(minGenForRecovery); - deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); + 
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false, generationUUID, deletionPolicy)); } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) { // failed - that's ok, we didn't even create it @@ -2729,8 +2691,7 @@ public void testWithRandomException() throws IOException { fail.failNever(); // we don't wanna fail here but we might since we write a new checkpoint and create a new tlog file TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(); - deletionPolicy.setTranslogGenerationOfLastCommit(minGenForRecovery); - deletionPolicy.setMinTranslogGenerationForRecovery(minGenForRecovery); + deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit); if (generationUUID == null) { // we never managed to successfully create a translog, make it generationUUID = Translog.createEmptyTranslog(config.getTranslogPath(), @@ -2738,8 +2699,7 @@ public void testWithRandomException() throws IOException { } try (Translog translog = new Translog(config, generationUUID, deletionPolicy, () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, seqNo -> {}); - Translog.Snapshot snapshot = translog.newSnapshotFromGen( - new Translog.TranslogGeneration(generationUUID, minGenForRecovery), Long.MAX_VALUE)) { + Translog.Snapshot snapshot = translog.newSnapshot(localCheckpointOfSafeCommit + 1, Long.MAX_VALUE)) { assertEquals(syncedDocs.size(), snapshot.totalOperations()); for (int i = 0; i < syncedDocs.size(); i++) { Translog.Operation next = snapshot.next(); @@ -2922,10 +2882,20 @@ public void testRollGeneration() throws Exception { .map(t -> t.getPrimaryTerm()).collect(Collectors.toList()); assertThat(storedPrimaryTerms, equalTo(primaryTerms)); } - long minGenForRecovery = randomLongBetween(generation, generation + rolls); - commit(translog, minGenForRecovery, generation + rolls); + + final BaseTranslogReader minRetainedReader = randomFrom( + 
Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent())) + .filter(r -> r.getCheckpoint().minSeqNo >= 0) + .collect(Collectors.toList())); + int retainedOps = Stream.concat(translog.getReaders().stream(), Stream.of(translog.getCurrent())) + .filter(r -> r.getCheckpoint().generation >= minRetainedReader.generation) + .mapToInt(r -> r.getCheckpoint().numOps) + .sum(); + deletionPolicy.setLocalCheckpointOfSafeCommit( + randomLongBetween(minRetainedReader.getCheckpoint().minSeqNo, minRetainedReader.getCheckpoint().maxSeqNo) - 1); + translog.trimUnreferencedReaders(); assertThat(translog.currentFileGeneration(), equalTo(generation + rolls)); - assertThat(translog.stats().getUncommittedOperations(), equalTo(0)); + assertThat(translog.stats().getUncommittedOperations(), equalTo(retainedOps)); if (longRetention) { for (int i = 0; i <= rolls; i++) { assertFileIsPresent(translog, generation + i); @@ -2933,17 +2903,17 @@ public void testRollGeneration() throws Exception { deletionPolicy.setRetentionAgeInMillis(randomBoolean() ? 
100 : -1); assertBusy(() -> { translog.trimUnreferencedReaders(); - for (long i = 0; i < minGenForRecovery; i++) { + for (long i = 0; i < minRetainedReader.generation; i++) { assertFileDeleted(translog, i); } }); } else { // immediate cleanup - for (long i = 0; i < minGenForRecovery; i++) { + for (long i = 0; i < minRetainedReader.generation; i++) { assertFileDeleted(translog, i); } } - for (long i = minGenForRecovery; i < generation + rolls; i++) { + for (long i = minRetainedReader.generation; i < generation + rolls; i++) { assertFileIsPresent(translog, i); } } @@ -3002,7 +2972,7 @@ public void testMinSeqNoBasedAPI() throws IOException { } assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps)); int readFromSnapshot = 0; - try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) { + try (Translog.Snapshot snapshot = translog.newSnapshot(seqNo, Long.MAX_VALUE)) { assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps)); Translog.Operation op; while ((op = snapshot.next()) != null) { @@ -3031,8 +3001,7 @@ public void testSimpleCommit() throws IOException { translog.rollGeneration(); } } - long lastGen = randomLongBetween(1, translog.currentFileGeneration()); - commit(translog, randomLongBetween(1, lastGen), lastGen); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit(randomLongBetween(0, operations)); } public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { @@ -3044,9 +3013,8 @@ public void testAcquiredLockIsPassedToDeletionPolicy() throws IOException { translog.rollGeneration(); } if (rarely()) { - long lastGen = randomLongBetween(deletionPolicy.getTranslogGenerationOfLastCommit(), translog.currentFileGeneration()); - long minGen = randomLongBetween(deletionPolicy.getMinTranslogGenerationForRecovery(), lastGen); - commit(translog, minGen, lastGen); + translog.getDeletionPolicy().setLocalCheckpointOfSafeCommit( + 
randomLongBetween(deletionPolicy.getLocalCheckpointOfSafeCommit(), i)); } if (frequently()) { long minGen; @@ -3069,7 +3037,7 @@ public void testReadGlobalCheckpoint() throws Exception { translog.rollGeneration(); } } - rollAndCommit(translog); + translog.rollGeneration(); translog.close(); assertThat(Translog.readGlobalCheckpoint(translogDir, translogUUID), equalTo(globalCheckpoint.get())); expectThrows(TranslogCorruptedException.class, () -> Translog.readGlobalCheckpoint(translogDir, UUIDs.randomBase64UUID())); @@ -3201,9 +3169,8 @@ public void testMaxSeqNo() throws Exception { translog.sync(); assertThat(translog.getMaxSeqNo(), equalTo(maxSeqNoPerGeneration.isEmpty() ? SequenceNumbers.NO_OPS_PERFORMED : Collections.max(maxSeqNoPerGeneration.values()))); - long minRetainedGen = commit(translog, randomLongBetween(1, translog.currentFileGeneration()), translog.currentFileGeneration()); long expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream() - .filter(e -> e.getKey() >= minRetainedGen).mapToLong(e -> e.getValue()) + .filter(e -> e.getKey() >= translog.getMinFileGeneration()).mapToLong(e -> e.getValue()) .max().orElse(SequenceNumbers.NO_OPS_PERFORMED); assertThat(translog.getMaxSeqNo(), equalTo(expectedMaxSeqNo)); } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index d641448736e10..0db222c14ea41 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.NoOpEngine; import 
org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SeqNoStats; @@ -177,14 +176,8 @@ public void testPrepareIndexForPeerRecovery() throws Exception { long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint(); Optional safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint); assertTrue(safeCommit.isPresent()); - final Translog.TranslogGeneration recoveringTranslogGeneration; - try (Engine.IndexCommitRef commitRef = shard.acquireSafeIndexCommit()) { - recoveringTranslogGeneration = new Translog.TranslogGeneration( - commitRef.getIndexCommit().getUserData().get(Translog.TRANSLOG_UUID_KEY), - Long.parseLong(commitRef.getIndexCommit().getUserData().get(Translog.TRANSLOG_GENERATION_KEY))); - } int expectedTotalLocal = 0; - try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshotFromGen(recoveringTranslogGeneration, globalCheckpoint)) { + try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot(safeCommit.get().localCheckpoint + 1, globalCheckpoint)) { Translog.Operation op; while ((op = snapshot.next()) != null) { if (op.seqNo() <= globalCheckpoint) { diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java index f18ee83069f11..049bc15d8d733 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java @@ -276,21 +276,17 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { .setOpenMode(IndexWriterConfig.OpenMode.APPEND); Map userData = new HashMap<>(replica.store().readLastCommittedSegmentsInfo().getUserData()); final String translogUUIDtoUse; - final long translogGenToUse; final String historyUUIDtoUse = UUIDs.randomBase64UUID(random()); if (randomBoolean()) { // create a new translog translogUUIDtoUse = 
Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), flushedDocs, replica.shardId(), replica.getPendingPrimaryTerm()); - translogGenToUse = 1; } else { translogUUIDtoUse = translogGeneration.translogUUID; - translogGenToUse = translogGeneration.translogFileGeneration; } try (IndexWriter writer = new IndexWriter(replica.store().directory(), iwc)) { userData.put(Engine.HISTORY_UUID_KEY, historyUUIDtoUse); userData.put(Translog.TRANSLOG_UUID_KEY, translogUUIDtoUse); - userData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGenToUse)); writer.setLiveCommitData(userData.entrySet()); writer.commit(); } From 73e9d30f127dbe15b09b3dbfdfb03b870996ca22 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 20 Feb 2020 15:55:33 +0100 Subject: [PATCH 2/8] Separate translog from index deletion conditions (#52556) Separates the translog from the index deletion conditions (allowing the translog to be cleaned up more eagerly), and avoids taking the write lock on the translog if no clean-up is actually necessary. 
--- .../index/engine/InternalEngine.java | 2 +- .../index/translog/Translog.java | 38 +++++++++++++------ 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index b76a23c014154..5fa7bd2c8c24a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -585,8 +585,8 @@ public Translog.Location getTranslogLastWriteLocation() { private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { if (combinedDeletionPolicy.hasUnreferencedCommits()) { indexWriter.deleteUnusedFiles(); - translog.trimUnreferencedReaders(); } + translog.trimUnreferencedReaders(); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index 87d228352164f..452d65ffcfde4 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1646,23 +1646,25 @@ public void rollGeneration() throws IOException { * required generation */ public void trimUnreferencedReaders() throws IOException { - // move most of the data to disk to reduce the time the lock is held + // first check under read lock if any readers can be trimmed + try (ReleasableLock ignored = readLock.acquire()) { + if (closed.get()) { + // we're shutdown potentially on some tragic event, don't delete anything + return; + } + if (getMinReferencedGen() == getMinFileGeneration()) { + return; + } + } + + // move most of the data to disk to reduce the time the write lock is held sync(); try (ReleasableLock ignored = writeLock.acquire()) { if (closed.get()) { // we're shutdown potentially on some tragic event, don't delete anything return; } - long minReferencedGen = 
Math.min(deletionPolicy.minTranslogGenRequired(readers, current), - minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, current, readers)); - assert minReferencedGen >= getMinFileGeneration() : - "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" - + getMinFileGeneration() + "]"; - assert minReferencedGen <= currentFileGeneration() : - "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] which is higher than the current generation [" - + currentFileGeneration() + "]"; - - + final long minReferencedGen = getMinReferencedGen(); for (Iterator iterator = readers.iterator(); iterator.hasNext(); ) { TranslogReader reader = iterator.next(); if (reader.getGeneration() >= minReferencedGen) { @@ -1689,6 +1691,20 @@ public void trimUnreferencedReaders() throws IOException { } } + private long getMinReferencedGen() throws IOException { + assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); + long minReferencedGen = Math.min( + deletionPolicy.minTranslogGenRequired(readers, current), + minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, current, readers)); + assert minReferencedGen >= getMinFileGeneration() : + "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is [" + + getMinFileGeneration() + "]"; + assert minReferencedGen <= currentFileGeneration() : + "deletion policy requires a minReferenceGen of [" + minReferencedGen + "] which is higher than the current generation [" + + currentFileGeneration() + "]"; + return minReferencedGen; + } + /** * deletes all files associated with a reader. 
package-private to be able to simulate node failures at this point */ From c0c02009add35e71e745b926d25b4489ac82d52b Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 12 Feb 2020 11:08:48 -0500 Subject: [PATCH 3/8] Fix testPrepareIndexForPeerRecovery (#52245) Since #51905, we skip translog recovery if the local checkpoint of the safe commit equals the global checkpoint. This change adjusts the test so that it does not create a new snapshot in that case. Closes #52221 Relates #51905 --- .../recovery/PeerRecoveryTargetServiceTests.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 0db222c14ea41..18bf8c8e225dc 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -177,11 +177,13 @@ public void testPrepareIndexForPeerRecovery() throws Exception { Optional safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint); assertTrue(safeCommit.isPresent()); int expectedTotalLocal = 0; - try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot(safeCommit.get().localCheckpoint + 1, globalCheckpoint)) { - Translog.Operation op; - while ((op = snapshot.next()) != null) { - if (op.seqNo() <= globalCheckpoint) { - expectedTotalLocal++; + if (safeCommit.get().localCheckpoint < globalCheckpoint) { + try (Translog.Snapshot snapshot = getTranslog(shard).newSnapshot(safeCommit.get().localCheckpoint + 1, globalCheckpoint)) { + Translog.Operation op; + while ((op = snapshot.next()) != null) { + if (op.seqNo() <= globalCheckpoint) { + expectedTotalLocal++; + } } } } From a409b391103ae1cf5e6dec66202f2f7cc2d7fed9 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 12 Feb 2020 11:06:11 -0500 Subject: [PATCH 4/8] 
Fix IndexShardIT#testMaybeFlush (#52247) Since #51905, we use the local checkpoint of the safe commit to calculate the number of uncommitted operations reported in the translog stats. If a periodic flush triggered by afterWriteOperation completes before we sync the translog, then the last commit is not safe. We also need to sync the translog through the Engine (shard.sync()) rather than calling translog.sync() directly so that we can advance the safe commit. Relates #51905 Closes #52223 --- .../test/java/org/elasticsearch/index/shard/IndexShardIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 35b7af704dc47..16c85073760f4 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -368,8 +368,8 @@ public void testMaybeFlush() throws Exception { assertFalse(shard.shouldPeriodicallyFlush()); assertThat(shard.flushStats().getPeriodic(), greaterThan(0L)); }); + shard.sync(); assertEquals(0, translog.stats().getUncommittedOperations()); - translog.sync(); long size = Math.max(translog.stats().getUncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration()); @@ -388,6 +388,7 @@ public void testMaybeFlush() throws Exception { commitStats.getUserData(), flushStats.getPeriodic(), flushStats.getTotal()); assertFalse(shard.shouldPeriodicallyFlush()); }); + shard.sync(); assertEquals(0, translog.stats().getUncommittedOperations()); } From 3c42d645c2039d9b4bd5c8f3ce442c0e8e11faf6 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 18 Feb 2020 08:50:16 -0500 Subject: [PATCH 5/8] Fix testRestoreLocalHistoryFromTranslog (#52441) Asserts that no new operations are made into the 
translog since we re-opened the engine. Relates #51905 Closes #52410 --- .../index/engine/InternalEngineTests.java | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 5e21db432cea0..db831aa547907 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -4559,7 +4559,6 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { final EngineConfig engineConfig; final SeqNoStats prevSeqNoStats; final List prevDocs; - final List existingTranslog; try (InternalEngine engine = createEngine(store, createTempDir(), globalCheckpoint::get)) { engineConfig = engine.config(); for (final long seqNo : seqNos) { @@ -4578,24 +4577,17 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException { engine.syncTranslog(); prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); prevDocs = getDocIds(engine, true); - try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) { - existingTranslog = TestTranslog.drainSnapshot(snapshot, false); - } } try (InternalEngine engine = new InternalEngine(engineConfig)) { - final Translog.TranslogGeneration currrentTranslogGeneration = new Translog.TranslogGeneration( - engine.getTranslog().getTranslogUUID(), engine.getTranslog().currentFileGeneration()); + final long currentTranslogGeneration = engine.getTranslog().currentFileGeneration(); engine.recoverFromTranslog(translogHandler, globalCheckpoint.get()); engine.restoreLocalHistoryFromTranslog(translogHandler); assertThat(getDocIds(engine, true), equalTo(prevDocs)); SeqNoStats seqNoStats = engine.getSeqNoStats(globalCheckpoint.get()); assertThat(seqNoStats.getLocalCheckpoint(), equalTo(prevSeqNoStats.getLocalCheckpoint())); 
assertThat(seqNoStats.getMaxSeqNo(), equalTo(prevSeqNoStats.getMaxSeqNo())); - try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) { - assertThat("restore from local translog must not add operations to translog", - snapshot.totalOperations(), equalTo(existingTranslog.size())); - assertThat(TestTranslog.drainSnapshot(snapshot, false), equalTo(existingTranslog)); - } + assertThat("restore from local translog must not add operations to translog", + engine.getTranslog().totalOperationsByMinGen(currentTranslogGeneration), equalTo(0)); } assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); } @@ -6038,6 +6030,7 @@ public void testRecoverFromLocalTranslog() throws Exception { docs = getDocIds(engine, true); } try (InternalEngine engine = new InternalEngine(config)) { + engine.onSettingsChanged(TimeValue.MINUS_ONE, ByteSizeValue.ZERO, 0); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(getDocIds(engine, randomBoolean()), equalTo(docs)); if (engine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo() == globalCheckpoint.get()) { From 9e121247db857887cf4c6399bbf5f775217b6a1a Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 21 Feb 2020 03:22:38 -0500 Subject: [PATCH 6/8] Fix testResyncAfterPrimaryPromotion (#52615) Adjusts the assertion as we might eagerly clean up translog during resync since #52556 Relates #52556 Closes #52598 --- .../index/replication/RecoveryDuringReplicationTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 43e76efee998e..c1b4bb064f36c 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -77,6 +77,7 
@@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -378,7 +379,6 @@ public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception { } public void testResyncAfterPrimaryPromotion() throws Exception { - // TODO: check translog trimming functionality once rollback is implemented in Lucene (ES trimming is done) Map mappings = Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) { @@ -446,7 +446,7 @@ public void testResyncAfterPrimaryPromotion() throws Exception { assertThat(source.source.utf8ToString(), is("{ \"f\": \"normal\"}")); } } - assertThat(translogOperations, is(initialDocs + extraDocs)); + assertThat(translogOperations, either(equalTo(initialDocs + extraDocs)).or(equalTo(task.getResyncedOperations()))); } } From ceb7967392d3517b1389e789654af5ca819c80f0 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Thu, 20 Feb 2020 14:29:48 -0500 Subject: [PATCH 7/8] Fix testSeqNoCollision (#52588) Adjusts the assertion as we trim translog more eagerly since #52556. 
Relates #52556 Closes #52148 --- .../replication/IndexLevelReplicationTests.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java index 083d343f09eee..c62d09c3acbc1 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java @@ -543,7 +543,10 @@ public void testRequestFailureReplication() throws Exception { } public void testSeqNoCollision() throws Exception { - try (ReplicationGroup shards = createGroup(2)) { + try (ReplicationGroup shards = createGroup(2, Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getKey(), "-1") + .put(IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getKey(), "-1").build())) { shards.startAll(); int initDocs = shards.indexDocs(randomInt(10)); List replicas = shards.getReplicas(); @@ -570,19 +573,17 @@ public void testSeqNoCollision() throws Exception { assertThat(snapshot.next(), nullValue()); assertThat(snapshot.skippedOperations(), equalTo(0)); } - // Make sure that replica2 receives translog ops (eg. op2) from replica1 - // and does not overwrite its stale operation (op1) as it is trimmed. logger.info("--> Promote replica1 as the primary"); shards.promoteReplicaToPrimary(replica1).get(); // wait until resync completed. 
shards.index(new IndexRequest(index.getName(), "type", "d2").source("{}", XContentType.JSON)); final Translog.Operation op2; try (Translog.Snapshot snapshot = getTranslog(replica2).newSnapshot()) { - assertThat(snapshot.totalOperations(), equalTo(initDocs + 2)); + assertThat(snapshot.totalOperations(), equalTo(1)); op2 = snapshot.next(); assertThat(op2.seqNo(), equalTo(op1.seqNo())); assertThat(op2.primaryTerm(), greaterThan(op1.primaryTerm())); - assertThat("Remaining of snapshot should contain init operations", snapshot, containsOperationsInAnyOrder(initOperations)); - assertThat(snapshot.skippedOperations(), equalTo(1)); + assertNull(snapshot.next()); + assertThat(snapshot.skippedOperations(), equalTo(0)); } // Make sure that peer-recovery transfers all but non-overridden operations. @@ -591,8 +592,7 @@ public void testSeqNoCollision() throws Exception { shards.promoteReplicaToPrimary(replica2).get(); logger.info("--> Recover replica3 from replica2"); recoverReplica(replica3, replica2, true); - try (Translog.Snapshot snapshot = replica3.getHistoryOperations( - "test", replica3.indexSettings().isSoftDeleteEnabled() ? Engine.HistorySource.INDEX : Engine.HistorySource.TRANSLOG, 0)) { + try (Translog.Snapshot snapshot = replica3.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); final List expectedOps = new ArrayList<>(initOperations); expectedOps.add(op2); From e94a201b8dcf7aa59cab453133bd2e50648b990c Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 25 Feb 2020 15:44:23 -0500 Subject: [PATCH 8/8] Add more assertions to testMaybeFlush (#52792) We have not been able to reproduce this test failure or determine its cause. This commit adds more assertions so that we can narrow down the scope. 
Relates #52223 --- .../java/org/elasticsearch/index/shard/IndexShardIT.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 16c85073760f4..7a7cc29aadd44 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -362,14 +362,18 @@ public void testMaybeFlush() throws Exception { assertTrue(shard.shouldPeriodicallyFlush()); final Translog translog = getTranslog(shard); assertEquals(2, translog.stats().getUncommittedOperations()); + assertThat(shard.flushStats().getTotal(), equalTo(0L)); client().prepareIndex("test", "_doc", "2").setSource("{}", XContentType.JSON) .setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); + assertThat(shard.getLastKnownGlobalCheckpoint(), equalTo(2L)); assertBusy(() -> { // this is async assertFalse(shard.shouldPeriodicallyFlush()); - assertThat(shard.flushStats().getPeriodic(), greaterThan(0L)); + assertThat(shard.flushStats().getPeriodic(), equalTo(1L)); + assertThat(shard.flushStats().getTotal(), equalTo(1L)); }); shard.sync(); - assertEquals(0, translog.stats().getUncommittedOperations()); + assertThat(shard.getLastSyncedGlobalCheckpoint(), equalTo(2L)); + assertThat("last commit [" + shard.commitStats().getUserData() + "]", translog.stats().getUncommittedOperations(), equalTo(0)); long size = Math.max(translog.stats().getUncommittedSizeInBytes(), Translog.DEFAULT_HEADER_SIZE_IN_BYTES + 1); logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", translog.stats().getUncommittedSizeInBytes(), translog.stats().getUncommittedOperations(), translog.getGeneration());