-
Notifications
You must be signed in to change notification settings - Fork 342
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Modernize a-chat #419
base: main
Are you sure you want to change the base?
Modernize a-chat #419
Conversation
Ok(()) | ||
}); | ||
|
||
select!(incoming, outgoing).await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think ideally we should either join both tasks, or cancel the one that is still running. And look like hard-cancelling would a wrong thing to do, as it could cancel a task mid-line.
The solution with one select
loop has an interesting propery that it guarantees "atomicity" of send/receive operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But, given that this is a quick&dirty CLI client, I don't think it's super important to really care here.
@@ -26,7 +24,7 @@ pub(crate) fn main() -> Result<()> { | |||
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { | |||
let listener = TcpListener::bind(addr).await?; | |||
|
|||
let (broker_sender, broker_receiver) = mpsc::unbounded(); | |||
let (broker_sender, broker_receiver) = channel(10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have never understood how one is supposed to pick buffer size properly. Maybe the correct choice here is actually 0
capacity? Queuing clients seems bad for latency, and we can't deadlock here, so not using a buffer should be ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe @stjepang can answer that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you should not think about buffer capacities too much. Capacities of 1, 10, and 100 are all valid here, it doesn't really matter.
Ideally, we'd use capacity of 0 as a safe default choice (like Go does, i.e. make(chan int)
constructs a 0-capacity channel). However, the problem is async channels based on futures can't really have capacity 0.
A channel can truly have capacity of 0 only in the context of preemptible threads. With Go channels and crossbeam-channel
, send and receive operation need to pair up, at which point we flip an atomic value and both sending and receiving side agree that a message has been sent.
With futures-based channel, we can't do that. Imagine a receive operation is pending and registered in the channel. Then comes a send operation and sends a message. What happens if the receiving side then wakes up and cancels its receive operation? Did the message get through or not? Doesn't matter if your answer is "yes" or "no", we'll run into some inconsistencies either way. I guess the point is that we can only create an illusion of 0-capacity channels, but the channel will in some ways behave as if the capacity was 1.
Some(void) => match void {}, | ||
None => break, | ||
let messages = messages.map(ConnectionWriterEvent::Message); | ||
let shutdown = shutdown.map(|_| ConnectionWriterEvent::Shutdown).chain(stream::once(ConnectionWriterEvent::Shutdown)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's smart! Although I wonder if there's some simpler cooperative shutdown idiom...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I think I've conjured something up:
fn with_shutdown<S, T>(stream: S, shutdown: Receiver<Void>) -> impl Stream<Item=T>
where
S: Stream<Item=T>,
T: Unpin,
{
let items = stream.map(Some);
let shutdown = shutdown.map(|void| match void {}).chain(stream::once(None));
items.merge(shutdown).scan((), |&mut (), item| item)
}
The loop can then be written as
let mut messages = with_shutdown(messages, shutdown);
while let Some(msg) = messages.next().await {
}
Note that this also allows pushing cancellation completely out of this function and onto the call site (at the cost of making this generic over messages
stream)!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think with_shutdown
should probably take shutdown: impl Future<Item = !>
rather than a specific receiver
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@matklad I feel like the with_shutdown
method you've written here might be a bit daunting for people trying to understand everything that's going on. I don't quite understand what shutdown.map(|void| match void {})
does, though I do trust it's correct.
I feel like in the case of this example we should probably try to err on keeping code as simple as possible. Never types, and empty matches could be tricky for people to pick up on. I think keeping what is in the PR already would be easier to follow for most people.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same! I see how the "void" pattern is useful but is definitely something that is pretty unusual and gives me pause :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I've managed to make cancellation in this case rather easy, at the cost of building a 75-lines long stop_source/stop_token library: matklad@ab02901. I feel this is roughly the place where we want to end-up eventually, but I am not sure we want to do this for tutorial.
I am genuinely don't know what to do with tutorial, I have how there's no simple solution :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Met up with @skade in person today, and we had some discussions about this. I think we ended up with two conclusions:
- We probably need a counterpart to
Stream::merge
that interleaves 2 streams, but drops them once one of them is exhausted. - We should look into writing a cancellation library that works for both futures and streams. We bounced around some ideas that were based on the pattern used in this PR. There's probably some design space we could explore here.
Regarding this PR. I think focusing on making it as understandable as possible might be the right direction for now. And once we figure out a more formal strategy for cancellation we can apply that in a follow-up PR.
How does that sound?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM, I’ve sort-of reaches similar conclusions, specifically:
-
merge shortest might be an interesting primitive, thought, in this particular case, it might be enough if merge drops a substrram that is exhausted (see Chain iterator adaptor shold drop exhausted subiterator rust-lang/rust#66031 for minimizes example)
-
cancellation should be in a library, and, as design space is large, it should be outside of async std at least for start. I believe last 75 lines of matklad@ab02901 might be a good start actually, should I just publish that to crates.io?
-
for this PR, I wonder if we should rip out cancellation completely?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for this PR, I wonder if we should rip out cancellation completely?
That's definitely an option I think. It's a very cool system to show off though, so I feel like it's a bit of a loss. But given the circumstance maybe it's for the best.
Also yeah having it on crates.io would be neat; big fan of publishing things!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yoshuawuyts published https://github.com/async-rs/stop-token. You are also an owner on crates-io (I was surprised that there's no async-rs-publishes group to which I can grant permissions). There's a high change that I won't be able to properly maintain this crate, but docs are very upfront that it is an experiment, and it's also in a finished state already, so there's hopefully little to maintain (well, until folks ask for linked tokens and cancellation callbacks...)
This has to be considered a draft.
The 2 major changes are:
The stream construction is still a bit ugly, though, especially in the client handling code of the server.