
Dask requires consistent Python hashing #4141

Open
itamarst opened this issue Oct 1, 2020 · 8 comments
@itamarst

itamarst commented Oct 1, 2020

As per dask/dask#6640, Dask breaks if Python hashing is inconsistent across workers. This appears to be a bug in Distributed Dask backend as well:

hashing.py:

from dask.distributed import Client

from dask.bag import Bag
dsk = {("x", 0): (range, 5), ("x", 1): (range, 5), ("x", 2): (range, 5)}
b = Bag(dsk, "x", 3)


def iseven(x):
    return x % 2 == 0


def test_bag_groupby_pure_hash():
    # https://github.com/dask/dask/issues/6640
    result = b.groupby(iseven).compute()
    assert result == [(False, [1, 3] * 3), (True, [0, 2, 4] * 3)]


def test_bag_groupby_normal_hash():
    # https://github.com/dask/dask/issues/6640
    # String keys hash differently across workers when hash randomization
    # seeds differ, so groups get split incorrectly.
    result = b.groupby(lambda x: "even" if iseven(x) else "odd").compute()
    assert len(result) == 2
    assert ("odd", [1, 3] * 3) in result
    assert ("even", [0, 2, 4] * 3) in result


def main():
    client = Client(n_workers=3)
    test_bag_groupby_normal_hash()
    test_bag_groupby_pure_hash()
    print("HURRAH")


if __name__ == '__main__':
    import hashing
    hashing.main()

When run:

$ python hashing.py
...
  File "../dask/hashing.py", line 36, in <module>
    hashing.main()
  File "/home/itamarst/Devel/dask/hashing.py", line 29, in main
    test_bag_groupby_normal_hash()
  File "/home/itamarst/Devel/dask/hashing.py", line 22, in test_bag_groupby_normal_hash
    assert len(result) == 2

Solving this

The solution for Dask (dask/dask#6660) was to set PYTHONHASHSEED for worker processes. A similar fix can work for Distributed in some cases, e.g. when workers are launched via Client().

However, I'm pretty sure the dask-worker CLI just runs the worker inline rather than in a subprocess, so by the time any Python code is running the hash seed has already been set and can't be changed. It could, I suppose, set the variable and then fork()+exec() Python again.

@mrocklin
Member

mrocklin commented Oct 2, 2020

Hrm, so dask-worker does run the worker in a subprocess by default (we create a Nanny, which runs Worker in a spawned process). However, this isn't universal (some people run without the Nanny process).

Doing the same fix as we did in dask.multiprocessing would be a good first step here, and would probably solve 90% of the problem.

Going beyond that though, I'm not sure. We could consider alternative hashing functions. Are there ways to specify the hash seed programmatically somehow?
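For reference, the dask.multiprocessing-style fix amounts to pinning PYTHONHASHSEED in each worker's environment before the child interpreter starts, so all workers agree on str/bytes hashes. A minimal sketch of the idea (run_worker_with_fixed_seed is an illustrative helper, not Distributed API; the seed value 6640 is arbitrary):

```python
import os
import subprocess
import sys


def run_worker_with_fixed_seed(code):
    """Run `code` in a child interpreter with a pinned hash seed.

    Mirrors the dask.multiprocessing fix: PYTHONHASHSEED must be set in
    the child's environment *before* interpreter startup, since hash
    randomization is locked in at that point.
    """
    env = dict(os.environ, PYTHONHASHSEED="6640")
    out = subprocess.run(
        [sys.executable, "-c", code],
        env=env, capture_output=True, text=True, check=True,
    )
    return out.stdout.strip()


# Two separate "workers" now agree on hash("even"):
a = run_worker_with_fixed_seed("print(hash('even'))")
b = run_worker_with_fixed_seed("print(hash('even'))")
```

Without the pinned seed, each child would typically draw its own random seed and the two hashes would usually disagree.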

@itamarst
Author

itamarst commented Oct 2, 2020

There's no public API for setting it after startup, which makes sense given what it does. The only thing I can think of is something like

import os, sys

if os.environ.get("PYTHONHASHSEED") != "6640":
    os.environ["PYTHONHASHSEED"] = "6640"
    os.execv(sys.executable, [sys.executable] + sys.argv)

in the startup code (which won't work on Windows...).
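Since os.execv is unreliable on Windows, a more portable variant of that re-launch trick spawns a child interpreter and forwards its exit code. A sketch only (ensure_hash_seed is an illustrative name; real startup code would also need to guard against re-launch loops):

```python
import os
import subprocess
import sys


def ensure_hash_seed(seed="6640"):
    """Re-launch the current script with PYTHONHASHSEED pinned.

    If the seed is already pinned in the environment, return and keep
    running; otherwise run a child interpreter with the seed set and
    exit with the child's return code.
    """
    if os.environ.get("PYTHONHASHSEED") == seed:
        return  # already pinned; keep running in this process
    env = dict(os.environ, PYTHONHASHSEED=seed)
    ret = subprocess.call([sys.executable] + sys.argv, env=env)
    sys.exit(ret)
```

This avoids exec entirely at the cost of keeping the parent process alive for the child's lifetime.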

@itamarst
Author

itamarst commented Oct 6, 2020

For the Distributed-specific issue, is non-nanny mode actually useful, or could it be dropped?

Stepping back to a more fundamental solution, something like

with set_hash_seed(6640):
    h = hash(obj)

is maybe possible... but there are no public APIs, so you have to be OK with munging private CPython internals. And it's global state, so changing it would also affect hashing in other threads, e.g. breaking lookups in existing dicts and sets. It might be better to get a PEP through so that this is possible in future Python versions.

Another approach is a reimplemented hash algorithm, for example https://deepdiff.readthedocs.io/en/latest/deephash.html. The question is how good it is at hashing arbitrary objects; that would need some digging/reading/testing. For a basic sense of what it supports, see https://github.com/seperman/deepdiff/blob/master/deepdiff/deephash.py#L429
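A reimplemented hash doesn't have to be as featureful as deephash for basic cases. For picklable values, a minimal sketch of a seed-independent hash (stable_hash is an illustrative name; this assumes pickle output is stable for the types involved, which holds for builtins within a single Python version, and is not a drop-in replacement for hash()):

```python
import hashlib
import pickle


def stable_hash(obj, seed=6640):
    """Deterministic stand-in for hash() for picklable objects.

    Hashes the pickle bytes with SHA-256 and truncates to 64 bits,
    so the result is identical across processes regardless of
    PYTHONHASHSEED. The seed is mixed in explicitly instead of
    relying on interpreter-level state.
    """
    data = pickle.dumps((seed, obj), protocol=4)
    return int.from_bytes(hashlib.sha256(data).digest()[:8], "big")
```

This sidesteps CPython's private hash-seed state entirely, at the cost of only supporting picklable inputs and being slower than the builtin hash.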

@itamarst
Author

itamarst commented Oct 6, 2020

deephash seems promising:

from deepdiff import DeepHash
from pprint import pprint

class CustomHash:
    def __init__(self, x):
        self.x = x

    def __repr__(self):
        return f"CustomHash({self.x})"


objects = [
    125,
    "lalala",
    (12, 17),
    CustomHash(17),
    CustomHash(17),
    CustomHash(25),
]

for o in objects:
    print(repr(o), "has hash", DeepHash(o)[o])

Results in:

$ python example.py 
125 has hash 08823c686054787db6befb006d6846453fd5a3cbdaa8d1c4d2f0052a227ef204
'lalala' has hash 2307d4a08f50b743ec806101fe5fc4664e0b83c6c948647436f932ab38daadd3
(12, 17) has hash c7129efa5c7d19b0efabda722ae4cd8d022540b31e8160f9c181d163ce4f1bf6
CustomHash(17) has hash 477631cfc0315644152933579c23ed0c9f27743d632613e86eb7e271f6fc86e4
CustomHash(17) has hash 477631cfc0315644152933579c23ed0c9f27743d632613e86eb7e271f6fc86e4
CustomHash(25) has hash 8fa8427d493399d6aac3166331d78e9e2a2ba608c2b3c92a131ef0cf8882be14

@itamarst
Author

itamarst commented Oct 9, 2020

Moved discussion of Dask-side fix to dask/dask#6723

@towr

towr commented Feb 14, 2023

There really ought to be a warning about this in the documentation. It would be one thing if this raised an exception and just broke your application, but instead it unexpectedly gives completely wrong results.

If it's impossible to come up with a good hash function in two and a half years, maybe the solution is to just give groupby a hash_function argument. This could default to hash() for data where it's safe to use (like integers), and otherwise raise an exception if it's not supplied.
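A hypothetical sketch of that hash_function idea, reduced to plain Python (groupby_with_hash and stable_str_hash are illustrative names, not Dask API): the caller supplies the function used to route keys to partitions, so partitioning no longer depends on the builtin hash() being consistent across processes.

```python
import hashlib
from collections import defaultdict


def groupby_with_hash(items, key, hash_function, npartitions=3):
    """Group items by key(item), routing keys to partitions with an
    explicit, caller-supplied hash_function instead of builtin hash()."""
    partitions = [defaultdict(list) for _ in range(npartitions)]
    for item in items:
        k = key(item)
        partitions[hash_function(k) % npartitions][k].append(item)
    # Merge per-partition groups back into one mapping.
    merged = defaultdict(list)
    for part in partitions:
        for k, vals in part.items():
            merged[k].extend(vals)
    return dict(merged)


def stable_str_hash(k):
    # Deterministic across processes, unlike hash(str).
    return int.from_bytes(hashlib.sha256(repr(k).encode()).digest()[:8], "big")


groups = groupby_with_hash(
    range(5), lambda x: "even" if x % 2 == 0 else "odd", stable_str_hash
)
# groups == {"even": [0, 2, 4], "odd": [1, 3]}
```

With hash() as the hash_function the same bug would reappear, which is why such an argument would want to reject unsafe defaults for strings.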

@cisaacstern
Contributor

For those following this, noting that I've just revived the discussion over in dask/dask#6723 (comment).

@cisaacstern
Contributor

> Doing the same fix as we did in dask.multiprocessing would be a good first step here, and would probably solve 90% of the problem.

I'm gradually understanding the problem/solution space here, and realized that this basic fix has not yet been implemented; I agree it would solve a large percentage of use cases. I will work up a PR for others to consider.
