CABI: remove the DONE state from the async ABI
lukewagner committed Dec 2, 2024
1 parent 633742c commit 76520bf
Showing 5 changed files with 178 additions and 166 deletions.
37 changes: 29 additions & 8 deletions design/mvp/Async.md
@@ -197,17 +197,34 @@ state that enforces the caller side of the Canonical ABI rules.
To realize the above goals of always having a well-defined cross-component
async callstack, the Component Model's Canonical ABI enforces [Structured
Concurrency] by dynamically requiring that a task waits for all its subtasks to
return before the task itself is allowed to finish. This means that a subtask
cannot be orphaned and there will always be an async callstack rooted at an
invocation of an export by the host. Moreover, at any one point in time, the
set of tasks active in a linked component graph forms a forest of async call
trees which, e.g., can be visualized using a traditional flamegraph.

The Canonical ABI's Python code enforces Structured Concurrency by incrementing
a per-task "`num_subtasks`" counter when a subtask is created, decrementing
when the subtask returns, and trapping if `num_subtasks > 0` when a task
attempts to exit.
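As a rough illustration, the counter discipline looks like the following sketch
(the `Trap` helper and minimal class shapes are hypothetical, not the Canonical
ABI's actual definitions):

```python
# Minimal model of the num_subtasks accounting: a task traps if it tries
# to exit while any of its subtasks has not yet returned. Illustrative
# classes only, not the spec's actual Task/Subtask definitions.
class Trap(Exception):
    pass

def trap_if(cond):
    if cond:
        raise Trap()

class Task:
    def __init__(self):
        self.num_subtasks = 0

    def exit(self):
        trap_if(self.num_subtasks > 0)    # unreturned subtasks: trap

class Subtask:
    def __init__(self, supertask):
        self.supertask = supertask
        supertask.num_subtasks += 1       # subtask created

    def on_return(self):
        self.supertask.num_subtasks -= 1  # subtask returned

parent = Task()
child = Subtask(parent)
```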

There is a subtle nuance to these Structured Concurrency rules deriving from
the fact that subtasks may continue execution after [returning](#returning)
their value to their caller. The ability to execute after returning a value is
necessary for being able to do work off the caller's critical path. A concrete
example is an HTTP service that does some logging or billing operations after
finishing an HTTP response, where the HTTP response is the return value of the
[`wasi:http/handler.handle`] function. Since the `num_subtasks` counter is
decremented when a subtask *returns* (as opposed to *exits*), this means that
subtasks may continue execution even once their supertask has exited. To
maintain Structured Concurrency (for purposes of checking [reentrance],
scheduler prioritization and debugging/observability), we can consider
the supertask to still be alive but in the process of "asynchronously
tail-calling" its still-executing subtasks. (For scenarios where one
component wants to non-cooperatively bound the execution of another
component, a separate "[blast zone]" feature is necessary in any
case.)
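A rough asyncio analogy of this nuance (names like `task_return` are
illustrative stand-ins, not the ABI): the callee delivers its return value via
a callback, then keeps executing work that is off the caller's critical path,
like the logging/billing example above.

```python
import asyncio

# Illustrative only: `task_return` plays the role of `canon task.return`.
# The callee keeps running after delivering its response.
async def handle(request, task_return):
    response = f"response for {request}"
    task_return(response)            # return value delivered to the caller
    await asyncio.sleep(0)           # execution continues after returning...
    return ["logged", "billed"]      # ...doing post-return work

async def caller():
    result = {}
    post_return_work = await handle("GET /", lambda v: result.setdefault("value", v))
    return result["value"], post_return_work

value, post_return_work = asyncio.run(caller())
```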

### Streams and Futures

Streams and Futures have two "ends": a *readable end* and *writable end*. When
@@ -391,9 +408,9 @@ parameter and results (which are asynchronously read-from and written-to) and
returns the index of a new subtask. `summarize` calls `task.wait` repeatedly
until all `fetch` subtasks have finished, noting that `task.wait` can return
intermediate progress (as subtasks transition from "starting" to "started" to
"returned" to "done") which tell the surrounding core wasm code that it can
reclaim the memory passed arguments or use the results that have now been
written to the outparam memory.
"returned") which tell the surrounding core wasm code that it can reclaim the
memory passed arguments or use the results that have now been written to the
outparam memory.
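The caller's side of this protocol can be sketched as follows, with `events`
standing in for successive `task.wait` results and the state codes standing in
for `CallState` values (hypothetical names, not the real built-ins):

```python
# Hypothetical sketch of summarize's wait loop over subtask progress events.
STARTING, STARTED, RETURNED = 0, 1, 2

def drive_subtasks(events, num_fetches):
    remaining = num_fetches
    args_reclaimable, results_ready = set(), set()
    for state, subtask in events:
        if state >= STARTED:
            args_reclaimable.add(subtask)   # args copied; memory reusable
        if state == RETURNED:
            results_ready.add(subtask)      # outparam memory holds results
            remaining -= 1
        if remaining == 0:
            break
    return args_reclaimable, results_ready

freed, ready = drive_subtasks([(STARTED, 1), (RETURNED, 2), (RETURNED, 1)], 2)
```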

Because the `summarize` function is `canon lift`ed with `async`, its core
function type has no results, since results are passed out via `task.return`.
@@ -528,8 +545,9 @@ comes after:
that the current wasm instance can be torn down eagerly while preserving
structured concurrency
* some way to say "no more elements are coming for a while"
* `recursive` function type attribute: allow a function to opt in to
recursive [reentrance], extending the ABI to link the inner and
outer activations
* add `stringstream` specialization of `stream<char>` (just like `string` is
a specialization of `list<char>`)
* allow pipelining multiple `stream.read`/`write` calls
@@ -571,9 +589,12 @@ comes after:
[WIT]: WIT.md
[Goals]: ../high-level/Goals.md
[Use Cases]: ../high-level/UseCases.md
[Blast Zone]: FutureFeatures.md#blast-zones
[Reentrance]: Explainer.md#component-invariants

[stack-switching]: https://github.com/WebAssembly/stack-switching/
[JSPI]: https://github.com/WebAssembly/js-promise-integration/
[shared-everything-threads]: https://github.com/WebAssembly/shared-everything-threads

[WASI Preview 3]: https://github.com/WebAssembly/WASI/tree/main/wasip2#looking-forward-to-preview-3
[`wasi:http/handler.handle`]: https://github.com/WebAssembly/wasi-http/blob/main/wit-0.3.0-draft/handler.wit
141 changes: 78 additions & 63 deletions design/mvp/CanonicalABI.md
@@ -324,26 +324,24 @@ class CallState(IntEnum):
STARTING = 0
STARTED = 1
RETURNED = 2

class EventCode(IntEnum):
CALL_STARTING = CallState.STARTING
CALL_STARTED = CallState.STARTED
CALL_RETURNED = CallState.RETURNED
YIELDED = 3
STREAM_READ = 4
STREAM_WRITE = 5
FUTURE_READ = 6
FUTURE_WRITE = 7

EventTuple = tuple[EventCode, int, int]
EventCallback = Callable[[], Optional[EventTuple]]
OnBlockCallback = Callable[[Awaitable], Awaitable]
```
The `CallState` enum describes the linear sequence of states that an async call
necessarily transitions through: [`STARTING`](Async.md#backpressure), `STARTED`
and [`RETURNED`](Async.md#returning). The `EventCode` enum shares
common code values with `CallState` to define the set of integer event codes
that are delivered to [waiting](Async.md#waiting) or polling tasks.
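Restating the two enums in isolation shows the property this paragraph relies
on: a subtask's `CallState` converts directly into the `EventCode` delivered to
core wasm (as `EventCode(subtask.state)` does in `canon_lower` below).

```python
from enum import IntEnum

# Restatement of the enums above, after DONE's removal, to demonstrate
# the shared code values between CallState and EventCode.
class CallState(IntEnum):
    STARTING = 0
    STARTED = 1
    RETURNED = 2

class EventCode(IntEnum):
    CALL_STARTING = CallState.STARTING
    CALL_STARTED = CallState.STARTED
    CALL_RETURNED = CallState.RETURNED
    YIELDED = 3
    STREAM_READ = 4
    STREAM_WRITE = 5
    FUTURE_READ = 6
    FUTURE_WRITE = 7

# Lookup by shared value: a CallState is a valid EventCode.
code = EventCode(CallState.RETURNED)
```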

@@ -438,7 +436,8 @@ class Task:
on_block: OnBlockCallback
events: list[EventCallback]
has_events: asyncio.Event
num_subtasks: int
num_borrows: int

def __init__(self, opts, inst, ft, caller, on_return, on_block):
self.opts = opts
@@ -449,7 +448,8 @@
self.on_block = on_block
self.events = []
self.has_events = asyncio.Event()
self.num_subtasks = 0
self.num_borrows = 0

def task(self):
return self
@@ -633,10 +633,16 @@ The `return_` method is called by either `canon_task_return` or `canon_lift`
callback instead of simply returning the values from `canon_lift` enables the
callee to keep executing after returning its results. However, it does
introduce a dynamic error condition if `canon task.return` is called fewer or
more than once, which must be checked by `return_` and `exit`. Since ownership
of borrowed handles is returned to the caller when the caller is notified that
the `Subtask` is in the `RETURNED` state, `num_borrows` is guarded to be `0`.
Since tasks can keep executing arbitrary code after calling `task.return`, this
means that borrowed handles must be dropped potentially much earlier than the
observable end of the task.
```python
def return_(self, flat_results):
trap_if(not self.on_return)
trap_if(self.num_borrows > 0)
if self.opts.sync and not self.opts.always_task_return:
maxflat = MAX_FLAT_RESULTS
else:
@@ -653,15 +659,16 @@ async or synchronous-using-`always-task-return` call, in which return values
are passed as parameters to `canon task.return`.

Lastly, when a task exits, the runtime enforces the guard conditions mentioned
above and allows a pending task to start. The `num_subtasks` counter is used
below to record the number of unmet obligations to drop subtasks of the current
(super)task.
```python
def exit(self):
assert(current_task.locked())
trap_if(self.num_subtasks > 0)
assert(not self.maybe_next_event())
trap_if(self.on_return)
assert(self.num_borrows == 0)
trap_if(self.inst.num_tasks == 1 and self.inst.backpressure)
self.inst.num_tasks -= 1
assert(self.inst.num_tasks >= 0)
@@ -681,12 +688,14 @@ class Subtask:
state: CallState
lenders: list[ResourceHandle]
enqueued: bool
finished: bool

def __init__(self, task):
self.supertask = task
self.state = CallState.STARTING
self.lenders = []
self.enqueued = False
self.finished = False

def task(self):
return self.supertask
@@ -696,37 +705,36 @@ get the `Subtask`'s `supertask` or, in the case of a `Task`, itself.

The `lenders` field of `Subtask` maintains a list of all the owned handles
that have been lent to a subtask and must therefore not be dropped until the
subtask returns. The `add_lender` method is called (below) when lifting an
owned handle and increments the `lend_count` of the owned handle, which is
guarded to be zero by `canon_resource_drop` (below). The `finish` method
releases all the `lend_count`s of all such handles lifted for the subtask and
is called (below) when the subtask returns.
```python
def add_lender(self, lending_handle):
assert(not self.finished and self.state != CallState.RETURNED)
assert(lending_handle.own)
lending_handle.lend_count += 1
self.lenders.append(lending_handle)

def finish(self):
assert(not self.finished and self.state == CallState.RETURNED)
for h in self.lenders:
h.lend_count -= 1
self.finished = True
```
Note, the `lenders` list usually has a fixed size (in all cases except when a
function signature has `borrow`s in `list`s or `stream`s) and thus can be
stored inline in the native stack frame.
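The lending discipline can be sketched in isolation as follows (illustrative
class shapes; `resource_drop` stands in for `canon_resource_drop`):

```python
# Sketch of handle lending: lifting an owned handle into a subtask bumps
# its lend_count, dropping traps while the count is nonzero, and
# Subtask.finish releases the lends. Illustrative shapes only.
class Trap(Exception):
    pass

class ResourceHandle:
    def __init__(self):
        self.own = True
        self.lend_count = 0

class Subtask:
    def __init__(self):
        self.lenders = []

    def add_lender(self, h):
        assert h.own
        h.lend_count += 1
        self.lenders.append(h)

    def finish(self):
        for h in self.lenders:
            h.lend_count -= 1

def resource_drop(h):
    if h.lend_count > 0:
        raise Trap()   # still lent to an in-flight subtask

handle = ResourceHandle()
sub = Subtask()
sub.add_lender(handle)   # owned handle lifted into the subtask's params
```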

Lastly, after a `Subtask` has finished, it may be dropped from the `waitables`
table which effectively ends the call from the perspective of the caller.
```python
def drop(self):
trap_if(not self.finished)
assert(self.state == CallState.RETURNED)
assert(not self.enqueued)
self.supertask.num_subtasks -= 1
```
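The `finished`-before-`drop` ordering amounts to a simple guard, sketched here
with illustrative classes (not the spec's):

```python
# Minimal guard sketch: a subtask can only be dropped after it has
# returned and been finish()ed; dropping a live call traps.
class Trap(Exception):
    pass

STARTING, STARTED, RETURNED = 0, 1, 2

class Subtask:
    def __init__(self):
        self.state = STARTING
        self.finished = False

    def finish(self):
        assert not self.finished and self.state == RETURNED
        self.finished = True

    def drop(self):
        if not self.finished:
            raise Trap()   # dropping an unfinished call traps

sub = Subtask()
```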


Expand Down Expand Up @@ -981,31 +989,31 @@ class StreamHandle:

def start_copying(self, task, buffer):
assert(not self.copying_task and not self.copying_buffer)
task.num_subtasks += 1
self.copying_task = task
self.copying_buffer = buffer

def stop_copying(self):
assert(self.copying_task and self.copying_buffer)
self.copying_task.num_subtasks -= 1
self.copying_task = None
self.copying_buffer = None

def drop(self, errctx):
trap_if(self.copying_buffer)
self.stream.close(errctx)
if isinstance(self.borrow_scope, Task):
self.borrow_scope.num_borrows -= 1
```
The `trap_if(copying_buffer)` in `drop` and the increment/decrement of
`num_subtasks` keeps the `Task` alive while performing a copy operation (a
`stream.read` or `stream.write`) so that the results of a copy are always
reported back to the `Task` that issued the copy.

The `num_borrows` decrement matches an increment in `lower_async_value` (below)
when a stream containing `borrow` handles is lowered as a parameter of an
exported function and ensures that streams-of-borrows are dropped before the
end of the call, just like normal `borrow` handles.

Given the above logic, the [readable and writable ends] of a stream can be
concretely implemented by the following two classes. The readable end
@@ -1560,7 +1568,7 @@ def lift_async_value(ReadableHandleT, WritableHandleT, cx, i, t):
trap_if(h.copying_buffer)
if contains_borrow(t):
trap_if(cx.borrow_scope.task() is not h.borrow_scope)
h.borrow_scope.num_borrows -= 1
cx.inst.waitables.remove(i)
case WritableHandleT():
trap_if(h.paired)
@@ -1979,7 +1987,7 @@ def pack_flags_into_int(v, labels):

Finally, `own` and `borrow` handles are lowered by initializing new handle
elements in the current component instance's handle table. The increment of
`num_borrows` is complemented by a decrement in `canon_resource_drop`
and ensures that all borrowed handles are dropped before the end of the task.
```python
def lower_own(cx, rep, t):
@@ -1991,7 +1999,7 @@ def lower_borrow(cx, rep, t):
if cx.inst is t.rt.impl:
return rep
h = ResourceHandle(rep, own = False, borrow_scope = cx.borrow_scope)
h.borrow_scope.num_borrows += 1
return cx.inst.resources.add(t.rt, h)
```
The special case in `lower_borrow` is an optimization, recognizing that, when
@@ -2027,12 +2035,12 @@ def lower_async_value(ReadableHandleT, WritableHandleT, cx, v, t):
h.paired = True
if contains_borrow(t):
h.borrow_scope = cx.borrow_scope
h.borrow_scope.num_borrows += 1
return cx.inst.waitables.add(h)
```
In the ordinary case, the abstract `ReadableStream` (which may come from the
host or the guest) is stored in a `ReadableHandle` in the `waitables` table.
The `num_borrows` increment must be matched by a decrement in
`StreamHandle.drop` (as guarded by `Task.exit`) and ensures that streams of
`borrow` handles follow the usual `borrow` scoping rules. Symmetric to
`lift_async_value`, the `cx.borrow_scope` is saved in the readable handle for
@@ -2707,21 +2715,21 @@ flow to the `async`-lowered caller.
else:
max_flat_params = 1
max_flat_results = 0
_ = await call_and_handle_blocking(callee, task, on_start, on_return)
match subtask.state:
case CallState.RETURNED:
subtask.finish()
flat_results = [0]
case _:
task.num_subtasks += 1
subtaski = task.inst.waitables.add(subtask)
def on_progress():
if not subtask.enqueued:
subtask.enqueued = True
def subtask_event():
subtask.enqueued = False
if subtask.state == CallState.RETURNED:
subtask.finish()
return (EventCode(subtask.state), subtaski, 0)
task.notify(subtask_event)
assert(0 < subtaski <= Table.MAX_LENGTH < 2**30)
@@ -2730,25 +2738,32 @@

return flat_results
```
If the `callee` reached the `RETURNED` state before blocking, the call returns
`0`, signalling to the caller that the parameters have been read and the
results have been written to the outparam buffer. Note that the callee may have
blocked *after* calling `task.return` and thus the callee may still be
executing concurrently. However, all the caller needs to know is that it has
received its return value (and all borrowed handles have been returned) and
thus the subtask is finished and ready to be dropped (via `subtask.drop`).

If `callee` did not reach the `RETURNED` state, it must have blocked and so
the `Subtask` is added to the current component instance's `waitables` table,
eagerly returning the `i32` index packed with the `CallState` of the `Subtask`.
If the returned `CallState` is `STARTING`, the caller must keep the memory
pointed to by `flat_args` valid until `task.wait` indicates that subtask `i` has
advanced to `STARTED` or `RETURNED`. Similarly, if the returned state
is `STARTED`, the caller must keep the memory pointed to by the final `i32`
parameter of `flat_args` valid until `task.wait` indicates that the subtask has
advanced to `RETURNED`.
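The exact bit layout of the packed `i32` is not shown in this hunk; one
plausible encoding consistent with the `0 < subtaski < 2**30` assert (the
normative layout is defined elsewhere in CanonicalABI.md) is:

```python
# Hypothetical packing of the returned i32: subtask index in the low 30
# bits and the CallState in the top 2 bits.
STARTING, STARTED, RETURNED = 0, 1, 2

def pack_async_result(i, state):
    assert 0 < i < 2**30
    assert 0 <= state < 4
    return i | (state << 30)

def unpack_async_result(packed):
    return packed & (2**30 - 1), packed >> 30

packed = pack_async_result(42, STARTED)
```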

The `on_progress` callback is called by `on_start` and `on_return` above and
takes care of `task.notify()`ing the async caller of progress. The `enqueued`
flag is used to avoid sending repeated progress events when the caller only
cares about the latest value of `subtask.state`, taking advantage of the fact
that "events" are first-class functions that are called right before passing
the event tuple to core wasm. If the `RETURNED` event is about to be delivered,
the `Subtask` is `finish()`ed, returning ownership of borrowed handles to the
async caller.
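The deduplication via the `enqueued` flag can be sketched in isolation
(hypothetical names): repeated progress notifications collapse into one pending
event whose payload is computed at delivery time, so the caller always sees the
latest subtask state.

```python
# Sketch of the enqueued-flag deduplication used by on_progress above.
class Subtask:
    def __init__(self):
        self.state = 0
        self.enqueued = False

class Caller:
    def __init__(self):
        self.queue = []

    def notify(self, event):
        self.queue.append(event)

caller = Caller()
sub = Subtask()

def on_progress():
    if not sub.enqueued:
        sub.enqueued = True
        def subtask_event():
            sub.enqueued = False
            # Payload reads the *current* state at delivery time.
            return ("CALL_EVENT", sub.state)
        caller.notify(subtask_event)

sub.state = 1; on_progress()   # STARTED
sub.state = 2; on_progress()   # RETURNED: no second event is enqueued
events = [f() for f in caller.queue]
```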

The above definitions of sync/async `canon_lift`/`canon_lower` ensure that a
sync-or-async `canon_lift` may call a sync-or-async `canon_lower`, with all
@@ -2826,7 +2841,7 @@ async def canon_resource_drop(rt, sync, task, i):
else:
task.trap_if_on_the_stack(rt.impl)
else:
h.borrow_scope.num_borrows -= 1
return flat_results
```
In general, the call to a resource's destructor is treated like a
