Lazy mean, sum etc? #34
I think this would be possible, yes, at least from looking at the implementation of reductions: https://github.com/meggart/DiskArrays.jl/blob/234420fe173b60788b2dca0b51b91b4a85ac2044/src/ops.jl#L79-L116. However, in the last months I have been repeatedly looking at Dagger.jl and thinking about how to best integrate it into DiskArrays computations. Instead of adding more DiskArray subtypes, I personally would prefer to work towards a DaggerDiskArray, where we simply have delayed representations of each chunk of the data and can then lazily stack computations, subsets, etc., similar to how dask+xarray work in the Python world. The big advantage would be to get distributed chunk processing for free, and we would no longer have to maintain much of the code here (reductions and broadcast), so there might be a big potential in this. What do you think?
That sounds better; my idea is basically a poor man's Dagger.jl. I've also been looking at Dagger.jl. A DaggerDiskArray would get GeoData.jl to/past feature parity with xarray.
I have a first version of a `to_dagger` function, which simply converts any chunked DiskArray into a `DArray`:

```julia
using Dagger, Zarr, DiskArrays
using DiskArrays: eachchunk
using Dagger: ArrayDomain, DArray, DomainBlocks

function to_dagger(a::AbstractDiskArray)
    # Wrap the read of every chunk in a delayed task
    dr = delayed(r -> a[r.indices...])
    cs = eachchunk(a)
    # Cumulative chunk boundaries along each dimension
    ard = DomainBlocks(cs.offset .+ 1, map((s, l) -> cumsum(fill(l, s)), cs.chunkgridsize, cs.chunksize))
    DArray(
        eltype(a),
        Dagger.domain(a),
        ard,
        dr.(cs),
    )
end
```

Now you can stack lazy operations on this:

```julia
a = zzeros(Float32, 1000, 1000, chunks=(100, 100))
a[:, :] = rand(Float32, 1000, 1000);
a2 = to_dagger(a)
someop = sum(sin.(a2 ./ 100), dims=2)
```

and then you can work with the result like any other lazy Dagger array. So I am currently trying to figure out a way to determine the chunk sizes of these lazy operations, so that this can be done automatically.
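To unpack the `DomainBlocks` construction in `to_dagger`, here is a small worked sketch. The values below are assumptions for the 1000×1000 array with (100, 100) chunks, and the variable names mirror the `eachchunk` fields used above; this is an illustration, not verified output:

```julia
# Assumed chunk-descriptor values for a 1000×1000 array with (100, 100) chunks
offset        = (0, 0)      # start offset of the chunk grid
chunkgridsize = (10, 10)    # number of chunks along each dimension
chunksize     = (100, 100)  # size of a regular chunk

# The second DomainBlocks argument is the cumulative chunk boundary per dimension:
map((s, l) -> cumsum(fill(l, s)), chunkgridsize, chunksize)
# -> ([100, 200, …, 1000], [100, 200, …, 1000])

# And the first argument is the 1-based start of the first chunk:
offset .+ 1   # -> (1, 1)
```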
Pinging @visr here, who has shown interest in coupling Zarr to Dagger before, also @felixcremer.
Wow, that seems so simple. Probably just writing multiple files is a reasonable option for formats other than Zarr.
Great to see that mapping to Dagger is possible in such a simple function.
I guess this is where https://github.com/shashi/FileTrees.jl could maybe help? I see there is also some discussion on JuliaParallel/Dagger.jl#214 and experimentation at https://github.com/JuliaParallel/DaggerArrays.jl.
For NetCDF parallel writes there is also
Yeah, there's this issue about parallel NetCDF I/O: Alexander-Barth/NCDatasets.jl#122. I am not familiar with it myself.
I have a first version of a `to_zarr` function:

```julia
using Distributed: myid  # myid() reports which worker performs each write

function to_zarr(a::Dagger.ArrayOp,
                 outpath;
                 compressor = Zarr.BloscCompressor,
                 attrs = Dict())
    # (compressor and attrs are not forwarded to zcreate yet in this first version)
    # First we stage the computation to determine the output chunk sizes
    staged_tosave = Dagger.stage(Dagger.Context(), a)
    doms = staged_tosave.subdomains
    # Now we compute the single output chunk sizes from the BlockDomain
    chunksizes = unique.(diff.(vcat.(doms.start .- 1, doms.cumlength)))
    @assert all(length.(chunksizes) .== 1)
    chunksizes = first.(chunksizes)
    # Create the output array
    outarray = zcreate(eltype(staged_tosave), size(staged_tosave)...;
        path = outpath,
        chunks = chunksizes,
    )
    # Make a lazy representation of the writing process
    r = map(staged_tosave.chunks, staged_tosave.subdomains) do data, dom
        f = delayed() do d
            println("Process ", myid(), " writing at ", dom.indexes)
            outarray[dom.indexes...] = d
            return true
        end
        f(data)
    end
    # And merge the results to force all chunk writes
    @assert collect(reduce(+, r)) == length(staged_tosave.chunks)
    return outarray
end
```

Then you can do:

```julia
a = zzeros(Float32, 1000, 1000, chunks=(100, 500))
a[:, :] = rand(Float32, 1000, 1000);
a2 = to_dagger(a)
someop = sum(sin.(a2 ./ 1000), dims=2)
to_zarr(someop, tempname())
```

and the results get written to disk. I will do more tests, but if this all works as expected, we could stop maintaining all the reduction and broadcast code here in DiskArrays and make Dagger the default backend. Then we would use this package only to provide the interface between the different storage backends.
Although I am not completely convinced yet. The following already starts reading the data:

```julia
cmip6 = zopen("gs://cmip6/CMIP6/HighResMIP/CMCC/CMCC-CM2-HR4/highresSST-present/r1i1p1f1/6hrPlev/psl/gn/v20170706/")
psl = to_dagger(cmip6["psl"])
view(psl, :, :, 1:1000)
```

So there is still some work to do, it seems.
Awesome. Will be good to see if we can get wrappers to work with that as well, to e.g. save xarray/zarr encoding at the same time. I wonder if depending on Dagger here is a good idea:

```julia
julia> @time using Dagger
  0.773514 seconds (1.59 M allocations: 106.885 MiB, 8.10% gc time)
```

(separately)

```julia
julia> @time using DiskArrays
  0.256858 seconds (514.44 k allocations: 30.299 MiB, 9.24% compilation time)
```

Might be better to have DiskArraysDagger.jl as a subpackage here?
I've been thinking about how a parallel Dagger backend and writing to GDAL would work if Dagger.jl were the default, as I'm making a lot of use of generic DiskArrays broadcasts to write to files in GeoData.jl. I guess we can use defaults that are single threaded, or we can set the threading based on user input or the file types in the broadcast? Maybe a trait specifying whether an AbstractDiskArray supports parallel writes?
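A minimal sketch of what such a trait could look like, using Julia's usual trait-dispatch pattern; all names here (`WriteParallelism`, `writeparallelism`, `write_chunks`) are hypothetical and not part of DiskArrays.jl:

```julia
# Hypothetical trait answering: can this disk-backed array be written
# by several workers/threads at the same time?
abstract type WriteParallelism end
struct ParallelWrites <: WriteParallelism end
struct SerialWrites <: WriteParallelism end

# Conservative default: assume serial writes unless a backend opts in.
writeparallelism(::Type) = SerialWrites()

# A backend like Zarr could then declare (hypothetically):
# writeparallelism(::Type{<:ZArray}) = ParallelWrites()

# A scheduler could dispatch on the trait to pick a write strategy:
write_chunks(a, chunks) = write_chunks(writeparallelism(typeof(a)), a, chunks)
write_chunks(::ParallelWrites, a, chunks) = nothing  # placeholder: fan chunk writes out to workers
write_chunks(::SerialWrites, a, chunks) = nothing    # placeholder: funnel all writes through one task
```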
We're having a discussion at JuliaParallel/Dagger.jl#230 (comment) about planning swap-to-disk support in Dagger. My latest reply (JuliaParallel/Dagger.jl#230 (comment)) details a system by which interested users and libraries (as well as the scheduler itself) can instruct Dagger to move data to and from disk or other persistent storage. One could construct tasks in the DAG that have "storage chunks" as inputs, and Dagger would automatically read them into memory and move them to the processor that will be operating on them. That task could then return a new storage chunk, which would cause the result to be moved to the worker associated with that storage and the data to be written to disk there. Does that sound like the sort of API that would be useful to have for DiskArrays? Is there anything this API lacks that you would need?
One complication to the above would be the use of
Thanks a lot for joining the discussion, @jpsamaroo.
A short comment on this one: I think for the DiskArrays data model the workflow would be a bit simpler, because we do not support distributed storage at all. In most climate/earth-system-science applications (and all I have been involved in), we have our input data on shared file systems or stored in cloud buckets, so one can assume significant latency for single IO operations, but all workers have equal access to the same storage. So our main interface would not be mmaps and intermediate storage, but rather the very beginning of the processing chain (as proposed in my first example, defining one chunk/thunk for each sub-domain of the input data) or the very end (dumping a structured set of thunks into a single coherent output dataset). So what I proposed was really just similar to dask's zarr interface as described here: https://docs.dask.org/en/latest/array-creation.html?highlight=zarr#zarr. I will read the discussion you linked and see if I can comment.
It looks like the most recent developments in Dagger are to do all operations in an eager way instead of lazily, so the coupling to Dagger, although interesting, will probably not really help with lazy sums. I think it will not be too hard to add a lazy reduction wrapper here.
@meggart for my understanding, the purpose of this package is to amortize the high cost of disk access by fusing operations (so that you ideally only do one read/write syscall for each chunk of the data), correct? If so, I agree that Dagger is not yet suitable for integration here, and will continue moving in the direction of an eager execution model. However, I also have plans to integrate Dagger with the Symbolics ecosystem, to allow rewriting and fusing parts of the DAG so that such optimizations could be applied automatically. Together with a new Dagger operator to delay DAG execution while a user-specified sub-graph is being built (which I want for a variety of reasons), we should be able to fuse operations to achieve single-access behavior. I'll definitely need some time before I can get to implementing this functionality, but once I do, we should hopefully be able to create a set of rewrite rules that use DiskArrays.jl to do this fusion of "naive" array operations. I'll ping you once I've gotten those features working, and maybe then we can collaborate on making DiskArrays and Dagger work together 🙂
It seems possible to add a reducing function application wrapper that will take e.g. the mean/sum etc. along an axis lazily, similar to how lazy broadcasting works.
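A minimal sketch of what such a lazy reduction wrapper could look like, specialized to a sum for simplicity. It assumes `eachchunk` yields tuples of index ranges per chunk (older DiskArrays versions exposed an `.indices` field instead, as used in `to_dagger` above); `LazySum`, `lazysum`, and `materialize` are hypothetical names, not existing DiskArrays.jl API:

```julia
using DiskArrays

# Hypothetical lazy sum wrapper: nothing is read from disk until materialize is called.
struct LazySum{A}
    parent::A   # the underlying (disk-backed) array
    dims::Int   # axis to sum over
end

lazysum(a; dims) = LazySum(a, dims)

# Materializing reads the parent one chunk at a time and accumulates the
# partial sums into an in-memory output array.
function materialize(r::LazySum)
    outsize = ntuple(i -> i == r.dims ? 1 : size(r.parent, i), ndims(r.parent))
    out = zeros(eltype(r.parent), outsize)
    for c in DiskArrays.eachchunk(r.parent)            # assumed to yield index ranges
        block = r.parent[c...]                         # one read per chunk
        outidx = ntuple(i -> i == r.dims ? (1:1) : c[i], ndims(r.parent))
        out[outidx...] .+= sum(block; dims=r.dims)     # accumulate the partial sum
    end
    return out
end

# Usage sketch:
# s = lazysum(some_disk_array; dims=2)  # cheap, no I/O yet
# result = materialize(s)               # reads each chunk exactly once
```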