From 76520bf0b63ae610af85f779ab55452a21e556c6 Mon Sep 17 00:00:00 2001 From: Luke Wagner Date: Tue, 5 Nov 2024 12:53:33 -0600 Subject: [PATCH] CABI: remove the DONE state from the async ABI --- design/mvp/Async.md | 37 +++++-- design/mvp/CanonicalABI.md | 141 +++++++++++++----------- design/mvp/FutureFeatures.md | 31 +++--- design/mvp/canonical-abi/definitions.py | 65 ++++++----- design/mvp/canonical-abi/run_tests.py | 70 ++++-------- 5 files changed, 178 insertions(+), 166 deletions(-) diff --git a/design/mvp/Async.md b/design/mvp/Async.md index 6aece5de..ac73ca7d 100644 --- a/design/mvp/Async.md +++ b/design/mvp/Async.md @@ -197,17 +197,34 @@ state that enforces the caller side of the Canonical ABI rules. To realize the above goals of always having a well-defined cross-component async callstack, the Component Model's Canonical ABI enforces [Structured Concurrency] by dynamically requiring that a task waits for all its subtasks to -finish before the task itself is allowed to finish. This means that a subtask +return before the task itself is allowed to finish. This means that a subtask cannot be orphaned and there will always be an async callstack rooted at an invocation of an export by the host. Moreover, at any one point in time, the set of tasks active in a linked component graph form a forest of async call trees which e.g., can be visualized using a traditional flamegraph. The Canonical ABI's Python code enforces Structured Concurrency by incrementing -a per-[`Task`] counter when a `Subtask` is created, decrementing when a -`Subtask` is destroyed, and trapping if the counter is not zero when the `Task` +a per-task "`num_subtasks`" counter when a subtask is created, decrementing +when the subtask returns, and trapping if `num_subtasks > 0` when a task attempts to exit. +There is a subtle nuance to these Structured Concurrency rules deriving from +the fact that subtasks may continue execution after [returning](#returning) +their value to their caller. The ability to execute after returning value is +necessary for being able to do work off the caller's critical path. A concrete +example is an HTTP service that does some logging or billing operations after +finishing an HTTP response, where the HTTP response is the return value of the +[`wasi:http/handler.handle`] function. Since the `num_subtasks` counter is +decremented when a subtask *returns* (as opposed to *exits*), this means that +subtasks may continue execution even once their supertask has exited. To +maintain Structured Concurrency (for purposes of checking [reentrance], +scheduler prioritization and debugging/observability), we can consider +the supertask to still be alive but in the process of "asynchronously +tail-calling" its still-executing subtasks. (For scenarios where one +component wants to non-cooperatively bound the execution of another +component, a separate "[blast zone]" feature is necessary in any +case.) + ### Streams and Futures Streams and Futures have two "ends": a *readable end* and *writable end*. When @@ -391,9 +408,9 @@ parameter and results (which are asynchronously read-from and written-to) and returns the index of a new subtask. `summarize` calls `task.wait` repeatedly until all `fetch` subtasks have finished, noting that `task.wait` can return intermediate progress (as subtasks transition from "starting" to "started" to -"returned" to "done") which tell the surrounding core wasm code that it can -reclaim the memory passed arguments or use the results that have now been -written to the outparam memory. +"returned") which tell the surrounding core wasm code that it can reclaim the +memory passed arguments or use the results that have now been written to the +outparam memory. Because the `summarize` function is `canon lift`ed with `async`, its core function type has no results, since results are passed out via `task.return`. @@ -528,8 +545,9 @@ comes after: that the current wasm instance can be torn down eagerly while preserving structured concurrency * some way to say "no more elements are coming for a while" -* `recursive` function type attribute: allow a function to be reentered - recursively (instead of trapping) and link inner and outer activations +* `recursive` function type attribute: allow a function to opt in to + recursive [reentrance], extending the ABI to link the inner and + outer activations * add `stringstream` specialization of `stream` (just like `string` is a specialization of `list`) * allow pipelining multiple `stream.read`/`write` calls @@ -571,9 +589,12 @@ comes after: [WIT]: WIT.md [Goals]: ../high-level/Goals.md [Use Cases]: ../high-level/UseCases.md +[Blast Zone]: FutureFeatures.md#blast-zones +[Reentrance]: Explainer.md#component-invariants [stack-switching]: https://github.com/WebAssembly/stack-switching/ [JSPI]: https://github.com/WebAssembly/js-promise-integration/ [shared-everything-threads]: https://github.com/webAssembly/shared-everything-threads [WASI Preview 3]: https://github.com/WebAssembly/WASI/tree/main/wasip2#looking-forward-to-preview-3 +[`wasi:http/handler.handle`]: https://github.com/WebAssembly/wasi-http/blob/main/wit-0.3.0-draft/handler.wit diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index f0de4f37..74ed8e33 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -324,26 +324,24 @@ class CallState(IntEnum): STARTING = 0 STARTED = 1 RETURNED = 2 - DONE = 3 class EventCode(IntEnum): CALL_STARTING = CallState.STARTING CALL_STARTED = CallState.STARTED CALL_RETURNED = CallState.RETURNED - CALL_DONE = CallState.DONE - YIELDED = 4 - STREAM_READ = 5 - STREAM_WRITE = 6 - FUTURE_READ = 7 - FUTURE_WRITE = 8 + YIELDED = 3 + STREAM_READ = 4 + STREAM_WRITE = 5 + FUTURE_READ = 6 + FUTURE_WRITE = 7 EventTuple = tuple[EventCode, int, int] EventCallback = Callable[[], Optional[EventTuple]] OnBlockCallback = Callable[[Awaitable], Awaitable] ``` The `CallState` enum describes the linear sequence of states that an async call -necessarily transitions through: [`STARTING`](Async.md#backpressure), `STARTED`, -[`RETURNING`](Async.md#returning) and `DONE`. The `EventCode` enum shares +necessarily transitions through: [`STARTING`](Async.md#backpressure), `STARTED` +and [`RETURNED`](Async.md#returning). The `EventCode` enum shares common code values with `CallState` to define the set of integer event codes that are delivered to [waiting](Async.md#waiting) or polling tasks. @@ -438,7 +436,8 @@ class Task: on_block: OnBlockCallback events: list[EventCallback] has_events: asyncio.Event - todo: int + num_subtasks: int + num_borrows: int def __init__(self, opts, inst, ft, caller, on_return, on_block): self.opts = opts @@ -449,7 +448,8 @@ class Task: self.on_block = on_block self.events = [] self.has_events = asyncio.Event() - self.todo = 0 + self.num_subtasks = 0 + self.num_borrows = 0 def task(self): return self @@ -633,10 +633,16 @@ The `return_` method is called by either `canon_task_return` or `canon_lift` callback instead of simply returning the values from `canon_lift` enables the callee to keep executing after returning its results. However, it does introduce a dynamic error condition if `canon task.return` is called less or -more than once which must be checked by `return_` and `exit`. +more than once which must be checked by `return_` and `exit`. Since ownership +of borrowed handles is returned to the caller when the caller is notified that +the `Subtask` is in the `RETURNED` state, `num_borrows` is guarded to be `0`. +Since tasks can keep executing arbitrary code after calling `task.return`, this +means that borrowed handles must be dropped potentially much earlier than the +observable end of the task. ```python def return_(self, flat_results): trap_if(not self.on_return) + trap_if(self.num_borrows > 0) if self.opts.sync and not self.opts.always_task_return: maxflat = MAX_FLAT_RESULTS else: @@ -653,15 +659,16 @@ async or sychronous-using-`always-task-return` call, in which return values are passed as parameters to `canon task.return`. Lastly, when a task exits, the runtime enforces the guard conditions mentioned -above and allows a pending task to start. The `todo` counter is used below to -record the number of unmet obligations to drop borrowed handles, subtasks, -streams and futures. +above and allows a pending task to start. The `num_subtasks` counter is used +below to record the number of unmet obligations to drop subtasks of the current +(super)task. ```python def exit(self): assert(current_task.locked()) - trap_if(self.todo) + trap_if(self.num_subtasks > 0) assert(not self.maybe_next_event()) trap_if(self.on_return) + assert(self.num_borrows == 0) trap_if(self.inst.num_tasks == 1 and self.inst.backpressure) self.inst.num_tasks -= 1 assert(self.inst.num_tasks >= 0) @@ -681,12 +688,14 @@ class Subtask: state: CallState lenders: list[ResourceHandle] enqueued: bool + finished: bool def __init__(self, task): self.supertask = task self.state = CallState.STARTING self.lenders = [] self.enqueued = False + self.finished = False def task(self): return self.supertask @@ -696,37 +705,36 @@ get the `Subtask`'s `supertask` or, in the case of a `Task`, itself. The `lenders` field of `Subtask` maintains a list of all the owned handles that have been lent to a subtask and must therefor not be dropped until the -subtask completes. The `add_lender` method is called (below) when lifting an +subtask returns. The `add_lender` method is called (below) when lifting an owned handle and increments the `lend_count` of the owned handle, which is guarded to be zero by `canon_resource_drop` (below). The `finish` method releases all the `lend_count`s of all such handles lifted for the subtask and -is called (below) when the subtask is finishes. +is called (below) when the subtask returns. ```python def add_lender(self, lending_handle): - assert(self.state != CallState.DONE) + assert(not self.finished and self.state != CallState.RETURNED) assert(lending_handle.own) lending_handle.lend_count += 1 self.lenders.append(lending_handle) def finish(self): - assert(self.state == CallState.RETURNED) + assert(not self.finished and self.state == CallState.RETURNED) for h in self.lenders: h.lend_count -= 1 - self.state = CallState.DONE + self.finished = True ``` Note, the `lenders` list usually has a fixed size (in all cases except when a function signature has `borrow`s in `list`s or `stream`s) and thus can be stored inline in the native stack frame. Lastly, after a `Subtask` has finished, it may be dropped from the `waitables` -table which effectively ends the call from the perspective of the caller. The -`todo` counter is used below to record the number of unmet obligations to drop -the streams and futures connected to this `Subtask`. +table which effectively ends the call from the perspective of the caller. ```python def drop(self): - trap_if(self.state != CallState.DONE) + trap_if(not self.finished) + assert(self.state == CallState.RETURNED) assert(not self.enqueued) - self.supertask.todo -= 1 + self.supertask.num_subtasks -= 1 ``` @@ -981,13 +989,13 @@ class StreamHandle: def start_copying(self, task, buffer): assert(not self.copying_task and not self.copying_buffer) - task.todo += 1 + task.num_subtasks += 1 self.copying_task = task self.copying_buffer = buffer def stop_copying(self): assert(self.copying_task and self.copying_buffer) - self.copying_task.todo -= 1 + self.copying_task.num_subtasks -= 1 self.copying_task = None self.copying_buffer = None @@ -995,17 +1003,17 @@ class StreamHandle: trap_if(self.copying_buffer) self.stream.close(errctx) if isinstance(self.borrow_scope, Task): - self.borrow_scope.todo -= 1 + self.borrow_scope.num_borrows -= 1 ``` The `trap_if(copying_buffer)` in `drop` and the increment/decrement of -`copying_task.todo` keep the `StreamHandle` and `Task` alive while performing -a copy operation (a `stream.read` or `stream.write`) so that the results of a -copy are always reported back to the `Task` that issued the copy. +`num_subtasks` keeps the `Task` alive while performing a copy operation (a +`stream.read` or `stream.write`) so that the results of a copy are always +reported back to the `Task` that issued the copy. -The `borrow_scope.todo` decrement matches an increment when a stream -containing `borrow` handles is lowered as a parameter of an exported function -and ensures that streams-of-borrows are dropped before the end of the call, -just like normal `borrow` handles. +The `num_borrows` decrement matches an increment in `lower_async_value` (below) +when a stream containing `borrow` handles is lowered as a parameter of an +exported function and ensures that streams-of-borrows are dropped before the +end of the call, just like normal `borrow` handles. Given the above logic, the [readable and writable ends] of a stream can be concretely implemented by the following two classes. The readable end @@ -1560,7 +1568,7 @@ def lift_async_value(ReadableHandleT, WritableHandleT, cx, i, t): trap_if(h.copying_buffer) if contains_borrow(t): trap_if(cx.borrow_scope.task() is not h.borrow_scope) - h.borrow_scope.todo -= 1 + h.borrow_scope.num_borrows -= 1 cx.inst.waitables.remove(i) case WritableHandleT(): trap_if(h.paired) @@ -1979,7 +1987,7 @@ def pack_flags_into_int(v, labels): Finally, `own` and `borrow` handles are lowered by initializing new handle elements in the current component instance's handle table. The increment of -`borrow_scope.todo` is complemented by a decrement in `canon_resource_drop` +`num_borrows` is complemented by a decrement in `canon_resource_drop` and ensures that all borrowed handles are dropped before the end of the task. ```python def lower_own(cx, rep, t): @@ -1991,7 +1999,7 @@ def lower_borrow(cx, rep, t): if cx.inst is t.rt.impl: return rep h = ResourceHandle(rep, own = False, borrow_scope = cx.borrow_scope) - h.borrow_scope.todo += 1 + h.borrow_scope.num_borrows += 1 return cx.inst.resources.add(t.rt, h) ``` The special case in `lower_borrow` is an optimization, recognizing that, when @@ -2027,12 +2035,12 @@ def lower_async_value(ReadableHandleT, WritableHandleT, cx, v, t): h.paired = True if contains_borrow(t): h.borrow_scope = cx.borrow_scope - h.borrow_scope.todo += 1 + h.borrow_scope.num_borrows += 1 return cx.inst.waitables.add(h) ``` In the ordinary case, the abstract `ReadableStream` (which may come from the host or the guest) is stored in a `ReadableHandle` in the `waitables` table. -The `borrow_scope.todo` increment must be matched by a decrement in +The `num_borrows` increment must be matched by a decrement in `StreamHandle.drop` (as guarded by `Task.exit`) and ensures that streams of `borrow` handles follow the usual `borrow` scoping rules. Symmetric to `lift_async_value`, the `cx.borrow_scope` is saved in the readable handle for @@ -2707,21 +2715,21 @@ flow to the `async`-lowered caller. else: max_flat_params = 1 max_flat_results = 0 - async def do_call(on_block): - await callee(task, on_start, on_return, on_block) - subtask.finish() - on_progress() - match await call_and_handle_blocking(do_call): - case Returned(): + _ = await call_and_handle_blocking(callee, task, on_start, on_return) + match subtask.state: + case CallState.RETURNED: + subtask.finish() flat_results = [0] - case Blocked(): - task.todo += 1 + case _: + task.num_subtasks += 1 subtaski = task.inst.waitables.add(subtask) def on_progress(): if not subtask.enqueued: subtask.enqueued = True def subtask_event(): subtask.enqueued = False + if subtask.state == CallState.RETURNED: + subtask.finish() return (EventCode(subtask.state), subtaski, 0) task.notify(subtask_event) assert(0 < subtaski <= Table.MAX_LENGTH < 2**30) @@ -2730,25 +2738,32 @@ flow to the `async`-lowered caller. return flat_results ``` -If `callee` finishes execution without blocking, the `Returned()` case -returns `0`, signalling to the caller that the parameters have been -read and the results have been written to the outparam buffer. - -Otherwise, the `Subtask` is added to the current component instance's -`waitables` table, eagerly returning the `i32` index of the `Subtask` -packed with the `CallState` of the `Subtask`. If the returned `CallState` is -`STARTING`, the caller must keep the memory pointed by `flat_args` valid until -`task.wait` indicates that subtask `i` has advanced to `STARTED`, `RETURNED` or -`DONE`. Similarly, if the returned state is `STARTED`, the caller must keep the -memory pointed to by the final `i32` parameter of `flat_args` valid until -`task.wait` indicates that the subtask has advanced to `RETURNED` or `DONE`. +If the `callee` reached the `RETURNED` state before blocking, the call returns +`0`, signalling to the caller that the parameters have been read and the +results have been written to the outparam buffer. Note that the callee may have +blocked *after* calling `task.return` and thus the callee may still be +executing concurrently. However, all the caller needs to know is that it has +received its return value (and all borrowed handles have been returned) and +thus the subtask is finished and ready to be dropped (via `subtask.drop`). + +If `callee` did not reach the `RETURNED` state, it must have blocked and so +the `Subtask` is added to the current component instance's `waitables` table, +eagerly returning the `i32` index packed with the `CallState` of the `Subtask`. +If the returned `CallState` is `STARTING`, the caller must keep the memory +pointed by `flat_args` valid until `task.wait` indicates that subtask `i` has +advanced to `STARTED` or `RETURNED`. Similarly, if the returned state +is `STARTED`, the caller must keep the memory pointed to by the final `i32` +parameter of `flat_args` valid until `task.wait` indicates that the subtask has +advanced to `RETURNED`. The `on_progress` callback is called by `on_start` and `on_return` above and takes care of `task.notify()`ing the async caller of progress. The `enqueued` flag is used to avoid sending repeated progress events when the caller only cares about the latest value of `subtask.state`, taking advantage of the fact that "events" are first-class functions that are called right before passing -the event tuple to core wasm. +the event tuple to core wasm. If the `RETURNED` event is about to be delivered, +the `Subtask` is `finish()`ed, returning ownership of borrowed handles to the +async caller. The above definitions of sync/async `canon_lift`/`canon_lower` ensure that a sync-or-async `canon_lift` may call a sync-or-async `canon_lower`, with all @@ -2826,7 +2841,7 @@ async def canon_resource_drop(rt, sync, task, i): else: task.trap_if_on_the_stack(rt.impl) else: - h.borrow_scope.todo -= 1 + h.borrow_scope.num_borrows -= 1 return flat_results ``` In general, the call to a resource's destructor is treated like a diff --git a/design/mvp/FutureFeatures.md b/design/mvp/FutureFeatures.md index 4b94f3b7..9d9d287b 100644 --- a/design/mvp/FutureFeatures.md +++ b/design/mvp/FutureFeatures.md @@ -8,6 +8,19 @@ also the high-level [post-MVP use cases](../high-level/UseCases.md#post-mvp) and [non-goals](../high-level/Goals.md#non-goals). +## Blast zones + +While the Component Model MVP allows strong software isolation of capabilities +(in the form of link-time imports and runtime handles) there is currently no +way for a host component to execute a guest component robustly in the face of +traps or runaway resource memory/CPU usage. A post-MVP "blast zone" feature +would allow a parent component to dynamically instantiate a child component in +a separate "blast zone" such that a trap in the blast zone could be safely and +predictably handled by the parent outside the blast zone. Furthermore, the +parent could use a non-deterministic timeout or resource quota trigger to +preemptively inject a trap into the blast zone. + + ## Custom ABIs via "adapter functions" The original Interface Types proposal includes the goal of avoiding a fixed @@ -38,24 +51,6 @@ to and from the statically-compiled host implementation language). See [`list.lift_canon` and `list.lower_canon`] for more details. -## Shared-some-things linking via "adapter modules" - -The original [Interface Types proposal] and the re-layered [Module Linking -proposal] both included an "adapter module" definition that allowed import and -export of both Core WebAssembly and Component Model Definitions and thus did -not establish a shared-nothing boundary. Since [component invariants] and -[GC-free runtime instantiation] both require a shared-nothing boundary, two -distinct "component" and "adapter module" concepts would need to be defined, -with all their own distinct types, index spaces, etc. Having both features in -the MVP adds non-trivial implementation complexity over having just one. -Additionally, having two similar-but-different, partially-overlapping concepts -makes the whole proposal harder to explain. Thus, the MVP drops the concept of -"adapter modules", including only shared-nothing "components". However, if -concrete future use cases emerged for creating modules that partially used -shared-nothing component values and partially shared linear memory, "adapter -modules" could be added as a future feature. - - ## Shared-everything Module Linking in Core WebAssembly [Originally][Core Module Linking], Module Linking was proposed as an addition diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index f39d6020..86ec8ce8 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -310,18 +310,16 @@ class CallState(IntEnum): STARTING = 0 STARTED = 1 RETURNED = 2 - DONE = 3 class EventCode(IntEnum): CALL_STARTING = CallState.STARTING CALL_STARTED = CallState.STARTED CALL_RETURNED = CallState.RETURNED - CALL_DONE = CallState.DONE - YIELDED = 4 - STREAM_READ = 5 - STREAM_WRITE = 6 - FUTURE_READ = 7 - FUTURE_WRITE = 8 + YIELDED = 3 + STREAM_READ = 4 + STREAM_WRITE = 5 + FUTURE_READ = 6 + FUTURE_WRITE = 7 EventTuple = tuple[EventCode, int, int] EventCallback = Callable[[], Optional[EventTuple]] @@ -367,7 +365,8 @@ class Task: on_block: OnBlockCallback events: list[EventCallback] has_events: asyncio.Event - todo: int + num_subtasks: int + num_borrows: int def __init__(self, opts, inst, ft, caller, on_return, on_block): self.opts = opts @@ -378,7 +377,8 @@ def __init__(self, opts, inst, ft, caller, on_return, on_block): self.on_block = on_block self.events = [] self.has_events = asyncio.Event() - self.todo = 0 + self.num_subtasks = 0 + self.num_borrows = 0 def task(self): return self @@ -468,6 +468,7 @@ async def poll(self, sync) -> Optional[EventTuple]: def return_(self, flat_results): trap_if(not self.on_return) + trap_if(self.num_borrows > 0) if self.opts.sync and not self.opts.always_task_return: maxflat = MAX_FLAT_RESULTS else: @@ -480,9 +481,10 @@ def return_(self, flat_results): def exit(self): assert(current_task.locked()) - trap_if(self.todo) + trap_if(self.num_subtasks > 0) assert(not self.maybe_next_event()) trap_if(self.on_return) + assert(self.num_borrows == 0) trap_if(self.inst.num_tasks == 1 and self.inst.backpressure) self.inst.num_tasks -= 1 assert(self.inst.num_tasks >= 0) @@ -496,32 +498,35 @@ class Subtask: state: CallState lenders: list[ResourceHandle] enqueued: bool + finished: bool def __init__(self, task): self.supertask = task self.state = CallState.STARTING self.lenders = [] self.enqueued = False + self.finished = False def task(self): return self.supertask def add_lender(self, lending_handle): - assert(self.state != CallState.DONE) + assert(not self.finished and self.state != CallState.RETURNED) assert(lending_handle.own) lending_handle.lend_count += 1 self.lenders.append(lending_handle) def finish(self): - assert(self.state == CallState.RETURNED) + assert(not self.finished and self.state == CallState.RETURNED) for h in self.lenders: h.lend_count -= 1 - self.state = CallState.DONE + self.finished = True def drop(self): - trap_if(self.state != CallState.DONE) + trap_if(not self.finished) + assert(self.state == CallState.RETURNED) assert(not self.enqueued) - self.supertask.todo -= 1 + self.supertask.num_subtasks -= 1 #### Buffer, Stream and Future State @@ -661,13 +666,13 @@ def __init__(self, stream, t): def start_copying(self, task, buffer): assert(not self.copying_task and not self.copying_buffer) - task.todo += 1 + task.num_subtasks += 1 self.copying_task = task self.copying_buffer = buffer def stop_copying(self): assert(self.copying_task and self.copying_buffer) - self.copying_task.todo -= 1 + self.copying_task.num_subtasks -= 1 self.copying_task = None self.copying_buffer = None @@ -675,7 +680,7 @@ def drop(self, errctx): trap_if(self.copying_buffer) self.stream.close(errctx) if isinstance(self.borrow_scope, Task): - self.borrow_scope.todo -= 1 + self.borrow_scope.num_borrows -= 1 class ReadableStreamHandle(StreamHandle): async def copy(self, dst, on_block): @@ -1063,7 +1068,7 @@ def lift_async_value(ReadableHandleT, WritableHandleT, cx, i, t): trap_if(h.copying_buffer) if contains_borrow(t): trap_if(cx.borrow_scope.task() is not h.borrow_scope) - h.borrow_scope.todo -= 1 + h.borrow_scope.num_borrows -= 1 cx.inst.waitables.remove(i) case WritableHandleT(): trap_if(h.paired) @@ -1362,7 +1367,7 @@ def lower_borrow(cx, rep, t): if cx.inst is t.rt.impl: return rep h = ResourceHandle(rep, own = False, borrow_scope = cx.borrow_scope) - h.borrow_scope.todo += 1 + h.borrow_scope.num_borrows += 1 return cx.inst.resources.add(t.rt, h) def lower_stream(cx, v, t): @@ -1388,7 +1393,7 @@ def lower_async_value(ReadableHandleT, WritableHandleT, cx, v, t): h.paired = True if contains_borrow(t): h.borrow_scope = cx.borrow_scope - h.borrow_scope.todo += 1 + h.borrow_scope.num_borrows += 1 return cx.inst.waitables.add(h) ### Flattening @@ -1780,21 +1785,21 @@ def on_return(vs): else: max_flat_params = 1 max_flat_results = 0 - async def do_call(on_block): - await callee(task, on_start, on_return, on_block) - subtask.finish() - on_progress() - match await call_and_handle_blocking(do_call): - case Returned(): + _ = await call_and_handle_blocking(callee, task, on_start, on_return) + match subtask.state: + case CallState.RETURNED: + subtask.finish() flat_results = [0] - case Blocked(): - task.todo += 1 + case _: + task.num_subtasks += 1 subtaski = task.inst.waitables.add(subtask) def on_progress(): if not subtask.enqueued: subtask.enqueued = True def subtask_event(): subtask.enqueued = False + if subtask.state == CallState.RETURNED: + subtask.finish() return (EventCode(subtask.state), subtaski, 0) task.notify(subtask_event) assert(0 < subtaski <= Table.MAX_LENGTH < 2**30) @@ -1834,7 +1839,7 @@ async def canon_resource_drop(rt, sync, task, i): else: task.trap_if_on_the_stack(rt.impl) else: - h.borrow_scope.todo -= 1 + h.borrow_scope.num_borrows -= 1 return flat_results ### `canon resource.rep` diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index 69559340..9dad9258 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -541,7 +541,7 @@ async def core_toggle(task, args): return [] toggle_callee = partial(canon_lift, producer_opts, producer_inst, toggle_ft, core_toggle) - fut2, fut3 = asyncio.Future(), asyncio.Future() + fut2, fut3, fut4 = asyncio.Future(), asyncio.Future(), asyncio.Future() blocking_ft = FuncType([U8Type()], [U8Type()]) async def core_blocking_producer(task, args): [x] = args @@ -549,6 +549,7 @@ async def core_blocking_producer(task, args): await task.on_block(fut2) [] = await canon_task_return(task, CoreFuncType(['i32'],[]), [44]) await task.on_block(fut3) + fut4.set_result("done") return [] blocking_callee = partial(canon_lift, producer_opts, producer_inst, blocking_ft, core_blocking_producer) @@ -573,7 +574,7 @@ async def consumer(task, args): assert(consumer_heap.memory[retp] == 13) fut1.set_result(None) event, callidx, _ = await task.wait(sync = False) - assert(event == EventCode.CALL_DONE) + assert(event == EventCode.CALL_RETURNED) assert(callidx == 1) [] = await canon_subtask_drop(task, callidx) event, callidx, _ = await task.wait(sync = True) @@ -585,11 +586,9 @@ async def consumer(task, args): assert(event == EventCode.CALL_RETURNED) assert(callidx == 2) assert(consumer_heap.memory[retp] == 44) - fut3.set_result(None) - event, callidx, _ = await task.wait(sync = True) - assert(event == EventCode.CALL_DONE) - assert(callidx == 2) [] = await canon_subtask_drop(task, callidx) + fut3.set_result(None) + assert(await task.on_block(fut4) == "done") dtor_fut = asyncio.Future() dtor_value = None @@ -609,7 +608,7 @@ async def dtor(task, args): assert(dtor_value is None) dtor_fut.set_result(None) event, callidx, _ = await task.wait(sync = False) - assert(event == CallState.DONE) + assert(event == CallState.RETURNED) assert(callidx == 2) [] = await canon_subtask_drop(task, callidx) @@ -666,7 +665,7 @@ async def consumer(task, args): async def callback(task, args): assert(len(args) == 4) if args[0] == 42: - assert(args[1] == EventCode.CALL_DONE) + assert(args[1] == EventCode.CALL_RETURNED) assert(args[2] == 1) assert(args[3] == 0) await canon_subtask_drop(task, 1) @@ -679,7 +678,7 @@ async def callback(task, args): return [62] else: assert(args[0] == 62) - assert(args[1] == EventCode.CALL_DONE) + assert(args[1] == EventCode.CALL_RETURNED) assert(args[2] == 2) assert(args[3] == 0) await canon_subtask_drop(task, 2) @@ -745,7 +744,7 @@ async def consumer(task, args): fut.set_result(None) assert(producer1_done == False) event, callidx, _ = await task.wait(sync = False) - assert(event == EventCode.CALL_DONE) + assert(event == EventCode.CALL_RETURNED) assert(callidx == 1) await canon_subtask_drop(task, callidx) assert(producer1_done == True) @@ -754,7 +753,7 @@ async def consumer(task, args): await canon_task_yield(False, task) assert(producer2_done == True) event, callidx, _ = await task.poll(sync = False) - assert(event == EventCode.CALL_DONE) + assert(event == EventCode.CALL_RETURNED) assert(callidx == 2) await canon_subtask_drop(task, callidx) assert(producer2_done == True) @@ -786,10 +785,10 @@ async def test_async_backpressure(): producer1_done = False async def producer1_core(task, args): nonlocal producer1_done - await canon_task_return(task, CoreFuncType([],[]), []) await canon_task_backpressure(task, [1]) await task.on_block(fut) await canon_task_backpressure(task, [0]) + await canon_task_return(task, CoreFuncType([],[]), []) producer1_done = True return [] @@ -812,7 +811,7 @@ async def consumer(task, args): assert(len(args) == 0) [ret] = await canon_lower(consumer_opts, producer_ft, producer1, task, []) - assert(ret == (1 | (CallState.RETURNED << 30))) + assert(ret == (1 | (CallState.STARTED << 30))) [ret] = await canon_lower(consumer_opts, producer_ft, producer2, task, []) assert(ret == (2 | (CallState.STARTING << 30))) @@ -823,12 +822,11 @@ async def consumer(task, args): assert(producer1_done == False) assert(producer2_done == False) event, callidx, _ = await task.wait(sync = False) - assert(event == EventCode.CALL_DONE) + assert(event == EventCode.CALL_RETURNED) assert(callidx == 1) assert(producer1_done == True) - assert(producer2_done == True) event, callidx, _ = await task.poll(sync = False) - assert(event == EventCode.CALL_DONE) + assert(event == EventCode.CALL_RETURNED) assert(callidx == 2) assert(producer2_done == True) @@ -880,11 +878,11 @@ async def core_func(task, args): fut1.set_result(None) event, callidx, _ = await task.wait(sync = False) - assert(event == EventCode.CALL_DONE) + assert(event == EventCode.CALL_RETURNED) assert(callidx == 1) fut2.set_result(None) event, callidx, _ = await task.wait(sync = False) - assert(event == EventCode.CALL_DONE) + assert(event == EventCode.CALL_RETURNED) assert(callidx == 2) await canon_subtask_drop(task, 1) @@ -1134,8 +1132,7 @@ async def core_func(task, args): [wsi2] = await canon_stream_new(U8Type(), task) retp = 16 [ret] = await canon_lower(opts, ft, host_import, task, [wsi2, retp]) - subi,state = unpack_lower_result(ret) - assert(state == CallState.RETURNED) + assert(ret == 0) rsi2 = mem[16] assert(rsi2 == 4) [ret] = await canon_stream_write(U8Type(), opts, task, wsi2, 0, 4) @@ -1168,17 +1165,12 @@ async def core_func(task, args): [ret] = await canon_stream_read(U8Type(), opts, task, rsi2, 0, 4) assert(ret == definitions.BLOCKED) event, p1, p2 = await task.wait(sync = False) - assert(event == EventCode.CALL_DONE) - assert(p1 == subi) - assert(p2 == 0) - event, p1, p2 = await task.wait(sync = False) assert(event == EventCode.STREAM_READ) assert(p1 == rsi2) assert(p2 == 4) [ret] = await canon_stream_read(U8Type(), opts, task, rsi2, 0, 4) assert(ret == definitions.CLOSED) [] = await canon_stream_close_readable(U8Type(), task, rsi2) - [] = await canon_subtask_drop(task, subi) [ret] = await canon_stream_write(U8Type(), sync_opts, task, wsi1, 0, 4) assert(ret == 4) [] = await canon_stream_close_writable(U8Type(), task, wsi1, 0) @@ -1371,8 +1363,7 @@ async def core_func2(task, args): retp = 0 [ret] = await canon_lower(opts2, ft1, func1, task, [retp]) - subi,state = unpack_lower_result(ret) - assert(state== CallState.RETURNED) + assert(ret == 0) rsi = mem2[0] assert(rsi == 1) @@ -1406,11 +1397,6 @@ async def core_func2(task, args): [] = await canon_stream_close_readable(U8Type(), task, rsi) [] = await canon_error_context_debug_message(opts2, task, errctxi, 0) [] = await canon_error_context_drop(task, errctxi) - - event, callidx, _ = await task.wait(sync = False) - assert(event == EventCode.CALL_DONE) - assert(callidx == subi) - [] = await canon_subtask_drop(task, subi) return [] await canon_lift(opts2, inst2, ft2, core_func2, None, lambda:[], lambda _:()) @@ -1474,7 +1460,7 @@ async def core_func2(task, args): [] = await canon_stream_close_writable(BorrowType(rt), task, wsi, 0) event, p1, _ = await task.wait(sync = False) - assert(event == EventCode.CALL_DONE) + assert(event == EventCode.CALL_RETURNED) assert(p1 == subi) [] = await canon_subtask_drop(task, subi) @@ -1628,8 +1614,7 @@ async def core_func(task, args): [wfi] = await canon_future_new(U8Type(), task) retp = 0 [ret] = await canon_lower(lower_opts, host_ft1, host_func, task, [wfi, retp]) - subi,state = unpack_lower_result(ret) - assert(state == CallState.RETURNED) + assert(ret == 0) rfi = mem[retp] readp = 0 @@ -1641,10 +1626,6 @@ async def core_func(task, args): [ret] = await canon_future_write(U8Type(), lower_opts, task, wfi, writep) assert(ret == 1) - event,p1,p2 = await task.wait(sync = False) - assert(event == EventCode.CALL_DONE) - assert(p1 == subi) - event,p1,p2 = await task.wait(sync = False) assert(event == EventCode.FUTURE_READ) assert(p1 == rfi) @@ -1653,13 +1634,11 @@ async def core_func(task, args): [] = await canon_future_close_writable(U8Type(), task, wfi, 0) [] = await canon_future_close_readable(U8Type(), task, rfi) - [] = await canon_subtask_drop(task, subi) [wfi] = await canon_future_new(U8Type(), task) retp = 0 [ret] = await canon_lower(lower_opts, host_ft1, host_func, task, [wfi, retp]) - subi,state = unpack_lower_result(ret) - assert(state == CallState.RETURNED) + assert(ret == 0) rfi = mem[retp] readp = 0 @@ -1671,18 +1650,15 @@ async def core_func(task, args): [ret] = await canon_future_write(U8Type(), lower_opts, task, wfi, writep) assert(ret == 1) - event,p1,p2 = await task.wait(sync = False) - assert(event == EventCode.CALL_DONE) - assert(p1 == subi) + while not task.inst.waitables.get(rfi).stream.closed(): + await task.yield_(sync = False) - await task.yield_(sync = False) [ret] = await canon_future_cancel_read(U8Type(), True, task, rfi) assert(ret == 1) assert(mem[readp] == 43) [] = await canon_future_close_writable(U8Type(), task, wfi, 0) [] = await canon_future_close_readable(U8Type(), task, rfi) - [] = await canon_subtask_drop(task, subi) return []