permissions to write data to gcs from cluster workers #209

Closed

rabernat opened this issue Apr 11, 2018 · 27 comments

@rabernat (Member)
I would like to create an xarray dataset on pangeo.pydata.org and write it to GCS from the distributed cluster.

For example:

import numpy as np
import xarray as xr
import gcsfs
from dask.distributed import Client, progress
from dask_kubernetes import KubeCluster

cluster = KubeCluster(n_workers=10)
client = Client(cluster)

# create some dataset
ds = xr.DataArray(np.random.rand(10, 1000, 1000),
                  dims=['time', 'y', 'x']
                  ).chunk({'time': 1}).to_dataset(name='foo')

# write it
gcs = gcsfs.GCSFileSystem()  # which token should I use?
gcsmap = gcsfs.mapping.GCSMap('pangeo-data/test/from-pangeo-cluster')
gcsmap.clear()
ds.to_zarr(gcsmap)

If I don't specify a token at all, or use token='cloud', I get this error immediately.

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-9-760b0c395988> in <module>()
----> 1 ds.to_zarr(gcsmap)

/opt/conda/lib/python3.6/site-packages/xarray/core/dataset.py in to_zarr(self, store, mode, synchronizer, group, encoding)
   1163         from ..backends.api import to_zarr
   1164         return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer,
-> 1165                        group=group, encoding=encoding)
   1166 
   1167     def __unicode__(self):

/opt/conda/lib/python3.6/site-packages/xarray/backends/api.py in to_zarr(dataset, store, mode, synchronizer, group, encoding)
    773     store = backends.ZarrStore.open_group(store=store, mode=mode,
    774                                           synchronizer=synchronizer,
--> 775                                           group=group, writer=None)
    776 
    777     # I think zarr stores should always be sync'd immediately

/opt/conda/lib/python3.6/site-packages/xarray/backends/zarr.py in open_group(cls, store, mode, synchronizer, group, writer)
    258                                       "#installation" % min_zarr)
    259         zarr_group = zarr.open_group(store=store, mode=mode,
--> 260                                      synchronizer=synchronizer, path=group)
    261         return cls(zarr_group, writer=writer)
    262 

/opt/conda/lib/python3.6/site-packages/zarr/hierarchy.py in open_group(store, mode, cache_attrs, synchronizer, path)
   1128             err_contains_group(path)
   1129         else:
-> 1130             init_group(store, path=path)
   1131 
   1132     # determine read only status

/opt/conda/lib/python3.6/site-packages/zarr/storage.py in init_group(store, overwrite, path, chunk_store)
    419     # initialise metadata
    420     _init_group_metadata(store=store, overwrite=overwrite, path=path,
--> 421                          chunk_store=chunk_store)
    422 
    423 

/opt/conda/lib/python3.6/site-packages/zarr/storage.py in _init_group_metadata(store, overwrite, path, chunk_store)
    440     meta = dict()
    441     key = _path_to_prefix(path) + group_meta_key
--> 442     store[key] = encode_group_metadata(meta)
    443 
    444 

~/.local/lib/python3.6/site-packages/gcsfs/mapping.py in __setitem__(self, key, value)
     73         key = self._key_to_str(key)
     74         with self.gcs.open(key, 'wb') as f:
---> 75             f.write(value)
     76 
     77     def keys(self):

<decorator-gen-165> in __exit__(self, *args)

~/.local/lib/python3.6/site-packages/gcsfs/core.py in _tracemethod(f, self, *args, **kwargs)
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

~/.local/lib/python3.6/site-packages/gcsfs/core.py in __exit__(self, *args)
   1403     @_tracemethod
   1404     def __exit__(self, *args):
-> 1405         self.close()
   1406 
   1407 

<decorator-gen-162> in close(self)

~/.local/lib/python3.6/site-packages/gcsfs/core.py in _tracemethod(f, self, *args, **kwargs)
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

~/.local/lib/python3.6/site-packages/gcsfs/core.py in close(self)
   1367         else:
   1368             if not self.forced:
-> 1369                 self.flush(force=True)
   1370             else:
   1371                 logger.debug("close with forced=True, bypassing final flush.")

<decorator-gen-157> in flush(self, force)

~/.local/lib/python3.6/site-packages/gcsfs/core.py in _tracemethod(f, self, *args, **kwargs)
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

~/.local/lib/python3.6/site-packages/gcsfs/core.py in flush(self, force)
   1208             if force and self.buffer.tell() <= self.blocksize:
   1209                 # Force-write a buffer below blocksize with a single write
-> 1210                 self._simple_upload()
   1211             elif not force and self.buffer.tell() <= self.blocksize:
   1212                 # Defer initialization of multipart upload, *may* still

<decorator-gen-160> in _simple_upload(self)

~/.local/lib/python3.6/site-packages/gcsfs/core.py in _tracemethod(f, self, *args, **kwargs)
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

~/.local/lib/python3.6/site-packages/gcsfs/core.py in _simple_upload(self)
   1291         r = self.gcsfs.session.post(
   1292             path, params={'uploadType': 'media', 'name': self.key}, data=data)
-> 1293         validate_response(r, path)
   1294         size, md5 = int(r.json()['size']), r.json()['md5Hash']
   1295         if self.consistency == 'size':

~/.local/lib/python3.6/site-packages/gcsfs/core.py in validate_response(r, path)
    156             raise FileNotFoundError(path)
    157         elif r.status_code == 403:
--> 158             raise IOError("Forbidden: %s\n%s" % (path, msg))
    159         elif "invalid" in m:
    160             raise ValueError("Bad Request: %s\n%s" % (path, msg))

OSError: Forbidden: https://www.googleapis.com/upload/storage/v1/b/pangeo-data/o
Insufficient Permission

If I use token='browser', I authenticate and things work from the notebook (the objects are created), but the writes from the workers don't work. These are the errors that come up.

distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: EOF when reading a line
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: EOF when reading a line
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: EOF when reading a line
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: EOF when reading a line
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: EOF when reading a line
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: EOF when reading a line
distributed.utils - ERROR - EOF when reading a line
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/utils.py", line 238, in f
    result[0] = yield make_coro()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 1378, in _gather
    traceback)
  File "/opt/conda/lib/python3.6/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 1011, in __setstate__
    self.connect(self.token)
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 407, in connect
    self.__getattribute__('_connect_' + method)()
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 379, in _connect_browser
    credentials = flow.run_console()
  File "/opt/conda/lib/python3.6/site-packages/google_auth_oauthlib/flow.py", line 362, in run_console
    code = input(authorization_code_message)
EOFError: EOF when reading a line
---------------------------------------------------------------------------
EOFError                                  Traceback (most recent call last)
<ipython-input-13-760b0c395988> in <module>()
----> 1 ds.to_zarr(gcsmap)

/opt/conda/lib/python3.6/site-packages/xarray/core/dataset.py in to_zarr(self, store, mode, synchronizer, group, encoding)
   1163         from ..backends.api import to_zarr
   1164         return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer,
-> 1165                        group=group, encoding=encoding)
   1166 
   1167     def __unicode__(self):

/opt/conda/lib/python3.6/site-packages/xarray/backends/api.py in to_zarr(dataset, store, mode, synchronizer, group, encoding)
    777     # I think zarr stores should always be sync'd immediately
    778     # TODO: figure out how to properly handle unlimited_dims
--> 779     dataset.dump_to_store(store, sync=True, encoding=encoding)
    780     return store

/opt/conda/lib/python3.6/site-packages/xarray/core/dataset.py in dump_to_store(self, store, encoder, sync, encoding, unlimited_dims)
   1068                     unlimited_dims=unlimited_dims)
   1069         if sync:
-> 1070             store.sync()
   1071 
   1072     def to_netcdf(self, path=None, mode='w', format=None, group=None,

/opt/conda/lib/python3.6/site-packages/xarray/backends/zarr.py in sync(self)
    365 
    366     def sync(self):
--> 367         self.writer.sync()
    368 
    369 

/opt/conda/lib/python3.6/site-packages/xarray/backends/common.py in sync(self)
    268         if self.sources:
    269             import dask.array as da
--> 270             da.store(self.sources, self.targets, lock=self.lock)
    271             self.sources = []
    272             self.targets = []

/opt/conda/lib/python3.6/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
    953 
    954         if compute:
--> 955             result.compute(**kwargs)
    956             return None
    957         else:

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    153         dask.base.compute
    154         """
--> 155         (result,) = compute(self, traverse=False, **kwargs)
    156         return result
    157 

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    402     postcomputes = [a.__dask_postcompute__() if is_dask_collection(a)
    403                     else (None, a) for a in args]
--> 404     results = get(dsk, keys, **kwargs)
    405     results_iter = iter(results)
    406     return tuple(a if f is None else f(next(results_iter), *a)

/opt/conda/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, **kwargs)
   2073             try:
   2074                 results = self.gather(packed, asynchronous=asynchronous,
-> 2075                                       direct=direct)
   2076             finally:
   2077                 for f in futures.values():

/opt/conda/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1498             return self.sync(self._gather, futures, errors=errors,
   1499                              direct=direct, local_worker=local_worker,
-> 1500                              asynchronous=asynchronous)
   1501 
   1502     @gen.coroutine

/opt/conda/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    613             return future
    614         else:
--> 615             return sync(self.loop, func, *args, **kwargs)
    616 
    617     def __repr__(self):

/opt/conda/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    252             e.wait(10)
    253     if error[0]:
--> 254         six.reraise(*error[0])
    255     else:
    256         return result[0]

/opt/conda/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/opt/conda/lib/python3.6/site-packages/distributed/utils.py in f()
    236             yield gen.moment
    237             thread_state.asynchronous = True
--> 238             result[0] = yield make_coro()
    239         except Exception as exc:
    240             logger.exception(exc)

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1097 
   1098                     try:
-> 1099                         value = future.result()
   1100                     except Exception:
   1101                         self.had_exception = True

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1105                     if exc_info is not None:
   1106                         try:
-> 1107                             yielded = self.gen.throw(*exc_info)
   1108                         finally:
   1109                             # Break up a reference to itself

/opt/conda/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1376                             six.reraise(type(exception),
   1377                                         exception,
-> 1378                                         traceback)
   1379                     if errors == 'skip':
   1380                         bad_keys.add(key)

/opt/conda/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in loads()
     57 def loads(x):
     58     try:
---> 59         return pickle.loads(x)
     60     except Exception:
     61         logger.info("Failed to deserialize %s", x[:10000], exc_info=True)

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in __setstate__()
   1009     def __setstate__(self, state):
   1010         self.__dict__.update(state)
-> 1011         self.connect(self.token)
   1012 
   1013 

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in connect()
    405                     break
    406         else:
--> 407             self.__getattribute__('_connect_' + method)()
    408             self.method = method
    409 

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in _connect_browser()
    377     def _connect_browser(self):
    378         flow = InstalledAppFlow.from_client_config(client_config, [self.scope])
--> 379         credentials = flow.run_console()
    380         self.tokens[(self.project, self.access)] = credentials
    381         self._save_tokens()

/opt/conda/lib/python3.6/site-packages/google_auth_oauthlib/flow.py in run_console()
    360         print(authorization_prompt_message.format(url=auth_url))
    361 
--> 362         code = input(authorization_code_message)
    363 
    364         self.fetch_token(code=code)

EOFError: EOF when reading a line
Exception ignored in: <bound method GCSFile.__del__ of <GCSFile pangeo-data/test/from-pangeo-cluster/.zgroup>>
Traceback (most recent call last):
  File "<decorator-gen-163>", line 2, in __del__
  File "/home/jovyan/.local/lib/python3.6/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/home/jovyan/.local/lib/python3.6/site-packages/gcsfs/core.py", line 1392, in __del__
    self.close()
  File "<decorator-gen-162>", line 2, in close
  File "/home/jovyan/.local/lib/python3.6/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/home/jovyan/.local/lib/python3.6/site-packages/gcsfs/core.py", line 1369, in close
    self.flush(force=True)
  File "<decorator-gen-157>", line 2, in flush
  File "/home/jovyan/.local/lib/python3.6/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/home/jovyan/.local/lib/python3.6/site-packages/gcsfs/core.py", line 1210, in flush
    self._simple_upload()
  File "<decorator-gen-160>", line 2, in _simple_upload
  File "/home/jovyan/.local/lib/python3.6/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/home/jovyan/.local/lib/python3.6/site-packages/gcsfs/core.py", line 1293, in _simple_upload
    validate_response(r, path)
  File "/home/jovyan/.local/lib/python3.6/site-packages/gcsfs/core.py", line 158, in validate_response
    raise IOError("Forbidden: %s\n%s" % (path, msg))
OSError: Forbidden: https://www.googleapis.com/upload/storage/v1/b/pangeo-data/o
Insufficient Permission

If I use token='cache' (after having authenticated), it again works from the notebook but fails from the cluster.

distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: 'AuthorizedSession' object has no attribute 'credentials'
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: 'AuthorizedSession' object has no attribute 'credentials'
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: 'AuthorizedSession' object has no attribute 'credentials'
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: 'AuthorizedSession' object has no attribute 'credentials'
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: 'AuthorizedSession' object has no attribute 'credentials'
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: 'AuthorizedSession' object has no attribute 'credentials'
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: 'AuthorizedSession' object has no attribute 'credentials'
distributed.scheduler - ERROR - error from worker tcp://10.21.237.34:39293: 'AuthorizedSession' object has no attribute 'credentials'
distributed.utils - ERROR - 'AuthorizedSession' object has no attribute 'credentials'
Traceback (most recent call last):
  File "/opt/conda/lib/python3.6/site-packages/distributed/utils.py", line 238, in f
    result[0] = yield make_coro()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/opt/conda/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/conda/lib/python3.6/site-packages/distributed/client.py", line 1378, in _gather
    traceback)
  File "/opt/conda/lib/python3.6/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
  File "/opt/conda/lib/python3.6/site-packages/zarr/core.py", line 1930, in __setstate__
    self.__init__(*state)
  File "/opt/conda/lib/python3.6/site-packages/zarr/core.py", line 123, in __init__
    self._load_metadata()
  File "/opt/conda/lib/python3.6/site-packages/zarr/core.py", line 140, in _load_metadata
    self._load_metadata_nosync()
  File "/opt/conda/lib/python3.6/site-packages/zarr/core.py", line 149, in _load_metadata_nosync
    meta_bytes = self._store[mkey]
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/mapping.py", line 66, in __getitem__
    with self.gcs.open(key, 'rb') as f:
  File "<decorator-gen-28>", line 2, in open
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 944, in open
    metadata=metadata)
  File "<decorator-gen-31>", line 2, in __init__
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 1068, in __init__
    self.details = gcsfs.info(path)
  File "<decorator-gen-17>", line 2, in info
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 802, in info
    return self._get_object(path)
  File "<decorator-gen-3>", line 2, in _get_object
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 492, in _get_object
    bucket, key))
  File "<decorator-gen-2>", line 2, in _call
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/gcsfs/core.py", line 431, in _call
    r = meth(self.base + path, params=kwargs, json=json)
  File "/opt/conda/lib/python3.6/site-packages/requests/sessions.py", line 521, in get
    return self.request('GET', url, **kwargs)
  File "/opt/conda/lib/python3.6/site-packages/google/auth/transport/requests.py", line 197, in request
    self.credentials.before_request(
AttributeError: 'AuthorizedSession' object has no attribute 'credentials'
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-15-760b0c395988> in <module>()
----> 1 ds.to_zarr(gcsmap)

/opt/conda/lib/python3.6/site-packages/xarray/core/dataset.py in to_zarr(self, store, mode, synchronizer, group, encoding)
   1163         from ..backends.api import to_zarr
   1164         return to_zarr(self, store=store, mode=mode, synchronizer=synchronizer,
-> 1165                        group=group, encoding=encoding)
   1166 
   1167     def __unicode__(self):

/opt/conda/lib/python3.6/site-packages/xarray/backends/api.py in to_zarr(dataset, store, mode, synchronizer, group, encoding)
    777     # I think zarr stores should always be sync'd immediately
    778     # TODO: figure out how to properly handle unlimited_dims
--> 779     dataset.dump_to_store(store, sync=True, encoding=encoding)
    780     return store

/opt/conda/lib/python3.6/site-packages/xarray/core/dataset.py in dump_to_store(self, store, encoder, sync, encoding, unlimited_dims)
   1068                     unlimited_dims=unlimited_dims)
   1069         if sync:
-> 1070             store.sync()
   1071 
   1072     def to_netcdf(self, path=None, mode='w', format=None, group=None,

/opt/conda/lib/python3.6/site-packages/xarray/backends/zarr.py in sync(self)
    365 
    366     def sync(self):
--> 367         self.writer.sync()
    368 
    369 

/opt/conda/lib/python3.6/site-packages/xarray/backends/common.py in sync(self)
    268         if self.sources:
    269             import dask.array as da
--> 270             da.store(self.sources, self.targets, lock=self.lock)
    271             self.sources = []
    272             self.targets = []

/opt/conda/lib/python3.6/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
    953 
    954         if compute:
--> 955             result.compute(**kwargs)
    956             return None
    957         else:

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    153         dask.base.compute
    154         """
--> 155         (result,) = compute(self, traverse=False, **kwargs)
    156         return result
    157 

/opt/conda/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    402     postcomputes = [a.__dask_postcompute__() if is_dask_collection(a)
    403                     else (None, a) for a in args]
--> 404     results = get(dsk, keys, **kwargs)
    405     results_iter = iter(results)
    406     return tuple(a if f is None else f(next(results_iter), *a)

/opt/conda/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, **kwargs)
   2073             try:
   2074                 results = self.gather(packed, asynchronous=asynchronous,
-> 2075                                       direct=direct)
   2076             finally:
   2077                 for f in futures.values():

/opt/conda/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1498             return self.sync(self._gather, futures, errors=errors,
   1499                              direct=direct, local_worker=local_worker,
-> 1500                              asynchronous=asynchronous)
   1501 
   1502     @gen.coroutine

/opt/conda/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    613             return future
    614         else:
--> 615             return sync(self.loop, func, *args, **kwargs)
    616 
    617     def __repr__(self):

/opt/conda/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    252             e.wait(10)
    253     if error[0]:
--> 254         six.reraise(*error[0])
    255     else:
    256         return result[0]

/opt/conda/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/opt/conda/lib/python3.6/site-packages/distributed/utils.py in f()
    236             yield gen.moment
    237             thread_state.asynchronous = True
--> 238             result[0] = yield make_coro()
    239         except Exception as exc:
    240             logger.exception(exc)

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1097 
   1098                     try:
-> 1099                         value = future.result()
   1100                     except Exception:
   1101                         self.had_exception = True

/opt/conda/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1105                     if exc_info is not None:
   1106                         try:
-> 1107                             yielded = self.gen.throw(*exc_info)
   1108                         finally:
   1109                             # Break up a reference to itself

/opt/conda/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1376                             six.reraise(type(exception),
   1377                                         exception,
-> 1378                                         traceback)
   1379                     if errors == 'skip':
   1380                         bad_keys.add(key)

/opt/conda/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/opt/conda/lib/python3.6/site-packages/distributed/protocol/pickle.py in loads()
     57 def loads(x):
     58     try:
---> 59         return pickle.loads(x)
     60     except Exception:
     61         logger.info("Failed to deserialize %s", x[:10000], exc_info=True)

/opt/conda/lib/python3.6/site-packages/zarr/core.py in __setstate__()
   1928 
   1929     def __setstate__(self, state):
-> 1930         self.__init__(*state)
   1931 
   1932     def _synchronized_op(self, f, *args, **kwargs):

/opt/conda/lib/python3.6/site-packages/zarr/core.py in __init__()
    121 
    122         # initialize metadata
--> 123         self._load_metadata()
    124 
    125         # initialize attributes

/opt/conda/lib/python3.6/site-packages/zarr/core.py in _load_metadata()
    138         """(Re)load metadata from store."""
    139         if self._synchronizer is None:
--> 140             self._load_metadata_nosync()
    141         else:
    142             mkey = self._key_prefix + array_meta_key

/opt/conda/lib/python3.6/site-packages/zarr/core.py in _load_metadata_nosync()
    147         try:
    148             mkey = self._key_prefix + array_meta_key
--> 149             meta_bytes = self._store[mkey]
    150         except KeyError:
    151             err_array_not_found(self._path)

/opt/conda/lib/python3.6/site-packages/gcsfs/mapping.py in __getitem__()
     64         key = self._key_to_str(key)
     65         try:
---> 66             with self.gcs.open(key, 'rb') as f:
     67                 result = f.read()
     68         except (IOError, OSError):

<decorator-gen-28> in open()

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in open()
    942         if 'b' in mode:
    943             return GCSFile(self, path, mode, block_size, consistency=const,
--> 944                            metadata=metadata)
    945         else:
    946             mode = mode.replace('t', '') + 'b'

<decorator-gen-31> in __init__()

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in __init__()
   1066             raise NotImplementedError('File mode not supported')
   1067         if mode == 'rb':
-> 1068             self.details = gcsfs.info(path)
   1069             self.size = self.details['size']
   1070         else:

<decorator-gen-17> in info()

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in info()
    800 
    801         try:
--> 802             return self._get_object(path)
    803         except FileNotFoundError:
    804             logger.debug("info FileNotFound at path: %s", path)

<decorator-gen-3> in _get_object()

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in _get_object()
    490 
    491         result = self._process_object(bucket, self._call('get', 'b/{}/o/{}',
--> 492                                                          bucket, key))
    493 
    494         return result

<decorator-gen-2> in _call()

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/opt/conda/lib/python3.6/site-packages/gcsfs/core.py in _call()
    429             try:
    430                 time.sleep(2**retry - 1)
--> 431                 r = meth(self.base + path, params=kwargs, json=json)
    432                 validate_response(r, path)
    433                 break

/opt/conda/lib/python3.6/site-packages/requests/sessions.py in get()
    519 
    520         kwargs.setdefault('allow_redirects', True)
--> 521         return self.request('GET', url, **kwargs)
    522 
    523     def options(self, url, **kwargs):

/opt/conda/lib/python3.6/site-packages/google/auth/transport/requests.py in request()
    195         request_headers = headers.copy() if headers is not None else {}
    196 
--> 197         self.credentials.before_request(
    198             self._auth_request, method, url, request_headers)
    199 

AttributeError: 'AuthorizedSession' object has no attribute 'credentials'

Very similar to fsspec/gcsfs#90.
cc @martindurant and @jgerardsimcock

@martindurant
Contributor

In the latter case, you should first authenticate with the browser (without distributed) and send the file ~/.gcs_tokens to each worker before using token='cache'. Note that you should also specify the project in the GCSFileSystem constructor.
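One way to ship that token cache to the workers is to read the file locally and write it out on each worker with `Client.run`. The helper below is a hypothetical sketch of that pattern, not gcsfs or dask API:

```python
import os

def write_token_bytes(data, path):
    # Hypothetical helper (not gcsfs API): write the raw bytes of a
    # local ~/.gcs_tokens cache out to `path` on whichever machine
    # this function runs on.
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "wb") as f:
        f.write(data)
    return path

# Usage sketch, assuming an existing dask.distributed Client `client`:
# data = open(os.path.expanduser("~/.gcs_tokens"), "rb").read()
# client.run(write_token_bytes, data, os.path.expanduser("~/.gcs_tokens"))
# gcs = gcsfs.GCSFileSystem(project="my-project", token="cache")
```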

@tjcrone
Contributor

tjcrone commented Apr 12, 2018

cc @friedrichknuth

@tjcrone
Contributor

tjcrone commented Apr 12, 2018

So is the best way to pass these tokens to the workers through the worker-template.yaml file? Or is there a better way? If this is the best way, would it go in an extraConfig section? And what would that look like?

@tjcrone
Contributor

tjcrone commented Apr 12, 2018

Or maybe there is a way to set it up so that workers share the notebook home directory? Maybe that would cause all kinds of problems, but maybe not.

@rabernat
Member Author

At this point, I feel like giving workers access to the same home directory as the notebook would solve multiple problems, including this one.

What are the technical and security barriers to doing this? What are the downsides?

@martindurant
Contributor

Has anyone tried the fix in fsspec/gcsfs#91 (comment)? Given the secure internal network, this might be the easy fix to gcsfs auth problems.

@kaipak

kaipak commented Apr 12, 2018

I'm running into this issue as well. Have we come up with a good method of passing authentication tokens to workers?

@kaipak

kaipak commented Apr 12, 2018

@martindurant I saw fsspec/gcsfs#91

Do you think you could clarify how this would work? I'm not sure where gcs.session.credentials comes from or how this shares tokens with workers. It would be helpful for those of us still getting up to speed with Dask and Kubernetes. Thanks!

@tjcrone
Contributor

tjcrone commented Apr 12, 2018

Echoing @rabernat, my view is that getting read, and maybe even write, access to the notebook's /home/jovyan directory from the workers would be worth a try. I believe, but am not certain, that workers launch as root, so that is something that would likely need to be addressed. I have not had a chance to try this yet, but there is presumably a way to have the worker attach a PVC to the notebook volume.

@mrocklin
Member

mrocklin commented Apr 12, 2018 via email

@rabernat
Member Author

I'm not sure where gcs.session.credentials comes from or how this shares tokens with workers.

I think @martindurant is referring to my example, in which

gcs = gcsfs.GCSFileSystem(token=token)

gcs should have an attribute .session.credentials. What he says in fsspec/gcsfs#91 is

I think the following should do it: you should set up a gcsfs instance, and perform any operation on it (the first operation will cause the token refresh) and then
token=gcs.session.credentials
in storage parameters (be sure to also explicitly give the project when you do this).

More explicitly, I think what he means is something like this:

# create a GCSFileSystem just for the purpose of authentication
gcs_orig = gcsfs.GCSFileSystem(project='pangeo-181919', token='browser')
# create another one with those credentials
gcs = gcsfs.GCSFileSystem(project='pangeo-181919', token=gcs_orig.session.credentials)
# now use this to open the mapping to the zarr store
gcsmap = gcsfs.mapping.GCSMap('bucket/path/to/store', gcs=gcs)
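The reason passing the credentials works where the default path fails: a plain token object pickles cleanly, so dask can ship it to workers along with the task graph, while a live authorized session holding open network resources does not. A toy illustration of that distinction (stand-in objects, not gcsfs internals):

```python
import pickle
import socket

# A plain dict of token fields survives pickling, so dask can send it
# to every worker inside the GCSFileSystem storage parameters.
token = {"access_token": "ya29.example", "token_type": "Bearer"}
assert pickle.loads(pickle.dumps(token)) == token

# An object holding a live network resource does not survive pickling,
# which is roughly why shipping the session itself to workers fails.
s = socket.socket()
try:
    pickle.dumps(s)
except TypeError:
    print("live session objects cannot be pickled")
finally:
    s.close()
```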

@tjcrone
Contributor

tjcrone commented Apr 12, 2018

Update: I just tried @martindurant's suggestion to create a GCSFileSystem object using token='cache' or token='browser', then rewrote the cell to pass the .session.credentials of that object back as the token to create a second instance, and it worked when passed around to workers.

@tjcrone
Contributor

tjcrone commented Apr 12, 2018

Right, what @rabernat said. And I can confirm that it works!

@martindurant
Contributor

Hurray!

I could add a secure_serializing= keyword to GCSFileSystem: when True it would keep the current behaviour (credentials are fetched and reauthenticated on the workers upon instantiation), and when False it would enable this hack-like solution automatically.

@tjcrone
Contributor

tjcrone commented Apr 12, 2018

Maybe it would be less hacky to have a way to create a .session.credentials object separately from creating a GCSFileSystem object, to have and to hold and to pass around as needed?

@rabernat
Member Author

I could add a secure_serializing= keyword to GCSFileSystem: when True it would keep the current behaviour (credentials are fetched and reauthenticated on the workers upon instantiation), and when False it would enable this hack-like solution automatically.

Question about how the reauthentication currently works: Does it happen once per worker or once per task?

@martindurant
Contributor

The intent was to provide GCSFileSystem.current() to return the most recent instance, if it exists. Looking now, I am not certain that this is ever called, so it may be that instances are not being reused. I'll get back to you; if this needs fixing, the fix could help a lot. I'll roll it in with the idea of insecure token passing, above.
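The instance-reuse pattern described here can be sketched roughly as follows (the class name is hypothetical; gcsfs's real implementation differs):

```python
class CachedFileSystem:
    """Sketch of an instance cache: each new instance records itself,
    and .current() hands back the most recent one, so repeated tasks
    can reuse it instead of re-authenticating on every call."""

    _latest = None

    def __init__(self, project=None, token=None):
        self.project = project
        self.token = token
        # Record this instance as the most recent one.
        type(self)._latest = self

    @classmethod
    def current(cls, **kwargs):
        # Reuse the most recent instance if one exists, else create one.
        if cls._latest is None:
            cls._latest = cls(**kwargs)
        return cls._latest
```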

@jacobtomlinson
Member

This has progressed a bit since @mrocklin name checked me but here are a couple of comments.

Access to home directories from the workers would be ideal. The current volume type used for home directories only allows mounting in a single location, but changing to a different type could fix this. I am currently pursuing this on our AWS cluster.

I was also under the impression that the workers run as jovyan; on our cluster I'm certain of it, as the workers use the same Docker image as the notebook.

All that being said it looks like you've got a valid workaround!

@kaipak

kaipak commented Apr 13, 2018

Update: I just tried @martindurant's suggestion to create a GCSFileSystem object using token='cache' or token='browser', then rewrote the cell to pass the .session.credentials of that object back as the token to create a second instance, and it worked when passed around to workers.

Yep, this worked for me as well. It's a little convoluted, so I hope we can find a more long-term solution.

@tjcrone
Contributor

tjcrone commented Apr 13, 2018

@jacobtomlinson, I believe a Docker image starts up as the last USER declared in its Dockerfile. At least that's how it works when an image is "run" from the command line. So because the Pangeo worker Dockerfile finishes up as root, that's how the image starts. Is that not how things work when the image is launched by KubeCluster?

@jacobtomlinson
Member

Ah interesting! I just had a look and I see the worker is using a different image to the notebook.

On our cluster we are using the notebook image for the workers too, so the user is jovyan. I don't see the benefit of having a separate worker image as the notebook image will be available on all nodes and it's a good way to ensure consistency.

@jgerardsimcock

We ended up installing gcsfuse in our Docker images, creating service-account creds for bucket access, passing those creds in via the worker-template.yml, and then having the workers write to GCS buckets via gcsfuse.
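For reference, the worker-template wiring for that approach might look roughly like this; the secret name and mount paths below are hypothetical, not taken from this thread:

```yaml
# worker-template.yaml (fragment, illustrative only)
spec:
  containers:
    - name: dask-worker
      env:
        # Standard variable Google client libraries read for credentials
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: /secrets/gcs/key.json
      volumeMounts:
        - name: gcs-sa-key
          mountPath: /secrets/gcs
          readOnly: true
  volumes:
    - name: gcs-sa-key
      secret:
        secretName: gcs-service-account
```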

@mrocklin
Member

mrocklin commented Apr 14, 2018 via email

@jgerardsimcock

[screenshot: screen shot 2018-04-13 at 3:10:26 pm]

@mrocklin
Member

mrocklin commented Apr 14, 2018 via email

@stale

stale bot commented Jun 25, 2018

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Jun 25, 2018
@stale

stale bot commented Jul 2, 2018

This issue has been automatically closed because it had not seen recent activity. The issue can always be reopened at a later date.

7 participants