Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep commits and translog up to the global checkpoint #27606

Merged
merged 32 commits into from
Dec 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
ad052ec
Keep index commits up to the global checkpoint
dnhatn Nov 28, 2017
97e6cff
add doc for policy
dnhatn Dec 1, 2017
48381a2
use set
dnhatn Dec 1, 2017
4b9ebb6
compact switch
dnhatn Dec 1, 2017
7f32e22
format
dnhatn Dec 1, 2017
94b735f
add randomLong to ESTestCase
dnhatn Dec 2, 2017
ab262b1
move gcp policy to the combined policy
dnhatn Dec 2, 2017
38310cd
Merge branch 'master' into gcp-deletion-policy
dnhatn Dec 3, 2017
72a4190
Use the kept position
dnhatn Dec 4, 2017
1e3d96b
Merge branch 'master' into gcp-deletion-policy
dnhatn Dec 6, 2017
6b2646b
Delete all commits in OPEN_INDEX_CREATE_TRANSLOG
dnhatn Dec 6, 2017
b7177cd
more assert
dnhatn Dec 6, 2017
12cae92
call lastGen() before minGen()
dnhatn Dec 5, 2017
dbdacc4
improve tests
dnhatn Dec 6, 2017
d07af94
tighten onInit()
dnhatn Dec 6, 2017
639cac6
test advance minGen
dnhatn Dec 6, 2017
d146583
use matcher
dnhatn Dec 6, 2017
2a6198e
inline small functions
dnhatn Dec 7, 2017
b819a8b
test: update local checkpoint
dnhatn Dec 7, 2017
b29623d
add comment onInit()
dnhatn Dec 7, 2017
60a204f
resume out of order test
dnhatn Dec 7, 2017
6b74ab1
use TRANSLOG_UUID_KEY to valid commit
dnhatn Dec 9, 2017
d8705b4
minor feedbacks
dnhatn Dec 9, 2017
7ec417c
use size retention translog
dnhatn Dec 9, 2017
6733b75
do not use size-retention translog
dnhatn Dec 12, 2017
3a67d4d
do nothing with OPEN_INDEX_CREATE_TRANSLOG
dnhatn Dec 12, 2017
53702f9
simplify the recovery tests
dnhatn Dec 12, 2017
5644456
just test keep only one safe commit
dnhatn Dec 12, 2017
dad3a1b
assert #forceNewHistoryUUID
dnhatn Dec 12, 2017
b18725c
keeps ops after local checkpoint of safe commit
dnhatn Dec 12, 2017
09b0f91
explain the assertion
dnhatn Dec 12, 2017
ce8b52d
Merge branch 'master' into gcp-deletion-policy
dnhatn Dec 12, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,48 @@

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/**
* An {@link IndexDeletionPolicy} that coordinates between Lucene's commits and the retention of translog generation files,
* making sure that all translog files that are needed to recover from the Lucene commit are not deleted.
* <p>
* In particular, this policy will delete index commits whose max sequence number is at most
* the current global checkpoint except the index commit which has the highest max sequence number among those.
*/
class CombinedDeletionPolicy extends IndexDeletionPolicy {

final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;

private final SnapshotDeletionPolicy indexDeletionPolicy;

CombinedDeletionPolicy(SnapshotDeletionPolicy indexDeletionPolicy, TranslogDeletionPolicy translogDeletionPolicy,
EngineConfig.OpenMode openMode) {
this.indexDeletionPolicy = indexDeletionPolicy;
this.translogDeletionPolicy = translogDeletionPolicy;
CombinedDeletionPolicy(EngineConfig.OpenMode openMode, TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier) {
this.openMode = openMode;
this.translogDeletionPolicy = translogDeletionPolicy;
this.globalCheckpointSupplier = globalCheckpointSupplier;
}

@Override
public void onInit(List<? extends IndexCommit> commits) throws IOException {
indexDeletionPolicy.onInit(commits);
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
assert commits.isEmpty() : "index is created, but we have commits";
break;
case OPEN_INDEX_CREATE_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
// When an engine starts with OPEN_INDEX_CREATE_TRANSLOG, a new fresh index commit will be created immediately.
// We therefore can simply skip processing here as `onCommit` will be called right after with a new commit.
break;
case OPEN_INDEX_AND_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
setLastCommittedTranslogGeneration(commits);
onCommit(commits);
break;
default:
throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
Expand All @@ -66,24 +71,56 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
indexDeletionPolicy.onCommit(commits);
setLastCommittedTranslogGeneration(commits);
final int keptPosition = indexOfKeptCommits(commits);
for (int i = 0; i < keptPosition; i++) {
commits.get(i).delete();
}
updateTranslogDeletionPolicy(commits.get(keptPosition), commits.get(commits.size() - 1));
}

private void setLastCommittedTranslogGeneration(List<? extends IndexCommit> commits) throws IOException {
// when opening an existing lucene index, we currently always open the last commit.
// we therefore use the translog gen as the one that will be required for recovery
final IndexCommit indexCommit = commits.get(commits.size() - 1);
assert indexCommit.isDeleted() == false : "last commit is deleted";
long minGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minGen);
}
private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, final IndexCommit lastCommit) throws IOException {
assert minRequiredCommit.isDeleted() == false : "The minimum required commit must not be deleted";
final long minRequiredGen = Long.parseLong(minRequiredCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

public SnapshotDeletionPolicy getIndexDeletionPolicy() {
return indexDeletionPolicy;
assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
}

public TranslogDeletionPolicy getTranslogDeletionPolicy() {
return translogDeletionPolicy;
/**
* Find the highest index position of a safe index commit whose max sequence number is not greater than the global checkpoint.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we extend this to doc the first that we trim old commits that have a different translog uuid (and why)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment.

* Index commits with different translog UUID will be filtered out as they don't belong to this engine.
*/
private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOException {
final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
final String expectedTranslogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);

// Commits are sorted by age (the 0th one is the oldest commit).
for (int i = commits.size() - 1; i >= 0; i--) {
final Map<String, String> commitUserData = commits.get(i).getUserData();
// Ignore index commits with different translog uuid.
if (expectedTranslogUUID.equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) {
return i + 1;
}
// 5.x commits do not contain MAX_SEQ_NO.
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
return i;
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= currentGlobalCheckpoint) {
return i;
}
}
/*
* We may reach to this point in these cases:
* 1. In the previous 6.x, we keep only the last commit - which is likely not a safe commit if writes are in progress.
* Thus, after upgrading, we may not find a safe commit until we can reserve one.
* 2. In peer-recovery, if the file-based happens, a replica will be received the latest commit from a primary.
* However, that commit may not be a safe commit if writes are in progress in the primary.
*/
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
Expand Down Expand Up @@ -128,7 +127,7 @@ public class InternalEngine extends Engine {

private final String uidField;

private final CombinedDeletionPolicy deletionPolicy;
private final SnapshotDeletionPolicy snapshotDeletionPolicy;

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
Expand Down Expand Up @@ -167,8 +166,6 @@ public InternalEngine(EngineConfig engineConfig) {
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
);
this.deletionPolicy = new CombinedDeletionPolicy(
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
store.incRef();
IndexWriter writer = null;
Translog translog = null;
Expand All @@ -182,30 +179,19 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
final SeqNoStats seqNoStats;
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
seqNoStats = new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
default:
throw new IllegalArgumentException(openMode.toString());
}
final SeqNoStats seqNoStats = loadSeqNoStats(openMode);
logger.trace("recovered [{}]", seqNoStats);
seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, seqNoService::getGlobalCheckpoint)
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
|| openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
|| openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
: "OpenMode must be either CREATE_INDEX_AND_TRANSLOG or OPEN_INDEX_CREATE_TRANSLOG if forceNewHistoryUUID; " +
"openMode [" + openMode + "], forceNewHistoryUUID [" + engineConfig.getForceNewHistoryUUID() + "]";
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
Objects.requireNonNull(historyUUID, "history uuid should not be null");
indexWriter = writer;
Expand Down Expand Up @@ -380,6 +366,23 @@ static SequenceNumbersService sequenceNumberService(
seqNoStats.getGlobalCheckpoint());
}

private SeqNoStats loadSeqNoStats(EngineConfig.OpenMode openMode) throws IOException {
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
return store.loadSeqNoStats(globalCheckpoint);
case OPEN_INDEX_CREATE_TRANSLOG:
return store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
case CREATE_INDEX_AND_TRANSLOG:
return new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
default:
throw new IllegalArgumentException(openMode.toString());
}
}

@Override
public InternalEngine recoverFromTranslog() throws IOException {
flushLock.lock();
Expand Down Expand Up @@ -1607,7 +1610,7 @@ public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws Engine
}
try (ReleasableLock lock = readLock.acquire()) {
logger.trace("pulling snapshot");
return new IndexCommitRef(deletionPolicy.getIndexDeletionPolicy());
return new IndexCommitRef(snapshotDeletionPolicy);
} catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e);
}
Expand Down Expand Up @@ -1788,7 +1791,7 @@ private IndexWriterConfig getIndexWriterConfig(boolean create) {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(deletionPolicy);
iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,14 +372,14 @@ long getMinFileGeneration() {
* Returns the number of operations in the translog files that aren't committed to lucene.
*/
public int uncommittedOperations() {
return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery());
return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit());
}

/**
* Returns the size in bytes of the translog files that aren't committed to lucene.
*/
public long uncommittedSizeInBytes() {
return sizeInBytesByMinGen(deletionPolicy.getMinTranslogGenerationForRecovery());
return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public void assertNoOpenTranslogRefs() {
*/
private long minTranslogGenerationForRecovery = 1;

/**
* This translog generation is used to calculate the number of uncommitted operations since the last index commit.
*/
private long translogGenerationOfLastCommit = 1;

private long retentionSizeInBytes;

private long retentionAgeInMillis;
Expand All @@ -69,13 +74,24 @@ public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMill
}

public synchronized void setMinTranslogGenerationForRecovery(long newGen) {
if (newGen < minTranslogGenerationForRecovery) {
throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" +
minTranslogGenerationForRecovery + "]");
if (newGen < minTranslogGenerationForRecovery || newGen > translogGenerationOfLastCommit) {
throw new IllegalArgumentException("Invalid minTranslogGenerationForRecovery can't go backwards; new [" + newGen + "]," +
"current [" + minTranslogGenerationForRecovery + "], lastGen [" + translogGenerationOfLastCommit + "]");
}
minTranslogGenerationForRecovery = newGen;
}

/**
* Sets the translog generation of the last index commit.
*/
public synchronized void setTranslogGenerationOfLastCommit(long lastGen) {
if (lastGen < translogGenerationOfLastCommit || lastGen < minTranslogGenerationForRecovery) {
throw new IllegalArgumentException("Invalid translogGenerationOfLastCommit; new [" + lastGen + "]," +
"current [" + translogGenerationOfLastCommit + "], minRequiredGen [" + minTranslogGenerationForRecovery + "]");
}
translogGenerationOfLastCommit = lastGen;
}

public synchronized void setRetentionSizeInBytes(long bytes) {
retentionSizeInBytes = bytes;
}
Expand Down Expand Up @@ -193,6 +209,14 @@ public synchronized long getMinTranslogGenerationForRecovery() {
return minTranslogGenerationForRecovery;
}

/**
* Returns a translog generation that will be used to calculate the number of uncommitted operations since the last index commit.
* See {@link Translog#uncommittedOperations()} and {@link Translog#uncommittedSizeInBytes()}
*/
public synchronized long getTranslogGenerationOfLastCommit() {
return translogGenerationOfLastCommit;
}

synchronized long getTranslogRefCount(long gen) {
final Counter counter = translogRefCounts.get(gen);
return counter == null ? 0 : counter.get();
Expand Down
Loading