Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rollback a primary before recovering from translog #27804

Merged
merged 23 commits into from
Dec 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.index.translog.TranslogDeletionPolicy;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;
Expand All @@ -37,7 +38,7 @@
* In particular, this policy will delete index commits whose max sequence number is at most
* the current global checkpoint except the index commit which has the highest max sequence number among those.
*/
final class CombinedDeletionPolicy extends IndexDeletionPolicy {
public final class CombinedDeletionPolicy extends IndexDeletionPolicy {
private final TranslogDeletionPolicy translogDeletionPolicy;
private final EngineConfig.OpenMode openMode;
private final LongSupplier globalCheckpointSupplier;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void onInit(List<? extends IndexCommit> commits) throws IOException {

@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
final int keptPosition = indexOfKeptCommits(commits);
final int keptPosition = indexOfKeptCommits(commits, globalCheckpointSupplier.getAsLong());
for (int i = 0; i < keptPosition; i++) {
commits.get(i).delete();
}
Expand All @@ -89,12 +90,28 @@ private void updateTranslogDeletionPolicy(final IndexCommit minRequiredCommit, f
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);
}

/**
 * Find a safe commit point from a list of existing commits based on the supplied global checkpoint.
 * The max sequence number of a safe commit point should be at most the global checkpoint.
 * If an index was created before v6.2, and we haven't retained a safe commit yet, this method will return the oldest commit.
 *
 * @param commits          a non-empty list of existing commit points
 * @param globalCheckpoint the persisted global checkpoint from the translog, see {@link Translog#readGlobalCheckpoint(Path)}
 * @return a safe commit or the oldest commit if a safe commit is not found
 * @throws IllegalArgumentException if {@code commits} is empty
 * @throws IOException              if the lookup of the kept commit position fails with an I/O error
 */
public static IndexCommit findSafeCommitPoint(List<IndexCommit> commits, long globalCheckpoint) throws IOException {
    // Reject an empty list up front: the position lookup below reads the last element of the list.
    if (commits.isEmpty()) {
        throw new IllegalArgumentException("Commit list must not be empty");
    }
    // Same position lookup that onCommit uses, but driven by the caller-supplied global checkpoint.
    final int keptPosition = indexOfKeptCommits(commits, globalCheckpoint);
    return commits.get(keptPosition);
}

/**
* Find the highest index position of a safe index commit whose max sequence number is not greater than the global checkpoint.
* Index commits with different translog UUID will be filtered out as they don't belong to this engine.
*/
private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOException {
final long currentGlobalCheckpoint = globalCheckpointSupplier.getAsLong();
private static int indexOfKeptCommits(List<? extends IndexCommit> commits, long globalCheckpoint) throws IOException {
final String expectedTranslogUUID = commits.get(commits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);

// Commits are sorted by age (the 0th one is the oldest commit).
Expand All @@ -109,7 +126,7 @@ private int indexOfKeptCommits(List<? extends IndexCommit> commits) throws IOExc
return i;
}
final long maxSeqNoFromCommit = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO));
if (maxSeqNoFromCommit <= currentGlobalCheckpoint) {
if (maxSeqNoFromCommit <= globalCheckpoint) {
return i;
}
}
Expand Down
23 changes: 1 addition & 22 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -568,28 +568,7 @@ public CommitStats commitStats() {
* @return the sequence number service
*/
public abstract LocalCheckpointTracker getLocalCheckpointTracker();

/**
 * Reads the segments info of the index commit that the searcher manager's current reader points to.
 * If that read fails with an {@link IOException}, the last committed segments info is read from the store instead.
 *
 * @param sm    the searcher manager to acquire the searcher from; the searcher is always released before returning
 * @param store the store used as a fallback source
 * @return the segments info of the latest commit
 * @throws IOException if both the read from the commit and the fallback read from the store fail; the first failure
 *                     is attached to the thrown exception as a suppressed exception
 */
protected static SegmentInfos readLastCommittedSegmentInfos(final ReferenceManager<IndexSearcher> sm, final Store store) throws IOException {
    final IndexSearcher acquired = sm.acquire();
    try {
        final DirectoryReader reader = (DirectoryReader) acquired.getIndexReader();
        return Lucene.readSegmentInfos(reader.getIndexCommit());
    } catch (IOException commitReadFailure) {
        // Reading from the commit failed, so try the store before giving up
        try {
            return store.readLastCommittedSegmentsInfo();
        } catch (IOException storeReadFailure) {
            // Keep the original failure visible on the exception that is propagated
            storeReadFailure.addSuppressed(commitReadFailure);
            throw storeReadFailure;
        }
    } finally {
        sm.release(acquired);
    }
}


/**
* Global stats on segments.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
Expand Down Expand Up @@ -77,6 +78,7 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -175,14 +177,17 @@ public InternalEngine(EngineConfig engineConfig) {
mergeScheduler = scheduler = new EngineMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings());
throttle = new IndexThrottle();
try {
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier);
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier());
assert translog.getGeneration() != null;
this.translog = translog;
final IndexCommit startingCommit = getStartingCommitPoint();
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG || startingCommit != null :
"Starting commit should be non-null; mode [" + openMode + "]; startingCommit [" + startingCommit + "]";
this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier, startingCommit);
this.snapshotDeletionPolicy = new SnapshotDeletionPolicy(
new CombinedDeletionPolicy(openMode, translogDeletionPolicy, translog::getLastSyncedGlobalCheckpoint)
);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, startingCommit);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
assert engineConfig.getForceNewHistoryUUID() == false
|| openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG
Expand Down Expand Up @@ -232,7 +237,7 @@ public InternalEngine(EngineConfig engineConfig) {
}

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) throws IOException {
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier, IndexCommit startingCommit) throws IOException {
final long maxSeqNo;
final long localCheckpoint;
switch (openMode) {
Expand All @@ -242,7 +247,7 @@ private LocalCheckpointTracker createLocalCheckpointTracker(
break;
case OPEN_INDEX_AND_TRANSLOG:
case OPEN_INDEX_CREATE_TRANSLOG:
final SequenceNumbers.CommitInfo seqNoStats = store.loadSeqNoInfo();
final SequenceNumbers.CommitInfo seqNoStats = store.loadSeqNoInfo(startingCommit);
maxSeqNo = seqNoStats.maxSeqNo;
localCheckpoint = seqNoStats.localCheckpoint;
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint);
Expand Down Expand Up @@ -397,6 +402,31 @@ public InternalEngine recoverFromTranslog() throws IOException {
return this;
}

/**
 * Selects the index commit that the engine should start from.
 *
 * @return the commit chosen by {@link CombinedDeletionPolicy#findSafeCommitPoint(List, long)} when the engine is opened in
 *         {@code OPEN_INDEX_AND_TRANSLOG} mode, or {@code null} for every other open mode
 * @throws IOException if listing or inspecting the existing commits fails
 */
private IndexCommit getStartingCommitPoint() throws IOException {
    if (openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
        return null;
    }
    final long lastSyncedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
    final long minRetainedTranslogGen = translog.getMinFileGeneration();
    final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
    final boolean createdBeforeV620 = engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_2_0);
    if (createdBeforeV620 == false) {
        return CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, lastSyncedGlobalCheckpoint);
    }
    // An index created before v6.2 may not have a safe commit. If it also has a snapshotted commit whose translog files
    // are no longer fully retained but whose max_seqno is at most the global checkpoint, that commit could be mistakenly
    // selected as the starting commit. To avoid this, only commits whose translog files are fully retained are considered.
    final List<IndexCommit> recoverableCommits = new ArrayList<>();
    for (IndexCommit candidate : existingCommits) {
        final long candidateTranslogGen = Long.parseLong(candidate.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
        if (candidateTranslogGen >= minRetainedTranslogGen) {
            recoverableCommits.add(candidate);
        }
    }
    assert recoverableCommits.isEmpty() == false : "No commit point with full translog found; " +
        "commits [" + existingCommits + "], minRetainedTranslogGen [" + minRetainedTranslogGen + "]";
    return CombinedDeletionPolicy.findSafeCommitPoint(recoverableCommits, lastSyncedGlobalCheckpoint);
}

private void recoverFromTranslogInternal() throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
Expand Down Expand Up @@ -519,7 +549,9 @@ private ExternalSearcherManager createSearcherManager(SearchFactory externalSear
final DirectoryReader directoryReader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(indexWriter), shardId);
internalSearcherManager = new SearcherManager(directoryReader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
lastCommittedSegmentInfos = readLastCommittedSegmentInfos(internalSearcherManager, store);
// The index commit in IndexWriterConfig is only set when the engine is opened in OPEN_INDEX_AND_TRANSLOG mode.
// In all other modes it is null, and lastCommittedSegmentInfos will be retrieved from the last commit.
lastCommittedSegmentInfos = store.readCommittedSegmentsInfo(indexWriter.getConfig().getIndexCommit());
ExternalSearcherManager externalSearcherManager = new ExternalSearcherManager(internalSearcherManager,
externalSearcherFactory);
success = true;
Expand Down Expand Up @@ -1776,9 +1808,9 @@ private long loadCurrentVersionFromIndex(Term uid) throws IOException {
}
}

private IndexWriter createWriter(boolean create) throws IOException {
private IndexWriter createWriter(boolean create, IndexCommit startingCommit) throws IOException {
try {
final IndexWriterConfig iwc = getIndexWriterConfig(create);
final IndexWriterConfig iwc = getIndexWriterConfig(create, startingCommit);
return createWriter(store.directory(), iwc);
} catch (LockObtainFailedException ex) {
logger.warn("could not lock IndexWriter", ex);
Expand All @@ -1791,10 +1823,11 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOEx
return new IndexWriter(directory, iwc);
}

private IndexWriterConfig getIndexWriterConfig(boolean create) {
private IndexWriterConfig getIndexWriterConfig(boolean create, IndexCommit startingCommit) {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
iwc.setIndexCommit(startingCommit);
iwc.setIndexDeletionPolicy(snapshotDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,7 +1291,7 @@ public void createIndexAndTranslog() throws IOException {
public void openIndexAndCreateTranslog(boolean forceNewHistoryUUID, long globalCheckpoint) throws IOException {
assert recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE &&
recoveryState.getRecoverySource().getType() != RecoverySource.Type.EXISTING_STORE;
SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo();
SequenceNumbers.CommitInfo commitInfo = store.loadSeqNoInfo(null);
assert commitInfo.localCheckpoint >= globalCheckpoint :
"trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
+ globalCheckpoint + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
recoveryState.getIndex().updateVersion(version);
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists;
indexShard.openIndexAndCreateTranslog(true, store.loadSeqNoInfo().localCheckpoint);
indexShard.openIndexAndCreateTranslog(true, store.loadSeqNoInfo(null).localCheckpoint);
} else {
// since we recover from local, just fill the files and size
try {
Expand Down Expand Up @@ -442,7 +442,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
final long localCheckpoint;
store.incRef();
try {
localCheckpoint = store.loadSeqNoInfo().localCheckpoint;
localCheckpoint = store.loadSeqNoInfo(null).localCheckpoint;
} finally {
store.decRef();
}
Expand Down
17 changes: 13 additions & 4 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,17 @@ public Directory directory() {
* @throws IOException if the index is corrupted or the segments file is not present
*/
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
return readCommittedSegmentsInfo(null);
}

/**
* Returns the committed segments info for the given commit point.
* If the commit point is not provided, this method will return the segments info of the last commit in the store.
*/
public SegmentInfos readCommittedSegmentsInfo(final IndexCommit commit) throws IOException {
failIfCorrupted();
try {
return readSegmentsInfo(null, directory());
return readSegmentsInfo(commit, directory());
} catch (CorruptIndexException ex) {
markStoreCorrupted(ex);
throw ex;
Expand Down Expand Up @@ -212,13 +220,14 @@ private static SegmentInfos readSegmentsInfo(IndexCommit commit, Directory direc
}

/**
* Loads the maximum sequence number and local checkpoint from the latest Lucene commit point.
* Loads the maximum sequence number and local checkpoint from the given Lucene commit point or the latest if not provided.
*
* @param commit the commit point to load seqno stats, or the last commit in the store if the parameter is null
* @return {@link SequenceNumbers.CommitInfo} containing information about the last commit
* @throws IOException if an I/O exception occurred reading the latest Lucene commit point from disk
*/
public SequenceNumbers.CommitInfo loadSeqNoInfo() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(directory).getUserData();
public SequenceNumbers.CommitInfo loadSeqNoInfo(final IndexCommit commit) throws IOException {
final Map<String, String> userData = commit != null ? commit.getUserData() : SegmentInfos.readLatestCommit(directory).getUserData();
return SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public long currentFileGeneration() {
/**
* Returns the minimum file generation referenced by the translog
*/
long getMinFileGeneration() {
public long getMinFileGeneration() {
try (ReleasableLock ignored = readLock.acquire()) {
if (readers.isEmpty()) {
return current.getGeneration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
public static long getStartingSeqNo(final RecoveryTarget recoveryTarget) {
try {
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation());
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo();
final SequenceNumbers.CommitInfo seqNoStats = recoveryTarget.store().loadSeqNoInfo(null);
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
assert seqNoStats.localCheckpoint <= globalCheckpoint;
/*
Expand Down
Loading