From 8598f013195780be7d846a750e92de17be203ecd Mon Sep 17 00:00:00 2001
From: Danny Hermes
Date: Fri, 5 May 2017 12:53:39 -0700
Subject: [PATCH] Add back support for retries in storage uploads.

Also clean up a lot of the documentation for the storage blob module.
---
 storage/google/cloud/storage/blob.py | 167 +++++++++++++++++----------
 storage/setup.py                     |   2 +-
 storage/tests/unit/test_blob.py      |  52 ++++++---
 3 files changed, 148 insertions(+), 73 deletions(-)

diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py
index 0b26781b06a42..eb57c34c6ebb0 100644
--- a/storage/google/cloud/storage/blob.py
+++ b/storage/google/cloud/storage/blob.py
@@ -14,7 +14,16 @@
 
 # pylint: disable=too-many-lines
-"""Create / interact with Google Cloud Storage blobs."""
+"""Create / interact with Google Cloud Storage blobs.
+
+.. _API reference docs: https://cloud.google.com/storage/docs/\
+    json_api/v1/objects
+.. _customer-supplied: https://cloud.google.com/storage/docs/\
+    encryption#customer-supplied
+.. _google-resumable-media: https://googlecloudplatform.github.io/\
+    google-resumable-media-python/latest/\
+    google.resumable_media.requests.html
+"""
 
 import base64
 import copy
@@ -71,12 +80,13 @@
     'storageClass',
 )
 _NUM_RETRIES_MESSAGE = (
-    'num_retries is no longer supported. When a transient error occurs, '
-    'such as a 429 Too Many Requests or 500 Internal Server Error, upload '
-    'requests will be automatically retried. Subsequent retries will be '
-    'done after waiting 1, 2, 4, 8, etc. seconds (exponential backoff) until '
-    '10 minutes of wait time have elapsed. At that point, there will be no '
-    'more attempts to retry.')
+    '`num_retries` has been deprecated and will be removed in a future '
+    'release. The default behavior (when `num_retries` is not specified) when '
+    'a transient error (e.g. 429 Too Many Requests or 500 Internal Server '
+    'Error) occurs will be as follows: upload requests will be automatically '
+    'retried. Subsequent retries will be sent after waiting 1, 2, 4, 8, etc. '
+    'seconds (exponential backoff) until 10 minutes of wait time have '
+    'elapsed. At that point, there will be no more attempts to retry.')
 _READ_LESS_THAN_SIZE = (
     'Size {:d} was specified but the file-like object only had '
     '{:d} bytes remaining.')
@@ -426,18 +436,20 @@ def download_to_file(self, file_obj, client=None):
         If the server-set property, :attr:`media_link`, is not yet
         initialized, makes an additional API request to load it.
 
-        Downloading a file that has been encrypted with a `customer-supplied`_
-        encryption key:
+        Downloading a file that has been encrypted with a `customer-supplied`_
+        encryption key:
 
         .. literalinclude:: storage_snippets.py
             :start-after: [START download_to_file]
             :end-before: [END download_to_file]
+            :dedent: 4
 
         The ``encryption_key`` should be a str or bytes with a length of at
         least 32.
 
-        .. _customer-supplied: https://cloud.google.com/storage/docs/\
-           encryption#customer-supplied
+        For more fine-grained control over the download process, check out
+        `google-resumable-media`_. For example, this library allows
+        downloading **parts** of a blob rather than the whole thing.
 
         :type file_obj: file
         :param file_obj: A file handle to which to write the blob's data.
@@ -530,8 +542,8 @@ def _get_writable_metadata(self):
 
         This is intended to be used when creating a new object / blob.
 
-        See the `API reference`_ for more information, the fields marked as
-        writable are:
+        See the `API reference docs`_ for more information, the fields
+        marked as writable are:
 
         * ``acl``
         * ``cacheControl``
@@ -547,9 +559,6 @@ def _get_writable_metadata(self):
 
         For now, we don't support ``acl``, access control lists should be
        managed directly through :class:`ObjectACL` methods.
-
-        .. _API reference: https://cloud.google.com/storage/\
-            docs/json_api/v1/objects
         """
         # NOTE: This assumes `self.name` is unicode.
         object_metadata = {'name': self.name}
@@ -583,7 +592,8 @@ def _get_upload_arguments(self, content_type):
         content_type = self._get_content_type(content_type)
         return headers, object_metadata, content_type
 
-    def _do_multipart_upload(self, client, stream, content_type, size):
+    def _do_multipart_upload(self, client, stream, content_type,
+                             size, num_retries):
         """Perform a multipart upload.
 
         Assumes ``chunk_size`` is :data:`None` on the current blob.
@@ -610,6 +620,10 @@ def _do_multipart_upload(self, client, stream, content_type, size):
             from ``stream``). If not provided, the upload will be concluded
             once ``stream`` is exhausted (or :data:`None`).
 
+        :type num_retries: int
+        :param num_retries: Number of upload retries. (Deprecated: This
+                            argument will be removed in a future release.)
+
         :rtype: :class:`~requests.Response`
         :returns: The "200 OK" response object returned after the multipart
                   upload request.
@@ -631,13 +645,19 @@ def _do_multipart_upload(self, client, stream, content_type, size):
         upload_url = _MULTIPART_URL_TEMPLATE.format(
             bucket_path=self.bucket.path)
         upload = MultipartUpload(upload_url, headers=headers)
+
+        if num_retries is not None:
+            upload._retry_strategy = resumable_media.RetryStrategy(
+                max_retries=num_retries)
+
         response = upload.transmit(
             transport, data, object_metadata, content_type)
 
         return response
 
     def _initiate_resumable_upload(self, client, stream, content_type,
-                                   size, extra_headers=None, chunk_size=None):
+                                   size, num_retries, extra_headers=None,
+                                   chunk_size=None):
         """Initiate a resumable upload.
 
         The content type of the upload will be determined in order
@@ -662,6 +682,10 @@ def _initiate_resumable_upload(self, client, stream, content_type,
             from ``stream``). If not provided, the upload will be concluded
             once ``stream`` is exhausted (or :data:`None`).
 
+        :type num_retries: int
+        :param num_retries: Number of upload retries. (Deprecated: This
+                            argument will be removed in a future release.)
+
         :type extra_headers: dict
         :param extra_headers: (Optional) Extra headers to add to standard
                               headers.
@@ -693,13 +717,19 @@ def _initiate_resumable_upload(self, client, stream, content_type,
         upload_url = _RESUMABLE_URL_TEMPLATE.format(
             bucket_path=self.bucket.path)
         upload = ResumableUpload(upload_url, chunk_size, headers=headers)
+
+        if num_retries is not None:
+            upload._retry_strategy = resumable_media.RetryStrategy(
+                max_retries=num_retries)
+
         upload.initiate(
             transport, stream, object_metadata, content_type,
             total_bytes=size, stream_final=False)
 
         return upload, transport
 
-    def _do_resumable_upload(self, client, stream, content_type, size):
+    def _do_resumable_upload(self, client, stream, content_type,
+                             size, num_retries):
         """Perform a resumable upload.
 
         Assumes ``chunk_size`` is not :data:`None` on the current blob.
@@ -726,19 +756,23 @@ def _do_resumable_upload(self, client, stream, content_type, size):
             from ``stream``). If not provided, the upload will be concluded
             once ``stream`` is exhausted (or :data:`None`).
 
+        :type num_retries: int
+        :param num_retries: Number of upload retries. (Deprecated: This
+                            argument will be removed in a future release.)
+
         :rtype: :class:`~requests.Response`
         :returns: The "200 OK" response object returned after the final
                   chunk is uploaded.
         """
         upload, transport = self._initiate_resumable_upload(
-            client, stream, content_type, size)
+            client, stream, content_type, size, num_retries)
 
         while not upload.finished:
             response = upload.transmit_next_chunk(transport)
 
         return response
 
-    def _do_upload(self, client, stream, content_type, size):
+    def _do_upload(self, client, stream, content_type, size, num_retries):
         """Determine an upload strategy and then perform the upload.
 
         If the current blob has a ``chunk_size`` set, then a resumable upload
@@ -767,6 +801,10 @@ def _do_upload(self, client, stream, content_type, size):
             from ``stream``). If not provided, the upload will be concluded
             once ``stream`` is exhausted (or :data:`None`).
 
+        :type num_retries: int
+        :param num_retries: Number of upload retries. (Deprecated: This
+                            argument will be removed in a future release.)
+
         :rtype: dict
         :returns: The parsed JSON from the "200 OK" response. This will be the
                   **only** response in the multipart case and it will be the
@@ -774,10 +812,10 @@ def _do_upload(self, client, stream, content_type, size):
         """
         if self.chunk_size is None:
             response = self._do_multipart_upload(
-                client, stream, content_type, size)
+                client, stream, content_type, size, num_retries)
         else:
             response = self._do_resumable_upload(
-                client, stream, content_type, size)
+                client, stream, content_type, size, num_retries)
 
         return response.json()
 
@@ -798,22 +836,21 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
         bucket. In the absence of those policies, upload will overwrite
         any existing contents.
 
-        See the `object versioning
-        <https://cloud.google.com/storage/docs/object-versioning>`_ and
-        `lifecycle <https://cloud.google.com/storage/docs/lifecycle>`_
-        API documents for details.
+        See the `object versioning`_ and `lifecycle`_ API documents
+        for details.
 
         Uploading a file with a `customer-supplied`_ encryption key:
 
         .. literalinclude:: storage_snippets.py
             :start-after: [START upload_from_file]
            :end-before: [END upload_from_file]
+            :dedent: 4
 
         The ``encryption_key`` should be a str or bytes with a length of at
         least 32.
 
-        .. _customer-supplied: https://cloud.google.com/storage/docs/\
-           encryption#customer-supplied
+        For more fine-grained control over the upload process, check out
+        `google-resumable-media`_.
 
         :type file_obj: file
         :param file_obj: A file handle open for reading.
@@ -831,7 +868,8 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
         :param content_type: Optional type of content being uploaded.
 
         :type num_retries: int
-        :param num_retries: Number of upload retries. (Deprecated.)
+        :param num_retries: Number of upload retries. (Deprecated: This
+                            argument will be removed in a future release.)
 
         :type client: :class:`~google.cloud.storage.client.Client`
         :param client: (Optional) The client to use.  If not passed, falls back
@@ -839,6 +877,10 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
 
         :raises: :class:`~google.cloud.exceptions.GoogleCloudError`
                  if the upload response returns an error status.
+
+        .. _object versioning: https://cloud.google.com/storage/\
+            docs/object-versioning
+        .. _lifecycle: https://cloud.google.com/storage/docs/lifecycle
         """
         if num_retries is not None:
             warnings.warn(_NUM_RETRIES_MESSAGE, DeprecationWarning)
@@ -846,7 +888,7 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
         _maybe_rewind(file_obj, rewind=rewind)
         try:
             created_json = self._do_upload(
-                client, file_obj, content_type, size)
+                client, file_obj, content_type, size, num_retries)
             self._set_properties(created_json)
         except resumable_media.InvalidResponse as exc:
             _raise_from_invalid_response(exc)
@@ -941,8 +983,9 @@ def create_resumable_upload_session(
         access-controlled bucket. For more details, see the
         `documentation on signed URLs`_.
 
-        .. _documentation on signed URLs: https://cloud.google.com/storage\
-           /docs/access-control/signed-urls#signing-resumable
+        .. _documentation on signed URLs:
+            https://cloud.google.com/storage/\
+            docs/access-control/signed-urls#signing-resumable
 
         The content type of the upload will be determined in order
         of precedence:
@@ -962,10 +1005,8 @@ def create_resumable_upload_session(
         `lifecycle <https://cloud.google.com/storage/docs/lifecycle>`_
         API documents for details.
 
-        If :attr:`encryption_key` is set, the blob will be `encrypted`_.
-
-        .. _encrypted: https://cloud.google.com/storage/docs/\
-           encryption#customer-supplied
+        If :attr:`encryption_key` is set, the blob will be encrypted with
+        a `customer-supplied`_ encryption key.
 
         :type size: int
         :param size: (Optional). The maximum number of bytes that can be
@@ -1004,7 +1045,7 @@ def create_resumable_upload_session(
             # to the `ResumableUpload` constructor. The chunk size only
             # matters when **sending** bytes to an upload.
             upload, _ = self._initiate_resumable_upload(
-                client, dummy_stream, content_type, size,
+                client, dummy_stream, content_type, size, None,
                 extra_headers=extra_headers,
                 chunk_size=self._CHUNK_SIZE_MULTIPLE)
@@ -1205,67 +1246,73 @@ def update_storage_class(self, new_class, client=None):
     cache_control = _scalar_property('cacheControl')
     """HTTP 'Cache-Control' header for this object.
 
-    See: https://tools.ietf.org/html/rfc7234#section-5.2 and
-    https://cloud.google.com/storage/docs/json_api/v1/objects
+    See: `RFC 7234`_ and `API reference docs`_.
 
-    If the property is not set locally, returns ``None``.
+    If the property is not set locally, returns :data:`None`.
 
     :rtype: str or ``NoneType``
+
+    .. _RFC 7234: https://tools.ietf.org/html/rfc7234#section-5.2
     """
 
     content_disposition = _scalar_property('contentDisposition')
     """HTTP 'Content-Disposition' header for this object.
 
-    See: https://tools.ietf.org/html/rfc6266 and
-    https://cloud.google.com/storage/docs/json_api/v1/objects
+    See: `RFC 6266`_ and `API reference docs`_.
 
-    If the property is not set locally, returns ``None``.
+    If the property is not set locally, returns :data:`None`.
 
     :rtype: str or ``NoneType``
+
+    .. _RFC 6266: https://tools.ietf.org/html/rfc6266
     """
 
     content_encoding = _scalar_property('contentEncoding')
     """HTTP 'Content-Encoding' header for this object.
 
-    See: https://tools.ietf.org/html/rfc7231#section-3.1.2.2 and
-    https://cloud.google.com/storage/docs/json_api/v1/objects
+    See: `RFC 7231`_ and `API reference docs`_.
 
     If the property is not set locally, returns ``None``.
 
     :rtype: str or ``NoneType``
+
+    .. _RFC 7231: https://tools.ietf.org/html/rfc7231#section-3.1.2.2
     """
 
     content_language = _scalar_property('contentLanguage')
     """HTTP 'Content-Language' header for this object.
 
-    See: https://tools.ietf.org/html/bcp47 and
-    https://cloud.google.com/storage/docs/json_api/v1/objects
+    See: `BCP47`_ and `API reference docs`_.
 
-    If the property is not set locally, returns ``None``.
+    If the property is not set locally, returns :data:`None`.
 
     :rtype: str or ``NoneType``
+
+    .. _BCP47: https://tools.ietf.org/html/bcp47
     """
 
     content_type = _scalar_property(_CONTENT_TYPE_FIELD)
     """HTTP 'Content-Type' header for this object.
 
-    See: https://tools.ietf.org/html/rfc2616#section-14.17 and
-    https://cloud.google.com/storage/docs/json_api/v1/objects
+    See: `RFC 2616`_ and `API reference docs`_.
 
-    If the property is not set locally, returns ``None``.
+    If the property is not set locally, returns :data:`None`.
 
     :rtype: str or ``NoneType``
+
+    .. _RFC 2616: https://tools.ietf.org/html/rfc2616#section-14.17
     """
 
     crc32c = _scalar_property('crc32c')
     """CRC32C checksum for this object.
 
-    See: https://tools.ietf.org/html/rfc4960#appendix-B and
-    https://cloud.google.com/storage/docs/json_api/v1/objects
+    See: `RFC 4960`_ and `API reference docs`_.
 
-    If the property is not set locally, returns ``None``.
+    If the property is not set locally, returns :data:`None`.
 
     :rtype: str or ``NoneType``
+
+    .. _RFC 4960: https://tools.ietf.org/html/rfc4960#appendix-B
     """
 
     @property
@@ -1287,11 +1334,12 @@ def component_count(self):
     def etag(self):
         """Retrieve the ETag for the object.
 
-        See: https://tools.ietf.org/html/rfc2616#section-3.11 and
-        https://cloud.google.com/storage/docs/json_api/v1/objects
+        See: `RFC 2616 (etags)`_ and `API reference docs`_.
 
         :rtype: str or ``NoneType``
         :returns: The blob etag or ``None`` if the property is not set locally.
+
+        .. _RFC 2616 (etags): https://tools.ietf.org/html/rfc2616#section-3.11
         """
         return self._properties.get('etag')
 
@@ -1324,12 +1372,13 @@ def id(self):
     md5_hash = _scalar_property('md5Hash')
     """MD5 hash for this object.
 
-    See: https://tools.ietf.org/html/rfc4960#appendix-B and
-    https://cloud.google.com/storage/docs/json_api/v1/objects
+    See: `RFC 1321`_ and `API reference docs`_.
 
     If the property is not set locally, returns ``None``.
 
     :rtype: str or ``NoneType``
+
+    .. _RFC 1321: https://tools.ietf.org/html/rfc1321
     """
 
     @property
diff --git a/storage/setup.py b/storage/setup.py
index 88ebfcbe853e4..e261f6402c02d 100644
--- a/storage/setup.py
+++ b/storage/setup.py
@@ -53,7 +53,7 @@
 REQUIREMENTS = [
     'google-cloud-core >= 0.24.1, < 0.25dev',
     'google-auth >= 1.0.0',
-    'google-resumable-media >= 0.1.0',
+    'google-resumable-media >= 0.1.1',
     'requests >= 2.0.0',
 ]
 
diff --git a/storage/tests/unit/test_blob.py b/storage/tests/unit/test_blob.py
index bbe67047fbff2..21443480b32f8 100644
--- a/storage/tests/unit/test_blob.py
+++ b/storage/tests/unit/test_blob.py
@@ -762,7 +762,8 @@ def _mock_transport(self, status_code, headers, content=b''):
         fake_transport.request.return_value = fake_response
         return fake_transport
 
-    def _do_multipart_success(self, mock_get_boundary, size=None):
+    def _do_multipart_success(self, mock_get_boundary, size=None,
+                              num_retries=None):
         bucket = mock.Mock(path='/b/w00t', spec=[u'path'])
         blob = self._make_one(u'blob-name', bucket=bucket)
         self.assertIsNone(blob.chunk_size)
@@ -777,7 +778,7 @@ def _do_multipart_success(self, mock_get_boundary, size=None):
         stream = io.BytesIO(data)
         content_type = u'application/xml'
         response = blob._do_multipart_upload(
-            client, stream, content_type, size)
+            client, stream, content_type, size, num_retries)
 
         # Check the mocks and the returned value.
         self.assertIs(response, fake_transport.request.return_value)
@@ -817,6 +818,11 @@ def test__do_multipart_upload_no_size(self, mock_get_boundary):
     def test__do_multipart_upload_with_size(self, mock_get_boundary):
         self._do_multipart_success(mock_get_boundary, size=10)
 
+    @mock.patch(u'google.resumable_media._upload.get_boundary',
+                return_value=b'==0==')
+    def test__do_multipart_upload_with_retry(self, mock_get_boundary):
+        self._do_multipart_success(mock_get_boundary, num_retries=8)
+
     def test__do_multipart_upload_bad_size(self):
         blob = self._make_one(u'blob-name', bucket=None)
 
@@ -826,7 +832,7 @@ def test__do_multipart_upload_bad_size(self):
         self.assertGreater(size, len(data))
 
         with self.assertRaises(ValueError) as exc_info:
-            blob._do_multipart_upload(None, stream, None, size)
+            blob._do_multipart_upload(None, stream, None, size, None)
 
         exc_contents = str(exc_info.exception)
         self.assertIn(
@@ -834,7 +840,7 @@ def test__do_multipart_upload_bad_size(self):
         self.assertEqual(stream.tell(), len(data))
 
     def _initiate_resumable_helper(self, size=None, extra_headers=None,
-                                   chunk_size=None):
+                                   chunk_size=None, num_retries=None):
         from google.resumable_media.requests import ResumableUpload
 
         bucket = mock.Mock(path='/b/whammy', spec=[u'path'])
@@ -862,7 +868,7 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None,
         stream = io.BytesIO(data)
         content_type = u'text/plain'
         upload, transport = blob._initiate_resumable_upload(
-            client, stream, content_type, size,
+            client, stream, content_type, size, num_retries,
             extra_headers=extra_headers, chunk_size=chunk_size)
 
         # Check the returned values.
@@ -890,6 +896,14 @@ def _initiate_resumable_helper(self, size=None, extra_headers=None,
         self.assertEqual(upload._total_bytes, size)
         self.assertEqual(upload._content_type, content_type)
         self.assertEqual(upload.resumable_url, resumable_url)
+        retry_strategy = upload._retry_strategy
+        self.assertEqual(retry_strategy.max_sleep, 64.0)
+        if num_retries is None:
+            self.assertEqual(retry_strategy.max_cumulative_retry, 600.0)
+            self.assertIsNone(retry_strategy.max_retries)
+        else:
+            self.assertIsNone(retry_strategy.max_cumulative_retry)
+            self.assertEqual(retry_strategy.max_retries, num_retries)
         self.assertIs(transport, fake_transport)
         # Make sure we never read from the stream.
         self.assertEqual(stream.tell(), 0)
@@ -923,6 +937,9 @@ def test__initiate_resumable_upload_with_extra_headers(self):
         extra_headers = {'origin': 'http://not-in-kansas-anymore.invalid'}
         self._initiate_resumable_helper(extra_headers=extra_headers)
 
+    def test__initiate_resumable_upload_with_retry(self):
+        self._initiate_resumable_helper(num_retries=11)
+
     def _make_resumable_transport(self, headers1, headers2, headers3,
                                   total_bytes):
         from google import resumable_media
@@ -990,7 +1007,7 @@ def _do_resumable_upload_call2(blob, content_type, data,
         return mock.call(
             'PUT', resumable_url, data=payload, headers=expected_headers)
 
-    def _do_resumable_helper(self, use_size=False):
+    def _do_resumable_helper(self, use_size=False, num_retries=None):
         bucket = mock.Mock(path='/b/yesterday', spec=[u'path'])
         blob = self._make_one(u'blob-name', bucket=bucket)
         blob.chunk_size = blob._CHUNK_SIZE_MULTIPLE
@@ -1017,7 +1034,7 @@ def _do_resumable_helper(self, use_size=False):
         stream = io.BytesIO(data)
         content_type = u'text/html'
         response = blob._do_resumable_upload(
-            client, stream, content_type, size)
+            client, stream, content_type, size, num_retries)
 
         # Check the returned values.
         self.assertIs(response, responses[2])
@@ -1039,7 +1056,10 @@ def test__do_resumable_upload_no_size(self):
     def test__do_resumable_upload_with_size(self):
         self._do_resumable_helper(use_size=True)
 
-    def _do_upload_helper(self, chunk_size=None):
+    def test__do_resumable_upload_with_retry(self):
+        self._do_resumable_helper(num_retries=6)
+
+    def _do_upload_helper(self, chunk_size=None, num_retries=None):
         blob = self._make_one(u'blob-name', bucket=None)
 
         # Create a fake response.
@@ -1061,17 +1081,18 @@ def _do_upload_helper(self, chunk_size=None):
         size = 12345654321
 
         # Make the request and check the mocks.
-        created_json = blob._do_upload(client, stream, content_type, size)
+        created_json = blob._do_upload(
+            client, stream, content_type, size, num_retries)
         self.assertIs(created_json, mock.sentinel.json)
         response.json.assert_called_once_with()
         if chunk_size is None:
             blob._do_multipart_upload.assert_called_once_with(
-                client, stream, content_type, size)
+                client, stream, content_type, size, num_retries)
             blob._do_resumable_upload.assert_not_called()
         else:
             blob._do_multipart_upload.assert_not_called()
             blob._do_resumable_upload.assert_called_once_with(
-                client, stream, content_type, size)
+                client, stream, content_type, size, num_retries)
 
     def test__do_upload_without_chunk_size(self):
         self._do_upload_helper()
@@ -1080,6 +1101,9 @@ def test__do_upload_with_chunk_size(self):
         chunk_size = 1024 * 1024 * 1024  # 1GB
         self._do_upload_helper(chunk_size=chunk_size)
 
+    def test__do_upload_with_retry(self):
+        self._do_upload_helper(num_retries=20)
+
     def _upload_from_file_helper(self, side_effect=None, **kwargs):
         from google.cloud._helpers import UTC
 
@@ -1109,8 +1133,9 @@ def _upload_from_file_helper(self, side_effect=None, **kwargs):
         self.assertEqual(blob.updated, new_updated)
 
         # Check the mock.
+        num_retries = kwargs.get('num_retries')
         blob._do_upload.assert_called_once_with(
-            client, stream, content_type, len(data))
+            client, stream, content_type, len(data), num_retries)
 
         return stream
 
@@ -1151,10 +1176,11 @@ def _do_upload_mock_call_helper(self, blob, client, content_type, size):
         mock_call = blob._do_upload.mock_calls[0]
         call_name, pos_args, kwargs = mock_call
         self.assertEqual(call_name, '')
-        self.assertEqual(len(pos_args), 4)
+        self.assertEqual(len(pos_args), 5)
        self.assertEqual(pos_args[0], client)
         self.assertEqual(pos_args[2], content_type)
         self.assertEqual(pos_args[3], size)
+        self.assertIsNone(pos_args[4])  # num_retries
         self.assertEqual(kwargs, {})
         return pos_args[1]
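
Usage sketch (illustrative only, not part of the patch): the snippet below shows how the retry behavior added here surfaces through the public API. The bucket and object names are hypothetical, and the comments about `RetryStrategy` mirror the plumbing added in `_do_multipart_upload` and `_initiate_resumable_upload` above.

    # Illustrative sketch -- bucket and object names are made up.
    import io

    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket('my-bucket')    # hypothetical bucket
    blob = bucket.blob('report.csv')       # hypothetical object

    data = io.BytesIO(b'col1,col2\n1,2\n')

    # Default (num_retries omitted): transient errors (e.g. 429, 500) are
    # retried with exponential backoff -- waits of 1, 2, 4, 8, ... seconds,
    # capped at 64s per sleep -- until 10 minutes of cumulative wait time
    # have elapsed.
    blob.upload_from_file(data, content_type='text/csv')

    # Deprecated path: passing num_retries emits a DeprecationWarning and
    # swaps in resumable_media.RetryStrategy(max_retries=8), i.e. a fixed
    # retry budget instead of the 10-minute cumulative window.
    data.seek(0)
    blob.upload_from_file(data, num_retries=8, content_type='text/csv')

Because `blob.chunk_size` is left unset, both calls take the multipart path in `_do_upload`; setting `chunk_size` (a multiple of 256 KB) would route them through the resumable path instead, with the same retry semantics.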