
Channel #101

Merged: 1 commit merged into DataDog:master on Oct 16, 2020

Conversation

@glommer (Collaborator) commented Oct 9, 2020

Depends on #100.

Will merge that first.

@glommer (Collaborator, Author) commented Oct 9, 2020 via email

@matklad (Contributor) commented Oct 11, 2020

> To be clear, I do agree with the reasoning you made but a case like this is
> a case of essentially like picking a politician for office.
> Deep down you know they are all bad, you just have to choose which aligns
> more with the set of issues that you care about.

+100500, I don't really think it's productive to debate this, I just wanted to give some extra context here. One thing I want to clarify, though, is that the alternative proposal is panicking on send if the receiver is gone. I wouldn't call this silent; on the contrary, it elevates a recoverable Result into a non-recoverable, "this is a bug" panic.

> This is part of the reason I don't offer a condvar in Scipio, btw. I
> essentially haven't figured out a way to make it immune to that without
> essentially making it into a gate

Not sure if this fully covers all the footguns, but I quite like how std does its notification APIs. Condvar's wait consumes a mutex guard, which prevents some logical races.

The thread parking API just makes the unpark(); park() sequence work in any relative order, making it a sort-of level-triggered API.
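For reference, a minimal sketch of the std parking behaviour described here (plain std, not Scipio code). The unpark(); park() pair works in either order because the park token is remembered:

  use std::thread;
  use std::time::Duration;

  fn main() {
      let handle = thread::spawn(|| {
          thread::sleep(Duration::from_millis(10));
          // If unpark() already ran, park() returns immediately: the pending
          // token is what makes this behave like a level-triggered signal.
          thread::park();
          println!("woken up");
      });

      // This may well execute before the spawned thread reaches park().
      handle.thread().unpark();
      handle.join().unwrap();
  }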

@glommer (Collaborator, Author) commented Oct 13, 2020

Hey, not all discussions have to be productive! For some we can settle on fun =)

Panicking vs requiring unwrap, to me, falls under the idea of making Scipio less opinionated whenever possible. I just gave a usage example where you'd prefer to panic, but it is equally easy to imagine others where missing the notification is okay.

@glommer force-pushed the channel branch 2 times, most recently from 91d5e15 to 55c2455, on October 14, 2020 01:22
@glommer (Collaborator, Author) commented Oct 14, 2020

I just added a second commit implementing most of @matklad's suggestions, to make review easier. I will squash it into a single commit once we are all in agreement.

In particular:

  • the original item is now returned on error. I am still using io::Error to signal the reason for the error, pairing both into a helper Error struct (a possible shape is sketched after this list).
  • the method names have changed, and we now have try_send and send.
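A hypothetical sketch of that pairing (names are illustrative, not the PR's exact types):

  use std::io;

  // The failed item is handed back together with an io::Error that says
  // why the operation failed, so nothing is lost on the error path.
  struct ChannelError<T> {
      kind: io::Error,
      item: T,
  }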

As we have discussed:

  • we'll keep the need to .unwrap() to generate a panic.
  • it is indeed best to release the RefCell before waking wakers (sketched below).
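On that last bullet, a minimal sketch (hypothetical State type, not the PR's actual code) of why the RefCell borrow should be dropped before calling wake():

  use std::cell::RefCell;
  use std::task::Waker;

  struct State {
      queue: Vec<u32>,
      recv_waiters: Vec<Waker>,
  }

  fn send(state: &RefCell<State>, item: u32) {
      // Mutate the shared state and take the wakers out while borrowed...
      let wakers: Vec<Waker> = {
          let mut inner = state.borrow_mut();
          inner.queue.push(item);
          std::mem::take(&mut inner.recv_waiters)
      }; // ...the RefCell borrow ends here, before any wake() call.

      // If a wake() implementation polled a task synchronously and that task
      // re-entered the channel while the borrow was still held, borrow_mut()
      // would panic. Waking outside the borrow avoids that.
      for w in wakers {
          w.wake();
      }
  }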

A Contributor left a review comment on this snippet of the channel's initial state:

  send_waiters: Vec::new(),
  recv_waiters: Vec::new(),
  receiver_alive: true,
  sender_alive: true,

I wonder if it makes sense to pair waiters: Vec and alive: bool into a single waiters: Option<Vec>?

That way, the type system guarantees that you can't add a waiter if the opposite side is closed.
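A sketch of what that could look like (hypothetical names, just to illustrate the suggestion):

  use std::task::Waker;

  // `None` means "the other side is gone", so there is no separate
  // `alive: bool` flag that could drift out of sync with the waiter list.
  struct OneSide {
      waiters: Option<Vec<Waker>>,
  }

  impl OneSide {
      fn add_waiter(&mut self, waker: Waker) -> bool {
          match &mut self.waiters {
              // A waiter can only be registered while the peer is alive.
              Some(waiters) => {
                  waiters.push(waker);
                  true
              }
              // Peer already closed: the type forces the caller to handle it.
              None => false,
          }
      }

      fn close(&mut self) -> Vec<Waker> {
          // Marks this side as dead and hands back any pending wakers so
          // they can be woken after the borrow is released.
          self.waiters.take().unwrap_or_default()
      }
  }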

@matklad (Contributor) commented Oct 14, 2020 via email

@Daniel-B-Smith (Contributor) commented:

My only drive-by comment is that it would be nice to have a len() method accessible on the receiver for stats. When monitoring a data pipeline in a process, I've used the channel's len() method to see where our bottlenecks are coming from.

Though, I would also be more than open to alternative monitoring strategies.
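A sketch of the kind of hook this asks for (illustrative names only, not this PR's API):

  use std::cell::RefCell;
  use std::collections::VecDeque;
  use std::rc::Rc;

  // Hypothetical receiving half holding the shared buffer.
  struct LocalReceiver<T> {
      buffer: Rc<RefCell<VecDeque<T>>>,
  }

  impl<T> LocalReceiver<T> {
      /// Number of items currently sitting in the channel, so a monitoring
      /// task can periodically sample queue depth and spot bottlenecks.
      fn len(&self) -> usize {
          self.buffer.borrow().len()
      }
  }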

@glommer (Collaborator, Author) commented Oct 15, 2020

I have pushed a new version that incorporates both of your suggestions (@matklad and @Daniel-B-Smith)

Thank you both.

However, for tests I still can't use futures_lite::StreamExt. It simply won't compile no matter what I do.
Because the vast majority of our futures usage is in tests, I am considering, as a temporary measure, removing futures from our dependencies but leaving it in dev-dependencies, so we are at least not forced into conversion hell right now.

But @matklad, if you could advise what would be needed to get those tests working with futures-lite, that would be quite enlightening.

@glommer (Collaborator, Author) commented Oct 15, 2020

I managed to convert most tests to futures_lite by using fold instead of for_each, which is better for this anyway.
The test using for_each_concurrent has to stay on futures, but I guess that is fine, as we want to make sure those keep working too, since users may use them.
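For reference, the fold-based shape looks roughly like this with futures_lite (illustrative only; the real tests have different bodies and assertions):

  use futures_lite::{Stream, StreamExt};

  // Sum everything coming out of a receiver that implements Stream.
  // With fold, the accumulator is threaded explicitly instead of being
  // captured and mutated from inside a for_each closure.
  async fn sum_received(receiver: impl Stream<Item = usize> + Unpin) -> usize {
      receiver.fold(0, |acc, x| acc + x).await
  }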

Now, the problem lies in producer_early_drop_receiver, which uses take (currently commented out). I just can't get that to work, and the error messages make no sense to me.

@glommer (Collaborator, Author) commented Oct 15, 2020

... which must be some old bug! It works if I update futures_lite. Brilliant

@glommer (Collaborator, Author) commented Oct 15, 2020

Everything is using futures-lite now with the exception of the one test in which we test that for_each_concurrent works.

For this to compile, though, we need PR #115 merged first, as it bumps the version of futures-lite (while removing futures from Cargo.toml).

Channels are a useful abstraction when data needs to be passed between
asynchronous entities.

I have just come across a use case where channels would provide a much
more ergonomic way to code than the Deque, so here's our first!

The LocalChannel is an executor-local channel (meaning !Send, !Sync)
that is useful to pass data between task queues.

The use case for this is an internal service that needs to pick up
work to do serially. Imagine for example a flush service that wants
to flush one file at a time.

The flushers will live in a separate task queue. The entities generating
the work now have to register it into the flusher's task queue.

Using a Deque is possible, but you now have to wrap it under an Rc
and code a loop-like construct (and hey, if that's what floats your
boat, have at it!)

Using the LocalChannel, however, we can write this:

  // create an unbounded executor-local channel pair
  let (sender, receiver) = LocalChannel::new_unbounded();

  // spawn the consumer into its own task queue (tq) and detach it
  Local::local_into(async move {
    receiver.for_each(|x| { do_something(x) }).await;
  }, tq).unwrap().detach();

  // producers push work from anywhere in the same executor
  sender.send(...);
@glommer (Collaborator, Author) commented Oct 16, 2020

Pushed a new version that should address all comments here, plus the ones raised in #111 (which included a copy of this, and some people commented there).

  • the new variants are now at the top level
  • as a result, LocalChannel is no longer public
  • only one waker is woken up per push or pop, to avoid quadratic wake storms (see the sketch below)
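The "wake only one" point boils down to something like this (hypothetical helper, not the exact code):

  use std::task::Waker;

  // On each push (or pop), wake a single waiter instead of draining the
  // whole list; N operations then cause N wakes rather than N^2.
  fn wake_one(waiters: &mut Vec<Waker>) {
      if let Some(waker) = waiters.pop() {
          waker.wake();
      }
  }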

@matklad (Contributor) left a review comment:

LGTM!

Couple of "what if" alternatives:

@glommer (Collaborator, Author) commented Oct 16, 2020

The Deque can go.

A bit of history: in our Scipio-based internal application, I was using it to implement a write-behind / read-ahead mechanism similar to what we now have in StreamReader and StreamWriter controlling in-flight buffers. When the code matured and I moved it to Scipio, using the standard Future trait made more sense, so we implemented poll. Therefore I didn't find much use for the pop_front().await pattern of the Deque.

It is still not the same as the channel, because technically you could implement urgency in the Deque by pushing to the front, but we can always enhance the channel when it comes to that. I don't mind seeing it go.

Speaking of enhancing the channel, I am considering implementing a bidirectional channel too (it should be easy with just two channels playing the role of the two lanes). If you look at the controller code, I am doing that manually, which may mean there is room for such an abstraction. Thoughts?
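A rough sketch of that shape (generic over whatever sender/receiver halves the channel ends up exposing; purely illustrative):

  // Each endpoint sends on one lane and receives on the other.
  struct BiChannel<S, R> {
      to_peer: S,
      from_peer: R,
  }

  // Wire two one-way (sender, receiver) pairs into a pair of endpoints.
  fn bidirectional<S1, R1, S2, R2>(
      lane_a: (S1, R1),
      lane_b: (S2, R2),
  ) -> (BiChannel<S1, R2>, BiChannel<S2, R1>) {
      let (a_tx, a_rx) = lane_a;
      let (b_tx, b_rx) = lane_b;
      (
          BiChannel { to_peer: a_tx, from_peer: b_rx },
          BiChannel { to_peer: b_tx, from_peer: a_rx },
      )
  }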

About the code organization: as I said before, while our community is small we can have those discussions, but I do hope it grows! When it does, programmers tend to naturally bite each other too much about style, and I always hated that. So I found it very refreshing that Rust has clippy and rustfmt, which allowed me to essentially come up with the "if they don't complain, everything goes" policy. That means I will certainly not oppose it if you do it this way, but I'd be wary of enforcing such a rule.

Now, in my personal opinion, it does sound like a good rule. I am wondering if we couldn't get Clippy to shout about it?

@glommer merged commit 8102245 into DataDog:master on Oct 16, 2020
@glommer deleted the channel branch on October 16, 2020 12:57
@matklad (Contributor) commented Oct 16, 2020

> but I'd be wary of enforcing such a rule.

Agree, the only rule we should enforce is "CI is green". If some guideline can't be automatically checked, it's futile to enforce it. Though, having non-enforceable/non-enforced guidelines is still useful to steer new code and refactorings in the right direction.

> I am wondering if we couldn't get Clippy to shout about it?

I don't think so. Well, in theory we could contribute a lint for that, but I wouldn't want to use this -- this is a pretty nuanced guideline, and I fear it would have a fair amount of false positives if enforced by a dumb robot at the current level of AI :)

@glommer (Collaborator, Author) commented Oct 16, 2020

Hey, robots have feelings too.
Calling them dumb is how you get Skynet.
