From 972b81f8a9de5e88106180a5becc5baab39ccdeb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 16 Dec 2019 11:08:00 -0500 Subject: [PATCH] Account trimAboveSeqNo in committed translog generation (#50205) Today we do not consider trimAboveSeqNo when calculating the translog generation of an index commit. If there is no new indexing after the primary promotion, then we won't be able to clean up the translog. --- .../org/elasticsearch/index/translog/Checkpoint.java | 11 +++++++++++ .../org/elasticsearch/index/translog/Translog.java | 8 ++------ .../replication/RecoveryDuringReplicationTests.java | 5 +++++ 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java b/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java index fe20a52f482f3..1e16b9c3a60ae 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Checkpoint.java @@ -106,6 +106,17 @@ private void write(DataOutput out) throws IOException { out.writeLong(trimmedAboveSeqNo); } + /** + * Returns the maximum sequence number of operations in this checkpoint after applying {@link #trimmedAboveSeqNo}. + */ + long maxEffectiveSeqNo() { + if (trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) { + return maxSeqNo; + } else { + return Math.min(trimmedAboveSeqNo, maxSeqNo); + } + } + static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint, long minTranslogGeneration) { final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED; diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index e38880797785b..055b9b82fc917 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -696,11 +696,7 @@ private Stream readersAboveMinSeqNo(long minSeqNo) assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() : "callers of readersAboveMinSeqNo must hold a lock: readLock [" + readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]"; - return Stream.concat(readers.stream(), Stream.of(current)) - .filter(reader -> { - final long maxSeqNo = reader.getCheckpoint().maxSeqNo; - return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo; - }); + return Stream.concat(readers.stream(), Stream.of(current)).filter(reader -> minSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo()); } /** @@ -1629,7 +1625,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) { */ long minTranslogFileGeneration = this.currentFileGeneration(); for (final TranslogReader reader : readers) { - if (seqNo <= reader.getCheckpoint().maxSeqNo) { + if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) { minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration()); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index f6756274642e7..53a96e531585a 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -800,6 +800,11 @@ public void testRollbackOnPromotion() throws Exception { shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback); done.set(true); thread.join(); + + for (IndexShard shard : shards) { + shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0)); + } } }