Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: One-pass column optimization #491

Draft
wants to merge 33 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
b18b3af
start
martindurant Mar 28, 2024
3cdc778
Remove unused
martindurant Mar 28, 2024
c57693c
Pass reports around
martindurant Apr 4, 2024
0c367df
remember to commit
martindurant Apr 5, 2024
9c654b1
first working (for parquet)
martindurant Apr 9, 2024
e6c5ce3
Merge branch 'main' into one-pass
martindurant Apr 11, 2024
f56f77a
stop
martindurant Apr 12, 2024
75b4416
most pass
martindurant Apr 15, 2024
a5c319d
fix most
martindurant Apr 18, 2024
243fbc1
probably better
martindurant Apr 19, 2024
6eebf87
Reinstate necessary_columns (needs doc)
martindurant Apr 22, 2024
8bbe409
Merge branch 'main' into one-pass
martindurant Apr 22, 2024
43b2e43
pass buffer names around, not columns
martindurant Apr 25, 2024
e5828fb
Clear cache between tests
martindurant May 13, 2024
df0cf2f
Merge branch 'main' into one-pass
martindurant May 13, 2024
b593f87
Another one squashed
martindurant May 21, 2024
e410b61
Squash errors that only show when uproot.dask and hist.dask are insta…
martindurant May 22, 2024
cd537e2
fix uproot
martindurant May 23, 2024
baf6f46
fix report
martindurant May 27, 2024
e29e929
if meta fails
martindurant Jun 6, 2024
f61fdd7
rev
martindurant Jul 23, 2024
2c3abd0
concat enforce condition
martindurant Jul 24, 2024
04abbc8
temp
martindurant Jul 29, 2024
d876f00
squashed some
martindurant Jul 30, 2024
8e1d507
add note
martindurant Jul 30, 2024
d1922ab
Merge branch 'main' into one-pass
martindurant Jul 30, 2024
fc9589b
Fix concat form comparison
martindurant Jul 31, 2024
961dd0c
one more squashed
martindurant Jul 31, 2024
c8b254b
fix IO report
martindurant Aug 2, 2024
d71e789
Merge branch 'main' into one-pass
pfackeldey Dec 13, 2024
d97292c
simplify loop to populate touched columns from all_layers
pfackeldey Dec 13, 2024
4e70827
Merge pull request #2 from pfackeldey/one-pass
martindurant Dec 13, 2024
25ff417
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Dec 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix IO report
@LGrey - moved from IO reporting layers returning a tuple to returning
a record (or dict) for my own sanity.
  • Loading branch information
martindurant committed Aug 2, 2024
commit c8b254bda5ee4aaaa537a91441148e0a94f22edf
66 changes: 9 additions & 57 deletions src/dask_awkward/lib/io/io.py
Original file line number Diff line number Diff line change
@@ -4,28 +4,22 @@
import math
from collections.abc import Callable, Iterable, Mapping
from dataclasses import dataclass
from functools import partial
from typing import TYPE_CHECKING, Any, cast

import awkward as ak
import dask.config
import numpy as np
from awkward.types.numpytype import primitive_to_dtype
from awkward.typetracer import length_zero_if_typetracer, typetracer_with_report
from dask.base import flatten, tokenize
from dask.highlevelgraph import HighLevelGraph
from dask.local import identity
from dask.utils import funcname, is_integer, parse_bytes
from fsspec.utils import infer_compression

from dask_awkward.layers.layers import (
AwkwardBlockwiseLayer,
AwkwardInputLayer,
AwkwardMaterializedLayer,
AwkwardTreeReductionLayer,
ImplementsReport,
io_func_implements_projection,
io_func_implements_report,
)
from dask_awkward.lib.core import (
Array,
@@ -36,7 +30,6 @@
)
from dask_awkward.lib.io.columnar import ColumnProjectionMixin
from dask_awkward.lib.utils import form_with_unique_keys, render_buffer_key
from dask_awkward.utils import first, second

if TYPE_CHECKING:
from dask.array.core import Array as DaskArray
@@ -661,6 +654,13 @@ def from_map(
dsk = AwkwardInputLayer(name=name, inputs=inputs, io_func=io_func)

hlg = HighLevelGraph.from_collections(name, dsk)
making_report = getattr(io_func, "return_report", False)
if making_report:
array_meta = ak.Array(
{"ioreport": ak.Array([0]).layout.to_typetracer(True), "data": array_meta}
)
array_meta._report = {report}

if divisions is not None:
result = new_array_object(hlg, name, meta=array_meta, divisions=divisions, **kw)
else:
@@ -669,56 +669,8 @@ def from_map(
)
dsk.meta = result._meta

if io_func_implements_report(io_func):
if cast(ImplementsReport, io_func).return_report:
# first element of each output tuple is the actual data
res = result.map_partitions(
first, meta=empty_typetracer(), label=label, output_divisions=1
)
res._meta = array_meta

concat_fn = partial(
ak.concatenate,
axis=0,
)

split_every = dask.config.get("awkward.aggregation.split-every", 8)

rep_trl_label = f"{label}-report"
rep_trl_token = tokenize(result, second, concat_fn, split_every)
rep_trl_name = f"{rep_trl_label}-{rep_trl_token}"
rep_trl_tree_node_name = f"{rep_trl_label}-tree-node-{rep_trl_token}"

# second element of each output tuple is the report, which does not
# depend on any of the actual data
rep_part = result.map_partitions(
second, meta=empty_typetracer(), label=f"{label}-partitioned-report"
)

rep_trl = AwkwardTreeReductionLayer(
name=rep_trl_name,
name_input=rep_part.name,
npartitions_input=rep_part.npartitions,
concat_func=concat_fn,
tree_node_func=identity,
finalize_func=identity,
split_every=split_every,
tree_node_name=rep_trl_tree_node_name,
)

rep_graph = HighLevelGraph.from_collections(
rep_trl_name, rep_trl, dependencies=[rep_part]
)
rep_trl.meta = empty_typetracer()

rep = new_array_object(
rep_graph,
rep_trl_name,
meta=rep_trl.meta,
npartitions=len(rep_trl.output_partitions),
)

return res, rep
if making_report:
return result.data, result.ioreport

return result

7 changes: 5 additions & 2 deletions src/dask_awkward/lib/io/parquet.py
Original file line number Diff line number Diff line change
@@ -158,9 +158,12 @@ def __call__(self, *args, **kwargs):
if self.return_report:
try:
result = self.read_fn(source)
return result, report_success(self.columns, source)
return {
"data": result,
"ioreport": report_success(self.columns, source),
}
except self.allowed_exceptions as err:
return self.mock_empty(), report_failure(err, source)
return {"data": ak.Array([]), "ioreport": report_failure(err, source)}

return self.read_fn(source)

2 changes: 1 addition & 1 deletion src/dask_awkward/lib/optimize.py
Original file line number Diff line number Diff line change
@@ -143,7 +143,7 @@ def _optimize_columns(dsk, all_layers):
rep = getattr(lay.meta, "_report", None)
if not rep:
continue
rep = first(rep) # each meta of an IL layer should have just one report
rep = first(rep) # each meta of an IO layer should have just one report
cols = set()
# this loop not required after next ak release
for ln in all_layers:
10 changes: 8 additions & 2 deletions src/dask_awkward/lib/testutils.py
Original file line number Diff line number Diff line change
@@ -287,9 +287,15 @@ def __call__(self, *args, **kwargs):
if self.return_report:
try:
result, time = time_it(self.read_fn)(*args, **kwargs)
return result, self.make_success_report(time, *args, **kwargs)
return {
"data": result,
"ioreport": self.make_success_report(time, *args, **kwargs),
}
except self.allowed_exceptions as err:
return self.mock_empty(), self.make_fail_report(err, *args, **kwargs)
return {
"data": self.mock_empty(),
"ioreport": self.make_fail_report(err, *args, **kwargs),
}

return self.read_fn(*args, **kwargs)