Skip to content

Commit

Permalink
Use SegmentReplicationTarget test to validate non-active on-disk file…
Browse files Browse the repository at this point in the history
…s are reused for replication (#11786) (#11806)

* Use SegmentReplicationTarget test to validate non-active on-disk files are reused for replication



* Fix original shard level unit test



---------


(cherry picked from commit fe98aad)

Signed-off-by: Suraj Singh <surajrider@gmail.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent c6e4587 commit 7a0d815
Showing 1 changed file with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.Version;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -371,7 +372,6 @@ public void testPrimaryRestart() throws Exception {
* prevent FileAlreadyExistsException. It does so by only copying files in first round of segment replication without
* committing locally so that in next round of segment replication those files are not considered for download again
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/10885")
public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
try (ReplicationGroup shards = createGroup(1, getIndexSettings(), new NRTReplicationEngineFactory())) {
shards.startAll();
Expand All @@ -388,6 +388,7 @@ public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
);
CountDownLatch latch = new CountDownLatch(1);

logger.info("--> Starting first round of replication");
// Start first round of segment replication. This should fail with simulated error but with replica having
// files in its local store but not in active reader.
final SegmentReplicationTarget target = targetService.startReplication(
Expand Down Expand Up @@ -427,6 +428,7 @@ public void onReplicationFailure(
// Start next round of segment replication and not throwing exception resulting in commit on replica
when(sourceFactory.get(any())).thenReturn(getRemoteStoreReplicationSource(replica, () -> {}));
CountDownLatch waitForSecondRound = new CountDownLatch(1);
logger.info("--> Starting second round of replication");
final SegmentReplicationTarget newTarget = targetService.startReplication(
replica,
primary.getLatestReplicationCheckpoint(),
Expand Down Expand Up @@ -559,8 +561,19 @@ public void getSegmentFiles(
BiConsumer<String, Long> fileProgressTracker,
ActionListener<GetSegmentFilesResponse> listener
) {
super.getSegmentFiles(replicationId, checkpoint, filesToFetch, indexShard, (fileName, bytesRecovered) -> {}, listener);
postGetFilesRunnable.run();
StepListener<GetSegmentFilesResponse> waitForCopyFilesListener = new StepListener();
super.getSegmentFiles(
replicationId,
checkpoint,
filesToFetch,
indexShard,
(fileName, bytesRecovered) -> {},
waitForCopyFilesListener
);
waitForCopyFilesListener.whenComplete(response -> {
postGetFilesRunnable.run();
listener.onResponse(response);
}, listener::onFailure);
}

@Override
Expand Down

0 comments on commit 7a0d815

Please sign in to comment.