@@ -180,16 +180,16 @@ impl Future {
180180
181181 /// Waits until this [`Future`] completes.
182182 #[ cfg( feature = "std" ) ]
183- pub fn wait ( self ) {
184- Sleeper :: from_single_future ( self ) . wait ( ) ;
183+ pub fn wait ( & self ) {
184+ Sleeper :: from_single_future ( & self ) . wait ( ) ;
185185 }
186186
187187 /// Waits until this [`Future`] completes or the given amount of time has elapsed.
188188 ///
189189 /// Returns true if the [`Future`] completed, false if the time elapsed.
190190 #[ cfg( feature = "std" ) ]
191- pub fn wait_timeout ( self , max_wait : Duration ) -> bool {
192- Sleeper :: from_single_future ( self ) . wait_timeout ( max_wait)
191+ pub fn wait_timeout ( & self , max_wait : Duration ) -> bool {
192+ Sleeper :: from_single_future ( & self ) . wait_timeout ( max_wait)
193193 }
194194
195195 #[ cfg( test) ]
@@ -202,6 +202,12 @@ impl Future {
202202 }
203203}
204204
205+ impl Drop for Future {
206+ fn drop ( & mut self ) {
207+ self . state . lock ( ) . unwrap ( ) . std_future_callbacks . retain ( |( idx, _) | * idx != self . self_idx ) ;
208+ }
209+ }
210+
205211use core:: task:: Waker ;
206212struct StdWaker ( pub Waker ) ;
207213
@@ -216,6 +222,7 @@ impl<'a> StdFuture for Future {
216222 Poll :: Ready ( ( ) )
217223 } else {
218224 let waker = cx. waker ( ) . clone ( ) ;
225+ state. std_future_callbacks . retain ( |( idx, _) | * idx != self . self_idx ) ;
219226 state. std_future_callbacks . push ( ( self . self_idx , StdWaker ( waker) ) ) ;
220227 Poll :: Pending
221228 }
@@ -232,17 +239,17 @@ pub struct Sleeper {
232239#[ cfg( feature = "std" ) ]
233240impl Sleeper {
234241 /// Constructs a new sleeper from one future, allowing blocking on it.
235- pub fn from_single_future ( future : Future ) -> Self {
236- Self { notifiers : vec ! [ future. state] }
242+ pub fn from_single_future ( future : & Future ) -> Self {
243+ Self { notifiers : vec ! [ Arc :: clone ( & future. state) ] }
237244 }
238245 /// Constructs a new sleeper from two futures, allowing blocking on both at once.
239246 // Note that this is the common case - a ChannelManager and ChainMonitor.
240- pub fn from_two_futures ( fut_a : Future , fut_b : Future ) -> Self {
241- Self { notifiers : vec ! [ fut_a. state, fut_b. state] }
247+ pub fn from_two_futures ( fut_a : & Future , fut_b : & Future ) -> Self {
248+ Self { notifiers : vec ! [ Arc :: clone ( & fut_a. state) , Arc :: clone ( & fut_b. state) ] }
242249 }
243250 /// Constructs a new sleeper on many futures, allowing blocking on all at once.
244251 pub fn new ( futures : Vec < Future > ) -> Self {
245- Self { notifiers : futures. into_iter ( ) . map ( |f| f. state ) . collect ( ) }
252+ Self { notifiers : futures. into_iter ( ) . map ( |f| Arc :: clone ( & f. state ) ) . collect ( ) }
246253 }
247254 /// Prepares to go into a wait loop body, creating a condition variable which we can block on
248255 /// and an `Arc<Mutex<Option<_>>>` which gets set to the waking `Future`'s state prior to the
@@ -447,13 +454,15 @@ mod tests {
447454
448455 // Wait on the other thread to finish its sleep, note that the leak only happened if we
449456 // actually have to sleep here, not if we immediately return.
450- Sleeper :: from_two_futures ( future_a, future_b) . wait ( ) ;
457+ Sleeper :: from_two_futures ( & future_a, & future_b) . wait ( ) ;
451458
452459 join_handle. join ( ) . unwrap ( ) ;
453460
454461 // then drop the notifiers and make sure the future states are gone.
455462 mem:: drop ( notifier_a) ;
456463 mem:: drop ( notifier_b) ;
464+ mem:: drop ( future_a) ;
465+ mem:: drop ( future_b) ;
457466
458467 assert ! ( future_state_a. upgrade( ) . is_none( ) && future_state_b. upgrade( ) . is_none( ) ) ;
459468 }
@@ -655,18 +664,18 @@ mod tests {
655664 // Set both notifiers as woken without sleeping yet.
656665 notifier_a. notify ( ) ;
657666 notifier_b. notify ( ) ;
658- Sleeper :: from_two_futures ( notifier_a. get_future ( ) , notifier_b. get_future ( ) ) . wait ( ) ;
667+ Sleeper :: from_two_futures ( & notifier_a. get_future ( ) , & notifier_b. get_future ( ) ) . wait ( ) ;
659668
660669 // One future has woken us up, but the other should still have a pending notification.
661- Sleeper :: from_two_futures ( notifier_a. get_future ( ) , notifier_b. get_future ( ) ) . wait ( ) ;
670+ Sleeper :: from_two_futures ( & notifier_a. get_future ( ) , & notifier_b. get_future ( ) ) . wait ( ) ;
662671
663672 // However once we've slept twice, we should no longer have any pending notifications
664- assert ! ( !Sleeper :: from_two_futures( notifier_a. get_future( ) , notifier_b. get_future( ) )
673+ assert ! ( !Sleeper :: from_two_futures( & notifier_a. get_future( ) , & notifier_b. get_future( ) )
665674 . wait_timeout( Duration :: from_millis( 10 ) ) ) ;
666675
667676 // Test ordering somewhat more.
668677 notifier_a. notify ( ) ;
669- Sleeper :: from_two_futures ( notifier_a. get_future ( ) , notifier_b. get_future ( ) ) . wait ( ) ;
678+ Sleeper :: from_two_futures ( & notifier_a. get_future ( ) , & notifier_b. get_future ( ) ) . wait ( ) ;
670679 }
671680
672681 #[ test]
@@ -684,7 +693,7 @@ mod tests {
684693
685694 // After sleeping one future (not guaranteed which one, however) will have its notification
686695 // bit cleared.
687- Sleeper :: from_two_futures ( notifier_a. get_future ( ) , notifier_b. get_future ( ) ) . wait ( ) ;
696+ Sleeper :: from_two_futures ( & notifier_a. get_future ( ) , & notifier_b. get_future ( ) ) . wait ( ) ;
688697
689698 // By registering a callback on the futures for both notifiers, one will complete
690699 // immediately, but one will remain tied to the notifier, and will complete once the
@@ -703,8 +712,37 @@ mod tests {
703712 notifier_b. notify ( ) ;
704713
705714 assert ! ( callback_a. load( Ordering :: SeqCst ) && callback_b. load( Ordering :: SeqCst ) ) ;
706- Sleeper :: from_two_futures ( notifier_a. get_future ( ) , notifier_b. get_future ( ) ) . wait ( ) ;
707- assert ! ( !Sleeper :: from_two_futures( notifier_a. get_future( ) , notifier_b. get_future( ) )
715+ Sleeper :: from_two_futures ( & notifier_a. get_future ( ) , & notifier_b. get_future ( ) ) . wait ( ) ;
716+ assert ! ( !Sleeper :: from_two_futures( & notifier_a. get_future( ) , & notifier_b. get_future( ) )
708717 . wait_timeout( Duration :: from_millis( 10 ) ) ) ;
709718 }
719+
720+ #[ test]
721+ #[ cfg( feature = "std" ) ]
722+ fn multi_poll_stores_single_waker ( ) {
723+ // When a `Future` is `poll()`ed multiple times, only the last `Waker` should be called,
724+ // but previously we'd store all `Waker`s until they're all woken at once. This tests a few
725+ // cases to ensure `Future`s avoid storing an endless set of `Waker`s.
726+ let notifier = Notifier :: new ( ) ;
727+ let future_state = Arc :: clone ( & notifier. get_future ( ) . state ) ;
728+ assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 0 ) ;
729+
730+ // Test that simply polling a future twice doesn't result in two pending `Waker`s.
731+ let mut future_a = notifier. get_future ( ) ;
732+ assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
733+ assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
734+ assert_eq ! ( Pin :: new( & mut future_a) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
735+ assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
736+
737+ // If we poll a second future, however, that will store a second `Waker`.
738+ let mut future_b = notifier. get_future ( ) ;
739+ assert_eq ! ( Pin :: new( & mut future_b) . poll( & mut Context :: from_waker( & create_waker( ) . 1 ) ) , Poll :: Pending ) ;
740+ assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 2 ) ;
741+
742+ // but when we drop the `Future`s, the pending Wakers will also be dropped.
743+ mem:: drop ( future_a) ;
744+ assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 1 ) ;
745+ mem:: drop ( future_b) ;
746+ assert_eq ! ( future_state. lock( ) . unwrap( ) . std_future_callbacks. len( ) , 0 ) ;
747+ }
710748}
0 commit comments