@@ -514,7 +514,8 @@ public void transferTestWhenDestBrokerFails()
514
514
assertEquals (0 , getOwnerRequests2 .size ());
515
515
516
516
// recovered, check the monitor update state : Assigned -> Owned
517
- doReturn (CompletableFuture .completedFuture (Optional .of (lookupServiceAddress1 ))).when (loadManager ).selectAsync (any ());
517
+ doReturn (CompletableFuture .completedFuture (Optional .of (lookupServiceAddress1 )))
518
+ .when (loadManager ).selectAsync (any ());
518
519
FieldUtils .writeDeclaredField (channel2 , "producer" , producer , true );
519
520
FieldUtils .writeDeclaredField (channel1 ,
520
521
"inFlightStateWaitingTimeInMillis" , 1 , true );
@@ -733,8 +734,8 @@ public void handleBrokerDeletionEventTest()
733
734
734
735
var owner1 = channel1 .getOwnerAsync (bundle1 );
735
736
var owner2 = channel2 .getOwnerAsync (bundle2 );
736
- doReturn (CompletableFuture .completedFuture (Optional .of (lookupServiceAddress2 ))). when ( loadManager ). selectAsync ( any ());
737
-
737
+ doReturn (CompletableFuture .completedFuture (Optional .of (lookupServiceAddress2 )))
738
+ . when ( loadManager ). selectAsync ( any ());
738
739
assertTrue (owner1 .get ().isEmpty ());
739
740
assertTrue (owner2 .get ().isEmpty ());
740
741
@@ -1099,7 +1100,8 @@ public void assignTestWhenDestBrokerProducerFails()
1099
1100
"inFlightStateWaitingTimeInMillis" , 3 * 1000 , true );
1100
1101
FieldUtils .writeDeclaredField (channel2 ,
1101
1102
"inFlightStateWaitingTimeInMillis" , 3 * 1000 , true );
1102
- doReturn (CompletableFuture .completedFuture (Optional .of (lookupServiceAddress2 ))).when (loadManager ).selectAsync (any ());
1103
+ doReturn (CompletableFuture .completedFuture (Optional .of (lookupServiceAddress2 )))
1104
+ .when (loadManager ).selectAsync (any ());
1103
1105
channel1 .publishAssignEventAsync (bundle , lookupServiceAddress2 );
1104
1106
// channel1 is broken. the assign won't be complete.
1105
1107
waitUntilState (channel1 , bundle );
@@ -1437,7 +1439,8 @@ public void testOverrideInactiveBrokerStateData()
1437
1439
new ServiceUnitStateData (Owned , broker , null , 1 ));
1438
1440
1439
1441
// test stable metadata state
1440
- doReturn (Optional .of (lookupServiceAddress2 )).when (brokerSelector ).select (any (), any (), any ());
1442
+ doReturn (CompletableFuture .completedFuture (Optional .of (lookupServiceAddress2 )))
1443
+ .when (loadManager ).selectAsync (any ());
1441
1444
leaderChannel .handleMetadataSessionEvent (SessionReestablished );
1442
1445
followerChannel .handleMetadataSessionEvent (SessionReestablished );
1443
1446
FieldUtils .writeDeclaredField (leaderChannel , "lastMetadataSessionEventTimestamp" ,
@@ -1501,7 +1504,8 @@ public void testOverrideOrphanStateData()
1501
1504
new ServiceUnitStateData (Owned , broker , null , 1 ));
1502
1505
1503
1506
// test stable metadata state
1504
- doReturn (Optional .of (lookupServiceAddress2 )).when (brokerSelector ).select (any (), any (), any ());
1507
+ doReturn (CompletableFuture .completedFuture (Optional .of (lookupServiceAddress2 )))
1508
+ .when (loadManager ).selectAsync (any ());
1505
1509
FieldUtils .writeDeclaredField (leaderChannel , "inFlightStateWaitingTimeInMillis" ,
1506
1510
-1 , true );
1507
1511
FieldUtils .writeDeclaredField (followerChannel , "inFlightStateWaitingTimeInMillis" ,
0 commit comments