@@ -1020,21 +1020,25 @@ where
10201020 }
10211021}
10221022
1023- macro_rules! drop_handled_events_and_abort { ( $self: expr, $res : expr , $offset : expr, $event_queue: expr) => {
1023+ macro_rules! drop_handled_events_and_abort { ( $self: expr, $res_iter : expr, $event_queue: expr) => {
10241024 // We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
10251025 // successfully handled events from the given queue, reset the events processing flag, and
10261026 // return, to have the events eventually replayed upon next invocation.
10271027 {
10281028 let mut queue_lock = $event_queue. lock( ) . unwrap( ) ;
10291029
1030- // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
1031- let mut res_iter = $res. iter( ) . skip( $offset) ;
1032-
10331030 // Keep all events which previously error'd *or* any that have been added since we dropped
10341031 // the Mutex before.
1035- queue_lock. retain( |_| res_iter. next( ) . map_or( true , |r| r. is_err( ) ) ) ;
1032+ let mut any_error = false ;
1033+ queue_lock. retain( |_| {
1034+ $res_iter. next( ) . map_or( true , |r| {
1035+ let is_err = r. is_err( ) ;
1036+ any_error |= is_err;
1037+ is_err
1038+ } )
1039+ } ) ;
10361040
1037- if $res . iter ( ) . any ( |r| r . is_err ( ) ) {
1041+ if any_error {
10381042 // We failed handling some events. Return to have them eventually replayed.
10391043 $self. pending_events_processor. store( false , Ordering :: Release ) ;
10401044 return ;
@@ -1384,7 +1388,8 @@ where
13841388 }
13851389 // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
13861390 let res = MultiResultFuturePoller :: new ( futures) . await ;
1387- drop_handled_events_and_abort ! ( self , res, intercepted_msgs_offset, self . pending_intercepted_msgs_events) ;
1391+ let mut res_iter = res. iter ( ) . skip ( intercepted_msgs_offset) ;
1392+ drop_handled_events_and_abort ! ( self , res_iter, self . pending_intercepted_msgs_events) ;
13881393 }
13891394
13901395 {
@@ -1407,7 +1412,8 @@ where
14071412 futures. push ( future) ;
14081413 }
14091414 let res = MultiResultFuturePoller :: new ( futures) . await ;
1410- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_peer_connected_events) ;
1415+ let mut res_iter = res. iter ( ) ;
1416+ drop_handled_events_and_abort ! ( self , res_iter, self . pending_peer_connected_events) ;
14111417 }
14121418 }
14131419 self . pending_events_processor . store ( false , Ordering :: Release ) ;
@@ -1479,21 +1485,11 @@ where
14791485 pending_peer_connected_events. shrink_to ( 10 ) ; // Limit total heap usage
14801486 }
14811487
1482- if intercepted_msgs. len ( ) == 1 {
1483- let res = intercepted_msgs. into_iter ( ) . next ( ) . map ( |ev| handler. handle_event ( ev) ) ;
1484- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_intercepted_msgs_events) ;
1485- } else {
1486- let res = intercepted_msgs. into_iter ( ) . map ( |ev| handler. handle_event ( ev) ) . collect :: < Vec < _ > > ( ) ;
1487- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_intercepted_msgs_events) ;
1488- } ;
1488+ let mut res_iter = intercepted_msgs. into_iter ( ) . map ( |ev| handler. handle_event ( ev) ) ;
1489+ drop_handled_events_and_abort ! ( self , res_iter, self . pending_intercepted_msgs_events) ;
14891490
1490- if peer_connecteds. len ( ) == 1 {
1491- let res = peer_connecteds. into_iter ( ) . next ( ) . map ( |ev| handler. handle_event ( ev) ) ;
1492- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_peer_connected_events) ;
1493- } else {
1494- let res = peer_connecteds. into_iter ( ) . map ( |ev| handler. handle_event ( ev) ) . collect :: < Vec < _ > > ( ) ;
1495- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_peer_connected_events) ;
1496- }
1491+ let mut res_iter = peer_connecteds. into_iter ( ) . map ( |ev| handler. handle_event ( ev) ) ;
1492+ drop_handled_events_and_abort ! ( self , res_iter, self . pending_peer_connected_events) ;
14971493
14981494 self . pending_events_processor . store ( false , Ordering :: Release ) ;
14991495 }
0 commit comments