Skip to content
This repository has been archived by the owner on Nov 5, 2018. It is now read-only.

How to know if all the receivers has been dropped? #61

Closed
dvaerum opened this issue Jun 14, 2018 · 32 comments · Fixed by #106
Closed

How to know if all the receivers has been dropped? #61

dvaerum opened this issue Jun 14, 2018 · 32 comments · Fixed by #106
Labels

Comments

@dvaerum
Copy link

dvaerum commented Jun 14, 2018

How to know if all the receivers has been dropped? now that send function no longer returns anything.

pub fn send(&self, msg: T) {
match &self.0.flavor {
ChannelFlavor::Array(chan) => chan.send(msg),
ChannelFlavor::List(chan) => chan.send(msg),
ChannelFlavor::Zero(chan) => chan.send(msg),
}
}

@ghost
Copy link

ghost commented Jun 15, 2018

Dropping receivers doesn't close the channel anymore, but one can still relatively easily implement the old interface where they do. See this for an example:

https://github.com/crossbeam-rs/crossbeam-channel/blob/master/examples/mpsc.rs

@ghost ghost added the question label Jun 15, 2018
@Flaise
Copy link

Flaise commented Jun 27, 2018

That's not a trivial amount of work you linked to and I have this use case too - I'm writing a program with one main thread that reads from a receiver with several worker threads that send messages to it and exit if the main thread terminates and drops its receiver at any time while they're blocking on std/mpsc send operations. I'd like to use crossbeam_channel instead because it's a better library... except for this one feature that I'd have to reimplement myself. (Aborting the program when the main thread exits wouldn't be enough because my unit tests would block without this behavior.)

Since we have more than one use case for this feature, it looks to me like it would be better in this library than in call sites of this library.

@ghost
Copy link

ghost commented Jun 27, 2018

@Flaise How about the following solution?

// Spawns a thread and returns a channel that signals its termination.
fn spawn(f: impl FnOnce()) -> Receiver<()> {
    let (s, r) = channel::bounded(0);
    thread::spawn(move || {
        f();
        drop(s);
    });
    r
}

// The channel for sending messages.
let (s, r) = channel::unbounded();

// Create the main thread.
let done = spawn(move || {
    loop {
        let msg = r.recv();
        // Do something...
    }
});

// Creater worker threads.
for _ in 0..WORKER {
    let s = s.clone();
    let done = done.clone();
    spawn(move || {
        loop {
            let msg = produce_message();
            select! {
                send(s, msg) => {}
                recv(done) => break
            }
        }
    });
}

@BusyJay
Copy link

BusyJay commented Jun 29, 2018

I think this is a very common situation that needs to be detected. And if all receivers are dropped, and sender keeps sending messages, the program may OOM, which is not obvious to tell and debug.

@ghost
Copy link

ghost commented Aug 10, 2018

Duplicate issue: #55

Pinging people who might be interested: @vorner @jdm @SimonSapin @BurntSushi @glaebhoerl @danburkert @matthieu-m @coder543

Here's a suggestion on how we might solve this issue.

Several people already have expressed interest in being able to detect whether all receivers have been dropped during send.

Currently, send will just send a message into the channel without caring about dropped receivers, which might lead to deadlocks and memory leaks. This API decision was not made lightly and has faced controversy - see #39 and BurntSushi/chan#2.

If we change the behavior of send so that it returns a Result indicating whether receivers have been dropped, then it's not clear how should send(s, msg) inside select! behave, which leads us down another rabbit hole... It's a long story.

Instead, I propose we simply add a new method to Sender:

impl<T> Sender<T> {
    // Just like `send`, except it checks for dropped receivers.
    // Might block if the channel is full.
    pub fn checked_send(msg: T) -> Result<(), NoReceiversError<T>>;
}

struct NoReceiversError<T>(pub T);

How does everyone feel about this? Would that solve the problem?

Alternatively, checked_send could also return an error when the channel is full:

impl<T> Sender<T> {
    // Just like `send`, but never blocks and checks for dropped receivers.
    pub fn checked_send(msg: T) -> Result<(), CheckedSendError<T>>;
}

enum CheckedSendError<T> {
    Full(T),
    NoReceivers(T),
}

I'm personally leaning towards the first suggestion.

Bikesheddable names:

  • checked_send
  • NoReceiversError
  • NoReceivers
  • CheckedSendError

@matthieu-m
Copy link

It seems to me that checking for dropped receivers is intrinsically complicated, and there are multiple issues to resolve beyond "check on send".

First of all, there's the problem of leaked receivers. Whether they are leaked intentionally (mem::forget), accidentally (Rc/Arc cycle) or functionally (still reachable, but never polled). In either case, merely counting the number of receivers is inadequate to detect that no one is polling the queue. Potential improvements here could be (a) capping the queue or (b) introducing an activity timestamp (whether logical or actual time).

Putting aside leaked receivers, there is still no acknowledgement of processing (by default). A sender can check that a receiver exists when it enqueues an item, but cannot guarantee that a receiver will eventually process such item: there is a data-race between checking whether the receiver is still alive and the receiver's death, and it could very well die after sending was successful.

Which brings us to a third problem; even if a sender detects, upon sending, that no receiver exists any longer, all it can do is avoid enqueuing more items. What of all the items that were already queued? Should there be an API to get them back? Isn't that akin to creating a new receiver?

And of course, there is the issue mentioned that send in a select! is not amenable to returning a Result.


As usual in this case, I'd say we should listen to our MOMs (1).

The concern here is reminiscent of guaranteed delivery. In order to implement guaranteed delivery, MOMs will usually provide a feature to address this issue of queued items no longer being deliverable: a dead letter queue.

In this case, I am wondering whether a dead letter callback would be suitable. The creator of the queue would be able to register a dead letter callback of the form Fn(Item) -> () + Send + 'static, which will be used to handle items when there is no receiver, or when receivers have been inactive for too long.

The callback would be triggered on the sender thread whenever a send is called and its activation criteria is met, first dequeuing the items enqueued, and then processing the new item that send wished to send.

(1) Message-Oriented Middleware, such as MQ & co.


If we look at the 4 problems I gave above, a dead letter callback solves them all:

  1. Leaked receivers are detected by the inactivity detection mechanism, which can be used to invalidate/poison receivers before proceeding to empty the queue and divert further enqueuing.
  2. Unprocessed items have a last chance to be processed; simple default callbacks could panic or log.
  3. Unprocessed items can be returned to a sender and stored in persistent storage (best effort processing).
  4. It works with select!, at the cost of being slightly indirect.

Note: the ability of last-chance processing of queued items does not lead to guaranteed delivery, should the process crash in the middle, any in-memory item is lost. Guaranteed delivery can only be implemented with point-to-point acknowledgement between persistent stores up until the final destination (which can either store or process the item before acknowledging).


Alternatives:

  • Bounded Queues: bounded queues are great to avoid memory exhaustion, and useful to create back-pressure. They are not suited to detecting inactive/dead receivers. The problem is that by design a bounded queues must be sized for peak time, so that during down time it could take seconds to minutes to actually fill up a queue and realize something is wrong on the other end.
  • Checked Send: a checked_send method is better at detecting dead receivers, though the user must realize that there just because an item is enqueued does not mean it will be processed. The mechanism can be adapted to detect inactive receivers.
  • Others?

@vorner
Copy link

vorner commented Aug 11, 2018

My motivation for bringing the issue up was mostly for accidental subtle bugs that happen on rare cases. In other words, protection against not handling situation you don't know you should handle.

In that light, I don't think adding another sending method that is more complex and people will probably not be interested in doesn't solve my worry ‒ people who don't know about the potential problem won't look for the method and won't use it, still getting the subtle bugs.

That being said, the method does look useful for some use cases where you already do know about the possibility of lost receivers.

@BurntSushi
Copy link

Yeah, I think my position is mostly unchanged here. I do like @vorner's comment in that opportunistic deadlock detection might be possible, and that seems like a fine idea to me.

@ghost
Copy link

ghost commented Sep 17, 2018

Ok, here's another idea. Maybe we could get back to the old interface for send and recv methods, where dropping receivers closes the channel:

impl<T> Sender<T> {
    fn send(&self, msg: T) -> Result<(), SendError<T>>;
    fn try_send(&self, msg: T) -> Result<(), TrySendError<T>>;
}

impl<T> Receiver<T> {
    fn recv(&self, msg: T) -> Result<T, RecvError>;
    fn try_recv(&self, msg: T) -> Result<T, TryRecvError>;
}

Now the question is what happens to select!. I think we could have the following syntax:

let r: Receiver<T> = ...;
let rs = Vec<Receiver<T>> = ...;

let s: Sender<T> = ...;
let ss = Vec<Sender<T>> = ...;

select! {
    res = recv(r) => match res {
        Ok(msg) => println!("received {}", msg),
        Err(_) => println!("channel closed"),
    }

    (res, r) = recv(rs) => match res {
        Ok(msg) => println!("received {} from {:?}", msg, r),
        Err(_) => println!("channel {:?} is closed", r),
    }

    res = send(s, foo) => match res {
        Ok(()) => println!("sent message"),
        Err(msg) => println!("couldn't send {} because the channel is closed", msg),
    }

    (res, s) = send(ss, foo) => match res {
        Ok(()) => println!("sent message into {:?}", s),
        Err(msg) => println!("couldn't send {} into {:?} because it is closed", msg, s),
    }
}

But you don't have to use match in every case. The following would be supported, too:

select! {
    msg = recv(r) => println!("recv result: {:?}", msg),
    res = send(s, "message") => res.unwrap(),
}

This is not too verbose and seems pretty intuitive. What do you think?

@vorner
Copy link

vorner commented Sep 19, 2018

Yes, I think this looks mostly good ‒ at least to me :-).

There are probably some questions to answer, though. Let's say there are two channels I want to send into, one of them lost all of its receivers, the other one is ready to accept. Are they both considered ready?

@ghost
Copy link

ghost commented Sep 20, 2018

Let's say there are two channels I want to send into, one of them lost all of its receivers, the other one is ready to accept. Are they both considered ready?

Yes, they are both ready and a random one of those two operations will be completed.

Think about it this way: send(s, msg) is equivalent to s.send(msg), and select! will simply wait until any one of the declared function calls returns. (Of course, only one function call is actually executed.)

If you'd like to skip operations that send into closed channels, you can achieve that as follows:

let mut s1 = Some(s1);
let mut s2 = Some(s2);
// Note that `Option<Sender<T>>` implements `IntoIterator<Item = Sender<T>>`
loop {
    select! {
        (res, _) = send(s1, msg) => match res {
            Ok(()) => break,
            Err(_) => s1 = None,
        }
        (res, _) = send(s2, msg) => match res {
            Ok(()) => break,
            Err(_) => s2 = None,
        }
    }
}

This is a common idiom in Go.

@ghost ghost closed this as completed Sep 20, 2018
@ghost ghost reopened this Sep 20, 2018
@ghost
Copy link

ghost commented Sep 22, 2018

@BurntSushi What do you think, is this a solution that might make everyone happy?

@BurntSushi
Copy link

@stjepang I think the change is a bit hard for me to evaluate. Do existing uses of select! continue to work?

Popping up a level, I remain skeptical here w.r.t. to bidirectional flow. I feel like our conclusion from when we last dove into this is that unidirectional flow has a lot of value, even if it means that folks need to restructure their programs.

@ghost
Copy link

ghost commented Sep 24, 2018

@BurntSushi

Do existing uses of select! continue to work?

The breaking change is how send behaves inside select!: sending into a closed channel would be a non-blocking operation, similar to receiving from a closed channel.

select! {
    // Equivalent to: let res = Receiver::recv(r)
    res = recv(r) => {}

    // Equivalent to: let res = Sender::send(s, msg)
    res = send(s, msg) => {}
}

But select! is still the same in the sense that you write a bunch of send/recv function calls and then wait until any one of them returns.

Also, as a slight digression, I dislike the current operation(input, output) => block syntax and would prefer to go with output = operation(input) => block instead. :)

Popping up a level, I remain skeptical here w.r.t. to bidirectional flow. I feel like our conclusion from when we last dove into this is that unidirectional flow has a lot of value, even if it means that folks need to restructure their programs.

So here's what happened while we were porting Servo to crossbeam-channel. Note that Servo exclusively uses unbounded channels.

There's a test where a receiving thread panics (thus dropping the receiver) and we want to propagate the error to the sending thread and fail with a stacktrace. Unfortunately, this is not possible with crosbeam-channel anymore. The sending thread was happily chugging along and kept sending stuff into the channel, thus leaking memory indefinitely.

@jdm was rightfully unhappy with the current situation so they decided to wrap crossbeam-channel into a custom channel API that detects dropped receivers.

This made me very confused for a while. Something was wrong about unidirectionality in crossbeam-channel, and I couldn't exactly pinpoint what and why. But then it dawned on me.

In Go, unbounded channels are a highly discouraged antipattern. If Servo was written in Go and used bounded channels, a panicked receiver thread would result in the sender thread filling up the channel and resulting in a deadlock. The Go runtime would detect the deadlock and print a nice stacktrace. Memory wouldn't be leaked indefinitely.

In Rust, we prefer unbounded channels for some reason. Our solution for panicked receiver threads is forbidding sending messages into channels without receivers. And we don't have a runtime that detects deadlocks either.

If you forbid unbounded channels and have a runtime that detects deadlocks, then unidirectionaly works wonderfully. For that reason, I think it's the right choice for Go. But if you allow unbounded channels or can't detect deadlocks, then you absolutely do need a different mechanism for detecting memory leaks. It's also important to note that Rust has destructors while Go doesn't (channels must be closed explicitly). Different languages, different idioms.

@vorner
Copy link

vorner commented Sep 24, 2018

Just as a bike-shedding point. That output = operation(input) has a bit unnatural feel to me. As alternatives:

  • Prefix with let or if let, something like if let res = recv(r) { block }
  • Put the result on right side, something like recv(r) -> res { block }

@BurntSushi
Copy link

In Go, unbounded channels are a highly discouraged antipattern. If Servo was written in Go and used bounded channels, a panicked receiver thread would result in the sender thread filling up the channel and resulting in a deadlock. The Go runtime would detect the deadlock and print a nice stacktrace. Memory wouldn't be leaked indefinitely.

In Rust, we prefer unbounded channels for some reason. Our solution for panicked receiver threads is forbidding sending messages into channels without receivers. And we don't have a runtime that detects deadlocks either.

If you forbid unbounded channels and have a runtime that detects deadlocks, then unidirectionaly works wonderfully. For that reason, I think it's the right choice for Go. But if you allow unbounded channels or can't detect deadlocks, then you absolutely do need a different mechanism for detecting memory leaks. It's also important to note that Rust has destructors while Go doesn't (channels must be closed explicitly). Different languages, different idioms.

This is an insightful observation! Thanks for pointing it out!

I agree that the failure modes of unbounded channels aren't great here. However, I'd like to push back just a bit. In particular, can we nudge folks to use bounded channels instead? That is, instead of improving unbounded channels at the cost of more API complexity for all channels, we could instead declare that poor failure modes are a property of unbounded channels and in order to get better failure modes, one should use bounded channels instead.

@ghost
Copy link

ghost commented Sep 25, 2018

In particular, can we nudge folks to use bounded channels instead? That is, instead of improving unbounded channels at the cost of more API complexity for all channels, we could instead declare that poor failure modes are a property of unbounded channels and in order to get better failure modes, one should use bounded channels instead.

I personally lean towards the "don't use unbounded channels" camp, but the issue is far from being clear-cut. There was a big debate on bounded vs unbounded channels in Servo's mailing list and they ultimately decided to go with unbounded channels.

However, there seems to be a consistent pattern:

  • Actor-based systems tend to use unbounded channels (Erlang, Pony, Kotlin, Akka, Actix).
  • CSP-based systems tend to use bounded channels (Go, Crystal, Clojure, Tokio).

Servo's constellation is more reminiscent of actors and, unsurprisingly, uses unbounded channels.

Even though crossbeam-channel is quite versatile, I can see it being used in actor-like systems more than in CSP-like systems. Another telling clue is that std::sync::mpsc has constructors channel() and sync_channel(), while futures-channel has channel() and unbounded().

It's interesting to note that even though actor-based systems could benefit from bounded channels (backpressure), they still choose not to (at least not by default). I'm not an expert in the field and can't explain why exactly, but there must be a deeper reason for it.

@Bathtor
Copy link

Bathtor commented Sep 25, 2018

Even though crossbeam-channel is quite versatile, I can see it being used in actor-like systems more than in CSP-like systems.

I'm doing exactly that in Kompact.

It's interesting to note that even though actor-based systems could benefit from bounded channels (backpressure), they still choose not to (at least not by default). I'm not an expert in the field and can't explain why exactly, but there must be a deeper reason for it.

Because a proper actor systems (unlike Actix) have ad-hoc connection semantics, where references can be passed around arbitrarily like any data and everyone can send to someone they hold a reference to. Typically, Actor systems are meant to be highly dynamic in structure, with actors constantly being created and dropped, forming new connection topologies based on their internal semantics. Since keeping track of references in a system like this is essentially impossible, preventing deadlocks in a backpressured system would be a major headache.

That is why Akka has their Akka Streams system now, where you can do properly typed and backpressured executions, but you are essentially building dataflow graphs explicitly, so that you can see what things are connected and thus (hopefully) prevent deadlocks.

@arthurprs
Copy link

arthurprs commented Sep 27, 2018

I am happy to see this being considered again. There's tangible benefits providing such API.

AFAIK there's no feasible way of providing a completely foolproof detection of absent receivers, at least not without runtime support. And that's ok.

I am super late to the discussion but I could parse some concerns, but mostly arguable/opinionated (IMHO).

So, are there significant drawbacks providing this?

@ghost
Copy link

ghost commented Sep 27, 2018

I am super late to the discussion but I could parse some concerns, but mostly arguable/opinionated (IMHO).

Please do!

So, are there significant drawbacks providing this?

Nothing significant. The biggest question is "how does this change interact with select!", but I think the answer is clear: send in select! fires when the message can be sent or when the channel is closed. If the channel is closed, the message we failed to send can be recovered from the result of type Result<(), SendError<T>>.

@arthurprs
Copy link

arthurprs commented Sep 27, 2018

send in select! fires when the message can be sent or when the channel is closed.

I agree. Even if users overlook that, it's not like its worse than sending messages to the void and/or running out or memory.

@BurntSushi
Copy link

So, are there significant drawbacks providing this?

Nothing significant.

I haven't been tracking the exact API changes in question here, but doesn't this mean all tx.send(value)s will in practice turn into tx.send(value).unwrap()?

@arthurprs
Copy link

arthurprs commented Sep 27, 2018

That will definitely happen in some cases, specially if the user considers consumers going away before the producers a bug.

We can bucket the use cases in 4 options

  1. send(..) (potential OOM 💀 ) [current and in the proposal in case of leaked receivers]
  2. _ = send(..)
  3. send(..).unwrap() (panic ⚠️ )
  4. match send(..) { do stuff }

I guess the proposal here is to remove 1 while enabling 2, 3 and 4.

@BurntSushi
Copy link

Again, to be clear, send(..) is only going to OOM in the case of unbounded channels. Please consider the bounded channel case in our analysis. From what I can tell, we are basically throwing out the ergonomics of the bounded channel case in favor of making failure modes with unbounded channels better. I actually don't really agree with that at all, but we should at least be clear about what we're doing. The reason why I'm not a fan of this approach is because it's not clear to me that we want to be encouraging the use of unbounded channels.

@arthurprs
Copy link

throwing out the ergonomics of the bounded channel case

I'm probably missing something. What's changing for the bounded case?

@BurntSushi
Copy link

@arthurprs tx.send(val) turns into tx.send(val).unwrap().

@arthurprs
Copy link

Ahh, I got confused for a minute.

Doesn't that only show the bound-queue side of the problem? Where you need to use select! with a timeout to protect against gone receivers. Which in turn is a lot more verbose than _ = or unwrap.

@ghost
Copy link

ghost commented Sep 27, 2018

@BurntSushi

Please consider the bounded channel case in our analysis. From what I can tell, we are basically throwing out the ergonomics of the bounded channel case in favor of making failure modes with unbounded channels better.

While bounded channels don't suffer from OOM pproblems, they are still prone to deadlocks. Currently, tx.send(msg) deadlocks if the channel is full and all receivers have been dropped. With the proposed change, tx.send(msg) wouldn't block.

So I would say this change benefits both unbounded and bounded channels. Yes, the .unwrap() means more typing, but it's not like bounded channels would be strictly worse off.

@ghost
Copy link

ghost commented Oct 1, 2018

So if y'all agree, I'd like to do a 180 turn and revert the behavior back to what we had in version 0.1: dropping all receivers closes the channel. The interface would look roughly like this:

impl<T> Sender<T> {
    fn send(&self, msg: T) -> Result<(), SendError<T>>;
    fn try_send(&self, msg: T) -> Result<(), TrySendError<T>>;
    // ...
}

impl<T> Receiver<T> {
    fn recv(&self) -> Result<T, RecvError>;
    fn try_recv(&self) -> Result<T, TryRecvError>;
    // ...
}

select! 
    recv(r) -> res => {}      // res: Result<T, RecvError>
    send(s, msg) -> res => {} // res: Result<(), SendError<T>>
    default => {}
}

@ghost
Copy link

ghost commented Oct 29, 2018

I have begun work that will change the behavior so that dropping all receivers disconnects the channel. Since this is a big breaking change, the next release will be 0.3.0.

PR: #106

@ghost ghost closed this as completed in #106 Nov 4, 2018
@ghost
Copy link

ghost commented Nov 4, 2018

After another big PR, I've published v0.3 that fixes this issue.
Docs: https://docs.rs/crossbeam-channel/0.3

Notable changes:

  • Dropping all receivers now closes the channel.
  • The overall interface is more similar to v0.1 (and std::sync::mpsc). For example, Receiver::recv() now returns a Result<T, RecvError> rather than Option<T>.
  • The Select struct for dynamic selection got a complete redesign. No more slow callbacks!
  • The syntax for select! is slightly different. See examples here.
  • The default case in select! can accept an optional timeout.
  • Most of the docs have been rewritten.

Thanks all for your insights! It might not seem much, but the progress of this crate really wouldn't be possible without your feedback! <3

Finally, I'd just like to add that this release to me feels like a huge improvement. Version 0.1 had that silly select_loop! thing and version 0.2 had an incredibly convoluted select! macro implementation. But now in version 0.3 a whole lot of ugly complexity went away and everything sort of fell into the right place. I'm very happy about the current state of the code!

@dvaerum
Copy link
Author

dvaerum commented Nov 5, 2018

Thx you for taking the feedback, it have been a pleasure to watch the back and forth 👍 Now to find time to test it in my program 😅

This issue was closed.
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Development

Successfully merging a pull request may close this issue.

8 participants