Bulk write speed using zarr and GCS #619

Closed
skgbanga opened this issue Sep 27, 2020 · 13 comments

skgbanga commented Sep 27, 2020

This question is similar to #595, but instead of streaming, it concerns zarr array writes using GCS as the store backend.

Consider the following code:

import pathlib
import time
from contextlib import contextmanager

import gcsfs
import zarr
import numpy as np

from my_helpers import credentials, project

_bucketname = pathlib.Path("my_bucket")

@contextmanager
def time_this(msg):
    start = time.time()
    try:
        yield
    finally:
        taken = time.time() - start
        print(f"Time taken for {msg}: {taken:0.6f}")

def generate(name):
    gcs = gcsfs.GCSFileSystem(project=project(), token=credentials())
    store = gcsfs.GCSMap(_bucketname / "zarr-test/data.zarr", gcs=gcs, check=False)
    root = zarr.group(store=store)

    gig1 = 1024 * 1024 * 1024
    dtype = np.dtype("float")
    items = gig1 // dtype.itemsize
    z = root.create(name, shape=items, chunks=items, dtype=dtype, overwrite=True)

    data = np.random.rand(items)

    with time_this("upload"):
        z[:] = data

I am creating a 1 GiB numpy array and assigning it to a zarr array with the same shape and a single chunk of the same size.

When I run the above code, I get the following numbers:

$ python storage.py
Time taken for upload: 21.159390

Compare this time with manually uploading the same zarr data (created locally with a DirectoryStore) directly via gsutil:

$ time gsutil cp -r data.zarr gs://my_bucket/zarr-test
Copying file://data.zarr/.zgroup [Content-Type=application/octet-stream]...
Copying file://data.zarr/large/.zarray [Content-Type=application/octet-stream]...
Copying file://data.zarr/large/0 [Content-Type=application/octet-stream]...     

/ [3 files][  1.0 GiB/  1.0 GiB]                                                
Operation completed over 3 objects/1.0 GiB.                                      

real    0m9.061s
user    0m6.047s
sys     0m2.699s

This is surprising.
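For reference, the data.zarr directory copied by gsutil above would have been produced with a DirectoryStore; a minimal sketch of that step (not shown in the issue) could look like this:

import numpy as np
import zarr

# Write the same 1 GiB, single-chunk array to a local DirectoryStore, producing
# the data.zarr/.zgroup, data.zarr/large/.zarray and data.zarr/large/0 objects
# seen in the gsutil output above.
store = zarr.DirectoryStore("data.zarr")
root = zarr.group(store=store)

items = (1024 * 1024 * 1024) // np.dtype("float").itemsize
z = root.create("large", shape=items, chunks=items, dtype="float", overwrite=True)
z[:] = np.random.rand(items)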

To make the comparison more 'fair', I used Google's Python API to upload this chunk of data using the following code:

from google.cloud import storage

def pyapi_upload():
    storageclient = storage.Client(project=project(), credentials=credentials())
    bucket = storageclient.get_bucket(str(_bucketname))

    with time_this("upload"):
        for path in pathlib.Path('data.zarr').rglob('*'):
            if path.is_file():
                with open(path, "rb") as f:
                    data = f.read()

                blob = bucket.blob(f"zarr-test/{path}")
                blob.upload_from_string(data, content_type="application/octet-stream")

Running the above gives the following result:

$ python storage.py
Time taken for upload: 11.151875

To summarize:

  • uploading directly via gsutil: ~9 s
  • uploading via the gcloud Python API: ~11 s
  • assigning via zarr/gcsfs: ~21 s

@rabernat Did you ever observe something similar?

@skgbanga skgbanga reopened this Sep 28, 2020
@skgbanga
Author

skgbanga commented Sep 28, 2020

@rabernat I noticed that you have a PR (#252) which implements a version of GCSStore using google storage api!

After noticing the performance difference, I took the important bits from that PR to write this GCSStore:

from collections.abc import MutableMapping

from google.cloud import storage


class GCSStore(MutableMapping):
    def __init__(self, bucket, prefix):
        client = storage.Client(project=project(), credentials=credentials())
        self.bucket = storage.Bucket(client, bucket)
        if not prefix.endswith("/"):
            prefix = prefix + "/"
        self.prefix = prefix

    def _full_name(self, key):
        return f"{self.prefix}{key}"

    def __contains__(self, key):
        name = self._full_name(key)
        return self.bucket.get_blob(name) is not None

    def __iter__(self):
        blobs = self.bucket.list_blobs(prefix=self.prefix)
        for blob in blobs:
            yield blob.name[len(self.prefix) :]

    def __setitem__(self, key, value):
        name = self._full_name(key)
        # hsize: user helper that converts a human-readable size ("1gb") to bytes
        blob = self.bucket.blob(name, chunk_size=hsize("1gb"))
        blob.upload_from_string(value, content_type="application/octet-stream")

    def __len__(self):
        iterator = self.bucket.list_blobs(prefix=self.prefix)
        return sum(1 for _ in iterator)

    def __getitem__(self, key):
        name = self._full_name(key)
        blob = self.bucket.get_blob(name)
        if not blob:
            raise KeyError(f"Blob {name} not found")

        return blob.download_as_bytes()

    def __delitem__(self, key):
        name = self._full_name(key)
        self.bucket.delete_blob(name)

Running the code with this change:

    store = GCSStore(str(_bucketname), "zarr-test/data.zarr")
    root = zarr.group(store=store)

I get the following timings:

python storage.py
Time taken for upload: 11.510206

Since this is a significant speed-up over gcsfs (which takes ~21 s), I am going to continue using this in my project. (Note that I am passing 1 GB as the chunk size on my blobs to get the best throughput for my data.)

@martindurant I spent a significant part of the afternoon going over the code in gcsfs (and fsspec), and I think some of the abstractions there are fantastic. I think the majority of the time is spent in this call (not sure if there are knobs to make this faster):

                  async with self.session.request(
                      method=method,
                      url=path,
                      params=kwargs,
                      json=jsonin,
                      headers=headers,
                      data=datain,
                      timeout=self.requests_timeout,
                  ) as r:

I don't think this call is affected by chunk_size, since it is a direct POST call. There is a block_size, but that is an argument to the underlying AsyncFileSystem that controls when to flush the buffer. (This brings me to another interesting point: abstractions are not zero-cost. gcsfs first copies the user-supplied data into a local buffer, and only after a certain point flushes that buffer to the underlying storage. This is the right choice for a user doing a lot of small writes (similar to how the OS routinely caches writes), but it might not be a great choice for a user who wants 'bare metal' (very liberal usage of this term) performance.) However, the cost of copying into this local buffer is minuscule/irrelevant compared to the network write, so I won't hesitate to switch back to gcsfs if the performance can be improved.
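For illustration, a minimal sketch of that buffered write path (assuming the user helpers from the earlier snippets; block_size here is the fsspec buffer/flush threshold, not the GCS blob chunk_size, and the object key is just an example):

import gcsfs
import numpy as np
from my_helpers import credentials, project

gcs = gcsfs.GCSFileSystem(project=project(), token=credentials())
data = np.random.rand(1024 * 1024)  # small example payload

# Writes first land in an in-memory buffer (a copy of the user data);
# the buffer is only uploaded once it grows past block_size, or on close.
with gcs.open("my_bucket/zarr-test/raw.bin", "wb", block_size=100 * 2**20) as f:
    f.write(data.tobytes())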

@martindurant
Member

Whilst it can be good to have a specific and simple implementation that does the job well, I would prefer if we can backport the performance improvement to gcsfs, and I hope you can help with this, @skgbanga.
If you can do a complete profile of what is taking the time in gcsfs, that would be appreciated. I doubt it's memory copies, unless you are near capacity. Unfortunately, snakeviz et al. won't be able to see what's happening within async code.

Two points to note: zarr writes to the backend via __setitem__, which currently calls open/write, but ought to call pipe, as follows:

--- a/fsspec/mapping.py
+++ b/fsspec/mapping.py
@@ -148,8 +148,7 @@ class FSMap(MutableMapping):
         """Store value in key"""
         key = self._key_to_str(key)
         self.fs.mkdirs(self.fs._parent(key), exist_ok=True)
-        with self.fs.open(key, "wb") as f:
-            f.write(value)
+        self.fs.pipe_file(key, value)

     def __iter__(self):
         return (self._str_to_key(x) for x in self.fs.find(self.root))

Secondly, see #606 for the application of concurrent/async methods to zarr - but that is read-only for now. The same thing should/will be done for writing too, and would help you.

Finally, it is a shame that GCS does not support concurrent chunked uploads - each new chunk can only start once the previous one has finished.

@martindurant
Member

btw: the first thing Blob.upload_from_string does is string_buffer = BytesIO(data), so that it can be used with upload_from_file, i.e., a copy :|
The default chunksize for gcloud.storage is 100MB.

@skgbanga
Author

@martindurant Thanks for the reply.

I will take a more detailed look, but when I was stepping through the code, async with self.session.request was the call that was taking most of the time. (Sorry, I don't have detailed stats right now; this is also my first experience with async.)

on upload_from_string, omg! :(

One of the first things I wanted to do was make the fsspec default size 100 MB. With the default size, I was getting numbers of ~14 s with GCSStore.
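For context, a sketch of what bumping that default might look like on the user side (assuming GCSFileSystem accepts a block_size override, as fsspec filesystems generally do; 100 MB mirrors gcloud's default chunk size):

import gcsfs
from my_helpers import credentials, project

# Assumed knob: block_size (in bytes) becomes the default buffer/flush size
# for files opened for writing on this filesystem instance.
gcs = gcsfs.GCSFileSystem(project=project(), token=credentials(),
                          block_size=100 * 2**20)
store = gcsfs.GCSMap("my_bucket/zarr-test/data.zarr", gcs=gcs, check=False)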

@martindurant
Member

async with self.session.request was the function which was taking most time.

This suggests that it is not memory copies that are at fault, at least not within the gcsfs code.
I have not figured out yet which library gcloud actually uses for its transfer - it is all pretty convoluted.

@skgbanga
Author

skgbanga commented Sep 28, 2020

Yeah, I was pretty sure it was not memory copies; that's why I mentioned this:

However, the cost of copying into this local buffer is minuscule/irrelevant compared to the network write, so I won't hesitate to switch back to gcsfs if the performance can be improved.

I have a naive question though. Is there any reason you are not using the Google Cloud Storage Python API in gcsfs's core.py (and are making direct request calls instead)?

@martindurant
Member

Originally, it was because the google code was so convoluted, a situation which has improved somewhat since. There are also some features we find very valuable that can only be achieved with a separate implementation:

  • random access files (with a choice of caching mechanism); also enables transparent text mode and compression
  • pickleable objects and transaction and other fsspec goodies (these perhaps could be bolted onto a core that calls gcloud).

In addition, gcloud's dependencies are hard to manage, and we want as few direct hooks into their auth as possible.

I cannot immediately find any references to asyncio having performance issues for streaming an upload.

@skgbanga
Author

I will do some more testing over the weekend.

@skgbanga
Author

So I added timing calls around this function:

                import time
                now = time.time()
                async with self.session.request(
                    method=method,
                    url=path,
                    params=kwargs,
                    json=jsonin,
                    headers=headers,
                    data=datain,
                    timeout=self.requests_timeout,
                ) as r:
                    spent = time.time() - now
                    print("Time spent in request call is", spent)

and can say that out of ~21 seconds, more than 18 s are spent in this call. Unfortunately, this is the first time I am looking at async code, so I don't think I will be able to help further :(

I think it might just be down to how the two methods are doing their network calls. For the Google storage API, I noticed this via strace:

write(4, ""..., 16413) = 16413                                                      
write(4, ""..., 16413) = 16413                                                             
...

For gcsfs, I noticed this (running strace on this method is slightly tricky because it uses threading):

[pid  1465] sendto(6, ""..., 1693739, 0, NULL, 0) = 583740
[pid  1465] epoll_wait(3, [{EPOLLOUT, {u32=6, u64=8589934598}}], 2, -1000) = 1
[pid  1465] sendto(6, ""..., 1109999, 0, NULL, 0) = 583740
[pid  1465] epoll_wait(3, [{EPOLLOUT, {u32=6, u64=8589934598}}], 2, -1000) = 1
[pid  1465] sendto(6, ""..., 526259, 0, NULL, 0) = 526259

No clue why the asyncio HTTP request method is calling epoll_wait every time it sends some data.

@martindurant
Member

I don't think there's a way for me to delve into the innards of aiohttp to change the network layer :| I may have a closer look at the gcloud code - I'm pretty surprised if it's using a nonstandard transport. I do notice that the gcsfs writes are apparently larger, so maybe the wait happens in the gcloud stack too, but after many more writes - and the relative sizes of the network buffers might be important here.

Did you try after replacing the write with pipe_file in __setitem__?
Also, I would appreciate it if you tested the speed of pipe with multiple inputs, which will run concurrently; this is what zarr will eventually call, just as reading already does.
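For illustration, such a test might look something like this (a sketch: fs.pipe accepts a dict of path -> bytes and, on async filesystems like gcsfs, issues the uploads concurrently; the pipe-test/ keys and the my_helpers module are just the examples used earlier in this thread):

import time

import gcsfs
import numpy as np
from my_helpers import credentials, project

gcs = gcsfs.GCSFileSystem(project=project(), token=credentials())

# Split 1 GiB of random data into 16 pieces and upload them with one pipe() call.
payload = np.random.rand((1024 * 1024 * 1024) // 8).tobytes()
nchunks = 16
piece = len(payload) // nchunks
values = {
    f"my_bucket/zarr-test/pipe-test/{i}": payload[i * piece : (i + 1) * piece]
    for i in range(nchunks)
}

start = time.time()
gcs.pipe(values)  # dict of path -> bytes; uploads are issued concurrently
print(f"pipe() of {nchunks} pieces took {time.time() - start:0.3f}s")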

@skgbanga
Author

hey @martindurant, sorry for the late reply.

I do notice that the gcsfs writes are apparently larger

I did notice that. Those writes are even larger than the ~64K TCP window size, so there is definitely heavy flow control going on. (I think I will do a TCP capture to verify that.)

I will try to do that over the weekend.

@rabernat
Contributor

Can we consider this issue resolved? It has become quite broad...

@skgbanga
Author

Please consider this closed from my end. @martindurant Unfortunately, I haven't been able to work on your stuff.

@Carreau Carreau added this to the No-Milestone milestone Dec 1, 2020