2121
2222import org .elasticsearch .action .ActionListener ;
2323import org .elasticsearch .action .support .PlainActionFuture ;
24+ import org .elasticsearch .action .support .replication .ReplicationResponse ;
2425import org .elasticsearch .cluster .metadata .IndexMetaData ;
2526import org .elasticsearch .common .Randomness ;
2627import org .elasticsearch .common .settings .Settings ;
3435import java .util .ArrayList ;
3536import java .util .List ;
3637import java .util .concurrent .CountDownLatch ;
37- import java .util .concurrent .atomic .AtomicReference ;
38- import java .util .function .Consumer ;
3938
4039import static org .hamcrest .Matchers .containsInAnyOrder ;
4140import static org .hamcrest .Matchers .equalTo ;
@@ -76,20 +75,19 @@ public void testOutOfOrderRetentionLeasesRequests() throws Exception {
7675 Settings settings = Settings .builder ().put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), true ).build ();
7776 int numberOfReplicas = between (1 , 2 );
7877 IndexMetaData indexMetaData = buildIndexMetaData (numberOfReplicas , settings , indexMapping );
79- final List <RetentionLeaseSyncAction .Request > requests = new ArrayList <>();
8078 try (ReplicationGroup group = new ReplicationGroup (indexMetaData ) {
8179 @ Override
82- protected void syncRetentionLeases (ShardId shardId , RetentionLeases leases , ActionListener <Void > listener ) {
83- requests .add (new RetentionLeaseSyncAction .Request (shardId , leases ));
84- listener .onResponse (null );
80+ protected void syncRetentionLeases (ShardId shardId , RetentionLeases leases , ActionListener <ReplicationResponse > listener ) {
81+ listener .onResponse (new SyncRetentionLeasesResponse (new RetentionLeaseSyncAction .Request (shardId , leases )));
8582 }
8683 }) {
8784 group .startAll ();
8885 int numLeases = between (1 , 10 );
86+ List <RetentionLeaseSyncAction .Request > requests = new ArrayList <>();
8987 for (int i = 0 ; i < numLeases ; i ++) {
90- PlainActionFuture <Void > future = new PlainActionFuture <>();
88+ PlainActionFuture <ReplicationResponse > future = new PlainActionFuture <>();
9189 group .addRetentionLease (Integer .toString (i ), randomNonNegativeLong (), "test-" + i , future );
92- future .get ( );
90+ requests . add ((( SyncRetentionLeasesResponse ) future .actionGet ()). syncRequest );
9391 }
9492 RetentionLeases leasesOnPrimary = group .getPrimary ().getRetentionLeases ();
9593 for (IndexShard replica : group .getReplicas ()) {
@@ -104,56 +102,50 @@ public void testSyncRetentionLeasesWithPrimaryPromotion() throws Exception {
104102 Settings settings = Settings .builder ().put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), true ).build ();
105103 int numberOfReplicas = between (2 , 4 );
106104 IndexMetaData indexMetaData = buildIndexMetaData (numberOfReplicas , settings , indexMapping );
107- final AtomicReference <Consumer <RetentionLeaseSyncAction .Request >> onRequest = new AtomicReference <>();
108105 try (ReplicationGroup group = new ReplicationGroup (indexMetaData ) {
109106 @ Override
110- protected void syncRetentionLeases (ShardId shardId , RetentionLeases leases , ActionListener <Void > listener ) {
111- onRequest .get ().accept (new RetentionLeaseSyncAction .Request (shardId , leases ));
112- listener .onResponse (null );
107+ protected void syncRetentionLeases (ShardId shardId , RetentionLeases leases , ActionListener <ReplicationResponse > listener ) {
108+ listener .onResponse (new SyncRetentionLeasesResponse (new RetentionLeaseSyncAction .Request (shardId , leases )));
113109 }
114110 }) {
115111 group .startAll ();
116112 int numLeases = between (1 , 100 );
117- final IndexShard newPrimary = randomFrom (group .getReplicas ());
118- final AtomicReference <RetentionLeases > latestRetentionLeasesOnNewPrimary = new AtomicReference <>(RetentionLeases .EMPTY );
119-
120- onRequest .set (request -> {
121- final RetentionLeases newRetentionLeases = request .getRetentionLeases ();
113+ IndexShard newPrimary = randomFrom (group .getReplicas ());
114+ RetentionLeases latestRetentionLeasesOnNewPrimary = RetentionLeases .EMPTY ;
115+ for (int i = 0 ; i < numLeases ; i ++) {
116+ PlainActionFuture <ReplicationResponse > addLeaseFuture = new PlainActionFuture <>();
117+ group .addRetentionLease (Integer .toString (i ), randomNonNegativeLong (), "test-" + i , addLeaseFuture );
118+ RetentionLeaseSyncAction .Request request = ((SyncRetentionLeasesResponse ) addLeaseFuture .actionGet ()).syncRequest ;
122119 for (IndexShard replica : randomSubsetOf (group .getReplicas ())) {
123120 group .executeRetentionLeasesSyncRequestOnReplica (request , replica );
124121 if (newPrimary == replica ) {
125- latestRetentionLeasesOnNewPrimary .updateAndGet (currentRetentionLeases
126- -> newRetentionLeases .supersedes (currentRetentionLeases ) ? newRetentionLeases : currentRetentionLeases );
122+ latestRetentionLeasesOnNewPrimary = request .getRetentionLeases ();
127123 }
128124 }
129- });
130-
131- for (int i = 0 ; i < numLeases ; i ++) {
132- PlainActionFuture <Void > addLeaseFuture = new PlainActionFuture <>();
133- group .addRetentionLease (Integer .toString (i ), randomNonNegativeLong (), "test-" + i , addLeaseFuture );
134- addLeaseFuture .get ();
135125 }
136126 group .promoteReplicaToPrimary (newPrimary ).get ();
137-
138127 // we need to make changes to retention leases to sync it to replicas
139128 // since we don't sync retention leases when promoting a new primary.
140-
141- onRequest .set (request -> {
142- for (IndexShard replica : group .getReplicas ()) {
143- group .executeRetentionLeasesSyncRequestOnReplica (request , replica );
144- }
145- });
146-
147- final PlainActionFuture <Void > newLeaseFuture = new PlainActionFuture <>();
129+ PlainActionFuture <ReplicationResponse > newLeaseFuture = new PlainActionFuture <>();
148130 group .addRetentionLease ("new-lease-after-promotion" , randomNonNegativeLong (), "test" , newLeaseFuture );
149- final RetentionLeases leasesOnPrimary = group .getPrimary ().getRetentionLeases ();
131+ RetentionLeases leasesOnPrimary = group .getPrimary ().getRetentionLeases ();
150132 assertThat (leasesOnPrimary .primaryTerm (), equalTo (group .getPrimary ().getOperationPrimaryTerm ()));
151- assertThat (leasesOnPrimary .version (), equalTo (latestRetentionLeasesOnNewPrimary .get ().version () + 1L ));
152- assertThat (leasesOnPrimary .leases (), hasSize (latestRetentionLeasesOnNewPrimary .get ().leases ().size () + 1 ));
153- newLeaseFuture .get ();
133+ assertThat (leasesOnPrimary .version (), equalTo (latestRetentionLeasesOnNewPrimary .version () + 1L ));
134+ assertThat (leasesOnPrimary .leases (), hasSize (latestRetentionLeasesOnNewPrimary .leases ().size () + 1 ));
135+ RetentionLeaseSyncAction .Request request = ((SyncRetentionLeasesResponse ) newLeaseFuture .actionGet ()).syncRequest ;
136+ for (IndexShard replica : group .getReplicas ()) {
137+ group .executeRetentionLeasesSyncRequestOnReplica (request , replica );
138+ }
154139 for (IndexShard replica : group .getReplicas ()) {
155140 assertThat (replica .getRetentionLeases (), equalTo (leasesOnPrimary ));
156141 }
157142 }
158143 }
144+
145+ static final class SyncRetentionLeasesResponse extends ReplicationResponse {
146+ final RetentionLeaseSyncAction .Request syncRequest ;
147+ SyncRetentionLeasesResponse (RetentionLeaseSyncAction .Request syncRequest ) {
148+ this .syncRequest = syncRequest ;
149+ }
150+ }
159151}
0 commit comments