Skip to content

Commit

Permalink
Encapsulate Translog in Engine (#31220)
Browse files Browse the repository at this point in the history
This removes the abstract `getTranslog` method in `Engine`, instead leaving it
to the abstract implementations of the other methods that use the translog. This
allows future Engines not to have a Translog, as instead they must implement the
methods that use the translog pieces to return necessary values.
  • Loading branch information
dakrone authored Jun 11, 2018
1 parent 99e0458 commit c064b50
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 40 deletions.
38 changes: 8 additions & 30 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -565,18 +565,10 @@ public enum SearcherScope {
EXTERNAL, INTERNAL
}

/**
* Returns the translog associated with this engine.
* Prefer to keep the translog package-private, so that an engine can control all accesses to the translog.
*/
abstract Translog getTranslog();

/**
* Checks if the underlying storage sync is required.
*/
public boolean isTranslogSyncNeeded() {
return getTranslog().syncNeeded();
}
public abstract boolean isTranslogSyncNeeded();

/**
* Ensures that all locations in the given stream have been written to the underlying storage.
Expand All @@ -585,35 +577,25 @@ public boolean isTranslogSyncNeeded() {

public abstract void syncTranslog() throws IOException;

public Closeable acquireTranslogRetentionLock() {
return getTranslog().acquireRetentionLock();
}
public abstract Closeable acquireTranslogRetentionLock();

/**
* Creates a new translog snapshot from this engine for reading translog operations whose seq# at least the provided seq#.
* The caller has to close the returned snapshot after finishing the reading.
*/
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
}
public abstract Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException;

/**
* Returns the estimated number of translog operations in this engine whose seq# at least the provided seq#.
*/
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo);
}
public abstract int estimateTranslogOperationsFromMinSeq(long minSeqNo);

public TranslogStats getTranslogStats() {
return getTranslog().stats();
}
public abstract TranslogStats getTranslogStats();

/**
* Returns the last location that the translog of this engine has written into.
*/
public Translog.Location getTranslogLastWriteLocation() {
return getTranslog().getLastWriteLocation();
}
public abstract Translog.Location getTranslogLastWriteLocation();

protected final void ensureOpen(Exception suppressed) {
if (isClosed.get()) {
Expand Down Expand Up @@ -661,9 +643,7 @@ public CommitStats commitStats() {
/**
* Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint)
*/
public long getLastSyncedGlobalCheckpoint() {
return getTranslog().getLastSyncedGlobalCheckpoint();
}
public abstract long getLastSyncedGlobalCheckpoint();

/**
* Global stats on segments.
Expand Down Expand Up @@ -935,9 +915,7 @@ public final boolean refreshNeeded() {
*
* @return {@code true} if the current generation should be rolled to a new generation
*/
public boolean shouldRollTranslogGeneration() {
return getTranslog().shouldRollGeneration();
}
public abstract boolean shouldRollTranslogGeneration();

/**
* Rolls the translog generation and cleans unneeded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.index.translog.TranslogDeletionPolicy;
import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -422,12 +424,17 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, engineConfig.getPrimaryTermSupplier());
}

@Override
// Package private for testing purposes only
Translog getTranslog() {
ensureOpen();
return translog;
}

@Override
public boolean isTranslogSyncNeeded() {
return getTranslog().syncNeeded();
}

@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
final boolean synced = translog.ensureSynced(locations);
Expand All @@ -443,6 +450,31 @@ public void syncTranslog() throws IOException {
revisitIndexDeletionPolicyOnTranslogSynced();
}

@Override
public Closeable acquireTranslogRetentionLock() {
return getTranslog().acquireRetentionLock();
}

@Override
public Translog.Snapshot newTranslogSnapshotFromMinSeqNo(long minSeqNo) throws IOException {
return getTranslog().newSnapshotFromMinSeqNo(minSeqNo);
}

@Override
public int estimateTranslogOperationsFromMinSeq(long minSeqNo) {
return getTranslog().estimateTotalOperationsFromMinSeq(minSeqNo);
}

@Override
public TranslogStats getTranslogStats() {
return getTranslog().stats();
}

@Override
public Translog.Location getTranslogLastWriteLocation() {
return getTranslog().getLastWriteLocation();
}

private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException {
if (combinedDeletionPolicy.hasUnreferencedCommits()) {
indexWriter.deleteUnusedFiles();
Expand Down Expand Up @@ -1570,6 +1602,11 @@ public void trimUnreferencedTranslogFiles() throws EngineException {
}
}

@Override
public boolean shouldRollTranslogGeneration() {
return getTranslog().shouldRollGeneration();
}

@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
Expand Down Expand Up @@ -2191,6 +2228,11 @@ LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}

@Override
public long getLastSyncedGlobalCheckpoint() {
return getTranslog().getLastSyncedGlobalCheckpoint();
}

@Override
public long getLocalCheckpoint() {
return localCheckpointTracker.getCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
super.commitIndexWriter(writer, translog, syncId);
}
};
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(docs));
assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
recoveringEngine.recoverFromTranslog();
assertTrue(committed.get());
} finally {
Expand All @@ -758,7 +758,7 @@ public void testTranslogRecoveryWithMultipleGenerations() throws IOException {
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
initialEngine.index(indexForDoc(doc));
if (rarely()) {
initialEngine.getTranslog().rollGeneration();
getTranslog(initialEngine).rollGeneration();
} else if (rarely()) {
initialEngine.flush();
}
Expand Down Expand Up @@ -3983,14 +3983,14 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint());
trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations());
assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2));

// now snapshot the tlog and ensure the primary term is updated
try (Translog.Snapshot snapshot = recoveringEngine.getTranslog().newSnapshot()) {
try (Translog.Snapshot snapshot = getTranslog(recoveringEngine).newSnapshot()) {
assertTrue((maxSeqIDOnReplica + 1) - numDocsOnReplica <= snapshot.totalOperations());
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
Expand All @@ -4005,7 +4005,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
if ((flushed = randomBoolean())) {
globalCheckpoint.set(recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
recoveringEngine.getTranslog().sync();
getTranslog(recoveringEngine).sync();
recoveringEngine.flush(true, true);
}
}
Expand All @@ -4018,7 +4018,7 @@ public void testFillUpSequenceIdGapsOnRecovery() throws IOException {
trimUnsafeCommits(replicaEngine.config());
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
if (flushed) {
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
}
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
Expand Down Expand Up @@ -4221,7 +4221,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null)));
if (frequently()) {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.getTranslog().sync();
engine.syncTranslog();
}
if (frequently()) {
final long lastSyncedGlobalCheckpoint = Translog.readGlobalCheckpoint(translogPath, translogUUID);
Expand All @@ -4243,7 +4243,7 @@ protected void commitIndexWriter(IndexWriter writer, Translog translog, String s
}
// Make sure we keep all translog operations after the local checkpoint of the safe commit.
long localCheckpointFromSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(localCheckpointFromSafeCommit + 1, docId));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ protected Engine.Delete replicaDeleteForDoc(String id, long version, long seqNo,
* Exposes a translog associated with the given engine for testing purpose.
*/
public static Translog getTranslog(Engine engine) {
return engine.getTranslog();
assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass();
InternalEngine internalEngine = (InternalEngine) engine;
return internalEngine.getTranslog();
}
}

0 comments on commit c064b50

Please sign in to comment.