diff --git a/storage/google/cloud/storage/blob.py b/storage/google/cloud/storage/blob.py
index bb5bc2e32a1d..8d30a94b600b 100644
--- a/storage/google/cloud/storage/blob.py
+++ b/storage/google/cloud/storage/blob.py
@@ -380,6 +380,110 @@ def download_as_string(self, client=None):
         self.download_to_file(string_buffer, client=client)
         return string_buffer.getvalue()
 
+    def _create_upload(
+            self, client, file_obj=None, size=None, content_type=None,
+            chunk_size=None, strategy=None, extra_headers=None):
+        """Helper for upload methods.
+
+        Creates a :class:`google.cloud.core.streaming.Upload` object to handle
+        the details of uploading a file to Cloud Storage.
+
+        :type client: :class:`~google.cloud.storage.client.Client` or
+            ``NoneType``
+        :param client: Optional. The client to use. If not passed, falls back
+            to the ``client`` stored on the blob's bucket.
+
+        :type file_obj: file
+        :param file_obj: A file handle open for reading.
+
+        :type size: int
+        :param size: The size of the upload, in bytes.
+
+        :type content_type: str
+        :param content_type: Optional type of content being uploaded.
+
+        :type chunk_size: int
+        :param chunk_size: The size of each chunk when doing resumable and
+            media uploads.
+
+        :type strategy: str
+        :param strategy: Either
+            :attr:`google.cloud.core.streaming.transfer.SIMPLE_UPLOAD` or
+            :attr:`google.cloud.core.streaming.transfer.RESUMABLE_UPLOAD`.
+
+        :type extra_headers: dict
+        :param extra_headers: Additional headers to be sent with the upload
+            initiation request.
+
+        :rtype: Tuple[google.cloud.core.streaming.Upload,
+                      google.cloud.core.streaming.Request,
+                      google.cloud.core.streaming.Response]
+        :returns: The Upload object, the upload HTTP request, and the upload
+            initiation response.
+        """
+
+        client = self._require_client(client)
+
+        # Use ``_base_connection`` rather than ``_connection`` since the
+        # current connection may be a batch. A batch wraps a client's
+        # connection, but does not store the ``http`` object. The rest
+        # (API_BASE_URL and build_api_url) are also defined on the Batch
+        # class, but we just use the wrapped connection since it has all
+        # three (http, API_BASE_URL and build_api_url).
+        connection = client._base_connection
+
+        content_type = (content_type or self._properties.get('contentType') or
+                        'application/octet-stream')
+
+        headers = {
+            'Accept': 'application/json',
+            'Accept-Encoding': 'gzip, deflate',
+            'User-Agent': connection.USER_AGENT,
+        }
+
+        if extra_headers:
+            headers.update(extra_headers)
+
+        headers.update(_get_encryption_headers(self._encryption_key))
+
+        # Use apitools' Upload functionality
+        upload = Upload(
+            file_obj, content_type, total_size=size, auto_transfer=False)
+
+        if chunk_size is not None:
+            upload.chunksize = chunk_size
+
+        if strategy is not None:
+            upload.strategy = strategy
+
+        url_builder = _UrlBuilder(
+            bucket_name=self.bucket.name,
+            object_name=self.name)
+        upload_config = _UploadConfig()
+
+        # Temporary URL until strategy is determined.
+        base_url = connection.API_BASE_URL + '/upload'
+        upload_url = connection.build_api_url(
+            api_base_url=base_url,
+            path=self.bucket.path + '/o')
+
+        # Configure the upload request parameters.
+ request = Request(upload_url, 'POST', headers) + upload.configure_request(upload_config, request, url_builder) + + # Configure final URL + query_params = url_builder.query_params + base_url = connection.API_BASE_URL + '/upload' + request.url = connection.build_api_url( + api_base_url=base_url, + path=self.bucket.path + '/o', + query_params=query_params) + + # Start the upload session + response = upload.initialize_upload(request, connection.http) + + return upload, request, response + @staticmethod def _check_response_error(request, http_response): """Helper for :meth:`upload_from_file`.""" @@ -390,7 +494,6 @@ def _check_response_error(request, http_response): raise make_exception(faux_response, http_response.content, error_info=request.url) - # pylint: disable=too-many-locals def upload_from_file(self, file_obj, rewind=False, size=None, content_type=None, num_retries=6, client=None): """Upload the contents of this blob from a file-like object. @@ -459,8 +562,6 @@ def upload_from_file(self, file_obj, rewind=False, size=None, # use the wrapped connection since it has all three (http, # API_BASE_URL and build_api_url). connection = client._base_connection - content_type = (content_type or self._properties.get('contentType') or - 'application/octet-stream') # Rewind the file if desired. if rewind: @@ -475,52 +576,28 @@ def upload_from_file(self, file_obj, rewind=False, size=None, except (OSError, UnsupportedOperation): pass # Assuming fd is not an actual file (maybe socket). - headers = { - 'Accept': 'application/json', - 'Accept-Encoding': 'gzip, deflate', - 'User-Agent': connection.USER_AGENT, - } - - headers.update(_get_encryption_headers(self._encryption_key)) - - upload = Upload(file_obj, content_type, total_bytes, - auto_transfer=False) - + chunk_size = None + strategy = None if self.chunk_size is not None: - upload.chunksize = self.chunk_size + chunk_size = self.chunk_size if total_bytes is None: - upload.strategy = RESUMABLE_UPLOAD + strategy = RESUMABLE_UPLOAD elif total_bytes is None: raise ValueError('total bytes could not be determined. Please ' 'pass an explicit size, or supply a chunk size ' 'for a streaming transfer.') - url_builder = _UrlBuilder(bucket_name=self.bucket.name, - object_name=self.name) - upload_config = _UploadConfig() - - # Temporary URL, until we know simple vs. resumable. - base_url = connection.API_BASE_URL + '/upload' - upload_url = connection.build_api_url(api_base_url=base_url, - path=self.bucket.path + '/o') - - # Use apitools 'Upload' facility. 
-        request = Request(upload_url, 'POST', headers)
-
-        upload.configure_request(upload_config, request, url_builder)
-        query_params = url_builder.query_params
-        base_url = connection.API_BASE_URL + '/upload'
-        request.url = connection.build_api_url(api_base_url=base_url,
-                                               path=self.bucket.path + '/o',
-                                               query_params=query_params)
-        upload.initialize_upload(request, connection.http)
+        upload, request, _ = self._create_upload(
+            client, file_obj=file_obj, size=total_bytes,
+            content_type=content_type, chunk_size=chunk_size,
+            strategy=strategy)
 
         if upload.strategy == RESUMABLE_UPLOAD:
             http_response = upload.stream_file(use_chunks=True)
         else:
-            http_response = make_api_request(connection.http, request,
-                                             retries=num_retries)
+            http_response = make_api_request(
+                connection.http, request, retries=num_retries)
 
         self._check_response_error(request, http_response)
         response_content = http_response.content
@@ -529,7 +606,6 @@ def upload_from_file(self, file_obj, rewind=False, size=None,
                          six.string_types):  # pragma: NO COVER Python3
             response_content = response_content.decode('utf-8')
         self._set_properties(json.loads(response_content))
-    # pylint: enable=too-many-locals
 
     def upload_from_filename(self, filename, content_type=None, client=None):
         """Upload this blob's contents from the content of a named file.
@@ -604,6 +680,95 @@ def upload_from_string(self, data, content_type='text/plain', client=None):
             file_obj=string_buffer, rewind=True, size=len(data),
             content_type=content_type, client=client)
 
+    def create_resumable_upload_session(
+            self,
+            content_type=None,
+            size=None,
+            origin=None,
+            client=None):
+        """Create a resumable upload session.
+
+        Resumable upload sessions allow you to start an upload session from
+        one client and complete the session in another. This method is called
+        by the initiator to set the metadata and limits. The initiator then
+        passes the session URL to the client that will upload the binary data.
+        The client performs a PUT request on the session URL to complete the
+        upload. This process allows untrusted clients to upload to an
+        access-controlled bucket. For more details, see the
+        `documentation on signed URLs`_.
+
+        .. _documentation on signed URLs: https://cloud.google.com/storage\
+                /docs/access-control/signed-urls#signing-resumable
+
+        The content type of the upload will be either:
+        - The value passed in to the function (if any)
+        - The value stored on the current blob
+        - The default value of 'application/octet-stream'
+
+        .. note::
+            The effect of uploading to an existing blob depends on the
+            "versioning" and "lifecycle" policies defined on the blob's
+            bucket. In the absence of those policies, the upload will
+            overwrite any existing contents.
+
+            See the `object versioning
+            <https://cloud.google.com/storage/docs/object-versioning>`_ and
+            `lifecycle <https://cloud.google.com/storage/docs/lifecycle>`_
+            API documents for details.
+
+        If :attr:`encryption_key` is set, the blob will be `encrypted`_.
+
+        .. _encrypted: https://cloud.google.com/storage/docs/\
+                encryption#customer-supplied
+
+        :type size: int
+        :param size: Optional, the maximum number of bytes that can be
+            uploaded using this session. If the size is not known when
+            creating the session, this should be left blank.
+
+        :type content_type: str
+        :param content_type: Optional type of content being uploaded. This can
+            be used to restrict the allowed file type that can be uploaded
+            to this session.
+
+        :type origin: str
+        :param origin: Optional origin. If set, the upload can only be
+            completed by a user-agent that uploads from the given origin. This
+            can be useful when passing the session to a web client.
+
+        :type client: :class:`~google.cloud.storage.client.Client` or
+            ``NoneType``
+        :param client: Optional. The client to use. If not passed, falls back
+            to the ``client`` stored on the blob's bucket.
+
+        :rtype: str
+        :returns: The resumable upload session URL. The upload can be
+            completed by making an HTTP PUT request with the file's contents.
+
+        :raises: :class:`google.cloud.exceptions.GoogleCloudError`
+            if the session creation response returns an error status.
+        """
+
+        extra_headers = {}
+
+        if origin is not None:
+            # This header is specifically for client-side uploads; it
+            # determines the origins allowed for CORS.
+            extra_headers['Origin'] = origin
+
+        _, _, start_response = self._create_upload(
+            client,
+            size=size,
+            content_type=content_type,
+            strategy=RESUMABLE_UPLOAD,
+            extra_headers=extra_headers)
+
+        # The 'Location' response header contains the session URL. This
+        # can be used to continue the upload.
+        resumable_upload_session_url = start_response.info['location']
+
+        return resumable_upload_session_url
+
     def make_public(self, client=None):
         """Make this blob public giving all users read access.
 
diff --git a/storage/unit_tests/test_blob.py b/storage/unit_tests/test_blob.py
index 92a5436b6fee..9724fd7a4210 100644
--- a/storage/unit_tests/test_blob.py
+++ b/storage/unit_tests/test_blob.py
@@ -1100,6 +1100,79 @@ def test_upload_from_string_text_w_key(self):
         self.assertEqual(headers['Content-Type'], 'text/plain')
         self.assertEqual(rq[0]['body'], ENCODED)
 
+    def test_create_resumable_upload_session(self):
+        from six.moves.http_client import OK
+        from six.moves.urllib.parse import parse_qsl
+        from six.moves.urllib.parse import urlsplit
+
+        BLOB_NAME = 'blob-name'
+        UPLOAD_URL = 'http://example.com/upload/name/key'
+        loc_response = {'status': OK, 'location': UPLOAD_URL}
+        connection = _Connection(
+            (loc_response, '{}'),
+        )
+        client = _Client(connection)
+        bucket = _Bucket(client=client)
+        blob = self._make_one(BLOB_NAME, bucket=bucket)
+
+        resumable_url = blob.create_resumable_upload_session()
+
+        self.assertEqual(resumable_url, UPLOAD_URL)
+
+        rq = connection.http._requested
+        self.assertEqual(len(rq), 1)
+        self.assertEqual(rq[0]['method'], 'POST')
+
+        uri = rq[0]['uri']
+        scheme, netloc, path, qs, _ = urlsplit(uri)
+        self.assertEqual(scheme, 'http')
+        self.assertEqual(netloc, 'example.com')
+        self.assertEqual(path, '/b/name/o')
+        self.assertEqual(dict(parse_qsl(qs)),
+                         {'uploadType': 'resumable', 'name': BLOB_NAME})
+        headers = {
+            key.title(): str(value) for key, value in rq[0]['headers'].items()}
+        self.assertEqual(headers['Content-Length'], '0')
+        self.assertEqual(
+            headers['X-Upload-Content-Type'], 'application/octet-stream')
+
+    def test_create_resumable_upload_session_args(self):
+        from six.moves.http_client import OK
+
+        BLOB_NAME = 'blob-name'
+        UPLOAD_URL = 'http://example.com/upload/name/key'
+        CONTENT_TYPE = 'text/plain'
+        SIZE = 1024
+        ORIGIN = 'http://google.com'
+
+        loc_response = {'status': OK, 'location': UPLOAD_URL}
+        connection = _Connection(
+            (loc_response, '{}'),
+        )
+        client = _Client(connection)
+        bucket = _Bucket(client=client)
+        blob = self._make_one(BLOB_NAME, bucket=bucket)
+
+        resumable_url = blob.create_resumable_upload_session(
+            content_type=CONTENT_TYPE,
+            size=SIZE,
+            origin=ORIGIN)
+
+        self.assertEqual(resumable_url, UPLOAD_URL)
+
+        rq = connection.http._requested
+        self.assertEqual(len(rq), 1)
+        self.assertEqual(rq[0]['method'], 'POST')
+
+        headers = {
+            key.title(): str(value) for key, value in rq[0]['headers'].items()}
+        self.assertEqual(headers['Content-Length'], '0')
+        self.assertEqual(headers['X-Upload-Content-Length'], str(SIZE))
+        self.assertEqual(
+            headers['X-Upload-Content-Type'], 'text/plain')
+        self.assertEqual(
+            headers['Origin'], ORIGIN)
+
     def test_make_public(self):
         from six.moves.http_client import OK
         from google.cloud.storage.acl import _ACLEntity
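
Usage sketch (for reviewers): the flow below shows how the new `create_resumable_upload_session()` API is meant to be split between a trusted initiator and an untrusted uploader. It is a minimal sketch, not part of this change — the bucket name, object name, local file path, and the use of the third-party `requests` library are all illustrative assumptions.

```python
# Minimal sketch of the intended two-party flow. The names
# ('my-bucket', 'uploads/data.bin', 'data.bin') and the `requests`
# dependency are illustrative assumptions, not part of this PR.
import requests
from google.cloud import storage

# --- Trusted initiator: holds Google credentials. ---
client = storage.Client()
bucket = client.bucket('my-bucket')
blob = bucket.blob('uploads/data.bin')

# Pin the size and content type; 'origin' restricts CORS so that only
# browser clients served from that origin can complete the upload.
session_url = blob.create_resumable_upload_session(
    content_type='application/octet-stream',
    size=1024,
    origin='https://example.com')

# --- Untrusted uploader: only needs the session URL. ---
# The session URL itself authorizes the upload, so it should be handed
# over and stored as carefully as a credential.
with open('data.bin', 'rb') as file_obj:  # must match the declared size
    response = requests.put(
        session_url,
        data=file_obj,
        headers={'Content-Type': 'application/octet-stream'})
response.raise_for_status()
```

A single PUT of the full payload, as above, matches the completion step described in the docstring; the same session URL also accepts chunked uploads via `Content-Range` headers for large payloads, though that is outside the scope of this sketch.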