Preserve multiple translog generations
Today when a flush is performed, the translog is committed, and if
there are no outstanding views, only the current translog generation
is preserved. Yet for the purpose of sequence numbers, we need
stronger guarantees than this. This commit changes how translog
generations are preserved: we now keep the minimum generation that
would be needed to recover all operations above the local checkpoint.

Relates #24015
jasontedor authored Apr 17, 2017
1 parent 8033c57 commit f7ebe9d
Showing 4 changed files with 435 additions and 100 deletions.
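
To make the new retention rule concrete before diving into the diff, here is a minimal sketch of the selection logic under simplified assumptions (the Gen class and minGenerationForSeqNo helper are hypothetical; the real implementation is Translog#getMinGenerationForSeqNo in the diff below): each generation's checkpoint records its maximum sequence number, and we keep the smallest generation that can still contain an operation above the local checkpoint.

// Minimal sketch of the retention rule under simplified assumptions; the real
// logic lives in Translog#getMinGenerationForSeqNo below. "Gen" is hypothetical.
import java.util.Arrays;
import java.util.List;

final class MinGenerationSketch {

    // A translog generation paired with the max sequence number in its checkpoint.
    static final class Gen {
        final long generation;
        final long maxSeqNo;

        Gen(final long generation, final long maxSeqNo) {
            this.generation = generation;
            this.maxSeqNo = maxSeqNo;
        }
    }

    // Smallest generation that could contain any sequence number >= seqNo; falls
    // back to the current generation when no completed generation qualifies.
    static long minGenerationForSeqNo(final long seqNo, final long currentGeneration, final List<Gen> readers) {
        long min = currentGeneration;
        for (final Gen reader : readers) {
            if (seqNo <= reader.maxSeqNo) {
                min = Math.min(min, reader.generation);
            }
        }
        return min;
    }

    public static void main(final String[] args) {
        final List<Gen> readers = Arrays.asList(new Gen(1, 9), new Gen(2, 19), new Gen(3, 29));
        // Local checkpoint 15: everything above seq_no 15 starts in generation 2,
        // so generations 2, 3, and the current generation 4 must be preserved.
        System.out.println(minGenerationForSeqNo(15 + 1, 4, readers)); // prints 2
    }
}

Immediately after a flush no completed generation qualifies, so the rule falls back to the current generation, matching the comment in the diff below.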
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -299,7 +299,7 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, Lon
                 throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
             }
             if (generation.translogUUID == null) {
-                throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
+                throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
             }
         }
         final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier);
@@ -1233,12 +1233,12 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
                 try {
                     translog.prepareCommit();
                     logger.trace("starting commit for flush; commitTranslog=true");
-                    commitIndexWriter(indexWriter, translog, null);
+                    final long committedGeneration = commitIndexWriter(indexWriter, translog, null);
                     logger.trace("finished commit for flush");
                     // we need to refresh in order to clear older version values
                     refresh("version_table_flush");
                     // after refresh documents can be retrieved from the index so we can now commit the translog
-                    translog.commit();
+                    translog.commit(committedGeneration);
                 } catch (Exception e) {
                     throw new FlushFailedEngineException(shardId, e);
                 }
@@ -1734,55 +1734,65 @@ protected void doRun() throws Exception {
             }
         }
 
-    private void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
+    /**
+     * Commits the specified index writer.
+     *
+     * @param writer   the index writer to commit
+     * @param translog the translog
+     * @param syncId   the sync flush ID ({@code null} if not committing a synced flush)
+     * @return the minimum translog generation for the local checkpoint committed with the specified index writer
+     * @throws IOException if an I/O exception occurs committing the specified writer
+     */
+    private long commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
         ensureCanFlush();
         try {
-            Translog.TranslogGeneration translogGeneration = translog.getGeneration();
-
-            final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
+            final long localCheckpoint = seqNoService().getLocalCheckpoint();
+            final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
+            final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
             final String translogUUID = translogGeneration.translogUUID;
-            final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
+            final String localCheckpointValue = Long.toString(localCheckpoint);
 
             writer.setLiveCommitData(() -> {
                 /*
                  * The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
-                 * segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want
+                 * segments, including the local checkpoint amongst other values. The maximum sequence number is different, we never want
                  * the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the
                  * risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently
-                 * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
-                 * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation
-                 * of the commit data iterator (which occurs after all documents have been flushed to Lucene).
+                 * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
+                 * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
+                 * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
                  */
-                final Map<String, String> commitData = new HashMap<>(6);
-                commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
+                final Map<String, String> commitData = new HashMap<>(5);
+                commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
                 commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
-                commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
+                commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
                 if (syncId != null) {
                     commitData.put(Engine.SYNC_COMMIT_ID, syncId);
                 }
                 commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
-                if (logger.isTraceEnabled()) {
-                    logger.trace("committing writer with commit data [{}]", commitData);
-                }
+                logger.trace("committing writer with commit data [{}]", commitData);
                 return commitData.entrySet().iterator();
             });
 
             writer.commit();
-        } catch (Exception ex) {
+            return translogGeneration.translogFileGeneration;
+        } catch (final Exception ex) {
             try {
                 failEngine("lucene commit failed", ex);
-            } catch (Exception inner) {
+            } catch (final Exception inner) {
                 ex.addSuppressed(inner);
             }
             throw ex;
-        } catch (AssertionError e) {
-            // IndexWriter throws AssertionError on commit, if asserts are enabled, if any files don't exist, but tests that
-            // randomly throw FNFE/NSFE can also hit this:
+        } catch (final AssertionError e) {
+            /*
+             * If assertions are enabled, IndexWriter throws AssertionError on commit if any files don't exist, but tests that randomly
+             * throw FileNotFoundException or NoSuchFileException can also hit this.
+             */
             if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
-                EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
+                final EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
                 try {
                     failEngine("lucene commit failed", engineException);
-                } catch (Exception inner) {
+                } catch (final Exception inner) {
                     engineException.addSuppressed(inner);
                 }
                 throw engineException;
@@ -1866,7 +1876,7 @@ public boolean isRecovering() {
      * Gets the commit data from {@link IndexWriter} as a map.
      */
     private static Map<String, String> commitDataAsMap(final IndexWriter indexWriter) {
-        Map<String, String> commitData = new HashMap<>(6);
+        Map<String, String> commitData = new HashMap<>(5);
         for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
             commitData.put(entry.getKey(), entry.getValue());
         }
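For context on the InternalEngine change above: the Lucene commit user data now records the translog generation computed from the local checkpoint rather than simply the latest generation. Below is a sketch of what such a map might hold, with invented values; the literal keys are what the constants in the diff resolve to, as far as I can tell.

// Illustrative only: plausible Lucene commit user data after this change. Keys
// mirror the constants used above; all values are hypothetical.
import java.util.HashMap;
import java.util.Map;

final class CommitDataExample {
    public static void main(final String[] args) {
        final Map<String, String> commitData = new HashMap<>(5);
        commitData.put("translog_generation", "2");                    // min generation covering ops above the local checkpoint
        commitData.put("translog_uuid", "hypothetical-translog-uuid"); // identity check on recovery
        commitData.put("local_checkpoint", "15");                      // evaluated before Lucene flushes segments
        commitData.put("max_seq_no", "17");                            // computed when the commit data iterator runs
        System.out.println(commitData);
    }
}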
103 changes: 75 additions & 28 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -85,14 +85,14 @@
  * When a translog is opened the checkpoint is use to retrieve the latest translog file generation and subsequently to open the last written file to recovery operations.
  * The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the translog is opened / constructed is compared against
  * the latest generation and all consecutive translog files singe the given generation and the last generation in the checkpoint will be recovered and preserved until the next
- * generation is committed using {@link Translog#commit()}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
- * the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit()}. In such a case
+ * generation is committed using {@link Translog#commit(long)}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
+ * the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit(long)}. In such a case
  * the currently being committed translog file will not be deleted since it's commit was not successful. Yet, a new/current translog file is already opened at that point such that there is more than
  * one translog file present. Such an uncommitted translog file always has a <tt>translog-${gen}.ckp</tt> associated with it which is an fsynced copy of the it's last <tt>translog.ckp</tt> such that in
  * disaster recovery last fsynced offsets, number of operation etc. are still preserved.
  * </p>
  */
-public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable, TwoPhaseCommit {
+public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
 
     /*
      * TODO
@@ -804,6 +804,8 @@ public static Type fromId(byte id) {
 
         long seqNo();
 
+        long primaryTerm();
+
         /**
          * Reads the type and the operation from the given stream. The operation must be written with
          * {@link Operation#writeType(Operation, StreamOutput)}
@@ -953,6 +955,7 @@ public long seqNo() {
             return seqNo;
         }
 
+        @Override
         public long primaryTerm() {
             return primaryTerm;
         }
@@ -1104,6 +1107,7 @@ public long seqNo() {
             return seqNo;
         }
 
+        @Override
         public long primaryTerm() {
             return primaryTerm;
         }
@@ -1180,6 +1184,7 @@ public long seqNo() {
             return seqNo;
         }
 
+        @Override
         public long primaryTerm() {
             return primaryTerm;
         }
@@ -1347,6 +1352,31 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
         out.writeInt((int) checksum);
     }
 
+    /**
+     * Gets the minimum generation that could contain any sequence number after the specified sequence number, or the current generation
+     * if there is no generation that could contain any such sequence number.
+     *
+     * @param seqNo the sequence number
+     * @return the minimum generation for the sequence number
+     */
+    public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
+        try (ReleasableLock ignored = writeLock.acquire()) {
+            /*
+             * When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the
+             * local checkpoint. Immediately after flushing, there will be no such generation, so the minimum generation in this case will
+             * be the current translog generation as we do not need any prior generations to have a complete history up to the current
+             * local checkpoint.
+             */
+            long minTranslogFileGeneration = this.currentFileGeneration();
+            for (final TranslogReader reader : readers) {
+                if (seqNo <= reader.getCheckpoint().maxSeqNo) {
+                    minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration());
+                }
+            }
+            return new TranslogGeneration(translogUUID, minTranslogFileGeneration);
+        }
+    }
+
     /**
      * Roll the current translog generation into a new generation. This does not commit the
      * translog.
@@ -1375,54 +1405,78 @@ public void rollGeneration() throws IOException {
         }
     }
 
-    @Override
-    public long prepareCommit() throws IOException {
+    /**
+     * Prepares a translog commit by setting the current committing generation and rolling the translog generation.
+     *
+     * @throws IOException if an I/O exception occurred while rolling the translog generation
+     */
+    public void prepareCommit() throws IOException {
         try (ReleasableLock ignored = writeLock.acquire()) {
             ensureOpen();
             if (currentCommittingGeneration != NOT_SET_GENERATION) {
-                final String message = String.format(
-                    Locale.ROOT,
-                    "already committing a translog with generation [%d]",
-                    currentCommittingGeneration);
+                final String message =
+                        String.format(Locale.ROOT, "already committing a translog with generation [%d]", currentCommittingGeneration);
                 throw new IllegalStateException(message);
             }
             currentCommittingGeneration = current.getGeneration();
             rollGeneration();
         }
-        return 0;
     }
 
-    @Override
-    public long commit() throws IOException {
+    /**
+     * Commits the translog and sets the last committed translog generation to the specified generation. The specified committed generation
+     * will be used when trimming unreferenced translog generations such that generations from the committed generation will be preserved.
+     *
+     * If {@link Translog#prepareCommit()} was not called before calling commit, this method will invoke it as well, causing the translog
+     * generation to be rolled.
+     *
+     * @param committedGeneration the minimum translog generation to preserve after trimming unreferenced generations
+     * @throws IOException if an I/O exception occurred preparing the translog commit
+     */
+    public void commit(final long committedGeneration) throws IOException {
         try (ReleasableLock ignored = writeLock.acquire()) {
             ensureOpen();
+            assert assertCommittedGenerationIsInValidRange(committedGeneration);
             if (currentCommittingGeneration == NOT_SET_GENERATION) {
                 prepareCommit();
             }
             assert currentCommittingGeneration != NOT_SET_GENERATION;
+            assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration)
+                    : "readers missing committing generation [" + currentCommittingGeneration + "]";
             // set the last committed generation otherwise old files will not be cleaned up
-            lastCommittedTranslogFileGeneration = currentCommittingGeneration + 1;
+            lastCommittedTranslogFileGeneration = committedGeneration;
             currentCommittingGeneration = NOT_SET_GENERATION;
             trimUnreferencedReaders();
         }
-        return 0;
     }
 
+    private boolean assertCommittedGenerationIsInValidRange(final long committedGeneration) {
+        assert committedGeneration <= current.generation
+                : "tried to commit generation [" + committedGeneration + "] after current generation [" + current.generation + "]";
+        final long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE);
+        assert committedGeneration >= min
+                : "tried to commit generation [" + committedGeneration + "] before minimum generation [" + min + "]";
+        return true;
+    }
+
+    /**
+     * Trims unreferenced translog generations. The guarantee here is that translog generations will be preserved for all outstanding views
+     * and from the last committed translog generation defined by {@link Translog#lastCommittedTranslogFileGeneration}.
+     */
     void trimUnreferencedReaders() {
         try (ReleasableLock ignored = writeLock.acquire()) {
             if (closed.get()) {
-                // we're shutdown potentially on some tragic event - don't delete anything
+                // we're shutdown potentially on some tragic event, don't delete anything
                 return;
             }
-            long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE);
-            minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen);
-            final long finalMinReferencedGen = minReferencedGen;
-            List<TranslogReader> unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList());
+            long minReferencedGen = Math.min(
+                    lastCommittedTranslogFileGeneration,
+                    outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE));
+            final List<TranslogReader> unreferenced =
+                    readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList());
             for (final TranslogReader unreferencedReader : unreferenced) {
-                Path translogPath = unreferencedReader.path();
-                logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
+                final Path translogPath = unreferencedReader.path();
+                logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath);
                 IOUtils.closeWhileHandlingException(unreferencedReader);
                 IOUtils.deleteFilesIgnoringExceptions(translogPath,
                     translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
@@ -1442,13 +1496,6 @@ void closeFilesIfNoPendingViews() throws IOException {
         }
     }
 
-
-    @Override
-    public void rollback() throws IOException {
-        ensureOpen();
-        close();
-    }
-
     /**
      * References a transaction log generation
      */
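
Tying the two files together, the flush path after this commit follows roughly the sequence below. This is a hedged sketch against a minimal hypothetical interface, not the engine's exact code; SketchTranslog stands in for the real Translog, and the Lucene commit is elided to a comment.

// Hedged sketch of the end-to-end flush sequence after this commit; locking and
// failure handling are omitted, and SketchTranslog is a hypothetical stand-in
// for the real Translog class.
import java.io.IOException;

final class FlushSequenceSketch {

    interface SketchTranslog {
        void prepareCommit() throws IOException;       // rolls to a fresh generation
        long getMinGenerationForSeqNo(long seqNo);     // returns just the generation here
        void commit(long committedGeneration) throws IOException;
    }

    static long flush(final SketchTranslog translog, final long localCheckpoint) throws IOException {
        // 1. Roll to a fresh generation; the previous one becomes the committing generation.
        translog.prepareCommit();
        // 2. Minimum generation that can contain any operation above the local checkpoint.
        final long committedGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
        // 3. ... write committedGeneration and localCheckpoint into the Lucene commit user data ...
        // 4. Trim, preserving every generation from committedGeneration onwards.
        translog.commit(committedGeneration);
        return committedGeneration;
    }
}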