Skip to content

Commit

Permalink
Change task.start/return to take a core function type and use flat pa…
Browse files Browse the repository at this point in the history
…rams/results
  • Loading branch information
lukewagner committed Jul 3, 2024
1 parent 5d5f4ee commit 979a36c
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 49 deletions.
4 changes: 2 additions & 2 deletions design/mvp/Binary.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ canon ::= 0x00 0x00 f:<core:funcidx> opts:<opts> ft:<typeidx> => (canon lift
| 0x04 rt:<typeidx> => (canon resource.rep rt (core func))
| 0x05 ft:<typeidx> => (canon thread.spawn ft (core func))
| 0x06 => (canon thread.hw_concurrency (core func))
| 0x08 => (canon task.start (core func))
| 0x09 => (canon task.return (core func))
| 0x08 ft:<core:typeidx> => (canon task.start ft (core func))
| 0x09 ft:<core:typeidx> => (canon task.return ft (core func))
| 0x0a => (canon task.wait (core func))
| 0x0b => (canon task.poll (core func))
| 0x0c => (canon task.yield (core func))
Expand Down
48 changes: 34 additions & 14 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -1868,12 +1868,14 @@ def lower_heap_values(cx, vs, ts, out_param):
tuple_value = {str(i): v for i,v in enumerate(vs)}
if out_param is None:
ptr = cx.opts.realloc(0, 0, alignment(tuple_type), elem_size(tuple_type))
flat_vals = [ptr]
else:
ptr = out_param.next('i32')
flat_vals = []
trap_if(ptr != align_to(ptr, alignment(tuple_type)))
trap_if(ptr + elem_size(tuple_type) > len(cx.opts.memory))
store(cx, tuple_value, tuple_type, ptr)
return [ptr]
return flat_vals
```
The `may_leave` flag is guarded by `canon_lower` below to prevent a component
from calling out of the component while in the middle of lowering, ensuring
Expand Down Expand Up @@ -2207,43 +2209,61 @@ component instance defining a resource can access its representation.

For a canonical definition:
```wasm
(canon task.start (core func $f))
(canon task.start $ft (core func $f))
```
validation specifies:
* `$f` is given type `(func (param i32))`
* `$f` is given type `$ft`, which validation requires to be a (core) function type

Calling `$f` invokes the following function which extracts the arguments from the
caller and lowers them into the current instance:
```python
async def canon_task_start(task, i):
async def canon_task_start(task, core_ft, flat_args):
assert(len(core_ft.params) == len(flat_args))
trap_if(task.opts.sync)
trap_if(core_ft != flatten_functype(CanonicalOptions(), FuncType([], task.ft.params), 'lower'))
task.start()
lower_async_values(task, task.start_thunk(), task.ft.param_types(), CoreValueIter([i]))
return []
args = task.start_thunk()
flat_results = lower_sync_values(task, MAX_FLAT_RESULTS, args, task.ft.param_types(), CoreValueIter(flat_args))
assert(len(core_ft.results) == len(flat_results))
return flat_results
```
The call to the `Task.start` (defined above) ensures that `canon task.start` is
called exactly once, before `canon task.return`, before an async call finishes.
An expected implementation of `task.start` would generate a core wasm function
for each lowering of an `async`-lifted export that performs the fused copy of
the arguments into the caller, storing the index of this function in the `Task`
structure and using `call_indirect` to perform the function-type-equality check
required here. The call to `Task.start` (defined above) ensures that `canon
task.start` is called exactly once, before `canon task.return`, before an async
call finishes.

### 🔀 `canon task.return`

For a canonical definition:
```wasm
(canon task.return (core func $f))
(canon task.return $ft (core func $f))
```
validation specifies:
* `$f` is given type `(func (param i32))`
* `$f` is given type `$ft`, which validation requires to be a (core) function type

Calling `$f` invokes the following function which lifts the results from the
current instance and passes them to the caller:
```python
async def canon_task_return(task, i):
async def canon_task_return(task, core_ft, flat_args):
assert(len(core_ft.params) == len(flat_args))
trap_if(task.opts.sync)
trap_if(core_ft != flatten_functype(CanonicalOptions(), FuncType(task.ft.results, []), 'lower'))
task.return_()
task.return_thunk(lift_async_values(task, CoreValueIter([i]), task.ft.result_types()))
results = lift_sync_values(task, MAX_FLAT_PARAMS, CoreValueIter(flat_args), task.ft.result_types())
task.return_thunk(results)
assert(len(core_ft.results) == 0)
return []
```
The call to `Task.return_` (defined above) ensures that `canon task.return` is
called exactly once, after `canon task.start`, before an async call finishes.
An expected implementation of `task.return` would generate a core wasm function
for each lowering of an `async`-lifted export that performs the fused copy of
the results into the caller, storing the index of this function in the `Task`
structure and using `call_indirect` to perform the function-type-equality check
required here. The call to `Task.return_` (defined above) ensures that `canon
task.return` is called exactly once, after `canon task.start`, before an async
call finishes.

### 🔀 `canon task.wait`

Expand Down
37 changes: 21 additions & 16 deletions design/mvp/Explainer.md
Original file line number Diff line number Diff line change
Expand Up @@ -1313,8 +1313,8 @@ canon ::= ...
| (canon resource.new <typeidx> (core func <id>?))
| (canon resource.drop <typeidx> async? (core func <id>?))
| (canon resource.rep <typeidx> (core func <id>?))
| (canon task.start (core func <id>?)) 🔀
| (canon task.return (core func <id>?)) 🔀
| (canon task.start <core:typeidx> (core func <id>?)) 🔀
| (canon task.return <core:typeidx> (core func <id>?)) 🔀
| (canon task.wait (core func <id>?)) 🔀
| (canon task.poll (core func <id>?)) 🔀
| (canon task.yield (core func <id>?)) 🔀
Expand Down Expand Up @@ -1374,21 +1374,26 @@ transferring ownership of the newly-created resource to the export's caller.
See the [async explainer](Async.md) for high-level context and terminology
and the [Canonical ABI explainer] for detailed runtime semantics.

The `task.start` built-in has type `[i32] -> []` where the `i32` is a pointer
into a linear memory buffer that will receive the arguments of the call to
the current task. This built-in must be called from an `async`-lifted export
exactly once per export activation. Delaying the call to `task.start` allows
the async callee to exert *backpressure* on the caller. (See also
[Starting](Async.md#starting) in the async explainer and [`canon_task_start`]
in the Canonical ABI explainer.)

The `task.return` built-in has type `[i32] -> []` where the `i32` is a pointer
to a linear memory buffer containing the value to be returned from the current
The `task.start` built-in returns the arguments to the currently-executing
task. This built-in must be called from an `async`-lifted export exactly once
per export activation after `task.start`. After calling `task.return`, the
callee can continue executing for an arbitrary amount of time before returning
to the caller. (See also [Returning](Async.md#returning) in the async explainer
and [`canon_task_return`] in the Canonical ABI explainer.)
per export activation. Delaying the call to `task.start` allows the async
callee to exert *backpressure* on the caller. The `canon task.start` definition
takes the type index of a core function type and produces a core function with
exactly that type. When called, the declared core function type is checked
to match the lowered function type of a component-level function returning the
parameter types of the current task. (See also [Starting](Async.md#starting) in
the async explainer and [`canon_task_start`] in the Canonical ABI explainer.)

The `task.return` built-in takes as parameters the result values of the
currently-executing task. This built-in must be called from an `async`-lifted
export exactly once per export activation after `task.start`. After calling
`task.return`, the callee can continue executing for an arbitrary amount of
time before returning to the caller. The `canon task.return` definition takes
the type index of a core function type and produces a core function with
exactly that type. When called, the declared core function type is checked
to match the lowered function type of a component-level function taking the
result types of the current task. (See also [Returning](Async.md#returning) in
the async explainer and [`canon_task_return`] in the Canonical ABI explainer.)

The `task.wait` built-in has type `[i32] -> [i32]`, returning an event and
storing the 4-byte payload of the event at the address passed as parameter.
Expand Down
24 changes: 18 additions & 6 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class ModuleType(ExternType):
class CoreFuncType(CoreExternType):
params: list[str]
results: list[str]
def __eq__(self, other):
return self.params == other.params and self.results == other.results

@dataclass
class CoreMemoryType(CoreExternType):
Expand Down Expand Up @@ -1334,12 +1336,14 @@ def lower_heap_values(cx, vs, ts, out_param):
tuple_value = {str(i): v for i,v in enumerate(vs)}
if out_param is None:
ptr = cx.opts.realloc(0, 0, alignment(tuple_type), elem_size(tuple_type))
flat_vals = [ptr]
else:
ptr = out_param.next('i32')
flat_vals = []
trap_if(ptr != align_to(ptr, alignment(tuple_type)))
trap_if(ptr + elem_size(tuple_type) > len(cx.opts.memory))
store(cx, tuple_value, tuple_type, ptr)
return [ptr]
return flat_vals

### `canon lift`

Expand Down Expand Up @@ -1466,18 +1470,26 @@ async def canon_resource_rep(rt, task, i):

### `canon task.start`

async def canon_task_start(task, i):
async def canon_task_start(task, core_ft, flat_args):
assert(len(core_ft.params) == len(flat_args))
trap_if(task.opts.sync)
trap_if(core_ft != flatten_functype(CanonicalOptions(), FuncType([], task.ft.params), 'lower'))
task.start()
lower_async_values(task, task.start_thunk(), task.ft.param_types(), CoreValueIter([i]))
return []
args = task.start_thunk()
flat_results = lower_sync_values(task, MAX_FLAT_RESULTS, args, task.ft.param_types(), CoreValueIter(flat_args))
assert(len(core_ft.results) == len(flat_results))
return flat_results

### `canon task.return`

async def canon_task_return(task, i):
async def canon_task_return(task, core_ft, flat_args):
assert(len(core_ft.params) == len(flat_args))
trap_if(task.opts.sync)
trap_if(core_ft != flatten_functype(CanonicalOptions(), FuncType(task.ft.results, []), 'lower'))
task.return_()
task.return_thunk(lift_async_values(task, CoreValueIter([i]), task.ft.result_types()))
results = lift_sync_values(task, MAX_FLAT_PARAMS, CoreValueIter(flat_args), task.ft.result_types())
task.return_thunk(results)
assert(len(core_ft.results) == 0)
return []

### `canon task.wait`
Expand Down
20 changes: 9 additions & 11 deletions design/mvp/canonical-abi/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,8 +491,8 @@ async def test_async_to_async():
eager_ft = FuncType([], [U8()])
async def core_eager_producer(task, args):
assert(len(args) == 0)
[] = await canon_task_start(task, 1000)
[] = await canon_task_return(task, 43)
[] = await canon_task_start(task, CoreFuncType([],[]), [])
[] = await canon_task_return(task, CoreFuncType(['i32'],[]), [43])
return []
eager_callee = partial(canon_lift, producer_opts, producer_inst, core_eager_producer, eager_ft)

Expand All @@ -514,11 +514,9 @@ async def blocking_callee(entered, start_thunk, return_thunk):
async def consumer(task, args):
assert(len(args) == 0)

ptr = consumer_heap.realloc(0, 0, 1, 1)
[] = await canon_task_start(task, ptr)
b = consumer_heap.memory[ptr]
assert(b == True)
[b] = await canon_task_start(task, CoreFuncType([],['i32']), [])

ptr = consumer_heap.realloc(0, 0, 1, 1)
[ret] = await canon_lower(consumer_opts, eager_callee, eager_ft, task, [0, ptr])
assert(ret == 0)
u8 = consumer_heap.memory[ptr]
Expand Down Expand Up @@ -574,7 +572,7 @@ async def dtor(task, args):
assert(callidx == 1)
assert(task.num_async_subtasks == 0)

[] = await canon_task_return(task, 42)
[] = await canon_task_return(task, CoreFuncType(['i32'],[]), [42])
return []

ft = FuncType([Bool()],[U8()])
Expand Down Expand Up @@ -611,7 +609,7 @@ async def producer(fut, entered, start_thunk, return_thunk):
consumer_ft = FuncType([],[U32()])
async def consumer(task, args):
assert(len(args) == 0)
[] = await canon_task_start(task, 0)
[] = await canon_task_start(task, CoreFuncType([],[]), [])

[ret] = await canon_lower(opts, producer1, producer_ft, task, [0, 0])
assert(ret == (1 | (AsyncCallState.STARTED << 30)))
Expand All @@ -633,7 +631,7 @@ async def callback(task, args):
assert(args[0] == 43)
assert(args[1] == AsyncCallState.DONE)
assert(args[2] == 2)
[] = await canon_task_return(task, 83)
[] = await canon_task_return(task, CoreFuncType(['i32'],[]), [83])
return [0]

consumer_inst = ComponentInstance()
Expand Down Expand Up @@ -710,8 +708,8 @@ async def consumer(task, args):

assert(task.poll() is None)

await canon_task_start(task, 0)
await canon_task_return(task, 83)
await canon_task_start(task, CoreFuncType([],[]), [])
await canon_task_return(task, CoreFuncType(['i32'],[]), [83])
return []

consumer_inst = ComponentInstance()
Expand Down

0 comments on commit 979a36c

Please sign in to comment.