@@ -15,7 +15,7 @@ use super::utils::{Backoff, CachePadded};
15
15
use super :: waker:: SyncWaker ;
16
16
17
17
use crate :: cell:: UnsafeCell ;
18
- use crate :: mem:: { self , MaybeUninit } ;
18
+ use crate :: mem:: MaybeUninit ;
19
19
use crate :: ptr;
20
20
use crate :: sync:: atomic:: { self , AtomicUsize , Ordering } ;
21
21
use crate :: time:: Instant ;
@@ -479,82 +479,58 @@ impl<T> Channel<T> {
479
479
///
480
480
/// `tail` should be the current (and therefore last) value of `tail`.
481
481
///
482
+ /// # Panicking
483
+ /// If a destructor panics, the remaining messages are leaked, matching the
484
+ /// behaviour of the unbounded channel.
485
+ ///
482
486
/// # Safety
483
487
/// This method must only be called when dropping the last receiver. The
484
488
/// destruction of all other receivers must have been observed with acquire
485
489
/// ordering or stronger.
486
490
unsafe fn discard_all_messages ( & self , tail : usize ) {
487
491
debug_assert ! ( self . is_disconnected( ) ) ;
488
492
489
- /// Use a helper struct with a custom `Drop` to ensure all messages are
490
- /// dropped, even if a destructor panicks.
491
- struct DiscardState < ' a , T > {
492
- channel : & ' a Channel < T > ,
493
- head : usize ,
494
- tail : usize ,
495
- backoff : Backoff ,
496
- }
493
+ // Only receivers modify `head`, so since we are the last one,
494
+ // this value will not change and will not be observed (since
495
+ // no new messages can be sent after disconnection).
496
+ let mut head = self . head . load ( Ordering :: Relaxed ) ;
497
+ let tail = tail & !self . mark_bit ;
497
498
498
- impl < ' a , T > DiscardState < ' a , T > {
499
- fn discard ( & mut self ) {
500
- loop {
501
- // Deconstruct the head.
502
- let index = self . head & ( self . channel . mark_bit - 1 ) ;
503
- let lap = self . head & !( self . channel . one_lap - 1 ) ;
504
-
505
- // Inspect the corresponding slot.
506
- debug_assert ! ( index < self . channel. buffer. len( ) ) ;
507
- let slot = unsafe { self . channel . buffer . get_unchecked ( index) } ;
508
- let stamp = slot. stamp . load ( Ordering :: Acquire ) ;
509
-
510
- // If the stamp is ahead of the head by 1, we may drop the message.
511
- if self . head + 1 == stamp {
512
- self . head = if index + 1 < self . channel . cap {
513
- // Same lap, incremented index.
514
- // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
515
- self . head + 1
516
- } else {
517
- // One lap forward, index wraps around to zero.
518
- // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
519
- lap. wrapping_add ( self . channel . one_lap )
520
- } ;
521
-
522
- // We updated the head, so even if this descrutor panics,
523
- // we will not attempt to destroy the slot again.
524
- unsafe {
525
- ( * slot. msg . get ( ) ) . assume_init_drop ( ) ;
526
- }
527
- // If the tail equals the head, that means the channel is empty.
528
- } else if self . tail == self . head {
529
- return ;
530
- // Otherwise, a sender is about to write into the slot, so we need
531
- // to wait for it to update the stamp.
532
- } else {
533
- self . backoff . spin_heavy ( ) ;
534
- }
535
- }
536
- }
537
- }
499
+ let backoff = Backoff :: new ( ) ;
500
+ loop {
501
+ // Deconstruct the head.
502
+ let index = head & ( self . mark_bit - 1 ) ;
503
+ let lap = head & !( self . one_lap - 1 ) ;
538
504
539
- impl < ' a , T > Drop for DiscardState < ' a , T > {
540
- fn drop ( & mut self ) {
541
- self . discard ( ) ;
505
+ // Inspect the corresponding slot.
506
+ debug_assert ! ( index < self . buffer. len( ) ) ;
507
+ let slot = unsafe { self . buffer . get_unchecked ( index) } ;
508
+ let stamp = slot. stamp . load ( Ordering :: Acquire ) ;
509
+
510
+ // If the stamp is ahead of the head by 1, we may drop the message.
511
+ if head + 1 == stamp {
512
+ head = if index + 1 < self . cap {
513
+ // Same lap, incremented index.
514
+ // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
515
+ head + 1
516
+ } else {
517
+ // One lap forward, index wraps around to zero.
518
+ // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
519
+ lap. wrapping_add ( self . one_lap )
520
+ } ;
521
+
522
+ unsafe {
523
+ ( * slot. msg . get ( ) ) . assume_init_drop ( ) ;
524
+ }
525
+ // If the tail equals the head, that means the channel is empty.
526
+ } else if tail == head {
527
+ return ;
528
+ // Otherwise, a sender is about to write into the slot, so we need
529
+ // to wait for it to update the stamp.
530
+ } else {
531
+ backoff. spin_heavy ( ) ;
542
532
}
543
533
}
544
-
545
- let mut state = DiscardState {
546
- channel : self ,
547
- // Only receivers modify `head`, so since we are the last one,
548
- // this value will not change and will not be observed (since
549
- // no new messages can be sent after disconnection).
550
- head : self . head . load ( Ordering :: Relaxed ) ,
551
- tail : tail & !self . mark_bit ,
552
- backoff : Backoff :: new ( ) ,
553
- } ;
554
- state. discard ( ) ;
555
- // This point is only reached if no destructor panics, so all messages
556
- // have already been dropped.
557
- mem:: forget ( state) ;
558
534
}
559
535
560
536
/// Returns `true` if the channel is disconnected.
0 commit comments