
Numba serialization is slow sometimes #7289

Closed
mrocklin opened this issue Nov 10, 2022 · 2 comments · Fixed by #7322
Comments

@mrocklin (Member):

I'm using Dask + Datashader over here: https://github.com/mrocklin/dask-tutorial/blob/main/2-dataframes-at-scale.ipynb

I'm finding that around 20 seconds are spent serializing things, mostly in some strange cloudpickle/numba interaction, particularly in numba's vendored `cloudpickle/cloudpickle_fast.py`. In a dataframe with 800 partitions this gets called 67529 times, and apparently each call is fairly slow.

cc @ianthomas23 @fjetter @seibert

(Screenshot of the profile output, 2022-11-10.)

```
   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
12474464/6246010    5.075    0.000    9.144    0.000 {method 'format' of 'str' objects}
18837333/18832071    5.005    0.000    5.242    0.000 {built-in method builtins.getattr}
        4    3.162    0.790    3.162    0.790 {method 'acquire' of '_thread.lock' objects}
  6283755    2.821    0.000   16.884    0.000 pickle.py:322(_getattribute)
    22802    2.277    0.000   19.348    0.001 cloudpickle.py:132(_whichmodule)
  6212668    1.697    0.000    4.589    0.000 <frozen importlib._bootstrap>:581(_module_repr_from_spec)
  6228454    1.452    0.000    6.675    0.000 <frozen importlib._bootstrap>:294(_module_repr)
28031/2627    0.420    0.000    0.426    0.000 utils_comm.py:165(unpack_remotedata)
  6272548    0.372    0.000    0.372    0.000 {method 'startswith' of 'str' objects}
  6422829    0.326    0.000    0.326    0.000 {method 'split' of 'str' objects}
     3518    0.308    0.000    0.506    0.000 cloudpickle.py:345(_find_imported_submodules)
  6133738    0.290    0.000    0.290    0.000 <frozen importlib._bootstrap>:412(has_location)
    12278    0.273    0.000    0.464    0.000 cloudpickle.py:260(_find_imported_submodules)
6454132/6454126    0.212    0.000    0.213    0.000 {built-in method builtins.isinstance}
 2635/881    0.147    0.000   20.891    0.024 {function CloudPickler.dump at 0x10409ff40}
        4    0.036    0.009    0.036    0.009 {built-in method numpy.core._multiarray_umath.interp}
     1792    0.034    0.000    0.034    0.000 {method 'copy' of 'dict' objects}
     1756    0.034    0.000    0.034    0.000 {built-in method posix.stat}
    67529    0.030    0.000   19.988    0.000 cloudpickle_fast.py:612(reducer_override)
    67529    0.024    0.000   20.012    0.000 serialize.py:182(reducer_override)
     1754    0.022    0.000    0.105    0.000 <frozen importlib._bootstrap>:921(_find_spec)
    51868    0.021    0.000    0.629    0.000 cloudpickle_fast.py:691(reducer_override)
    76372    0.021    0.000    0.024    0.000 __init__.py:976(__getitem__)
    12278    0.021    0.000    0.507    0.000 cloudpickle_fast.py:140(_function_getstate)
    22802    0.017    0.000   19.399    0.001 cloudpickle.py:195(_lookup_module_and_qualname)
    65200    0.017    0.000    0.017    0.000 {method 'get' of 'dict' objects}
36791/14890    0.015    0.000    0.019    0.000 utils.py:1855(stringify)
    12278    0.015    0.000    0.018    0.000 cloudpickle_fast.py:533(_function_getnewargs)
    16713    0.012    0.000    0.038    0.000 cloudpickle.py:278(_lookup_module_and_qualname)
    12278    0.012    0.000    0.013    0.000 cloudpickle_fast.py:246(_code_reduce)
     2626    0.011    0.000   20.875    0.008 pickle.py:33(dumps)
     1754    0.011    0.000   20.178    0.012 serialize.py:101(_pickle__CustomPickled)
     1754    0.010    0.000    0.056    0.000 <frozen importlib._bootstrap_external>:1527(find_spec)
    17594    0.009    0.000    0.059    0.000 cloudpickle.py:241(_should_pickle_by_reference)
    23679    0.009    0.000   19.410    0.001 cloudpickle.py:175(_is_importable)
     9647    0.009    0.000    0.011    0.000 dispatcher.py:872(_reduce_states)
     3518    0.009    0.000    0.522    0.000 cloudpickle_fast.py:139(_function_getstate)
     3518    0.009    0.000    0.014    0.000 cloudpickle_fast.py:597(_function_getnewargs)
     2629    0.008    0.000    0.009    0.000 {built-in method _pickle.dumps}
    14076    0.008    0.000    0.011    0.000 cloudpickle.py:181(_is_registered_pickle_by_value)
   143145    0.007    0.000    0.007    0.000 {built-in method builtins.issubclass}
```
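Nearly all of the cumulative time sits under cloudpickle's `_whichmodule`, which decides whether an object can be pickled cheaply by reference by scanning every entry in `sys.modules` and attempting an attribute walk in each one. A rough, simplified sketch of that scan (not the real implementation) shows why `getattr` and `pickle._getattribute` dominate the profile above:

```python
import sys

def _deep_getattr(module, qualname):
    # walk a dotted qualname, e.g. "Foo.method", one attribute at a time
    obj = module
    for part in qualname.split("."):
        obj = getattr(obj, part)
    return obj

def whichmodule_sketch(obj, qualname):
    """Roughly what cloudpickle's _whichmodule does: scan *every*
    imported module and test whether `obj` is reachable there under
    `qualname`.  That is one getattr-walk per module, per pickled
    function, so the cost scales with len(sys.modules) on each call."""
    for name, module in list(sys.modules.items()):
        if module is None:
            continue
        try:
            candidate = _deep_getattr(module, qualname)
        except AttributeError:
            continue
        if candidate is obj:
            return name
    return None
```

With tens of thousands of serialization calls, each paying this full-module-table scan, the per-call cost multiplies into the ~20 seconds observed.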
@mrocklin (Member Author):

OK, we have a numba function inside of a SubgraphCallable. We're serializing the same SubgraphCallable many times. This is a bit odd because the SubgraphCallable is exactly the same, except for the outkey, which changes per partition.
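A toy illustration of the pattern (the class here is a hypothetical stand-in, not dask's real `SubgraphCallable`): callables that are identical except for `outkey` still serialize to distinct payloads, so the per-call cost above is paid once per partition with nothing reusable between them.

```python
import pickle

class SubgraphCallable:
    # toy stand-in for dask's SubgraphCallable: a task graph plus
    # the key to compute and the keys it takes as inputs
    def __init__(self, dsk, outkey, inkeys):
        self.dsk, self.outkey, self.inkeys = dsk, outkey, inkeys

dsk = {"out": (sum, ["a", "b"])}

# one callable per partition, identical except for outkey
payloads = [
    pickle.dumps(SubgraphCallable(dsk, f"out-{i}", ("a", "b")))
    for i in range(800)
]
# every partition produces a distinct blob, so no payload can be reused
assert len(set(payloads)) == 800
```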

@mrocklin (Member Author):

OK, it looks like we don't actually generate SubgraphCallables with lots of new keys ourselves; the keys get injected later, during unpacking, whenever there are futures in the same unpack: the unique names of those futures are added to the callable's inkeys.

I think that maybe we don't even care about the names of these futures, and instead just want some generic token. The code below probably breaks things, but it seems to work in my example.

```diff
index c06d3697..c74dd340 100644
--- a/distributed/utils_comm.py
+++ b/distributed/utils_comm.py
@@ -213,7 +213,16 @@ def unpack_remotedata(o, byte_keys=False, myset=None):
                     if byte_keys
                     else tuple(f.key for f in futures)
                 )
-                inkeys = sc.inkeys + futures
+                s = str(dsk)
+                inkeys = list(sc.inkeys)
+                for future in futures:
+                    if future in s:  # does this ever happen?
+                        inkeys.append(future)
+                        breakpoint()
+                    else:
+                        inkeys.append("_")
                 return (
                     (SubgraphCallable(dsk, sc.outkey, inkeys, sc.name),)
                     + args
```

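A sketch of the idea behind the patch (toy types again, not the real distributed code): once the per-partition future names in `inkeys` are replaced by a generic token, the serialized callables become byte-identical, so a serialization cache can collapse the work to a single pickle pass.

```python
import pickle

class SubgraphCallable:
    # toy stand-in; see dask's real SubgraphCallable
    def __init__(self, dsk, outkey, inkeys):
        self.dsk, self.outkey, self.inkeys = dsk, outkey, inkeys

dsk = {"out": (sum, ["a", "b"])}

# per-partition future names in inkeys => 100 distinct payloads
distinct = {
    pickle.dumps(SubgraphCallable(dsk, "out", ("a", f"future-{i}")))
    for i in range(100)
}
assert len(distinct) == 100

# a generic "_" token in place of the unused future name => one payload,
# which deduplication or a dumps-cache can then serialize exactly once
normalized = {
    pickle.dumps(SubgraphCallable(dsk, "out", ("a", "_")))
    for _ in range(100)
}
assert len(normalized) == 1
```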
@seibert @ianthomas23 sorry for pinging you. This is definitely an internal Dask issue.

cc'ing also @rjzamora due to HLGs
