Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: support mpsc send with &self #2861

Merged
merged 14 commits into from
Sep 25, 2020
Merged

sync: support mpsc send with &self #2861

merged 14 commits into from
Sep 25, 2020

Conversation

carllerche
Copy link
Member

@carllerche carllerche commented Sep 22, 2020

Waiting on #2835 to land in order to fix signal integration.

Updates the mpsc channel to use the intrusive waker based semaphore.
This enables using Sender with &self.

Instead of using Sender::poll_ready to ensure capacity and updating
the Sender state, async fn Sender::reserve() is added. This function
returns a Permit value representing the reserved capacity.

Fixes: #2637
Refs: #2718 (intrusive waiters)

Updates the mpsc channel to use the intrusive waker based sempahore.
This enables using `Sender` with `&self`.

Instead of using `Sender::poll_ready` to ensure capacity and updating
the `Sender` state, `async fn Sender::reserve()` is added. This function
returns a `Permit` value representing the reserved capacity.

Fixes: #2637
Refs: #2718 (intrusive waiters)
@carllerche carllerche requested a review from hawkw September 22, 2020 20:54
@Darksonn Darksonn added A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-sync Module: tokio/sync labels Sep 22, 2020
@carllerche carllerche marked this pull request as ready for review September 23, 2020 05:07
@carllerche
Copy link
Member Author

Let's hope CI passes now 👍

@jonhoo
Copy link
Contributor

jonhoo commented Sep 23, 2020

So, this removes disarm, which is sad. Does it still reserve a slot on poll_ready? If so, disarm is quite necessary to avoid deadlocks that are super hard to debug.

@carllerche
Copy link
Member Author

@jonhoo it has an equivalent via Permit. Disarm is now dropping the permit.

@hawkw
Copy link
Member

hawkw commented Sep 24, 2020

So, this removes disarm, which is sad. Does it still reserve a slot on poll_ready? If so, disarm is quite necessary to avoid deadlocks that are super hard to debug.

@jonhoo We could keep a disarm method on Permit, but it would just be

impl Permit {
    pub fn disarm(self) {
       drop(self)
    }
}

and I don't know if that's really worth having...

Copy link
Member

@hawkw hawkw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really nice, I couldn't find any real issues. the new implementation feels much simpler and more straightforward than the previous one.

it might be nice to have some loom tests for holding multiple permits and dropping them across threads, but that behavior is probably already covered by the existing loom tests for the semaphore impl...

it would also be nice to do some benchmarking to compare this with the old implementation. i imagine this has better performance characteristics but it would be cool to know for sure.

tokio/src/sync/batch_semaphore.rs Show resolved Hide resolved
@udoprog
Copy link
Contributor

udoprog commented Sep 24, 2020

Also supersedes #2041, which is great!

@carllerche carllerche merged commit cf025ba into master Sep 25, 2020
@carllerche carllerche deleted the mpsc-rework branch September 25, 2020 00:26
hawkw added a commit to linkerd/linkerd2-proxy that referenced this pull request Dec 4, 2020
This branch updates `linkerd2-buffer`, and `linkerd2-proxy-discover`'s
`buffer` module to use Tokio 0.3's MPSC channel rather than Tokio 0.2's.
The rest of the proxy still uses Tokio 0.2, including the 0.2 runtime.

Most of the Tokio synchronization primitives lost their `poll`-based
interfaces in 0.3 as part of the move to intrusive lists of wakers for
synchronization primitives (see tokio-rs/tokio#2325,
tokio-rs/tokio#2509, and tokio-rs/tokio#2861). This change takes
advantage of the inherently pinned nature of `async fn` and `async`
blocks to avoid needing a separate heap allocation to store the waiter
state for a task waiting on a synchronization primitive. However, it
means that a synchronization primitive can _only_ be waited on when the
future that waits on it is pinned --- otherwise, there is a potential
dangling pointer. The `poll`-based APIs allowed waiting on
synchronization primitives from unpinned contexts, so they were removed.

To wait on the synchronization primitives from contexts that may not be
pinned, such as `poll_ready`, it's necessary to add a `Pin<Box<...>>`
around the future that's waiting on the synchronization primitive. This
ensures that the future will not move while it's part of the wait list.
It's important to note that this isn't an _additional_ allocation per
waiter versus Tokio 0.2; instead, it's the same allocation that would
have _always_ happened internally to the synchronization primitive in
the 0.2 API. Now, it's moved outside of the `tokio::sync` type so that
it can be avoided when used with `async`/`await` syntax, and added by
the user when polling the sync primitives.

Because we need to poll channel senders in `tower::Service`
implementations' `poll_ready` functions, it was necessary to introduce
our own bounded MPSC channel type that exposes a polling-based API. When
the buffer's channel is full, we want to exert backpressure in
`poll_ready`, so that callers such as load balancers could choose to
call another service rather than waiting for buffer capacity. This
branch adds a new `linkerd2-channel` crate that implements a pollable
bounded channel, wrapping `tokio::sync`'s unbounded MPSC and using a
`tokio::sync::Semaphore` to implement bounding. It's worth noting that
this is, essentially, how `tokio::sync::mpsc`'s bounded channel is
implemented --- it also uses the semaphore. However, our implementation
exposes a `poll_ready` method by boxing the future that waits to acquire
a semaphore permit, which the Tokio channel does not expose.

This was factored out of PR #732.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw added a commit to linkerd/linkerd2-proxy that referenced this pull request Dec 4, 2020
This branch updates `linkerd2-buffer`, and `linkerd2-proxy-discover`'s
`buffer` module to use Tokio 0.3's MPSC channel rather than Tokio 0.2's.
The rest of the proxy still uses Tokio 0.2, including the 0.2 runtime.

Most of the Tokio synchronization primitives lost their `poll`-based
interfaces in 0.3 as part of the move to intrusive lists of wakers for
synchronization primitives (see tokio-rs/tokio#2325,
tokio-rs/tokio#2509, and tokio-rs/tokio#2861). This change takes
advantage of the inherently pinned nature of `async fn` and `async`
blocks to avoid needing a separate heap allocation to store the waiter
state for a task waiting on a synchronization primitive. However, it
means that a synchronization primitive can _only_ be waited on when the
future that waits on it is pinned --- otherwise, there is a potential
dangling pointer. The `poll`-based APIs allowed waiting on
synchronization primitives from unpinned contexts, so they were removed.

To wait on the synchronization primitives from contexts that may not be
pinned, such as `poll_ready`, it's necessary to add a `Pin<Box<...>>`
around the future that's waiting on the synchronization primitive. This
ensures that the future will not move while it's part of the wait list.
It's important to note that this isn't an _additional_ allocation per
waiter versus Tokio 0.2; instead, it's the same allocation that would
have _always_ happened internally to the synchronization primitive in
the 0.2 API. Now, it's moved outside of the `tokio::sync` type so that
it can be avoided when used with `async`/`await` syntax, and added by
the user when polling the sync primitives.

Because we need to poll channel senders in `tower::Service`
implementations' `poll_ready` functions, it was necessary to introduce
our own bounded MPSC channel type that exposes a polling-based API. When
the buffer's channel is full, we want to exert backpressure in
`poll_ready`, so that callers such as load balancers could choose to
call another service rather than waiting for buffer capacity. This
branch adds a new `linkerd2-channel` crate that implements a pollable
bounded channel, wrapping `tokio::sync`'s unbounded MPSC and using a
`tokio::sync::Semaphore` to implement bounding. It's worth noting that
this is, essentially, how `tokio::sync::mpsc`'s bounded channel is
implemented --- it also uses the semaphore. However, our implementation
exposes a `poll_ready` method by boxing the future that waits to acquire
a semaphore permit, which the Tokio channel does not expose.

Finally, I've added some tests for the `linkerd2-channel` crate, based
on Tokio's tests for the MPSC channel, modified where the APIs differ.
This should help ensure we get similar behavior to what we expect from
Tokio's MPSCs.

This was factored out of PR #732.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-sync Module: tokio/sync
Projects
None yet
Development

Successfully merging this pull request may close these issues.

sync: bounded mpsc memory use is proportional to max waiters
6 participants