Skip to content

Commit

Permalink
Add EventStream class based on 'pulse' prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
lordmauve committed Apr 30, 2021
1 parent d793272 commit cae934a
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 1 deletion.
11 changes: 11 additions & 0 deletions docs/source/reference-core.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,17 @@ deadlock. Using an unbounded channel avoids this, because it means
that :meth:`~trio.abc.SendChannel.send` never blocks.


Higher-level synchronization primitives
---------------------------------------

While events and channels are useful in a very wide range of
applications, some less common problems are best tackled with some
higher-level concurrency primitives that focus on a specific problem.

.. autoclass:: EventStream
:members:


Lower-level synchronization primitives
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
1 change: 1 addition & 0 deletions trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
Lock,
StrictFIFOLock,
Condition,
EventStream,
)

from ._highlevel_generic import aclose_forcefully, StapledStream
Expand Down
94 changes: 94 additions & 0 deletions trio/_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,3 +784,97 @@ def statistics(self):
return _ConditionStatistics(
tasks_waiting=len(self._lot), lock_statistics=self._lock.statistics()
)


@attr.s
class EventStream:
"""A concurrency primitive for a sequence of events.
Multiple tasks can subscribe for events on the stream using an ``async
for`` loop::
events = EventStream()
...
async for _ in events.subscribe():
...
On each loop iteration, a subcriber will be blocked if there are no new
events on the stream. An event can be "fired" on a stream, which causes
subscribers to awake::
events.fire()
By default, events are coalesced, but will never be lost. That is, if any
events are fired while a subscriber is processing its last wakeup, that
subscriber will not block on the next loop iteration.
Note that EventStream does not hold any data items associated with events.
However subscribe() does yield integer indices that indicate a position
in the event stream, which could be used. fire() returns the index of the
event added to the stream.
"""
_write_cursor = attr.ib(default=-1)
_wakeup = attr.ib(default=None)
_closed = attr.ib(default=False)

def close(self):
"""Close the stream.
This causes all subscribers to terminate once they have consumed
all events.
"""
self._closed = True
self._wake()

def _wake(self):
"""Wake blocked tasks."""
if self._wakeup is not None:
self._wakeup.set()
self._wakeup = None

def fire(self):
"""Fire an event on the stream."""
if self._closed:
raise RuntimeError(
"Cannot fire an event on a closed event stream."
)
self._write_cursor += 1
self._wake()
return self._write_cursor

async def _wait(self):
"""Wait for the next wakeup.
We lazily create the Event object to block on if one does not yet
exist; this avoids creating event objects that are never awaited.
"""
if self._wakeup is None:
self._wakeup = trio.Event()
await self._wakeup.wait()

async def subscribe(self, from_start=False, coalesce=True):
"""Subscribe for events on the stream.
If from_start is True, then subscribe for events from the start of
the stream.
If coalesce is True, then each iteration 'consumes' all previous
events; otherwise, each iteration consumes just one event.
"""
read_cursor = -1 if from_start else self._write_cursor
while True:
if self._write_cursor > read_cursor:
if coalesce:
read_cursor = self._write_cursor
else:
read_cursor += 1
yield read_cursor
else:
if self._closed:
return
else:
await self._wait()
125 changes: 124 additions & 1 deletion trio/tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from .. import _core
from .. import _timeouts
from .._timeouts import sleep_forever, move_on_after
from .._timeouts import sleep_forever, move_on_after, sleep
from .._sync import *


Expand Down Expand Up @@ -568,3 +568,126 @@ async def lock_taker():
await wait_all_tasks_blocked()
assert record == ["started"]
lock_like.release()


async def test_EventStream_basics():
p = EventStream()

wakeups = 0

async def background():
nonlocal wakeups
async for i in p.subscribe():
wakeups += 1

async with _core.open_nursery() as nursery:
nursery.start_soon(background)

# The event stream starts in a blocked state (no event fired)
await wait_all_tasks_blocked()
assert wakeups == 0

# Calling fire() lets it run:
p.fire()
await wait_all_tasks_blocked()
assert wakeups == 1

# Multiple events are coalesced into one:
p.fire()
p.fire()
p.fire()
await wait_all_tasks_blocked()
assert wakeups == 2

p.close()


async def test_EventStream_while_task_is_elsewhere(autojump_clock):
p = EventStream()

wakeups = 0

async def background():
nonlocal wakeups
async for _ in p.subscribe():
wakeups += 1
await sleep(10)

async with _core.open_nursery() as nursery:
nursery.start_soon(background)

# Double-check that it's all idle and settled waiting for a event
await sleep(5)
assert wakeups == 0
await sleep(10)
assert wakeups == 0

# Wake it up
p.fire()

# Now it's sitting in sleep()...
await sleep(5)
assert wakeups == 1

# ...when another event arrives.
p.fire()

# It still wakes up though
await sleep(10)
assert wakeups == 2

p.close()


async def test_EventStream_subscribe_independence(autojump_clock):
p = EventStream()

wakeups = [0, 0]

async def background(i, sleep_time):
nonlocal wakeups
async for _ in p.subscribe():
wakeups[i] += 1
await sleep(sleep_time)

try:
async with _core.open_nursery() as nursery:
nursery.start_soon(background, 0, 10)
nursery.start_soon(background, 1, 100)

# Initially blocked, no event fired
await sleep(200)
assert wakeups == [0, 0]

# Firing an event wakes both tasks
p.fire()
await sleep(5)
assert wakeups == [1, 1]

# Now
# task 0 is sleeping for 5 more seconds
# task 1 is sleeping for 95 more seconds

# Fire events at a 10s interval; task 0 will wake up for each
# task 1 will only wake up after its sleep
p.fire()
await sleep(10)
p.fire()
assert wakeups == [2, 1]
await sleep(100)
assert wakeups == [3, 2]

# Now task 0 is blocked on the next event
# Task 1 is sleeping for 100s

p.fire()
await sleep(1)
assert wakeups == [4, 2]
await sleep(100)
assert wakeups == [4, 3]

p.close()
except:
import traceback
traceback.print_exc()
raise

0 comments on commit cae934a

Please sign in to comment.