Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: new internal semaphore based on intrusive lists #2325

Merged
merged 97 commits into from
Mar 23, 2020
Merged

Conversation

hawkw
Copy link
Member

@hawkw hawkw commented Mar 16, 2020

Motivation

Many of Tokio's synchronization primitives (RwLock, Mutex,
Semaphore, and the bounded MPSC channel) are based on the internal
semaphore implementation, called semaphore_ll. This semaphore type
provides a lower-level internal API for the semaphore implementation
than the public Semaphore type, and supports "batch" operations, where
waiters may acquire more than one permit at a time, and batches of
permits may be released back to the semaphore.

Currently, semaphore_ll uses an atomic singly-linked list for the
waiter queue. The linked list implementation is specific to the
semaphore. This implementation therefore requires a heap allocation for
every waiter in the queue. These allocations are owned by the semaphore,
rather than by the task awaiting permits from the semaphore. Critically,
they are only deallocated when permits are released back to the
semaphore, at which point it dequeues as many waiters from the front of
the queue as can be satisfied with the released permits. If a task
attempts to acquire permits from the semaphore and is cancelled (such as
by timing out), their waiter nodes remain in the list until they are
dequeued while releasing permits. In cases where large numbers of tasks
are cancelled while waiting for permits, this results in extremely high
memory use for the semaphore (see #2237).

Solution

@Matthias247 has proposed that Tokio adopt the approach used in his
futures-intrusive crate: using an intrusive linked list to store the
wakers of tasks waiting on a synchronization primitive. In an intrusive
list, each list node is stored as part of the entry that node
represents, rather than in a heap allocation that owns the entry.
Because futures must be pinned in order to be polled, the necessary
invariant of such a list --- that entries may not move while in the list
--- may be upheld by making the waiter node !Unpin. In this approach,
the waiter node can be stored inline in the future, rather than
requiring separate heap allocation, and cancelled futures may remove
their nodes from the list.

This branch adds a new semaphore implementation that uses the intrusive
list added to Tokio in #2210. The implementation is essentially a hybrid
of the old semaphore_ll and the semaphore used in futures-intrusive:
while a Mutex around the wait list is necessary, since the intrusive
list is not thread-safe, the permit state is stored outside of the mutex
and updated atomically.

The mutex is acquired only when accessing the wait list — if a task
can acquire sufficient permits without waiting, it does not need to
acquire the lock. When releasing permits, we iterate over the wait
list from the end of the queue until we run out of permits to release,
and split off all the nodes that received enough permits to wake up
into a separate list. Then, we can drain the new list and notify those
wakers after releasing the lock. Because the split operation only
modifies the pointers on the head node of the split-off list and the
new tail node of the old list, it is O(1) and does not require an
allocation to return a variable length number of waiters to notify.

Because of the intrusive list invariants, the API provided by the new
batch_semaphore is somewhat different than that of semaphore_ll. In
particular, the Permit type has been removed. This type was primarily
intended allow the reuse of a wait list node allocated on the heap.
Since the intrusive list means we can avoid heap-allocating waiters,
this is no longer necessary. Instead, acquiring permits is done by
polling an Acquire future returned by the Semaphore type. The use of
a future here ensures that the waiter node is always pinned while
waiting to acquire permits, and that a reference to the semaphore is
available to remove the waiter if the future is cancelled.
Unfortunately, the current implementation of the bounded MPSC requires a
poll_acquire operation, and has methods that call it while outside of
a pinned context. Therefore, I've left the old semaphore_ll
implementation in place to be used by the bounded MPSC, and updated the
Mutex, RwLock, and Semaphore APIs to use the new implementation.
Hopefully, a subsequent change can update the bounded MPSC to use the
new semaphore as well.

Signed-off-by: Eliza Weisman eliza@buoyant.io

carllerche and others added 30 commits February 2, 2020 22:45
`Notify` provides a synchronization primitive similar to thread park /
unpark, except for tasks.
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
(it's really broken)

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
@hawkw hawkw requested a review from carllerche March 23, 2020 16:43
@hawkw
Copy link
Member Author

hawkw commented Mar 23, 2020

Looks great. The only remaining issue that I see is the assert! max size set to u16::MAX.

Whoops, I missed that — should be all good now!

tokio/src/sync/batch_semaphore.rs Show resolved Hide resolved
tokio/src/sync/batch_semaphore.rs Outdated Show resolved Hide resolved
tokio/src/sync/batch_semaphore.rs Outdated Show resolved Hide resolved
tokio/src/sync/batch_semaphore.rs Outdated Show resolved Hide resolved
tokio/src/sync/batch_semaphore.rs Show resolved Hide resolved
tokio/src/sync/batch_semaphore.rs Show resolved Hide resolved
tokio/src/sync/batch_semaphore.rs Show resolved Hide resolved
hawkw added 4 commits March 23, 2020 11:19
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw added 2 commits March 23, 2020 12:21
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
@hawkw hawkw merged commit acf8a7d into master Mar 23, 2020
@cramertj
Copy link
Contributor

I'm so happy to see people taking advantage of this property of pinning, which I thought was super cool. Thanks @Matthias247 for pushing on this both in your PRs to futures-rs and your work on futures-intrusive, and @hawkw for the implementation here!

@bothra90
Copy link

bothra90 commented Apr 2, 2020

@Matthias247 : Is there a reason there's no poll-based API for tokio::sync::Semaphore? I am trying to use this in the Libra project inside a custom Stream implementation, and it would be really ergonomic to have an API like poll_acquire.

@carllerche
Copy link
Member

@bothra90 The poll based APIs forced internal allocation. By using async / await, we can avoid that allocation. It should be possible to write a shim around the new Semaphore that is similar.

hawkw added a commit that referenced this pull request Apr 3, 2020
Previously, the `Mutex::lock`, `RwLock::{read, write}`, and
`Semaphore::acquire` futures in `tokio::sync` implemented `Send + Sync`
automatically. This was by virtue of being implemented using a `poll_fn`
that only closed over `Send + Sync` types. However, this broke in
PR #2325, which rewrote those types using the new `batch_semaphore`.
Now, they await an `Acquire` future, which contains a `Waiter`, which
internally contains an `UnsafeCell`, and thus does not implement `Sync`.

Since removing previously implemented traits breaks existing code, this
inadvertantly caused a breaking change. There were tests ensuring that
the `Mutex`, `RwLock`, and `Semaphore` types themselves were `Send +
Sync`, but no tests that the _futures they return_ implemented those
traits.

I've fixed this by adding an explicit impl of `Sync` for the
`batch_semaphore::Acquire` future. Since the `Waiter` type held by this
struct is only accessed when borrowed mutably, it is safe for it to
implement `Sync`.

Additionally, I've added to the bounds checks for the effected
`tokio::sync` types to ensure that returned futures continue to
implement `Send + Sync` in the future.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
carllerche pushed a commit that referenced this pull request Apr 3, 2020
)

Previously, the `Mutex::lock`, `RwLock::{read, write}`, and
`Semaphore::acquire` futures in `tokio::sync` implemented `Send + Sync`
automatically. This was by virtue of being implemented using a `poll_fn`
that only closed over `Send + Sync` types. However, this broke in
PR #2325, which rewrote those types using the new `batch_semaphore`.
Now, they await an `Acquire` future, which contains a `Waiter`, which
internally contains an `UnsafeCell`, and thus does not implement `Sync`.

Since removing previously implemented traits breaks existing code, this
inadvertantly caused a breaking change. There were tests ensuring that
the `Mutex`, `RwLock`, and `Semaphore` types themselves were `Send +
Sync`, but no tests that the _futures they return_ implemented those
traits.

I've fixed this by adding an explicit impl of `Sync` for the
`batch_semaphore::Acquire` future. Since the `Waiter` type held by this
struct is only accessed when borrowed mutably, it is safe for it to
implement `Sync`.

Additionally, I've added to the bounds checks for the effected
`tokio::sync` types to ensure that returned futures continue to
implement `Send + Sync` in the future.
@carllerche carllerche deleted the eliza/semaphore-2 branch April 15, 2020 20:38
hawkw added a commit to linkerd/linkerd2-proxy that referenced this pull request May 4, 2020
This branch updates the `linkerd2-cache` crate (and `linkerd2-lock`,
which it relies on), to use `std::future`.

Unlike previous PRs, the `linkerd2-lock` update is a more substantial
rewrite, because upstream API changes made the previous implementation
no longer possible. In particular, `tokio::sync::Mutex` does not provide
a `poll_acquire` method the way that `tokio_sync::Lock` did in Tokio
0.1. The removal of `poll_acquire` is largely due to the rewrite of
Tokio's synchronization primitives to use an intrusive linked list-based
semaphore (tokio-rs/tokio#2325), which requires waiters to be pinned to
ensure safety of the intrusive list. Allowing permits to be acquired
only from a future ensures correct pinning of waiters.

Therefore, I've implemented a new `Lock` for Linkerd which uses a Tokio
`Mutex` internally. The `Lock` owns a boxed instance of the future returned
by `Mutex::lock_owned`, and drives it in `poll_acquire`. This allows adapting
the poll-based Tower interface with the futures-only, pinned interface.
hawkw added a commit to linkerd/linkerd2-proxy that referenced this pull request Dec 4, 2020
This branch updates `linkerd2-buffer`, and `linkerd2-proxy-discover`'s
`buffer` module to use Tokio 0.3's MPSC channel rather than Tokio 0.2's.
The rest of the proxy still uses Tokio 0.2, including the 0.2 runtime.

Most of the Tokio synchronization primitives lost their `poll`-based
interfaces in 0.3 as part of the move to intrusive lists of wakers for
synchronization primitives (see tokio-rs/tokio#2325,
tokio-rs/tokio#2509, and tokio-rs/tokio#2861). This change takes
advantage of the inherently pinned nature of `async fn` and `async`
blocks to avoid needing a separate heap allocation to store the waiter
state for a task waiting on a synchronization primitive. However, it
means that a synchronization primitive can _only_ be waited on when the
future that waits on it is pinned --- otherwise, there is a potential
dangling pointer. The `poll`-based APIs allowed waiting on
synchronization primitives from unpinned contexts, so they were removed.

To wait on the synchronization primitives from contexts that may not be
pinned, such as `poll_ready`, it's necessary to add a `Pin<Box<...>>`
around the future that's waiting on the synchronization primitive. This
ensures that the future will not move while it's part of the wait list.
It's important to note that this isn't an _additional_ allocation per
waiter versus Tokio 0.2; instead, it's the same allocation that would
have _always_ happened internally to the synchronization primitive in
the 0.2 API. Now, it's moved outside of the `tokio::sync` type so that
it can be avoided when used with `async`/`await` syntax, and added by
the user when polling the sync primitives.

Because we need to poll channel senders in `tower::Service`
implementations' `poll_ready` functions, it was necessary to introduce
our own bounded MPSC channel type that exposes a polling-based API. When
the buffer's channel is full, we want to exert backpressure in
`poll_ready`, so that callers such as load balancers could choose to
call another service rather than waiting for buffer capacity. This
branch adds a new `linkerd2-channel` crate that implements a pollable
bounded channel, wrapping `tokio::sync`'s unbounded MPSC and using a
`tokio::sync::Semaphore` to implement bounding. It's worth noting that
this is, essentially, how `tokio::sync::mpsc`'s bounded channel is
implemented --- it also uses the semaphore. However, our implementation
exposes a `poll_ready` method by boxing the future that waits to acquire
a semaphore permit, which the Tokio channel does not expose.

This was factored out of PR #732.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw added a commit to linkerd/linkerd2-proxy that referenced this pull request Dec 4, 2020
This branch updates `linkerd2-buffer`, and `linkerd2-proxy-discover`'s
`buffer` module to use Tokio 0.3's MPSC channel rather than Tokio 0.2's.
The rest of the proxy still uses Tokio 0.2, including the 0.2 runtime.

Most of the Tokio synchronization primitives lost their `poll`-based
interfaces in 0.3 as part of the move to intrusive lists of wakers for
synchronization primitives (see tokio-rs/tokio#2325,
tokio-rs/tokio#2509, and tokio-rs/tokio#2861). This change takes
advantage of the inherently pinned nature of `async fn` and `async`
blocks to avoid needing a separate heap allocation to store the waiter
state for a task waiting on a synchronization primitive. However, it
means that a synchronization primitive can _only_ be waited on when the
future that waits on it is pinned --- otherwise, there is a potential
dangling pointer. The `poll`-based APIs allowed waiting on
synchronization primitives from unpinned contexts, so they were removed.

To wait on the synchronization primitives from contexts that may not be
pinned, such as `poll_ready`, it's necessary to add a `Pin<Box<...>>`
around the future that's waiting on the synchronization primitive. This
ensures that the future will not move while it's part of the wait list.
It's important to note that this isn't an _additional_ allocation per
waiter versus Tokio 0.2; instead, it's the same allocation that would
have _always_ happened internally to the synchronization primitive in
the 0.2 API. Now, it's moved outside of the `tokio::sync` type so that
it can be avoided when used with `async`/`await` syntax, and added by
the user when polling the sync primitives.

Because we need to poll channel senders in `tower::Service`
implementations' `poll_ready` functions, it was necessary to introduce
our own bounded MPSC channel type that exposes a polling-based API. When
the buffer's channel is full, we want to exert backpressure in
`poll_ready`, so that callers such as load balancers could choose to
call another service rather than waiting for buffer capacity. This
branch adds a new `linkerd2-channel` crate that implements a pollable
bounded channel, wrapping `tokio::sync`'s unbounded MPSC and using a
`tokio::sync::Semaphore` to implement bounding. It's worth noting that
this is, essentially, how `tokio::sync::mpsc`'s bounded channel is
implemented --- it also uses the semaphore. However, our implementation
exposes a `poll_ready` method by boxing the future that waits to acquire
a semaphore permit, which the Tokio channel does not expose.

Finally, I've added some tests for the `linkerd2-channel` crate, based
on Tokio's tests for the MPSC channel, modified where the APIs differ.
This should help ensure we get similar behavior to what we expect from
Tokio's MPSCs.

This was factored out of PR #732.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw added a commit to hawkw/mycelium that referenced this pull request Aug 30, 2022
A semaphore is a useful synchronization type for things like
rate-limiting async tasks. It could also be used as a lower-level
primitive to implement things like read-write locks[^1] and
channels[^2].

This branch adds an asynchronous semaphore implementation in
`maitake::wait`. The implementation is based on the implementation I
wrote for Tokio in tokio-rs/tokio#2325, with some code simplified a bit
as it was not necessary to maintain Tokio's required API surface.

Closes #299 

[^1]: a rwlock can be modeled by a semaphore with _n_ permits (where _n_
    is the maximum number of concurrent readers); each reader must
    acquire a single permit, while a writer must acquire _n_ permits).

[^2]: a bounded MPSC channel of capacity _n_ can be implemented using a
    semaphore with _n_ permits, where each producer must acquire a
    single permit to write, and every time a message is consumed, the
    reader releases a permit to the writers.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
C-enhancement Category: A PR with an enhancement or bugfix.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants