Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollback a primary before recovering from translog #27804

Merged
merged 23 commits into from
Dec 22, 2017
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
Expand All @@ -37,7 +38,7 @@
* In particular, this policy will delete index commits whose max sequence number is at most
* the current global checkpoint except the index commit which has the highest max sequence number among those.
*/
final class CombinedDeletionPolicy extends IndexDeletionPolicy {
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
Expand Down Expand Up @@ -71,7 +72,7 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits);
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
for (int i = 0; i < keptPosition; i++) {
commits.get(i).delete();
}
Expand All @@ -90,12 +91,26 @@ private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, f
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
}

/**
* Find a safe commit point from a list of existing commits based on the persisted global checkpoint from translog.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

persisted global checkpoint from the translog -> supplied global checkpoint.

* The max seqno of a safe commit point should be at most the global checkpoint from the translog checkpoint.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a huge warning that says that if the index was created before 6.2 we can't guarantee we'll find one and that in that case we return the oldest valid commit?

*
* @param commits a list of existing commit points
* @param globalCheckpoint the persisted global checkpoint from the translog, see {@link Translog#readGlobalCheckpoint(Path)}
*/
public static IndexCommit findSafeCommitPoint(List<IndexCommit> commits, long globalCheckpoint) throws IOException {
if (commits.isEmpty()) {
throw new IllegalArgumentException("Commit list must not empty");
}
final int keptPosition = indexOfKeptCommits(commits, globalCheckpoint);
return commits.get(keptPosition);
}

/**
* Find the highest index position of a safe index commit whose max sequence number is not greater than the global checkpoint.
* Index commits with different translog UUID will be filtered out as they don't belong to this engine.
*/
private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOException {
final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long globalCheckpoint) throws IOException {
final String expectedTranslogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);

// Commits are sorted by age (the 0th one is the oldest commit).
Expand All @@ -110,7 +125,7 @@ private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOExc
return i;
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= currentGlobalCheckpoint) {
if (maxSeqNoFromCommit <= globalCheckpoint) {
return i;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
Expand Down Expand Up @@ -79,6 +80,8 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -177,14 +180,17 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
final IndexCommit startingCommit = getStartingCommitPoint();
assert startingCommit == null || openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG :
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this boolean does what you want? if start commit is null it will always pass.

"OPEN_INDEX_AND_TRANSLOG must have starting commit; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
|| openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
Expand Down Expand Up @@ -234,7 +240,7 @@ public InternalEngine(EngineConfig engineConfig) {
}

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier, IndexCommit startingCommit) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
switch (openMode) {
Expand All @@ -243,8 +249,14 @@ private LocalCheckpointTracker createLocalCheckpointTracker(
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
break;
case OPEN_INDEX_AND_TRANSLOG:
// When recovering from a previous commit point, we use the local checkpoint from that commit,
// but the max_seqno from the last commit. This allows use to throw away stale operations.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this distinction? can't we keep it conceptually simple? also we recover everything from the translog, so I'm not sure how it can happen that we throw away stuff?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • If we use the local checkpoint from the last commit, operations having seqno less than or equal to the local checkpoint will be skipped -> we need to use the local checkpoint from the starting commit.

  • We fillSeqNoGaps until the max_seqno -> we need to use max_seqno from the last commit to have full history.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We fillSeqNoGaps until the max_seqno -> we need to use max_seqno from the last commit to have full history.

We do so after we replay the translog. at which point we'll know what the max seq is (and it may be higher than the one in the last commit, but it can't be lower)

maxSeqNo = store.loadSeqNoInfo(null).v1();
localCheckpoint = store.loadSeqNoInfo(startingCommit).v2();
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
break;
case OPEN_INDEX_CREATE_TRANSLOG:
final Tuple<Long, Long> seqNoStats = store.loadSeqNoInfo();
final Tuple<Long, Long> seqNoStats = store.loadSeqNoInfo(null);
maxSeqNo = seqNoStats.v1();
localCheckpoint = seqNoStats.v2();
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
Expand Down Expand Up @@ -399,6 +411,32 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

private IndexCommit getStartingCommitPoint() throws IOException {
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
final Path translogPath = engineConfig.getTranslogConfig().getTranslogPath();
final long globalCheckpoint = Translog.readGlobalCheckpoint(translogPath);
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
// We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose full translog
// files are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
// To avoid this issue, we only select index commits whose translog files are fully retained.
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0)) {
final List<IndexCommit> recoverableCommits = new ArrayList<>();
final long minRetainedTranslogGen = Translog.readMinReferencedTranslogGen(translogPath);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we open the translog before the writer now - can we use the open translog for all this information rather than static reading it? this will make sure that we use something that went through all the right validations.

for (IndexCommit commit : existingCommits) {
if (minRetainedTranslogGen <= Long.parseLong(commit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY))) {
recoverableCommits.add(commit);
}
}
assert recoverableCommits.isEmpty() == false : "Unable to select a proper safe commit point; " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to say that no commit point was found which could be recovered from the translog.

"commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
return CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, globalCheckpoint);
} else {
return CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
}
}
return null;
}

private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
Expand Down Expand Up @@ -521,7 +559,11 @@ private ExternalSearcherManager createSearcherManager(SearchFactory externalSear
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
internalSearcherManager = new SearcherManager(directoryReader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store);
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain why we need this destinction?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we open other commit rather the last commit, we should assign lastCommittedSegmentInfos from that commit. I can revert this change and add this logic to recoverFromTranslogInternal. Your thought?

lastCommittedSegmentInfos = Lucene.readSegmentInfos(indexWriter.getConfig().getIndexCommit());
} else {
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store);
}
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
externalSearcherFactory);
success = true;
Expand Down Expand Up @@ -1778,9 +1820,11 @@ private long loadCurrentVersionFromIndex(Term uid) throws IOException {
}
}

private IndexWriter createWriter(boolean create) throws IOException {
private IndexWriter createWriter(boolean create, IndexCommit startingCommit) throws IOException {
try {
final IndexWriterConfig iwc = getIndexWriterConfig(create);
assert startingCommit == null || create == false : "Starting commit makes sense only when create=false";
iwc.setIndexCommit(startingCommit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be part of getIndexWriterConfig?

return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,14 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc
}

/**
* Loads the maximum sequence number and local checkpoint from the latest Lucene commit point.
* Loads the maximum sequence number and local checkpoint from the given Lucene commit point or the latest if not provided.
*
* @param commit the commit point to load seqno stats, or the last commit in the store if the parameter is null
* @return a tuple populated with the maximum sequence number and the local checkpoint
* @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
*/
public Tuple<Long, Long> loadSeqNoInfo() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData();
public Tuple<Long, Long> loadSeqNoInfo(final IndexCommit commit) throws IOException {
final Map<String, String> userData = commit != null ? commit.getUserData() : SegmentInfos.readLatestCommit(directory).getUserData();
return SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
}

Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,17 @@ public static final long readGlobalCheckpoint(final Path location) throws IOExce
return readCheckpoint(location).globalCheckpoint;
}

/**
* Reads the minimum referenced generation translog generation from the translog checkpoint.
*
* @param location the location of the translog
* @return the minimum generation referenced by the translog.
* @throws IOException if an I/O exception occurred reading the checkpoint
*/
public static long readMinReferencedTranslogGen(final Path location) throws IOException {
return readCheckpoint(location).minTranslogGeneration;
}

/**
* Returns the translog uuid used to associate a lucene index with a translog.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final Tuple<Long, Long> seqNoStats = recoveryTarget.store().loadSeqNoInfo();
final Tuple<Long, Long> seqNoStats = recoveryTarget.store().loadSeqNoInfo(null);
long maxSeqNo = seqNoStats.v1();
long localCheckpoint = seqNoStats.v2();
if (maxSeqNo <= globalCheckpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.GlobalCheckpointTracker;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
Expand Down Expand Up @@ -1121,8 +1120,12 @@ public void testRenewSyncFlush() throws Exception {
}

public void testSyncedFlushSurvivesEngineRestart() throws IOException {
final LongSupplier inSyncGlobalCheckpointSupplier = () -> this.engine.getLocalCheckpointTracker().getCheckpoint();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed?

IOUtils.close(store, engine);
store = createStore();
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}"), null);
engine.index(indexForDoc(doc));
final Engine.CommitId commitID = engine.flush();
assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
Expand All @@ -1145,7 +1148,7 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException {

public void testSyncedFlushVanishesOnReplay() throws IOException {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}"), null);
engine.index(indexForDoc(doc));
final Engine.CommitId commitID = engine.flush();
assertEquals("should succeed to flush commit with right id and no pending doc", engine.syncFlush(syncId, commitID),
Expand Down Expand Up @@ -2514,6 +2517,7 @@ private Path[] filterExtraFSFiles(Path[] files) {
}

public void testTranslogReplay() throws IOException {
final LongSupplier inSyncGlobalCheckpointSupplier = () -> this.engine.getLocalCheckpointTracker().getCheckpoint();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should make this the default?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the time, we prefer to deal with an in-sync global checkpoint, thus making this as a default makes sense to me. I will do in a follow-up.

final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Expand All @@ -2527,7 +2531,7 @@ public void testTranslogReplay() throws IOException {
parser.mappingUpdate = dynamicUpdate();

engine.close();
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)); // we need to reuse the engine config unless the parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work
engine.recoverFromTranslog();

assertVisibleCount(engine, numDocs, false);
Expand All @@ -2541,7 +2545,7 @@ public void testTranslogReplay() throws IOException {
}

engine.close();
engine = createEngine(store, primaryTranslogDir);
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
assertVisibleCount(engine, numDocs, false);
parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
assertEquals(0, parser.appliedOperations());
Expand All @@ -2568,7 +2572,7 @@ public void testTranslogReplay() throws IOException {
}

engine.close();
engine = createEngine(store, primaryTranslogDir);
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), numDocs + 1);
assertThat(topDocs.totalHits, equalTo(numDocs + 1L));
Expand All @@ -2580,7 +2584,7 @@ public void testTranslogReplay() throws IOException {
engine.refresh("test");
} else {
engine.close();
engine = createEngine(store, primaryTranslogDir);
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
}
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), numDocs);
Expand Down Expand Up @@ -2809,8 +2813,9 @@ protected void doRun() throws Exception {
}

public void testCurrentTranslogIDisCommitted() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null);
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);

// create
{
Expand All @@ -2820,7 +2825,7 @@ public void testCurrentTranslogIDisCommitted() throws IOException {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){
assertFalse(engine.isRecovering());
engine.index(firstIndexRequest);

globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
Expand Down
Loading