From 6fdba1dddf661e4811de1ef0c4e5e1f18d0ea9f3 Mon Sep 17 00:00:00 2001
From: Casper da Costa-Luis
Date: Sun, 17 Nov 2019 22:58:47 +0000
Subject: [PATCH 01/11] initial upload idea - fixes #1566

---
 dvc/remote/gs.py | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/dvc/remote/gs.py b/dvc/remote/gs.py
index c2fb4efc1c..7d7eefe9fd 100644
--- a/dvc/remote/gs.py
+++ b/dvc/remote/gs.py
@@ -3,12 +3,15 @@
 import logging
 from datetime import timedelta
 from functools import wraps
+import io
+
 
 from funcy import cached_property
 
 from dvc.config import Config
 from dvc.exceptions import DvcException
 from dvc.path_info import CloudURLInfo
+from dvc.progress import Tqdm
 from dvc.remote.base import RemoteBASE
 from dvc.scheme import Schemes
 from dvc.utils.compat import FileNotFoundError  # skipcq: PYL-W0622
@@ -46,9 +49,20 @@ def wrapper(*args, **kwargs):
 
 
 @dynamic_chunk_size
-def _upload_to_bucket(bucket, from_file, to_info, **kwargs):
-    blob = bucket.blob(to_info.path, **kwargs)
-    blob.upload_from_filename(from_file)
+def _upload_to_bucket(bucket, from_file, to_info, chunk_size=None, **kwargs):
+    blob = bucket.blob(to_info.path, chunk_size=chunk_size, **kwargs)
+    with Tqdm() as pbar:
+        with io.open(from_file, mode="rb", buffering=chunk_size or -1) as fd:
+            raw_read = fd.read
+
+            def read(self, size=chunk_size):
+                res = raw_read(size)
+                if res:
+                    pbar.update(len(res))
+                return res
+
+            fd.read = read
+            blob.upload_from_file(fd)
 
 
 class RemoteGS(RemoteBASE):

From 6b388edbe56adccb18f460e76a19a81787cf14e8 Mon Sep 17 00:00:00 2001
From: Casper da Costa-Luis
Date: Sun, 17 Nov 2019 23:19:19 +0000
Subject: [PATCH 02/11] py2/3 fix

---
 dvc/remote/gs.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/dvc/remote/gs.py b/dvc/remote/gs.py
index 7d7eefe9fd..fb3402492b 100644
--- a/dvc/remote/gs.py
+++ b/dvc/remote/gs.py
@@ -5,7 +5,6 @@
 from functools import wraps
 import io
 
-
 from funcy import cached_property
 
 from dvc.config import Config
@@ -52,10 +51,10 @@ def wrapper(*args, **kwargs):
 def _upload_to_bucket(bucket, from_file, to_info, chunk_size=None, **kwargs):
     blob = bucket.blob(to_info.path, chunk_size=chunk_size, **kwargs)
     with Tqdm() as pbar:
-        with io.open(from_file, mode="rb", buffering=chunk_size or -1) as fd:
+        with io.open(from_file, mode="rb") as fd:
             raw_read = fd.read
 
-            def read(self, size=chunk_size):
+            def read(size=chunk_size):
                 res = raw_read(size)
                 if res:
                     pbar.update(len(res))

From 1f442a4732ee1b2245003f9e044b01c8e23a7407 Mon Sep 17 00:00:00 2001
From: Casper da Costa-Luis
Date: Sun, 17 Nov 2019 23:29:45 +0000
Subject: [PATCH 03/11] gs download idea

---
 dvc/remote/gs.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/dvc/remote/gs.py b/dvc/remote/gs.py
index fb3402492b..8584309350 100644
--- a/dvc/remote/gs.py
+++ b/dvc/remote/gs.py
@@ -143,7 +143,16 @@ def _upload(self, from_file, to_info, **_kwargs):
     def _download(self, from_info, to_file, **_kwargs):
         bucket = self.gs.bucket(from_info.bucket)
         blob = bucket.get_blob(from_info.path)
-        blob.download_to_filename(to_file)
+        with Tqdm() as pbar:
+            with io.open(to_file, mode="wb") as fd:
+                raw_write = fd.write
+
+                def write(bytes):
+                    raw_write(bytes)
+                    pbar.update(len(bytes))
+
+                fd.write = write
+                blob.download_to_file(fd)
 
     def _generate_download_url(self, path_info, expires=3600):
         expiration = timedelta(seconds=int(expires))
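Annotation (not part of the series): patches 01-03 above share one trick. Open the file yourself, swap the file object's read()/write() method for a wrapper that feeds a progress bar, then hand the object to google-cloud-storage. A minimal standalone sketch of the upload side using plain tqdm; `consume` is a hypothetical stand-in for blob.upload_from_file():

    import io
    import os

    from tqdm import tqdm


    def consume(fobj, chunk_size=1024 * 1024):
        # hypothetical consumer: reads the file object in chunks,
        # as blob.upload_from_file() would
        while fobj.read(chunk_size):
            pass


    def upload_with_progress(path):
        with tqdm(
            total=os.path.getsize(path), unit="B", unit_scale=True
        ) as pbar:
            with io.open(path, mode="rb") as fobj:
                raw_read = fobj.read

                def read(size=-1):
                    res = raw_read(size)
                    if res:
                        pbar.update(len(res))
                    return res

                # every read by the consumer now advances the bar
                fobj.read = read
                consume(fobj)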
From 321a46bd9137704451d97ebc4ef5faa9813edfd6 Mon Sep 17 00:00:00 2001
From: Casper da Costa-Luis
Date: Mon, 18 Nov 2019 01:45:34 +0000
Subject: [PATCH 04/11] add gs stats

---
 dvc/remote/gs.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/dvc/remote/gs.py b/dvc/remote/gs.py
index 8584309350..99d1ab5701 100644
--- a/dvc/remote/gs.py
+++ b/dvc/remote/gs.py
@@ -4,6 +4,7 @@
 from datetime import timedelta
 from functools import wraps
 import io
+import os.path
 
 from funcy import cached_property
 
@@ -50,7 +51,9 @@ def wrapper(*args, **kwargs):
 @dynamic_chunk_size
 def _upload_to_bucket(bucket, from_file, to_info, chunk_size=None, **kwargs):
     blob = bucket.blob(to_info.path, chunk_size=chunk_size, **kwargs)
-    with Tqdm() as pbar:
+    with Tqdm(
+        desc=to_info.path, total=os.path.getsize(from_file), bytes=True
+    ) as pbar:
         with io.open(from_file, mode="rb") as fd:
             raw_read = fd.read
 
@@ -143,7 +146,7 @@ def _upload(self, from_file, to_info, **_kwargs):
     def _download(self, from_info, to_file, **_kwargs):
         bucket = self.gs.bucket(from_info.bucket)
         blob = bucket.get_blob(from_info.path)
-        with Tqdm() as pbar:
+        with Tqdm(desc=from_info.path, total=blob.size, bytes=True) as pbar:
             with io.open(to_file, mode="wb") as fd:
                 raw_write = fd.write
 
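Annotation (not part of the series): patch 04 turns the bare Tqdm() into a labelled byte counter. `bytes=True` is a convenience flag of dvc.progress.Tqdm; in plain tqdm terms it is assumed to expand to roughly the following:

    from tqdm import tqdm

    pbar = tqdm(
        desc="path/in/bucket",  # to_info.path (upload) / from_info.path (download)
        total=1048576,          # os.path.getsize(from_file) or blob.size
        unit="B",               # count bytes rather than iterations...
        unit_scale=True,        # ...and render them scaled (K/M/G)
        unit_divisor=1024,
    )
    pbar.update(262144)  # called from the wrapped read()/write()
    pbar.close()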
From 32c7b605bf6e0ac366c2c73f91d9bc0a6687cf49 Mon Sep 17 00:00:00 2001
From: Casper da Costa-Luis
Date: Mon, 18 Nov 2019 01:53:18 +0000
Subject: [PATCH 05/11] reduce gs upload chunk size

---
 dvc/remote/gs.py | 21 ++++++++++++---------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/dvc/remote/gs.py b/dvc/remote/gs.py
index 99d1ab5701..085054d54a 100644
--- a/dvc/remote/gs.py
+++ b/dvc/remote/gs.py
@@ -1,4 +1,4 @@
-from __future__ import unicode_literals
+from __future__ import unicode_literals, division
 
 import logging
 from datetime import timedelta
@@ -25,23 +25,26 @@ def wrapper(*args, **kwargs):
         import requests
         from google.cloud.storage.blob import Blob, _DEFAULT_CHUNKSIZE
 
-        # Default chunk size for gs is 100M, which might be too much for
-        # particular network (see [1]). So if we are getting ConnectionError,
-        # we should try lowering the chunk size until we reach the minimum
-        # allowed chunk size of 256K. Also note that `chunk_size` must be a
-        # multiple of 256K per the API specification.
+        # `ConnectionError` may be due to too large `chunk_size`
+        # (see [#2572]) so try halving on error.
+        # Note: default 100M is too large for fine-grained progress
+        # so 10M is the starting default.
+        # Note: minimum 256K.
+        # Note: must be multiple of 256K.
         #
-        # [1] https://github.com/iterative/dvc/issues/2572
+        # [#2572]: https://github.com/iterative/dvc/issues/2572
 
         # skipcq: PYL-W0212
-        multiplier = int(_DEFAULT_CHUNKSIZE / Blob._CHUNK_SIZE_MULTIPLE)
+        multiplier = int(
+            _DEFAULT_CHUNKSIZE / (10.0 * Blob._CHUNK_SIZE_MULTIPLE)
+        )
         while True:
             try:
                 # skipcq: PYL-W0212
                 chunk_size = Blob._CHUNK_SIZE_MULTIPLE * multiplier
                 return func(*args, chunk_size=chunk_size, **kwargs)
             except requests.exceptions.ConnectionError:
-                multiplier = int(multiplier / 2)
+                multiplier = multiplier // 2
                 if not multiplier:
                     raise
 

From 934850f23c707e33e2c7ff68f22df12af132fac8 Mon Sep 17 00:00:00 2001
From: Casper da Costa-Luis
Date: Mon, 18 Nov 2019 02:10:27 +0000
Subject: [PATCH 06/11] neaten default chunks

Fixes https://github.com/iterative/dvc/pull/2809#discussion_r347178103
---
 dvc/remote/gs.py | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/dvc/remote/gs.py b/dvc/remote/gs.py
index 085054d54a..3091c628dc 100644
--- a/dvc/remote/gs.py
+++ b/dvc/remote/gs.py
@@ -23,28 +23,24 @@ def dynamic_chunk_size(func):
     @wraps(func)
     def wrapper(*args, **kwargs):
         import requests
-        from google.cloud.storage.blob import Blob, _DEFAULT_CHUNKSIZE
+        from google.cloud.storage.blob import _DEFAULT_CHUNKSIZE
 
         # `ConnectionError` may be due to too large `chunk_size`
         # (see [#2572]) so try halving on error.
-        # Note: default 100M is too large for fine-grained progress
-        # so 10M is the starting default.
-        # Note: minimum 256K.
+        # Note: start with 40 * [default: 256K] = 10M.
         # Note: must be multiple of 256K.
         #
         # [#2572]: https://github.com/iterative/dvc/issues/2572
 
         # skipcq: PYL-W0212
-        multiplier = int(
-            _DEFAULT_CHUNKSIZE / (10.0 * Blob._CHUNK_SIZE_MULTIPLE)
-        )
+        multiplier = 40
         while True:
             try:
                 # skipcq: PYL-W0212
-                chunk_size = Blob._CHUNK_SIZE_MULTIPLE * multiplier
+                chunk_size = _DEFAULT_CHUNKSIZE * multiplier
                 return func(*args, chunk_size=chunk_size, **kwargs)
             except requests.exceptions.ConnectionError:
-                multiplier = multiplier // 2
+                multiplier //= 2
                 if not multiplier:
                     raise
 
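Annotation (not part of the series): patches 05-06 settle on starting at 40 * 256K = 10M and halving the multiplier on each ConnectionError. A quick sketch of the chunk-size sequence this aims to produce (it matches the expectation later encoded in patch 10's test):

    CHUNK_MULTIPLE = 256 * 1024  # API: chunk_size must be a multiple of 256K

    sizes = []
    multiplier = 40  # 40 * 256K = 10M starting chunk
    while multiplier:
        sizes.append(CHUNK_MULTIPLE * multiplier)
        multiplier //= 2
    print(sizes)  # [10485760, 5242880, 2621440, 1310720, 524288, 262144]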
From eb5a0a92cea7a404e9b3a78d16a053ff86e26a87 Mon Sep 17 00:00:00 2001
From: Casper da Costa-Luis
Date: Tue, 19 Nov 2019 13:57:23 +0000
Subject: [PATCH 07/11] fix gs chunks

Fixes https://github.com/iterative/dvc/pull/2809#discussion_r347835334
---
 dvc/remote/gs.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dvc/remote/gs.py b/dvc/remote/gs.py
index 3091c628dc..b27a837409 100644
--- a/dvc/remote/gs.py
+++ b/dvc/remote/gs.py
@@ -17,13 +17,13 @@
 from dvc.utils.compat import FileNotFoundError  # skipcq: PYL-W0622
 
 logger = logging.getLogger(__name__)
+MIN_CHUNKSIZE = 256 * 1024
 
 
 def dynamic_chunk_size(func):
     @wraps(func)
     def wrapper(*args, **kwargs):
         import requests
-        from google.cloud.storage.blob import _DEFAULT_CHUNKSIZE
 
         # `ConnectionError` may be due to too large `chunk_size`
         # (see [#2572]) so try halving on error.
@@ -37,7 +37,7 @@ def wrapper(*args, **kwargs):
         while True:
             try:
                 # skipcq: PYL-W0212
-                chunk_size = _DEFAULT_CHUNKSIZE * multiplier
+                chunk_size = MIN_CHUNKSIZE * multiplier
                 return func(*args, chunk_size=chunk_size, **kwargs)
             except requests.exceptions.ConnectionError:
                 multiplier //= 2

From 5eefacdf2c2e08c765dd8556b5780c77f31e8d62 Mon Sep 17 00:00:00 2001
From: Casper da Costa-Luis
Date: Tue, 19 Nov 2019 14:39:31 +0000
Subject: [PATCH 08/11] back to gs multiple

Fixes https://github.com/iterative/dvc/pull/2809#discussion_r347954033
---
 dvc/remote/gs.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dvc/remote/gs.py b/dvc/remote/gs.py
index b27a837409..45dc6b7bdf 100644
--- a/dvc/remote/gs.py
+++ b/dvc/remote/gs.py
@@ -17,13 +17,13 @@
 from dvc.utils.compat import FileNotFoundError  # skipcq: PYL-W0622
 
 logger = logging.getLogger(__name__)
-MIN_CHUNKSIZE = 256 * 1024
 
 
 def dynamic_chunk_size(func):
     @wraps(func)
     def wrapper(*args, **kwargs):
         import requests
+        from google.cloud.storage.blob import Blob
 
         # `ConnectionError` may be due to too large `chunk_size`
         # (see [#2572]) so try halving on error.
@@ -37,7 +37,7 @@ def wrapper(*args, **kwargs):
         while True:
             try:
                 # skipcq: PYL-W0212
-                chunk_size = MIN_CHUNKSIZE * multiplier
+                chunk_size = Blob._CHUNK_SIZE_MULTIPLE * multiplier
                 return func(*args, chunk_size=chunk_size, **kwargs)
             except requests.exceptions.ConnectionError:
                 multiplier //= 2

From 28b99dc4c78a7bc4d80b09951615a4c73644545f Mon Sep 17 00:00:00 2001
From: Casper da Costa-Luis
Date: Tue, 19 Nov 2019 16:19:54 +0000
Subject: [PATCH 09/11] expose and use name, no_progress_bar

Fixes https://github.com/iterative/dvc/pull/2809#discussion_r347180388
---
 dvc/remote/gs.py | 35 ++++++++++++++++++++++++++++-------
 1 file changed, 28 insertions(+), 7 deletions(-)

diff --git a/dvc/remote/gs.py b/dvc/remote/gs.py
index 45dc6b7bdf..d4a59cfb52 100644
--- a/dvc/remote/gs.py
+++ b/dvc/remote/gs.py
@@ -48,10 +48,20 @@ def wrapper(*args, **kwargs):
 @dynamic_chunk_size
-def _upload_to_bucket(bucket, from_file, to_info, chunk_size=None, **kwargs):
-    blob = bucket.blob(to_info.path, chunk_size=chunk_size, **kwargs)
+def _upload_to_bucket(
+    bucket,
+    from_file,
+    to_info,
+    chunk_size=None,
+    name=None,
+    no_progress_bar=True,
+):
+    blob = bucket.blob(to_info.path, chunk_size=chunk_size)
     with Tqdm(
-        desc=to_info.path, total=os.path.getsize(from_file), bytes=True
+        desc=name or to_info.path,
+        total=os.path.getsize(from_file),
+        bytes=True,
+        disable=no_progress_bar,
     ) as pbar:
         with io.open(from_file, mode="rb") as fd:
             raw_read = fd.read
 
            def read(size=chunk_size):
@@ -138,14 +148,25 @@ def exists(self, path_info):
         paths = set(self._list_paths(path_info.bucket, path_info.path))
         return any(path_info.path == path for path in paths)
 
-    def _upload(self, from_file, to_info, **_kwargs):
+    def _upload(self, from_file, to_info, name=None, no_progress_bar=True):
         bucket = self.gs.bucket(to_info.bucket)
-        _upload_to_bucket(bucket, from_file, to_info)
+        _upload_to_bucket(
+            bucket,
+            from_file,
+            to_info,
+            name=name,
+            no_progress_bar=no_progress_bar,
+        )
 
-    def _download(self, from_info, to_file, **_kwargs):
+    def _download(self, from_info, to_file, name=None, no_progress_bar=True):
         bucket = self.gs.bucket(from_info.bucket)
         blob = bucket.get_blob(from_info.path)
-        with Tqdm(desc=from_info.path, total=blob.size, bytes=True) as pbar:
+        with Tqdm(
+            desc=name or from_info.path,
+            total=blob.size,
+            bytes=True,
+            disable=no_progress_bar,
+        ) as pbar:
             with io.open(to_file, mode="wb") as fd:
                 raw_write = fd.write
 
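Annotation (not part of the series): with patches 07-08 the multiplier is applied to Blob._CHUNK_SIZE_MULTIPLE (256K), settling the retry behaviour. A self-contained sketch of that behaviour; the builtin ConnectionError stands in for requests.exceptions.ConnectionError, and `flaky_upload` is a hypothetical callee that only tolerates chunks of 1M or less:

    from functools import wraps

    CHUNK_SIZE_MULTIPLE = 256 * 1024  # cf. Blob._CHUNK_SIZE_MULTIPLE


    def dynamic_chunk_size(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            multiplier = 40  # start at 40 * 256K = 10M
            while True:
                try:
                    chunk_size = CHUNK_SIZE_MULTIPLE * multiplier
                    return func(*args, chunk_size=chunk_size, **kwargs)
                except ConnectionError:
                    multiplier //= 2  # halve and retry
                    if not multiplier:
                        raise  # even 256K failed

        return wrapper


    @dynamic_chunk_size
    def flaky_upload(chunk_size=None):
        if chunk_size > 1024 * 1024:
            raise ConnectionError("chunk too large")
        return chunk_size


    print(flaky_upload())  # 524288, the first attempted size <= 1M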
From b811a773c69f2e3710a9ccff15c5a546fedbfdb7 Mon Sep 17 00:00:00 2001
From: Casper da Costa-Luis
Date: Tue, 19 Nov 2019 17:44:01 +0000
Subject: [PATCH 10/11] update dynamic chunk tests

---
 tests/unit/remote/test_gs.py | 12 +-----------
 1 file changed, 1 insertion(+), 11 deletions(-)

diff --git a/tests/unit/remote/test_gs.py b/tests/unit/remote/test_gs.py
index 817b64c595..62923e41e2 100644
--- a/tests/unit/remote/test_gs.py
+++ b/tests/unit/remote/test_gs.py
@@ -53,14 +53,4 @@ def upload(chunk_size=None):
     with pytest.raises(requests.exceptions.ConnectionError):
         upload()
 
-    assert chunk_sizes == [
-        104857600,
-        52428800,
-        26214400,
-        13107200,
-        6553600,
-        3145728,
-        1572864,
-        786432,
-        262144,
-    ]
+    assert chunk_sizes == [10485760, 5242880, 2621440, 1310720, 524288, 262144]

From b2f7c841ef674f67c77dd2ed0ebb8797cfccd986 Mon Sep 17 00:00:00 2001
From: Casper da Costa-Luis
Date: Tue, 19 Nov 2019 17:49:32 +0000
Subject: [PATCH 11/11] fix deepsource issues

---
 dvc/remote/gs.py | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/dvc/remote/gs.py b/dvc/remote/gs.py
index d4a59cfb52..0ba106dbdb 100644
--- a/dvc/remote/gs.py
+++ b/dvc/remote/gs.py
@@ -63,8 +63,8 @@ def _upload_to_bucket(
         bytes=True,
         disable=no_progress_bar,
     ) as pbar:
-        with io.open(from_file, mode="rb") as fd:
-            raw_read = fd.read
+        with io.open(from_file, mode="rb") as fobj:
+            raw_read = fobj.read
 
             def read(size=chunk_size):
                 res = raw_read(size)
@@ -72,8 +72,8 @@ def read(size=chunk_size):
                     pbar.update(len(res))
                 return res
 
-            fd.read = read
-            blob.upload_from_file(fd)
+            fobj.read = read
+            blob.upload_from_file(fobj)
 
 
 class RemoteGS(RemoteBASE):
@@ -167,15 +167,15 @@ def _download(self, from_info, to_file, name=None, no_progress_bar=True):
             bytes=True,
             disable=no_progress_bar,
         ) as pbar:
-            with io.open(to_file, mode="wb") as fd:
-                raw_write = fd.write
+            with io.open(to_file, mode="wb") as fobj:
+                raw_write = fobj.write
 
-                def write(bytes):
-                    raw_write(bytes)
-                    pbar.update(len(bytes))
+                def write(byte_string):
+                    raw_write(byte_string)
+                    pbar.update(len(byte_string))
 
-                fd.write = write
-                blob.download_to_file(fd)
+                fobj.write = write
+                blob.download_to_file(fobj)
 
     def _generate_download_url(self, path_info, expires=3600):
         expiration = timedelta(seconds=int(expires))
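Annotation (not part of the series): patch 10 shows only the updated assertion; reconstructed from its context lines, the surrounding test presumably looks like the sketch below (the test name and the stub body are assumptions). The stub always raises, so the decorator walks the whole halving sequence before re-raising:

    import pytest
    import requests

    from dvc.remote.gs import dynamic_chunk_size


    def test_dynamic_chunk_size():
        chunk_sizes = []

        @dynamic_chunk_size
        def upload(chunk_size=None):
            chunk_sizes.append(chunk_size)
            raise requests.exceptions.ConnectionError

        with pytest.raises(requests.exceptions.ConnectionError):
            upload()

        # 10M halved repeatedly down to the 256K minimum
        assert chunk_sizes == [
            10485760,
            5242880,
            2621440,
            1310720,
            524288,
            262144,
        ]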