From 83fbfc57e0b92591f0f0192d3fbed4a7bc8488f5 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Mon, 18 Dec 2023 09:24:25 -0800 Subject: [PATCH 1/4] install distributed from github, temporarily --- sdks/python/setup.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index d3fca2147239d..110f87289f138 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -418,8 +418,13 @@ def get_portability_package_data(): ], 'dataframe': dataframe_dependency, 'dask': [ - 'dask >= 2022.6', - 'distributed >= 2022.6', + # FIXME(cisaacstern): The git+ link below is where https://github.com/dask/distributed/pull/8400 + # was merged into `distributed`. This PR is a fix for https://github.com/apache/beam/issues/29365. + # Installing from here to move forward with development. Before merge, this should be replaced with + # a lower bound release of `distributed`, once a release that includes the linked PR is available. + # 'dask >= 2023.XX', + # 'distributed >= 2023.XX', + 'distributed @ git+https://github.com/dask/distributed.git@8c3eb6f0bf47d124c887c543599d80ff09c3f5ed', ], 'yaml': [ 'docstring-parser>=0.15,<1.0', From a106deceb1d10bada377dcd0264c2efc856e5821 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Mon, 18 Dec 2023 10:43:31 -0800 Subject: [PATCH 2/4] client.wait_for_workers requires n_workers arg; use as_completed instead --- .../apache_beam/runners/dask/dask_runner.py | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/runners/dask/dask_runner.py b/sdks/python/apache_beam/runners/dask/dask_runner.py index 109c4379b45df..4de68066125f6 100644 --- a/sdks/python/apache_beam/runners/dask/dask_runner.py +++ b/sdks/python/apache_beam/runners/dask/dask_runner.py @@ -86,21 +86,31 @@ def _add_argparse_args(cls, parser: argparse.ArgumentParser) -> None: @dataclasses.dataclass class DaskRunnerResult(PipelineResult): - from dask import distributed + import dask.distributed as ddist - client: distributed.Client - futures: t.Sequence[distributed.Future] + client: ddist.Client + futures: t.Sequence[ddist.Future] def __post_init__(self): super().__init__(PipelineState.RUNNING) def wait_until_finish(self, duration=None) -> str: + import dask.distributed as ddist + try: if duration is not None: # Convert milliseconds to seconds duration /= 1000 - self.client.wait_for_workers(timeout=duration) - self.client.gather(self.futures, errors='raise') + for _ in ddist.as_completed(self.futures, + timeout=duration, + with_results=True): + # without gathering results, worker errors are not raised on the client: + # https://distributed.dask.org/en/stable/resilience.html#user-code-failures + # so we want to gather results to raise errors client-side, but we do + # not actually need to use the results here, so we just pass. to gather, + # we use the iterative `as_completed(..., with_results=True)`, instead + # of aggregate `client.gather`, to minimize memory footprint of results. + pass self._state = PipelineState.DONE except: # pylint: disable=broad-except self._state = PipelineState.FAILED From 9067a54baf3741de3d144d77bfc2ed943b4e8236 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Thu, 21 Dec 2023 15:11:13 -0800 Subject: [PATCH 3/4] add test replicating #29365 bug --- .../apache_beam/runners/dask/dask_runner_test.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sdks/python/apache_beam/runners/dask/dask_runner_test.py b/sdks/python/apache_beam/runners/dask/dask_runner_test.py index d8b3e17d8a56b..fd4e1cebc6f71 100644 --- a/sdks/python/apache_beam/runners/dask/dask_runner_test.py +++ b/sdks/python/apache_beam/runners/dask/dask_runner_test.py @@ -73,6 +73,11 @@ def test_create(self): pcoll = p | beam.Create([1]) assert_that(pcoll, equal_to([1])) + def test_create_multiple(self): + with self.pipeline as p: + pcoll = p | beam.Create([1, 2, 3, 4]) + assert_that(pcoll, equal_to([1, 2, 3, 4])) + def test_create_and_map(self): def double(x): return x * 2 @@ -89,6 +94,14 @@ def double(x): pcoll = p | beam.Create([1]) | beam.Map(double) | beam.GroupByKey() assert_that(pcoll, equal_to([(2, [1])])) + def test_groupby_string_keys(self): + with self.pipeline as p: + pcoll = ( + p + | beam.Create([('a', 1), ('a', 2), ('b', 3), ('b', 4)]) + | beam.GroupByKey()) + assert_that(pcoll, equal_to([('a', [1, 2]), ('b', [3, 4])])) + if __name__ == '__main__': unittest.main() From d6c35eb1e85136bbd06110acec9dd59770e76f35 Mon Sep 17 00:00:00 2001 From: Charles Stern <62192187+cisaacstern@users.noreply.github.com> Date: Thu, 21 Dec 2023 15:11:52 -0800 Subject: [PATCH 4/4] for development, re-pin to dask pr branch --- sdks/python/setup.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 110f87289f138..f1618818359ab 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -418,13 +418,16 @@ def get_portability_package_data(): ], 'dataframe': dataframe_dependency, 'dask': [ - # FIXME(cisaacstern): The git+ link below is where https://github.com/dask/distributed/pull/8400 - # was merged into `distributed`. This PR is a fix for https://github.com/apache/beam/issues/29365. + # FIXME(cisaacstern): The git+ link below is a fix for https://github.com/apache/beam/issues/29365. # Installing from here to move forward with development. Before merge, this should be replaced with - # a lower bound release of `distributed`, once a release that includes the linked PR is available. - # 'dask >= 2023.XX', - # 'distributed >= 2023.XX', - 'distributed @ git+https://github.com/dask/distributed.git@8c3eb6f0bf47d124c887c543599d80ff09c3f5ed', + # a lower bound release of `dask` that includes https://github.com/dask/dask/pull/10734. + # 'dask >= 2024.XX.X', + # 'distributed >= 2024.XX.X', + 'dask @ git+https://github.com/cisaacstern/dask.git@tokenize-bag-groupby-key', + # For development, 'distributed >= 2023.12.1' should work with the above dask PR, however it can't + # be installed as part of a single `pip` call, since distributed releases are pinned to specific + # dask releases. As a workaround, distributed can be installed first, and then `.[dask]` installed + # second, with the `--update` / `-U` flag to replace the dask release brought in by distributed. ], 'yaml': [ 'docstring-parser>=0.15,<1.0',