Skip to content

Commit

Permalink
Account trimAboveSeqNo in committed translog generation (#50205)
Browse files Browse the repository at this point in the history
Today we do not consider trimAboveSeqNo when calculating the translog 
generation of an index commit. If there is no new indexing after the
primary promotion, then we won't be able to clean up the translog.
  • Loading branch information
dnhatn authored Dec 16, 2019
1 parent 736e9f9 commit 972b81f
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,17 @@ private void write(DataOutput out) throws IOException {
out.writeLong(trimmedAboveSeqNo);
}

/**
* Returns the maximum sequence number of operations in this checkpoint after applying {@link #trimmedAboveSeqNo}.
*/
long maxEffectiveSeqNo() {
if (trimmedAboveSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
return maxSeqNo;
} else {
return Math.min(trimmedAboveSeqNo, maxSeqNo);
}
}

static Checkpoint emptyTranslogCheckpoint(final long offset, final long generation, final long globalCheckpoint,
long minTranslogGeneration) {
final long minSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,11 +696,7 @@ private Stream<? extends BaseTranslogReader> readersAboveMinSeqNo(long minSeqNo)
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread() :
"callers of readersAboveMinSeqNo must hold a lock: readLock ["
+ readLock.isHeldByCurrentThread() + "], writeLock [" + readLock.isHeldByCurrentThread() + "]";
return Stream.concat(readers.stream(), Stream.of(current))
.filter(reader -> {
final long maxSeqNo = reader.getCheckpoint().maxSeqNo;
return maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo;
});
return Stream.concat(readers.stream(), Stream.of(current)).filter(reader -> minSeqNo <= reader.getCheckpoint().maxEffectiveSeqNo());
}

/**
Expand Down Expand Up @@ -1629,7 +1625,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
*/
long minTranslogFileGeneration = this.currentFileGeneration();
for (final TranslogReader reader : readers) {
if (seqNo <= reader.getCheckpoint().maxSeqNo) {
if (seqNo <= reader.getCheckpoint().maxEffectiveSeqNo()) {
minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,11 @@ public void testRollbackOnPromotion() throws Exception {
shards.assertAllEqual(initDocs + inFlightOpsOnNewPrimary + moreDocsAfterRollback);
done.set(true);
thread.join();

for (IndexShard shard : shards) {
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
assertThat(shard.translogStats().getUncommittedOperations(), equalTo(0));
}
}
}

Expand Down

0 comments on commit 972b81f

Please sign in to comment.