Replica recovery could go into an endless flushing loop #28350

Merged (13 commits) on Jan 25, 2018
@@ -817,6 +817,12 @@ public final boolean refreshNeeded() {
// NOTE: do NOT rename this to something containing flush or refresh!
public abstract void writeIndexingBuffer() throws EngineException;

/**
* Checks if this engine should be flushed.
Contributor: Can you explain that this can return false even if there are uncommitted changes? It's more of a maintenance function. Maybe we should call it something different, like shouldFlushForMaintainance or maintainanceFlushPending(); just suggestions to make it clearer.

Member Author: Yannick and I came up with shouldFlushToFreeTranslog.

* This check is mainly based on the uncommitted translog size and the translog flush threshold setting.
*/
public abstract boolean shouldFlush();

/**
* Flushes the state of the engine including the transaction log, clearing memory.
*
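The contract the reviewers discuss above — a maintenance-style check that may return false even while Lucene has uncommitted changes — can be sketched as a toy model. All names and thresholds below are illustrative, not the actual Engine API:

```java
// Toy model of a maintenance flush check: it looks only at translog size,
// so it can return false even while Lucene has uncommitted changes.
// Hypothetical class; not part of Elasticsearch.
class MaintenanceFlushPolicy {
    final long uncommittedTranslogBytes;
    final long flushThresholdBytes;
    final boolean hasUncommittedLuceneChanges;

    MaintenanceFlushPolicy(long uncommittedTranslogBytes, long flushThresholdBytes,
                           boolean hasUncommittedLuceneChanges) {
        this.uncommittedTranslogBytes = uncommittedTranslogBytes;
        this.flushThresholdBytes = flushThresholdBytes;
        this.hasUncommittedLuceneChanges = hasUncommittedLuceneChanges;
    }

    // Deliberately ignores hasUncommittedLuceneChanges: this is a
    // translog-maintenance decision, not a dirtiness check.
    boolean shouldFlush() {
        return uncommittedTranslogBytes > flushThresholdBytes;
    }
}
```

This is why the review thread suggests a name like shouldFlushToFreeTranslog: the method answers "would flushing free translog space?", not "is there anything to commit?".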
@@ -1462,6 +1462,25 @@ final boolean tryRenewSyncCommit() {
return renewed;
}

@Override
public boolean shouldFlush() {
if (translog.shouldFlush() == false) {
return false;
}
/*
* We should only flush if the shouldFlush condition can become false after flushing. This condition will change if:
* 1. The min translog gen of the next commit points to a different translog gen than the last commit
Contributor: I think this deserves a comment on why we don't take IW#hasUncommittedChanges() into account.

Contributor: Should we call ensureOpen() here as well?

Member Author: Done
* 2. If the local checkpoint equals max_seqno, the min translog gen of the next commit will point to the newly rolled generation
*/
final long localCheckpoint = localCheckpointTracker.getCheckpoint();
if (localCheckpoint == localCheckpointTracker.getMaxSeqNo()) {
return true;
}
final long translogGenFromLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
final long translogGenForNewCommit = translog.getMinGenerationForSeqNo(localCheckpoint + 1).translogFileGeneration;
return translogGenForNewCommit > translogGenFromLastCommit;
}

@Override
public CommitId flush() throws EngineException {
return flush(false, false);
@@ -1492,7 +1511,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
logger.trace("acquired flush lock immediately");
}
try {
- if (indexWriter.hasUncommittedChanges() || force) {
+ if (indexWriter.hasUncommittedChanges() || force || shouldFlush()) {
Contributor: Can we add a comment explaining why we have 3 things? Basically something like, we check whether:

  1. We're forced.
  2. There are uncommitted docs in Lucene.
  3. There are translog-related reasons to create a new commit which points to a different place in the translog.

Member Author: Done

ensureCanFlush();
try {
translog.rollGeneration();
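The generation comparison at the heart of shouldFlush() above can be modeled in isolation. This is a hedged sketch with illustrative names (shouldFlushToFreeTranslog echoes the name from the review thread); the real implementation reads the generation from the last commit's user data rather than taking it as a parameter:

```java
// Models the decision in shouldFlush(): flush only when committing can
// actually free translog generations. Hypothetical standalone sketch,
// not the Elasticsearch API.
class TranslogFlushGate {
    static boolean shouldFlushToFreeTranslog(boolean translogOverThreshold,
                                             long localCheckpoint,
                                             long maxSeqNo,
                                             long genFromLastCommit,
                                             long genForNewCommit) {
        if (translogOverThreshold == false) {
            return false; // cheap size check first, mirroring translog.shouldFlush()
        }
        if (localCheckpoint == maxSeqNo) {
            return true;  // no seq-no gaps: the next commit references a freshly rolled generation
        }
        // With gaps, a new commit helps only if its minimum referenced translog
        // generation moves past the last commit's; otherwise repeated flushes
        // free nothing and the size check stays true forever (the endless loop).
        return genForNewCommit > genFromLastCommit;
    }
}
```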
@@ -1597,17 +1597,16 @@ public boolean restoreFromRepository(Repository repository) {
}

/**
- * Tests whether or not the translog should be flushed. This test is based on the current size of the translog comparted to the
+ * Tests whether or not the engine should be flushed. This test is based on the current size of the translog compared to the
* configured flush threshold size.
*
- * @return {@code true} if the translog should be flushed
+ * @return {@code true} if the engine should be flushed
*/
boolean shouldFlush() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
- final Translog translog = engine.getTranslog();
- return translog.shouldFlush();
+ return engine.shouldFlush();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to flush or roll
}
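Delegating the shard-level check to engine.shouldFlush() instead of translog.shouldFlush() is what breaks the endless loop: the size-only check can stay true after a flush that cannot free any translog generation (e.g. a replica with seq-no gaps after recovery). A minimal sketch of that failure mode, with hypothetical names:

```java
// Illustrates the endless-loop failure mode fixed by this change.
// Hypothetical simplified model; not the Elasticsearch classes.
class ShardFlushLoopModel {
    long uncommittedBytes;
    final long thresholdBytes;
    final boolean flushCanFreeTranslog; // false when seq-no gaps pin old generations

    ShardFlushLoopModel(long uncommittedBytes, long thresholdBytes, boolean flushCanFreeTranslog) {
        this.uncommittedBytes = uncommittedBytes;
        this.thresholdBytes = thresholdBytes;
        this.flushCanFreeTranslog = flushCanFreeTranslog;
    }

    boolean sizeOnlyShouldFlush() {   // old behavior: translog.shouldFlush()
        return uncommittedBytes > thresholdBytes;
    }

    boolean engineShouldFlush() {     // new behavior: engine.shouldFlush()
        return sizeOnlyShouldFlush() && flushCanFreeTranslog;
    }

    void flush() {
        if (flushCanFreeTranslog) {
            uncommittedBytes = 0;     // the commit frees the translog
        }                             // otherwise nothing is freed
    }
}
```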
@@ -20,6 +20,7 @@
package org.elasticsearch.index.engine;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -46,6 +47,7 @@
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
@@ -163,6 +165,7 @@
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
@@ -4439,4 +4442,53 @@ public void testCleanUpCommitsWhenGlobalCheckpointAdvanced() throws Exception {
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
}
}

public void testShouldFlush() throws Exception {
assertThat("Empty engine does not need flushing", engine.shouldFlush(), equalTo(false));
int numDocs = between(10, 100);
for (int id = 0; id < numDocs; id++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
}
assertThat("Not exceeded translog flush threshold yet", engine.shouldFlush(), equalTo(false));
long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, engine.getTranslog().uncommittedSizeInBytes());
final IndexSettings indexSettings = engine.config().getIndexSettings();
final IndexMetaData indexMetaData = IndexMetaData.builder(indexSettings.getIndexMetaData())
.settings(Settings.builder()
.put(indexSettings.getSettings())
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b"))
.build();
indexSettings.updateIndexMetaData(indexMetaData);
engine.onSettingsChanged();
int moreDocs = between(0, 10);
for (int id = numDocs; id < numDocs + moreDocs; id++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc));
}
assertThat(engine.shouldFlush(), equalTo(true));
engine.flush();
// No gap - can flush
for (int id = 0; id < numDocs; id++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false));
assertThat(result.isCreated(), equalTo(false));
}
SegmentInfos lastCommitInfo = engine.getLastCommittedSegmentInfos();
assertThat(engine.shouldFlush(), equalTo(true));
engine.flush(false, false);
assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
// With gap - cannot flush
final long maxSeqNo = engine.getLocalCheckpointTracker().generateSeqNo();
for (int id = 0; id < numDocs; id++) {
final ParsedDocument doc = testParsedDocument(Integer.toString(id), null, testDocumentWithTextField(), SOURCE, null);
final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, id, false));
assertThat(result.isCreated(), equalTo(false));
}
assertThat(engine.shouldFlush(), equalTo(false));
// Fill gap - can flush again
final ParsedDocument doc = testParsedDocument(Long.toString(maxSeqNo), null, testDocumentWithTextField(), SOURCE, null);
final Engine.IndexResult result = engine.index(replicaIndexForDoc(doc, 1L, maxSeqNo, false));
assertThat(result.isCreated(), equalTo(true));
assertThat(engine.shouldFlush(), equalTo(true));
}
}
@@ -19,6 +19,7 @@

package org.elasticsearch.indices.recovery;

import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
@@ -306,4 +307,26 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception {
}
}

public void testShouldFlushAfterPeerRecovery() throws Exception {
Contributor: Can you add Javadoc to this method to explain what the goal of this test is?
try (ReplicationGroup shards = createGroup(0)) {
shards.startAll();
long translogSizeOnPrimary = 0;
int numDocs = shards.indexDocs(between(10, 100));
translogSizeOnPrimary += shards.getPrimary().getTranslog().uncommittedSizeInBytes();
Contributor: Just define translogSizeOnPrimary here (no need to initialize).
shards.flush();

final IndexShard replica = shards.addReplica();
IndexMetaData.Builder builder = IndexMetaData.builder(replica.indexSettings().getIndexMetaData());
long flushThreshold = RandomNumbers.randomLongBetween(random(), 100, translogSizeOnPrimary);
builder.settings(Settings.builder().put(replica.indexSettings().getSettings())
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), flushThreshold + "b")
);
replica.indexSettings().updateIndexMetaData(builder.build());
replica.onSettingsChanged();
shards.recoverReplica(replica);
assertBusy(() -> assertThat(getEngine(replica).shouldFlush(), equalTo(false)));
assertThat(replica.getTranslog().totalOperations(), equalTo(numDocs));
shards.assertAllEqual(numDocs);
}
}
}