Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GS progress for push & pull #2809

Merged
merged 11 commits into from
Nov 21, 2019
54 changes: 39 additions & 15 deletions dvc/remote/gs.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,69 @@
from __future__ import unicode_literals
from __future__ import unicode_literals, division

import logging
from datetime import timedelta
from functools import wraps
import io
import os.path

from funcy import cached_property

from dvc.config import Config
from dvc.exceptions import DvcException
from dvc.path_info import CloudURLInfo
from dvc.progress import Tqdm
from dvc.remote.base import RemoteBASE
from dvc.scheme import Schemes
from dvc.utils.compat import FileNotFoundError # skipcq: PYL-W0622

logger = logging.getLogger(__name__)
MIN_CHUNKSIZE = 256 * 1024
casperdcl marked this conversation as resolved.
Show resolved Hide resolved


def dynamic_chunk_size(func):
@wraps(func)
def wrapper(*args, **kwargs):
import requests
from google.cloud.storage.blob import Blob, _DEFAULT_CHUNKSIZE

# Default chunk size for gs is 100M, which might be too much for
# particular network (see [1]). So if we are getting ConnectionError,
# we should try lowering the chunk size until we reach the minimum
# allowed chunk size of 256K. Also note that `chunk_size` must be a
# multiple of 256K per the API specification.
# `ConnectionError` may be due to too large `chunk_size`
# (see [#2572]) so try halving on error.
# Note: start with 40 * [default: 256K] = 10M.
# Note: must be multiple of 256K.
#
# [1] https://github.com/iterative/dvc/issues/2572
# [#2572]: https://github.com/iterative/dvc/issues/2572

# skipcq: PYL-W0212
multiplier = int(_DEFAULT_CHUNKSIZE / Blob._CHUNK_SIZE_MULTIPLE)
multiplier = 40
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
while True:
try:
# skipcq: PYL-W0212
chunk_size = Blob._CHUNK_SIZE_MULTIPLE * multiplier
chunk_size = MIN_CHUNKSIZE * multiplier
return func(*args, chunk_size=chunk_size, **kwargs)
except requests.exceptions.ConnectionError:
multiplier = int(multiplier / 2)
multiplier //= 2
if not multiplier:
raise

return wrapper


@dynamic_chunk_size
def _upload_to_bucket(bucket, from_file, to_info, **kwargs):
blob = bucket.blob(to_info.path, **kwargs)
blob.upload_from_filename(from_file)
def _upload_to_bucket(bucket, from_file, to_info, chunk_size=None, **kwargs):
blob = bucket.blob(to_info.path, chunk_size=chunk_size, **kwargs)
with Tqdm(
desc=to_info.path, total=os.path.getsize(from_file), bytes=True
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
) as pbar:
with io.open(from_file, mode="rb") as fd:
raw_read = fd.read

def read(size=chunk_size):
res = raw_read(size)
if res:
pbar.update(len(res))
return res

fd.read = read
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
blob.upload_from_file(fd)


class RemoteGS(RemoteBASE):
Expand Down Expand Up @@ -130,7 +145,16 @@ def _upload(self, from_file, to_info, **_kwargs):
def _download(self, from_info, to_file, **_kwargs):
bucket = self.gs.bucket(from_info.bucket)
blob = bucket.get_blob(from_info.path)
blob.download_to_filename(to_file)
with Tqdm(desc=from_info.path, total=blob.size, bytes=True) as pbar:
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
with io.open(to_file, mode="wb") as fd:
raw_write = fd.write

def write(bytes):
raw_write(bytes)
pbar.update(len(bytes))

fd.write = write
casperdcl marked this conversation as resolved.
Show resolved Hide resolved
blob.download_to_file(fd)

def _generate_download_url(self, path_info, expires=3600):
expiration = timedelta(seconds=int(expires))
Expand Down