Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream contexts #206

Merged
merged 14 commits into from
May 4, 2021
5 changes: 3 additions & 2 deletions examples/asynchronous_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ async def main():

# this async for loop streams values from the above
# async generator running in a separate process
async for letter in await portal.run(stream_forever):
print(letter)
async with portal.open_stream_from(stream_forever) as stream:
async for letter in stream:
print(letter)

# we support trio's cancellation system
assert cancel_scope.cancelled_caught
Expand Down
4 changes: 2 additions & 2 deletions examples/debugging/multi_daemon_subactors.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ async def main():
p1 = await n.start_actor('name_error', enable_modules=[__name__])

# retreive results
stream = await p0.run(breakpoint_forever)
await p1.run(name_error)
async with p0.open_stream_from(breakpoint_forever) as stream:
await p1.run(name_error)


if __name__ == '__main__':
Expand Down
2 changes: 1 addition & 1 deletion examples/debugging/subactor_breakpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ async def main():


if __name__ == '__main__':
tractor.run(main, debug_mode=True)
tractor.run(main, debug_mode=True, loglevel='debug')
35 changes: 22 additions & 13 deletions examples/full_fledged_streaming_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ async def aggregate(seed):
# fork point
portal = await nursery.start_actor(
name=f'streamer_{i}',
rpc_module_paths=[__name__],
enable_modules=[__name__],
)

portals.append(portal)

send_chan, recv_chan = trio.open_memory_channel(500)

async def push_to_chan(portal, send_chan):

# TODO: https://github.com/goodboy/tractor/issues/207
async with send_chan:
async for value in await portal.run(stream_data, seed=seed):
# leverage trio's built-in backpressure
await send_chan.send(value)
async with portal.open_stream_from(stream_data, seed=seed) as stream:
async for value in stream:
# leverage trio's built-in backpressure
await send_chan.send(value)

print(f"FINISHED ITERATING {portal.channel.uid}")

Expand Down Expand Up @@ -71,18 +74,24 @@ async def main():
import time
pre_start = time.time()

portal = await nursery.run_in_actor(
aggregate,
portal = await nursery.start_actor(
name='aggregator',
seed=seed,
enable_modules=[__name__],
)

start = time.time()
# the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally
result_stream = []
async for value in await portal.result():
result_stream.append(value)
async with portal.open_stream_from(
aggregate,
seed=seed,
) as stream:

start = time.time()
# the portal call returns exactly what you'd expect
# as if the remote "aggregate" function was called locally
result_stream = []
async for value in stream:
result_stream.append(value)

await portal.cancel_actor()

print(f"STREAM TIME = {time.time() - start}")
print(f"STREAM + SPAWN TIME = {time.time() - pre_start}")
Expand Down
11 changes: 6 additions & 5 deletions examples/multiple_streams_one_portal.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ async def stream_data(seed=10):

async def stream_from_portal(p, consumed):

async for item in await p.run(stream_data):
if item in consumed:
consumed.remove(item)
else:
consumed.append(item)
async with p.open_stream_from(stream_data) as stream:
async for item in stream:
if item in consumed:
consumed.remove(item)
else:
consumed.append(item)


async def main():
Expand Down
16 changes: 9 additions & 7 deletions tests/test_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def test_remote_error(arb_addr, args_err):
async def main():
async with tractor.open_nursery() as nursery:

portal = await nursery.run_in_actor(assert_err, name='errorer', **args)
portal = await nursery.run_in_actor(
assert_err, name='errorer', **args
)

# get result(s) from main task
try:
Expand Down Expand Up @@ -168,13 +170,14 @@ async def test_cancel_infinite_streamer(start_method):
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'donny',
rpc_module_paths=[__name__],
enable_modules=[__name__],
)

# this async for loop streams values from the above
# async generator running in a separate process
async for letter in await portal.run(stream_forever):
print(letter)
async with portal.open_stream_from(stream_forever) as stream:
async for letter in stream:
print(letter)

# we support trio's cancellation system
assert cancel_scope.cancelled_caught
Expand Down Expand Up @@ -430,15 +433,14 @@ async def main():
tractor.run(main)



async def spin_for(period=3):
"Sync sleep."
time.sleep(period)


async def spawn():
async with tractor.open_nursery() as tn:
portal = await tn.run_in_actor(
await tn.run_in_actor(
spin_for,
name='sleeper',
)
Expand All @@ -460,7 +462,7 @@ def test_cancel_while_childs_child_in_sync_sleep(
async def main():
with trio.fail_after(2):
async with tractor.open_nursery() as tn:
portal = await tn.run_in_actor(
await tn.run_in_actor(
spawn,
name='spawn',
)
Expand Down
99 changes: 52 additions & 47 deletions tests/test_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ async def cancel(use_signal, delay=0):


async def stream_from(portal):
async for value in await portal.result():
print(value)
async with portal.open_stream_from(stream_forever) as stream:
async for value in stream:
print(value)


async def spawn_and_check_registry(
Expand Down Expand Up @@ -139,18 +140,20 @@ async def get_reg():
registry = await get_reg()
assert actor.uid in registry

if with_streaming:
to_run = stream_forever
else:
to_run = trio.sleep_forever
try:
async with tractor.open_nursery() as n:
async with trio.open_nursery() as trion:

async with trio.open_nursery() as trion:
try:
async with tractor.open_nursery() as n:
portals = {}
for i in range(3):
name = f'a{i}'
portals[name] = await n.run_in_actor(to_run, name=name)
if with_streaming:
portals[name] = await n.start_actor(
name=name, enable_modules=[__name__])

else: # no streaming
portals[name] = await n.run_in_actor(
trio.sleep_forever, name=name)

# wait on last actor to come up
async with tractor.wait_for_actor(name):
Expand All @@ -171,19 +174,19 @@ async def get_reg():
trion.start_soon(cancel, use_signal, 1)

last_p = pts[-1]
async for value in await last_p.result():
print(value)
await stream_from(last_p)

else:
await cancel(use_signal)

finally:
with trio.CancelScope(shield=True):
await trio.sleep(0.5)
finally:
with trio.CancelScope(shield=True):
await trio.sleep(0.5)

# all subactors should have de-registered
registry = await get_reg()
assert len(registry) == extra
assert actor.uid in registry
# all subactors should have de-registered
registry = await get_reg()
assert len(registry) == extra
assert actor.uid in registry


@pytest.mark.parametrize('use_signal', [False, True])
Expand Down Expand Up @@ -260,36 +263,38 @@ async def close_chans_before_nursery(
get_reg = partial(aportal.run_from_ns, 'self', 'get_registry')

async with tractor.open_nursery() as tn:
portal1 = await tn.run_in_actor(
stream_forever,
name='consumer1',
)
agen1 = await portal1.result()

portal2 = await tn.start_actor('consumer2', rpc_module_paths=[__name__])
agen2 = await portal2.run(stream_forever)

async with trio.open_nursery() as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()

# XXX: THIS IS THE KEY THING that happens
# **before** exiting the actor nursery block

# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
portal1 = await tn.start_actor(
name='consumer1', enable_modules=[__name__])
portal2 = await tn.start_actor(
'consumer2', enable_modules=[__name__])

# TODO: compact this back as was in last commit once
# 3.9+, see https://github.com/goodboy/tractor/issues/207
async with portal1.open_stream_from(stream_forever) as agen1:
async with portal2.open_stream_from(
stream_forever
) as agen2:
async with trio.open_nursery() as n:
n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
# Kill the root nursery thus resulting in
# normal arbiter channel ops to fail during
# teardown. It doesn't seem like this is
# reliably triggered by an external SIGINT.
# tractor.current_actor()._root_nursery.cancel_scope.cancel()

# XXX: THIS IS THE KEY THING that happens
# **before** exiting the actor nursery block

# also kill off channels cuz why not
await agen1.aclose()
await agen2.aclose()
finally:
with trio.CancelScope(shield=True):
await trio.sleep(.5)
await trio.sleep(1)

# all subactors should have de-registered
registry = await get_reg()
Expand Down
Loading