[Bug]: DaskRunner GBK is broken, pending upstream fix in dask #29365

Open · cisaacstern opened this issue Nov 9, 2023 · 7 comments

@cisaacstern (Contributor)

What happened?

As released in 2.51.0, the DaskRunner's GBK implementation only works under certain conditions (e.g., if the key type is an integer). It does not work for many commonly-used patterns, including:

  • grouping on string keys; and
  • pipelines tested with assert_that, which uses GBK internally.

While both of these are problematic, the latter is especially so because, given the ubiquity of assert_that in the tests, it more or less means we can't test (or, therefore, develop) the DaskRunner at all until this issue is fixed.

Over in cisaacstern/beam-daskrunner-gbk-bugs#1, I have developed a relatively comprehensive demonstration of both of these failure modes. Based on experiments there, it looks like the root cause relates to the partitioning of the underlying Dask Bag collection. As evidence of this, when the following implementation of Create is mocked into 2.51.0 during the test session, both of these GBK-related bugs are resolved (copied from here):

import dask.bag as db

from apache_beam.runners.dask.transform_evaluator import Create as DaskCreate, OpInput


class UnpartitionedCreate(DaskCreate):
    """The DaskRunner GBK bug(s) demonstrated by this test module are (somehow)
    related to the partitioning of the dask bag collection. The following
    object is mocked into the DaskRunner in certain test cases below to
    demonstrate that if the dask bag collection is unpartitioned (i.e., consists
    of only a single partition), then the GBK bug(s) are resolved.
    """

    def apply(self, input_bag: OpInput) -> db.Bag:
        partitioned = super().apply(input_bag)
        return partitioned.repartition(npartitions=1)
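
As a rough illustration of what that repartitioning changes at the dask.bag level, here is a minimal sketch in plain dask (no Beam). Note that this is only illustrative: under the default local scheduler it succeeds either way, since the hashing problem only shows up across worker processes; the point is simply that a single-partition bag can be grouped without any cross-partition shuffle:

import dask.bag as db

pairs = [("a", 1), ("a", 2), ("b", 3), ("b", 4)]

# Default partitioning: groupby has to shuffle records between partitions,
# and that shuffle routes records by hashing the key.
multi = db.from_sequence(pairs, npartitions=2)

# Collapsed to a single partition: groupby happens entirely within that one
# partition, so no cross-partition shuffle (and no key hashing) is required.
single = multi.repartition(npartitions=1)

print(sorted(single.groupby(lambda kv: kv[0]).compute()))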

I am nearly certain that this issue is not with dask.bag.Bag.groupby, which AFAICT works fine, e.g., with partitioned collections containing string keys. Rather, this appears to have something to do with how the DaskRunner is wrapping partitioned collections.

(Note: I have tried to develop an MRE for the assert_that failure mode that does not involve assert_that, but have thus far come up empty-handed. Initially, I thought it might have to do with using None as a key, which happens there, but the linked test cases show that, at least in a minimal example, the DaskRunner's GBK can handle None keys.)

I am happy to take this issue, as it is currently blocking me from finishing #27618. I may need to enlist the support of some more Dask-savvy folks in getting to the bottom of this, perhaps @jacobtomlinson will be keen!

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • [x] Component: Python SDK
  • [ ] Component: Java SDK
  • [ ] Component: Go SDK
  • [ ] Component: Typescript SDK
  • [ ] Component: IO connector
  • [ ] Component: Beam YAML
  • [ ] Component: Beam examples
  • [ ] Component: Beam playground
  • [ ] Component: Beam katas
  • [ ] Component: Website
  • [ ] Component: Spark Runner
  • [ ] Component: Flink Runner
  • [ ] Component: Samza Runner
  • [ ] Component: Twister2 Runner
  • [ ] Component: Hazelcast Jet Runner
  • [ ] Component: Google Cloud Dataflow Runner
@cisaacstern (Contributor, Author)

.take-issue

@TheNeuralBit (Member)

I spent a little time looking at this. I found it's possible to minimally reproduce this with dask only, configured to use a LocalCluster.

With this script:

from distributed import Client, LocalCluster
from dask import bag as db
from pprint import pprint


def run():
  cluster = LocalCluster()
  client = Client(cluster)

  b = db.from_sequence([('a', 1),
                        ('a', 2),
                        ('b', 3),
                        ('b', 4)])

  grouped = b.groupby(lambda s: s[0])

  for i in range(10):
    print(i)
    pprint(list(grouped))


if __name__ == '__main__':
    run()

I get this output:

0
[('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
1
[('a', [('a', 2)]), ('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
2
[('a', [('a', 2)]), ('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
3
[('b', [('b', 4)]), ('a', [('a', 2)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
4
[('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
5
[('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
6
[('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1), ('a', 2)])]
7
[('a', [('a', 2)]), ('b', [('b', 4)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
8
[('b', [('b', 4)]), ('a', [('a', 2)]), ('b', [('b', 3)]), ('a', [('a', 1)])]
9
[('b', [('b', 4)]), ('a', [('a', 2)]), ('b', [('b', 3)]), ('a', [('a', 1)])]

This is the same behavior I see in a simple beam groupby test. I'm not sure how to make sense of this. Some possible explanations:

  • We're misunderstanding the semantics of groupby and it's not actually doing a shuffle.
  • Encoding python strings for dask shuffle is non-deterministic somehow.

Actually it looks like it must be the latter. Switching to integer keys works as expected (as you observed in the beam test):

❯ python test.py
0
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
1
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
2
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
3
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
4
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
5
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
6
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
7
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
8
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]
9
[(0, [(0, 1), (0, 2)]), (1, [(1, 3), (1, 4)])]

We should probably raise an issue with dask.

@cisaacstern (Contributor, Author)

cisaacstern commented Dec 8, 2023

Wow @TheNeuralBit this is right on the money, perfect reproducer!

I can translate this into a Dask issue, if you like?

Edit: Working with your script a bit locally myself first, to see if I can narrow down the source of the bug a bit further... it looks like it's something to do with the interaction between dask bag and distributed, as I am not seeing the error without distributed. Will post a follow-up here shortly.

@cisaacstern (Contributor, Author)

As Brian discovered during our sync on this today, the Dask issues already exist:

> Encoding python strings for dask shuffle is non-deterministic somehow.

This is indeed the root issue, resulting from the fact that Python's built-in hash is non-deterministic across processes for strings, matching our observations here. I just confirmed that using cluster = LocalCluster(processes=False) (which uses single-process worker threads) resolves this groupby issue. Of course, that does not reflect how the DaskRunner will be used in the wild, so we will need a fix for the above Dask issue(s) to move forward here.
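
For anyone who wants to see the underlying non-determinism directly, here is a minimal sketch, independent of dask and Beam: CPython randomizes string hashing per interpreter process (unless PYTHONHASHSEED is pinned), so two worker processes generally disagree about hash('a'), while small integers hash to themselves and are therefore routed consistently.

import subprocess
import sys

# Hash the same string (and an integer) in two fresh interpreter processes.
# With hash randomization enabled (the CPython default), the string hashes
# almost always differ, which breaks any shuffle that assigns records to
# partitions by hash(key); the integer hashes always match.
cmd = [sys.executable, "-c", "print(hash('a'), hash(7))"]
print(subprocess.check_output(cmd).decode().strip())
print(subprocess.check_output(cmd).decode().strip())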

I will ping the Dask issues now.

@TheNeuralBit (Member)

TheNeuralBit commented Dec 9, 2023

It's really surprising that not many people have run into this! I noticed the dask bag docs do nudge users away from groupby:

> These shuffle operations are expensive and better handled by projects like dask.dataframe. It is best to use dask.bag to clean and process data, then transform it into an array or DataFrame before embarking on the more complex operations that require shuffle steps.

So I guess it is conceivable that no one is relying on this.

It occurred to me that we are also partitioning with a hash in Beam, for the DataFrame API. I refreshed my memory on this and found it's using pd.util.hash_array instead of hash:

pd.util.hash_array(np.asarray(df.index.get_level_values(level)))

Presumably Dask DataFrames are doing something similar and that's why we don't see this issue there.
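
For comparison, here is a quick check (a sketch, assuming pandas and numpy are installed) that pd.util.hash_array is stable across processes, unlike the built-in hash() of strings:

import subprocess
import sys

# pd.util.hash_array is a data-dependent hash with a fixed default hash key,
# so two separate interpreter processes agree on its output for the same
# input, regardless of per-process hash randomization.
snippet = (
    "import numpy as np; import pandas as pd; "
    "print(pd.util.hash_array(np.array(['a', 'b'], dtype=object)))"
)
for _ in range(2):
    print(subprocess.check_output([sys.executable, "-c", snippet]).decode().strip())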

@cisaacstern (Contributor, Author)

Pleased to report that dask/distributed#8400 provides a path forward for this.

I'll open a draft PR bumping the dask version we use in Beam (to be completed/merged following a release of distributed).

cisaacstern added a commit to cisaacstern/beam that referenced this issue Dec 21, 2023
@cisaacstern changed the title from "[Bug]: DaskRunner GBK failures related to partitioning prevent use of string keys and break assert_that" to "[Bug]: DaskRunner GBK is broken, pending upstream fix in dask" on Mar 21, 2024
@cisaacstern (Contributor, Author)

I've updated the issue title to reflect my current understanding of this issue. The previously mentioned dask distributed PR does not solve the case of non-deterministic hashing of Python's NoneType, which we will need fixed to move forward with the DaskRunner. I am exploring one possible path forward in dask/dask#10734.
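
(As a quick local check of the NoneType behavior, the same two-process comparison as above can be used; note the result is Python-version-dependent, since older CPython releases fall back to the address-based default object hash for None, while more recent releases return a fixed constant.)

import subprocess
import sys

# Compare hash(None) across two fresh interpreter processes. If the two
# values differ (as they typically do on older CPython versions, where
# hash(None) derives from the object's address), a hash-partitioned shuffle
# cannot route None keys consistently.
cmd = [sys.executable, "-c", "print(hash(None))"]
print(subprocess.check_output(cmd).decode().strip())
print(subprocess.check_output(cmd).decode().strip())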
