Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DaskRunner GBK by bumping dask lower bound #29802

Closed
wants to merge 4 commits into from

Conversation

cisaacstern
Copy link
Contributor

@cisaacstern cisaacstern commented Dec 18, 2023

Fixes #29365 by bumping distributed dask to include upstream fix in dask/distributed#8400 dask/dask#10734.

Opening as draft until we get a release of distributed dask.

cc @jacobtomlinson @TheNeuralBit for visibility 🚀


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link
Member

@TheNeuralBit TheNeuralBit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add a unit test to the dask runner test suite that reproduces the original issue.

We should also add a precommit/postcommit that runs the dask runner tests (but that could be a separate task).

# a lower bound release of `distributed`, once a release that includes the linked PR is available.
# 'dask >= 2023.XX',
# 'distributed >= 2023.XX',
'distributed @ git+https://github.com/dask/distributed.git@8c3eb6f0bf47d124c887c543599d80ff09c3f5ed',
Copy link
Member

@TheNeuralBit TheNeuralBit Dec 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you do update this we should consider adding an upper bound to protect against breaking changes (e.g. I was seeing failures because wait_for_workers(n_workers) became a required parameter).

Unfortunately dask's use of CalVer isn't particularly conducive to this, as the third number is just a unique id for a release in that month (e.g. 2023.12.1 might be a bugfix release on 2023.12.0 or it could be the second major release in December). I think we should just restrict on the month, even though this might technically let a breaking change in. So something like >=2023.12,<2024.1.

@TheNeuralBit
Copy link
Member

Thank you for working to resolve this :)

@cisaacstern
Copy link
Contributor Author

The good news

The main objective of fixing GBK appears to be solved, as shown by the fact that this script

# gbk.py

import apache_beam as beam
from apache_beam.runners.dask.dask_runner import DaskRunner

if __name__ == "__main__":
  with beam.Pipeline(runner=DaskRunner()) as p:
    p | beam.Create([('a', 1), ('a', 2), ('b', 3), ('b', 4)]) | beam.GroupByKey() | beam.Map(print)

gives a correctly grouped result when when with distributed installed from upstream (at the pinned commit)

$ python gbk.py
('b', [3, 4])
('a', [1, 2])

and (as expected) an incorrect result with the latest release of distributed from pypi

$ python gbk.py
('a', [1])
('b', [3])
('b', [4])
('a', [2])

The bad news

We can't actually test this, because while the linked distributed PR did fix string keys for dask.bag.Bag.groupby, it turns out None keys are also broken in dask.bag (And None keys are required to use assert_that.)

This is because hash(None) is non-deterministic in Python < 3.12. A few notes/thoughts:

  • hash(None) is deterministic in Python 3.12, thanks to gh-99540: Constant hash for _PyNone_Type to aid reproducibility python/cpython#99541 (I found the linked issue discussion very interesting! FWIW)
  • Non-deterministic behavior of hash(None) also means otherwise deterministically-hashable containers (e.g. tuples, frozen dataclasses, etc) hash non-deterministically if they contain None. This seems like a very common category of key for real-world use cases.
  • Only supporting GBK for > Python 3.12 seems like an unreasonable workaround for what is ultimately an upstream brittleness issue, and should be fixed there.

So it seems like the only viable path forward is to revisit a fix for dask/dask#6723, which I was hoping we could avoid. I will re-ping that issue either later today or tomorrow with some thoughts on a path forward.

@cisaacstern
Copy link
Contributor Author

So it seems like the only viable path forward is to revisit a fix for dask/dask#6723, which I was hoping we could avoid. I will re-ping that issue either later today or tomorrow with some thoughts on a path forward.

See dask/dask#6723 (comment) and dask/dask#10734. AFAICT, the latter resolves the testing issues observed above.

@cisaacstern cisaacstern changed the title Fix DaskRunner GBK by bumping distributed lower bound Fix DaskRunner GBK by bumping dask lower bound Dec 21, 2023
@cisaacstern
Copy link
Contributor Author

I've renamed the issue to reflect my current proposed path forward, which it turns out is a PR to dask (not distributed).

This PR appears to fix non-deterministic hashing of both strings and None (and I believe all other types as well), whereas the previously-referenced distributed PR only fixed string hashes.

Please also add a unit test to the dask runner test suite that reproduces the original issue.

👍 test_groupby_string_keys added in 9067a54 replicates the current issue. This test passes locally for me with dask installed from the branch provided in d6c35eb. We will need to wait for the associated PR to progress before this will pass in CI (given tight coupling of distributed/dask releases, see comment in setup.py). We can also asses what a reasonable upper bound might be at that point.

We should also add a precommit/postcommit that runs the dask runner tests (but that could be a separate task).

Agreed! IIUC, this is what is being tracked in #25696?

@TheNeuralBit
Copy link
Member

Agreed! IIUC, this is what is being tracked in #25696?

Ah yes, thanks.

Copy link
Contributor

github-actions bot commented Mar 4, 2024

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Mar 4, 2024
Copy link
Contributor

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Mar 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug]: DaskRunner GBK is broken, pending upstream fix in dask
2 participants