@@ -308,9 +308,7 @@ public void testRetentionLeasesAreNotBeingRenewedAfterRecoveryCompletes() throws
308308 * After we wake up, it should be the case that the retention leases are the same (same timestamp) as that indicates that they were
309309 * not renewed while we were sleeping.
310310 */
311- final TimeValue renewIntervalSetting = CcrRetentionLeases .RETENTION_LEASE_RENEW_INTERVAL_SETTING .get (followerClusterSettings ());
312- final long renewEnd = System .nanoTime ();
313- Thread .sleep (Math .max (0 , randomIntBetween (2 , 4 ) * renewIntervalSetting .millis () - TimeUnit .NANOSECONDS .toMillis (renewEnd - start )));
311+ waitForAFewRenewalIntervals (start );
314312
315313 // now ensure that the retention leases are the same
316314 {
@@ -512,9 +510,6 @@ public void testRetentionLeaseRenewedWhileFollowing() throws Exception {
512510 final int numberOfReplicas = randomIntBetween (0 , 1 );
513511 final Map <String , String > additionalIndexSettings = new HashMap <>();
514512 additionalIndexSettings .put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), Boolean .toString (true ));
515- additionalIndexSettings .put (
516- IndexService .RETENTION_LEASE_SYNC_INTERVAL_SETTING .getKey (),
517- TimeValue .timeValueMillis (200 ).getStringRep ());
518513 final String leaderIndexSettings = getIndexSettings (numberOfShards , numberOfReplicas , additionalIndexSettings );
519514 assertAcked (leaderClient ().admin ().indices ().prepareCreate (leaderIndex ).setSource (leaderIndexSettings , XContentType .JSON ).get ());
520515 ensureLeaderYellow (leaderIndex );
@@ -532,9 +527,6 @@ public void testRetentionLeaseAdvancesWhileFollowing() throws Exception {
532527 final int numberOfReplicas = randomIntBetween (0 , 1 );
533528 final Map <String , String > additionalIndexSettings = new HashMap <>();
534529 additionalIndexSettings .put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), Boolean .toString (true ));
535- additionalIndexSettings .put (
536- IndexService .RETENTION_LEASE_SYNC_INTERVAL_SETTING .getKey (),
537- TimeValue .timeValueMillis (200 ).getStringRep ());
538530 final String leaderIndexSettings = getIndexSettings (numberOfShards , numberOfReplicas , additionalIndexSettings );
539531 assertAcked (leaderClient ().admin ().indices ().prepareCreate (leaderIndex ).setSource (leaderIndexSettings , XContentType .JSON ).get ());
540532 ensureLeaderYellow (leaderIndex );
@@ -566,25 +558,28 @@ public void testRetentionLeaseAdvancesWhileFollowing() throws Exception {
566558 leaderGlobalCheckpoints .put (routing .id (), leaderShardStats .getSeqNoStats ().getGlobalCheckpoint ());
567559 }
568560
561+ // wait for the follower to renew its leases
562+ waitForAFewRenewalIntervals (System .nanoTime ());
563+
564+ getLeaderCluster ().syncRetentionLeases (resolveLeaderIndex (leaderIndex ));
565+
569566 // now assert that the retention leases have advanced to the global checkpoints
570- assertBusy (() -> {
571- final IndicesStatsResponse stats =
572- leaderClient ().admin ().indices ().stats (new IndicesStatsRequest ().clear ().indices (leaderIndex )).actionGet ();
573- assertNotNull (stats .getShards ());
574- assertThat (stats .getShards (), arrayWithSize (numberOfShards * (1 + numberOfReplicas )));
575- final List <ShardStats > shardsStats = getShardsStats (stats );
576- for (int i = 0 ; i < numberOfShards * (1 + numberOfReplicas ); i ++) {
577- final RetentionLeases currentRetentionLeases = shardsStats .get (i ).getRetentionLeaseStats ().retentionLeases ();
578- assertThat (currentRetentionLeases .leases (), hasSize (1 ));
579- final RetentionLease retentionLease =
580- currentRetentionLeases .leases ().iterator ().next ();
581- assertThat (retentionLease .id (), equalTo (getRetentionLeaseId (followerIndex , leaderIndex )));
582- // we assert that retention leases are being advanced
583- assertThat (
584- retentionLease .retainingSequenceNumber (),
585- equalTo (leaderGlobalCheckpoints .get (shardsStats .get (i ).getShardRouting ().id ())));
586- }
587- });
567+ final IndicesStatsResponse stats =
568+ leaderClient ().admin ().indices ().stats (new IndicesStatsRequest ().clear ().indices (leaderIndex )).actionGet ();
569+ assertNotNull (stats .getShards ());
570+ assertThat (stats .getShards (), arrayWithSize (numberOfShards * (1 + numberOfReplicas )));
571+ final List <ShardStats > shardsStats = getShardsStats (stats );
572+ for (int i = 0 ; i < numberOfShards * (1 + numberOfReplicas ); i ++) {
573+ final RetentionLeases currentRetentionLeases = shardsStats .get (i ).getRetentionLeaseStats ().retentionLeases ();
574+ assertThat (currentRetentionLeases .leases (), hasSize (1 ));
575+ final RetentionLease retentionLease =
576+ currentRetentionLeases .leases ().iterator ().next ();
577+ assertThat (retentionLease .id (), equalTo (getRetentionLeaseId (followerIndex , leaderIndex )));
578+ // we assert that retention leases are being advanced
579+ assertThat (
580+ retentionLease .retainingSequenceNumber (),
581+ equalTo (leaderGlobalCheckpoints .get (shardsStats .get (i ).getShardRouting ().id ())));
582+ }
588583 }
589584
590585 @ TestLogging (value = "org.elasticsearch.xpack.ccr:trace" )
@@ -595,9 +590,6 @@ public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws E
595590 final int numberOfReplicas = randomIntBetween (0 , 1 );
596591 final Map <String , String > additionalIndexSettings = new HashMap <>();
597592 additionalIndexSettings .put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), Boolean .toString (true ));
598- additionalIndexSettings .put (
599- IndexService .RETENTION_LEASE_SYNC_INTERVAL_SETTING .getKey (),
600- TimeValue .timeValueMillis (200 ).getStringRep ());
601593 final String leaderIndexSettings = getIndexSettings (numberOfShards , numberOfReplicas , additionalIndexSettings );
602594 assertAcked (leaderClient ().admin ().indices ().prepareCreate (leaderIndex ).setSource (leaderIndexSettings , XContentType .JSON ).get ());
603595 ensureLeaderYellow (leaderIndex );
@@ -613,16 +605,7 @@ public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws E
613605 * We want to ensure that the retention leases have been synced to all shard copies, as otherwise they might sync between the two
614606 * times that we sample the retention leases, which would cause our check to fail.
615607 */
616- final TimeValue syncIntervalSetting = IndexService .RETENTION_LEASE_SYNC_INTERVAL_SETTING .get (
617- leaderClient ()
618- .admin ()
619- .indices ()
620- .prepareGetSettings (leaderIndex )
621- .get ()
622- .getIndexToSettings ()
623- .get (leaderIndex ));
624- final long syncEnd = System .nanoTime ();
625- Thread .sleep (Math .max (0 , randomIntBetween (2 , 4 ) * syncIntervalSetting .millis () - TimeUnit .NANOSECONDS .toMillis (syncEnd - start )));
608+ getLeaderCluster ().syncRetentionLeases (resolveLeaderIndex (leaderIndex ));
626609
627610 final ClusterStateResponse leaderIndexClusterState =
628611 leaderClient ().admin ().cluster ().prepareState ().clear ().setMetaData (true ).setIndices (leaderIndex ).get ();
@@ -661,9 +644,7 @@ public void testRetentionLeaseRenewalIsCancelledWhenFollowingIsPaused() throws E
661644 * up, it should be the case that the retention leases are the same (same timestamp) as that indicates that they were not renewed
662645 * while we were sleeping.
663646 */
664- final TimeValue renewIntervalSetting = CcrRetentionLeases .RETENTION_LEASE_RENEW_INTERVAL_SETTING .get (followerClusterSettings ());
665- final long renewEnd = System .nanoTime ();
666- Thread .sleep (Math .max (0 , randomIntBetween (2 , 4 ) * renewIntervalSetting .millis () - TimeUnit .NANOSECONDS .toMillis (renewEnd - start )));
647+ waitForAFewRenewalIntervals (start );
667648
668649 // now ensure that the retention leases are the same
669650 assertBusy (() -> {
@@ -697,9 +678,6 @@ public void testRetentionLeaseRenewalIsResumedWhenFollowingIsResumed() throws Ex
697678 final int numberOfReplicas = randomIntBetween (0 , 1 );
698679 final Map <String , String > additionalIndexSettings = new HashMap <>();
699680 additionalIndexSettings .put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), Boolean .toString (true ));
700- additionalIndexSettings .put (
701- IndexService .RETENTION_LEASE_SYNC_INTERVAL_SETTING .getKey (),
702- TimeValue .timeValueMillis (200 ).getStringRep ());
703681 final String leaderIndexSettings = getIndexSettings (numberOfShards , numberOfReplicas , additionalIndexSettings );
704682 assertAcked (leaderClient ().admin ().indices ().prepareCreate (leaderIndex ).setSource (leaderIndexSettings , XContentType .JSON ).get ());
705683 ensureLeaderYellow (leaderIndex );
@@ -724,9 +702,6 @@ public void testRetentionLeaseIsAddedIfItDisappearsWhileFollowing() throws Excep
724702 final int numberOfReplicas = 1 ;
725703 final Map <String , String > additionalIndexSettings = new HashMap <>();
726704 additionalIndexSettings .put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), Boolean .toString (true ));
727- additionalIndexSettings .put (
728- IndexService .RETENTION_LEASE_SYNC_INTERVAL_SETTING .getKey (),
729- TimeValue .timeValueMillis (200 ).getStringRep ());
730705 final String leaderIndexSettings = getIndexSettings (numberOfShards , numberOfReplicas , additionalIndexSettings );
731706 assertAcked (leaderClient ().admin ().indices ().prepareCreate (leaderIndex ).setSource (leaderIndexSettings , XContentType .JSON ).get ());
732707 ensureLeaderYellow (leaderIndex );
@@ -812,9 +787,6 @@ public void testPeriodicRenewalDoesNotAddRetentionLeaseAfterUnfollow() throws Ex
812787 final int numberOfReplicas = 1 ;
813788 final Map <String , String > additionalIndexSettings = new HashMap <>();
814789 additionalIndexSettings .put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), Boolean .toString (true ));
815- additionalIndexSettings .put (
816- IndexService .RETENTION_LEASE_SYNC_INTERVAL_SETTING .getKey (),
817- TimeValue .timeValueMillis (200 ).getStringRep ());
818790 final String leaderIndexSettings = getIndexSettings (numberOfShards , numberOfReplicas , additionalIndexSettings );
819791 assertAcked (leaderClient ().admin ().indices ().prepareCreate (leaderIndex ).setSource (leaderIndexSettings , XContentType .JSON ).get ());
820792 ensureLeaderYellow (leaderIndex );
@@ -918,23 +890,24 @@ private void assertRetentionLeaseRenewal(
918890 }
919891 });
920892
893+ waitForAFewRenewalIntervals (System .nanoTime ());
894+
921895 // now ensure that the retention leases are being renewed
922- assertBusy (() -> {
923- final IndicesStatsResponse stats =
924- leaderClient ().admin ().indices ().stats (new IndicesStatsRequest ().clear ().indices (leaderIndex )).actionGet ();
925- assertNotNull (stats .getShards ());
926- assertThat (stats .getShards (), arrayWithSize (numberOfShards * (1 + numberOfReplicas )));
927- final List <ShardStats > shardsStats = getShardsStats (stats );
928- for (int i = 0 ; i < numberOfShards * (1 + numberOfReplicas ); i ++) {
929- final RetentionLeases currentRetentionLeases = shardsStats .get (i ).getRetentionLeaseStats ().retentionLeases ();
930- assertThat (currentRetentionLeases .leases (), hasSize (1 ));
931- final RetentionLease retentionLease =
932- currentRetentionLeases .leases ().iterator ().next ();
933- assertThat (retentionLease .id (), equalTo (getRetentionLeaseId (followerIndex , leaderIndex )));
934- // we assert that retention leases are being renewed by an increase in the timestamp
935- assertThat (retentionLease .timestamp (), greaterThan (retentionLeases .get (i ).leases ().iterator ().next ().timestamp ()));
936- }
937- });
896+ getLeaderCluster ().syncRetentionLeases (resolveLeaderIndex (leaderIndex ));
897+ final IndicesStatsResponse stats =
898+ leaderClient ().admin ().indices ().stats (new IndicesStatsRequest ().clear ().indices (leaderIndex )).actionGet ();
899+ assertNotNull (stats .getShards ());
900+ assertThat (stats .getShards (), arrayWithSize (numberOfShards * (1 + numberOfReplicas )));
901+ final List <ShardStats > shardsStats = getShardsStats (stats );
902+ for (int i = 0 ; i < numberOfShards * (1 + numberOfReplicas ); i ++) {
903+ final RetentionLeases currentRetentionLeases = shardsStats .get (i ).getRetentionLeaseStats ().retentionLeases ();
904+ assertThat (currentRetentionLeases .leases (), hasSize (1 ));
905+ final RetentionLease retentionLease =
906+ currentRetentionLeases .leases ().iterator ().next ();
907+ assertThat (retentionLease .id (), equalTo (getRetentionLeaseId (followerIndex , leaderIndex )));
908+ // we assert that retention leases are being renewed by an increase in the timestamp
909+ assertThat (retentionLease .timestamp (), greaterThan (retentionLeases .get (i ).leases ().iterator ().next ().timestamp ()));
910+ }
938911 }
939912
940913 /**
@@ -983,4 +956,9 @@ private void assertExpectedDocument(final String followerIndex, final int value)
983956 assertThat (getResponse .getSource ().get ("f" ), equalTo (value ));
984957 }
985958
959+ private void waitForAFewRenewalIntervals (final long renewStartNanos ) throws Exception {
960+ final long renewDelayMillis
961+ = randomIntBetween (2 , 4 ) * CcrRetentionLeases .RETENTION_LEASE_RENEW_INTERVAL_SETTING .get (followerClusterSettings ()).millis ();
962+ assertBusy (() -> assertThat (TimeUnit .NANOSECONDS .toMillis (System .nanoTime () - renewStartNanos ), greaterThan (renewDelayMillis )));
963+ }
986964}
0 commit comments