Skip to content

Commit

Permalink
add nicer reduce for grouped dtable
Browse files Browse the repository at this point in the history
  • Loading branch information
krynju committed Sep 16, 2021
1 parent 1e73c54 commit d019e45
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/table/gdtable.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ mutable struct GDTable
index::Dict
end

grouped_cols(gd::GDTable) = gd.cols === nothing ? [:keys] : gd.cols

keys(gd::GDTable) = keys(gd.index)

fetch(gd::GDTable) = fetch(gd.dtable)
Expand Down
12 changes: 9 additions & 3 deletions src/table/operations.jl
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,22 @@ function reduce(f, d::DTable; cols=nothing::Union{Nothing, Vector{Symbol}}, init
construct_single_column = (_col, _chunk_results...) -> getindex.(_chunk_results, _col)
result_columns = [Dagger.@spawn construct_single_column(c, chunk_reduce_results...) for c in columns]

reduce_result_column = (_f, _c, _init) -> reduce(_f, _c; init=_init)
reduce_chunks = [Dagger.@spawn reduce_result_column(f, c, deepcopy(init)) for c in result_columns]
reduce_result_column = (_f, _c) -> reduce(_f, _c) # removed init from here as it's not needed (couldn't do (x,y)->x+1 for example)
reduce_chunks = [Dagger.@spawn reduce_result_column(f, c) for c in result_columns]

construct_result = (_cols, _vals...) -> (; zip(_cols, _vals)...)
Dagger.@spawn construct_result(columns, reduce_chunks...)
end


function reduce(f, gd::GDTable; cols=nothing::Union{Nothing, Vector{Symbol}}, init=Base._InitialValue())
Dict([d[1] => reduce(f, d[2]; cols=cols, init=init) for d in gd])
construct_result = (_keys, _results...) -> begin
result_cols = keys(first(_results))
k = [col => getindex.(_keys, i) for (i, col) in enumerate(grouped_cols(gd))]
r = [Symbol("result_" * string(r)) => collect(getindex.(_results, r)) for r in result_cols]
(;k...,r...)
end
Dagger.@spawn construct_result(keys(gd), [reduce(f, d[2]; cols=cols, init=deepcopy(init)) for d in gd]...)
end

"""
Expand Down

0 comments on commit d019e45

Please sign in to comment.