Skip to content

Commit

Permalink
Wait forever on very large timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Feb 28, 2023
1 parent b13bdb0 commit 923d050
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 27 deletions.
28 changes: 21 additions & 7 deletions crossbeam-channel/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::err::{
};
use crate::flavors;
use crate::select::{Operation, SelectHandle, Token};
use crate::utils;

/// Creates a channel of unbounded capacity.
///
Expand Down Expand Up @@ -172,8 +171,11 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
/// assert!(eq(Instant::now(), start + ms(500)));
/// ```
pub fn after(duration: Duration) -> Receiver<Instant> {
Receiver {
flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_timeout(duration))),
match Instant::now().checked_add(duration) {
Some(deadline) => Receiver {
flavor: ReceiverFlavor::At(Arc::new(flavors::at::Channel::new_deadline(deadline))),
},
None => never(),
}
}

Expand Down Expand Up @@ -320,8 +322,14 @@ pub fn never<T>() -> Receiver<T> {
/// assert!(eq(Instant::now(), start + ms(700)));
/// ```
pub fn tick(duration: Duration) -> Receiver<Instant> {
Receiver {
flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(duration))),
match Instant::now().checked_add(duration) {
Some(delivery_time) => Receiver {
flavor: ReceiverFlavor::Tick(Arc::new(flavors::tick::Channel::new(
delivery_time,
duration,
))),
},
None => never(),
}
}

Expand Down Expand Up @@ -474,7 +482,10 @@ impl<T> Sender<T> {
/// );
/// ```
pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
self.send_deadline(msg, utils::convert_timeout_to_deadline(timeout))
match Instant::now().checked_add(timeout) {
Some(deadline) => self.send_deadline(msg, deadline),
None => self.send(msg).map_err(SendTimeoutError::from),
}
}

/// Waits for a message to be sent into the channel, but only until a given deadline.
Expand Down Expand Up @@ -864,7 +875,10 @@ impl<T> Receiver<T> {
/// );
/// ```
pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
self.recv_deadline(utils::convert_timeout_to_deadline(timeout))
match Instant::now().checked_add(timeout) {
Some(deadline) => self.recv_deadline(deadline),
None => self.recv().map_err(RecvTimeoutError::from),
}
}

/// Waits for a message to be received from the channel, but only before a given deadline.
Expand Down
7 changes: 1 addition & 6 deletions crossbeam-channel/src/flavors/at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};
use std::time::Instant;

use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
Expand Down Expand Up @@ -32,11 +32,6 @@ impl Channel {
received: AtomicBool::new(false),
}
}
/// Creates a channel that delivers a message after a certain duration of time.
#[inline]
pub(crate) fn new_timeout(dur: Duration) -> Self {
Self::new_deadline(utils::convert_timeout_to_deadline(dur))
}

/// Attempts to receive a message without blocking.
#[inline]
Expand Down
5 changes: 2 additions & 3 deletions crossbeam-channel/src/flavors/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crossbeam_utils::atomic::AtomicCell;
use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
use crate::select::{Operation, SelectHandle, Token};
use crate::utils;

/// Result of a receive operation.
pub(crate) type TickToken = Option<Instant>;
Expand All @@ -27,9 +26,9 @@ pub(crate) struct Channel {
impl Channel {
/// Creates a channel that delivers messages periodically.
#[inline]
pub(crate) fn new(dur: Duration) -> Self {
pub(crate) fn new(delivery_time: Instant, dur: Duration) -> Self {
Channel {
delivery_time: AtomicCell::new(utils::convert_timeout_to_deadline(dur)),
delivery_time: AtomicCell::new(delivery_time),
duration: dur,
}
}
Expand Down
10 changes: 8 additions & 2 deletions crossbeam-channel/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,10 @@ pub fn select_timeout<'a>(
handles: &mut [(&'a dyn SelectHandle, usize, *const u8)],
timeout: Duration,
) -> Result<SelectedOperation<'a>, SelectTimeoutError> {
select_deadline(handles, utils::convert_timeout_to_deadline(timeout))
match Instant::now().checked_add(timeout) {
Some(deadline) => select_deadline(handles, deadline),
None => Ok(select(handles)),
}
}

/// Blocks until a given deadline, or until one of the operations becomes ready and selects it.
Expand Down Expand Up @@ -1045,7 +1048,10 @@ impl<'a> Select<'a> {
/// }
/// ```
pub fn ready_timeout(&mut self, timeout: Duration) -> Result<usize, ReadyTimeoutError> {
self.ready_deadline(utils::convert_timeout_to_deadline(timeout))
match Instant::now().checked_add(timeout) {
Some(deadline) => self.ready_deadline(deadline),
None => Ok(self.ready()),
}
}

/// Blocks until a given deadline, or until one of the operations becomes ready.
Expand Down
8 changes: 0 additions & 8 deletions crossbeam-channel/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,3 @@ pub(crate) fn sleep_until(deadline: Option<Instant>) {
}
}
}

// https://github.com/crossbeam-rs/crossbeam/issues/795
pub(crate) fn convert_timeout_to_deadline(timeout: Duration) -> Instant {
match Instant::now().checked_add(timeout) {
Some(deadline) => deadline,
None => Instant::now() + Duration::from_secs(86400 * 365 * 30),
}
}
5 changes: 4 additions & 1 deletion crossbeam-utils/src/sync/parker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ impl Parker {
/// p.park_timeout(Duration::from_millis(500));
/// ```
pub fn park_timeout(&self, timeout: Duration) {
self.park_deadline(Instant::now() + timeout)
match Instant::now().checked_add(timeout) {
Some(deadline) => self.park_deadline(deadline),
None => self.park(),
}
}

/// Blocks the current thread until the token is made available, or until a certain deadline.
Expand Down

0 comments on commit 923d050

Please sign in to comment.