
Keep commits and translog up to the global checkpoint #27606

Merged Dec 13, 2017

32 commits (the changes shown below are from the first commit only):
ad052ec
Keep index commits up to the global checkpoint
dnhatn Nov 28, 2017
97e6cff
add doc for policy
dnhatn Dec 1, 2017
48381a2
use set
dnhatn Dec 1, 2017
4b9ebb6
compact switch
dnhatn Dec 1, 2017
7f32e22
format
dnhatn Dec 1, 2017
94b735f
add randomLong to ESTestCase
dnhatn Dec 2, 2017
ab262b1
move gcp policy to the combined policy
dnhatn Dec 2, 2017
38310cd
Merge branch 'master' into gcp-deletion-policy
dnhatn Dec 3, 2017
72a4190
Use the kept position
dnhatn Dec 4, 2017
1e3d96b
Merge branch 'master' into gcp-deletion-policy
dnhatn Dec 6, 2017
6b2646b
Delete all commits in OPEN_INDEX_CREATE_TRANSLOG
dnhatn Dec 6, 2017
b7177cd
more assert
dnhatn Dec 6, 2017
12cae92
call lastGen() before minGen()
dnhatn Dec 5, 2017
dbdacc4
improve tests
dnhatn Dec 6, 2017
d07af94
tighten onInit()
dnhatn Dec 6, 2017
639cac6
test advance minGen
dnhatn Dec 6, 2017
d146583
use matcher
dnhatn Dec 6, 2017
2a6198e
inline small functions
dnhatn Dec 7, 2017
b819a8b
test: update local checkpoint
dnhatn Dec 7, 2017
b29623d
add comment onInit()
dnhatn Dec 7, 2017
60a204f
resume out of order test
dnhatn Dec 7, 2017
6b74ab1
use TRANSLOG_UUID_KEY to valid commit
dnhatn Dec 9, 2017
d8705b4
minor feedbacks
dnhatn Dec 9, 2017
7ec417c
use size retention translog
dnhatn Dec 9, 2017
6733b75
do not use size-retention translog
dnhatn Dec 12, 2017
3a67d4d
do nothing with OPEN_INDEX_CREATE_TRANSLOG
dnhatn Dec 12, 2017
53702f9
simplify the recovery tests
dnhatn Dec 12, 2017
5644456
just test keep only one safe commit
dnhatn Dec 12, 2017
dad3a1b
assert #forceNewHistoryUUID
dnhatn Dec 12, 2017
b18725c
keeps ops after local checkpoint of safe commit
dnhatn Dec 12, 2017
09b0f91
explain the assertion
dnhatn Dec 12, 2017
ce8b52d
Merge branch 'master' into gcp-deletion-policy
dnhatn Dec 12, 2017
Keep index commits up to the global checkpoint
We need to keep index commits and translog operations up to the current
global checkpoint to allow us to throw away unsafe operations and
increase the operation-based recovery chance. This is achieved by a new
index deletion policy.
dnhatn committed Nov 30, 2017
commit ad052eca9386c1c2fda90070d3337eb4b874256b
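In outline, the change treats the newest commit whose max sequence number is at most the current global checkpoint as the "safe" commit: that commit and everything after it are kept, older commits are deleted, and the translog is retained back to the oldest kept commit's generation. A simplified sketch of that selection rule (an editor's illustration of the policy added in this diff, not the exact code):

// Simplified sketch of the safe-commit selection (see KeepUntilGlobalCheckpointDeletionPolicy below).
// Uses org.apache.lucene.index.IndexCommit and org.elasticsearch.index.seqno.SequenceNumbers.
static int indexOfSafeCommit(List<? extends IndexCommit> commits, long globalCheckpoint) throws IOException {
    for (int i = commits.size() - 1; i >= 0; i--) {
        final String maxSeqNo = commits.get(i).getUserData().get(SequenceNumbers.MAX_SEQ_NO);
        // Pre-6.0 commits carry no MAX_SEQ_NO and are treated as safe.
        if (maxSeqNo == null || Long.parseLong(maxSeqNo) <= globalCheckpoint) {
            return i; // keep this commit and every newer one; older ones can be deleted
        }
    }
    return -1; // no safe commit yet; keep everything
}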
@@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.common.lucene.index;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;

import java.io.IOException;
import java.util.List;

/**
* An {@link IndexDeletionPolicy} that deletes unneeded index commits, and returns the index commits that are not deleted by this policy.
Contributor: Not an IndexDeletionPolicy any more!

*/
public interface ESIndexDeletionPolicy {
Contributor:
do we need a separate interface? If we want to know which commits can be kept,
we can just do commits.stream().allMatch(c -> c.isDeleted() == false) after invoking the onInit or onCommit methods?

Member Author (dnhatn):
This interface is to avoid keeping translog of snapshotted index commits. SnapshotDeletionPolicy suppresses the delete() for snapshotted commits.

https://github.com/apache/lucene-solr/blob/e2521b2a8baabdaf43b92192588f51e042d21e97/lucene/core/src/java/org/apache/lucene/index/SnapshotDeletionPolicy.java#L221-L225

Member Author (dnhatn): I should have documented its purpose.
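To make the concern concrete: if the kept commits were inferred from isDeleted() after delegating to the policy chain, a snapshotted commit whose delete() was suppressed by SnapshotDeletionPolicy would still look kept, so its translog generations would be retained as well. A rough sketch of that rejected alternative (a hypothetical helper, not part of this diff; requires java.util.stream.Collectors):

// Hypothetical alternative: infer kept commits from isDeleted() after the wrapped policy ran.
// With SnapshotDeletionPolicy in the chain, suppressed deletes make snapshotted commits appear
// kept, so translog for them would be retained longer than intended.
private static List<IndexCommit> keptCommitsOf(List<? extends IndexCommit> commits) {
    return commits.stream()
            .filter(commit -> commit.isDeleted() == false)
            .collect(Collectors.toList());
}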


/**
* Similar to {@link IndexDeletionPolicy#onInit(List)} but returns a list of kept index commits.
*
* @param commits A list of index commits sorted by age (the 0th one is the oldest commit).
* @return a list of index commits that are not deleted by this policy.
*/
List<IndexCommit> onInit(List<? extends IndexCommit> commits) throws IOException;

/**
* Similar to {@link IndexDeletionPolicy#onCommit(List)} but returns a list of kept index commits.
*
* @param commits A list of index commits sorted by age (the 0th one is the oldest commit).
* @return a list of index commits that are not deleted by this policy.
*/
List<IndexCommit> onCommit(List<? extends IndexCommit> commits) throws IOException;
}
Contributor:
This interface clearly came from IndexDeletionPolicy but I think it's now different enough that it's worth breaking the link entirely and coming up with more descriptive names. The onInit() and onCommit() methods of IndexDeletionPolicy return void but here we're returning a list of the kept commits. Additionally, I think all the implementations just delegate onInit() to onCommit() (except KeepUntilGlobalCheckpointDeletionPolicy that special-cases an empty argument).
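A decoupled shape along those lines might look roughly like this (the name and signature here are hypothetical, not something proposed in the PR):

// Hypothetical: a standalone abstraction that only reports which commits to retain,
// with no inherited onInit()/onCommit() naming from IndexDeletionPolicy.
interface CommitRetentionPolicy {
    /** @return the commits to keep, oldest first; every other commit may be deleted. */
    List<IndexCommit> retain(List<? extends IndexCommit> commits) throws IOException;
}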

@@ -21,7 +21,7 @@

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.elasticsearch.common.lucene.index.ESIndexDeletionPolicy;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

@@ -33,13 +33,11 @@
* making sure that all translog files that are needed to recover from the Lucene commit are not deleted.
*/
class CombinedDeletionPolicy extends IndexDeletionPolicy {

private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final ESIndexDeletionPolicy indexDeletionPolicy;

private final SnapshotDeletionPolicy indexDeletionPolicy;

CombinedDeletionPolicy(SnapshotDeletionPolicy indexDeletionPolicy, TranslogDeletionPolicy translogDeletionPolicy,
CombinedDeletionPolicy(ESIndexDeletionPolicy indexDeletionPolicy, TranslogDeletionPolicy translogDeletionPolicy,
EngineConfig.OpenMode openMode) {
this.indexDeletionPolicy = indexDeletionPolicy;
this.translogDeletionPolicy = translogDeletionPolicy;
@@ -48,7 +46,7 @@ class CombinedDeletionPolicy extends IndexDeletionPolicy {

@Override
public void onInit(List<? extends IndexCommit> commits) throws IOException {
indexDeletionPolicy.onInit(commits);
final List<IndexCommit> keptCommits = indexDeletionPolicy.onInit(commits);
switch (openMode) {
case CREATE_INDEX_AND_TRANSLOG:
break;
@@ -57,7 +55,7 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {
break;
case OPEN_INDEX_AND_TRANSLOG:
assert commits.isEmpty() == false : "index is opened, but we have no commits";
setLastCommittedTranslogGeneration(commits);
setLastCommittedTranslogGeneration(keptCommits);
break;
default:
throw new IllegalArgumentException("unknown openMode [" + openMode + "]");
@@ -66,24 +64,24 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
indexDeletionPolicy.onCommit(commits);
setLastCommittedTranslogGeneration(commits);
}

private void setLastCommittedTranslogGeneration(List<? extends IndexCommit> commits) throws IOException {
// when opening an existing lucene index, we currently always open the last commit.
// we therefore use the translog gen as the one that will be required for recovery
final IndexCommit indexCommit = commits.get(commits.size() - 1);
assert indexCommit.isDeleted() == false : "last commit is deleted";
long minGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minGen);
assert commits.isEmpty() == false;
final List<IndexCommit> keptCommits = indexDeletionPolicy.onCommit(commits);
setLastCommittedTranslogGeneration(keptCommits);
}

public SnapshotDeletionPolicy getIndexDeletionPolicy() {
return indexDeletionPolicy;
}
private void setLastCommittedTranslogGeneration(List<IndexCommit> keptCommits) throws IOException {
assert keptCommits.isEmpty() == false : "All index commits were deleted";
assert keptCommits.stream().allMatch(commit -> commit.isDeleted() == false) : "Kept commits must not be deleted";

public TranslogDeletionPolicy getTranslogDeletionPolicy() {
return translogDeletionPolicy;
final IndexCommit lastCommit = keptCommits.get(keptCommits.size() - 1);
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
long minRequiredGen = lastGen;
for (IndexCommit indexCommit : keptCommits) {
long translogGen = Long.parseLong(indexCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
minRequiredGen = Math.min(minRequiredGen, translogGen);
}
assert minRequiredGen <= lastGen;
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
}
}
@@ -128,7 +128,7 @@ public class InternalEngine extends Engine {

private final String uidField;

private final CombinedDeletionPolicy deletionPolicy;
private final SnapshotDeletionPolicy snapshotDeletionPolicy;

// How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges
// are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttling
@@ -167,8 +167,6 @@ public InternalEngine(EngineConfig engineConfig) {
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
);
this.deletionPolicy = new CombinedDeletionPolicy(
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
store.incRef();
IndexWriter writer = null;
Translog translog = null;
@@ -182,29 +180,13 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
final SeqNoStats seqNoStats;
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
writer = createWriter(false);
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
writer = createWriter(false);
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
seqNoStats = new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
default:
throw new IllegalArgumentException(openMode.toString());
}
final SeqNoStats seqNoStats = loadSeqNoStats(openMode);
logger.trace("recovered [{}]", seqNoStats);
seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
this.seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(new CombinedDeletionPolicy(
new KeepUntilGlobalCheckpointDeletionPolicy(seqNoService::getGlobalCheckpoint), translogDeletionPolicy, openMode)
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
Contributor (@bleskes, Dec 12, 2017):
can we assert that if we need to generate a new history uuid we always use *_CREATE_TRANSLOG as an open mode? that's why we rely on the translog uuid only for trimming purposes (and avoid thinking about what it means to generate a new history uuid)
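A commit later in this PR ("assert #forceNewHistoryUUID") appears to address this; a minimal sketch of such an assertion (its exact placement and message are assumptions, not the actual commit):

// Sketch: forcing a new history uuid should only happen when the translog is (re)created,
// i.e. with one of the *_CREATE_TRANSLOG open modes.
assert engineConfig.getForceNewHistoryUUID() == false
        || openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
        || openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
        : "forcing a new history uuid requires a *_CREATE_TRANSLOG open mode but was [" + openMode + "]";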

Objects.requireNonNull(historyUUID, "history uuid should not be null");
@@ -377,6 +359,28 @@ static SequenceNumbersService sequenceNumberService(
seqNoStats.getGlobalCheckpoint());
}

private SeqNoStats loadSeqNoStats(EngineConfig.OpenMode openMode) throws IOException {
final SeqNoStats seqNoStats;
switch (openMode) {
case OPEN_INDEX_AND_TRANSLOG:
final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
seqNoStats = store.loadSeqNoStats(globalCheckpoint);
break;
Contributor:
I'd just return store.loadSeqNoStats() here rather than breaking and returning at the bottom. Not sure if that's against our style tho?

Member Author (dnhatn): Done

case OPEN_INDEX_CREATE_TRANSLOG:
seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
case CREATE_INDEX_AND_TRANSLOG:
seqNoStats = new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
default:
throw new IllegalArgumentException(openMode.toString());
}
return seqNoStats;
}
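Per the "Done" in the thread above, a later commit presumably reshapes this method with early returns, roughly along these lines (a sketch, not the actual follow-up diff):

private SeqNoStats loadSeqNoStats(EngineConfig.OpenMode openMode) throws IOException {
    switch (openMode) {
        case OPEN_INDEX_AND_TRANSLOG:
            // read the global checkpoint from the existing translog and load stats relative to it
            return store.loadSeqNoStats(Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()));
        case OPEN_INDEX_CREATE_TRANSLOG:
            return store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO);
        case CREATE_INDEX_AND_TRANSLOG:
            return new SeqNoStats(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.UNASSIGNED_SEQ_NO);
        default:
            throw new IllegalArgumentException(openMode.toString());
    }
}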

@Override
public InternalEngine recoverFromTranslog() throws IOException {
flushLock.lock();
@@ -1642,7 +1646,7 @@ public IndexCommitRef acquireIndexCommit(final boolean flushFirst) throws Engine
}
try (ReleasableLock lock = readLock.acquire()) {
logger.trace("pulling snapshot");
return new IndexCommitRef(deletionPolicy.getIndexDeletionPolicy());
return new IndexCommitRef(snapshotDeletionPolicy);
} catch (IOException e) {
throw new SnapshotFailedEngineException(shardId, e);
}
@@ -1823,7 +1827,7 @@ private IndexWriterConfig getIndexWriterConfig(boolean create) {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexDeletionPolicy(deletionPolicy);
iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
try {
@@ -0,0 +1,125 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.common.lucene.index.ESIndexDeletionPolicy;
import org.elasticsearch.index.seqno.SequenceNumbers;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/**
* An {@link ESIndexDeletionPolicy} that deletes index commits that are not required for recovery.
* In particular, this policy will delete index commits whose max sequence number is smaller (or equal) than
Contributor:
s/smaller (or equal) than/at most/ ?

Member Author (dnhatn): Done

* the current global checkpoint except the index commit which has the highest max sequence number among those.
*/
final class KeepUntilGlobalCheckpointDeletionPolicy implements ESIndexDeletionPolicy {
private final LongSupplier globalCheckpointSupplier;

KeepUntilGlobalCheckpointDeletionPolicy(LongSupplier globalCheckpointSupplier) {
this.globalCheckpointSupplier = globalCheckpointSupplier;
}

@Override
public List<IndexCommit> onInit(List<? extends IndexCommit> commits) throws IOException {
if (commits.isEmpty()) {
return Collections.emptyList();
}
return onCommit(commits);
}

@Override
public List<IndexCommit> onCommit(List<? extends IndexCommit> commits) throws IOException {
assert commits.isEmpty() == false : "onCommit() must be called with a non-empty list of commits";

final List<IndexCommit> keptCommits = new ArrayList<>();
final int keptPosition = indexOfKeptCommits(commits);
final List<Integer> duplicateIndexes = indexesOfDuplicateCommits(commits);
Contributor:
We only call this collection's contains() method so maybe a HashSet<Integer> would be better? Not sure if it gets big enough to make a difference.

Member Author (dnhatn):
We expect zero or one element in the list, so there should be no difference. However, using a Set makes more sense here; I updated. Thank you.
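The "use set" commit in the list above presumably changes the call site roughly like this (illustrative only):

// contains() below becomes an O(1) hash lookup; the helper would collect into a HashSet<Integer>.
final Set<Integer> duplicateIndexes = indexesOfDuplicateCommits(commits);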


for (int i = 0; i < commits.size() - 1; i++) {
final IndexCommit commit = commits.get(i);
if (i < keptPosition || duplicateIndexes.contains(i)) {
commit.delete();
} else {
keptCommits.add(commit);
}
}
keptCommits.add(commits.get(commits.size() - 1)); // Always keep the last commit.

assert keptCommits.stream().allMatch(c -> c.isDeleted() == false) : "All kept commits must not be deleted";
return keptCommits;
}

/**
* Find the index position of a safe index commit whose max sequence number is not greater than the global checkpoint.
*/
private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOException {
final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();

// Commits are sorted by age (the 0th one is the oldest commit).
for (int i = commits.size() - 1; i >= 0; i--) {
final Map<String, String> commitUserData = commits.get(i).getUserData();
// 5.x commits do not contain MAX_SEQ_NO.
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO) == false) {
return i;
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= currentGlobalCheckpoint) {
return i;
}
}
/*
* We may reach this point in these cases:
* 1. In previous 6.x versions, we kept only the last commit - which is likely not a safe commit if writes are in progress.
* Thus, after upgrading, we may not find a safe commit until we can reserve one.
* 2. In peer-recovery, if a file-based recovery happens, a replica receives the latest commit from the primary.
* However, that commit may not be a safe commit if writes are in progress on the primary.
*/
return -1;
}

/**
* In some cases, we may have more than one index commit with the same max sequence number.
* It is better to scan for and delete these duplicate index commits as soon as possible.
*
* @return index positions of duplicate commits.
*/
private List<Integer> indexesOfDuplicateCommits(List<? extends IndexCommit> commits) throws IOException {
final List<Integer> duplicateEntries = new ArrayList<>();
long lastMaxSeqNo = Long.MIN_VALUE;
for (int i = commits.size() - 1; i >= 0; i--) {
final Map<String, String> commitUserData = commits.get(i).getUserData();
// 5.x commits do not contain MAX_SEQ_NO.
if (commitUserData.containsKey(SequenceNumbers.MAX_SEQ_NO)) {
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (lastMaxSeqNo == maxSeqNoFromCommit) {
duplicateEntries.add(i);
}
lastMaxSeqNo = maxSeqNoFromCommit;
}
}
return duplicateEntries;
}
}
@@ -372,14 +372,14 @@ long getMinFileGeneration() {
* Returns the number of operations in the translog files that aren't committed to lucene.
*/
public int uncommittedOperations() {
return totalOperations(deletionPolicy.getMinTranslogGenerationForRecovery());
return totalOperations(deletionPolicy.getTranslogGenerationOfLastCommit());
}

/**
* Returns the size in bytes of the translog files that aren't committed to lucene.
*/
public long uncommittedSizeInBytes() {
return sizeInBytesByMinGen(deletionPolicy.getMinTranslogGenerationForRecovery());
return sizeInBytesByMinGen(deletionPolicy.getTranslogGenerationOfLastCommit());
}

/**
Loading