diff --git a/server/src/main/java/org/opensearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/opensearch/action/admin/indices/flush/TransportShardFlushAction.java index f1e30b83affc4..53e774306e746 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -82,6 +82,12 @@ public TransportShardFlushAction( ShardFlushRequest::new, ThreadPool.Names.FLUSH ); + transportService.registerRequestHandler( + PRE_SYNCED_FLUSH_ACTION_NAME, + ThreadPool.Names.FLUSH, + PreShardSyncedFlushRequest::new, + new PreSyncedFlushTransportHandler(indicesService) + ); } @Override diff --git a/server/src/main/java/org/opensearch/index/engine/CommitStats.java b/server/src/main/java/org/opensearch/index/engine/CommitStats.java index 914e44880da25..1220208d3a8f3 100644 --- a/server/src/main/java/org/opensearch/index/engine/CommitStats.java +++ b/server/src/main/java/org/opensearch/index/engine/CommitStats.java @@ -89,13 +89,6 @@ public String getId() { return id; } - /** - * A raw version of the commit id (see {@link SegmentInfos#getId()} - */ - public Engine.CommitId getRawCommitId() { - return new Engine.CommitId(Base64.getDecoder().decode(id)); - } - /** * Returns the number of documents in the in this commit */ diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index d4415e91e2fe8..3e92f6a2aef97 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -60,9 +60,6 @@ import org.opensearch.common.Nullable; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.collect.ImmutableOpenMap; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.common.io.stream.StreamOutput; -import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.logging.Loggers; @@ -96,7 +93,6 @@ import java.io.UncheckedIOException; import java.nio.file.NoSuchFileException; import java.util.Arrays; -import java.util.Base64; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -121,7 +117,7 @@ public abstract class Engine implements Closeable { - public static final String SYNC_COMMIT_ID = "sync_id"; + public static final String SYNC_COMMIT_ID = "sync_id"; // TODO: remove sync_id in 3.0 public static final String HISTORY_UUID_KEY = "history_uuid"; public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid"; public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no"; @@ -577,22 +573,6 @@ public static class NoOpResult extends Result { } - /** - * Attempts to do a special commit where the given syncID is put into the commit data. The attempt - * succeeds if there are not pending writes in lucene and the current point is equal to the expected one. - * - * @param syncId id of this sync - * @param expectedCommitId the expected value of - * @return true if the sync commit was made, false o.w. - */ - public abstract SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException; - - public enum SyncedFlushResult { - SUCCESS, - COMMIT_MISMATCH, - PENDING_OPERATIONS - } - protected final GetResult getFromSearcher( Get get, BiFunction searcherFactory, @@ -1139,20 +1119,17 @@ public boolean refreshNeeded() { * @param force if true a lucene commit is executed even if no changes need to be committed. * @param waitIfOngoing if true this call will block until all currently running flushes have finished. * Otherwise this call will return without blocking. - * @return the commit Id for the resulting commit */ - public abstract CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException; + public abstract void flush(boolean force, boolean waitIfOngoing) throws EngineException; /** * Flushes the state of the engine including the transaction log, clearing memory and persisting * documents in the lucene index to disk including a potentially heavy and durable fsync operation. * This operation is not going to block if another flush operation is currently running and won't write * a lucene commit if nothing needs to be committed. - * - * @return the commit Id for the resulting commit */ - public final CommitId flush() throws EngineException { - return flush(false, false); + public final void flush() throws EngineException { + flush(false, false); } /** @@ -1923,58 +1900,6 @@ private void awaitPendingClose() { } } - public static class CommitId implements Writeable { - - private final byte[] id; - - public CommitId(byte[] id) { - assert id != null; - this.id = Arrays.copyOf(id, id.length); - } - - /** - * Read from a stream. - */ - public CommitId(StreamInput in) throws IOException { - assert in != null; - this.id = in.readByteArray(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeByteArray(id); - } - - @Override - public String toString() { - return Base64.getEncoder().encodeToString(id); - } - - public boolean idsEqual(byte[] id) { - return Arrays.equals(id, this.id); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - - CommitId commitId = (CommitId) o; - - return Arrays.equals(id, commitId.id); - - } - - @Override - public int hashCode() { - return Arrays.hashCode(id); - } - } - public static class IndexCommitRef implements Closeable { private final AtomicBoolean closed = new AtomicBoolean(); private final CheckedRunnable onClose; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 3fdeaf13ae564..219740849231c 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1938,71 +1938,6 @@ public void writeIndexingBuffer() throws EngineException { refresh("write indexing buffer", SearcherScope.INTERNAL, false); } - @Override - public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException { - // best effort attempt before we acquire locks - ensureOpen(); - if (indexWriter.hasUncommittedChanges()) { - logger.trace("can't sync commit [{}]. have pending changes", syncId); - return SyncedFlushResult.PENDING_OPERATIONS; - } - if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) { - logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId); - return SyncedFlushResult.COMMIT_MISMATCH; - } - try (ReleasableLock lock = writeLock.acquire()) { - ensureOpen(); - ensureCanFlush(); - // lets do a refresh to make sure we shrink the version map. This refresh will be either a no-op (just shrink the version map) - // or we also have uncommitted changes and that causes this syncFlush to fail. - refresh("sync_flush", SearcherScope.INTERNAL, true); - if (indexWriter.hasUncommittedChanges()) { - logger.trace("can't sync commit [{}]. have pending changes", syncId); - return SyncedFlushResult.PENDING_OPERATIONS; - } - if (expectedCommitId.idsEqual(lastCommittedSegmentInfos.getId()) == false) { - logger.trace("can't sync commit [{}]. current commit id is not equal to expected.", syncId); - return SyncedFlushResult.COMMIT_MISMATCH; - } - logger.trace("starting sync commit [{}]", syncId); - commitIndexWriter(indexWriter, translog, syncId); - logger.debug("successfully sync committed. sync id [{}].", syncId); - lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); - return SyncedFlushResult.SUCCESS; - } catch (IOException ex) { - maybeFailEngine("sync commit", ex); - throw new EngineException(shardId, "failed to sync commit", ex); - } - } - - final boolean tryRenewSyncCommit() { - boolean renewed = false; - try (ReleasableLock lock = writeLock.acquire()) { - ensureOpen(); - ensureCanFlush(); - String syncId = lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID); - long localCheckpointOfLastCommit = Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - if (syncId != null - && indexWriter.hasUncommittedChanges() - && translog.estimateTotalOperationsFromMinSeq(localCheckpointOfLastCommit + 1) == 0) { - logger.trace("start renewing sync commit [{}]", syncId); - commitIndexWriter(indexWriter, translog, syncId); - logger.debug("successfully sync committed. sync id [{}].", syncId); - lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); - renewed = true; - } - } catch (IOException ex) { - maybeFailEngine("renew sync commit", ex); - throw new EngineException(shardId, "failed to renew sync commit", ex); - } - if (renewed) { - // refresh outside of the write lock - // we have to refresh internal reader here to ensure we release unreferenced segments. - refresh("renew sync commit", SearcherScope.INTERNAL, true); - } - return renewed; - } - @Override public boolean shouldPeriodicallyFlush() { ensureOpen(); @@ -2042,7 +1977,7 @@ public boolean shouldPeriodicallyFlush() { } @Override - public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { + public void flush(boolean force, boolean waitIfOngoing) throws EngineException { ensureOpen(); if (force && waitIfOngoing == false) { assert false : "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing; @@ -2050,18 +1985,16 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti "wait_if_ongoing must be true for a force flush: force=" + force + " wait_if_ongoing=" + waitIfOngoing ); } - final byte[] newCommitId; try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); if (flushLock.tryLock() == false) { // if we can't get the lock right away we block if needed otherwise barf - if (waitIfOngoing) { - logger.trace("waiting for in-flight flush to finish"); - flushLock.lock(); - logger.trace("acquired flush lock after blocking"); - } else { - return new CommitId(lastCommittedSegmentInfos.getId()); + if (waitIfOngoing == false) { + return; } + logger.trace("waiting for in-flight flush to finish"); + flushLock.lock(); + logger.trace("acquired flush lock after blocking"); } else { logger.trace("acquired flush lock immediately"); } @@ -2081,7 +2014,7 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti try { translog.rollGeneration(); logger.trace("starting commit for flush; commitTranslog=true"); - commitIndexWriter(indexWriter, translog, null); + commitIndexWriter(indexWriter, translog); logger.trace("finished commit for flush"); // a temporary debugging to investigate test failure - issue#32827. Remove when the issue is resolved @@ -2104,7 +2037,6 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti refreshLastCommittedSegmentInfos(); } - newCommitId = lastCommittedSegmentInfos.getId(); } catch (FlushFailedEngineException ex) { maybeFailEngine("flush", ex); throw ex; @@ -2117,7 +2049,6 @@ public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineExcepti if (engineConfig.isEnableGcDeletes()) { pruneDeletedTombstones(); } - return new CommitId(newCommitId); } private void refreshLastCommittedSegmentInfos() { @@ -2289,9 +2220,7 @@ public void forceMerge( this.forceMergeUUID = forceMergeUUID; } if (flush) { - if (tryRenewSyncCommit() == false) { - flush(false, true); - } + flush(false, true); } if (upgrade) { logger.info("finished segment upgrade"); @@ -2682,15 +2611,9 @@ public void onFailure(Exception e) { @Override protected void doRun() { - // if we have no pending merges and we are supposed to flush once merges have finished - // we try to renew a sync commit which is the case when we are having a big merge after we - // are inactive. If that didn't work we go and do a real flush which is ok since it only doesn't work - // if we either have records in the translog or if we don't have a sync ID at all... - // maybe even more important, we flush after all merges finish and we are inactive indexing-wise to + // if we have no pending merges and we are supposed to flush once merges have finished to // free up transient disk usage of the (presumably biggish) segments that were just merged - if (tryRenewSyncCommit() == false) { - flush(); - } + flush(); } }); } else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) { @@ -2727,10 +2650,8 @@ protected void doRun() throws Exception { * * @param writer the index writer to commit * @param translog the translog - * @param syncId the sync flush ID ({@code null} if not committing a synced flush) - * @throws IOException if an I/O exception occurs committing the specfied writer */ - protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException { + protected void commitIndexWriter(final IndexWriter writer, final Translog translog) throws IOException { ensureCanFlush(); try { final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint(); @@ -2747,9 +2668,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl final Map commitData = new HashMap<>(7); commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID()); commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint)); - if (syncId != null) { - commitData.put(Engine.SYNC_COMMIT_ID, syncId); - } commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo())); commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); commitData.put(HISTORY_UUID_KEY, historyUUID); diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index b0c05701ae0c6..a3bb66125a871 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -435,15 +435,7 @@ public boolean shouldPeriodicallyFlush() { } @Override - public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) { - // we can't do synced flushes this would require an indexWriter which we don't have - throw new UnsupportedOperationException("syncedFlush is not supported on a read-only engine"); - } - - @Override - public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { - return new CommitId(lastCommittedSegmentInfos.getId()); - } + public void flush(boolean force, boolean waitIfOngoing) throws EngineException {} @Override public void forceMerge( diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5dec6c5c5bf1f..b2f94f3d398ef 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1321,9 +1321,8 @@ public CompletionStats completionStats(String... fields) { * Executes the given flush request against the engine. * * @param request the flush request - * @return the commit ID */ - public Engine.CommitId flush(FlushRequest request) { + public void flush(FlushRequest request) { final boolean waitIfOngoing = request.waitIfOngoing(); final boolean force = request.force(); logger.trace("flush with {}", request); @@ -1334,9 +1333,8 @@ public Engine.CommitId flush(FlushRequest request) { */ verifyNotClosed(); final long time = System.nanoTime(); - final Engine.CommitId commitId = getEngine().flush(force, waitIfOngoing); + getEngine().flush(force, waitIfOngoing); flushMetric.inc(System.nanoTime() - time); - return commitId; } /** diff --git a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestSyncedFlushAction.java b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestSyncedFlushAction.java index 7d89100571693..726fd69dc29c1 100644 --- a/server/src/main/java/org/opensearch/rest/action/admin/indices/RestSyncedFlushAction.java +++ b/server/src/main/java/org/opensearch/rest/action/admin/indices/RestSyncedFlushAction.java @@ -57,7 +57,7 @@ public class RestSyncedFlushAction extends BaseRestHandler { - private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(RestSyncedFlushAction.class.getClass()); + private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(RestSyncedFlushAction.class); @Override public List routes() { diff --git a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 43d5a85094a36..72c7b5168fe15 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -53,7 +53,6 @@ import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; -import org.opensearch.index.engine.Engine; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ReplicationGroup; @@ -78,6 +77,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.mockito.Mockito.doNothing; import static org.opensearch.action.support.replication.ClusterStateCreationUtils.state; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; @@ -194,8 +194,7 @@ private void executeOnPrimaryOrReplica(boolean phase1) throws Throwable { public void testShardIsFlushed() throws Throwable { final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); - when(indexShard.flush(flushRequest.capture())).thenReturn(new Engine.CommitId(new byte[0])); - + doNothing().when(indexShard).flush(flushRequest.capture()); executeOnPrimaryOrReplica(); verify(indexShard, times(1)).flush(any(FlushRequest.class)); assertThat(flushRequest.getValue().force(), is(true)); diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index be955be882160..0a83879c7cfb9 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -82,6 +82,7 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.SetOnce; import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -115,6 +116,7 @@ import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.IndexSettings; @@ -165,7 +167,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Base64; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -749,10 +750,9 @@ public long getProcessedCheckpoint() { : randomIntBetween(0, (int) localCheckpoint.get()) ); - final Engine.CommitId commitId = engine.flush(true, true); + engine.flush(true, true); CommitStats stats2 = engine.commitStats(); - assertThat(stats2.getRawCommitId(), equalTo(commitId)); assertThat(stats2.getGeneration(), greaterThan(stats1.getGeneration())); assertThat(stats2.getId(), notNullValue()); assertThat(stats2.getId(), not(equalTo(stats1.getId()))); @@ -861,9 +861,9 @@ public void testTranslogRecoveryDoesNotReplayIntoTranslog() throws IOException { recoveringEngine = new InternalEngine(initialEngine.config()) { @Override - protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { + protected void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException { committed.set(true); - super.commitIndexWriter(writer, translog, syncId); + super.commitIndexWriter(writer, translog); } }; assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs)); @@ -1317,137 +1317,31 @@ public void testSyncTranslogConcurrently() throws Exception { checker.run(); } - public void testSyncedFlush() throws IOException { - try ( - Store store = createStore(); - Engine engine = createEngine(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null) - ) { - final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); - ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); - engine.index(indexForDoc(doc)); - Engine.CommitId commitID = engine.flush(); - assertThat(commitID, equalTo(new Engine.CommitId(store.readLastCommittedSegmentsInfo().getId()))); - byte[] wrongBytes = Base64.getDecoder().decode(commitID.toString()); - wrongBytes[0] = (byte) ~wrongBytes[0]; - Engine.CommitId wrongId = new Engine.CommitId(wrongBytes); - assertEquals( - "should fail to sync flush with wrong id (but no docs)", - engine.syncFlush(syncId + "1", wrongId), - Engine.SyncedFlushResult.COMMIT_MISMATCH - ); - engine.index(indexForDoc(doc)); - assertEquals( - "should fail to sync flush with right id but pending doc", - engine.syncFlush(syncId + "2", commitID), - Engine.SyncedFlushResult.PENDING_OPERATIONS - ); - commitID = engine.flush(); - assertEquals( - "should succeed to flush commit with right id and no pending doc", - engine.syncFlush(syncId, commitID), - Engine.SyncedFlushResult.SUCCESS - ); - assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); - assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); - } - } - - public void testRenewSyncFlush() throws Exception { - final int iters = randomIntBetween(2, 5); // run this a couple of times to get some coverage - for (int i = 0; i < iters; i++) { - try ( - Store store = createStore(); - InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), new LogDocMergePolicy(), null)) - ) { - final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); - Engine.Index doc1 = indexForDoc(testParsedDocument("1", null, testDocumentWithTextField(), B_1, null)); - engine.index(doc1); - assertEquals(engine.getLastWriteNanos(), doc1.startTime()); - engine.flush(); - Engine.Index doc2 = indexForDoc(testParsedDocument("2", null, testDocumentWithTextField(), B_1, null)); - engine.index(doc2); - assertEquals(engine.getLastWriteNanos(), doc2.startTime()); - engine.flush(); - final boolean forceMergeFlushes = randomBoolean(); - final ParsedDocument parsedDoc3 = testParsedDocument("3", null, testDocumentWithTextField(), B_1, null); - if (forceMergeFlushes) { - engine.index( - new Engine.Index( - newUid(parsedDoc3), - parsedDoc3, - UNASSIGNED_SEQ_NO, - 0, - Versions.MATCH_ANY, - VersionType.INTERNAL, - Engine.Operation.Origin.PRIMARY, - System.nanoTime() - engine.engineConfig.getFlushMergesAfter().nanos(), - -1, - false, - UNASSIGNED_SEQ_NO, - 0 - ) - ); - } else { - engine.index(indexForDoc(parsedDoc3)); - } - Engine.CommitId commitID = engine.flush(); - assertEquals( - "should succeed to flush commit with right id and no pending doc", - engine.syncFlush(syncId, commitID), - Engine.SyncedFlushResult.SUCCESS - ); - assertEquals(3, engine.segments(false).size()); - - engine.forceMerge(forceMergeFlushes, 1, false, false, false, UUIDs.randomBase64UUID()); - if (forceMergeFlushes == false) { - engine.refresh("make all segments visible"); - assertEquals(4, engine.segments(false).size()); - assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); - assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); - assertTrue(engine.tryRenewSyncCommit()); - assertEquals(1, engine.segments(false).size()); - } else { - engine.refresh("test"); - assertBusy(() -> assertEquals(1, engine.segments(false).size())); - } - assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); - assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); - - if (randomBoolean()) { - Engine.Index doc4 = indexForDoc(testParsedDocument("4", null, testDocumentWithTextField(), B_1, null)); - engine.index(doc4); - assertEquals(engine.getLastWriteNanos(), doc4.startTime()); - } else { - Engine.Delete delete = new Engine.Delete(doc1.type(), doc1.id(), doc1.uid(), primaryTerm.get()); - engine.delete(delete); - assertEquals(engine.getLastWriteNanos(), delete.startTime()); - } - assertFalse(engine.tryRenewSyncCommit()); - // we might hit a concurrent flush from a finishing merge here - just wait if ongoing... - engine.flush(false, true); - assertNull(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID)); - assertNull(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); - } - } - } - public void testSyncedFlushSurvivesEngineRestart() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); IOUtils.close(store, engine); + SetOnce indexWriterHolder = new SetOnce<>(); + IndexWriterFactory indexWriterFactory = (directory, iwc) -> { + indexWriterHolder.set(new IndexWriter(directory, iwc)); + return indexWriterHolder.get(); + }; store = createStore(); - engine = createEngine(store, primaryTranslogDir, globalCheckpoint::get); + engine = createEngine( + defaultSettings, + store, + primaryTranslogDir, + newMergePolicy(), + indexWriterFactory, + null, + globalCheckpoint::get + ); final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}"), null); engine.index(indexForDoc(doc)); globalCheckpoint.set(0L); - final Engine.CommitId commitID = engine.flush(); - assertEquals( - "should succeed to flush commit with right id and no pending doc", - engine.syncFlush(syncId, commitID), - Engine.SyncedFlushResult.SUCCESS - ); + engine.flush(); + syncFlush(indexWriterHolder.get(), engine, syncId); assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); - assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); EngineConfig config = engine.config(); if (randomBoolean()) { engine.close(); @@ -1469,17 +1363,30 @@ public void testSyncedFlushSurvivesEngineRestart() throws IOException { } public void testSyncedFlushVanishesOnReplay() throws IOException { + IOUtils.close(store, engine); + SetOnce indexWriterHolder = new SetOnce<>(); + IndexWriterFactory indexWriterFactory = (directory, iwc) -> { + indexWriterHolder.set(new IndexWriter(directory, iwc)); + return indexWriterHolder.get(); + }; + store = createStore(); + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + engine = createEngine( + defaultSettings, + store, + primaryTranslogDir, + newMergePolicy(), + indexWriterFactory, + null, + globalCheckpoint::get + ); final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}"), null); + globalCheckpoint.set(engine.getProcessedLocalCheckpoint()); engine.index(indexForDoc(doc)); - final Engine.CommitId commitID = engine.flush(); - assertEquals( - "should succeed to flush commit with right id and no pending doc", - engine.syncFlush(syncId, commitID), - Engine.SyncedFlushResult.SUCCESS - ); + engine.flush(); + syncFlush(indexWriterHolder.get(), engine, syncId); assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); - assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); doc = testParsedDocument("2", null, testDocumentWithTextField(), new BytesArray("{}"), null); engine.index(indexForDoc(doc)); EngineConfig config = engine.config(); @@ -1492,6 +1399,16 @@ public void testSyncedFlushVanishesOnReplay() throws IOException { ); } + void syncFlush(IndexWriter writer, InternalEngine engine, String syncId) throws IOException { + try (ReleasableLock ignored = engine.writeLock.acquire()) { + Map userData = new HashMap<>(); + writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue())); + userData.put(Engine.SYNC_COMMIT_ID, syncId); + writer.setLiveCommitData(userData.entrySet()); + writer.commit(); + } + } + public void testVersioningNewCreate() throws IOException { ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid(doc), primaryTerm.get(), doc, Versions.MATCH_DELETED); @@ -3511,8 +3428,8 @@ public void testTranslogCleanUpPostCommitCrash() throws Exception { ) { @Override - protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { - super.commitIndexWriter(writer, translog, syncId); + protected void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException { + super.commitIndexWriter(writer, translog); if (throwErrorOnCommit.get()) { throw new RuntimeException("power's out"); } @@ -5855,14 +5772,14 @@ public void testKeepTranslogAfterGlobalCheckpoint() throws Exception { final AtomicLong lastSyncedGlobalCheckpointBeforeCommit = new AtomicLong(Translog.readGlobalCheckpoint(translogPath, translogUUID)); try (InternalEngine engine = new InternalEngine(engineConfig) { @Override - protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { + protected void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException { lastSyncedGlobalCheckpointBeforeCommit.set(Translog.readGlobalCheckpoint(translogPath, translogUUID)); // Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog // (this value is visible to the deletion policy) and an in memory global checkpoint in the SequenceNumbersService. if (rarely()) { globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), getPersistedLocalCheckpoint())); } - super.commitIndexWriter(writer, translog, syncId); + super.commitIndexWriter(writer, translog); } }) { engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); diff --git a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java index 956e136575a69..609e972b2c026 100644 --- a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java @@ -150,50 +150,6 @@ public void testReadOnlyEngine() throws Exception { } } - public void testFlushes() throws IOException { - IOUtils.close(engine, store); - Engine readOnlyEngine = null; - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); - int numDocs = scaledRandomIntBetween(10, 1000); - try (InternalEngine engine = createEngine(config)) { - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - engine.index( - new Engine.Index( - newUid(doc), - doc, - i, - primaryTerm.get(), - 1, - null, - Engine.Operation.Origin.REPLICA, - System.nanoTime(), - -1, - false, - SequenceNumbers.UNASSIGNED_SEQ_NO, - 0 - ) - ); - if (rarely()) { - engine.flush(); - } - engine.syncTranslog(); // advance persisted local checkpoint - globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); - } - globalCheckpoint.set(engine.getPersistedLocalCheckpoint()); - engine.syncTranslog(); - engine.flushAndClose(); - readOnlyEngine = new ReadOnlyEngine(engine.engineConfig, null, null, true, Function.identity(), true); - Engine.CommitId flush = readOnlyEngine.flush(randomBoolean(), true); - assertEquals(flush, readOnlyEngine.flush(randomBoolean(), true)); - } finally { - IOUtils.close(readOnlyEngine); - } - } - } - public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); @@ -263,7 +219,6 @@ public void testReadOnly() throws IOException { expectThrows(expectedException, () -> readOnlyEngine.index(null)); expectThrows(expectedException, () -> readOnlyEngine.delete(null)); expectThrows(expectedException, () -> readOnlyEngine.noOp(null)); - expectThrows(UnsupportedOperationException.class, () -> readOnlyEngine.syncFlush(null, null)); } } } diff --git a/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java b/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java index 742079697f2f8..79980ccc39272 100644 --- a/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/rest/OpenSearchRestTestCase.java @@ -1270,46 +1270,38 @@ protected static Version minimumNodeVersion() throws IOException { } protected void syncedFlush(String indexName, boolean retryOnConflict) throws Exception { + final Request request = new Request("POST", indexName + "/_flush/synced"); + final Builder options = RequestOptions.DEFAULT.toBuilder(); // 8.0 kept in warning message for legacy purposes TODO: changge to 3.0 - final List deprecationMessages = Arrays.asList( + final List warningMessage = Arrays.asList( "Synced flush is deprecated and will be removed in 8.0. Use flush at _/flush or /{index}/_flush instead." ); - final List transitionMessages = Arrays.asList( + final List expectedWarnings = Arrays.asList( "Synced flush was removed and a normal flush was performed instead. This transition will be removed in a future version." ); - final WarningsHandler warningsHandler; - if (minimumNodeVersion().onOrAfter(Version.V_2_0_0)) { - warningsHandler = warnings -> warnings.equals(transitionMessages) == false; - } else if (minimumNodeVersion().onOrAfter(LegacyESVersion.V_7_6_0)) { - warningsHandler = warnings -> warnings.equals(deprecationMessages) == false && warnings.equals(transitionMessages) == false; - } else if (nodeVersions.stream().anyMatch(n -> n.onOrAfter(Version.V_2_0_0))) { - warningsHandler = warnings -> warnings.isEmpty() == false && warnings.equals(transitionMessages) == false; - } else { - warningsHandler = warnings -> warnings.isEmpty() == false; + if (nodeVersions.stream().allMatch(version -> version.onOrAfter(Version.V_2_0_0))) { + options.setWarningsHandler(warnings -> warnings.isEmpty() == false && warnings.equals(expectedWarnings) == false); + } else if (nodeVersions.stream().anyMatch(version -> version.onOrAfter(LegacyESVersion.V_7_6_0))) { + options.setWarningsHandler( + warnings -> warnings.isEmpty() == false + && warnings.equals(expectedWarnings) == false + && warnings.equals(warningMessage) == false + ); } + request.setOptions(options); // We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation. // A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit. assertBusy(() -> { try { + Response resp = client().performRequest(request); if (retryOnConflict) { - if (nodeVersions.stream().allMatch(v -> v.before(Version.V_2_0_0))) { - final Request request = new Request("POST", indexName + "/_flush/synced"); - Builder optionsBuilder = RequestOptions.DEFAULT.toBuilder(); - optionsBuilder.setWarningsHandler(warningsHandler); - request.setOptions(optionsBuilder); - Response resp = client().performRequest(request); - Map result = ObjectPath.createFromResponse(resp).evaluate("_shards"); - assertThat(result.get("failed"), equalTo(0)); - } + Map result = ObjectPath.createFromResponse(resp).evaluate("_shards"); + assertThat(result.get("failed"), equalTo(0)); } } catch (ResponseException ex) { + assertThat(ex.getResponse().getStatusLine(), equalTo(HttpStatus.SC_CONFLICT)); if (retryOnConflict) { - if (ex.getResponse().getStatusLine().getStatusCode() == RestStatus.CONFLICT.getStatus() - && ex.getResponse().getWarnings().equals(transitionMessages)) { - logger.info("a normal flush was performed instead"); - } else { - throw new AssertionError(ex); // cause assert busy to retry - } + throw new AssertionError(ex); // cause assert busy to retry } } });