Remove sync flush logic in Engine (#51450)
This change removes the sync-flush logic from the InternalEngine, as we no
longer issue or renew sync ids in 8.0.

Relates #50776
dnhatn authored Jan 27, 2020
1 parent 038f033 commit 0c87892
Showing 9 changed files with 64 additions and 336 deletions.
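At the heart of this commit is a change to the Engine.flush contract: it previously returned a CommitId that the synced-flush machinery compared against an expected commit point, and it now returns nothing. The compile-only sketch below shows the shape of that change; EngineBefore, EngineAfter, and the nested types are illustrative stand-ins, not Elasticsearch types.

```java
// Hypothetical stand-ins for the before/after Engine flush API; only the flush and
// syncFlush shapes mirror the diff below, everything else here is scaffolding.
final class FlushContractSketch {
    static final class CommitId {}                                          // deleted by this commit
    enum SyncedFlushResult { SUCCESS, COMMIT_MISMATCH, PENDING_OPERATIONS } // deleted by this commit

    interface EngineBefore {
        CommitId flush(boolean force, boolean waitIfOngoing);               // returned the commit id
        SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId);
    }

    interface EngineAfter {
        void flush(boolean force, boolean waitIfOngoing);                   // no commit id to hand back
    }
}
```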
server/src/main/java/org/elasticsearch/index/engine/CommitStats.java
@@ -81,13 +81,6 @@ public String getId() {
return id;
}

- /**
- * A raw version of the commit id (see {@link SegmentInfos#getId()})
- */
- public Engine.CommitId getRawCommitId() {
- return new Engine.CommitId(Base64.getDecoder().decode(id));
- }

/**
* Returns the number of documents in this commit
*/
83 changes: 4 additions & 79 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -53,9 +53,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
- import org.elasticsearch.common.io.stream.StreamInput;
- import org.elasticsearch.common.io.stream.StreamOutput;
- import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.Loggers;
@@ -89,7 +86,6 @@
import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
- import java.util.Base64;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -113,7 +109,7 @@

public abstract class Engine implements Closeable {

- public static final String SYNC_COMMIT_ID = "sync_id";
+ public static final String SYNC_COMMIT_ID = "sync_id"; // TODO: Remove sync_id in 9.0
public static final String HISTORY_UUID_KEY = "history_uuid";
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
@@ -582,22 +578,6 @@ public static class NoOpResult extends Result {

}

- /**
- * Attempts to do a special commit where the given syncId is put into the commit data. The attempt
- * succeeds if there are no pending writes in Lucene and the current commit point is equal to the expected one.
- *
- * @param syncId id of this sync
- * @param expectedCommitId the expected commit id
- * @return SUCCESS if the sync commit was made, and the reason for failure otherwise
- */
- public abstract SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException;
-
- public enum SyncedFlushResult {
- SUCCESS,
- COMMIT_MISMATCH,
- PENDING_OPERATIONS
- }

protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherScope, Engine.Searcher> searcherFactory,
SearcherScope scope) throws EngineException {
final Engine.Searcher searcher = searcherFactory.apply("get", scope);
@@ -1052,20 +1032,17 @@ public boolean refreshNeeded() {
* @param force if <code>true</code> a lucene commit is executed even if no changes need to be committed.
* @param waitIfOngoing if <code>true</code> this call will block until all currently running flushes have finished.
* Otherwise this call will return without blocking.
- * @return the commit Id for the resulting commit
*/
- public abstract CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException;
+ public abstract void flush(boolean force, boolean waitIfOngoing) throws EngineException;

/**
* Flushes the state of the engine including the transaction log, clearing memory and persisting
* documents in the lucene index to disk including a potentially heavy and durable fsync operation.
* This operation is not going to block if another flush operation is currently running and won't write
* a lucene commit if nothing needs to be committed.
*
- * @return the commit Id for the resulting commit
*/
- public final CommitId flush() throws EngineException {
- return flush(false, false);
+ public final void flush() throws EngineException {
+ flush(false, false);
}


@@ -1708,58 +1685,6 @@ private void awaitPendingClose() {
}
}

- public static class CommitId implements Writeable {
-
- private final byte[] id;
-
- public CommitId(byte[] id) {
- assert id != null;
- this.id = Arrays.copyOf(id, id.length);
- }
-
- /**
- * Read from a stream.
- */
- public CommitId(StreamInput in) throws IOException {
- assert in != null;
- this.id = in.readByteArray();
- }
-
- @Override
- public void writeTo(StreamOutput out) throws IOException {
- out.writeByteArray(id);
- }
-
- @Override
- public String toString() {
- return Base64.getEncoder().encodeToString(id);
- }
-
- public boolean idsEqual(byte[] id) {
- return Arrays.equals(id, this.id);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- CommitId commitId = (CommitId) o;
-
- return Arrays.equals(id, commitId.id);
-
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(id);
- }
- }

public static class IndexCommitRef implements Closeable {
private final AtomicBoolean closed = new AtomicBoolean();
private final CheckedRunnable<IOException> onClose;
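Taken together, the Engine.java hunks delete the synced-flush handshake: a caller flushed, captured the CommitId, asked the engine to syncFlush against that exact commit point, and received one of the three enum values above. Below is a small runnable model of that decision logic, assuming simplified state (a byte[] commit id and a pending-writes flag); it is not the real engine code.

```java
import java.util.Arrays;

// Runnable model of the removed synced-flush decision logic; the fields are
// simplified stand-ins for Lucene's commit state, not Elasticsearch internals.
public class SyncedFlushModel {
    enum SyncedFlushResult { SUCCESS, COMMIT_MISMATCH, PENDING_OPERATIONS }

    private final byte[] lastCommitId = {1, 2, 3}; // id of the last commit point
    private boolean hasUncommittedChanges = false; // writes not yet committed

    SyncedFlushResult syncFlush(String syncId, byte[] expectedCommitId) {
        if (hasUncommittedChanges) {
            return SyncedFlushResult.PENDING_OPERATIONS; // new writes arrived since the flush
        }
        if (Arrays.equals(expectedCommitId, lastCommitId) == false) {
            return SyncedFlushResult.COMMIT_MISMATCH;    // another commit happened in between
        }
        // The real engine wrote the sync_id into the Lucene commit user data here.
        return SyncedFlushResult.SUCCESS;
    }

    public static void main(String[] args) {
        SyncedFlushModel engine = new SyncedFlushModel();
        System.out.println(engine.syncFlush("sync-1", new byte[]{1, 2, 3})); // SUCCESS
        System.out.println(engine.syncFlush("sync-2", new byte[]{9}));       // COMMIT_MISMATCH
    }
}
```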
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -475,7 +475,7 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null :
translogGeneration.translogFileGeneration, translog.currentFileGeneration());
- commitIndexWriter(indexWriter, translog, null);
+ commitIndexWriter(indexWriter, translog);
refreshLastCommittedSegmentInfos();
refresh("translog_recovery");
}
@@ -1582,69 +1582,6 @@ public void writeIndexingBuffer() throws EngineException {
refresh("write indexing buffer", SearcherScope.INTERNAL, false);
}

- @Override
- public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException {
- // best effort attempt before we acquire locks
- ensureOpen();
- if (indexWriter.hasUncommittedChanges()) {
- logger.trace("can't sync commit [{}]. have pending changes", syncId);
- return SyncedFlushResult.PENDING_OPERATIONS;
- }
- if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
- logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
- return SyncedFlushResult.COMMIT_MISMATCH;
- }
- try (ReleasableLock lock = writeLock.acquire()) {
- ensureOpen();
- ensureCanFlush();
- // let's do a refresh to make sure we shrink the version map. This refresh will either be a no-op (just shrink the version map)
- // or we have uncommitted changes, in which case this syncFlush fails.
- refresh("sync_flush", SearcherScope.INTERNAL, true);
- if (indexWriter.hasUncommittedChanges()) {
- logger.trace("can't sync commit [{}]. have pending changes", syncId);
- return SyncedFlushResult.PENDING_OPERATIONS;
- }
- if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) {
- logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId);
- return SyncedFlushResult.COMMIT_MISMATCH;
- }
- logger.trace("starting sync commit [{}]", syncId);
- commitIndexWriter(indexWriter, translog, syncId);
- logger.debug("successfully sync committed. sync id [{}].", syncId);
- lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
- return SyncedFlushResult.SUCCESS;
- } catch (IOException ex) {
- maybeFailEngine("sync commit", ex);
- throw new EngineException(shardId, "failed to sync commit", ex);
- }
- }
-
- final boolean tryRenewSyncCommit() {
- boolean renewed = false;
- try (ReleasableLock lock = writeLock.acquire()) {
- ensureOpen();
- ensureCanFlush();
- String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID);
- long translogGenOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
- if (syncId != null && indexWriter.hasUncommittedChanges() && translog.totalOperationsByMinGen(translogGenOfLastCommit) == 0) {
- logger.trace("start renewing sync commit [{}]", syncId);
- commitIndexWriter(indexWriter, translog, syncId);
- logger.debug("successfully sync committed. sync id [{}].", syncId);
- lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
- renewed = true;
- }
- } catch (IOException ex) {
- maybeFailEngine("renew sync commit", ex);
- throw new EngineException(shardId, "failed to renew sync commit", ex);
- }
- if (renewed) {
- // refresh outside of the write lock
- // we have to refresh the internal reader here to ensure we release unreferenced segments.
- refresh("renew sync commit", SearcherScope.INTERNAL, true);
- }
- return renewed;
- }

@Override
public boolean shouldPeriodicallyFlush() {
ensureOpen();
@@ -1679,14 +1616,13 @@ public boolean shouldPeriodicallyFlush() {
}

@Override
- public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
+ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
if (force && waitIfOngoing == false) {
assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing;
throw new IllegalArgumentException(
"wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing);
}
- final byte[] newCommitId;
/*
* Unfortunately the lock order is important here. We have to acquire the read lock first; otherwise,
* if we are flushing at the end of recovery while holding the write lock, we can deadlock if:
@@ -1697,13 +1633,12 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
ensureOpen();
if (flushLock.tryLock() == false) {
// if we can't get the lock right away we block if needed otherwise barf
- if (waitIfOngoing) {
- logger.trace("waiting for in-flight flush to finish");
- flushLock.lock();
- logger.trace("acquired flush lock after blocking");
- } else {
- return new CommitId(lastCommittedSegmentInfos.getId());
+ if (waitIfOngoing == false) {
+ return;
}
+ logger.trace("waiting for in-flight flush to finish");
+ flushLock.lock();
+ logger.trace("acquired flush lock after blocking");
} else {
logger.trace("acquired flush lock immediately");
}
@@ -1717,7 +1652,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
try {
translog.rollGeneration();
logger.trace("starting commit for flush; commitTranslog=true");
- commitIndexWriter(indexWriter, translog, null);
+ commitIndexWriter(indexWriter, translog);
logger.trace("finished commit for flush");

// temporary debugging to investigate a test failure - issue #32827; remove when the issue is resolved
@@ -1735,7 +1670,6 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
refreshLastCommittedSegmentInfos();

}
- newCommitId = lastCommittedSegmentInfos.getId();
} catch (FlushFailedEngineException ex) {
maybeFailEngine("flush", ex);
throw ex;
@@ -1748,7 +1682,6 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
if (engineConfig.isEnableGcDeletes()) {
pruneDeletedTombstones();
}
- return new CommitId(newCommitId);
}

private void refreshLastCommittedSegmentInfos() {
@@ -1916,9 +1849,7 @@ public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpu
indexWriter.forceMerge(maxNumSegments, true /* blocks and waits for merges*/);
}
if (flush) {
- if (tryRenewSyncCommit() == false) {
- flush(false, true);
- }
+ flush(false, true);
}
if (upgrade) {
logger.info("finished segment upgrade");
@@ -2306,15 +2237,9 @@ public void onFailure(Exception e) {

@Override
protected void doRun() {
- // if we have no pending merges and we are supposed to flush once merges have finished
- // we try to renew a sync commit which is the case when we are having a big merge after we
- // are inactive. If that didn't work we go and do a real flush which is ok since it only doesn't work
- // if we either have records in the translog or if we don't have a sync ID at all...
- // maybe even more important, we flush after all merges finish and we are inactive indexing-wise to
+ // if we have no pending merges and we are supposed to flush once merges have finished to
// free up transient disk usage of the (presumably biggish) segments that were just merged
- if (tryRenewSyncCommit() == false) {
- flush();
- }
+ flush();
}
});
} else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) {
@@ -2351,10 +2276,8 @@ protected void doRun() throws Exception {
*
* @param writer the index writer to commit
* @param translog the translog
- * @param syncId the sync flush ID ({@code null} if not committing a synced flush)
* @throws IOException if an I/O exception occurs committing the specified writer
*/
- protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
+ protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException {
ensureCanFlush();
try {
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
Expand All @@ -2373,13 +2296,10 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
- final Map<String, String> commitData = new HashMap<>(8);
+ final Map<String, String> commitData = new HashMap<>(7);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
- if (syncId != null) {
- commitData.put(Engine.SYNC_COMMIT_ID, syncId);
- }
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
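One detail worth noting from the deleted InternalEngine.syncFlush: it checked for pending writes and a commit-point match once optimistically before taking the write lock, then repeated both checks under the lock, because the answers can change while waiting. Below is a minimal, runnable sketch of that check/lock/re-check pattern, with a plain ReentrantLock standing in for the engine's write lock and a boolean standing in for indexWriter.hasUncommittedChanges().

```java
import java.util.concurrent.locks.ReentrantLock;

// Distilled check/lock/re-check pattern from the removed syncFlush implementation.
public class CheckLockRecheck {
    private final ReentrantLock writeLock = new ReentrantLock();
    private volatile boolean hasPendingWrites = false;

    boolean trySyncCommit() {
        if (hasPendingWrites) {
            return false;        // best-effort bail-out before paying for the lock
        }
        writeLock.lock();
        try {
            if (hasPendingWrites) {
                return false;    // re-check: state may have changed while we blocked
            }
            // ... perform the commit under the lock ...
            return true;
        } finally {
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(new CheckLockRecheck().trySyncCommit()); // true
    }
}
```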
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
@@ -377,14 +377,8 @@ public boolean shouldPeriodicallyFlush() {
}

@Override
- public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) {
- // we can't do synced flushes; this would require an indexWriter, which we don't have
- throw new UnsupportedOperationException("syncedFlush is not supported on a read-only engine");
- }
-
- @Override
- public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException {
- return new CommitId(lastCommittedSegmentInfos.getId());
+ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
+ // noop
}

@Override
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1037,9 +1037,8 @@ public CompletionStats completionStats(String... fields) {
* Executes the given flush request against the engine.
*
* @param request the flush request
- * @return the commit ID
*/
- public Engine.CommitId flush(FlushRequest request) {
+ public void flush(FlushRequest request) {
final boolean waitIfOngoing = request.waitIfOngoing();
final boolean force = request.force();
logger.trace("flush with {}", request);
@@ -1050,9 +1049,8 @@ public Engine.CommitId flush(FlushRequest request) {
*/
verifyNotClosed();
final long time = System.nanoTime();
- final Engine.CommitId commitId = getEngine().flush(force, waitIfOngoing);
+ getEngine().flush(force, waitIfOngoing);
flushMetric.inc(System.nanoTime() - time);
- return commitId;
}

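IndexShard.flush keeps its timing logic unchanged apart from dropping the returned commit id: it takes System.nanoTime() before the engine flush and feeds the elapsed nanoseconds to a flush metric. Below is a runnable sketch of that pattern; the Metric class is a toy stand-in for Elasticsearch's metric type, and the sleep stands in for the engine call.

```java
// Runnable sketch of the flush-timing pattern in IndexShard.flush.
public class FlushTimingSketch {
    static final class Metric {
        private long sumNanos;
        void inc(long nanos) { sumNanos += nanos; } // accumulate elapsed time
        long sum() { return sumNanos; }
    }

    private final Metric flushMetric = new Metric();

    void flush() throws InterruptedException {
        final long time = System.nanoTime();
        Thread.sleep(5); // stands in for getEngine().flush(force, waitIfOngoing), now void
        flushMetric.inc(System.nanoTime() - time);
    }

    public static void main(String[] args) throws InterruptedException {
        FlushTimingSketch shard = new FlushTimingSketch();
        shard.flush();
        System.out.println("flush took " + shard.flushMetric.sum() + "ns");
    }
}
```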