Skip to content

Commit

Permalink
vlib: add an arrays.parallel module, containing parallel.run/3 an…
Browse files Browse the repository at this point in the history
…d `parallel.amap/3` implementations (#22090)
  • Loading branch information
prashanth-hegde authored Aug 24, 2024
1 parent 481b842 commit a31cd37
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/tools/vtest-self.v
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,11 @@ const skip_on_musl = [
'vlib/gg/draw_fns_api_test.v',
'vlib/v/tests/skip_unused/gg_code.vv',
'vlib/v/tests/c_struct_with_reserved_field_name_test.v',
'vlib/arrays/parallel/parallel_test.v',
]
const skip_on_ubuntu_musl = [
'do_not_remove',
'vlib/arrays/parallel/parallel_test.v',
//'vlib/v/gen/js/jsgen_test.v',
'vlib/net/http/cookie_test.v',
'vlib/net/http/status_test.v',
Expand Down
111 changes: 111 additions & 0 deletions vlib/arrays/parallel/parallel.v
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
module parallel

import sync
import runtime

// Params contains the optional parameters that can be passed to `run` and `amap`.
@[params]
pub struct Params {
pub mut:
workers int // 0 by default, so that VJOBS will be used, through runtime.nr_jobs()
}

fn limited_workers(max_workers int, ilen int) int {
// create a limited amount of workers to handle the load
workers := if max_workers != 0 { max_workers } else { runtime.nr_jobs() }
if ilen < workers {
return ilen
}
return workers
}

// run lets the user run an array of input with a
// user provided function in parallel. It limits the number of
// worker threads to min(num_workers, num_cpu)
// The function aborts if an error is encountered.
// Example: parallel.run([1, 2, 3, 4, 5], 2, fn (i) { println(i) })
pub fn run[T](input []T, worker fn (T), opt Params) {
if input.len == 0 {
return
}
workers := limited_workers(opt.workers, input.len)
ch := chan T{cap: workers * 2}
mut wg := sync.new_waitgroup()
wg.add(input.len)
for _ in 0 .. workers {
spawn fn [ch, worker, mut wg] [T]() {
for {
task := <-ch or { break }
worker(task)
wg.done()
}
}()
}

// put the input into the channel
for i in input {
ch <- i
}

// wait for all tasks to complete
wg.wait()
ch.close() // this will signal all the workers to exit, and we can return, without having to wait for them to finish
}

struct Task[T, R] {
idx int
input T
result R
}

// amap lets the user run an array of input with a
// user provided function in parallel. It limits the number of
// worker threads to max number of cpus.
// The worker function can return a value. The returning array maintains the input order.
// Any error handling should have happened within the worker function.
// Example: squares := parallel.amap([1, 2, 3, 4, 5], 2, fn (i) { return i * i })
pub fn amap[T, R](input []T, worker fn (T) R, opt Params) []R {
if input.len == 0 {
return []
}
mut tasks := []Task[T, R]{len: input.len}
// the tasks array will be passed to the closure of each worker by reference, so that it could
// then modify the same tasks:
mut tasks_ref := &tasks

workers := limited_workers(opt.workers, input.len)
// use a buffered channel for transfering the tasks, that has enough space to keep all the workers busy,
// without blocking the main thread needlessly
ch := chan Task[T, R]{cap: workers * 2}
mut wg := sync.new_waitgroup()
wg.add(input.len)
for _ in 0 .. workers {
spawn fn [ch, worker, mut wg, mut tasks_ref] [T, R]() {
for {
mut task := <-ch or { break }
unsafe {
tasks_ref[task.idx] = Task[T, R]{
idx: task.idx
input: task.input
result: worker(task.input)
}
}
wg.done()
}
}()
}

// put the input into the channel
for idx, inp in input {
ch <- Task[T, R]{
idx: idx
input: inp
}
}

// wait for all tasks to complete
wg.wait()
ch.close()
tasks.sort(a.idx < b.idx)
return tasks.map(it.result)
}
66 changes: 66 additions & 0 deletions vlib/arrays/parallel/parallel_test.v
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import arrays.parallel
import rand
import time

fn test_parallel_run_with_empty_arrays() {
parallel.run([]int{}, fn (x int) {})
parallel.run([]u8{}, fn (x u8) {})
parallel.run([]u32{}, fn (x u32) {}, workers: 1000)
assert true
}

fn test_parallel_amap_with_empty_arrays() {
assert parallel.amap([]int{}, fn (x int) u8 {
return 0
}) == []
assert parallel.amap([]u8{}, fn (x u8) int {
return 0
}) == []
assert parallel.amap([]u8{}, fn (x u8) int {
return 0
}, workers: 1000) == []
assert true
}

fn test_parallel_run() {
counters := []int{len: 10, init: index}
dump(counters)
mut res := []string{len: 10}
mut pres := &res
parallel.run(counters, fn [mut pres] (i int) {
delay := rand.intn(250) or { 250 }
time.sleep(delay * time.millisecond)
unsafe {
pres[i] = 'task ${i}, delay=${delay}ms'
}
assert true
})
dump(res)
assert res.len == counters.len
}

fn test_parallel_amap() {
input := [1, 2, 3, 4, 5, 6, 7, 8, 9]
dump(input)
dump(input.len)
output := parallel.amap(input, fn (i int) int {
delay := rand.intn(250) or { 250 }
time.sleep(delay * time.millisecond)
return i * i
})
dump(output)
dump(output.len)
assert input.len == output.len

for i, _ in output {
assert output[i] == input[i] * input[i]
}

// unordered output validation
assert output.len == input.len
op_sorted := output.sorted()
dump(op_sorted)
for i, op in op_sorted {
assert op == input[i] * input[i]
}
}

0 comments on commit a31cd37

Please sign in to comment.