Skip to content

Commit

Permalink
Implement capacity limitation for threads
Browse files Browse the repository at this point in the history
- New synchronization primitive: CapacityLimiter. Like a Semaphore but
  more specialized. See python-triogh-182 for rationale.

- Add limiter= argument to run_in_worker_thread, that allows one to
  correctly (modulo
  python-trio#6 (comment))
  control the number of active threads.

- Added new function current_default_worker_thread_limiter(), which
  creates or returns a run-local CapacityLimiter, and made
  run_in_worker_thread use it by default when no other limiter= is
  given.

Closes: python-triogh-10, python-triogh-57, python-triogh-156
  • Loading branch information
njsmith committed Jun 18, 2017
1 parent 6311069 commit 7d2d571
Show file tree
Hide file tree
Showing 6 changed files with 638 additions and 51 deletions.
7 changes: 7 additions & 0 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
# import sys
# sys.path.insert(0, os.path.abspath('.'))

# Warn about all references to unknown targets
nitpicky = True
# Except for these ones, which we expect to point to unknown targets:
nitpick_ignore = [
("py:obj", "CapacityLimiter-like object"),
]

# XX hack the RTD theme until
# https://github.com/rtfd/sphinx_rtd_theme/pull/382
# is shipped (should be in the release after 0.2.4)
Expand Down
5 changes: 5 additions & 0 deletions docs/source/reference-core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,9 @@ synchronization logic. All of classes discussed in this section are
implemented on top of the public APIs in :mod:`trio.hazmat`; they
don't have any special access to trio's internals.)

.. autoclass:: CapacityLimiter
:members:

.. autoclass:: Semaphore
:members:

Expand Down Expand Up @@ -1395,6 +1398,8 @@ communicate back with trio, there's the closely related

.. autofunction:: run_in_worker_thread

.. autofunction:: current_default_worker_thread_limiter

.. function:: current_run_in_trio_thread
current_await_in_trio_thread

Expand Down
262 changes: 246 additions & 16 deletions trio/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@
from ._util import aiter_compat

__all__ = [
"Event", "Semaphore", "Lock", "StrictFIFOLock", "Condition", "Queue"]
"Event", "CapacityLimiter", "Semaphore", "Lock", "StrictFIFOLock",
"Condition", "Queue",
]

@attr.s(slots=True, repr=False, cmp=False, hash=False)
class Event:
"""A waitable boolean value useful for inter-task synchronization,
inspired by :class:`threading.Event`.
An event object manages an internal boolean flag, which is initially
False.
False, and tasks can wait for it to become True.
"""

Expand Down Expand Up @@ -81,6 +83,245 @@ async def __aexit__(self, *args):
return cls


@attr.s(frozen=True)
class _CapacityLimiterStatistics:
borrowed_tokens = attr.ib()
total_tokens = attr.ib()
borrowers = attr.ib()
tasks_waiting = attr.ib()


@async_cm
class CapacityLimiter:
"""An object for controlling access to a resource with limited capacity.
Sometimes you need to put a limit on how many tasks can do something at
the same time. For example, you might want to use some threads to run
multiple blocking I/O operations in parallel... but if you use too many
threads at once, then your system can become overloaded and it'll actually
make things slower. One popular solution is to impose a policy like "run
up to 40 threads at the same time, but no more". But how do you implement
a policy like this?
That's what :class:`CapacityLimiter` is for. You can think of a
:class:`CapacityLimiter` object as a sack that starts out holding some fixed
number of tokens::
limit = trio.CapacityLimiter(40)
Then tasks can come along and borrow a token out of the sack::
# Borrow a token:
async with limit:
# We are holding a token!
await perform_expensive_operation()
# Exiting the 'async with' block puts the token back into the sack
And crucially, if you try to borrow a token but the sack is empty, then
you have to wait for another task to finish what it's doing and put its
token back first before you can take it and continue.
Another way to think of it: a :class:`CapacityLimiter` is like a sofa with a
fixed number of seats, and if they're all taken then you have to wait for
someone to get up before you can sit down.
By default, :func:`run_in_worker_thread` uses a :class:`CapacityLimiter` to
limit the number of threads running at once; see
:func:`current_default_worker_thread_limiter` for details.
If you're familiar with semaphores, then you can think of this as a
restricted semaphore that's specialized for one common use case, with
additional error checking. For a more traditional semaphore, see
:class:`Semaphore`.
.. note::
Don't confuse this with the `"leaky bucket"
<https://en.wikipedia.org/wiki/Leaky_bucket>`__ or `"token bucket"
<https://en.wikipedia.org/wiki/Token_bucket>`__ algorithms used to
limit bandwidth usage on networks. The basic idea of using tokens to
track a resource limit is similar, but this is a very simple sack where
tokens aren't automatically created or destroyed over time; they're
just borrowed and then put back.
"""
def __init__(self, total_tokens):
self._lot = _core.ParkingLot()
self._borrowers = set()
# Maps tasks attempting to acquire -> borrower, to handle on-behalf-of
self._pending_borrowers = {}
# invoke the property setter for validation
self.total_tokens = total_tokens
assert self._total_tokens == total_tokens

def __repr__(self):
return ("<trio.CapacityLimiter at {:#x}, {}/{} with {} waiting>"
.format(id(self), len(self._borrowers), self._total_tokens, len(self._lot)))

@property
def total_tokens(self):
"""The total capacity available.
You can change :attr:`total_tokens` by assigning to this attribute. If
you make it larger, then the appropriate number of waiting tasks will
be woken immediately to take the new tokens. If you decrease
total_tokens below the number of tasks that are currently using the
resource, then all current tasks will be allowed to finish as normal,
but no new tasks will be allowed in until the total number of tasks
drops below the new total_tokens.
"""
return self._total_tokens

def _wake_waiters(self):
available = self._total_tokens - len(self._borrowers)
for woken in self._lot.unpark(count=available):
self._borrowers.add(self._pending_borrowers.pop(woken))

@total_tokens.setter
def total_tokens(self, new_total_tokens):
if not isinstance(new_total_tokens, int):
raise TypeError("total_tokens must be an int")
if new_total_tokens < 1:
raise ValueError("total_tokens must be >= 1")
self._total_tokens = new_total_tokens
self._wake_waiters()

@property
def borrowed_tokens(self):
"""The amount of capacity that's currently in use.
"""
return len(self._borrowers)

@property
def available_tokens(self):
"""The amount of capacity that's available to use.
"""
return self.total_tokens - self.borrowed_tokens

@_core.enable_ki_protection
def acquire_nowait(self):
"""Borrow a token from the sack, without blocking.
Raises:
WouldBlock: if no tokens are available.
"""
self.acquire_on_behalf_of_nowait(_core.current_task())

@_core.enable_ki_protection
def acquire_on_behalf_of_nowait(self, borrower):
"""Borrow a token from the sack on behalf of ``borrower``, without
blocking.
Args:
borrower: A :class:`Task` or arbitrary opaque object used to record
who is borrowing this token. This is used by
:func:`run_in_worker_thread` to allow threads to "hold tokens",
with the intention in the future of using it to `allow deadlock
detection and other useful things
<https://github.com/python-trio/trio/issues/182>`__
Raises:
WouldBlock: if no tokens are available.
"""
if borrower in self._borrowers:
raise RuntimeError(
"this borrower is already holding one of this "
"CapacityLimiter's tokens")
if len(self._borrowers) < self._total_tokens and not self._lot:
self._borrowers.add(borrower)
else:
raise _core.WouldBlock

@_core.enable_ki_protection
async def acquire(self):
"""Borrow a token from the sack, blocking if necessary.
"""
await self.acquire_on_behalf_of(_core.current_task())

@_core.enable_ki_protection
async def acquire_on_behalf_of(self, borrower):
"""Borrow a token from the sack on behalf of ``borrower``, blocking if
necessary.
Args:
borrower: A :class:`Task` or arbitrary opaque object used to record
who is borrowing this token; see
:meth:`acquire_on_behalf_of_nowait` for details.
"""
await _core.yield_if_cancelled()
try:
self.acquire_on_behalf_of_nowait(borrower)
except _core.WouldBlock:
task = _core.current_task()
self._pending_borrowers[task] = borrower
await self._lot.park()
except:
await _core.yield_briefly_no_cancel()
raise
else:
await _core.yield_briefly_no_cancel()

@_core.enable_ki_protection
def release(self):
"""Put a token back into the sack.
Raises:
RuntimeError: if the current task has not acquired one of this
sack's tokens.
"""
self.release_on_behalf_of(_core.current_task())

@_core.enable_ki_protection
def release_on_behalf_of(self, borrower):
"""Put a token back into the sack on behalf of ``borrower``.
Raises:
RuntimeError: if the given borrower has not acquired one of this
sack's tokens.
"""
if borrower not in self._borrowers:
raise RuntimeError(
"this borrower isn't holding any of this CapacityLimiter's "
"tokens")
self._borrowers.remove(borrower)
self._wake_waiters()

def statistics(self):
"""Return an object containing debugging information.
Currently the following fields are defined:
* ``borrowed_tokens``: The number of tokens currently borrowed from
the sack.
* ``total_tokens``: The total number of tokens in the sack. Usually
this will be larger than ``borrowed_tokens``, but it's possibly for
it to be smaller if :attr:`total_tokens` was recently decreased.
* ``borrowers``: A list of all tasks or other entities that currently
hold a token.
* ``tasks_waiting``: The number of tasks blocked on this
:class:`CapacityLimiter`\'s :meth:`acquire` or
:meth:`acquire_on_behalf_of` methods.
"""
return _CapacityLimiterStatistics(
borrowed_tokens=len(self._borrowers),
total_tokens=self._total_tokens,
# Use a list instead of a frozenset just in case we start to allow
# one borrower to hold multiple tokens in the future
borrowers=list(self._borrowers),
tasks_waiting=len(self._lot),
)


@async_cm
class Semaphore:
"""A `semaphore <https://en.wikipedia.org/wiki/Semaphore_(programming)>`__.
Expand All @@ -90,20 +331,9 @@ class Semaphore:
the value is never allowed to drop below zero. If the value is zero, then
:meth:`acquire` will block until someone calls :meth:`release`.
This is a very flexible synchronization object, but perhaps the most
common use is to represent a resource with some bounded supply. For
example, if you want to make sure that there are never more than four
tasks simultaneously performing some operation, you could do something
like::
# Allocate a shared Semaphore object, and somehow distribute it to all
# your tasks. NB: max_value=4 isn't technically necessary, but can
# help catch errors.
sem = trio.Semaphore(4, max_value=4)
# Then when you perform the operation:
async with sem:
await perform_operation()
If you're looking for a :class:`Semaphore` to limit the number of tasks
that can access some resource simultaneously, then consider using a
:class:`CapacityLimiter` instead.
This object's interface is similar to, but different from, that of
:class:`threading.Semaphore`.
Expand Down
Loading

0 comments on commit 7d2d571

Please sign in to comment.