Skip to content

Commit

Permalink
define max buffers for unbounded channel
Browse files Browse the repository at this point in the history
  • Loading branch information
gnunicorn committed Mar 4, 2020
1 parent 56980e7 commit 9b60a7d
Showing 1 changed file with 29 additions and 1 deletion.
30 changes: 29 additions & 1 deletion futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ impl std::error::Error for TryRecvError {}

#[derive(Debug)]
struct UnboundedInner<T> {
// Maximum number of items to buffer, if 0 is disabled
max_buffer: usize,
// Internal channel state. Consists of the number of messages stored in the
// channel as well as a flag signalling that the channel is closed.
state: AtomicUsize,
Expand Down Expand Up @@ -398,8 +400,24 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
/// the channel. Using an `unbounded` channel has the ability of causing the
/// process to run out of memory. In this case, the process will be aborted.
pub fn unbounded<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
unbounded_max_buf(0)
}

/// Creates an unbounded mpsc channel for communicating between asynchronous
/// tasks buffering a specific number of items.
///
/// A `send` on this channel will always succeed as long as the receive half has
/// not been closed. If the receiver falls behind, messages will be buffered
/// to the given count. Given `0` it is abirtrarily buffered.
///
/// **Note** that the amount of available system memory is an implicit bound to
/// the channel. Using an `unbounded` channel has the ability of causing the
/// process to run out of memory. In this case, the process will be aborted.
pub fn unbounded_max_buf<T>(max_buffer: usize) -> (UnboundedSender<T>, UnboundedReceiver<T>) {

let inner = Arc::new(UnboundedInner {
max_buffer,
state: AtomicUsize::new(INIT_STATE),
message_queue: Queue::new(),
num_senders: AtomicUsize::new(1),
Expand Down Expand Up @@ -820,7 +838,17 @@ impl<T> UnboundedSender<T> {
// Do the send without parking current task.
fn do_send_nb(&self, msg: T) -> Result<(), TrySendError<T>> {
if let Some(inner) = &self.0 {
if inner.inc_num_messages().is_some() {

if let Some(num) = inner.inc_num_messages() {
if inner.inner.max_buffer > 0 && num > inner.inner.max_buffer {
return Err(TrySendError {
err: SendError {
kind: SendErrorKind::Full,
},
val: msg,
})

}
inner.queue_push_and_signal(msg);
return Ok(());
}
Expand Down

0 comments on commit 9b60a7d

Please sign in to comment.