For quite a while now cligen has had a ~procpool~ submodule. It is essentially a module similar to Python's ~multiprocessing~ module. Under the hood it uses ~fork()~ to produce child processes.
In many use cases the memory and startup overhead compared to threads is completely negligible. In these cases using multiprocessing is incredibly nice, because one side-steps all memory safety and race condition problems. Until now the API of ~procpool~ may have been a bit too low level, but with ~forked~ this changes.
~forked~ is a for-loop macro, which takes the iterator / collection of the loop and runs the loop body in parallel on a pool of processes. By default the number of logical CPU cores is used, but this can be overridden either by an environment variable ~PP_JOBS~ or explicitly via the macro call (the latter takes precedence over the former).
The basic syntax looks as follows:
var res = 0.0
for x in forked(iter):
  let y = doWork() # arbitrary loop body
  send y           # mandatory `send`
  join: res += y   # mandatory `join`
Here ~iter~ is any iterator or collection; anything that would normally work in a for loop without ~forked~. The loop body (in the example just ~let y = doWork()~) can be any number of lines and can contain any Nim code. There are no restrictions: feel free to access any global variables and even mutate them. Just be aware that the advantage of not having to worry about race conditions and thread safety also means mutated globals won't persist beyond the child processes, nor will any child see any other child's mutation (because of course each process has its own memory space).
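A minimal sketch to illustrate this behavior (using the syntax shown above):

import forked

var counter = 0  # a global; each child process inherits its own copy at fork()

for x in forked(0 ..< 4):
  inc counter          # mutates only this child's private copy
  let seen = counter   # what this particular child currently sees
  send seen
  join: echo seen      # each child only ever reports its own increments

echo counter           # still 0: the children's mutations never reach the parent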
As you can see, there are two mandatory statements as part of the loop. The ~send~ statement receives a variable or expression; this is the data you wish to send back to the parent process. The ~join~ statement is the code which describes how the main process consumes the data sent by each child. Here you will likely want to process the data you sent in some way.
See the following two examples for some further ideas:
# 1. Port of https://github.com/c-blake/cligen/blob/master/examples/piPar.nim
import forked
import flatBuffers # for `save/loadBuffer`, see below

proc term(k: int): float = (0.5 - float(k mod 2))*8'f/float(2*k + 1)

iterator batches(n, batch: int): Slice[int] =
  for start in countup(0, n - 1, batch):
    yield start ..< start+batch

proc piProcPool(n: int): float =
  for s in forked(batches(n, 5)):
    var res = 0.0
    for k in s:
      res += term(k)
    send res
    join: result += res

echo piProcPool(100) # run in parallel!
# 2. Example sending non-trivial (i.e. not mem-copyable) data from
# child to parent. It just works 'magically' for you.
type
  Foo = object
    s: string
    x: float

proc calcFoo(x: int): Foo = Foo(s: $x, x: x.float)

proc testIt(): seq[Foo] =
  for x in forked(0 ..< 10):
    let f = calcFoo(x)
    send f
    join: result.add f

echo testIt() # run in parallel!
You can also explicitly name the iteration index variable by writing one of the following:

for i, x in forked(...)
for (i, x) in forked(...)

This gives you access to the iteration index (and thus job index) in the loop body.
IMPORTANT: This also allows you to access e.g. the i-th element of a sequence inside of the ~join~ portion! For example, the second example from above could also be written as:
type
  Foo = object
    s: string
    x: float

proc calcFoo(x: int): Foo = Foo(s: $x, x: x.float)

proc testIt(): seq[Foo] =
  result = newSeq[Foo](10)
  for i, x in forked(0 ..< 10):
    let f = calcFoo(x)
    send f
    join: result[i] = f

echo testIt()
This can be useful if you need the results in sorted order.
The biggest convenience feature of ~forked~ over using the raw ~procpool~ is the 'automagic' transfer of data from child to parent. For any data that can be mem-copied, we simply send the binary data through the file descriptor. However, if the data we want to send contains non mem-copyable data like strings, (nested) sequences or ref objects, this needs to be handled with manual care in ~procpool~. While usually not an issue, it makes it less of a drop-in replacement than one might sometimes want.
The way we get around this is to save the data we send back to an in-memory file (using ~/dev/shm~) and to only send the filename back to the parent, which then loads the data back.
For this to work, the ~forked~ macro utilizes a ~saveBuffer~ and a ~loadBuffer~ procedure. In principle any 'serialization' library can be used. This works because the ~forked~ macro only produces code which emits the function calls without binding them. The required API of these two functions is:
proc saveBuffer*[T](x: T, fname: string)
proc loadBuffer*[T](fname: string, deleteFile = false): T
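Conceptually, the generated code then does something like the following (a simplified sketch, not the actual macro output; the file naming scheme, ~jobId~ and ~sendToParent~ are hypothetical placeholders, using ~Foo~ from example 2 above):

# In the child process (conceptually):
let fname = "/dev/shm/pp_forked/job_" & $jobId & ".dat" # hypothetical naming
saveBuffer(f, fname)  # serialize the value into the in-memory file
sendToParent(fname)   # only the short filename goes through the file descriptor

# In the parent process (conceptually):
let f = loadBuffer[Foo](fname, deleteFile = true) # load the value back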
This is already implemented in ~flatBuffers~, but feel free to e.g. use ~flatty~ and simply write the two procedures as wrappers around the equivalent ~flatty~ functionality (~fromFlatty~ / ~toFlatty~ and likely calls to read / write to disk).
The ~deleteFile~ argument is used to remove the in-memory files after they have been read. If you write your own ~loadBuffer~, you should make sure to memory map the file instead of using ~readFile~ to avoid unnecessary overhead.
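A minimal sketch of such ~flatty~ wrappers might look as follows (untested; note that it uses ~readFile~ for brevity, which the advice above suggests replacing by memory mapping):

import std/os
import flatty # provides `toFlatty` / `fromFlatty`

proc saveBuffer*[T](x: T, fname: string) =
  ## Serialize `x` with flatty and write the binary blob to `fname`.
  writeFile(fname, toFlatty(x))

proc loadBuffer*[T](fname: string, deleteFile = false): T =
  ## Read the blob back and deserialize it into a `T`.
  ## NOTE: a production version should memory map the file, as noted above.
  result = fromFlatty(readFile(fname), T)
  if deleteFile:
    removeFile(fname)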
NOTE: If you always wish to use ~flatBuffers~, you can compile with ~-d:UseFlatBuffers~ to import and reexport the required logic from it. If you put this into your ~/.config/nim/nim.cfg you won't have to write it all the time. :)
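That is, the config file only needs the single line:

-d:UseFlatBuffers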
There are three compile-time variables, which can be changed to alter the data transfer behavior. These are:
## Decides if we send data from child processes back to the parent by producing (in memory)
## files using `/dev/shm` (or real files if `BasePath` is changed)
const WriteFiles* {.booldefine.} = true
## Decides if the binary data files are deleted upon being memory mapped by the parent
const DeleteFiles* {.booldefine.} = true
## Can be used to change the default path where binary files are stored
const BasePath* {.strdefine.} = "/dev/shm/pp_forked/"
(Bool and string define variables are adjusted by compiling with e.g. ~-d:WriteFiles=false~ or ~-d:BasePath=/tmp/~.)
In addition, as mentioned above, the number of jobs can be adjusted at runtime by defining a ~PP_JOBS~ environment variable. For example, to adjust the number of jobs to 8:

PP_JOBS=8 ./program_using_forked

(or define it beforehand using ~export PP_JOBS=8~ of course)
Alternatively, you can also pass the number of jobs to ~forked~ directly:

let jobs = 8
for x in forked(iter, jobs):
  ...

to run the loop with only 8 jobs, for example. This takes precedence over any ~PP_JOBS~ definition, so make sure you only pass it if you want to fix the job count to some other constant or runtime value.
This module might very well become a submodule of ~cligen~ itself in the near future. For the time being it is separate until we have decided how to handle dependencies for data communication.