use distributed processing from DiDa #203

Merged: 5 commits, merged on Mar 29, 2022
16 changes: 10 additions & 6 deletions Project.toml
@@ -1,31 +1,28 @@
name = "GigaSOM"
uuid = "a03a9c34-069e-5582-a11c-5c984cab887c"
version = "0.6.8"
version = "0.7.0"

[deps]
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
Distances = "b4f34e82-e78d-54a5-968a-f98e89d6e8f7"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
DistributedArrays = "aaf54ef3-cdf8-58ed-94cc-d582ad619b94"
DistributedData = "f6a0035f-c5ac-4ad0-b410-ad102ced35df"
Distributions = "31c24e10-a181-5473-b8eb-7969acd0382f"
FCSFiles = "d76558cf-badf-52d4-a17e-381ab0b0d937"
FileIO = "5789e2e9-d7fb-5bc7-8068-2c6fae9b9549"
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
NearestNeighbors = "b8a86587-4115-5ab1-83bc-aa920d37bbce"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
StableRNGs = "860ef19b-820b-49d6-a774-d7a799459cd3"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
XLSX = "fdbf4ff8-1666-58a4-91e7-1b58723a45e0"

[compat]
CSV = "^0.5.12, 0.6, 0.7, 0.8, 0.10"
DataFrames = "0.20, 0.21, 0.22, 1.0"
Distances = "^0.8.2, 0.9, 0.10"
DistributedArrays = "^0.6.4"
DistributedData = "0.1.4, 0.2"
Distributions = "^0.21.1, 0.22, 0.23, 0.24, 0.25"
FCSFiles = "^0.1.1"
FileIO = "^1.0.7"
@@ -36,4 +33,11 @@
XLSX = "^0.5.5, 0.6, 0.7"
julia = "1"

[extras]
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
XLSX = "fdbf4ff8-1666-58a4-91e7-1b58723a45e0"

[targets]
test = ["JSON", "LinearAlgebra", "SHA", "Test", "XLSX"]
7 changes: 0 additions & 7 deletions docs/src/functions.md
@@ -27,10 +27,3 @@
Pages = ["core.jl", "trainutils.jl"]
Modules = [GigaSOM]
Pages = ["embedding.jl"]
```

## Distributed processing tools

```@autodocs
Modules = [GigaSOM]
Pages = ["distributed.jl", "dio.jl"]
```
15 changes: 7 additions & 8 deletions docs/src/tutorials/basicUsage.md
@@ -33,18 +33,17 @@
While all functions work well on simple data matrices, the main aim of GigaSOM
is to let the users enjoy the cluster-computing resources. All functions also
work on data that are "scattered" among workers (i.e. each worker only holds a
portion of the data loaded in the memory). This dataset description is stored
in [`LoadedDataInfo`](@ref) structure. Most of the functions above in fact
accept the `LoadedDataInfo` as argument, and often return another
`LoadedDataInfo` that describes the scattered result.
in [`Dinfo`](@ref) structure. Most of the functions above in fact
accept the `Dinfo` as argument, and often return another
`Dinfo` that describes the scattered result.

Most importantly, using `LoadedDataInfo` **prevents memory exhaustion at the
Most importantly, using `Dinfo` **prevents memory exhaustion at the
master node**, which is a critical feature required to handle huge datasets.

You can always collect the scattered data back into a matrix (if it fits to
your RAM) with [`distributed_collect`](@ref), and utilize many other functions
to manipulate it, including e.g. [`distributed_mapreduce`](@ref) for easily
running parallel computations, or [`distributed_export`](@ref) for saving and
restoring the dataset paralelly.
your RAM) with `gather_array`, and utilize many other functions to manipulate
it, including e.g. `dmapreduce` for easily running parallel computations, or
`dstore` for saving and restoring the dataset in parallel.
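As a hedged, locally-runnable sketch of the idea (plain Julia only; this is not the actual DistributedData.jl API, and the chunk names are illustrative):

```julia
# The data conceptually lives as separate per-worker chunks; a Dinfo-style
# handle names them, and a gather collects them back into one matrix.
chunks = [rand(4, 3) for _ in 1:2]     # stand-ins for pieces held by 2 workers
gathered = reduce(vcat, chunks)        # what a gather_array-like call produces
size(gathered)                         # (8, 3): all rows back in one matrix
```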

## Minimal working example

22 changes: 12 additions & 10 deletions docs/src/tutorials/distributedProcessing.md
@@ -13,29 +13,31 @@
aforementioned `dselect` and `dtransform_asinh`. The following code extracts
means and standard deviations from the first 3 columns of a dataset distributed
as `di`:
```julia
using DistributedData

dstat(di, [1,2,3])
```

## Manual work with the distributed data
## Manual work with the DistributedData.jl package

We will first show how to use the general framework to compute per-cluster
statistics. GigaSOM exports the [`distributed_mapreduce`](@ref) function that
statistics. DistributedData.jl exports the [`dmapreduce`](@ref) function that
can be used as a very effective basic building block for running such
computations. For example, you can efficiently compute a distributed mean of
all your data as such:
```julia
distributed_mapreduce(di, sum, +) / distributed_mapreduce(di, length, +)
dmapreduce(di, sum, +) / dmapreduce(di, length, +)
```

The parameters of `distributed_mapreduce` are, in order:
The parameters of `dmapreduce` are, in order:

- `di`, the dataset
- `sum` or `length`, a unary "map" function -- during the computation, each
  piece of distributed data is first processed _in parallel_ by this function
- `+`, a binary "reduction" or "folding" function -- the pieces of information
processed by the map function are successively joined in pairs using this
function, until there is only a single result left. This final result is also
what `distributed_mapreduce` returns.
what `dmapreduce` returns.

The above example thus reads: sum all data on all workers, add up the intermediate
results, and divide the final number by the sum of all lengths of data on the
@@ -45,7 +47,7 @@
Column-wise mean (as produced by `dstat`) is slightly more useful; we only need
to split the computation on columns:

```julia
distributed_mapreduce(di, d -> mapslices(sum, d, dims=1), +) ./ distributed_mapreduce(di, x->size(x,1), +)
dmapreduce(di, d -> mapslices(sum, d, dims=1), +) ./ dmapreduce(di, x->size(x,1), +)
```
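The same map-reduce logic can be checked locally without any workers; in this hedged sketch, plain array chunks stand in for the scattered pieces of `di` (all names here are illustrative):

```julia
# Emulate the distributed column-wise mean: "map" each chunk to its column
# sums and row count, "reduce" with +, then divide.
chunks = [rand(10, 3) for _ in 1:4]    # hypothetical per-worker pieces
colsums = reduce(+, map(d -> mapslices(sum, d, dims=1), chunks))
nrows   = reduce(+, map(d -> size(d, 1), chunks))
colmeans = colsums ./ nrows            # a 1×3 matrix of column means
```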

Finally, for distributed computation of per-cluster mean, the clustering
@@ -55,7 +57,7 @@
the distributed `mapToGigaSOM` does exactly that).
First, compute the clustering:
```julia
mapping = mapToGigaSOM(som, di)
distributed_transform(mapping, m -> metaClusters[m])
dtransform(mapping, m -> metaClusters[m])
```

Now, the distributed computation is run on 2 scattered datasets. We employ a
@@ -66,7 +68,7 @@
following code produces a matrix of tuples `(sum, count)`, for separate
clusters (in rows) and data columns (in columns):

```julia
sumscounts = distributed_mapreduce([di, mapping],
sumscounts = dmapreduce([di, mapping],
(d, mapping) -> catmapbuckets(
(_,clData) -> (sum(clData), length(clData)),
d, 10, mapping),
@@ -113,13 +115,13 @@
capture all of the actual 16 existing clusters)

Finally, we can remove the temporary data from workers to create free memory for other analyses:
```julia
undistribute(mapping)
unscatter(mapping)
```
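As a toy local illustration of the reduction described above: once a matrix of `(sum, count)` tuples has been assembled per cluster and column, per-cluster means are just elementwise divisions (the shapes and values below are made up, not from the actual pipeline):

```julia
# 2 clusters (rows) × 2 data columns (cols) of (sum, count) tuples
sumscounts = [(6.0, 3) (10.0, 2); (4.0, 4) (9.0, 3)]
# destructure each tuple and divide to get the per-cluster column means
clustermeans = map(((s, c),) -> s / c, sumscounts)
```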

## Convenience statistical functions

Notably, several of the most used statistical functions are available in
GigaSOM.jl in a form that can cope with distributed data.
DistributedData.jl in a form that can cope with distributed data.

For example, you can run a distributed median computation as such:
```julia
6 changes: 3 additions & 3 deletions docs/src/tutorials/processingFCSData.md
Expand Up @@ -108,7 +108,7 @@ If you are sure you have enough RAM, you can collect the data to the master
node. (In case of the relatively small Levine13 dataset, you very probably have
the required 2.5MB of RAM, but there are many larger datasets.)
```julia
e = distributed_collect(e)
e = gather_array(e)
```

@@ -260,13 +260,13 @@
The `metaClusters` represent membership of the SOM codes in cluster; these can
be expanded to membership of all cells using [`mapToGigaSOM`](@ref):

```julia
mapping = distributed_collect(mapToGigaSOM(som, di), free=true)
mapping = gather_array(mapToGigaSOM(som, di), free=true)
clusters = metaClusters[mapping]
```

`clusters` now contains an integer from `1` to `10` with the classification of
each cell in the dataset.

(The argument `free=true` of `distributed_collect` automatically removes the
(The argument `free=true` of `gather_array` automatically removes the
distributed data from workers after collecting, which saves their memory for
other datasets.)
43 changes: 3 additions & 40 deletions src/GigaSOM.jl
Expand Up @@ -10,6 +10,7 @@ using CSV
using DataFrames
using Distances
using Distributed
using DistributedData
using Distributions
using FCSFiles
using FileIO
@@ -21,20 +22,15 @@
using StableRNGs
include("base/structs.jl")

include("base/dataops.jl")
include("base/distributed.jl")
include("base/trainutils.jl")

include("analysis/core.jl")
include("analysis/embedding.jl")

include("io/dio.jl")
include("io/input.jl")
include("io/process.jl")
include("io/splitting.jl")

# include visualization files
# include("visualization/plotting.jl")

#core
export initGigaSOM, trainGigaSOM, mapToGigaSOM

@@ -45,7 +41,7 @@
export linearRadius, expRadius, gaussianKernel, bubbleKernel, thresholdKernel, d
export embedGigaSOM

# structs
export Som, LoadedDataInfo
export Som

#io/input
export readFlowset,
@@ -69,40 +65,7 @@
export slicesof, vcollectSlice, collectSlice
#io/process
export cleanNames!, getMetaData, getMarkerNames

# plotting
export plotCounts, plotPCA

#dataops (higher-level operations on data)
export dcopy,
dselect,
dapply_cols,
dapply_rows,
dstat,
dstat_buckets,
dcount,
dcount_buckets,
dscale,
dtransform_asinh,
dmedian,
dmedian_buckets,
mapbuckets,
catmapbuckets

#distributed data tools
export save_at,
get_from,
get_val_from,
remove_from,
distribute_array,
distribute_darray,
undistribute,
distributed_transform,
distributed_mapreduce,
distributed_foreach,
distributed_collect,
distributed_export,
distributed_import,
distributed_unlink

export dtransform_asinh

end # module