Harden the periodic flush check to avoid an endless flush loop
In elastic#28350, we fixed an endless flushing loop that can happen on
replicas by tightening the relation between the flush action and the
periodic flush condition:

1. The periodic flush condition is enabled only if it will be
disabled after a flush.

2. If the periodic flush condition is true, then a flush will
actually happen regardless of the Lucene state.

Together, (1) and (2) guarantee that a flushing loop terminates, as the
sketch below illustrates.
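
A toy model of that termination argument (the EngineModel interface here is
hypothetical and for illustration only; it is not the actual Elasticsearch
Engine API):

    public class FlushLoopSketch {
        interface EngineModel {
            boolean shouldPeriodicallyFlush(); // property (2): when true, a flush really runs
            void flush();                      // property (1): must leave shouldPeriodicallyFlush() false
        }

        static void flushLoop(EngineModel engine) {
            while (engine.shouldPeriodicallyFlush()) {
                engine.flush(); // by (1) the condition is now false, so the body runs at most once
            }
        }
    }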
Sadly, condition (1) can be violated in edge cases, because we used two
different algorithms to evaluate the current and the future uncommitted size:

- We use the method `uncommittedSizeInBytes` to calculate the current
uncommitted size. It is the sum of the sizes of the translog generations at
or above the minGen (which is determined by a given seqno); that is, we pick
a contiguous range of generations starting from the minGen.

- We use the method `sizeOfGensAboveSeqNoInBytes` to calculate the future
uncommitted size. It is the sum of the sizes of the translog generations
whose maxSeqNo is at least the given seqNo. Here we do not pick a contiguous
range; we filter the generations one by one.

Suppose we have three translog generations gen1={#1,#2}, gen2={}, gen3={#3}
and seqno=#1: `uncommittedSizeInBytes` is the sum of gen1, gen2, and gen3,
while `sizeOfGensAboveSeqNoInBytes` is the sum of gen1 and gen3 only. Gen2 is
excluded because its maxSeqNo is still -1 (unassigned).
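
To make the mismatch concrete, here is a minimal, self-contained sketch
(plain Java, not Elasticsearch code; the generation sizes are invented) that
reduces each generation to a (maxSeqNo, size) pair and computes both sums for
the example above:

    import java.util.List;

    public class UncommittedSizeSketch {

        // A generation reduced to its checkpoint maxSeqNo (-1 = no ops yet) and its size in bytes.
        record Gen(long maxSeqNo, long sizeInBytes) {}

        // Range-based, like uncommittedSizeInBytes: find the first generation that can
        // contain the seqno, then sum every generation from there to the end.
        static long sizeByMinGen(List<Gen> gens, long minSeqNo) {
            int minGenIndex = gens.size();
            for (int i = 0; i < gens.size(); i++) {
                long maxSeqNo = gens.get(i).maxSeqNo();
                if (maxSeqNo == -1 || maxSeqNo >= minSeqNo) {
                    minGenIndex = i;
                    break;
                }
            }
            return gens.subList(minGenIndex, gens.size()).stream()
                .mapToLong(Gen::sizeInBytes).sum();
        }

        // Filter-based, like the old sizeOfGensAboveSeqNoInBytes: keep generations one
        // by one, so an empty generation in the middle (maxSeqNo == -1) is skipped.
        static long sizeByFilter(List<Gen> gens, long minSeqNo) {
            return gens.stream()
                .filter(g -> g.maxSeqNo() >= minSeqNo)
                .mapToLong(Gen::sizeInBytes).sum();
        }

        public static void main(String[] args) {
            // gen1 holds #1 and #2, gen2 is empty, gen3 holds #3; the sizes are arbitrary.
            List<Gen> gens = List.of(new Gen(2, 100), new Gen(-1, 55), new Gen(3, 100));
            System.out.println(sizeByMinGen(gens, 1)); // 255 = gen1 + gen2 + gen3
            System.out.println(sizeByFilter(gens, 1)); // 200 = gen1 + gen3 only
        }
    }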

This commit makes `sizeOfGensAboveSeqNoInBytes` use the same range-based
algorithm as `uncommittedSizeInBytes`.

Closes elastic#29097
dnhatn committed Mar 17, 2018
1 parent 22ad52a commit 7cbd56a
Showing 3 changed files with 92 additions and 4 deletions.
File 1 of 3 (the Translog class):

@@ -23,7 +23,6 @@
 import org.apache.logging.log4j.util.Supplier;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.store.AlreadyClosedException;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.UUIDs;
@@ -39,6 +38,7 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
@@ -431,7 +431,7 @@ public int estimateTotalOperationsFromMinSeq(long minSeqNo) {
     /**
      * Returns the size in bytes of the translog files above the given generation
      */
-    private long sizeInBytesByMinGen(long minGeneration) {
+    long sizeInBytesByMinGen(long minGeneration) {
         try (ReleasableLock ignored = readLock.acquire()) {
             ensureOpen();
             return Stream.concat(readers.stream(), Stream.of(current))
@@ -447,7 +447,21 @@ private long sizeInBytesByMinGen(long minGeneration) {
     public long sizeOfGensAboveSeqNoInBytes(long minSeqNo) {
         try (ReleasableLock ignored = readLock.acquire()) {
             ensureOpen();
-            return readersAboveMinSeqNo(minSeqNo).mapToLong(BaseTranslogReader::sizeInBytes).sum();
+            final List<BaseTranslogReader> readers = new ArrayList<>(this.readers);
+            readers.add(current);
+            int keptIndex = Integer.MAX_VALUE;
+            for (int i = 0; i < readers.size(); i++) {
+                final long maxSeqNo = readers.get(i).getCheckpoint().maxSeqNo;
+                if (maxSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || maxSeqNo >= minSeqNo) {
+                    keptIndex = i;
+                    break;
+                }
+            }
+            long totalBytes = 0;
+            for (int i = keptIndex; i < readers.size(); i++) {
+                totalBytes += readers.get(i).sizeInBytes();
+            }
+            return totalBytes;
         }
     }

File 2 of 3 (the engine tests):

@@ -4283,6 +4283,26 @@ public void testShouldPeriodicallyFlush() throws Exception {
         engine.flush(false, false);
         assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
         assertThat(engine.getTranslog().uncommittedOperations(), equalTo(0));
+
+        // If the new index commit still points to the same translog generation as the current index commit,
+        // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes.
+        engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here
+        for (int id = 0; id < numDocs; id++) {
+            if (randomBoolean()) {
+                engine.getTranslog().rollGeneration();
+            }
+            final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null);
+            long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
+            final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 2L, seqno, false));
+            assertThat(result.isCreated(), equalTo(true));
+        }
+        // A flush must change the periodically flush condition.
+        lastCommitInfo = engine.getLastCommittedSegmentInfos();
+        if (engine.shouldPeriodicallyFlush()) {
+            engine.flush();
+            assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
+        }
+        assertThat(engine.shouldPeriodicallyFlush(), equalTo(false));
     }


File 3 of 3 (the translog tests):

@@ -31,7 +31,6 @@
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.ByteArrayDataOutput;
 import org.apache.lucene.store.MockDirectoryWrapper;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.apache.lucene.util.LineFileDocs;
 import org.apache.lucene.util.LuceneTestCase;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -54,6 +53,7 @@
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
@@ -113,6 +113,7 @@
 import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
 import static org.elasticsearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder;
 import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -510,6 +511,58 @@ public void testUncommittedOperations() throws Exception {
         }
     }
 
+    public void testSizeOfGensAboveSeqNoInBytes() throws Exception {
+        final long emptyTranslogSize = Translog.DEFAULT_HEADER_SIZE_IN_BYTES;
+        assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomNonNegativeLong()), equalTo(0L));
+        // Gen1: seqno in 1001-2000
+        int ops = between(1, 100);
+        final Set<Long> seqnoGen1 = new HashSet<>();
+        final long gen1 = translog.currentFileGeneration();
+        for (int i = 0; i < ops; i++) {
+            long seqno = randomValueOtherThanMany(n -> seqnoGen1.add(n) == false, () -> randomLongBetween(1001, 2000));
+            translog.add(new Translog.Index("test", UUIDs.randomBase64UUID(), seqno, new byte[]{1}));
+        }
+        final long maxSeqnoGen1 = Collections.max(seqnoGen1);
+        long sizeGen1 = translog.getCurrent().sizeInBytes();
+        for (int numOfEmptyGen = between(0, 10), i = 0; i < numOfEmptyGen; i++) {
+            translog.rollGeneration();
+            sizeGen1 += emptyTranslogSize;
+        }
+        assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen1 + 1, Long.MAX_VALUE)), equalTo(0L));
+        assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(0, maxSeqnoGen1)),
+            allOf(equalTo(sizeGen1), equalTo(translog.sizeInBytesByMinGen(gen1))));
+        // Gen2: seqno in 0-1000
+        translog.rollGeneration();
+        ops = between(1, 100);
+        for (int i = 0; i < ops; i++) {
+            translog.add(new Translog.Index("test", UUIDs.randomBase64UUID(), i, new byte[]{1}));
+        }
+        long sizeGen2 = translog.getCurrent().sizeInBytes();
+        assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen1 + 1, Long.MAX_VALUE)), equalTo(0L));
+        assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(0, maxSeqnoGen1)),
+            allOf(equalTo(sizeGen1 + sizeGen2), equalTo(translog.sizeInBytesByMinGen(gen1))));
+        // Gen3: seqno in 2001+
+        ops = between(1, 100);
+        translog.rollGeneration();
+        final long gen3 = translog.currentFileGeneration();
+        final Set<Long> seqnoGen3 = new HashSet<>();
+        for (int i = 0; i < ops; i++) {
+            long seqno = randomValueOtherThanMany(n -> seqnoGen3.add(n) == false, () -> randomLongBetween(2001, Long.MAX_VALUE));
+            translog.add(new Translog.Index("test", UUIDs.randomBase64UUID(), seqno, new byte[]{1}));
+        }
+        final long maxSeqnoGen3 = Collections.max(seqnoGen3);
+        long sizeGen3 = translog.getCurrent().sizeInBytes();
+        for (int numOfEmptyGen = between(0, 10), i = 0; i < numOfEmptyGen; i++) {
+            translog.rollGeneration();
+            sizeGen3 += emptyTranslogSize; // check that an empty generation is included
+        }
+        assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen3 + 1, Long.MAX_VALUE)), equalTo(0L));
+        assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(maxSeqnoGen1 + 1, maxSeqnoGen3)),
+            allOf(equalTo(sizeGen3), equalTo(translog.sizeInBytesByMinGen(gen3)))); // since gen3
+        assertThat(translog.sizeOfGensAboveSeqNoInBytes(randomLongBetween(0, maxSeqnoGen1)),
+            allOf(equalTo(sizeGen1 + sizeGen2 + sizeGen3), equalTo(translog.sizeInBytesByMinGen(gen1)))); // since gen1
+    }
+
     public void testTotalTests() {
         final TranslogStats total = new TranslogStats(0, 0, 0, 0, 1);
         final int n = randomIntBetween(0, 16);
@@ -2590,6 +2643,7 @@ public void testMinSeqNoBasedAPI() throws IOException {
             seenSeqNos.addAll(generationSeqNo);
         }
         assertThat(translog.estimateTotalOperationsFromMinSeq(seqNo), equalTo(expectedSnapshotOps));
+        assertThat(translog.sizeInBytesByMinGen(generation), equalTo(translog.sizeOfGensAboveSeqNoInBytes(seqNo)));
         int readFromSnapshot = 0;
         try (Translog.Snapshot snapshot = translog.newSnapshotFromMinSeqNo(seqNo)) {
             assertThat(snapshot.totalOperations(), equalTo(expectedSnapshotOps));
