diff --git a/CHANGELOG.md b/CHANGELOG.md index b9082ed039712..d8373e9904d7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix NoSuchFileExceptions with segment replication when computing primary metadata snapshots ([#4366](https://github.com/opensearch-project/OpenSearch/pull/4366)) - [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414)) - Fixed the `_cat/shards/10_basic.yml` test cases fix. +- [Segment Replication] Fix timeout issue by calculating time needed to process getSegmentFiles ([#4426](https://github.com/opensearch-project/OpenSearch/pull/4426)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) @@ -70,4 +71,4 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) [Unreleased]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...HEAD -[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x \ No newline at end of file +[2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.2.0...2.x diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index aa0b5416dd0ff..8107f99723eaf 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -13,11 +13,13 @@ import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.Writeable; +import org.opensearch.common.unit.TimeValue; import org.opensearch.index.store.Store; import 
org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RetryableTransportClient; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.transport.TransportRequestOptions; import org.opensearch.transport.TransportService; import java.util.List; @@ -78,6 +80,17 @@ public void getSegmentFiles( ) { final Writeable.Reader reader = GetSegmentFilesResponse::new; final ActionListener responseListener = ActionListener.map(listener, r -> r); + // A few of the assumptions and calculations below were added for the experimental release of the segment replication + // feature in version 2.3. They will be changed in the next release. + + // Total size, in bytes, of the files to fetch. + final long sizeOfSegmentFiles = filesToFetch.stream().mapToLong(file -> file.length()).sum(); + + // Maximum size of segment files, in bytes, that can be processed in 1 minute on an m5.xlarge machine. + long baseSegmentFilesSize = 100000000; + + // Time, in minutes, needed to fetch the segment files for this replication event. + final long timeToGetSegmentFiles = 1 + (sizeOfSegmentFiles / baseSegmentFilesSize); final GetSegmentFilesRequest request = new GetSegmentFilesRequest( replicationId, targetAllocationId, @@ -85,7 +98,10 @@ public void getSegmentFiles( filesToFetch, checkpoint ); - transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader); + final TransportRequestOptions options = TransportRequestOptions.builder() + .withTimeout(TimeValue.timeValueMinutes(timeToGetSegmentFiles)) + .build(); + transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, options, responseListener, reader); } @Override