1
1
use std:: cell:: UnsafeCell ;
2
2
use std:: error:: Error ;
3
- use std:: fmt:: { Debug , Display , self } ;
3
+ use std:: fmt:: { self , Debug , Display } ;
4
4
use std:: future:: Future ;
5
5
use std:: isize;
6
6
use std:: marker:: PhantomData ;
@@ -388,22 +388,20 @@ impl<T> Receiver<T> {
388
388
/// // Then we drop the sender
389
389
/// });
390
390
///
391
- /// assert_eq!(r.recv().await, Some(1));
392
- /// assert_eq!(r.recv().await, Some(2));
393
- ///
394
- /// // recv() returns `None`
395
- /// assert_eq!(r.recv().await, None);
391
+ /// assert_eq!(r.recv().await, Ok(1));
392
+ /// assert_eq!(r.recv().await, Ok(2));
393
+ /// assert!(r.recv().await.is_err());
396
394
/// #
397
395
/// # })
398
396
/// ```
399
- pub async fn recv ( & self ) -> Option < T > {
397
+ pub async fn recv ( & self ) -> Result < T , RecvError > {
400
398
struct RecvFuture < ' a , T > {
401
399
channel : & ' a Channel < T > ,
402
400
opt_key : Option < usize > ,
403
401
}
404
402
405
403
impl < T > Future for RecvFuture < ' _ , T > {
406
- type Output = Option < T > ;
404
+ type Output = Result < T , RecvError > ;
407
405
408
406
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
409
407
poll_recv (
@@ -569,12 +567,13 @@ impl<T> Stream for Receiver<T> {
569
567
570
568
fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
571
569
let this = & mut * self ;
572
- poll_recv (
570
+ let res = futures_core :: ready! ( poll_recv(
573
571
& this. channel,
574
572
& this. channel. stream_wakers,
575
573
& mut this. opt_key,
576
574
cx,
577
- )
575
+ ) ) ;
576
+ Poll :: Ready ( res. ok ( ) )
578
577
}
579
578
}
580
579
@@ -593,7 +592,7 @@ fn poll_recv<T>(
593
592
wakers : & WakerSet ,
594
593
opt_key : & mut Option < usize > ,
595
594
cx : & mut Context < ' _ > ,
596
- ) -> Poll < Option < T > > {
595
+ ) -> Poll < Result < T , RecvError > > {
597
596
loop {
598
597
// If the current task is in the set, remove it.
599
598
if let Some ( key) = opt_key. take ( ) {
@@ -602,8 +601,8 @@ fn poll_recv<T>(
602
601
603
602
// Try receiving a message.
604
603
match channel. try_recv ( ) {
605
- Ok ( msg) => return Poll :: Ready ( Some ( msg) ) ,
606
- Err ( TryRecvError :: Disconnected ) => return Poll :: Ready ( None ) ,
604
+ Ok ( msg) => return Poll :: Ready ( Ok ( msg) ) ,
605
+ Err ( TryRecvError :: Disconnected ) => return Poll :: Ready ( Err ( RecvError { } ) ) ,
607
606
Err ( TryRecvError :: Empty ) => {
608
607
// Insert this receive operation.
609
608
* opt_key = Some ( wakers. insert ( cx) ) ;
@@ -1035,3 +1034,17 @@ impl Display for TryRecvError {
1035
1034
}
1036
1035
}
1037
1036
}
1037
+
1038
+ /// An error returned from the `recv` method.
1039
+ #[ cfg( feature = "unstable" ) ]
1040
+ #[ cfg_attr( feature = "docs" , doc( cfg( unstable) ) ) ]
1041
+ #[ derive( Debug ) ]
1042
+ pub struct RecvError ;
1043
+
1044
+ impl Error for RecvError { }
1045
+
1046
+ impl Display for RecvError {
1047
+ fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
1048
+ Display :: fmt ( "The channel is empty." , f)
1049
+ }
1050
+ }
0 commit comments