
usage of dask.distributed.Queue raises ValueError: unknown address scheme 'gateway' #404

Open
nikoladze opened this issue Jul 6, 2021 · 9 comments
Labels
bug Something isn't working

Comments

@nikoladze

What happened:

When I try to use dask.distributed.Queue on workers of a dask-gateway cluster, I get ValueError: unknown address scheme 'gateway'.

What you expected to happen:

I expect dask.distributed.Queue to work, as it does in, e.g., a dask.distributed.LocalCluster.

Minimal Complete Verifiable Example:

Set up a local environment and start a gateway server (shell):

python -m venv ~/venv/dask-gateway
source ~/venv/dask-gateway/bin/activate
pip install dask-gateway 'dask-gateway-server[local]'
dask-gateway-server

Then, in a Python session:

from dask_gateway import Gateway
from dask.distributed import Queue

gateway = Gateway("http://127.0.0.1:8000")
cluster = gateway.new_cluster()
cluster.scale(1)
client = cluster.get_client()
q = Queue("log")

def log_something():
    q.put(42)

client.submit(log_something).result()

results in

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/nikolai/venv/dask-gateway/lib/python3.9/site-packages/distributed/client.py", line 227, in result
    raise exc.with_traceback(tb)
  File "/home/nikolai/venv/dask-gateway/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
  File "/home/nikolai/venv/dask-gateway/lib/python3.9/site-packages/distributed/queues.py", line 289, in __setstate__
    client = get_client(address)
  File "/home/nikolai/venv/dask-gateway/lib/python3.9/site-packages/distributed/worker.py", line 3615, in get_client
    address = comm.resolve_address(address)
  File "/home/nikolai/venv/dask-gateway/lib/python3.9/site-packages/distributed/comm/addressing.py", line 171, in resolve_address
    backend = registry.get_backend(scheme)
  File "/home/nikolai/venv/dask-gateway/lib/python3.9/site-packages/distributed/comm/registry.py", line 81, in get_backend
    raise ValueError(
ValueError: unknown address scheme 'gateway' (known schemes: ['inproc', 'tcp', 'tls', 'ucx', 'ws', 'wss'])

Anything else we need to know?:

I noticed the issue in a Kubernetes-deployed setup on Google Cloud, but it is reproducible with the local installation as well.

The same code works against a plain distributed cluster, e.g.:

from dask.distributed import Client, Queue
client = Client()
q = Queue("log")

def log_something():
    q.put(42)

client.submit(log_something).result()
q.get()

Environment:

  • Dask version: 2021.6.2,
  • dask-gateway version: 0.9.0
  • Python version: 3.9.5
  • Operating System: arch linux
  • Install method (conda, pip, source): pip
@TomAugspurger
Member

I notice that the issue is in the (de)serialization of the Queue object. It looks like distributed.Queue defines how it pickles in a way that doesn't work with Dask Gateway clusters: https://github.com/dask/distributed/blob/88b99ae2ab2fca96395fe1975b0e0ebb8f7b25a0/distributed/queues.py#L283-L293.

I think that logic would need to be updated to handle things required by dask-gateway (and other cluster managers). At a minimum things like security would need to be provided.
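To illustrate the mechanism behind the error (a simplified, self-contained sketch with stand-in names, not distributed's actual code; see distributed/comm/registry.py for the real version): deserializing the Queue calls get_client(address), which splits the scheme off the address and looks it up in a registry of comm backends. dask_gateway registers a "gateway" backend only as a side effect of being imported, which never happens on a worker that merely unpickles a Queue:

```python
# Schemes distributed registers itself. dask_gateway would add a
# "gateway" entry here, but only when it is imported.
backends = {"inproc": ..., "tcp": ..., "tls": ..., "ucx": ..., "ws": ..., "wss": ...}

def get_backend(scheme):
    backend = backends.get(scheme)
    if backend is None:
        raise ValueError(
            "unknown address scheme %r (known schemes: %s)"
            % (scheme, sorted(backends))
        )
    return backend

def resolve_address(addr):
    # Split "gateway://host:port/..." into scheme and location.
    scheme, _, _loc = addr.partition("://")
    return get_backend(scheme)

resolve_address("tcp://10.0.0.1:8786")  # fine: "tcp" is registered

try:
    resolve_address("gateway://gw.example.com:8000/clusters/abc")
except ValueError as exc:
    print(exc)  # unknown address scheme 'gateway' (known schemes: [...])
```

This is why the failure only shows up on the worker side: the client process has imported dask_gateway and has the scheme registered, while the worker that unpickles the Queue has not.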

@consideRatio consideRatio added the bug Something isn't working label Aug 26, 2021
@wdhowe

wdhowe commented Jan 27, 2022

I am seeing a very similar issue when attempting to execute functions against persisted data structures.

from dask_gateway import Gateway

gateway = Gateway()
cluster = gateway.new_cluster()
workers = 2
cluster.scale(workers)
client = cluster.get_client()

import dask.array as da
ds = da.random.random((100, 100), chunks=(10, 10))
ds_persisted = client.persist(ds)

client.submit(lambda x: x, ds_persisted).result()

Traceback from a jupyter notebook.

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/tmp/ipykernel_3026/3812178710.py in <module>
----> 1 client.submit(lambda x: x, ds_persisted).result()

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in result(self, timeout)
    234         if self.status == "error":
    235             typ, exc, tb = result
--> 236             raise exc.with_traceback(tb)
    237         elif self.status == "cancelled":
    238             raise result

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/protocol/pickle.py in loads()
     73             return pickle.loads(x, buffers=buffers)
     74         else:
---> 75             return pickle.loads(x)
     76     except Exception:
     77         logger.info("Failed to deserialize %s", x[:10000], exc_info=True)

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in __setstate__()
    381             c = Client.current(allow_global=False)
    382         except ValueError:
--> 383             c = get_client(address)
    384         self.__init__(key, c)
    385         c._send_to_scheduler(

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/worker.py in get_client()
   4066 
   4067     if address and resolve_address:
-> 4068         address = comm.resolve_address(address)
   4069     try:
   4070         worker = get_worker()

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/comm/addressing.py in resolve_address()
    178     """
    179     scheme, loc = parse_address(addr)
--> 180     backend = registry.get_backend(scheme)
    181     return unparse_address(scheme, backend.resolve_address(loc))
    182 

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/comm/registry.py in get_backend()
     90 
     91         if backend is None:
---> 92             raise ValueError(
     93                 "unknown address scheme %r (known schemes: %s)"
     94                 % (scheme, sorted(backends))

ValueError: unknown address scheme 'gateway' (known schemes: ['inproc', 'tcp', 'tls', 'ucx', 'ws', 'wss'])

Environment:

  • Dask version: 2021.11.2
  • dask-gateway version: 0.9.0
  • Python version: 3.9.7
  • Operating System: pangeo/pangeo-notebook:2021.12.02 Docker image (ubuntu:20.04)
  • Install method (conda, pip, source): conda

@jcrist
Member

jcrist commented Jan 27, 2022

Hmmm, I'm not sure why queues hold the cluster address in them, most of our other distributed objects don't do that. I may make a PR upstream to fix this. That said, you may be able to get this to work by pre-importing the dask_gateway library on all workers/processes that may deserialize a Queue object. The dask_gateway client library registers the gateway scheme on import, after which point things may just work. Maybe.

@martindurant
Member

@jcrist , would that mean that a distributed worker client would also fail in a similar way? It seems reasonable to pre-import dask_gateway (client side) in workers, or at least document what a failure to interpret "gateway:" connections means.

@jcrist
Member

jcrist commented Jan 27, 2022

Hmmm, possibly? Worker connections should be able to connect directly through a tls address, but that may not be working properly; I've honestly never tried it.

@wdhowe

wdhowe commented Jan 27, 2022

After the dask_gateway import on workers, I now get a different traceback.

"TypeError: Gateway expects a ssl_context argument of type ssl.SSLContext, instead got None"

def import_dask():
    import dask_gateway

client.run(import_dask)

import dask.array as da
ds = da.random.random((100, 100), chunks=(10, 10))
ds_persisted = client.persist(ds)

client.submit(lambda x: x, ds_persisted).result()

Traceback

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_3823/3812178710.py in <module>
----> 1 client.submit(lambda x: x, ds_persisted).result()

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in result(self, timeout)
    234         if self.status == "error":
    235             typ, exc, tb = result
--> 236             raise exc.with_traceback(tb)
    237         elif self.status == "cancelled":
    238             raise result

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/protocol/pickle.py in loads()
     73             return pickle.loads(x, buffers=buffers)
     74         else:
---> 75             return pickle.loads(x)
     76     except Exception:
     77         logger.info("Failed to deserialize %s", x[:10000], exc_info=True)

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in __setstate__()
    381             c = Client.current(allow_global=False)
    382         except ValueError:
--> 383             c = get_client(address)
    384         self.__init__(key, c)
    385         c._send_to_scheduler(

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/worker.py in get_client()
   4084         return client
   4085     elif address:
-> 4086         return Client(address, timeout=timeout)
   4087     else:
   4088         raise ValueError("No global client found and no address provided")

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in __init__()
    764             ext(self)
    765 
--> 766         self.start(timeout=timeout)
    767         Client._instances.add(self)
    768 

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in start()
    946             self._started = asyncio.ensure_future(self._start(**kwargs))
    947         else:
--> 948             sync(self.loop, self._start, **kwargs)
    949 
    950     def __await__(self):

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py in sync()
    330     if error[0]:
    331         typ, exc, tb = error[0]
--> 332         raise exc.with_traceback(tb)
    333     else:
    334         return result[0]

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/utils.py in f()
    313             if callback_timeout is not None:
    314                 future = asyncio.wait_for(future, callback_timeout)
--> 315             result[0] = yield future
    316         except Exception:
    317             error[0] = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.9/site-packages/tornado/gen.py in run()
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in _start()
   1036 
   1037         try:
-> 1038             await self._ensure_connected(timeout=timeout)
   1039         except (OSError, ImportError):
   1040             await self._close()

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/client.py in _ensure_connected()
   1096 
   1097         try:
-> 1098             comm = await connect(
   1099                 self.scheduler.address, timeout=timeout, **self.connection_args
   1100             )

/srv/conda/envs/notebook/lib/python3.9/site-packages/distributed/comm/core.py in connect()
    282     while time_left() > 0:
    283         try:
--> 284             comm = await asyncio.wait_for(
    285                 connector.connect(loc, deserialize=deserialize, **connection_args),
    286                 timeout=min(intermediate_cap, time_left()),

/srv/conda/envs/notebook/lib/python3.9/asyncio/tasks.py in wait_for()
    479 
    480         if fut.done():
--> 481             return fut.result()
    482         else:
    483             fut.remove_done_callback(cb)

/srv/conda/envs/notebook/lib/python3.9/site-packages/dask_gateway/comm.py in connect()
     38         ctx = connection_args.get("ssl_context")
     39         if not isinstance(ctx, ssl.SSLContext):
---> 40             raise TypeError(
     41                 "Gateway expects a `ssl_context` argument of type "
     42                 "ssl.SSLContext, instead got %s" % ctx

TypeError: Gateway expects a `ssl_context` argument of type ssl.SSLContext, instead got None
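The failing check itself is easy to reproduce in isolation. As the traceback shows, the worker-side get_client falls through to Client(address, timeout=timeout) with no security configuration, so the gateway connector receives ssl_context=None. Below is a minimal reproduction of just that type check (mirroring the dask_gateway/comm.py snippet in the traceback, not the actual connector logic):

```python
import ssl

def check_connection_args(connection_args):
    # Mirrors the type check from dask_gateway/comm.py shown in the
    # traceback above (simplified stand-in, not the real connector).
    ctx = connection_args.get("ssl_context")
    if not isinstance(ctx, ssl.SSLContext):
        raise TypeError(
            "Gateway expects a `ssl_context` argument of type "
            "ssl.SSLContext, instead got %s" % ctx
        )
    return ctx

# A Client built from just the address (as during unpickling on the
# worker) carries no TLS credentials, so ssl_context is None:
try:
    check_connection_args({})
except TypeError as exc:
    print(exc)

# With a context (as a gateway-created Client would supply), it passes:
check_connection_args({"ssl_context": ssl.create_default_context()})
```

So pre-importing dask_gateway fixes the scheme lookup but not this second problem: the deserialized Client would also need the gateway's TLS credentials passed through to connect.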

@martindurant
Member

@jcrist , I'm not sure of the right way to contact you, but I could really use a primer on how to get up and running with local development so that I can, for example, add tests and docs against cases like this one (and see if worker-client is really an issue). I would be interested primarily in the local and kubernetes deployments, not yarn or HPC. If I can "download" from you, I should be able to handle much more of the workload in this repo.

@wdhowe

wdhowe commented Mar 26, 2022

Has anyone had a chance to look further into this? It would be awesome to have this bug fixed. 👍

@wdhowe

wdhowe commented Apr 6, 2022

For anyone else who finds themselves here: computing on persisted data works with Dask Cloud Provider: https://cloudprovider.dask.org/en/latest/index.html
