Implement a Weak mpsc Sender variant #4023

Closed
jmagnuson opened this issue Aug 4, 2021 · 1 comment · Fixed by #4595
Labels
A-tokio Area: The main tokio crate C-feature-request Category: A feature request.

Comments

@jmagnuson

Is your feature request related to a problem? Please describe.

The tokio actor pattern described in @Darksonn's blog post is very useful and pervasive. But one use case that I've struggled with is when the actor needs to send messages to itself. An mpsc::Sender<ActorMessage> (by way of MyActorHandle) can be cloned into the actor body, but then dropping the remaining Sender clones (RAII) can no longer be relied on as a signal to shut the actor down, since the actor will always hold a handle to itself.
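A minimal sketch of the problem, using std::sync::mpsc instead of tokio so that it runs without an async runtime (the function and variable names are hypothetical, and tokio's channel is assumed to behave analogously with respect to sender counting): as long as the actor keeps its own clone of the sender, the receiver never observes the channel as closed.

```rust
use std::sync::mpsc::{channel, TryRecvError};

/// Returns what the receiver observes (a) while the actor still holds its own
/// sender clone and (b) after that clone is dropped as well.
fn self_sender_keeps_channel_open() -> (TryRecvError, TryRecvError) {
    let (handle_tx, rx) = channel::<u32>();

    // The actor keeps a clone so that it can send messages to itself.
    let actor_tx = handle_tx.clone();

    // The last external handle goes away; ideally this would shut the actor down.
    drop(handle_tx);

    // The channel is merely empty, not disconnected, because `actor_tx` is alive.
    let while_actor_holds_clone = rx.try_recv().unwrap_err();

    // Only once the actor's own clone is dropped does the channel close.
    drop(actor_tx);
    let after_all_dropped = rx.try_recv().unwrap_err();

    (while_actor_holds_clone, after_all_dropped)
}
```

In an actor loop, the first case corresponds to a recv call that waits forever, because the actor itself is the one keeping the channel open.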

Describe the solution you'd like

Ideally, the actor could retain a "weak" Sender such that it does not count in RAII semantics (when the Receiver should terminate). The mpsc Sender variants could offer a ::downgrade API, similar to Arc::downgrade in stdlib.
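To illustrate the requested semantics, here is a rough std-only sketch that emulates a weak sender by wrapping a std::sync::mpsc::Sender in Arc and handing the actor a Weak. The WeakTx type and weak_sender_demo function are hypothetical names made up for this sketch; it is not tokio's API, only an analogy to Arc::downgrade.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Weak};

/// Hypothetical stand-in for a "weak" sender: it does not keep the channel open.
struct WeakTx<T>(Weak<Sender<T>>);

impl<T> WeakTx<T> {
    /// Yields a usable sender only while at least one strong handle is alive.
    fn upgrade(&self) -> Option<Arc<Sender<T>>> {
        self.0.upgrade()
    }
}

fn weak_sender_demo() -> (bool, Vec<u32>, bool) {
    let (tx, rx) = channel::<u32>();

    // External handles share the one strong sender.
    let strong = Arc::new(tx);
    // The actor keeps only a weak reference to it.
    let weak = WeakTx(Arc::downgrade(&strong));

    // While a handle exists, the actor can upgrade and send to itself.
    let sent_while_alive = match weak.upgrade() {
        Some(tx) => tx.send(1).is_ok(),
        None => false,
    };

    // Dropping the last strong handle drops the underlying Sender.
    drop(strong);
    let upgrade_failed = weak.upgrade().is_none();

    // The iterator ends because no sender is left, despite `weak` being alive.
    let received: Vec<u32> = rx.iter().collect();

    (sent_while_alive, received, upgrade_failed)
}
```

The weak reference lets the actor send to itself while handles exist, yet it does not prevent the receiver from seeing the channel close once the last handle is dropped.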

Or, if a separate type isn't viable, perhaps [unsafe] mutation of the internal Sender count could be exposed, such as Arc::decrement_strong_count.

Describe alternatives you've considered

Currently, we roll our own actor-rx Stream, which selects on both the receiver for the RAII-tracked handle Senders and a receiver for a separate actor-internal Sender:

let combined_rx = async_stream::stream! {
    loop {
        tokio::select!(
            Some(req) = &mut actor_rx.next() => {
                yield req;
            }
            maybe_req = &mut handle_rx.next() => {
                if let Some(req) = maybe_req {
                    yield req;
                } else {
                    break;
                }
            }
        )
    }
};

This works, but it adds complexity, and we lose the ability to use native tokio Receiver functionality within the actor.

Additional context

This feature was previously requested in #1637, but the author never followed up on it and it was subsequently closed.

This idea was also floated for stdlib's mpsc in RFC rust-lang/rfcs#1549, but doesn't seem to have made any progress.

@jmagnuson jmagnuson added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels Aug 4, 2021
@Darksonn
Contributor

Darksonn commented Aug 4, 2021

It's an interesting suggestion, though it would add a fair amount of complexity to the mpsc channel.

One alternative to your custom stream is a struct like this:

use std::collections::VecDeque;
use tokio::sync::mpsc::Receiver;

pub struct InjectableReceiver<T> {
    receiver: Receiver<T>,
    inject: VecDeque<T>,
}

impl<T> InjectableReceiver<T> {
    pub fn new(receiver: Receiver<T>) -> Self {
        Self {
            receiver,
            inject: VecDeque::new(),
        }
    }
    
    pub fn add(&mut self, msg: T) {
        self.inject.push_back(msg);
    }
    
    pub async fn recv(&mut self) -> Option<T> {
        if let Some(msg) = self.inject.pop_front() {
            Some(msg)
        } else {
            self.receiver.recv().await
        }
    }
}
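As a usage illustration, here is a blocking analog of the struct above built on std::sync::mpsc, so that it runs without a tokio runtime. The names BlockingInjectableReceiver, Msg, and run_actor are hypothetical; with tokio the loop would call recv().await instead. It sketches how an actor can queue messages to itself and still shut down once every external sender is gone.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver};

/// Blocking analog of InjectableReceiver, for illustration only.
struct BlockingInjectableReceiver<T> {
    receiver: Receiver<T>,
    inject: VecDeque<T>,
}

impl<T> BlockingInjectableReceiver<T> {
    fn new(receiver: Receiver<T>) -> Self {
        Self { receiver, inject: VecDeque::new() }
    }

    /// Queue a message from the actor to itself without holding a sender.
    fn add(&mut self, msg: T) {
        self.inject.push_back(msg);
    }

    /// Injected messages are returned first; otherwise wait on the channel.
    /// Returns None once the queue is empty and all senders are dropped.
    fn recv(&mut self) -> Option<T> {
        if let Some(msg) = self.inject.pop_front() {
            Some(msg)
        } else {
            self.receiver.recv().ok()
        }
    }
}

#[derive(Debug, PartialEq)]
enum Msg {
    Start,
    Tick(u32),
}

fn run_actor() -> Vec<Msg> {
    let (tx, rx) = channel();
    let mut inbox = BlockingInjectableReceiver::new(rx);

    tx.send(Msg::Start).unwrap();
    drop(tx); // the only external handle goes away

    let mut seen = Vec::new();
    while let Some(msg) = inbox.recv() {
        match &msg {
            // The actor schedules follow-up work for itself.
            Msg::Start => inbox.add(Msg::Tick(1)),
            Msg::Tick(n) if *n < 3 => inbox.add(Msg::Tick(n + 1)),
            Msg::Tick(_) => {}
        }
        seen.push(msg);
    }
    seen
}
```

Because the actor never holds a sender of its own, the loop ends as soon as the injected queue is drained and the last external sender has been dropped.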

Note that the &mut borrows in your select! are unnecessary.
