diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index e3ca599662bee..70479c830af5d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -44,6 +44,8 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; import org.opensearch.common.settings.Settings; @@ -60,6 +62,7 @@ import org.opensearch.index.shard.IndexShard; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.recovery.FileChunkRequest; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.search.SearchService; import org.opensearch.search.builder.PointInTimeBuilder; @@ -984,8 +987,11 @@ public void testScrollCreatedOnReplica() throws Exception { ) ); final IndexShard replicaShard = getIndexShard(replica, INDEX_NAME); - final SegmentInfos segmentInfos = replicaShard.getLatestSegmentInfosAndCheckpoint().v1().get(); - final Collection snapshottedSegments = segmentInfos.files(false); + final Tuple, ReplicationCheckpoint> tuple = replicaShard.getLatestSegmentInfosAndCheckpoint(); + final Collection snapshottedSegments; + try (final GatedCloseable closeable = tuple.v1()) { + snapshottedSegments = closeable.get().files(false); + } // opens a scrolled query before a flush is called. // this is for testing scroll segment consistency between refresh and flush SearchResponse searchResponse = client(replica).prepareSearch() diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 8f000dffd0ebd..758bb86e48de8 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -35,6 +35,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; @@ -516,6 +517,20 @@ protected SegmentInfos getLatestSegmentInfos() { return readerManager.getSegmentInfos(); } + @Override + public synchronized GatedCloseable getSegmentInfosSnapshot() { + // get reference to latest infos + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); + // incref all files + try { + final Collection files = latestSegmentInfos.files(false); + store.incRefFileDeleter(files); + return new GatedCloseable<>(latestSegmentInfos, () -> store.decRefFileDeleter(files)); + } catch (IOException e) { + throw new EngineException(shardId, e.getMessage(), e); + } + } + protected LocalCheckpointTracker getLocalCheckpointTracker() { return localCheckpointTracker; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index b420651836747..57bf447312cd2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -679,8 +679,19 @@ public void updateShardState( // this Shard's engine was read only, we need to update its engine before restoring local history from xlog. assert newRouting.primary() && currentRouting.primary() == false; resetEngineToGlobalCheckpoint(); + // It is possible an engine can open with a SegmentInfos on a higher gen but the reader does not refresh to + // trigger our refresh listener. + // Force update the checkpoint post engine reset. + updateReplicationCheckpoint(); } + replicationTracker.activatePrimaryMode(getLocalCheckpoint()); + if (indexSettings.isSegRepEnabled()) { + // force publish a checkpoint once in primary mode so that replicas not caught up to previous primary + // are brought up to date. + checkpointPublisher.publish(this, getLatestReplicationCheckpoint()); + } + ensurePeerRecoveryRetentionLeasesExist(); /* * If this shard was serving as a replica shard when another shard was promoted to primary then @@ -1556,15 +1567,7 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti * @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { - final Tuple, ReplicationCheckpoint> infosAndCheckpoint = getLatestSegmentInfosAndCheckpoint(); - if (infosAndCheckpoint == null) { - return null; - } - try (final GatedCloseable ignored = infosAndCheckpoint.v1()) { - return infosAndCheckpoint.v2(); - } catch (IOException e) { - throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e); - } + return replicationTracker.getLatestReplicationCheckpoint(); } /** @@ -1578,13 +1581,11 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() { * */ public Tuple, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() { - if (indexSettings.isSegRepEnabled() == false) { - return null; - } + assert indexSettings.isSegRepEnabled(); Tuple, ReplicationCheckpoint> nullSegmentInfosEmptyCheckpoint = new Tuple<>( new GatedCloseable<>(null, () -> {}), - ReplicationCheckpoint.empty(shardId, getDefaultCodecName()) + getLatestReplicationCheckpoint() ); if (getEngineOrNull() == null) { @@ -1603,11 +1604,7 @@ public Tuple, ReplicationCheckpoint> getLatestSegme getOperationPrimaryTerm(), segmentInfos.getGeneration(), segmentInfos.getVersion(), - // TODO: Update replicas to compute length from SegmentInfos. Replicas do not yet incref segments with - // getSegmentInfosSnapshot, so computing length from SegmentInfos can cause issues. - shardRouting.primary() - ? store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum() - : store.stats(StoreStats.UNKNOWN_RESERVED_BYTES).getSizeInBytes(), + store.getSegmentMetadataMap(segmentInfos).values().stream().mapToLong(StoreFileMetadata::length).sum(), getEngine().config().getCodec().getName() ) ); @@ -1863,10 +1860,6 @@ public void resetToWriteableEngine() throws IOException, InterruptedException, T indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); }); } - public void onCheckpointPublished(ReplicationCheckpoint checkpoint) { - replicationTracker.setLatestReplicationCheckpoint(checkpoint); - } - /** * Wrapper for a non-closing reader * @@ -2347,6 +2340,11 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, b final Engine newEngine = engineFactory.newReadWriteEngine(config); onNewEngine(newEngine); currentEngineReference.set(newEngine); + + if (indexSettings.isSegRepEnabled()) { + // set initial replication checkpoints into tracker. + updateReplicationCheckpoint(); + } // We set active because we are now writing operations to the engine; this way, // we can flush if we go idle after some time and become inactive. active.set(true); @@ -3678,6 +3676,9 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro internalRefreshListener.clear(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); + if (indexSettings.isSegRepEnabled()) { + internalRefreshListener.add(new ReplicationCheckpointUpdater()); + } if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher)); } @@ -4482,6 +4483,30 @@ public void afterRefresh(boolean didRefresh) throws IOException { } } + /** + * Refresh listener to update the Shard's ReplicationCheckpoint post refresh. + */ + private class ReplicationCheckpointUpdater implements ReferenceManager.RefreshListener { + @Override + public void beforeRefresh() throws IOException {} + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + if (didRefresh) { + updateReplicationCheckpoint(); + } + } + } + + private void updateReplicationCheckpoint() { + final Tuple, ReplicationCheckpoint> tuple = getLatestSegmentInfosAndCheckpoint(); + try (final GatedCloseable ignored = tuple.v1()) { + replicationTracker.setLatestReplicationCheckpoint(tuple.v2()); + } catch (IOException e) { + throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e); + } + } + private EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() { final RootObjectMapper.Builder noopRootMapper = new RootObjectMapper.Builder("__noop"); final DocumentMapper noopDocumentMapper = mapperService != null diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 41b975e532595..98910e83c608e 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -185,7 +185,6 @@ private synchronized boolean syncSegments() { return true; } ReplicationCheckpoint checkpoint = indexShard.getLatestReplicationCheckpoint(); - indexShard.onCheckpointPublished(checkpoint); beforeSegmentsSync(); long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs(); long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo(); @@ -207,6 +206,10 @@ private synchronized boolean syncSegments() { try (GatedCloseable segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); + assert segmentInfos.getGeneration() == checkpoint.getSegmentsGen() : "SegmentInfos generation: " + + segmentInfos.getGeneration() + + " does not match metadata generation: " + + checkpoint.getSegmentsGen(); // Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can // move. long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index c22701dfc94ce..3a84163bb979d 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -51,9 +51,14 @@ public class SegmentReplicationTarget extends ReplicationTarget { public final static String REPLICATION_PREFIX = "replication."; - public SegmentReplicationTarget(IndexShard indexShard, SegmentReplicationSource source, ReplicationListener listener) { + public SegmentReplicationTarget( + IndexShard indexShard, + ReplicationCheckpoint checkpoint, + SegmentReplicationSource source, + ReplicationListener listener + ) { super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); - this.checkpoint = indexShard.getLatestReplicationCheckpoint(); + this.checkpoint = checkpoint; this.source = source; this.state = new SegmentReplicationState( indexShard.routingEntry(), @@ -90,12 +95,19 @@ public SegmentReplicationState state() { } public SegmentReplicationTarget retryCopy() { - return new SegmentReplicationTarget(indexShard, source, listener); + return new SegmentReplicationTarget(indexShard, checkpoint, source, listener); } @Override public String description() { - return String.format(Locale.ROOT, "Id:[%d] Shard:[%s] Source:[%s]", getId(), shardId(), source.getDescription()); + return String.format( + Locale.ROOT, + "Id:[%d] Checkpoint [%s] Shard:[%s] Source:[%s]", + getId(), + getCheckpoint(), + shardId(), + source.getDescription() + ); } @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index b41c9e09add45..84d6a722e572e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -234,7 +234,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe logger.trace( () -> new ParameterizedMessage( "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}", - replicaShard.getLatestReplicationCheckpoint() + ongoingReplicationTarget.getCheckpoint() ) ); return; @@ -242,7 +242,7 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe } final Thread thread = Thread.currentThread(); if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) { - startReplication(replicaShard, new SegmentReplicationListener() { + startReplication(replicaShard, receivedCheckpoint, new SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { logger.trace( @@ -280,6 +280,8 @@ public void onReplicationFailure( ); if (sendShardFailure == true) { failShard(e, replicaShard); + } else { + processLatestReceivedCheckpoint(replicaShard, thread); } } }); @@ -396,8 +398,24 @@ protected void updateLatestReceivedCheckpoint(ReplicationCheckpoint receivedChec } } - public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) { - final SegmentReplicationTarget target = new SegmentReplicationTarget(indexShard, sourceFactory.get(indexShard), listener); + /** + * Start a round of replication and sync to at least the given checkpoint. + * @param indexShard - {@link IndexShard} replica shard + * @param checkpoint - {@link ReplicationCheckpoint} checkpoint to sync to + * @param listener - {@link ReplicationListener} + * @return {@link SegmentReplicationTarget} target event orchestrating the event. + */ + public SegmentReplicationTarget startReplication( + final IndexShard indexShard, + final ReplicationCheckpoint checkpoint, + final SegmentReplicationListener listener + ) { + final SegmentReplicationTarget target = new SegmentReplicationTarget( + indexShard, + checkpoint, + sourceFactory.get(indexShard), + listener + ); startReplication(target); return target; } @@ -529,50 +547,59 @@ private void forceReplication(ForceSyncRequest request, ActionListener new ParameterizedMessage( + "[shardId {}] [replication id {}] Force replication Sync complete to {}, timing data: {}", + shardId, + state.getReplicationId(), + indexShard.getLatestReplicationCheckpoint(), + state.getTimingData() + ) + ); + // Promote engine type for primary target + if (indexShard.recoveryState().getPrimary() == true) { + indexShard.resetToWriteableEngine(); + } else { + // Update the replica's checkpoint on primary's replication tracker. + updateVisibleCheckpoint(state.getReplicationId(), indexShard); + } + listener.onResponse(TransportResponse.Empty.INSTANCE); + } catch (Exception e) { + logger.error("Error while marking replication completed", e); + listener.onFailure(e); + } + } + + @Override + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { + logger.error( () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Force replication Sync complete to {}, timing data: {}", - shardId, + "[shardId {}] [replication id {}] Force replication Sync failed, timing data: {}", + indexShard.shardId().getId(), state.getReplicationId(), - indexShard.getLatestReplicationCheckpoint(), state.getTimingData() - ) + ), + e ); - // Promote engine type for primary target - if (indexShard.recoveryState().getPrimary() == true) { - indexShard.resetToWriteableEngine(); - } else { - // Update the replica's checkpoint on primary's replication tracker. - updateVisibleCheckpoint(state.getReplicationId(), indexShard); + if (sendShardFailure) { + failShard(e, indexShard); } - listener.onResponse(TransportResponse.Empty.INSTANCE); - } catch (Exception e) { - logger.error("Error while marking replication completed", e); listener.onFailure(e); } } - - @Override - public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - logger.error( - () -> new ParameterizedMessage( - "[shardId {}] [replication id {}] Replication failed, timing data: {}", - indexShard.shardId().getId(), - state.getReplicationId(), - state.getTimingData() - ), - e - ); - if (sendShardFailure) { - failShard(e, indexShard); - } - listener.onFailure(e); - } - }); + ); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java index b4bcdc92e539a..f5cb32b741862 100644 --- a/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java +++ b/server/src/main/java/org/opensearch/indices/replication/checkpoint/SegmentReplicationCheckpointPublisher.java @@ -34,7 +34,6 @@ public SegmentReplicationCheckpointPublisher(PublishAction publishAction) { public void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) { publishAction.publish(indexShard, checkpoint); - indexShard.onCheckpointPublished(checkpoint); } /** diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 327c8d16e1a21..8f8a7627476a7 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -12,7 +12,9 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.ReferenceManager; +import org.apache.lucene.store.IOContext; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.UUIDs; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.index.OpenSearchDirectoryReader; @@ -28,6 +30,8 @@ import java.io.IOException; import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -364,4 +368,72 @@ private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException { return buildNrtReplicaEngine(globalCheckpoint, store, defaultSettings); } + + public void testGetSegmentInfosSnapshotPreservesFilesUntilRelease() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + // TODO: Remove this divergent commit logic and copy Segments_N from primary with node-node. + // randomly toggle commit / no commit. + IndexSettings settings = REMOTE_STORE_INDEX_SETTINGS; + final boolean shouldCommit = randomBoolean(); + if (shouldCommit) { + settings = INDEX_SETTINGS; + } + try ( + final Store nrtEngineStore = createStore(REMOTE_STORE_INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore, settings) + ) { + // only index 2 docs here, this will create segments _0 and _1 and after forcemerge into _2. + final int docCount = 2; + List operations = generateHistoryOnReplica(docCount, randomBoolean(), randomBoolean(), randomBoolean()); + for (Engine.Operation op : operations) { + applyOperation(engine, op); + applyOperation(nrtEngine, op); + // refresh to create a lot of segments. + engine.refresh("test"); + } + assertEquals(2, engine.segmentsStats(false, false).getCount()); + // wipe the nrt directory initially so we can sync with primary. + Lucene.cleanLuceneIndex(nrtEngineStore.directory()); + assertFalse( + Arrays.stream(nrtEngineStore.directory().listAll()) + .anyMatch(file -> file.equals("write.lock") == false && file.equals("extra0") == false) + ); + for (String file : engine.getLatestSegmentInfos().files(true)) { + nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT); + } + nrtEngine.updateSegments(engine.getLatestSegmentInfos()); + assertEquals(engine.getLatestSegmentInfos(), nrtEngine.getLatestSegmentInfos()); + final GatedCloseable snapshot = nrtEngine.getSegmentInfosSnapshot(); + final Collection replicaSnapshotFiles = snapshot.get().files(false); + List replicaFiles = List.of(nrtEngine.store.directory().listAll()); + + // merge primary down to 1 segment + engine.forceMerge(true, 1, false, false, false, UUIDs.randomBase64UUID()); + // we expect a 3rd segment to be created after merge. + assertEquals(3, engine.segmentsStats(false, false).getCount()); + final Collection latestPrimaryFiles = engine.getLatestSegmentInfos().files(false); + + // copy new segments in and load reader. + for (String file : latestPrimaryFiles) { + if (replicaFiles.contains(file) == false) { + nrtEngineStore.directory().copyFrom(store.directory(), file, file, IOContext.DEFAULT); + } + } + nrtEngine.updateSegments(engine.getLatestSegmentInfos()); + + replicaFiles = List.of(nrtEngine.store.directory().listAll()); + assertTrue(replicaFiles.containsAll(replicaSnapshotFiles)); + + // close snapshot, files should be cleaned up + snapshot.close(); + + replicaFiles = List.of(nrtEngine.store.directory().listAll()); + assertFalse(replicaFiles.containsAll(replicaSnapshotFiles)); + + // Ensure we still have all the active files. Note - we exclude the infos file here if we aren't committing + // the nrt reader will still reference segments_n-1 after being loaded until a local commit occurs. + assertTrue(replicaFiles.containsAll(nrtEngine.getLatestSegmentInfos().files(shouldCommit))); + } + } } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 3d20fd7cb99d0..097d001bc3012 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -3690,7 +3690,7 @@ public void testReadSnapshotConcurrently() throws IOException, InterruptedExcept */ public void testCheckpointRefreshListener() throws IOException { final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); - IndexShard shard = newStartedShard(p -> newShard(mock), true); + IndexShard shard = newStartedShard(p -> newShard(true, mock), true); List refreshListeners = shard.getEngine().config().getInternalRefreshListener(); assertTrue(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener)); closeShards(shard); @@ -3700,58 +3700,13 @@ public void testCheckpointRefreshListener() throws IOException { * here we are passing null in place of SegmentReplicationCheckpointPublisher and testing on index shard if CheckpointRefreshListener is not added to the InternalrefreshListerners List */ public void testCheckpointRefreshListenerWithNull() throws IOException { - IndexShard shard = newStartedShard(p -> newShard(null), true); + final SegmentReplicationCheckpointPublisher publisher = null; + IndexShard shard = newStartedShard(p -> newShard(true, publisher), true); List refreshListeners = shard.getEngine().config().getInternalRefreshListener(); assertFalse(refreshListeners.stream().anyMatch(e -> e instanceof CheckpointRefreshListener)); closeShards(shard); } - /** - * creates a new initializing shard. The shard will be put in its proper path under the - * current node id the shard is assigned to. - * @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint - */ - private IndexShard newShard(SegmentReplicationCheckpointPublisher checkpointPublisher) throws IOException { - final ShardId shardId = new ShardId("index", "_na_", 0); - final ShardRouting shardRouting = TestShardRouting.newShardRouting( - shardId, - randomAlphaOfLength(10), - true, - ShardRoutingState.INITIALIZING, - RecoverySource.EmptyStoreRecoverySource.INSTANCE - ); - final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); - ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); - - Settings indexSettings = Settings.builder() - .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT") - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000)) - .put(Settings.EMPTY) - .build(); - IndexMetadata metadata = IndexMetadata.builder(shardRouting.getIndexName()) - .settings(indexSettings) - .primaryTerm(0, primaryTerm) - .putMapping("{ \"properties\": {} }") - .build(); - return newShard( - shardRouting, - shardPath, - metadata, - null, - null, - new InternalEngineFactory(), - new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())), - () -> {}, - RetentionLeaseSyncer.EMPTY, - EMPTY_EVENT_LISTENER, - checkpointPublisher, - null - ); - } - public void testIndexCheckOnStartup() throws Exception { final IndexShard indexShard = newStartedShard(true); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 12b7341349442..57602d96745f6 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -19,6 +19,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingHelper; import org.opensearch.common.collect.Tuple; @@ -63,6 +64,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -422,7 +424,70 @@ public void testShardIdleWithNoReplicas() throws Exception { /** * here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh. */ - public void testPublishCheckpointOnPrimaryMode() throws IOException { + public void testPublishCheckpointOnPrimaryMode() throws IOException, InterruptedException { + final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); + IndexShard shard = newStartedShard(p -> newShard(false, mock, settings), false); + + final ShardRouting shardRouting = shard.routingEntry(); + promoteReplica( + shard, + Collections.singleton(shardRouting.allocationId().getId()), + new IndexShardRoutingTable.Builder(shardRouting.shardId()).addShard(shardRouting).build() + ); + + final CountDownLatch latch = new CountDownLatch(1); + shard.acquirePrimaryOperationPermit(new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + releasable.close(); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, ThreadPool.Names.GENERIC, ""); + + latch.await(); + // verify checkpoint is published + verify(mock, times(1)).publish(any(), any()); + closeShards(shard); + } + + public void testPublishCheckpointOnPrimaryMode_segrep_off() throws IOException, InterruptedException { + final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); + final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); + IndexShard shard = newStartedShard(p -> newShard(false, mock, settings), false); + + final ShardRouting shardRouting = shard.routingEntry(); + promoteReplica( + shard, + Collections.singleton(shardRouting.allocationId().getId()), + new IndexShardRoutingTable.Builder(shardRouting.shardId()).addShard(shardRouting).build() + ); + + final CountDownLatch latch = new CountDownLatch(1); + shard.acquirePrimaryOperationPermit(new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + releasable.close(); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, ThreadPool.Names.GENERIC, ""); + + latch.await(); + // verify checkpoint is published + verify(mock, times(0)).publish(any(), any()); + closeShards(shard); + } + + public void testPublishCheckpointPostFailover() throws IOException { final SegmentReplicationCheckpointPublisher mock = mock(SegmentReplicationCheckpointPublisher.class); IndexShard shard = newStartedShard(true); CheckpointRefreshListener refreshListener = new CheckpointRefreshListener(shard, mock); @@ -481,7 +546,7 @@ public void testRejectCheckpointOnShardRoutingPrimary() throws IOException { spy.onNewCheckpoint(new ReplicationCheckpoint(primaryShard.shardId(), 0L, 0L, 0L, Codec.getDefault().getName()), spyShard); // Verify that checkpoint is not processed as shard routing is primary. - verify(spy, times(0)).startReplication(any(), any()); + verify(spy, times(0)).startReplication(any(), any(), any()); closeShards(primaryShard); } @@ -655,7 +720,7 @@ public void cancel() { } }; when(sourceFactory.get(any())).thenReturn(source); - startReplicationAndAssertCancellation(replica, targetService); + startReplicationAndAssertCancellation(replica, primary, targetService); shards.removeReplica(replica); closeShards(replica); @@ -700,11 +765,15 @@ protected void resolveCheckpointInfoResponseListener(ActionListener() { @@ -422,7 +422,11 @@ public void testTemporaryFilesNotCleanup() throws Exception { runnablePostGetFiles ); when(sourceFactory.get(any())).thenReturn(segmentReplicationSource); - targetService.startReplication(replica, getTargetListener(primaryShard, replica, primaryMetadata, countDownLatch)); + targetService.startReplication( + replica, + primaryShard.getLatestReplicationCheckpoint(), + getTargetListener(primaryShard, replica, primaryMetadata, countDownLatch) + ); countDownLatch.await(30, TimeUnit.SECONDS); assertEquals("Replication failed", 0, countDownLatch.getCount()); shards.assertAllEqual(numDocs); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index a455d862b44b5..d7f49dbe6c0e2 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -84,6 +84,7 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { private IndicesService indicesService; private SegmentReplicationState state; + private ReplicationCheckpoint initialCheckpoint; private static final long TRANSPORT_TIMEOUT = 30000;// 30sec @@ -134,7 +135,7 @@ public void setUp() throws Exception { when(clusterState.nodes()).thenReturn(DiscoveryNodes.builder().add(localNode).build()); sut = prepareForReplication(primaryShard, replicaShard, transportService, indicesService, clusterService); - ReplicationCheckpoint initialCheckpoint = replicaShard.getLatestReplicationCheckpoint(); + initialCheckpoint = primaryShard.getLatestReplicationCheckpoint(); aheadCheckpoint = new ReplicationCheckpoint( initialCheckpoint.getShardId(), initialCheckpoint.getPrimaryTerm(), @@ -169,19 +170,23 @@ public void tearDown() throws Exception { public void testsSuccessfulReplication_listenerCompletes() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - sut.startReplication(replicaShard, new SegmentReplicationTargetService.SegmentReplicationListener() { - @Override - public void onReplicationDone(SegmentReplicationState state) { - assertEquals(SegmentReplicationState.Stage.DONE, state.getStage()); - latch.countDown(); - } + sut.startReplication( + replicaShard, + primaryShard.getLatestReplicationCheckpoint(), + new SegmentReplicationTargetService.SegmentReplicationListener() { + @Override + public void onReplicationDone(SegmentReplicationState state) { + assertEquals(SegmentReplicationState.Stage.DONE, state.getStage()); + latch.countDown(); + } - @Override - public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - logger.error("Unexpected error", e); - Assert.fail("Test should succeed"); + @Override + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { + logger.error("Unexpected error", e); + Assert.fail("Test should succeed"); + } } - }); + ); latch.await(2, TimeUnit.SECONDS); assertEquals(0, latch.getCount()); } @@ -213,6 +218,7 @@ public void getSegmentFiles( }; final SegmentReplicationTarget target = new SegmentReplicationTarget( replicaShard, + primaryShard.getLatestReplicationCheckpoint(), source, new SegmentReplicationTargetService.SegmentReplicationListener() { @Override @@ -237,7 +243,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile public void testAlreadyOnNewCheckpoint() { SegmentReplicationTargetService spy = spy(sut); spy.onNewCheckpoint(replicaShard.getLatestReplicationCheckpoint(), replicaShard); - verify(spy, times(0)).startReplication(any(), any()); + verify(spy, times(0)).startReplication(any(), any(), any()); } public void testShardAlreadyReplicating() { @@ -275,24 +281,22 @@ public void getSegmentFiles( } }; final SegmentReplicationTarget target = spy( - new SegmentReplicationTarget(replicaShard, source, mock(SegmentReplicationTargetService.SegmentReplicationListener.class)) + new SegmentReplicationTarget( + replicaShard, + primaryShard.getLatestReplicationCheckpoint(), + source, + mock(SegmentReplicationTargetService.SegmentReplicationListener.class) + ) ); + + final SegmentReplicationTargetService spy = spy(sut); + doReturn(false).when(spy).processLatestReceivedCheckpoint(eq(replicaShard), any()); // Start first round of segment replication. - sut.startReplication(target); + spy.startReplication(target); // Start second round of segment replication, this should fail to start as first round is still in-progress - sut.startReplication(replicaShard, new SegmentReplicationTargetService.SegmentReplicationListener() { - @Override - public void onReplicationDone(SegmentReplicationState state) { - Assert.fail("Should not succeed"); - } - - @Override - public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { - assertEquals("Shard " + replicaShard.shardId() + " is already replicating", e.getMessage()); - assertFalse(sendShardFailure); - } - }); + spy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard); + verify(spy, times(1)).processLatestReceivedCheckpoint(eq(replicaShard), any()); blockGetCheckpointMetadata.countDown(); } @@ -341,8 +345,21 @@ public void cancel() { } }; + final ReplicationCheckpoint updatedCheckpoint = new ReplicationCheckpoint( + initialCheckpoint.getShardId(), + initialCheckpoint.getPrimaryTerm(), + initialCheckpoint.getSegmentsGen(), + initialCheckpoint.getSegmentInfosVersion() + 1, + primaryShard.getDefaultCodecName() + ); + final SegmentReplicationTarget targetSpy = spy( - new SegmentReplicationTarget(replicaShard, source, mock(SegmentReplicationTargetService.SegmentReplicationListener.class)) + new SegmentReplicationTarget( + replicaShard, + updatedCheckpoint, + source, + mock(SegmentReplicationTargetService.SegmentReplicationListener.class) + ) ); // start replication. This adds the target to on-ongoing replication collection @@ -356,20 +373,20 @@ public void cancel() { // ensure the old target is cancelled. and new iteration kicks off. verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary"); - verify(serviceSpy, times(1)).startReplication(eq(replicaShard), any()); + verify(serviceSpy, times(1)).startReplication(eq(replicaShard), any(), any()); } public void testNewCheckpointBehindCurrentCheckpoint() { SegmentReplicationTargetService spy = spy(sut); spy.onNewCheckpoint(checkpoint, replicaShard); - verify(spy, times(0)).startReplication(any(), any()); + verify(spy, times(0)).startReplication(any(), any(), any()); } public void testShardNotStarted() throws IOException { SegmentReplicationTargetService spy = spy(sut); IndexShard shard = newShard(false); spy.onNewCheckpoint(checkpoint, shard); - verify(spy, times(0)).startReplication(any(), any()); + verify(spy, times(0)).startReplication(any(), any(), any()); closeShards(shard); } @@ -385,7 +402,7 @@ public void testRejectCheckpointOnShardPrimaryMode() throws IOException { spy.onNewCheckpoint(aheadCheckpoint, spyShard); // Verify that checkpoint is not processed as shard is in PrimaryMode. - verify(spy, times(0)).startReplication(any(), any()); + verify(spy, times(0)).startReplication(any(), any(), any()); closeShards(primaryShard); } @@ -410,10 +427,10 @@ public void testStartReplicationListenerSuccess() throws InterruptedException { SegmentReplicationTargetService spy = spy(sut); CountDownLatch latch = new CountDownLatch(1); doAnswer(i -> { - ((SegmentReplicationTargetService.SegmentReplicationListener) i.getArgument(1)).onReplicationDone(state); + ((SegmentReplicationTargetService.SegmentReplicationListener) i.getArgument(2)).onReplicationDone(state); latch.countDown(); return null; - }).when(spy).startReplication(any(), any()); + }).when(spy).startReplication(any(), any(), any()); doNothing().when(spy).updateVisibleCheckpoint(eq(0L), any()); spy.afterIndexShardStarted(replicaShard); @@ -426,14 +443,14 @@ public void testStartReplicationListenerFailure() throws InterruptedException { SegmentReplicationTargetService spy = spy(sut); CountDownLatch latch = new CountDownLatch(1); doAnswer(i -> { - ((SegmentReplicationTargetService.SegmentReplicationListener) i.getArgument(1)).onReplicationFailure( + ((SegmentReplicationTargetService.SegmentReplicationListener) i.getArgument(2)).onReplicationFailure( state, new ReplicationFailedException(replicaShard, null), false ); latch.countDown(); return null; - }).when(spy).startReplication(any(), any()); + }).when(spy).startReplication(any(), any(), any()); doNothing().when(spy).updateVisibleCheckpoint(eq(0L), any()); spy.afterIndexShardStarted(replicaShard); @@ -574,6 +591,7 @@ public void testForceSegmentSyncHandlerWithFailure_AlreadyClosedException_swallo public void testTargetCancelledBeforeStartInvoked() { final SegmentReplicationTarget target = new SegmentReplicationTarget( replicaShard, + primaryShard.getLatestReplicationCheckpoint(), mock(SegmentReplicationSource.class), new SegmentReplicationTargetService.SegmentReplicationListener() { @Override diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 548c1f57cab34..6866ab6e86358 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -141,7 +141,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource, segRepListener); segrepTarget.startReplication(new ActionListener() { @Override @@ -189,7 +189,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource, segRepListener); segrepTarget.startReplication(new ActionListener() { @Override @@ -232,7 +232,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource, segRepListener); segrepTarget.startReplication(new ActionListener() { @Override @@ -275,7 +275,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource, segRepListener); doThrow(exception).when(spyIndexShard).finalizeReplication(any()); @@ -320,7 +320,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource, segRepListener); doThrow(exception).when(spyIndexShard).finalizeReplication(any()); @@ -364,7 +364,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource, segRepListener); when(spyIndexShard.getSegmentMetadataMap()).thenReturn(SI_SNAPSHOT_DIFFERENT); segrepTarget.startReplication(new ActionListener() { @Override @@ -417,7 +417,7 @@ public void getSegmentFiles( SegmentReplicationTargetService.SegmentReplicationListener.class ); - segrepTarget = new SegmentReplicationTarget(spyIndexShard, segrepSource, segRepListener); + segrepTarget = new SegmentReplicationTarget(spyIndexShard, repCheckpoint, segrepSource, segRepListener); when(spyIndexShard.getSegmentMetadataMap()).thenReturn(storeMetadataSnapshots.get(0).asMap()); segrepTarget.startReplication(new ActionListener() { @Override diff --git a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java index 776173f73ce5c..9c38c5848e297 100644 --- a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java +++ b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java @@ -120,11 +120,13 @@ public void testStartMultipleReplicationsForSingleShard() throws Exception { shards.recoverReplica(shard); final SegmentReplicationTarget target1 = new SegmentReplicationTarget( shard, + shards.getPrimary().getLatestReplicationCheckpoint(), mock(SegmentReplicationSource.class), mock(ReplicationListener.class) ); final SegmentReplicationTarget target2 = new SegmentReplicationTarget( shard, + shards.getPrimary().getLatestReplicationCheckpoint(), mock(SegmentReplicationSource.class), mock(ReplicationListener.class) ); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 66e5459cfea3b..9dc114eb923d3 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -91,6 +91,7 @@ import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.engine.EngineTestCase; import org.opensearch.index.engine.InternalEngineFactory; +import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.SourceToParse; import org.opensearch.index.remote.RemoteRefreshSegmentPressureService; @@ -138,6 +139,7 @@ import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; @@ -526,6 +528,58 @@ protected IndexShard newShard( ); } + protected IndexShard newShard(boolean primary, SegmentReplicationCheckpointPublisher checkpointPublisher) throws IOException { + final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); + return newShard(primary, checkpointPublisher, settings); + } + + /** + * creates a new initializing shard. The shard will be put in its proper path under the + * current node id the shard is assigned to. + * @param checkpointPublisher Segment Replication Checkpoint Publisher to publish checkpoint + */ + protected IndexShard newShard(boolean primary, SegmentReplicationCheckpointPublisher checkpointPublisher, Settings settings) + throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 0); + final ShardRouting shardRouting = TestShardRouting.newShardRouting( + shardId, + randomAlphaOfLength(10), + primary, + ShardRoutingState.INITIALIZING, + primary ? RecoverySource.EmptyStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE + ); + final NodeEnvironment.NodePath nodePath = new NodeEnvironment.NodePath(createTempDir()); + ShardPath shardPath = new ShardPath(false, nodePath.resolve(shardId), nodePath.resolve(shardId), shardId); + + Settings indexSettings = Settings.builder() + .put(settings) + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000)) + .put(Settings.EMPTY) + .build(); + IndexMetadata metadata = IndexMetadata.builder(shardRouting.getIndexName()) + .settings(indexSettings) + .primaryTerm(0, primaryTerm) + .putMapping("{ \"properties\": {} }") + .build(); + return newShard( + shardRouting, + shardPath, + metadata, + null, + null, + new NRTReplicationEngineFactory(), + new EngineConfigFactory(new IndexSettings(metadata, metadata.getSettings())), + () -> {}, + RetentionLeaseSyncer.EMPTY, + EMPTY_EVENT_LISTENER, + checkpointPublisher, + null + ); + } + /** * creates a new initializing shard. * @param routing shard routing to use @@ -1483,10 +1537,7 @@ public void getCheckpointMetadata( ActionListener listener ) { try { - final CopyState copyState = new CopyState( - ReplicationCheckpoint.empty(primaryShard.shardId, primaryShard.getLatestReplicationCheckpoint().getCodec()), - primaryShard - ); + final CopyState copyState = new CopyState(primaryShard.getLatestReplicationCheckpoint(), primaryShard); listener.onResponse( new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes()) ); @@ -1541,6 +1592,7 @@ protected final List replicateSegments(IndexShard prim final SegmentReplicationTargetService targetService = prepareForReplication(primaryShard, replica); final SegmentReplicationTarget target = targetService.startReplication( replica, + primaryShard.getLatestReplicationCheckpoint(), getTargetListener(primaryShard, replica, primaryMetadata, countDownLatch) ); ids.add(target);