Flow events (+ support destructible types like refcounting in tasks) #144

Merged · 7 commits · May 17, 2020
87 changes: 47 additions & 40 deletions README.md
@@ -35,7 +35,7 @@ instead of being based on traditional work-stealing with shared-memory deques.
> to be deadlock-free. They were not submitted to an additional data race
> detection tool to ensure proper implementation.
>
> Furthermore worker threads are basically actors or state-machines and
> Furthermore, worker threads are state machines and
> were not formally verified either.
>
> Weave does limit synchronization to only simple SPSC and MPSC channels which greatly reduces
@@ -170,7 +170,7 @@ The root thread is also a worker thread.
Worker threads are tuned to maximize throughput of computational **tasks**.

- `spawn fnCall(args)` which spawns a function that may run on another thread and gives you an awaitable `Flowvar` handle.
- `newPledge`, `fulfill` and `spawnDelayed` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships.
- `newFlowEvent`, `trigger`, `spawnOnEvent` and `spawnOnEvents` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships.
- `sync(Flowvar)` will await a Flowvar and block until you receive a result.
- `isReady(Flowvar)` will check if `sync` will actually block or return the result immediately.
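Taken together, these primitives can be sketched as follows. This is a minimal illustration rather than code from the repository; `double` and the values are made up:

```Nim
import weave

proc double(x: int): int =
  2 * x

proc main() =
  init(Weave)
  let fv = spawn double(21)   # spawn returns an awaitable Flowvar
  if not fv.isReady():
    echo "result not ready yet, sync would block"
  echo sync(fv)               # blocks until the result is available
  exit(Weave)

main()
```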

@@ -224,7 +224,7 @@ and for shutdown
Once setup, a foreign thread can submit jobs via:

- `submit fnCall(args)` which submits a function to the Weave runtime and gives you an awaitable `Pending` handle.
- `newPledge`, `fulfill` and `submitDelayed` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships.
- `newFlowEvent`, `trigger` and `submitDelayed` (experimental) to delay a task until some dependencies are met. This allows expressing precise data dependencies and producer-consumer relationships.
- `waitFor(Pending)` which awaits a `Pending` job result and blocks the current thread
- `isReady(Pending)` will check if `waitFor` will actually block or return the result immediately.
- `isSubmitted(job)` allows you to build speculative algorithm where a job is submitted only if certain conditions are valid.
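As a sketch, the submitter flow might look like this. This is illustrative only; `compute` is a made-up function and the runtime is assumed to have been started elsewhere with `runInBackground(Weave)`:

```Nim
import weave

proc compute(x: int): int =
  x * x

proc main() =
  # A foreign (non-worker) thread registers itself as a submitter
  setupSubmitterThread(Weave)
  waitUntilReady(Weave)
  let job = submit compute(7)   # returns an awaitable Pending handle
  echo waitFor(job)             # blocks the current thread until completion
  teardownSubmitterThread(Weave)

main()
```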
@@ -473,31 +473,38 @@ In the literature, it is also called:
Tagged experimental as the API and its implementation are unique
compared to other libraries/language-extensions. Feedback welcome.

No specific ordering is required between calling the pledge producer and its consumer(s).
No specific ordering is required between calling the event producer and its consumer(s).

Dependencies are expressed by a handle called `Pledge`.
A pledge can express either a single dependency, initialized with `newPledge()`
or a dependencies on parallel for loop iterations, initialized with `newPledge(start, exclusiveStop, stride)`
Dependencies are expressed by a handle called `FlowEvent`.
A flow event can express either a single dependency, initialized with `newFlowEvent()`,
or dependencies on parallel-for loop iterations, initialized with `newFlowEvent(start, exclusiveStop, stride)`.

To await on a single pledge `singlePledge` pass it to `spawnDelayed` or the `parallelFor` invocation.
To await on an iteration `iterPledge`, pass a tuple:
- `(iterPledge, 0)` to await precisely and only for iteration 0. This works with both `spawnDelayed` or `parallelFor`
- `(iterPledge, myIndex)` to await on a whole iteration range. This only works with `parallelFor`. The `Pledge` iteration domain and the `parallelFor` domain must be the same. As soon as a subset of the pledge is ready, the corresponding `parallelFor` tasks will be scheduled.
To await on a single event, pass it to `spawnOnEvent` or the `parallelFor` invocation.
To await on an iteration, pass a tuple:
- `(FlowEvent, 0)` to await precisely and only for iteration 0. This works with both `spawnOnEvent` and `parallelFor` (via a `dependsOnEvent` clause)
- `(FlowEvent, loop_index_variable)` to await on a whole iteration range.
For example:
```Nim
parallelFor i in 0 ..< n:
  dependsOnEvent: (e, i) # each "i" independently depends on its matching event
  body
```
This only works with `parallelFor`. The `FlowEvent` iteration domain and the `parallelFor` domain must be the same. As soon as a subset of the event range is triggered, the corresponding `parallelFor` tasks will be scheduled.

#### Delayed computation with single dependencies

```Nim
import weave

proc echoA(pA: Pledge) =
proc echoA(eA: FlowEvent) =
  echo "Display A, sleep 1s, create parallel streams 1 and 2"
  sleep(1000)
  pA.fulfill()
  eA.trigger()

proc echoB1(pB1: Pledge) =
proc echoB1(eB1: FlowEvent) =
  echo "Display B1, sleep 1s"
  sleep(1000)
  pB1.fulfill()
  eB1.trigger()

proc echoB2() =
  echo "Display B2, exit stream"
@@ -508,12 +515,12 @@ proc echoC1() =
proc main() =
  echo "Dataflow parallelism with single dependency"
  init(Weave)
  let pA = newPledge()
  let pB1 = newPledge()
  spawnDelayed pB1, echoC1()
  spawnDelayed pA, echoB2()
  spawnDelayed pA, echoB1(pB1)
  spawn echoA(pA)
  let eA = newFlowEvent()
  let eB1 = newFlowEvent()
  spawnOnEvent eB1, echoC1()
  spawnOnEvent eA, echoB2()
  spawnOnEvent eA, echoB1(eB1)
  spawn echoA(eA)
  exit(Weave)

main()
@@ -524,33 +531,33 @@ main()
```Nim
import weave

proc echoA(pA: Pledge) =
proc echoA(eA: FlowEvent) =
  echo "Display A, sleep 1s, create parallel streams 1 and 2"
  sleep(1000)
  pA.fulfill()
  eA.trigger()

proc echoB1(pB1: Pledge) =
proc echoB1(eB1: FlowEvent) =
  echo "Display B1, sleep 1s"
  sleep(1000)
  pB1.fulfill()
  eB1.trigger()

proc echoB2(pB2: Pledge) =
proc echoB2(eB2: FlowEvent) =
  echo "Display B2, no sleep"
  pB2.fulfill()
  eB2.trigger()

proc echoC12() =
  echo "Display C12, exit stream"

proc main() =
  echo "Dataflow parallelism with multiple dependencies"
  init(Weave)
  let pA = newPledge()
  let pB1 = newPledge()
  let pB2 = newPledge()
  spawnDelayed pB1, pB2, echoC12()
  spawnDelayed pA, echoB2(pB2)
  spawnDelayed pA, echoB1(pB1)
  spawn echoA(pA)
  let eA = newFlowEvent()
  let eB1 = newFlowEvent()
  let eB2 = newFlowEvent()
  spawnOnEvents eB1, eB2, echoC12()
  spawnOnEvent eA, echoB2(eB2)
  spawnOnEvent eA, echoB1(eB1)
  spawn echoA(eA)
  exit(Weave)

main()
@@ -570,20 +577,20 @@ import weave
proc main() =
  init(Weave)

  let pA = newPledge(0, 10, 1)
  let pB = newPledge(0, 10, 1)
  let eA = newFlowEvent(0, 10, 1)
  let pB = newFlowEvent(0, 10, 1)

  parallelFor i in 0 ..< 10:
    captures: {pA}
    captures: {eA}
    sleep(i * 10)
    pA.fulfill(i)
    eA.trigger(i)
    echo "Step A - stream ", i, " at ", i * 10, " ms"

  parallelFor i in 0 ..< 10:
    dependsOn: (pA, i)
    dependsOnEvent: (eA, i)
    captures: {pB}
    sleep(i * 10)
    pB.fulfill(i)
    pB.trigger(i)
    echo "Step B - stream ", i, " at ", 2 * i * 10, " ms"

  parallelFor i in 0 ..< 10:
@@ -62,7 +62,7 @@ proc pack_A_mc_kc*[T; ukernel: static MicroKernel](
proc pack_B_kc_nc*[T; ukernel: static MicroKernel](
    packedB: ptr UncheckedArray[T],
    kc, nc: int,
    B: MatrixView[T], kcTileReady: Pledge) =
    B: MatrixView[T], kcTileReady: FlowEvent) =
  ## Packs panel [kc, nc] for ~B (half-L1 cache)
  ## Pads if needed
  ##
@@ -97,4 +97,4 @@ proc pack_B_kc_nc*[T; ukernel: static MicroKernel](
  # so waiting there guarantees proper data dependencies
  # provided the "k" loop is not nested (i.e. does real work instead of enqueueing tasks)
  discard sync(kcLoop)
  kcTileReady.fulfill()
  kcTileReady.trigger()
4 changes: 2 additions & 2 deletions benchmarks/matmul_gemm_blas/gemm_pure_nim/gemm_weave.nim
@@ -155,7 +155,7 @@ proc gemm_impl[T; ukernel: static MicroKernel](
prefetch(tiles.b, Write, LowTemporalLocality)
let kc = min(K - pc, tiles.kc) # Deal with edges # A[0:M, pc:pc+kc]

let kcncTileReady = newPledge()
let kcncTileReady = newFlowEvent()
let kcncB = vB.stride(pc, 0) # B[pc:pc+kc, jc:jc+nc]
spawn pack_B_kc_nc[T, ukernel]( # PackB panel [kc, nc] (nc is large or unknown)
tiles.b, kc, nc, kcncB, kcncTileReady)
@@ -176,7 +176,7 @@ proc gemm_impl[T; ukernel: static MicroKernel](
let mckcA = vA.stride(ic, pc) # A[ic:ic+mc, pc:pc+kc]
pack_A_mc_kc[T, ukernel](packA, mc, kc, mckcA) # PackA block [mc, kc]

spawnDelayed(
spawnOnEvent(
kcncTileReady,
gebp_mkernel[T, ukernel]( # GEBP macrokernel:
mc, nc, kc, # C[ic:ic+mc, jc:jc+nc] =
@@ -153,7 +153,7 @@ proc gemm_impl[T; ukernel: static MicroKernel](
for pc in countup(0, K-1, tiles.kc):
# Explanation of the control flow:
# 1. syncScope -> only trigger the next `pc` iteration once the current one is finished
# 2. kcncTileReady -> packing B is asynchronously spawned, kcNcTileReady is a pledge, once fulfilled
# 2. kcncTileReady -> packing B is asynchronously spawned, kcNcTileReady is a FlowEvent, once triggered
# computation dependent on packing B can proceed
# 3. parallelFor icb -> is an async parallel for loop that may or may not be split in multiple thread
# 4. spawnDelayed -> the packing of A is done synchronously in the current thread
@@ -162,7 +162,7 @@
prefetch(tiles.b, Write, LowTemporalLocality)
let kc = min(K - pc, tiles.kc) # Deal with edges # A[0:M, pc:pc+kc]

let kcncTileReady = newPledge()
let kcncTileReady = newFlowEvent()
let kcncB = vB.stride(pc, 0) # B[pc:pc+kc, jc:jc+nc]
spawn pack_B_kc_nc[T, ukernel]( # PackB panel [kc, nc] (nc is large or unknown)
tiles.b, kc, nc, kcncB, kcncTileReady)
@@ -183,7 +183,7 @@
let mckcA = vA.stride(ic, pc) # A[ic:ic+mc, pc:pc+kc]
pack_A_mc_kc[T, ukernel](packA, mc, kc, mckcA) # PackA block [mc, kc]

spawnDelayed(
spawnOnEvent(
kcncTileReady,
gebp_mkernel[T, ukernel]( # GEBP macrokernel:
mc, nc, kc, # C[ic:ic+mc, jc:jc+nc] =
14 changes: 14 additions & 0 deletions changelog.md
@@ -2,6 +2,20 @@

### v0.x.x - Unreleased

#### Breaking

- The experimental `Pledge` type for dataflow parallelism has been renamed
  `FlowEvent` to be in line with:
- `AsyncEvent` in Nim async frameworks
- `cudaEvent_t` in CUDA
- `cl_event` in OpenCL

Renaming changes:
- `newPledge()` becomes `newFlowEvent()`
- `fulfill()` becomes `trigger()`
- `spawnDelayed()` becomes `spawnOnEvent()` (single event) or `spawnOnEvents()` (multiple events)
- `submitDelayed()` becomes `submitOnEvent()` or `submitOnEvents()`
- The `dependsOn` clause in `parallelFor` becomes `dependsOnEvent`

#### Features

- Added `isReady(Flowvar)`, which returns true if `sync` would not block on that Flowvar, i.e. the result is immediately available.
46 changes: 23 additions & 23 deletions tests/test_background_jobs.nim
@@ -71,15 +71,15 @@ proc main() =
block: # Delayed computation
serviceDone.store(false, moRelaxed)

proc echoA(pA: Pledge) =
proc echoA(eA: FlowEvent) =
echo "Display A, sleep 1s, create parallel streams 1 and 2"
sleep(1000)
pA.fulfill()
eA.trigger()

proc echoB1(pB1: Pledge) =
proc echoB1(eB1: FlowEvent) =
echo "Display B1, sleep 1s"
sleep(1000)
pB1.fulfill()
eB1.trigger()

proc echoB2() =
echo "Display B2, exit stream"
@@ -92,12 +92,12 @@ proc main() =
waitUntilReady(Weave)

echo "Sanity check 3: Dataflow parallelism"
let pA = newPledge()
let pB1 = newPledge()
let done = submitDelayed(pB1, echoC1())
submitDelayed pA, echoB2()
submitDelayed pA, echoB1(pB1)
submit echoA(pA)
let eA = newFlowEvent()
let eB1 = newFlowEvent()
let done = submitOnEvent(eB1, echoC1())
submitOnEvent eA, echoB2()
submitOnEvent eA, echoB1(eB1)
submit echoA(eA)

discard waitFor(done)
serviceDone[].store(true, moRelaxed)
@@ -109,19 +109,19 @@ proc main() =
block: # Delayed computation with multiple dependencies
serviceDone.store(false, moRelaxed)

proc echoA(pA: Pledge) =
proc echoA(eA: FlowEvent) =
echo "Display A, sleep 1s, create parallel streams 1 and 2"
sleep(1000)
pA.fulfill()
eA.trigger()

proc echoB1(pB1: Pledge) =
proc echoB1(eB1: FlowEvent) =
echo "Display B1, sleep 1s"
sleep(1000)
pB1.fulfill()
eB1.trigger()

proc echoB2(pB2: Pledge) =
proc echoB2(eB2: FlowEvent) =
echo "Display B2, no sleep"
pB2.fulfill()
eB2.trigger()

proc echoC12(): bool =
echo "Display C12, exit stream"
@@ -132,13 +132,13 @@ proc main() =
waitUntilReady(Weave)

echo "Sanity check 4: Dataflow parallelism with multiple dependencies"
let pA = newPledge()
let pB1 = newPledge()
let pB2 = newPledge()
let done = submitDelayed(pB1, pB2, echoC12())
submitDelayed pA, echoB2(pB2)
submitDelayed pA, echoB1(pB1)
submit echoA(pA)
let eA = newFlowEvent()
let eB1 = newFlowEvent()
let eB2 = newFlowEvent()
let done = submitOnEvents(eB1, eB2, echoC12())
submitOnEvent eA, echoB2(eB2)
submitOnEvent eA, echoB1(eB1)
submit echoA(eA)

discard waitFor(done)
serviceDone[].store(true, moRelaxed)
8 changes: 4 additions & 4 deletions weave.nim
@@ -11,7 +11,7 @@ import
    runtime],
  weave/state_machines/[sync_root, sync, sync_scope],
  weave/datatypes/flowvars,
  weave/cross_thread_com/pledges,
  weave/cross_thread_com/flow_events,
  weave/contexts,
  weave/[executor, parallel_jobs]

@@ -28,11 +28,11 @@ export
  # Experimental scope barrier
  syncScope,
  # Experimental dataflow parallelism
  spawnDelayed, Pledge,
  fulfill, newPledge,
  spawnOnEvent, spawnOnEvents, FlowEvent,
  trigger, newFlowEvent,
  # Experimental background service
  Pending,
  submit, submitDelayed,
  submit, submitOnEvent, submitOnEvents,
  runInBackground, waitUntilReady,
  setupSubmitterThread, teardownSubmitterThread,
  waitFor, isSubmitted,
4 changes: 2 additions & 2 deletions weave.nimble
@@ -30,7 +30,7 @@ task test, "Run Weave tests":
  test "", "weave/cross_thread_com/channels_spsc_single.nim"
  test "", "weave/cross_thread_com/channels_spsc_single_ptr.nim"
  test "", "weave/cross_thread_com/channels_mpsc_unbounded_batch.nim"
  test "", "weave/cross_thread_com/pledges.nim"
  test "", "weave/cross_thread_com/flow_events.nim"

  test "", "weave/datatypes/binary_worker_trees.nim"
  test "", "weave/datatypes/bounded_queues.nim"
@@ -96,7 +96,7 @@ task test_gc_arc, "Run Weave tests with --gc:arc":
  test "--gc:arc", "weave/cross_thread_com/channels_spsc_single.nim"
  test "--gc:arc", "weave/cross_thread_com/channels_spsc_single_ptr.nim"
  test "--gc:arc", "weave/cross_thread_com/channels_mpsc_unbounded_batch.nim"
  test "--gc:arc", "weave/cross_thread_com/pledges.nim"
  test "--gc:arc", "weave/cross_thread_com/flow_events.nim"

  test "--gc:arc", "weave/datatypes/binary_worker_trees.nim"
  test "--gc:arc", "weave/datatypes/bounded_queues.nim"