Windowing Support for the DaskRunner #27618

Closed
wants to merge 176 commits into from
Changes from 144 commits
176 commits
bc6b525
WIP: Created a skeleton dask runner implementation.
alxmrs Jun 22, 2022
665ee61
WIP: Idea for a translation evaluator.
alxmrs Jun 23, 2022
d1095c7
Added overrides and a visitor that translates operations.
alxmrs Jul 2, 2022
fd13d40
Fixed a dataclass typo.
alxmrs Jul 2, 2022
e3d0c8a
Expanded translations.
alxmrs Jul 2, 2022
69a660f
Core idea seems to be kinda working...
alxmrs Jul 2, 2022
ab58334
First iteration on DaskRunnerResult (keep track of pipeline state).
alxmrs Jul 3, 2022
5391cd6
Added minimal set of DaskRunner options.
alxmrs Jul 4, 2022
5deb598
WIP: Alllmost got asserts to work! The current status is:
alxmrs Jul 8, 2022
1768e47
With a great 1-liner from @pabloem, groupby is fixed! Now, all three …
alxmrs Jul 8, 2022
e6c7106
Self-review: Cleaned up dask runner impl.
alxmrs Jul 8, 2022
1b58d4c
Self-review: Remove TODOs, delete commented out code, other cleanup.
alxmrs Jul 8, 2022
5fe9372
First pass at linting rules.
alxmrs Jul 9, 2022
b98330e
WIP, include dask dependencies + test setup.
alxmrs Jul 9, 2022
e48780a
WIP: maybe better dask deps?
alxmrs Jul 9, 2022
0dc9e23
Skip dask tests depending on successful import.
alxmrs Jul 10, 2022
326d3a3
Fixed setup.py (missing `,`).
alxmrs Jul 11, 2022
b4cc408
Added an additional comma.
alxmrs Jul 11, 2022
40a6ebe
Moved skipping logic to be above dask import.
alxmrs Jul 11, 2022
3c4204d
Fix lint issues with dask runner tests.
alxmrs Sep 5, 2022
41623ec
Adding destination for client address.
alxmrs Sep 20, 2022
676d752
Changing to async produces a timeout error instead of stuck in infini…
alxmrs Sep 21, 2022
09365f6
Close client during `wait_until_finish`; rm async.
alxmrs Sep 22, 2022
c6ba4ba
Revert "Close client during `wait_until_finish`; rm async."
pabloem Sep 28, 2022
a325356
Revert "Changing to async produces a timeout error instead of stuck i…
pabloem Sep 28, 2022
ea13125
Adding -dask tox targets onto the gradle build
pabloem Sep 28, 2022
f855ffc
Supporting side-inputs for ParDo.
alxmrs Oct 2, 2022
3fd966e
Merge remote-tracking branch 'origin/dask-runner-mvp' into dask-runne…
alxmrs Oct 2, 2022
173d79b
wip - added print stmt.
alxmrs Oct 2, 2022
dd2d15c
wip - prove side inputs is set.
alxmrs Oct 2, 2022
8756618
wip - prove side inputs is set in Pardo.
alxmrs Oct 2, 2022
8380d7b
wip - rm asserts, add print
alxmrs Oct 2, 2022
b908dc3
wip - adding named inputs...
alxmrs Oct 2, 2022
f444b1e
Experiments: non-named side inputs + del `None` in named inputs.
alxmrs Oct 2, 2022
174d6fd
None --> 'None'
alxmrs Oct 2, 2022
60b063a
No default side input.
alxmrs Oct 2, 2022
90ee474
Pass along args + kwargs.
alxmrs Oct 2, 2022
c62050e
WIP Windowing with dask runner.
alxmrs Oct 3, 2022
79d4603
WIP: Created a skeleton dask runner implementation.
alxmrs Jun 22, 2022
248ec70
WIP: Idea for a translation evaluator.
alxmrs Jun 23, 2022
42452ca
Added overrides and a visitor that translates operations.
alxmrs Jul 2, 2022
1da2ddd
Fixed a dataclass typo.
alxmrs Jul 2, 2022
14885a3
Expanded translations.
alxmrs Jul 2, 2022
fca2420
Core idea seems to be kinda working...
alxmrs Jul 2, 2022
6dd1ada
First iteration on DaskRunnerResult (keep track of pipeline state).
alxmrs Jul 3, 2022
6675687
Added minimal set of DaskRunner options.
alxmrs Jul 4, 2022
88ed36b
WIP: Alllmost got asserts to work! The current status is:
alxmrs Jul 8, 2022
2e3a126
With a great 1-liner from @pabloem, groupby is fixed! Now, all three …
alxmrs Jul 8, 2022
6467b0e
Self-review: Cleaned up dask runner impl.
alxmrs Jul 8, 2022
793ba86
Self-review: Remove TODOs, delete commented out code, other cleanup.
alxmrs Jul 8, 2022
e535792
First pass at linting rules.
alxmrs Jul 9, 2022
8e32668
WIP, include dask dependencies + test setup.
alxmrs Jul 9, 2022
318afc2
WIP: maybe better dask deps?
alxmrs Jul 9, 2022
b01855f
Skip dask tests depending on successful import.
alxmrs Jul 10, 2022
2c2eb8d
Fixed setup.py (missing `,`).
alxmrs Jul 11, 2022
e64e9eb
Added an additional comma.
alxmrs Jul 11, 2022
69b118b
Moved skipping logic to be above dask import.
alxmrs Jul 11, 2022
9ffc8d8
Fix lint issues with dask runner tests.
alxmrs Sep 5, 2022
8a2afb7
Adding destination for client address.
alxmrs Sep 20, 2022
93f02f1
Changing to async produces a timeout error instead of stuck in infini…
alxmrs Sep 21, 2022
afdcf1b
Close client during `wait_until_finish`; rm async.
alxmrs Sep 22, 2022
41b5267
Supporting side-inputs for ParDo.
alxmrs Oct 2, 2022
e3ac3f8
Revert "Close client during `wait_until_finish`; rm async."
pabloem Sep 28, 2022
3fddc81
Revert "Changing to async produces a timeout error instead of stuck i…
pabloem Sep 28, 2022
9eeb9ea
Adding -dask tox targets onto the gradle build
pabloem Sep 28, 2022
b4d0999
wip - added print stmt.
alxmrs Oct 2, 2022
0319ffd
wip - prove side inputs is set.
alxmrs Oct 2, 2022
0b13bb0
wip - prove side inputs is set in Pardo.
alxmrs Oct 2, 2022
1e7052b
wip - rm asserts, add print
alxmrs Oct 2, 2022
292e023
wip - adding named inputs...
alxmrs Oct 2, 2022
31c1e2b
Experiments: non-named side inputs + del `None` in named inputs.
alxmrs Oct 2, 2022
f4ecf2f
None --> 'None'
alxmrs Oct 2, 2022
4d24ed9
No default side input.
alxmrs Oct 2, 2022
ee62a4a
Pass along args + kwargs.
alxmrs Oct 2, 2022
506c719
Applied yapf to dask sources.
alxmrs Oct 9, 2022
cd0ba8b
Dask sources passing pylint.
alxmrs Oct 9, 2022
d0a7c63
Added dask extra to docs gen tox env.
alxmrs Oct 9, 2022
775bd07
Applied yapf from tox.
alxmrs Oct 9, 2022
efba1c9
Include dask in mypy checks.
alxmrs Oct 9, 2022
741d961
Upgrading mypy support to python 3.8 since py37 support is deprecated…
alxmrs Oct 9, 2022
f66458a
Manually installing an old version of dask before 3.7 support was dro…
alxmrs Oct 9, 2022
5dcf969
fix lint: line too long.
alxmrs Oct 9, 2022
ec5f613
Fixed type errors with DaskRunnerResult. Disabled mypy type checking …
alxmrs Oct 9, 2022
04b1f1a
Fix pytype errors (in transform_evaluator).
alxmrs Oct 9, 2022
712944b
Ran isort.
alxmrs Oct 9, 2022
567b72b
Ran yapf again.
alxmrs Oct 9, 2022
f53c0a4
Fix imports (one per line)
alxmrs Oct 10, 2022
fb280ad
isort -- alphabetical.
alxmrs Oct 10, 2022
80ddfec
Added feature to CHANGES.md.
alxmrs Oct 10, 2022
40c4e35
ran yapf via tox on linux machine
alxmrs Oct 10, 2022
a70c5f3
Merge branch 'master' into dask-runner-mvp
alxmrs Oct 13, 2022
9fb52e5
Change an import to pass CI.
alxmrs Oct 13, 2022
91115e0
WIP -- better structure in ParDo for windowing. Thanks @pabloem.
alxmrs Oct 13, 2022
26c6016
Skip isort error; needed to get CI to pass.
alxmrs Oct 13, 2022
aec19bf
Skip test logic may favor better with isort.
alxmrs Oct 13, 2022
0673235
(Maybe) the last isort fix.
alxmrs Oct 13, 2022
de03a32
Tested pipeline options (added one fix).
alxmrs Oct 14, 2022
7e0a2c7
Improve formatting of test.
alxmrs Oct 14, 2022
39b1e1c
Self-review: removing side inputs.
alxmrs Oct 14, 2022
6db49fa
add dask to coverage suite in tox.
alxmrs Oct 17, 2022
ed00139
Experiment: Windowed ParDo with @pabloem.
alxmrs Oct 17, 2022
2d8f5d6
Merge branch 'master' of github.com:apache/beam into dask-runner-wind…
alxmrs Oct 17, 2022
d35e9d6
add mandatory args for _OutputHandler
pabloem Oct 17, 2022
973d9f9
Merge pull request #79 from pabloem/dask-runner-windowing
alxmrs Oct 17, 2022
0e5d498
Merge branch 'dask-runner-windowing' of github.com:alxmrs/beam into d…
alxmrs Oct 17, 2022
3feeeac
Update: still need to pre-apply windowed values.
alxmrs Oct 17, 2022
036561c
Merge branch 'master' into dask-runner-mvp
alxmrs Oct 18, 2022
191580d
Capture value error in assert.
alxmrs Oct 18, 2022
365fc87
Merge branch 'master' of github.com:apache/beam into dask-runner-mvp
alxmrs Oct 18, 2022
085447e
Change timeout value to 600 seconds.
alxmrs Oct 18, 2022
1a60a5e
ignoring broken test
pabloem Oct 21, 2022
c1037f8
Update CHANGES.md
pabloem Oct 21, 2022
9e79ffd
Using reflection to test the Dask client constructor.
alxmrs Oct 24, 2022
3e2cc0f
Merge branch 'dask-runner-mvp' into dask-runner-windowing
alxmrs Oct 24, 2022
4edc970
Better method of inspecting the constructor parameters (thanks @TomAu…
alxmrs Oct 24, 2022
fd8e361
Merge branch 'master' of github.com:apache/beam into dask-runner-wind…
alxmrs Oct 25, 2022
d88e8a1
Merge branch 'master' of github.com:apache/beam into dask-runner-wind…
alxmrs Oct 28, 2022
36bea9a
Minor fixes, and now unit tests are passing!!
alxmrs Oct 31, 2022
df315c1
Ran yapf on Dask sources.
alxmrs Nov 1, 2022
ef0d2b6
Ran lint checks.
alxmrs Nov 1, 2022
6c2cc4e
(hopefully) final lint check.
alxmrs Nov 1, 2022
0fae761
Disabled additional ungrouped imports check.
alxmrs Nov 1, 2022
119666c
Ran yapf with correct version.
alxmrs Nov 1, 2022
2e46d88
mini self-review.
alxmrs Nov 2, 2022
2ed8b14
WIP: A more correct windowing implementation with failing tests.
alxmrs Nov 5, 2022
2f193d5
WIP: Further improvements, more correct windowing impl.
alxmrs Nov 5, 2022
577f30a
WIP: Passing initial tests, failing multiple -- drops random elements…
alxmrs Nov 5, 2022
b3a70f6
WIP: All tests are passing :)
alxmrs Nov 5, 2022
7acd8d5
Cleanup: removed variables for debugger.
alxmrs Nov 5, 2022
7e90e2b
Lint + YAPF
alxmrs Nov 5, 2022
6e33ce2
self-review.
alxmrs Nov 5, 2022
f54f14c
fix lint
pabloem Nov 18, 2022
8dd2cdb
fixup
pabloem Nov 18, 2022
518a8f0
ignore internal dask file for docs
pabloem Nov 19, 2022
801b131
fixing dask version
pabloem Nov 28, 2022
393c508
Merge remote-tracking branch 'origin/master' into dask-runner-windowing
pabloem Nov 28, 2022
f89f609
remove Python 3.7 which seems unsupported by newer Dask versions
pabloem Nov 28, 2022
d6486fe
reducing scope of Dask tests
pabloem Nov 30, 2022
6f05963
adding datafrems dep
pabloem Nov 30, 2022
4343e13
Merge branch 'master' of github.com:apache/beam into dask-runner-wind…
alxmrs Jul 22, 2023
8b40a34
Simpler wait to implement `wait_until_finish()` (Dask offers quite an…
alxmrs Jul 23, 2023
cae8561
Added a test for fixed windowing.
alxmrs Jul 23, 2023
c82a241
Attempt to fix lint and threading error.
alxmrs Jul 23, 2023
ad43ebe
More lint fixing.
alxmrs Jul 23, 2023
71c10d9
Added docstrings.
alxmrs Jul 31, 2023
ff5f47c
can @cisaacstern commit to this branch?
cisaacstern Oct 5, 2023
3dc2121
Merge branch 'master' into dask-runner-windowing
cisaacstern Oct 6, 2023
7e343e3
Merge branch 'master' into dask-runner-windowing
cisaacstern Oct 11, 2023
a83e85a
ran yapf manually, per beam dev wiki
cisaacstern Oct 11, 2023
dea3e11
for pylint: describe types with comments, not literals
cisaacstern Oct 12, 2023
32ee6e1
use isinstance in Flatten.apply
cisaacstern Oct 12, 2023
305699b
cast dask_visitor.bags.values() to list in dask.optimize call
cisaacstern Oct 12, 2023
b1226fa
Merge branch 'master' into dask-runner-windowing
cisaacstern Oct 12, 2023
ccbccc9
Merge remote-tracking branch 'upstream/master' into dask-runner-windo…
cisaacstern Oct 13, 2023
0a7f59a
Merge remote-tracking branch 'upstream/master' into dask-runner-windo…
cisaacstern Oct 17, 2023
9557e3a
revert casting bag vals to list
cisaacstern Oct 17, 2023
899fe08
cast and unpack list
cisaacstern Oct 24, 2023
c398a21
for serialization, move apply_do_fn_to_bundle to module level
cisaacstern Oct 26, 2023
bfbbf1c
gather futures to prevent silent errors
cisaacstern Oct 27, 2023
f287a46
gather and wait with as_completed to reduce memory footprint of results
cisaacstern Oct 27, 2023
83fbfc5
install distributed from github, temporarily
cisaacstern Dec 18, 2023
a106dec
client.wait_for_workers requires n_workers arg; use as_completed instead
cisaacstern Dec 18, 2023
9067a54
add test replicating #29365 bug
cisaacstern Dec 21, 2023
d6c35eb
for development, re-pin to dask pr branch
cisaacstern Dec 21, 2023
8c5d8d4
Merge branch 'fix-29365' into dask-runner-windowing
cisaacstern Dec 22, 2023
3c0fbc6
side inputs for side_input test
cisaacstern Dec 22, 2023
8dc8971
SideInputMap WIP
cisaacstern Dec 22, 2023
327ba0d
add DaskBagWindowedIterator, and side_inputs to apply signature
cisaacstern Jan 3, 2024
21573a3
in visitor, apply side_inputs
cisaacstern Jan 3, 2024
9bf08a7
add ray runner side inputs tests
cisaacstern Jan 3, 2024
b8c1aaf
fix unfusable test, and single-element flattens
cisaacstern Jan 4, 2024
7932802
xfail windowed side inputs test
cisaacstern Jan 5, 2024
07e4274
get pylint to pass
cisaacstern Jan 8, 2024
4067c9a
latest dask doesnt support py38, so mock it for autodoc
cisaacstern Jan 10, 2024
7d04b16
reenable tests, no py38, temporary commands_pre installs
cisaacstern Jan 11, 2024
0ed78d5
seems like pull_request_target is standard for other workflows
cisaacstern Jan 11, 2024
8 changes: 2 additions & 6 deletions .github/workflows/dask_runner_tests.yml
@@ -65,7 +65,7 @@ jobs:
     strategy:
       fail-fast: false
       matrix:
-        os: [ubuntu-latest, macos-latest, windows-latest]
+        os: [ubuntu-latest, macos-latest]
Contributor Author

I could use help getting this to run on windows.

         params: [
           {"py_ver": "3.8", "tox_env": "py38"},
           {"py_ver": "3.9", "tox_env": "py39"},
@@ -85,15 +85,11 @@ jobs:
         run: pip install tox
       - name: Install SDK with dask
         working-directory: ./sdks/python
-        run: pip install setuptools --upgrade && pip install -e .[gcp,dask,test]
+        run: pip install setuptools --upgrade && pip install -e .[dask,test,dataframes]
       - name: Run tests basic unix
         if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macos')
         working-directory: ./sdks/python
         run: tox -c tox.ini -e ${{ matrix.params.tox_env }}-dask
-      - name: Run tests basic windows
-        if: startsWith(matrix.os, 'windows')
-        working-directory: ./sdks/python
-        run: tox -c tox.ini -e ${{ matrix.params.tox_env }}-win-dask
       - name: Upload test logs
         uses: actions/upload-artifact@v3
         if: always()
17 changes: 12 additions & 5 deletions sdks/python/apache_beam/runners/dask/dask_runner.py
@@ -37,6 +37,13 @@
 from apache_beam.runners.runner import PipelineState
 from apache_beam.utils.interactive_utils import is_in_notebook

+try:
+  # Added to try to prevent threading related issues, see
+  # https://github.com/pytest-dev/pytest/issues/3216#issuecomment-1502451456
+  from dask import distributed
+except ImportError:
+  distributed = {}
+

 class DaskOptions(PipelineOptions):
   @staticmethod
@@ -86,8 +93,6 @@ def _add_argparse_args(cls, parser: argparse.ArgumentParser) -> None:

 @dataclasses.dataclass
 class DaskRunnerResult(PipelineResult):
-  from dask import distributed
-
   client: distributed.Client
   futures: t.Sequence[distributed.Future]

@@ -99,8 +104,7 @@ def wait_until_finish(self, duration=None) -> str:
       if duration is not None:
         # Convert milliseconds to seconds
         duration /= 1000
-      self.client.wait_for_workers(timeout=duration)
-      self.client.gather(self.futures, errors='raise')
+      distributed.wait(self.futures, timeout=duration)
       self._state = PipelineState.DONE
     except:  # pylint: disable=broad-except
       self._state = PipelineState.FAILED
@@ -159,6 +163,8 @@ def is_fnapi_compatible():
     return False

   def run_pipeline(self, pipeline, options):
+    import dask
Contributor

Is there a reason to nest the import here when you're already trying to import distributed from dask in the try-except block?

Contributor Author

The try..except is a recent addition (to deal with a test-time-only threading issue). Nested imports are what I've had for a long time. That's the main reason for why things are the way they are.

Nested imports are there so that users don't have to depend on dask to load the runner.
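
The nested-import reasoning above can be illustrated with a minimal sketch. `LazyRunner` and its `backend_module` argument are hypothetical names invented for this example and are not part of the DaskRunner; a standard-library module stands in for dask so the snippet runs anywhere.

```python
import importlib


class LazyRunner:
  """Hypothetical runner that defers importing its optional backend."""
  def __init__(self, backend_module: str = "json"):
    # Constructing the runner never imports the backend, so the module that
    # defines the runner can be loaded without the optional dependency.
    self.backend_module = backend_module

  def run(self) -> str:
    try:
      # Deferred import: the dependency is required only when the runner
      # is actually used.
      backend = importlib.import_module(self.backend_module)
    except ImportError as e:
      raise ImportError(
          f"{self.backend_module!r} must be installed to use this runner."
      ) from e
    return backend.__name__


if __name__ == "__main__":
  print(LazyRunner("json").run())
```

The trade-off is that a missing dependency surfaces at run time rather than at import time, which is why the error message names the missing package.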


     # TODO(alxr): Create interactive notebook support.
     if is_in_notebook():
       raise NotImplementedError('interactive support will come later!')
@@ -178,5 +184,6 @@ def run_pipeline(self, pipeline, options):
     dask_visitor = self.to_dask_bag_visitor()
     pipeline.visit(dask_visitor)

-    futures = client.compute(list(dask_visitor.bags.values()))
+    opt_graph = dask.optimize(dask_visitor.bags.values())
+    futures = client.compute(opt_graph)
     return DaskRunnerResult(client, futures)
72 changes: 70 additions & 2 deletions sdks/python/apache_beam/runners/dask/dask_runner_test.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import datetime
 import inspect
 import unittest

@@ -22,12 +23,14 @@
 from apache_beam.testing import test_pipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.transforms import window

 try:
-  from apache_beam.runners.dask.dask_runner import DaskOptions
-  from apache_beam.runners.dask.dask_runner import DaskRunner
   import dask
   import dask.distributed as ddist
+
+  from apache_beam.runners.dask.dask_runner import DaskOptions  # pylint: disable=ungrouped-imports
+  from apache_beam.runners.dask.dask_runner import DaskRunner  # pylint: disable=ungrouped-imports
 except (ImportError, ModuleNotFoundError):
   raise unittest.SkipTest('Dask must be installed to run tests.')

@@ -73,6 +76,11 @@ def test_create(self):
       pcoll = p | beam.Create([1])
       assert_that(pcoll, equal_to([1]))

+  def test_create_multiple(self):
+    with self.pipeline as p:
+      pcoll = p | beam.Create([1, 2, 3])
+      assert_that(pcoll, equal_to([1, 2, 3]))
+
   def test_create_and_map(self):
     def double(x):
       return x * 2
@@ -81,6 +89,22 @@ def double(x):
       pcoll = p | beam.Create([1]) | beam.Map(double)
       assert_that(pcoll, equal_to([2]))

+  def test_create_and_map_multiple(self):
+    def double(x):
+      return x * 2
+
+    with self.pipeline as p:
+      pcoll = p | beam.Create([1, 2]) | beam.Map(double)
+      assert_that(pcoll, equal_to([2, 4]))
+
+  def test_create_and_map_many(self):
+    def double(x):
+      return x * 2
+
+    with self.pipeline as p:
+      pcoll = p | beam.Create(list(range(1, 11))) | beam.Map(double)
+      assert_that(pcoll, equal_to(list(range(2, 21, 2))))
+
   def test_create_map_and_groupby(self):
     def double(x):
       return x * 2, x
@@ -89,6 +113,50 @@ def double(x):
       pcoll = p | beam.Create([1]) | beam.Map(double) | beam.GroupByKey()
       assert_that(pcoll, equal_to([(2, [1])]))

+  def test_create_map_and_groupby_multiple(self):
+    def double(x):
+      return x * 2, x
+
+    with self.pipeline as p:
+      pcoll = p | beam.Create([1, 2, 1, 2, 3
+                               ]) | beam.Map(double) | beam.GroupByKey()
+      assert_that(pcoll, equal_to([(2, [1, 1]), (4, [2, 2]), (6, [3])]))
+
+  def test_map_with_side_inputs(self):
+    def mult_by(x, y):
+      return x * y
+
+    with self.pipeline as p:
+      pcoll = p | beam.Create([1]) | beam.Map(mult_by, 3)
+      assert_that(pcoll, equal_to([3]))
+
+  def test_map_with_named_side_inputs(self):
+    def mult_by(x, y):
+      return x * y
+
+    with self.pipeline as p:
+      pcoll = p | beam.Create([1]) | beam.Map(mult_by, y=3)
+      assert_that(pcoll, equal_to([3]))
+
+  def test_groupby_with_fixed_windows(self):
+    def double(x):
+      return x * 2, x
+
+    def add_timestamp(pair):
+      delta = datetime.timedelta(seconds=pair[1] * 60)
+      now = (datetime.datetime.now() + delta).timestamp()
+      return window.TimestampedValue(pair, now)
+
+    with self.pipeline as p:
+      pcoll = (
+          p
+          | beam.Create([1, 2, 1, 2, 3])
+          | beam.Map(double)
+          | beam.WindowInto(window.FixedWindows(60))
+          | beam.Map(add_timestamp)
+          | beam.GroupByKey())
+      assert_that(pcoll, equal_to([(2, [1, 1]), (4, [2, 2]), (6, [3])]))
+

 if __name__ == '__main__':
   unittest.main()
7 changes: 5 additions & 2 deletions sdks/python/apache_beam/runners/dask/overrides.py
@@ -73,7 +73,6 @@ def infer_output_type(self, input_type):
 @typehints.with_input_types(t.Tuple[K, t.Iterable[V]])
 @typehints.with_output_types(t.Tuple[K, t.Iterable[V]])
 class _GroupAlsoByWindow(beam.ParDo):
-  """Not used yet..."""
   def __init__(self, windowing):
     super().__init__(_GroupAlsoByWindowDoFn(windowing))
     self.windowing = windowing
@@ -86,7 +85,11 @@ def expand(self, input_or_inputs):
 @typehints.with_output_types(t.Tuple[K, t.Iterable[V]])
 class _GroupByKey(beam.PTransform):
   def expand(self, input_or_inputs):
-    return input_or_inputs | "GroupByKey" >> _GroupByKeyOnly()
+    return (
+        input_or_inputs
+        | "ReifyWindows" >> beam.ParDo(beam.GroupByKey.ReifyWindows())
+        | "GroupByKey" >> _GroupByKeyOnly()
+        | "GroupByWindow" >> _GroupAlsoByWindow(input_or_inputs.windowing))


 class _Flatten(beam.PTransform):
116 changes: 106 additions & 10 deletions sdks/python/apache_beam/runners/dask/transform_evaluator.py
@@ -25,15 +25,73 @@
 import abc
 import dataclasses
 import typing as t
+from dataclasses import field

 import apache_beam
 import dask.bag as db
+from apache_beam import DoFn
+from apache_beam import TaggedOutput
 from apache_beam.pipeline import AppliedPTransform
+from apache_beam.runners.common import DoFnContext
+from apache_beam.runners.common import DoFnInvoker
+from apache_beam.runners.common import DoFnSignature
+from apache_beam.runners.common import Receiver
+from apache_beam.runners.common import _OutputHandler
 from apache_beam.runners.dask.overrides import _Create
 from apache_beam.runners.dask.overrides import _Flatten
 from apache_beam.runners.dask.overrides import _GroupByKeyOnly
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.transforms.window import WindowFn
+from apache_beam.utils.windowed_value import WindowedValue

 OpInput = t.Union[db.Bag, t.Sequence[db.Bag], None]
+PCollVal = t.Union[WindowedValue, t.Any]
+
+
+def get_windowed_value(item: t.Any, window_fn: WindowFn) -> WindowedValue:
+  if isinstance(item, TaggedOutput):
+    item = item.value
+
+  if isinstance(item, WindowedValue):
+    windowed_value = item
+  elif isinstance(item, TimestampedValue):
+    assign_context = WindowFn.AssignContext(item.timestamp, item.value)
+    windowed_value = WindowedValue(
+        item.value, item.timestamp, tuple(window_fn.assign(assign_context)))
+  else:
+    windowed_value = WindowedValue(item, 0, (GlobalWindow(), ))
+
+  return windowed_value
+
+
+def defenestrate(x):
+  if isinstance(x, WindowedValue):
+    return x.value
+  return x
+
+
+@dataclasses.dataclass
+class TaggingReceiver(Receiver):
+  tag: str
+  values: t.List[PCollVal]
+
+  def receive(self, windowed_value: WindowedValue):
+    if self.tag:
+      output = TaggedOutput(self.tag, windowed_value)
+    else:
+      output = windowed_value
+    self.values.append(output)
+
+
+@dataclasses.dataclass
+class OneReceiver(dict):
+  values: t.List[PCollVal] = field(default_factory=list)
+
+  def __missing__(self, key):
+    if key not in self:
+      self[key] = TaggingReceiver(key, self.values)
+    return self[key]


 @dataclasses.dataclass
@@ -65,15 +123,54 @@ def apply(self, input_bag: OpInput) -> db.Bag:
 class ParDo(DaskBagOp):
   def apply(self, input_bag: db.Bag) -> db.Bag:
     transform = t.cast(apache_beam.ParDo, self.transform)
-    return input_bag.map(
-        transform.fn.process, *transform.args, **transform.kwargs).flatten()

+    args, kwargs = transform.raw_side_inputs
+    args = list(args)
+    main_input = next(iter(self.applied.main_inputs.values()))
+    window_fn = main_input.windowing.windowfn if hasattr(
+        main_input, "windowing") else None

-class Map(DaskBagOp):
-  def apply(self, input_bag: db.Bag) -> db.Bag:
-    transform = t.cast(apache_beam.Map, self.transform)
-    return input_bag.map(
-        transform.fn.process, *transform.args, **transform.kwargs)
+    context = DoFnContext(transform.label, state=None)
+    bundle_finalizer_param = DoFn.BundleFinalizerParam()
+    do_fn_signature = DoFnSignature(transform.fn)
+
+    tagged_receivers = OneReceiver()
+
+    output_processor = _OutputHandler(
+        window_fn=window_fn,
+        main_receivers=tagged_receivers[None],
+        tagged_receivers=tagged_receivers,
+        per_element_output_counter=None,
+        output_batch_converter=None,
+        process_yields_batches=False,
+        process_batch_yields_elements=False)
+
+    do_fn_invoker = DoFnInvoker.create_invoker(
+        do_fn_signature,
+        output_processor,
+        context,
+        None,
+        args,
+        kwargs,
+        user_state_context=None,
+        bundle_finalizer_param=bundle_finalizer_param)
+
+    def apply_dofn_to_bundle(items):
+      do_fn_invoker.invoke_setup()
+      do_fn_invoker.invoke_start_bundle()
+
+      for it in items:
+        do_fn_invoker.invoke_process(it)
+
+      results = [v.value for v in tagged_receivers.values]
+
+      do_fn_invoker.invoke_finish_bundle()
+      do_fn_invoker.invoke_teardown()
+
+      return results
+
+    return input_bag.map(get_windowed_value,
+                         window_fn).map_partitions(apply_dofn_to_bundle)


 class GroupByKey(DaskBagOp):
@@ -83,21 +180,20 @@ def key(item):

     def value(item):
       k, v = item
-      return k, [elm[1] for elm in v]
+      return k, [defenestrate(elm[1]) for elm in v]

     return input_bag.groupby(key).map(value)


 class Flatten(DaskBagOp):
-  def apply(self, input_bag: OpInput) -> db.Bag:
+  def apply(self, input_bag: t.List[db.Bag]) -> db.Bag:
     assert type(input_bag) is list, 'Must take a sequence of bags!'
Contributor

I'd expect our type hinting infra to handle this, but I'm comfortable leaving it while you develop

Contributor Author

Good to know!

Contributor Author

Nit: this should be isinstance()

> Nit: this should be isinstance()

Good call @alxmrs, I can fix that in my next push.

Fixed in 32ee6e1
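
For readers following the `isinstance()` nit, a small sketch of why the two checks differ. `BagList` is a hypothetical list subclass invented for this illustration; nothing in the PR defines it.

```python
class BagList(list):
  """Hypothetical list subclass standing in for any list-like input."""


def check_exact(bags) -> bool:
  # Exact type comparison: rejects subclasses of list.
  return type(bags) is list


def check_isinstance(bags) -> bool:
  # isinstance honours inheritance: accepts list and its subclasses.
  return isinstance(bags, list)


if __name__ == "__main__":
  bags = BagList(["a", "b"])
  print(check_exact(bags))       # False
  print(check_isinstance(bags))  # True
```

Both checks accept a plain `list`; they differ only for subclasses, which is the case the nit guards against.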

     return db.concat(input_bag)


 TRANSLATIONS = {
     _Create: Create,
     apache_beam.ParDo: ParDo,
-    apache_beam.Map: Map,
     _GroupByKeyOnly: GroupByKey,
     _Flatten: Flatten,
 }
1 change: 1 addition & 0 deletions sdks/python/scripts/generate_pydoc.sh
@@ -64,6 +64,7 @@ excluded_patterns=(
     'apache_beam/runners/portability/'
     'apache_beam/runners/test/'
     'apache_beam/runners/worker/'
+    'apache_beam/runners/dask/transform_evaluator.*'
     'apache_beam/testing/benchmarks/chicago_taxi/'
     'apache_beam/testing/benchmarks/cloudml/'
     'apache_beam/testing/benchmarks/inference/'
4 changes: 2 additions & 2 deletions sdks/python/setup.py
@@ -352,8 +352,8 @@ def get_portability_package_data():
           ],
           'dataframe': dataframe_dependency,
           'dask': [
-              'dask >= 2022.6',
-              'distributed >= 2022.6',
+              'dask >= 2022.6.0',
+              'distributed >= 2022.6.0',
           ],
       },
       zip_safe=False,