Skip to content

Commit

Permalink
Keep commits and translog up to the global checkpoint (#27606)
Browse files Browse the repository at this point in the history
We need to keep index commits and translog operations up to the current 
global checkpoint to allow us to throw away unsafe operations and
increase the operation-based recovery chance. This is achieved by a new
index deletion policy.

Relates #10708
  • Loading branch information
dnhatn committed Dec 13, 2017
1 parent 53fab41 commit 9b55821
Show file tree
Hide file tree
Showing 12 changed files with 541 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,48 @@

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/**
* An {@link IndexDeletionPolicy} that coordinates between Lucene's commits and the retention of translog generation files,
* making sure that all translog files that are needed to recover from the Lucene commit are not deleted.
* <p>
* In particular, this policy will delete index commits whose max sequence number is at most
* the current global checkpoint except the index commit which has the highest max sequence number among those.
*/
class CombinedDeletionPolicy extends IndexDeletionPolicy {

final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;

private final SnapshotDeletionPolicy indexDeletionPolicy;

CombinedDeletionPolicy(SnapshotDeletionPolicy indexDeletionPolicy, TranslogDeletionPolicy translogDeletionPolicy,
EngineConfig.OpenMode openMode) {
this.indexDeletionPolicy = indexDeletionPolicy;
this.translogDeletionPolicy = translogDeletionPolicy;
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier) {
this.openMode = openMode;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
}

@Override
public void onInit(List<? extends IndexCommit> commits) throws IOException {
indexDeletionPolicy.onInit(commits);
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
assert commits.isEmpty() : "index is created, but we have commits";
break;
case OPEN_INDEX_CREATE_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
// When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately.
// We therefore can simply skip processing here as `onCommit` will be called right after with a new commit.
break;
case OPEN_INDEX_AND_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
setLastCommittedTranslogGeneration(commits);
onCommit(commits);
break;
default:
throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
Expand All @@ -66,24 +71,56 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
indexDeletionPolicy.onCommit(commits);
setLastCommittedTranslogGeneration(commits);
final int keptPosition = indexOfKeptCommits(commits);
for (int i = 0; i < keptPosition; i++) {
commits.get(i).delete();
}
updateTranslogDeletionPolicy(commits.get(keptPosition), commits.get(commits.size() - 1));
}

private void setLastCommittedTranslogGeneration(List<? extends IndexCommit> commits) throws IOException {
// when opening an existing lucene index, we currently always open the last commit.
// we therefore use the translog gen as the one that will be required for recovery
final IndexCommit indexCommit = commits.get(commits.size() - 1);
assert indexCommit.isDeleted() == false : "last commit is deleted";
long minGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minGen);
}
private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, final IndexCommit lastCommit) throws IOException {
assert minRequiredCommit.isDeleted() == false : "The minimum required commit must not be deleted";
final long minRequiredGen = Long.parseLong(minRequiredCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

public SnapshotDeletionPolicy getIndexDeletionPolicy() {
return indexDeletionPolicy;
assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
}

public TranslogDeletionPolicy getTranslogDeletionPolicy() {
return translogDeletionPolicy;
/**
* Find the highest index position of a safe index commit whose max sequence number is not greater than the global checkpoint.
* Index commits with different translog UUID will be filtered out as they don't belong to this engine.
*/
private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOException {
final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
final String expectedTranslogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);

// Commits are sorted by age (the 0th one is the oldest commit).
for (int i = commits.size() - 1; i >= 0; i--) {
final Map<String, String> commitUserData = commits.get(i).getUserData();
// Ignore index commits with different translog uuid.
if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
return i + 1;
}
// 5.x commits do not contain MAX_SEQ_NO.
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
return i;
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= currentGlobalCheckpoint) {
return i;
}
}
/*
* We may reach to this point in these cases:
* 1. In the previous 6.x, we keep only the last commit - which is likely not a safe commit if writes are in progress.
* Thus, after upgrading, we may not find a safe commit until we can reserve one.
* 2. In peer-recovery, if the file-based happens, a replica will be received the latest commit from a primary.
* However, that commit may not be a safe commit if writes are in progress in the primary.
*/
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
Expand Down Expand Up @@ -128,7 +127,7 @@ public class InternalEngine extends Engine {

private final String uidField;

private final CombinedDeletionPolicy deletionPolicy;
private final SnapshotDeletionPolicy snapshotDeletionPolicy;

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
Expand Down Expand Up @@ -167,8 +166,6 @@ public InternalEngine(EngineConfig engineConfig) {
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
);
this.deletionPolicy = new CombinedDeletionPolicy(
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
store.incRef();
IndexWriter writer = null;
Translog translog = null;
Expand All @@ -182,30 +179,19 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
final SeqNoStats seqNoStats;
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
seqNoStats = new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
default:
throw new IllegalArgumentException(openMode.toString());
}
final SeqNoStats seqNoStats = loadSeqNoStats(openMode);
logger.trace("recovered [{}]", seqNoStats);
seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, seqNoService::getGlobalCheckpoint)
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
|| openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
|| openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
: "OpenMode must be either CREATE_INDEX_AND_TRANSLOG or OPEN_INDEX_CREATE_TRANSLOG if forceNewHistoryUUID; " +
"openMode [" + openMode + "], forceNewHistoryUUID [" + engineConfig.getForceNewHistoryUUID() + "]";
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
Objects.requireNonNull(historyUUID, "history uuid should not be null");
indexWriter = writer;
Expand Down Expand Up @@ -382,6 +368,23 @@ static SequenceNumbersService sequenceNumberService(
seqNoStats.getGlobalCheckpoint());
}

private SeqNoStats loadSeqNoStats(EngineConfig.OpenMode openMode) throws IOException {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
return store.loadSeqNoStats(globalCheckpoint);
case OPEN_INDEX_CREATE_TRANSLOG:
return store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
case CREATE_INDEX_AND_TRANSLOG:
return new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
default:
throw new IllegalArgumentException(openMode.toString());
}
}

@Override
public InternalEngine recoverFromTranslog() throws IOException {
flushLock.lock();
Expand Down Expand Up @@ -1662,7 +1665,7 @@ public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws Engine
}
try (ReleasableLock lock = readLock.acquire()) {
logger.trace("pulling snapshot");
return new IndexCommitRef(deletionPolicy.getIndexDeletionPolicy());
return new IndexCommitRef(snapshotDeletionPolicy);
} catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e);
}
Expand Down Expand Up @@ -1843,7 +1846,7 @@ private IndexWriterConfig getIndexWriterConfig(boolean create) {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(deletionPolicy);
iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,14 @@ long getMinFileGeneration() {
* Returns the number of operations in the translog files that aren't committed to lucene.
*/
public int uncommittedOperations() {
return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery());
return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit());
}

/**
* Returns the size in bytes of the translog files that aren't committed to lucene.
*/
public long uncommittedSizeInBytes() {
return sizeInBytesByMinGen(deletionPolicy.getMinTranslogGenerationForRecovery());
return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public void assertNoOpenTranslogRefs() {
*/
private long minTranslogGenerationForRecovery = 1;

/**
* This translog generation is used to calculate the number of uncommitted operations since the last index commit.
*/
private long translogGenerationOfLastCommit = 1;

private long retentionSizeInBytes;

private long retentionAgeInMillis;
Expand All @@ -69,13 +74,24 @@ public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMill
}

public synchronized void setMinTranslogGenerationForRecovery(long newGen) {
if (newGen < minTranslogGenerationForRecovery) {
throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" +
minTranslogGenerationForRecovery + "]");
if (newGen < minTranslogGenerationForRecovery || newGen > translogGenerationOfLastCommit) {
throw new IllegalArgumentException("Invalid minTranslogGenerationForRecovery can't go backwards; new [" + newGen + "]," +
"current [" + minTranslogGenerationForRecovery + "], lastGen [" + translogGenerationOfLastCommit + "]");
}
minTranslogGenerationForRecovery = newGen;
}

/**
* Sets the translog generation of the last index commit.
*/
public synchronized void setTranslogGenerationOfLastCommit(long lastGen) {
if (lastGen < translogGenerationOfLastCommit || lastGen < minTranslogGenerationForRecovery) {
throw new IllegalArgumentException("Invalid translogGenerationOfLastCommit; new [" + lastGen + "]," +
"current [" + translogGenerationOfLastCommit + "], minRequiredGen [" + minTranslogGenerationForRecovery + "]");
}
translogGenerationOfLastCommit = lastGen;
}

public synchronized void setRetentionSizeInBytes(long bytes) {
retentionSizeInBytes = bytes;
}
Expand Down Expand Up @@ -193,6 +209,14 @@ public synchronized long getMinTranslogGenerationForRecovery() {
return minTranslogGenerationForRecovery;
}

/**
* Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit.
* See {@link Translog#uncommittedOperations()} and {@link Translog#uncommittedSizeInBytes()}
*/
public synchronized long getTranslogGenerationOfLastCommit() {
return translogGenerationOfLastCommit;
}

synchronized long getTranslogRefCount(long gen) {
final Counter counter = translogRefCounts.get(gen);
return counter == null ? 0 : counter.get();
Expand Down
Loading

0 comments on commit 9b55821

Please sign in to comment.