From 3112d9b8bdb0d1194f8602e12ae4a1a3124acf19 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Wed, 5 Apr 2017 20:07:18 -0400
Subject: [PATCH 01/21] Preserve multiple translog generations
Today when a flush is performed, the translog is committed and if there
are no outstanding views, only the current translog generation is
preserved. Yet for the purpose of sequence numbers, we need stronger
guarantees than this. This commit migrates the preservation of translog
generations to keep the minimum generation that would be needed to
recover after the local checkpoint.
---
.../index/engine/InternalEngine.java | 63 ++++----
.../index/translog/Translog.java | 59 ++++----
.../index/engine/InternalEngineTests.java | 141 ++++++++++++++----
.../index/translog/TranslogTests.java | 134 +++++++++++++++--
4 files changed, 299 insertions(+), 98 deletions(-)
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 333dd769eaf68..a6060bdd8d798 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -298,7 +298,7 @@ private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, Lon
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
}
if (generation.translogUUID == null) {
- throw new IndexFormatTooOldException("trasnlog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
+ throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first");
}
}
final Translog translog = new Translog(translogConfig, generation, globalCheckpointSupplier);
@@ -1179,12 +1179,12 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
try {
translog.prepareCommit();
logger.trace("starting commit for flush; commitTranslog=true");
- commitIndexWriter(indexWriter, translog, null);
+ final long committedGeneration = commitIndexWriter(indexWriter, translog, null);
logger.trace("finished commit for flush");
// we need to refresh in order to clear older version values
refresh("version_table_flush");
// after refresh documents can be retrieved from the index so we can now commit the translog
- translog.commit();
+ translog.commit(committedGeneration);
} catch (Exception e) {
throw new FlushFailedEngineException(shardId, e);
}
@@ -1680,55 +1680,62 @@ protected void doRun() throws Exception {
}
}
- private void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
+ /**
+ * Commits the specified index writer.
+ *
+ * @param writer the index writer to commit
+ * @param translog the translog
+ * @param syncId the sync flush ID ({@code null} if not committing a synced flush)
+ * @return the local checkpoint committed with the specified index writer
+ * @throws IOException if an I/O exception occurs committing the specfied writer
+ */
+ private long commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
- Translog.TranslogGeneration translogGeneration = translog.getGeneration();
-
- final String translogFileGen = Long.toString(translogGeneration.translogFileGeneration);
- final String translogUUID = translogGeneration.translogUUID;
- final String localCheckpoint = Long.toString(seqNoService().getLocalCheckpoint());
+ final long localCheckpoint = seqNoService().getLocalCheckpoint();
+ final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
- * segments, including the local checkpoint amongst other values. The maximum sequence number is different - we never want
+ * segments, including the local checkpoint amongst other values. The maximum sequence number is different, we never want
* the maximum sequence number to be less than the last sequence number to go into a Lucene commit, otherwise we run the
* risk of re-using a sequence number for two different documents when restoring from this commit point and subsequently
- * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
- * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the max_seq_no to the time of invocation
- * of the commit data iterator (which occurs after all documents have been flushed to Lucene).
+ * writing new documents to the index. Since we only know which Lucene documents made it into the final commit after the
+ * {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
+ * of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
- final Map commitData = new HashMap<>(6);
- commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGen);
- commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
- commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
+ final Map commitData = new HashMap<>(5);
+ commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
+ commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
+ commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
- if (logger.isTraceEnabled()) {
- logger.trace("committing writer with commit data [{}]", commitData);
- }
+ logger.trace("committing writer with commit data [{}]", commitData);
return commitData.entrySet().iterator();
});
writer.commit();
- } catch (Exception ex) {
+ return translogGeneration.translogFileGeneration;
+ } catch (final Exception ex) {
try {
failEngine("lucene commit failed", ex);
- } catch (Exception inner) {
+ } catch (final Exception inner) {
ex.addSuppressed(inner);
}
throw ex;
- } catch (AssertionError e) {
- // IndexWriter throws AssertionError on commit, if asserts are enabled, if any files don't exist, but tests that
- // randomly throw FNFE/NSFE can also hit this:
+ } catch (final AssertionError e) {
+ /*
+ * If assertions are enabled, IndexWriter throws AssertionError on commit if any files don't exist, but tests that randomly
+ * throw FileNotFoundException or NoSuchFileException can also hit this.
+ */
if (ExceptionsHelper.stackTrace(e).contains("org.apache.lucene.index.IndexWriter.filesExist")) {
- EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
+ final EngineException engineException = new EngineException(shardId, "failed to commit engine", e);
try {
failEngine("lucene commit failed", engineException);
- } catch (Exception inner) {
+ } catch (final Exception inner) {
engineException.addSuppressed(inner);
}
throw engineException;
@@ -1812,7 +1819,7 @@ public boolean isRecovering() {
* Gets the commit data from {@link IndexWriter} as a map.
*/
private static Map commitDataAsMap(final IndexWriter indexWriter) {
- Map commitData = new HashMap<>(6);
+ Map commitData = new HashMap<>(5);
for (Map.Entry entry : indexWriter.getLiveCommitData()) {
commitData.put(entry.getKey(), entry.getValue());
}
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index d9a8cc408f822..d2363a52a5ea5 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -85,14 +85,14 @@
* When a translog is opened the checkpoint is use to retrieve the latest translog file generation and subsequently to open the last written file to recovery operations.
* The {@link org.elasticsearch.index.translog.Translog.TranslogGeneration}, given when the translog is opened / constructed is compared against
* the latest generation and all consecutive translog files singe the given generation and the last generation in the checkpoint will be recovered and preserved until the next
- * generation is committed using {@link Translog#commit()}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
- * the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit()}. In such a case
+ * generation is committed using {@link Translog#commit(long)}. In the common case the translog file generation in the checkpoint and the generation passed to the translog on creation are
+ * the same. The only situation when they can be different is when an actual translog commit fails in between {@link Translog#prepareCommit()} and {@link Translog#commit(long)}. In such a case
* the currently being committed translog file will not be deleted since it's commit was not successful. Yet, a new/current translog file is already opened at that point such that there is more than
* one translog file present. Such an uncommitted translog file always has a translog-${gen}.ckp associated with it which is an fsynced copy of the it's last translog.ckp such that in
* disaster recovery last fsynced offsets, number of operation etc. are still preserved.
*
*/
-public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable, TwoPhaseCommit {
+public class Translog extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
/*
* TODO
@@ -1347,6 +1347,21 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
out.writeInt((int) checksum);
}
+ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
+ try (ReleasableLock ignored = writeLock.acquire()) {
+ final long minTranslogFileGeneration = readers
+ .stream()
+ .filter(r -> {
+ final Checkpoint checkpoint = r.getCheckpoint();
+ return checkpoint.minSeqNo <= seqNo && seqNo <= checkpoint.maxSeqNo;
+ })
+ .mapToLong(TranslogReader::getGeneration)
+ .min()
+ .orElseGet(this::currentFileGeneration);
+ return new TranslogGeneration(translogUUID, minTranslogFileGeneration);
+ }
+ }
+
/**
* Roll the current translog generation into a new generation. This does not commit the
* translog.
@@ -1375,25 +1390,20 @@ public void rollGeneration() throws IOException {
}
}
- @Override
- public long prepareCommit() throws IOException {
+ public void prepareCommit() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
if (currentCommittingGeneration != NOT_SET_GENERATION) {
- final String message = String.format(
- Locale.ROOT,
- "already committing a translog with generation [%d]",
- currentCommittingGeneration);
+ final String message =
+ String.format(Locale.ROOT, "already committing a translog with generation [%d]", currentCommittingGeneration);
throw new IllegalStateException(message);
}
currentCommittingGeneration = current.getGeneration();
rollGeneration();
}
- return 0;
}
- @Override
- public long commit() throws IOException {
+ public void commit(final long committedGeneration) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
if (currentCommittingGeneration == NOT_SET_GENERATION) {
@@ -1403,26 +1413,26 @@ public long commit() throws IOException {
assert readers.stream().anyMatch(r -> r.getGeneration() == currentCommittingGeneration)
: "readers missing committing generation [" + currentCommittingGeneration + "]";
// set the last committed generation otherwise old files will not be cleaned up
- lastCommittedTranslogFileGeneration = currentCommittingGeneration + 1;
+ lastCommittedTranslogFileGeneration = committedGeneration;
currentCommittingGeneration = NOT_SET_GENERATION;
trimUnreferencedReaders();
}
- return 0;
}
void trimUnreferencedReaders() {
try (ReleasableLock ignored = writeLock.acquire()) {
if (closed.get()) {
- // we're shutdown potentially on some tragic event - don't delete anything
+ // we're shutdown potentially on some tragic event, don't delete anything
return;
}
- long minReferencedGen = outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE);
- minReferencedGen = Math.min(lastCommittedTranslogFileGeneration, minReferencedGen);
- final long finalMinReferencedGen = minReferencedGen;
- List unreferenced = readers.stream().filter(r -> r.getGeneration() < finalMinReferencedGen).collect(Collectors.toList());
+ long minReferencedGen = Math.min(
+ lastCommittedTranslogFileGeneration,
+ outstandingViews.stream().mapToLong(View::minTranslogGeneration).min().orElse(Long.MAX_VALUE));
+ final List unreferenced =
+ readers.stream().filter(r -> r.getGeneration() < minReferencedGen).collect(Collectors.toList());
for (final TranslogReader unreferencedReader : unreferenced) {
- Path translogPath = unreferencedReader.path();
- logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
+ final Path translogPath = unreferencedReader.path();
+ logger.trace("delete translog file [{}], not referenced and not current anymore", translogPath);
IOUtils.closeWhileHandlingException(unreferencedReader);
IOUtils.deleteFilesIgnoringExceptions(translogPath,
translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
@@ -1442,13 +1452,6 @@ void closeFilesIfNoPendingViews() throws IOException {
}
}
-
- @Override
- public void rollback() throws IOException {
- ensureOpen();
- close();
- }
-
/**
* References a transaction log generation
*/
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 2d3ba055df4aa..9ebe1cd8ff2b4 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -3333,6 +3333,50 @@ public void testSequenceIDs() throws Exception {
searchResult.close();
}
+ /**
+ * A sequence number service that will generate a sequence number and if {@code stall} is set to
+ * {@code true} will wait on the barrier and the latch before returning. If the local checkpoint
+ * should advance (because {@code stall} is {@code false}), then the value of
+ * {@code expectedLocalCheckpoint} is set accordingly.
+ *
+ * @param latch to latch the thread for the purpose of stalling
+ * @param barrier to signal the thread has generated a new sequence number
+ * @param stall whether or not the thread should stall
+ * @param expectedLocalCheckpoint the expected local checkpoint after generating a new sequence
+ * number
+ * @return a sequence number service
+ */
+ private SequenceNumbersService getStallingSeqNoService(
+ final CountDownLatch latch,
+ final CyclicBarrier barrier,
+ final AtomicBoolean stall,
+ final AtomicLong expectedLocalCheckpoint) {
+ return new SequenceNumbersService(
+ shardId,
+ defaultSettings,
+ SequenceNumbersService.NO_OPS_PERFORMED,
+ SequenceNumbersService.NO_OPS_PERFORMED,
+ SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ @Override
+ public long generateSeqNo() {
+ final long seqNo = super.generateSeqNo();
+ if (stall.get()) {
+ try {
+ barrier.await();
+ latch.await();
+ } catch (BrokenBarrierException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ if (expectedLocalCheckpoint.get() + 1 == seqNo) {
+ expectedLocalCheckpoint.set(seqNo);
+ }
+ }
+ return seqNo;
+ }
+ };
+ }
+
public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws BrokenBarrierException, InterruptedException, IOException {
engine.close();
final int docs = randomIntBetween(1, 32);
@@ -3340,41 +3384,18 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro
try {
final CountDownLatch latch = new CountDownLatch(1);
final CyclicBarrier barrier = new CyclicBarrier(2);
- final AtomicBoolean skip = new AtomicBoolean();
+ final AtomicBoolean stall = new AtomicBoolean();
final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
final List threads = new ArrayList<>();
final SequenceNumbersService seqNoService =
- new SequenceNumbersService(
- shardId,
- defaultSettings,
- SequenceNumbersService.NO_OPS_PERFORMED,
- SequenceNumbersService.NO_OPS_PERFORMED,
- SequenceNumbersService.UNASSIGNED_SEQ_NO) {
- @Override
- public long generateSeqNo() {
- final long seqNo = super.generateSeqNo();
- if (skip.get()) {
- try {
- barrier.await();
- latch.await();
- } catch (BrokenBarrierException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- } else {
- if (expectedLocalCheckpoint.get() + 1 == seqNo) {
- expectedLocalCheckpoint.set(seqNo);
- }
- }
- return seqNo;
- }
- };
+ getStallingSeqNoService(latch, barrier, stall, expectedLocalCheckpoint);
initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
final InternalEngine finalInitialEngine = initialEngine;
for (int i = 0; i < docs; i++) {
final String id = Integer.toString(i);
final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null);
- skip.set(randomBoolean());
+ stall.set(randomBoolean());
final Thread thread = new Thread(() -> {
try {
finalInitialEngine.index(indexForDoc(doc));
@@ -3383,7 +3404,7 @@ public long generateSeqNo() {
}
});
thread.start();
- if (skip.get()) {
+ if (stall.get()) {
threads.add(thread);
barrier.await();
} else {
@@ -3570,6 +3591,72 @@ public long generateSeqNo() {
}
}
+ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierException, InterruptedException {
+ engine.close();
+ final int numberOfTriplets = randomIntBetween(1, 32);
+ InternalEngine actualEngine = null;
+ try {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ final AtomicBoolean stall = new AtomicBoolean();
+ final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
+ final List threads = new ArrayList<>();
+ final SequenceNumbersService seqNoService = getStallingSeqNoService(latch, barrier, stall, expectedLocalCheckpoint);
+ actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
+ final InternalEngine finalActualEngine = actualEngine;
+ final long generation = finalActualEngine.getTranslog().currentFileGeneration();
+ for (int i = 0; i < numberOfTriplets; i++) {
+ /*
+ * Index three documents with the first and last landing in the same generation and the middle document being stalled until
+ * a later generation.
+ */
+ stall.set(false);
+ index(finalActualEngine, 3 * i);
+
+ final int skipId = 3 * i + 1;
+ stall.set(true);
+ final Thread thread = new Thread(() -> {
+ try {
+ index(finalActualEngine, skipId);
+ } catch (IOException e) {
+ throw new AssertionError(e);
+ }
+ });
+ thread.start();
+ threads.add(thread);
+ barrier.await();
+
+ stall.set(false);
+ index(finalActualEngine, 3 * i + 2);
+ finalActualEngine.flush();
+ }
+
+ latch.countDown();
+ for (final Thread thread : threads) {
+ thread.join();
+ }
+
+ final Translog translog = finalActualEngine.getTranslog();
+ for (int i = 0; i < numberOfTriplets; i++) {
+ /*
+ * This sequence number landed in the last generation, but the lower and upper bounds for an earlier generation straddle
+ * sequence number.
+ */
+ assertThat(translog.getMinGenerationForSeqNo(3 * i + 1).translogFileGeneration, equalTo(i + generation));
+ }
+
+ } finally {
+ IOUtils.close(actualEngine);
+ }
+ }
+
+ private void index(final InternalEngine engine, final int id) throws IOException {
+ final String docId = Integer.toString(id);
+ final ParsedDocument doc =
+ testParsedDocument(docId, "test", null, testDocumentWithTextField(), SOURCE, null);
+ engine.index(indexForDoc(doc));
+ }
+
/**
* Return a tuple representing the sequence ID for the given {@code Get}
* operation. The first value in the tuple is the sequence number, the
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 6b2aa5e59215e..3f7517c9f377d 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -36,6 +36,7 @@
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.FileSystemUtils;
@@ -84,6 +85,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -101,6 +103,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import java.util.stream.LongStream;
import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
import static org.hamcrest.Matchers.containsString;
@@ -124,7 +127,7 @@ protected void afterIfSuccessful() throws Exception {
if (translog.isOpen()) {
if (translog.currentFileGeneration() > 1) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
assertFileDeleted(translog, translog.currentFileGeneration() - 1);
}
translog.close();
@@ -287,7 +290,7 @@ public void testSimpleOperations() throws IOException {
assertThat(snapshot, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
snapshot = translog.newSnapshot();
assertThat(snapshot, SnapshotMatchers.size(0));
assertThat(snapshot.totalOperations(), equalTo(0));
@@ -373,7 +376,7 @@ public void testStats() throws IOException {
}
}
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
{
final TranslogStats stats = stats();
assertThat(stats.estimatedNumberOfOperations(), equalTo(0L));
@@ -446,7 +449,7 @@ public void testSnapshotWithNewTranslog() throws IOException {
try (Translog.View view = translog.newView()) {
Translog.Snapshot snapshot2 = translog.newSnapshot();
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
assertThat(snapshot2, SnapshotMatchers.equalsTo(ops));
assertThat(snapshot2.totalOperations(), equalTo(ops.size()));
}
@@ -821,7 +824,7 @@ protected void doRun() throws Exception {
break;
}
}
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
}
} finally {
run.set(false);
@@ -858,7 +861,7 @@ public void testSyncUpTo() throws IOException {
assertTrue("we only synced a previous operation yet", translog.syncNeeded());
}
if (rarely()) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
assertFalse("location is from a previous translog - already synced", translog.ensureSynced(location)); // not syncing now
assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
}
@@ -878,7 +881,7 @@ public void testSyncUpToStream() throws IOException {
ArrayList locations = new ArrayList<>();
for (int op = 0; op < translogOperations; op++) {
if (rarely()) {
- translog.commit(); // do this first so that there is at least one pending tlog entry
+ translog.commit(translog.currentFileGeneration()); // do this first so that there is at least one pending tlog entry
}
final Translog.Location location = translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))));
locations.add(location);
@@ -889,7 +892,7 @@ public void testSyncUpToStream() throws IOException {
assertTrue("this operation has not been synced", translog.ensureSynced(locations.stream()));
assertFalse("the last call to ensureSycned synced all previous ops", translog.syncNeeded()); // we are the last location so everything should be synced
} else if (rarely()) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
assertFalse("location is from a previous translog - already synced", translog.ensureSynced(locations.stream())); // not syncing now
assertFalse("no sync needed since no operations in current translog", translog.syncNeeded());
} else {
@@ -909,7 +912,7 @@ public void testLocationComparison() throws IOException {
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
if (rarely() && translogOperations > op + 1) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
}
}
Collections.shuffle(locations, random());
@@ -1074,7 +1077,7 @@ public void testBasicRecovery() throws IOException {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
final boolean commit = commitOften ? frequently() : rarely();
if (commit && op < translogOperations - 1) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
minUncommittedOp = op + 1;
translogGeneration = translog.getGeneration();
}
@@ -1300,7 +1303,7 @@ public void testOpenForeignTranslog() throws IOException {
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (randomBoolean()) {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
firstUncommitted = op + 1;
}
}
@@ -1483,7 +1486,7 @@ public void testFailFlush() throws IOException {
}
try {
- translog.commit();
+ translog.commit(translog.currentFileGeneration());
fail("already closed");
} catch (AlreadyClosedException ex) {
assertNotNull(ex.getCause());
@@ -1930,7 +1933,7 @@ public void testWithRandomException() throws IOException {
if (randomBoolean()) {
failableTLog.prepareCommit();
}
- failableTLog.commit();
+ failableTLog.commit(translog.currentFileGeneration());
syncedDocs.clear();
}
}
@@ -2110,7 +2113,7 @@ public void testRollGeneration() throws IOException {
for (int i = 0; i <= rolls; i++) {
assertFileIsPresent(translog, generation + i);
}
- translog.commit();
+ translog.commit(generation + rolls + 1);
assertThat(translog.currentFileGeneration(), equalTo(generation + rolls + 1));
assertThat(translog.totalOperations(), equalTo(0));
for (int i = 0; i <= rolls; i++) {
@@ -2167,7 +2170,7 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException
}
}
- translog.commit();
+ translog.commit(generation + rollsBefore + 1);
for (int i = 0; i <= rollsBefore; i++) {
assertFileDeleted(translog, generation + i);
@@ -2178,4 +2181,105 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException
}
+ public void testMinGenerationForSeqNo() throws IOException {
+ final int operations = randomIntBetween(1, 4096);
+ final List seqNos =
+ LongStream.range(0, operations).boxed().collect(Collectors.toList());
+ Randomness.shuffle(seqNos);
+ final Map> generations = new HashMap<>();
+
+ for (int i = 0; i < operations; i++) {
+ final Long seqNo = seqNos.get(i);
+ translog.add(new Translog.NoOp(seqNo, 0, "test"));
+ final List seqNoForGeneration =
+ generations.computeIfAbsent(
+ translog.currentFileGeneration(),
+ g -> new ArrayList<>());
+ seqNoForGeneration.add(seqNo);
+ if (rarely()) {
+ translog.rollGeneration();
+ }
+ }
+
+ final Set seenSeqNos = new HashSet<>();
+ for (final Map.Entry> entry : generations.entrySet()) {
+ final long min = entry.getValue().stream().min(Long::compareTo).orElse(Long.MIN_VALUE);
+ final long max = entry.getValue().stream().max(Long::compareTo).orElse(Long.MAX_VALUE);
+ for (long seqNo = min; seqNo <= max; seqNo++) {
+ if (seenSeqNos.add(seqNo)) {
+ assertThat(
+ translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration,
+ equalTo(entry.getKey()));
+ }
+ }
+ }
+
+ assertThat(seenSeqNos, equalTo(new HashSet<>(seqNos)));
+ }
+
+ public void testSimpleCommit() throws IOException {
+ final int operations = randomIntBetween(1, 4096);
+ long seqNo = 0;
+ for (int i = 0; i < operations; i++) {
+ translog.add(new Translog.NoOp(seqNo++, 0, "test'"));
+ if (rarely()) {
+ translog.rollGeneration();
+ }
+ }
+
+ final long generation =
+ randomIntBetween(1, Math.toIntExact(translog.currentFileGeneration()));
+ translog.commit(generation);
+ for (long i = 0; i < generation; i++) {
+ assertFileDeleted(translog, i);
+ }
+ for (long i = generation; i <= translog.currentFileGeneration(); i++) {
+ assertFileIsPresent(translog, i);
+ }
+ }
+
+ public void testPrepareCommitAndCommit() throws IOException {
+ final int operations = randomIntBetween(1, 4096);
+ long seqNo = 0;
+ for (int i = 0; i < operations; i++) {
+ translog.add(new Translog.NoOp(seqNo++, 0, "test"));
+ if (rarely()) {
+ final long generation = translog.currentFileGeneration();
+ translog.prepareCommit();
+ translog.commit(randomIntBetween(1, Math.toIntExact(generation)));
+ for (long g = 0; i < generation; g++) {
+ assertFileDeleted(translog, g);
+ }
+ for (long g = generation; g < translog.currentFileGeneration(); g++) {
+ assertFileIsPresent(translog, g);
+ }
+ }
+ }
+ }
+
+ public void testCommitWithOpenView() throws IOException {
+ final int operations = randomIntBetween(1, 1 << 16);
+ long seqNo = 0;
+ long last;
+ for (int i = 0; i < operations; i++) {
+ translog.add(new Translog.NoOp(seqNo++, 0, "test"));
+ if (rarely()) {
+ try (Translog.View ignored = translog.newView()) {
+ final long generation = translog.currentFileGeneration();
+ translog.prepareCommit();
+ final long committedGeneration =
+ randomIntBetween(1, Math.toIntExact(generation));
+ translog.commit(committedGeneration);
+ last = committedGeneration;
+ for (long g = 0; i < last; g++) {
+ assertFileDeleted(translog, g);
+ }
+ for (long g = last; i < translog.currentFileGeneration(); g++) {
+ assertFileIsPresent(translog, g);
+ }
+ }
+ }
+ }
+ }
+
}
From 03a3fe67e7de56c05c3405fa36fbbc8646c710ad Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Mon, 10 Apr 2017 07:49:32 -0400
Subject: [PATCH 02/21] Add Javadocs for getMinGenerationForSeqNo
---
.../java/org/elasticsearch/index/translog/Translog.java | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index d2363a52a5ea5..ecf9a622a3997 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -1347,6 +1347,13 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
out.writeInt((int) checksum);
}
+ /**
+ * Gets the minimum generation that could contain the sequence number, or the current generation if there is no generation with the
+ * specified sequence number between the minimum and maximum sequence numbers.
+ *
+ * @param seqNo the sequence number
+ * @return the minimum generation for the sequence number, or the current generation
+ */
public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
try (ReleasableLock ignored = writeLock.acquire()) {
final long minTranslogFileGeneration = readers
From 583a2e4f8e6457232156d8bd51d734957145479e Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Mon, 10 Apr 2017 07:53:39 -0400
Subject: [PATCH 03/21] Add Javadocs for prepareCommit and commit
---
.../elasticsearch/index/translog/Translog.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index ecf9a622a3997..749b4a044201c 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -1397,6 +1397,11 @@ public void rollGeneration() throws IOException {
}
}
+ /**
+ * Prepares a translog commit by setting the current committing generation and rolling the translog generation.
+ *
+ * @throws IOException if an I/O exception occurred while rolling the translog generation
+ */
public void prepareCommit() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
@@ -1410,6 +1415,16 @@ public void prepareCommit() throws IOException {
}
}
+ /**
+ * Commits the translog and sets the last committed translog generation to the specified generation. The specified committed generation
+ * will be used when trimming unreferenced translog generations such that generations from the committed generation will be preserved.
+ *
+ * If {@link Translog#prepareCommit()} was not called before calling commit, this method will be invoked too causing the translog
+ * generation to be rolled.
+ *
+ * @param committedGeneration the minimum translog generation to preserve after trimming unreferenced generations
+ * @throws IOException if an I/O exception occurred preparing the translog commit
+ */
public void commit(final long committedGeneration) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
From 9120bf9bee268153970eaa42200ea73fbd808191 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Mon, 10 Apr 2017 07:56:28 -0400
Subject: [PATCH 04/21] More Javadocs
---
.../main/java/org/elasticsearch/index/translog/Translog.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 749b4a044201c..b9377b7e9b5a0 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -1441,6 +1441,10 @@ public void commit(final long committedGeneration) throws IOException {
}
}
+ /**
+ * Trims unreferenced translog generations. The guarantee here is that translog generations will be preserved for all outstanding views
+ * and from the last committed translog generation defined by {@link Translog#lastCommittedTranslogFileGeneration}.
+ */
void trimUnreferencedReaders() {
try (ReleasableLock ignored = writeLock.acquire()) {
if (closed.get()) {
From c63543bfe5f622c71a820463443bca99196883a8 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Mon, 10 Apr 2017 10:17:23 -0400
Subject: [PATCH 05/21] Stronger preservation
---
.../index/translog/Translog.java | 8 ++---
.../index/translog/TranslogTests.java | 32 ++++++++++++-------
2 files changed, 24 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index b9377b7e9b5a0..0c5e392d95ff3 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -1348,11 +1348,11 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
}
/**
- * Gets the minimum generation that could contain the sequence number, or the current generation if there is no generation with the
- * specified sequence number between the minimum and maximum sequence numbers.
+ * Gets the minimum generation that could contain any sequence number after the specified sequence number, or the current generation if
+ * there is no generation that could any such sequence number.
*
* @param seqNo the sequence number
- * @return the minimum generation for the sequence number, or the current generation
+ * @return the minimum generation for the sequence number
*/
public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
try (ReleasableLock ignored = writeLock.acquire()) {
@@ -1360,7 +1360,7 @@ public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
.stream()
.filter(r -> {
final Checkpoint checkpoint = r.getCheckpoint();
- return checkpoint.minSeqNo <= seqNo && seqNo <= checkpoint.maxSeqNo;
+ return seqNo <= checkpoint.maxSeqNo;
})
.mapToLong(TranslogReader::getGeneration)
.min()
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 3f7517c9f377d..08af6f1f5d648 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -2201,22 +2201,30 @@ public void testMinGenerationForSeqNo() throws IOException {
}
}
- final Set seenSeqNos = new HashSet<>();
- for (final Map.Entry> entry : generations.entrySet()) {
- final long min = entry.getValue().stream().min(Long::compareTo).orElse(Long.MIN_VALUE);
- final long max = entry.getValue().stream().max(Long::compareTo).orElse(Long.MAX_VALUE);
- for (long seqNo = min; seqNo <= max; seqNo++) {
- if (seenSeqNos.add(seqNo)) {
- assertThat(
- translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration,
- equalTo(entry.getKey()));
- }
- }
+ final Map maxSeqNoByGeneration =
+ generations
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> e.getValue().stream().max(Long::compareTo).orElse(Long.MAX_VALUE)));
+
+ for (long seqNo = 0; seqNo < operations; seqNo++) {
+ final long finalLongSeqNo = seqNo;
+ final Long operand =
+ maxSeqNoByGeneration
+ .entrySet()
+ .stream()
+ .filter(e -> finalLongSeqNo <= e.getValue())
+ .map(Map.Entry::getKey).min(Long::compareTo)
+ .orElse(Long.MIN_VALUE);
+ assertThat(translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration, equalTo(operand));
}
- assertThat(seenSeqNos, equalTo(new HashSet<>(seqNos)));
}
+
+
public void testSimpleCommit() throws IOException {
final int operations = randomIntBetween(1, 4096);
long seqNo = 0;
From 5ba7402c33963c419601f42e62f104009a291e3f Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Mon, 10 Apr 2017 17:27:44 -0400
Subject: [PATCH 06/21] Fix comment
---
.../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index a6060bdd8d798..5f98ad9658692 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -1686,7 +1686,7 @@ protected void doRun() throws Exception {
* @param writer the index writer to commit
* @param translog the translog
* @param syncId the sync flush ID ({@code null} if not committing a synced flush)
- * @return the local checkpoint committed with the specified index writer
+ * @return the minimum translog generation for the local checkpoint committed with the specified index writer
* @throws IOException if an I/O exception occurs committing the specfied writer
*/
private long commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
From c4690ddb71d1829d5b9c86d46f0025140922844d Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Mon, 10 Apr 2017 18:10:22 -0400
Subject: [PATCH 07/21] Add extra roll
---
.../java/org/elasticsearch/index/translog/TranslogTests.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 08af6f1f5d648..c54e55cd8dfb7 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -2254,6 +2254,10 @@ public void testPrepareCommitAndCommit() throws IOException {
if (rarely()) {
final long generation = translog.currentFileGeneration();
translog.prepareCommit();
+ if (rarely()) {
+ // simulate generation filling up and rolling between preparing the commit and committing
+ translog.rollGeneration();
+ }
translog.commit(randomIntBetween(1, Math.toIntExact(generation)));
for (long g = 0; i < generation; g++) {
assertFileDeleted(translog, g);
From 9490312e532f369d8c54df2d35144ae0e96cc313 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Mon, 10 Apr 2017 18:19:10 -0400
Subject: [PATCH 08/21] Assert generation
---
.../main/java/org/elasticsearch/index/translog/Translog.java | 1 +
.../java/org/elasticsearch/index/translog/TranslogTests.java | 5 +++--
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 0c5e392d95ff3..b83f81f334988 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -1428,6 +1428,7 @@ public void prepareCommit() throws IOException {
public void commit(final long committedGeneration) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
+ assert committedGeneration <= current.generation;
if (currentCommittingGeneration == NOT_SET_GENERATION) {
prepareCommit();
}
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index c54e55cd8dfb7..ba592833619b9 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -2113,12 +2113,13 @@ public void testRollGeneration() throws IOException {
for (int i = 0; i <= rolls; i++) {
assertFileIsPresent(translog, generation + i);
}
- translog.commit(generation + rolls + 1);
+ translog.commit(generation + rolls);
assertThat(translog.currentFileGeneration(), equalTo(generation + rolls + 1));
assertThat(translog.totalOperations(), equalTo(0));
- for (int i = 0; i <= rolls; i++) {
+ for (int i = 0; i < rolls; i++) {
assertFileDeleted(translog, generation + i);
}
+ assertFileIsPresent(translog, generation + rolls);
assertFileIsPresent(translog, generation + rolls + 1);
}
From f5d3973dd7714ac98717c4a376a11c81d1e74b49 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Mon, 10 Apr 2017 20:42:07 -0400
Subject: [PATCH 09/21] Check commit metadata
---
.../index/engine/InternalEngineTests.java | 54 +++++++++++--------
1 file changed, 31 insertions(+), 23 deletions(-)
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 9ebe1cd8ff2b4..2de80786ccc1a 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -88,6 +88,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
@@ -148,7 +149,9 @@
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -3334,12 +3337,11 @@ public void testSequenceIDs() throws Exception {
}
/**
- * A sequence number service that will generate a sequence number and if {@code stall} is set to
- * {@code true} will wait on the barrier and the latch before returning. If the local checkpoint
- * should advance (because {@code stall} is {@code false}), then the value of
- * {@code expectedLocalCheckpoint} is set accordingly.
+ * A sequence number service that will generate a sequence number and if {@code stall} is set to {@code true} will wait on the barrier
+ * and the referenced latch before returning. If the local checkpoint should advance (because {@code stall} is {@code false}), then the
+ * value of {@code expectedLocalCheckpoint} is set accordingly.
*
- * @param latch to latch the thread for the purpose of stalling
+ * @param latchReference to latch the thread for the purpose of stalling
* @param barrier to signal the thread has generated a new sequence number
* @param stall whether or not the thread should stall
* @param expectedLocalCheckpoint the expected local checkpoint after generating a new sequence
@@ -3347,7 +3349,7 @@ public void testSequenceIDs() throws Exception {
* @return a sequence number service
*/
private SequenceNumbersService getStallingSeqNoService(
- final CountDownLatch latch,
+ final AtomicReference latchReference,
final CyclicBarrier barrier,
final AtomicBoolean stall,
final AtomicLong expectedLocalCheckpoint) {
@@ -3360,6 +3362,7 @@ private SequenceNumbersService getStallingSeqNoService(
@Override
public long generateSeqNo() {
final long seqNo = super.generateSeqNo();
+ final CountDownLatch latch = latchReference.get();
if (stall.get()) {
try {
barrier.await();
@@ -3382,13 +3385,12 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro
final int docs = randomIntBetween(1, 32);
InternalEngine initialEngine = null;
try {
- final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference latchReference = new AtomicReference<>(new CountDownLatch(1));
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean stall = new AtomicBoolean();
final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
final List threads = new ArrayList<>();
- final SequenceNumbersService seqNoService =
- getStallingSeqNoService(latch, barrier, stall, expectedLocalCheckpoint);
+ final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
final InternalEngine finalInitialEngine = initialEngine;
for (int i = 0; i < docs; i++) {
@@ -3416,7 +3418,7 @@ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary() throws Bro
assertThat(initialEngine.seqNoService().getMaxSeqNo(), equalTo((long) (docs - 1)));
initialEngine.flush(true, true);
- latch.countDown();
+ latchReference.get().countDown();
for (final Thread thread : threads) {
thread.join();
}
@@ -3596,14 +3598,15 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti
final int numberOfTriplets = randomIntBetween(1, 32);
InternalEngine actualEngine = null;
try {
- final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference latchReference = new AtomicReference<>();
final CyclicBarrier barrier = new CyclicBarrier(2);
final AtomicBoolean stall = new AtomicBoolean();
final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbersService.NO_OPS_PERFORMED);
- final List threads = new ArrayList<>();
- final SequenceNumbersService seqNoService = getStallingSeqNoService(latch, barrier, stall, expectedLocalCheckpoint);
+ final Map threads = new LinkedHashMap<>();
+ final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, () -> seqNoService);
final InternalEngine finalActualEngine = actualEngine;
+ final Translog translog = finalActualEngine.getTranslog();
final long generation = finalActualEngine.getTranslog().currentFileGeneration();
for (int i = 0; i < numberOfTriplets; i++) {
/*
@@ -3613,6 +3616,8 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti
stall.set(false);
index(finalActualEngine, 3 * i);
+ final CountDownLatch latch = new CountDownLatch(1);
+ latchReference.set(latch);
final int skipId = 3 * i + 1;
stall.set(true);
final Thread thread = new Thread(() -> {
@@ -3623,28 +3628,31 @@ public void testMinGenerationForSeqNo() throws IOException, BrokenBarrierExcepti
}
});
thread.start();
- threads.add(thread);
+ threads.put(thread, latch);
barrier.await();
stall.set(false);
index(finalActualEngine, 3 * i + 2);
finalActualEngine.flush();
- }
- latch.countDown();
- for (final Thread thread : threads) {
- thread.join();
- }
-
- final Translog translog = finalActualEngine.getTranslog();
- for (int i = 0; i < numberOfTriplets; i++) {
/*
* This sequence number landed in the last generation, but the lower and upper bounds for an earlier generation straddle
- * sequence number.
+ * this sequence number.
*/
assertThat(translog.getMinGenerationForSeqNo(3 * i + 1).translogFileGeneration, equalTo(i + generation));
}
+ int i = 0;
+ for (final Map.Entry entry : threads.entrySet()) {
+ final Map userData = finalActualEngine.commitStats().getUserData();
+ assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(3 * i)));
+ assertThat(userData.get(Translog.TRANSLOG_GENERATION_KEY), equalTo(Long.toString(i + generation)));
+ entry.getValue().countDown();
+ entry.getKey().join();
+ finalActualEngine.flush();
+ i++;
+ }
+
} finally {
IOUtils.close(actualEngine);
}
From 1817095845459b8e2a78f16946bbf02ccbfc43d9 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Mon, 10 Apr 2017 21:17:39 -0400
Subject: [PATCH 10/21] Stronger test
---
.../index/translog/TranslogTests.java | 38 +++++++++++++++----
1 file changed, 30 insertions(+), 8 deletions(-)
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index ba592833619b9..b21f7d441f10d 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -102,6 +102,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -2271,23 +2273,43 @@ public void testPrepareCommitAndCommit() throws IOException {
}
public void testCommitWithOpenView() throws IOException {
- final int operations = randomIntBetween(1, 1 << 16);
+ final int operations = randomIntBetween(1, 4096);
long seqNo = 0;
- long last;
+ long lastCommittedGeneration = -1;
for (int i = 0; i < operations; i++) {
translog.add(new Translog.NoOp(seqNo++, 0, "test"));
if (rarely()) {
- try (Translog.View ignored = translog.newView()) {
+ if (randomBoolean()) {
+ try (Translog.View ignored = translog.newView()) {
+ final long viewGeneration = lastCommittedGeneration;
+ translog.prepareCommit();
+ final long committedGeneration = randomIntBetween(
+ Math.max(1, Math.toIntExact(lastCommittedGeneration)),
+ Math.toIntExact(translog.currentFileGeneration()));
+ translog.commit(committedGeneration);
+ lastCommittedGeneration = committedGeneration;
+ // with an open view, committing should preserve generations back to the last committed generation
+ for (long g = 1; g < Math.min(lastCommittedGeneration, viewGeneration); g++) {
+ assertFileDeleted(translog, g);
+ }
+ // the view generation could be -1 if no commit has been performed
+ final long max = Math.max(1, Math.min(lastCommittedGeneration, viewGeneration));
+ for (long g = max; g < translog.currentFileGeneration(); g++) {
+ assertFileIsPresent(translog, g);
+ }
+ }
+ } else {
final long generation = translog.currentFileGeneration();
translog.prepareCommit();
- final long committedGeneration =
- randomIntBetween(1, Math.toIntExact(generation));
+ final long committedGeneration = randomIntBetween(
+ Math.max(1, Math.toIntExact(lastCommittedGeneration)),
+ Math.toIntExact(generation));
translog.commit(committedGeneration);
- last = committedGeneration;
- for (long g = 0; i < last; g++) {
+ lastCommittedGeneration = committedGeneration;
+ for (long g = 1; g < lastCommittedGeneration; g++) {
assertFileDeleted(translog, g);
}
- for (long g = last; i < translog.currentFileGeneration(); g++) {
+ for (long g = lastCommittedGeneration; g < translog.currentFileGeneration(); g++) {
assertFileIsPresent(translog, g);
}
}
From dd1a7e43e5080e587af7a9b9315147963cc0ad77 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Mon, 10 Apr 2017 21:19:46 -0400
Subject: [PATCH 11/21] Remove imports
---
.../java/org/elasticsearch/index/translog/TranslogTests.java | 2 --
1 file changed, 2 deletions(-)
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index b21f7d441f10d..77c368961e85f 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -102,8 +102,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
From 42685cc65abc95e0c7e7a1b8b6b1eec4392ae852 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Tue, 11 Apr 2017 09:17:23 -0400
Subject: [PATCH 12/21] Remove unneeded else
---
.../index/translog/TranslogTests.java | 32 +++++--------------
1 file changed, 8 insertions(+), 24 deletions(-)
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 77c368961e85f..4ead703271b4c 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -2277,37 +2277,21 @@ public void testCommitWithOpenView() throws IOException {
for (int i = 0; i < operations; i++) {
translog.add(new Translog.NoOp(seqNo++, 0, "test"));
if (rarely()) {
- if (randomBoolean()) {
- try (Translog.View ignored = translog.newView()) {
- final long viewGeneration = lastCommittedGeneration;
- translog.prepareCommit();
- final long committedGeneration = randomIntBetween(
- Math.max(1, Math.toIntExact(lastCommittedGeneration)),
- Math.toIntExact(translog.currentFileGeneration()));
- translog.commit(committedGeneration);
- lastCommittedGeneration = committedGeneration;
- // with an open view, committing should preserve generations back to the last committed generation
- for (long g = 1; g < Math.min(lastCommittedGeneration, viewGeneration); g++) {
- assertFileDeleted(translog, g);
- }
- // the view generation could be -1 if no commit has been performed
- final long max = Math.max(1, Math.min(lastCommittedGeneration, viewGeneration));
- for (long g = max; g < translog.currentFileGeneration(); g++) {
- assertFileIsPresent(translog, g);
- }
- }
- } else {
- final long generation = translog.currentFileGeneration();
+ try (Translog.View ignored = translog.newView()) {
+ final long viewGeneration = lastCommittedGeneration;
translog.prepareCommit();
final long committedGeneration = randomIntBetween(
Math.max(1, Math.toIntExact(lastCommittedGeneration)),
- Math.toIntExact(generation));
+ Math.toIntExact(translog.currentFileGeneration()));
translog.commit(committedGeneration);
lastCommittedGeneration = committedGeneration;
- for (long g = 1; g < lastCommittedGeneration; g++) {
+ // with an open view, committing should preserve generations back to the last committed generation
+ for (long g = 1; g < Math.min(lastCommittedGeneration, viewGeneration); g++) {
assertFileDeleted(translog, g);
}
- for (long g = lastCommittedGeneration; g < translog.currentFileGeneration(); g++) {
+ // the view generation could be -1 if no commit has been performed
+ final long max = Math.max(1, Math.min(lastCommittedGeneration, viewGeneration));
+ for (long g = max; g < translog.currentFileGeneration(); g++) {
assertFileIsPresent(translog, g);
}
}
From afb86e8ac801587b98e05ab83c9506ef718834e9 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Tue, 11 Apr 2017 10:05:26 -0400
Subject: [PATCH 13/21] Explicit test
---
.../index/translog/TranslogTests.java | 53 +++++++++----------
1 file changed, 26 insertions(+), 27 deletions(-)
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 4ead703271b4c..4fd6ae17da7e8 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -2183,48 +2183,47 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException
}
public void testMinGenerationForSeqNo() throws IOException {
+ final long initialGeneration = translog.getGeneration().translogFileGeneration;
final int operations = randomIntBetween(1, 4096);
- final List seqNos =
- LongStream.range(0, operations).boxed().collect(Collectors.toList());
+ final List seqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList());
Randomness.shuffle(seqNos);
- final Map> generations = new HashMap<>();
for (int i = 0; i < operations; i++) {
final Long seqNo = seqNos.get(i);
translog.add(new Translog.NoOp(seqNo, 0, "test"));
- final List seqNoForGeneration =
- generations.computeIfAbsent(
- translog.currentFileGeneration(),
- g -> new ArrayList<>());
- seqNoForGeneration.add(seqNo);
if (rarely()) {
translog.rollGeneration();
}
}
- final Map maxSeqNoByGeneration =
- generations
- .entrySet()
- .stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- e -> e.getValue().stream().max(Long::compareTo).orElse(Long.MAX_VALUE)));
+ Map> generations = new HashMap<>();
+ translog.commit(initialGeneration);
for (long seqNo = 0; seqNo < operations; seqNo++) {
- final long finalLongSeqNo = seqNo;
- final Long operand =
- maxSeqNoByGeneration
- .entrySet()
- .stream()
- .filter(e -> finalLongSeqNo <= e.getValue())
- .map(Map.Entry::getKey).min(Long::compareTo)
- .orElse(Long.MIN_VALUE);
- assertThat(translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration, equalTo(operand));
- }
-
- }
+ final Set seenSeqNos = new HashSet<>();
+ final long generation = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration;
+ for (long g = generation; g < translog.currentFileGeneration(); g++) {
+ if (!generations.containsKey(g)) {
+ final Set generationSeenSeqNos = new HashSet<>();
+ final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.getCommitCheckpointFileName(g)));
+ try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(g)), checkpoint)) {
+ Translog.Snapshot snapshot = reader.newSnapshot();
+ Translog.Operation operation;
+ while ((operation = snapshot.next()) != null) {
+ generationSeenSeqNos.add(operation.seqNo());
+ }
+ }
+ generations.put(g, generationSeenSeqNos);
+ }
+ seenSeqNos.addAll(generations.get(g));
+ }
+ final Set expected = LongStream.range(seqNo, operations).boxed().collect(Collectors.toSet());
+ seenSeqNos.retainAll(expected);
+ assertThat(seenSeqNos, equalTo(expected));
+ }
+ }
public void testSimpleCommit() throws IOException {
final int operations = randomIntBetween(1, 4096);
From 32e14d0622aad159e3f560e3e4952a16b9503018 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Tue, 11 Apr 2017 12:04:26 -0400
Subject: [PATCH 14/21] Hard exception
---
.../elasticsearch/index/translog/Translog.java | 18 +++++++++++++++++-
.../index/translog/TranslogTests.java | 5 ++++-
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index b83f81f334988..25d374e8a44d5 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -1428,7 +1428,23 @@ public void prepareCommit() throws IOException {
public void commit(final long committedGeneration) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
- assert committedGeneration <= current.generation;
+ if (committedGeneration > current.generation) {
+ final String message = String.format(
+ Locale.ROOT,
+ "tried to commit generation [%d] later than the current generation [%d]",
+ committedGeneration,
+ current.generation);
+ throw new IllegalStateException(message);
+ }
+ final Long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE);
+ if (committedGeneration < min) {
+ final String message = String.format(
+ Locale.ROOT,
+ "tried to commit generation [%d] before minimum generation [%d]",
+ committedGeneration,
+ min);
+ throw new IllegalStateException(message);
+ }
if (currentCommittingGeneration == NOT_SET_GENERATION) {
prepareCommit();
}
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 4fd6ae17da7e8..0e34cae573b8c 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -2249,6 +2249,7 @@ public void testSimpleCommit() throws IOException {
public void testPrepareCommitAndCommit() throws IOException {
final int operations = randomIntBetween(1, 4096);
long seqNo = 0;
+ long last = -1;
for (int i = 0; i < operations; i++) {
translog.add(new Translog.NoOp(seqNo++, 0, "test"));
if (rarely()) {
@@ -2258,7 +2259,9 @@ public void testPrepareCommitAndCommit() throws IOException {
// simulate generation filling up and rolling between preparing the commit and committing
translog.rollGeneration();
}
- translog.commit(randomIntBetween(1, Math.toIntExact(generation)));
+ final int committedGeneration = randomIntBetween(Math.max(1, Math.toIntExact(last)), Math.toIntExact(generation));
+ translog.commit(committedGeneration);
+ last = committedGeneration;
for (long g = 0; i < generation; g++) {
assertFileDeleted(translog, g);
}
From 435fe25905bffb547395abf57d59afa93ff2aa87 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Thu, 13 Apr 2017 07:13:29 -0400
Subject: [PATCH 15/21] Rewrite min generation
---
.../index/translog/Translog.java | 21 +++++++++++--------
1 file changed, 12 insertions(+), 9 deletions(-)
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 25d374e8a44d5..dfd26627bd042 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -1356,15 +1356,18 @@ public static void writeOperationNoSize(BufferedChecksumStreamOutput out, Transl
*/
public TranslogGeneration getMinGenerationForSeqNo(final long seqNo) {
try (ReleasableLock ignored = writeLock.acquire()) {
- final long minTranslogFileGeneration = readers
- .stream()
- .filter(r -> {
- final Checkpoint checkpoint = r.getCheckpoint();
- return seqNo <= checkpoint.maxSeqNo;
- })
- .mapToLong(TranslogReader::getGeneration)
- .min()
- .orElseGet(this::currentFileGeneration);
+ /*
+ * When flushing, the engine will ask the translog for the minimum generation that could contain any sequence number after the
+ * local checkpoint. Immediately after flushing, there will be no such generation, so this minimum generation in this case will
+ * be the current translog generation as we do not need any prior generations to have a complete history up to the current local
+ * checkpoint.
+ */
+ long minTranslogFileGeneration = this.currentFileGeneration();
+ for (final TranslogReader reader : readers) {
+ if (seqNo <= reader.getCheckpoint().maxSeqNo) {
+ minTranslogFileGeneration = Math.min(minTranslogFileGeneration, reader.getGeneration());
+ }
+ }
return new TranslogGeneration(translogUUID, minTranslogFileGeneration);
}
}
From e73ea2a52cce5dbe7dfa192a92499c762672bf66 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Wed, 12 Apr 2017 07:12:24 -0400
Subject: [PATCH 16/21] Add test for recovery on multiple generations
---
.../index/engine/InternalEngineTests.java | 54 +++++++++++++++++++
1 file changed, 54 insertions(+)
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 2de80786ccc1a..4b0c833b6648d 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -166,6 +166,8 @@
import java.util.function.BiFunction;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.shuffle;
@@ -834,6 +836,58 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
}
}
+ public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
+ final int docs = randomIntBetween(1, 4096);
+ final List seqNos = LongStream.range(0, docs).boxed().collect(Collectors.toList());
+ Collections.shuffle(seqNos);
+ engine.close();
+ Engine initialEngine = null;
+ try {
+ final AtomicInteger counter = new AtomicInteger();
+ initialEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG)) {
+ @Override
+ public SequenceNumbersService seqNoService() {
+ return new SequenceNumbersService(
+ engine.shardId,
+ engine.config().getIndexSettings(),
+ SequenceNumbersService.NO_OPS_PERFORMED,
+ SequenceNumbersService.NO_OPS_PERFORMED,
+ SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+ @Override
+ public long generateSeqNo() {
+ return seqNos.get(counter.getAndIncrement());
+ }
+ };
+ }
+ };
+ for (int i = 0; i < docs; i++) {
+ final String id = Integer.toString(i);
+ final ParsedDocument doc = testParsedDocument(id, "test", null, testDocumentWithTextField(), SOURCE, null);
+ initialEngine.index(indexForDoc(doc));
+ if (rarely()) {
+ initialEngine.getTranslog().rollGeneration();
+ } else if (rarely()) {
+ initialEngine.flush();
+ }
+ }
+ } finally {
+ IOUtils.close(initialEngine);
+ }
+
+ Engine recoveringEngine = null;
+ try {
+ recoveringEngine = new InternalEngine(copy(initialEngine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG));
+ recoveringEngine.recoverFromTranslog();
+ try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
+ TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
+ assertEquals(docs, topDocs.totalHits);
+ }
+ } finally {
+ IOUtils.close(recoveringEngine);
+ }
+
+ }
+
public void testConcurrentGetAndFlush() throws Exception {
ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
From e6abaf9de56f25745105716e7652245fe45176f1 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Thu, 13 Apr 2017 07:54:34 -0400
Subject: [PATCH 17/21] Move to asserts
---
.../index/translog/Translog.java | 27 +++++++------------
1 file changed, 10 insertions(+), 17 deletions(-)
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index dfd26627bd042..dca985e60eda3 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -1431,23 +1431,7 @@ public void prepareCommit() throws IOException {
public void commit(final long committedGeneration) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
- if (committedGeneration > current.generation) {
- final String message = String.format(
- Locale.ROOT,
- "tried to commit generation [%d] later than the current generation [%d]",
- committedGeneration,
- current.generation);
- throw new IllegalStateException(message);
- }
- final Long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE);
- if (committedGeneration < min) {
- final String message = String.format(
- Locale.ROOT,
- "tried to commit generation [%d] before minimum generation [%d]",
- committedGeneration,
- min);
- throw new IllegalStateException(message);
- }
+ assert assertCommittedGenerationIsInValidRange(committedGeneration);
if (currentCommittingGeneration == NOT_SET_GENERATION) {
prepareCommit();
}
@@ -1461,6 +1445,15 @@ public void commit(final long committedGeneration) throws IOException {
}
}
+ private boolean assertCommittedGenerationIsInValidRange(final long committedGeneration) {
+ assert committedGeneration > current.generation
+ : "tried to commit generation [" + committedGeneration + "] after current generation [" + current.generation + "]";
+ final long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE);
+ assert committedGeneration < min
+ : "tried to commit generation [" + committedGeneration + "] before minimum generation [" + min + "]";
+ return true;
+ }
+
/**
* Trims unreferenced translog generations. The guarantee here is that translog generations will be preserved for all outstanding views
* and from the last committed translog generation defined by {@link Translog#lastCommittedTranslogFileGeneration}.
From 5f3d2d897f5ba595aebacfe9724acfbd1ca65859 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Thu, 13 Apr 2017 08:09:53 -0400
Subject: [PATCH 18/21] Return local variables
---
.../org/elasticsearch/index/engine/InternalEngine.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 5f98ad9658692..fe01842053c1a 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -1694,6 +1694,9 @@ private long commitIndexWriter(final IndexWriter writer, final Translog translog
try {
final long localCheckpoint = seqNoService().getLocalCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
+ final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
+ final String translogUUID = translogGeneration.translogUUID;
+ final String localCheckpointValue = Long.toString(localCheckpoint);
writer.setLiveCommitData(() -> {
/*
@@ -1706,9 +1709,9 @@ private long commitIndexWriter(final IndexWriter writer, final Translog translog
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map commitData = new HashMap<>(5);
- commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration));
- commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID);
- commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
+ commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration));
+ commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
+ commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
From a05b2b75ad4690610298652fcc4ce83af7cf14e3 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Thu, 13 Apr 2017 08:19:49 -0400
Subject: [PATCH 19/21] Fix compilation
---
.../java/org/elasticsearch/index/engine/InternalEngine.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index fe01842053c1a..a56335d7599e3 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -1709,7 +1709,7 @@ private long commitIndexWriter(final IndexWriter writer, final Translog translog
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map commitData = new HashMap<>(5);
- commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration));
+ commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
if (syncId != null) {
From 4486007682cf3761dd54bb0ac2fe13bfcabaf203 Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Thu, 13 Apr 2017 08:43:46 -0400
Subject: [PATCH 20/21] Add duplicate test
---
.../index/translog/Translog.java | 9 ++++--
.../index/translog/TranslogTests.java | 30 ++++++++++++-------
2 files changed, 26 insertions(+), 13 deletions(-)
diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index dca985e60eda3..014500230a404 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -804,6 +804,8 @@ public static Type fromId(byte id) {
long seqNo();
+ long primaryTerm();
+
/**
* Reads the type and the operation from the given stream. The operation must be written with
* {@link Operation#writeType(Operation, StreamOutput)}
@@ -953,6 +955,7 @@ public long seqNo() {
return seqNo;
}
+ @Override
public long primaryTerm() {
return primaryTerm;
}
@@ -1104,6 +1107,7 @@ public long seqNo() {
return seqNo;
}
+ @Override
public long primaryTerm() {
return primaryTerm;
}
@@ -1180,6 +1184,7 @@ public long seqNo() {
return seqNo;
}
+ @Override
public long primaryTerm() {
return primaryTerm;
}
@@ -1446,10 +1451,10 @@ public void commit(final long committedGeneration) throws IOException {
}
private boolean assertCommittedGenerationIsInValidRange(final long committedGeneration) {
- assert committedGeneration > current.generation
+ assert committedGeneration <= current.generation
: "tried to commit generation [" + committedGeneration + "] after current generation [" + current.generation + "]";
final long min = readers.stream().map(TranslogReader::getGeneration).min(Long::compareTo).orElse(Long.MIN_VALUE);
- assert committedGeneration < min
+ assert committedGeneration >= min
: "tried to commit generation [" + committedGeneration + "] before minimum generation [" + min + "]";
return true;
}
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index 0e34cae573b8c..fa0e1259f0fbc 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -39,6 +39,7 @@
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -2185,32 +2186,38 @@ public void testRollGenerationBetweenPrepareCommitAndCommit() throws IOException
public void testMinGenerationForSeqNo() throws IOException {
final long initialGeneration = translog.getGeneration().translogFileGeneration;
final int operations = randomIntBetween(1, 4096);
- final List seqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList());
- Randomness.shuffle(seqNos);
-
- for (int i = 0; i < operations; i++) {
- final Long seqNo = seqNos.get(i);
- translog.add(new Translog.NoOp(seqNo, 0, "test"));
+ final List shuffledSeqNos = LongStream.range(0, operations).boxed().collect(Collectors.toList());
+ Randomness.shuffle(shuffledSeqNos);
+ final List> seqNos = new ArrayList<>();
+ final Map terms = new HashMap<>();
+ for (final Long seqNo : shuffledSeqNos) {
+ seqNos.add(Tuple.tuple(seqNo, terms.computeIfAbsent(seqNo, k -> 0L)));
+ Long repeatingTermSeqNo = randomFrom(seqNos.stream().map(Tuple::v1).collect(Collectors.toList()));
+ seqNos.add(Tuple.tuple(repeatingTermSeqNo, terms.computeIfPresent(repeatingTermSeqNo, (s, t) -> t + 1)));
+ }
+
+ for (final Tuple tuple : seqNos) {
+ translog.add(new Translog.NoOp(tuple.v1(), tuple.v2(), "test"));
if (rarely()) {
translog.rollGeneration();
}
}
- Map> generations = new HashMap<>();
+ Map>> generations = new HashMap<>();
translog.commit(initialGeneration);
for (long seqNo = 0; seqNo < operations; seqNo++) {
- final Set seenSeqNos = new HashSet<>();
+ final Set> seenSeqNos = new HashSet<>();
final long generation = translog.getMinGenerationForSeqNo(seqNo).translogFileGeneration;
for (long g = generation; g < translog.currentFileGeneration(); g++) {
if (!generations.containsKey(g)) {
- final Set generationSeenSeqNos = new HashSet<>();
+ final Set> generationSeenSeqNos = new HashSet<>();
final Checkpoint checkpoint = Checkpoint.read(translog.location().resolve(Translog.getCommitCheckpointFileName(g)));
try (TranslogReader reader = translog.openReader(translog.location().resolve(Translog.getFilename(g)), checkpoint)) {
Translog.Snapshot snapshot = reader.newSnapshot();
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
- generationSeenSeqNos.add(operation.seqNo());
+ generationSeenSeqNos.add(Tuple.tuple(operation.seqNo(), operation.primaryTerm()));
}
}
generations.put(g, generationSeenSeqNos);
@@ -2219,7 +2226,8 @@ public void testMinGenerationForSeqNo() throws IOException {
seenSeqNos.addAll(generations.get(g));
}
- final Set expected = LongStream.range(seqNo, operations).boxed().collect(Collectors.toSet());
+ final long seqNoLowerBound = seqNo;
+ final Set> expected = seqNos.stream().filter(t -> t.v1() >= seqNoLowerBound).collect(Collectors.toSet());
seenSeqNos.retainAll(expected);
assertThat(seenSeqNos, equalTo(expected));
}
From bb9d8f05f8dd2ea38a8a22310851d156a0cac0ba Mon Sep 17 00:00:00 2001
From: Jason Tedor
Date: Mon, 17 Apr 2017 07:16:16 -0400
Subject: [PATCH 21/21] Do not use forbidden shuffle
---
.../org/elasticsearch/index/engine/InternalEngineTests.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 26590c54ffe84..6c9ca6adc963b 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -839,7 +839,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti
public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
final int docs = randomIntBetween(1, 4096);
final List seqNos = LongStream.range(0, docs).boxed().collect(Collectors.toList());
- Collections.shuffle(seqNos);
+ Randomness.shuffle(seqNos);
engine.close();
Engine initialEngine = null;
try {