A framework for out-of-core and parallel computation.
Dagger allows you to represent huge amounts of data as smaller pieces and compute on those pieces in parallel. The API mimics Julia's standard library, with a few simple differences, so that it is easy to use. Computation with Dagger uses a scheduler similar to that in Dask, which tries to minimize the amount of memory allocated at any given time while maximizing CPU utilization. Dagger provides distributed arrays and sparse matrices, and operations on them, out of the box. Distributed variations of other collection types such as NDSparse data structures or dictionaries can also be expressed using the framework.
To begin, let us add a few worker processes and load Dagger.
addprocs() # julia will decide how many workers to add.
using Dagger
Note that you should run using Dagger after adding worker processes, so that Julia loads the package on all of them.
A good place to start learning to work with Dagger is to play with some distributed random data.
In this example we will create a matrix of size 10000x10000, cut up into pieces of 4000x4000 elements.
a = rand(Blocks(4000, 4000), 10^4, 10^4)
# => Dagger.AllocateArray(...)
Notice the first argument, Blocks(4000, 4000). This tells Dagger to create the matrix with a block partitioning in which each partition is a 4000x4000 matrix. A 4000x4000 block of floating point numbers takes up 128MB of RAM, which is a convenient chunk size: many such chunks fit in a typical machine's RAM, and files of size 128MB can be written to and read from disk with less overhead than smaller chunks. It is up to the user of Dagger to choose a suitable partitioning for the data.
In the above example the object a represents the random matrix. The actual data has not been created yet; if you call compute(a), the data will be created.
b = compute(a)
# => Computed(10000x10000 Array{Float64,2} in 9 parts each of (max size) 4000x4000)
The result is an object containing metadata about the various pieces. They may be created on different workers and will stay there until another worker needs them. However, you can request the whole data with the gather function.
gather(b)
# => 10000x10000 Array{Float64,2}:
....
Okay, now that we have learned how to create a matrix of random numbers, let's do some computation on them!
In this example, we will compute the sum of squares of a 10000x10000 normally distributed random matrix.
x = randn(Blocks(4000,4000), 10^4, 10^4)
compute(sum(x.^2))
# => 1.0000084097623596e8
The answer is close to 10^8. Note the use of compute on sum: sum returns an object representing the computation of the sum, and you need to call compute on it to actually perform the computation.
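To make the laziness explicit, the same computation can be written in two steps (a minimal sketch reusing x from above):
y = sum(x.^2) # a lazy object representing the computation; nothing has run yet
compute(y)    # runs the computation and returns the scalar result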
For the full array API supported by Dagger, see below.
Sometimes the result of a computation might be too big to fit in memory. In such cases you will need to save the data to disk. You can do this by calling save on the computation along with the destination file name.
x = randn(Blocks(4000,4000), 10^5, 10^4)
compute(save(x.^2, "X_sq"))
Note that the metadata about the various pieces will be stored in the file X_sq, while the data itself is stored in a directory called X_sq_data. Be warned that the directory will be created if it doesn't exist, and any existing files will be overwritten.
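Because save overwrites silently, a simple guard on the caller's side can help. The check below is only a sketch and not part of Dagger's API:
# hypothetical guard: refuse to clobber the output of an earlier run
outfile = "X_sq"
if ispath(outfile) || ispath(string(outfile, "_data"))
    error("refusing to overwrite existing output at $outfile")
end
compute(save(x.^2, outfile))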
Once you have saved some data to disk, you can load it with the load function.
X_sq = load(Context(), "X_sq")
Notice the first argument, Context(). It is required so that this load method remains specific to Dagger. What Context() returns is discussed later in the documentation.
Now you can do some computation on X_sq, and the data will be read from disk when different processes need it.
sum(X_sq)
# => 1.0000065091623393e9
Dagger also allows one to distribute an object from the master process using a certain partition type. In the following example, we are distributing an array of size 1000x1000 with a block partition of 100x100 elements per block.
# distribute a matrix and then do some computation
a = rand(1000, 1000)
b = Distribute(Blocks(100, 100), a) # Create chunks of 100x100 submatrices
c = map(x->x^2, b) # map applies a function element-wise; this is equivalent to b.^2
d = sum(c)
Dagger starts by cutting up the 1000x1000 array into 100x100 pieces, and the worker processes square and sum these pieces in parallel.
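Since a still lives on the master process in this example, you can sanity-check the distributed result against the serial one (a small sketch):
# the distributed sum of squares should match the serial computation
isapprox(compute(d), sum(a.^2)) # => true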
You can load data from a big CSV file in parallel and write it back to disk in two simple calls.
x = dist_readdlm("bigdata.csv", ',', 100, cleanup)
processed_x = compute(save(x, "bigbinarydata"))
100 approximately equal chunks of bigdata.csv will be read by the available processors in parallel, passed to Julia's readdlm function, and then to the cleanup function you provide. Note that cleanup must return a matrix. The distribution of the data will be block partitioned, with each block spanning all columns. For example, if the CSV file has 10^6 rows and 20 columns, then each block will contain approximately 10^4x20 elements. We say approximately because dist_readdlm divides the input CSV by size rather than by number of lines; each chunk is delimited at the nearest newline.
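For illustration, a cleanup function could be as simple as coercing each chunk to a numeric matrix; the definition below is hypothetical:
# receives the matrix readdlm produced for one chunk; must return a matrix
cleanup(chunk) = convert(Matrix{Float64}, chunk)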
Once you've loaded the data in this manner, it's a perfect time to slice and dice it and run some statistics on it. Try processed_x'processed_x, for example: since the number of columns is small, the result of this computation is also a small matrix. The aforementioned example dataset would yield a 20x20 matrix.
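For instance (a sketch, assuming the 20-column dataset described above):
gather(processed_x' * processed_x) # => a 20x20 matrix collected on the master process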
So far we have seen simple operations on distributed arrays, like broadcast (namely .^2), reduce and sum. Dagger also supports other essential array operations such as transpose, matrix-matrix multiplication and matrix-vector multiplication. See the Array API below for details. Some special features are discussed below.
You can index into any Computation using the usual indexing syntax. Some examples of valid indexing syntax are:
X[20:100, 20:100]
X[[20,30,40], [30,40,60]]
X[[20,30,40], :]
X[20:40, [30,40,60]]
Note that indexing again results in a Computation object of the type GetIndex. You can use it as input to another computation or call gather on it to get the indexed sub-array.
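For example, a slice can be built lazily and only materialized when needed (a minimal sketch):
X = rand(Blocks(4000, 4000), 10^4, 10^4)
sub = X[20:100, [30, 40, 60]] # a GetIndex computation; nothing is computed yet
gather(sub)                   # => an 81x3 matrix on the master process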
Array support is made up of fairly generic code, hence SparseMatrixCSC works wherever dense arrays are applicable and wherever Julia defines the required generic functions on them.
To create a random sparse matrix or vector, use the sprand function.
s1 = sprand(Blocks(4000,4000), 10^4, 10^4, 0.01)
# => AllocateArray(...)
compute(s1)
# => Computed(10000x10000 SparseMatrixCSC{Float64,Int64} in 3x3 parts each of (max size) 4000x4000)
s2 = sprand(Blocks(4000,), 10^4, 0.01)
# => AllocateArray(...)
julia> compute(s2)
Computed(10000 SparseVector{Float64,Int64} in 3 parts each of (max size) 4000)
julia> x = compute(s1*s2) # sparse matrix-vector multiply
Computed(10000 SparseVector{Float64,Int64} in 3 parts each of (max size) 4000)
You can also save a partitioned sparse matrix to disk and load it back.
julia> compute(save(s1, "s1"))
julia> x = load(Context(), "s1")
Computed(10000x10000 Array{Float64,2} in 3x3 parts each of (max size) 4000x4000)
julia> gather(x)
10000x10000 sparse matrix with 999793 Float64 entries:
...
Dagger currently does not write results of computations to disk unless you specifically ask it to.
Some operations which are computationally intensive might require you to save the input data as well as output data to disk.
As an example, let us take the operation A+A'. Here, various chunks of A need to be kept in memory so that they can be added to the corresponding chunks of A'. But the total memory taken up by these chunks might be too large for your RAM. In such cases you will need to save the array A to disk first, and then save the result of A+A' to disk as well.
A = rand(Blocks(4000, 4000), 30000, 30000) # 7.2GB of data
saved_A = compute(save(A, "A"))
result = compute(save(saved_A+saved_A', "ApAt"))
Creating arrays
- Distribute(partition_scheme, data) - distribute data according to partition_scheme.
- rand(partition_scheme, [type], [dimensions...]) - create a random array with the specified partition scheme
- randn(partition_scheme, [dimensions...]) - create a normally distributed random array with the specified partition scheme
- ones(partition_scheme, [type], [dimensions...]) - create an array of all ones
- zeros(partition_scheme, [type], [dimensions...]) - create an array of all zeros
Sparse arrays
- sprand(partition_scheme, m, [n], sparsity) - create a sparse matrix or vector with the given partition scheme
Map-reduce
- map(f, c) - apply f to each element of the distributed object
- reduce(f, c) - reduce c with a 2-arg associative function f.
- reduceblock(f, c) - reduce each block of data by applying f to the block. In a block-distributed array, the result has the same dimensionality as the input.
- reducebykey(f, c) - given a collection of tuples or pairs, use the first element of each tuple as the key and reduce the values of each key. Computes a Dict of results.
Note: all these operations result in a Computation object. You need to call compute or gather on them to actually do the computation.
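The sketch below exercises a few of these operations; the names are illustrative and the shape of reduceblock's result follows the description above:
m = Distribute(Blocks(100, 100), rand(1000, 1000))
gather(reduce(+, map(abs, m))) # element-wise abs followed by a full reduction
gather(reduceblock(sum, m))    # one value per 100x100 block, same dimensionality as the input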
Array API
- Unary element-wise operations:
exp, expm1, log, log10, log1p, sqrt, cbrt, exponent,
significand, (-), sin, sinpi, cos, cospi, tan, sec,
cot, csc, sinh, cosh, tanh, coth, sech, csch, asin,
acos, atan, acot, asec, acsc, asinh, acosh, atanh,
acoth, asech, acsch, sinc, cosc
- Binary element-wise operations:
+, -, %, (.*), (.+), (.-), (.%), (./), (.^),
$, &, (.!=), (.<), (.<=), (.==), (.>),
(.>=), (.\), (.//), (.>>), (.<<)
- * on Computations can stand for either matrix-matrix or matrix-vector multiplication.
- transpose of a matrix can be done using the x' syntax
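A short sketch of these operations; the 1-D Blocks scheme used for the vector is assumed to work as in the sprand example above:
A = rand(Blocks(100, 100), 1000, 1000)
v = rand(Blocks(100,), 1000)
gather(A * v)  # matrix-vector multiplication
gather(A' * A) # transpose followed by matrix-matrix multiplication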
Compute and gather
- compute(ctx, c) - compute the computation c
- gather(ctx, c) - compute the result and collate it on the host process (usually pid 1).
Context
- Context([pids=workers()]) - create a context which uses the processes specified in pids
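For example, a computation can be pinned to a subset of workers by passing an explicit context (a sketch):
ctx = Context([2, 3]) # use only workers 2 and 3
compute(ctx, rand(Blocks(100, 100), 1000, 1000))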
Upcoming features
- Sorting
To debug the performance of a computation, you can use debug_compute as a substitute for the compute function. debug_compute returns a vector filled with performance metadata along with the result of the computation.
For example:
x = rand(Blocks(400, 4000), 1200, 12000)
debug_compute(x*x'); # first run, to compile things (maybe run on a smaller problem)
dbg, result = debug_compute(x*x')
summarize_events(dbg)
# => 3.132245 seconds (83.25 k allocations: 467.796 MB, 2.87% gc time)
# (note that this time might be larger than the running time of the compute call,
# because time readings from many processes are added up.)
The dbg object is just an array of Timespans. A Timespan contains the following fields:
- category: a symbol, one of :comm (communication), :compute, :scheduler, :scheduler_init - denotes the type of work being done in the timespan
- id: an id for the timespan. This is also the id of the Thunk object being dealt with. The (category, id) pair identifies a timespan uniquely.
- timeline: an OSProc (or any Processor) object representing the processor where the timespan was recorded.
- start: time_ns() when the timespan started
- finish: time_ns() when the timespan finished
- gc_diff: a Base.GC_Diff object containing the GC state change in the timespan (see ?Base.GC_Diff)
- profiler_samples: if run with profile=true, contains profiler samples collected in the timespan.
Since dbg is just an array, you can filter events of particular interest to you and summarize them.
# how much time and space did the computation take on process 2?
summarize_events(filter(x-> x.timeline == OSProc(2), dbg))
# => 1.658207 seconds (16.11 k allocations: 269.361 MB, 2.89% gc time)
# how much time was spent communicating by process 3?
summarize_events(filter(x-> x.timeline == OSProc(3) && x.category==:comm, dbg))
0.606891 seconds (15.69 k allocations: 149.712 MB, 6.26% gc time)
# what was the scheduler overhead?
summarize_events(filter(x->x.category==:scheduler, dbg))
0.254783 seconds (47.67 k allocations: 3.314 MB)
If you run debug_compute with the profile keyword argument set to true, profiler samples are collected during each timespan. summarize_events will also pretty-print the profiler data for you.
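Profiler samples are gathered at collection time by passing the keyword argument (a sketch reusing x from above):
dbg, result = debug_compute(x*x', profile=true)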
For example,
summarize_events(filter(x->x.category==:scheduler, dbg))
# => 0.146829 seconds (44.60 k allocations: 3.103 MB)
2 ./stream.jl; process_events; line: 731
154 REPL.jl; anonymous; line: 92
126 REPL.jl; eval_user_input; line: 62
59 util.jl; debug_compute; line: 155
59 ...e/shashi/.julia/v0.4/Dagger/src/compute.jl; compute; line: 203
16 ...e/shashi/.julia/v0.4/Dagger/src/compute.jl; finish_task!; line: 144
43 ...e/shashi/.julia/v0.4/Dagger/src/compute.jl; finish_task!; line: 152
43 .../shashi/.julia/v0.4/Dagger/src/compute.jl; release!; line: 218
43 ...shi/.julia/v0.4/Dagger/src/lib/dumbref.jl; release_token; line: 24
...
The goal of Dagger is to create sufficient scope for multiple dispatch to be employed at various stages of a parallel computation. New capabilities, distributions, and device types can be added by defining new methods on a very small set of generic functions. The DAG representation also allows for other optimizations (such as fusing maps and reduces), fault tolerance, and visualization.
We thank DARPA, Intel, and the NIH for supporting this work at MIT.