From 323291a3d843b05c8ecb3ce0dabf1fe69726f211 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 22 Feb 2021 10:18:58 -0800 Subject: [PATCH 1/9] feat: add blob.open() for file-like I/O --- google/cloud/storage/blob.py | 35 ++ google/cloud/storage/fileio.py | 327 +++++++++++++++++++ tests/system/test_system.py | 54 ++++ tests/unit/test_blob.py | 30 ++ tests/unit/test_fileio.py | 567 +++++++++++++++++++++++++++++++++ 5 files changed, 1013 insertions(+) create mode 100644 google/cloud/storage/fileio.py create mode 100644 tests/unit/test_fileio.py diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 33b809d3c..bfe0a8ea4 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -30,6 +30,7 @@ import copy import hashlib from io import BytesIO +from io import TextIOWrapper import logging import mimetypes import os @@ -78,6 +79,8 @@ from google.cloud.storage.retry import DEFAULT_RETRY from google.cloud.storage.retry import DEFAULT_RETRY_IF_ETAG_IN_JSON from google.cloud.storage.retry import DEFAULT_RETRY_IF_GENERATION_SPECIFIED +from google.cloud.storage.fileio import BlobReader +from google.cloud.storage.fileio import BlobWriter _API_ACCESS_ENDPOINT = "https://storage.googleapis.com" @@ -3407,6 +3410,38 @@ def update_storage_class( retry=retry, ) + def open( + self, + mode="r", + chunk_size=None, + encoding=None, + errors=None, + newline=None, + **kwargs + ): + if mode == "rb": + return BlobReader(self, chunk_size=chunk_size, **kwargs) + elif mode == "wb": + return BlobWriter(self, chunk_size=chunk_size, **kwargs) + elif mode in ("r", "rt"): + return TextIOWrapper( + BlobReader(self, chunk_size=chunk_size, **kwargs), + encoding=encoding, + errors=errors, + newline=newline, + ) + elif mode in ("w", "wt"): + return TextIOWrapper( + BlobWriter(self, chunk_size=chunk_size, text_mode=True, **kwargs), + encoding=encoding, + errors=errors, + newline=newline, + ) + else: + raise NotImplementedError( + "Supported modes strings are 'r', 'rb', 'rt', 'w', 'wb', and 'wt' only." + ) + cache_control = _scalar_property("cacheControl") """HTTP 'Cache-Control' header for this object. diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py new file mode 100644 index 000000000..5678340c9 --- /dev/null +++ b/google/cloud/storage/fileio.py @@ -0,0 +1,327 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io + +# Resumable uploads require a chunk size of precisely a multiple of 256KiB. +DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024 # 10 MiB (40 times minimum chunk size). + +# Valid keyword arguments for download methods, and blob.reload() if needed. +# Note: Changes here need to be reflected in the blob.open() docstring. +VALID_DOWNLOAD_KWARGS = { + "if_generation_match", + "if_generation_not_match", + "if_metageneration_match", + "if_metageneration_not_match", + "timeout", +} + +# Valid keyword arguments for upload methods. +# Note: Changes here need to be reflected in the blob.open() docstring. 
+VALID_UPLOAD_KWARGS = { + "content_type", + "num_retries", + "predefined_acl", + "if_generation_match", + "if_generation_not_match", + "if_metageneration_match", + "if_metageneration_not_match", + "timeout", + "checksum", +} + + +class BlobReader(io.BufferedIOBase): + def __init__(self, blob, chunk_size=None, **download_kwargs): + """docstring note that download_kwargs also used for reload()""" + for kwarg in download_kwargs: + if kwarg not in VALID_DOWNLOAD_KWARGS: + raise ValueError( + "BlobReader does not support keyword argument {}.".format(kwarg) + ) + + self._blob = blob + self._pos = 0 + self._buffer = io.BytesIO() + self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE + self._download_kwargs = download_kwargs + + def read(self, size=-1): + self._checkClosed() # Raises ValueError if closed. + + result = self._buffer.read(size) + # If the read request demands more bytes than are buffered, fetch more. + remaining_size = size - len(result) + if remaining_size > 0 or size < 0: + self._buffer.seek(0) + self._buffer.truncate(0) # Clear the buffer to make way for new data. + fetch_start = self._pos + len(result) + if size > 0: + # Fetch the larger of self._chunk_size or the remaining_size. + fetch_end = fetch_start + max(remaining_size, self._chunk_size) + else: + fetch_end = None + + # Download the blob. + result += self._blob.download_as_bytes( + start=fetch_start, end=fetch_end, **self._download_kwargs + ) + + # If more bytes were read than is immediately needed, buffer the + # remainder and then trim the result. + if size > 0 and len(result) > size: + self._buffer.write(result[size:]) + self._buffer.seek(0) + result = result[:size] + + self._pos += len(result) + + return result + + def read1(self, size=-1): + return self.read(size) + + def seek(self, pos, whence=0): + """Seek within the blob. + + This implementation of seek() uses knowledge of the blob size to + validate that the reported position does not exceed the blob last byte. + If the blob size is not already known it will call blob.reload(). + """ + self._checkClosed() # Raises ValueError if closed. + + if self._blob.size is None: + self._blob.reload(**self._download_kwargs) + + initial_pos = self._pos + + if whence == 0: + self._pos = pos + elif whence == 1: + self._pos += pos + elif whence == 2: + self._pos = self._blob.size + pos + if whence not in {0, 1, 2}: + raise ValueError("invalid whence value") + + if self._pos > self._blob.size: + self._pos = self._blob.size + + # Seek or invalidate buffer as needed. + difference = self._pos - initial_pos + new_buffer_pos = self._buffer.seek(difference, 1) + if new_buffer_pos != difference: # Buffer does not contain new pos. + # Invalidate buffer. + self._buffer.seek(0) + self._buffer.truncate(0) + + return self._pos + + def close(self): + self._buffer.close() + + def _checkClosed(self): + if self._buffer.closed: + raise ValueError("I/O operation on closed file.") + + def readable(self): + return True + + def writable(self): + return False + + def seekable(self): + return True + + +class BlobWriter(io.BufferedIOBase): + def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs): + for kwarg in upload_kwargs: + if kwarg not in VALID_UPLOAD_KWARGS: + raise ValueError( + "BlobWriter does not support keyword argument {}.".format(kwarg) + ) + self._blob = blob + self._buffer = SlidingBuffer() + self._upload_and_transport = None + # Resumable uploads require a chunk size of a multiple of 256KiB. 
+ # self._chunk_size must not be changed after the upload is initiated. + self._chunk_size = chunk_size or blob.chunk_size or DEFAULT_CHUNK_SIZE + # In text mode this class will be wrapped and TextIOWrapper requires a + # different behavior of flush(). + self._text_mode = text_mode + self._upload_kwargs = upload_kwargs + + def write(self, b): + self._checkClosed() # Raises ValueError if closed. + + pos = self._buffer.write(b) + + # If there is enough content, upload chunks. + num_chunks = len(self._buffer) // self._chunk_size + if num_chunks: + self._upload_chunks_from_buffer(num_chunks) + + return pos + + def _initiate_upload(self): + num_retries = self._upload_kwargs.pop("num_retries", None) + content_type = self._upload_kwargs.pop("content_type", None) + + if ( + self._upload_kwargs.get("if_metageneration_match") is None + and num_retries is None + ): + # Uploads are only idempotent (safe to retry) if + # if_metageneration_match is set. If it is not set, the default + # num_retries should be 0. Note: Because retry logic for uploads is + # provided by the google-resumable-media-python package, it doesn't + # use the ConditionalRetryStrategy class used in other API calls in + # this library to solve this problem. + num_retries = 0 + + self._upload_and_transport = self._blob._initiate_resumable_upload( + self._blob.bucket.client, + self._buffer, + content_type, + None, + num_retries, + chunk_size=self._chunk_size, + **self._upload_kwargs + ) + + def _upload_chunks_from_buffer(self, num_chunks): + """Upload a specified number of chunks.""" + + # Initialize the upload if necessary. + if not self._upload_and_transport: + self._initiate_upload() + + upload, transport = self._upload_and_transport + + # Upload chunks. The SlidingBuffer class will manage seek position. + for _ in range(num_chunks): + upload.transmit_next_chunk(transport) + + # Wipe the buffer of chunks uploaded, preserving any remaining data. + self._buffer.flush() + + def tell(self): + return self._buffer.tell() + len(self._buffer) + + def flush(self): + if self._text_mode: + # TextIOWrapper expects this method to succeed before calling close(). + return + + raise io.UnsupportedOperation( + "Cannot flush without finalizing upload. Use close() instead." + ) + + def close(self): + self._checkClosed() # Raises ValueError if closed. + + self._upload_chunks_from_buffer(1) + self._buffer.close() + + def _checkClosed(self): + if self._buffer.closed: + raise ValueError("I/O operation on closed file.") + + def readable(self): + return False + + def writable(self): + return True + + def seekable(self): + return False + + +class SlidingBuffer(object): + """A non-rewindable buffer that frees memory of chunks already consumed. + + This class is necessary because `google-resumable-media-python` expects + `tell()` to work relative to the start of the file, not relative to a place + in an intermediate buffer. Using this class, we present an external + interface with consistent seek and tell behavior without having to actually + store bytes already sent. + + Behavior of this class differs from an ordinary BytesIO buffer. `write()` + will always append to the end of the file only and not change the seek + position otherwise. `flush()` will delete all data already read (data to the + left of the seek position). `tell()` will report the seek position of the + buffer including all deleted data. Additionally the class implements + __len__() which will report the size of the actual underlying buffer. 
+ + This class does not attempt to implement the entire Python I/O interface. + """ + + def __init__(self): + self._buffer = io.BytesIO() + self._cursor = 0 + + def write(self, b): + """Append to the end of the buffer without changing the position.""" + self._checkClosed() # Raises ValueError if closed. + + bookmark = self._buffer.tell() + self._buffer.seek(0, io.SEEK_END) + pos = self._buffer.write(b) + self._buffer.seek(bookmark) + return self._cursor + pos + + def read(self, size=-1): + """Read and move the cursor.""" + self._checkClosed() # Raises ValueError if closed. + + data = self._buffer.read(size) + self._cursor += len(data) + return data + + def flush(self): + """Delete already-read data (all data to the left of the position).""" + self._checkClosed() # Raises ValueError if closed. + + # BytesIO can't be deleted from the left, so save any leftover, unread + # data and truncate at 0, then readd leftover data. + leftover = self._buffer.read() + self._buffer.seek(0) + self._buffer.truncate(0) + self._buffer.write(leftover) + self._buffer.seek(0) + + def tell(self): + """Report how many bytes have been read from the buffer in total.""" + return self._cursor + + def seek(self, pos, whence=None): + raise io.UnsupportedOperation("seek() is not supported for this class.") + + def __len__(self): + """Determine the size of the buffer by seeking to the end.""" + bookmark = self._buffer.tell() + length = self._buffer.seek(0, io.SEEK_END) + self._buffer.seek(bookmark) + return length + + def close(self): + return self._buffer.close() + + def _checkClosed(self): + return self._buffer._checkClosed() + + @property + def closed(self): + return self._buffer.closed diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 9e7a86c2f..31ffd9c8b 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -1,3 +1,5 @@ +# coding=utf-8 + # Copyright 2014 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -1103,6 +1105,58 @@ def test_blob_crc32_md5_hash(self): self.assertEqual(download_blob.crc32c, blob.crc32c) self.assertEqual(download_blob.md5_hash, blob.md5_hash) + def test_blobwriter_and_blobreader(self): + blob = self.bucket.blob("LargeFile") + + # Test BlobWriter works. + file_data = self.FILES["big"] + with open(file_data["path"], "rb") as file_obj: + with blob.open("wb", chunk_size=256 * 1024) as writer: + writer.write(file_obj.read(100)) + writer.write(file_obj.read(256 * 1024)) + writer.write(file_obj.read()) + self.case_blobs_to_delete.append(blob) + + blob.reload() + md5_hash = blob.md5_hash + if not isinstance(md5_hash, six.binary_type): + md5_hash = md5_hash.encode("utf-8") + self.assertEqual(md5_hash, file_data["hash"]) + + # Test BlobReader read and seek behave identically to filesystem file. + with open(file_data["path"], "rb") as file_obj: + with blob.open("rb", chunk_size=256 * 1024) as reader: + self.assertEqual(file_obj.read(100), reader.read(100)) + self.assertEqual(file_obj.read(256 * 1024), reader.read(256 * 1024)) + reader.seek(20) + file_obj.seek(20) + self.assertEqual( + file_obj.read(256 * 1024 * 2), reader.read(256 * 1024 * 2) + ) + self.assertEqual(file_obj.read(), reader.read()) + + def test_blobwriter_and_blobreader_text_mode(self): + blob = self.bucket.blob("MultibyteTextFile") + + # Construct a multibyte text_data sample file. + base_multibyte_text_string = u"abcde あいうえお line: " + text_data = "\n".join([base_multibyte_text_string + str(x) for x in range(100)]) + + # Test text BlobWriter works. 
+ with blob.open("wt") as writer: + writer.write(text_data[:100]) + writer.write(text_data[100:]) + self.case_blobs_to_delete.append(blob) + + # Test text BlobReader read and seek to 0. Seeking to an non-0 byte on a + # multibyte text stream is not safe in Python but the API expects + # seek() to work regadless. + with blob.open("rt") as reader: + # This should produce 100 characters, not 100 bytes. + self.assertEqual(text_data[:100], reader.read(100)) + self.assertEqual(0, reader.seek(0)) + self.assertEqual(text_data, reader.read()) + class TestUnicode(TestStorageFiles): def test_fetch_object_and_check_content(self): diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index 4aacc3a8c..e91f8958b 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -4673,6 +4673,36 @@ def test_from_string_w_domain_name_bucket(self): self.assertEqual(blob.name, "b") self.assertEqual(blob.bucket.name, "buckets.example.com") + def test_open(self): + from io import TextIOWrapper + from google.cloud.storage.fileio import BlobReader + from google.cloud.storage.fileio import BlobWriter + + blob_name = "blob-name" + client = self._make_client() + bucket = _Bucket(client) + blob = self._make_one(blob_name, bucket=bucket) + + f = blob.open("r") + self.assertEqual(type(f), TextIOWrapper) + self.assertEqual(type(f.buffer), BlobReader) + f = blob.open("rt") + self.assertEqual(type(f), TextIOWrapper) + self.assertEqual(type(f.buffer), BlobReader) + f = blob.open("rb") + self.assertEqual(type(f), BlobReader) + f = blob.open("w") + self.assertEqual(type(f), TextIOWrapper) + self.assertEqual(type(f.buffer), BlobWriter) + f = blob.open("wt") + self.assertEqual(type(f), TextIOWrapper) + self.assertEqual(type(f.buffer), BlobWriter) + f = blob.open("wb") + self.assertEqual(type(f), BlobWriter) + + with self.assertRaises(NotImplementedError): + blob.open("a") + class Test__quote(unittest.TestCase): @staticmethod diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py new file mode 100644 index 000000000..341805edb --- /dev/null +++ b/tests/unit/test_fileio.py @@ -0,0 +1,567 @@ +# coding=utf-8 + +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest +import mock +import io +from google.cloud.storage.fileio import BlobReader, BlobWriter, SlidingBuffer +import string + +TEST_TEXT_DATA = string.ascii_lowercase + "\n" + string.ascii_uppercase + "\n" +TEST_BINARY_DATA = TEST_TEXT_DATA.encode("utf-8") +TEST_MULTIBYTE_TEXT_DATA = u"あいうえおかきくけこさしすせそたちつてと" +PLAIN_CONTENT_TYPE = "text/plain" +NUM_RETRIES = 2 + + +class TestBlobReaderBinary(unittest.TestCase): + def test_attributes(self): + blob = mock.Mock() + blob.chunk_size = 256 + reader = BlobReader(blob) + self.assertTrue(reader.seekable()) + self.assertTrue(reader.readable()) + self.assertFalse(reader.writable()) + self.assertEqual(256, reader._chunk_size) + + def test_read(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_BINARY_DATA[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + download_kwargs = {"if_metageneration_match": 1} + reader = BlobReader(blob, chunk_size=8, **download_kwargs) + + # Read and trigger the first download of chunk_size. + self.assertEqual(reader.read(1), TEST_BINARY_DATA[0:1]) + blob.download_as_bytes.assert_called_once_with( + start=0, end=8, **download_kwargs + ) + + # Read from buffered data only. + self.assertEqual(reader.read(3), TEST_BINARY_DATA[1:4]) + blob.download_as_bytes.assert_called_once() + + # Read remaining buffer plus an additional chunk read. + self.assertEqual(reader.read(8), TEST_BINARY_DATA[4:12]) + self.assertEqual(reader._pos, 12) + self.assertEqual(blob.download_as_bytes.call_count, 2) + blob.download_as_bytes.assert_called_with(start=8, end=16, **download_kwargs) + + # Read a larger amount, requiring a download larger than chunk_size. + self.assertEqual(reader.read(16), TEST_BINARY_DATA[12:28]) + self.assertEqual(reader._pos, 28) + self.assertEqual(blob.download_as_bytes.call_count, 3) + blob.download_as_bytes.assert_called_with(start=16, end=28, **download_kwargs) + + # Read all remaining data. + self.assertEqual(reader.read(), TEST_BINARY_DATA[28:]) + self.assertEqual(blob.download_as_bytes.call_count, 4) + blob.download_as_bytes.assert_called_with(start=28, end=None, **download_kwargs) + + reader.close() + + def test_readline(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_BINARY_DATA[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + reader = BlobReader(blob, chunk_size=10) + + # Read a line. With chunk_size=10, expect three chunks downloaded. + self.assertEqual(reader.readline(), TEST_BINARY_DATA[:27]) + blob.download_as_bytes.assert_called_with(start=20, end=30) + self.assertEqual(blob.download_as_bytes.call_count, 3) + + # Read another line. + self.assertEqual(reader.readline(), TEST_BINARY_DATA[27:]) + blob.download_as_bytes.assert_called_with(start=50, end=60) + self.assertEqual(blob.download_as_bytes.call_count, 6) + + blob.size = len(TEST_BINARY_DATA) + reader.seek(0) + + # Read all lines. The readlines algorithm will attempt to read past the end of the last line once to verify there is no more to read. 
+ self.assertEqual(b"".join(reader.readlines()), TEST_BINARY_DATA) + blob.download_as_bytes.assert_called_with( + start=len(TEST_BINARY_DATA), end=len(TEST_BINARY_DATA) + 10 + ) + self.assertEqual(blob.download_as_bytes.call_count, 13) + + reader.close() + + def test_seek(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_BINARY_DATA[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + blob.size = None + download_kwargs = {"if_metageneration_match": 1} + reader = BlobReader(blob, chunk_size=8, **download_kwargs) + + # Seek needs the blob size to work and should call reload() if the size + # is not known. Set a mock to initialize the size if reload() is called. + def initialize_size(**_): + blob.size = len(TEST_BINARY_DATA) + + blob.reload = mock.Mock(side_effect=initialize_size) + + # Seek, forcing a blob reload in order to validate the seek doesn't + # exceed the end of the blob. + self.assertEqual(reader.seek(4), 4) + blob.reload.assert_called_once_with(**download_kwargs) + self.assertEqual(reader.read(4), TEST_BINARY_DATA[4:8]) + self.assertEqual(blob.download_as_bytes.call_count, 1) + + # Seek forward 2 bytes with whence=1. Position is still in buffer. + self.assertEqual(reader.seek(2, 1), 10) + self.assertEqual(reader.read(2), TEST_BINARY_DATA[10:12]) + self.assertEqual(blob.download_as_bytes.call_count, 1) + + # Seek to beginning. The next read will need to download data again. + self.assertEqual(reader.seek(0), 0) + self.assertEqual(reader.read(4), TEST_BINARY_DATA[0:4]) + self.assertEqual(blob.download_as_bytes.call_count, 2) + + # Seek relative to end with whence=2. + self.assertEqual(reader.seek(-1, 2), len(TEST_BINARY_DATA) - 1) + self.assertEqual(reader.read(), TEST_BINARY_DATA[-1:]) + self.assertEqual(blob.download_as_bytes.call_count, 3) + + with self.assertRaises(ValueError): + reader.seek(1, 4) + + # tell() is an inherited method that uses seek(). + self.assertEqual(reader.tell(), reader._pos) + + reader.close() + + def test_close(self): + blob = mock.Mock() + reader = BlobReader(blob) + + reader.close() + + with self.assertRaises(ValueError): + reader.read() + + with self.assertRaises(ValueError): + reader.seek(0) + + def test_context_mgr(self): + # Just very that the context manager form doesn't crash. + blob = mock.Mock() + with BlobReader(blob) as reader: + reader.close() + + def test_rejects_invalid_kwargs(self): + blob = mock.Mock() + with self.assertRaises(ValueError): + BlobReader(blob, invalid_kwarg=1) + + +class TestBlobWriterBinary(unittest.TestCase): + def test_attributes(self): + blob = mock.Mock() + blob.chunk_size = 256 + writer = BlobWriter(blob) + self.assertFalse(writer.seekable()) + self.assertFalse(writer.readable()) + self.assertTrue(writer.writable()) + self.assertEqual(256, writer._chunk_size) + + def test_write(self): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + # Create a writer with (arbitrary) arguments so we can validate the + # arguments are used. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + upload_kwargs = {"if_metageneration_match": 1} + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. 
+ writer = BlobWriter( + blob, + chunk_size=chunk_size, + num_retries=NUM_RETRIES, + content_type=PLAIN_CONTENT_TYPE, + **upload_kwargs + ) + + # The transmit_next_chunk method must actually consume bytes from the + # sliding buffer for the flush() feature to work properly. + upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read( + chunk_size + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_BINARY_DATA[0:4]) + blob.initiate_resumable_upload.assert_not_called() + + # Write over chunk_size. This should result in upload initialization + # and multiple chunks uploaded. + writer.write(TEST_BINARY_DATA[4:32]) + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + writer._buffer, + PLAIN_CONTENT_TYPE, + None, + NUM_RETRIES, + chunk_size=chunk_size, + **upload_kwargs + ) + upload.transmit_next_chunk.assert_called_with(transport) + self.assertEqual(upload.transmit_next_chunk.call_count, 4) + + # Write another byte, finalize and close. + writer.write(TEST_BINARY_DATA[32:33]) + self.assertEqual(writer.tell(), 33) + writer.close() + self.assertEqual(upload.transmit_next_chunk.call_count, 5) + + def flush_fails(self): + blob = mock.Mock() + writer = BlobWriter(blob) + + with self.assertRaises(io.UnsupportedOperation): + writer.flush() + + def seek_fails(self): + blob = mock.Mock() + writer = BlobWriter(blob) + + with self.assertRaises(io.UnsupportedOperation): + writer.seek() + + def test_conditional_retries(self): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + # Create a writer. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. + writer = BlobWriter( + blob, + chunk_size=chunk_size, + num_retries=None, + content_type=PLAIN_CONTENT_TYPE, + ) + + # The transmit_next_chunk method must actually consume bytes from the + # sliding buffer for the flush() feature to work properly. + upload.transmit_next_chunk.side_effect = lambda _: writer._buffer.read( + chunk_size + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_BINARY_DATA[0:4]) + blob.initiate_resumable_upload.assert_not_called() + + # Write over chunk_size. This should result in upload initialization + # and multiple chunks uploaded. + # Due to the condition not being fulfilled, num_retries should be 0. + writer.write(TEST_BINARY_DATA[4:32]) + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + writer._buffer, + PLAIN_CONTENT_TYPE, + None, + 0, + chunk_size=chunk_size, + ) + upload.transmit_next_chunk.assert_called_with(transport) + self.assertEqual(upload.transmit_next_chunk.call_count, 4) + + # Write another byte, finalize and close. + writer.write(TEST_BINARY_DATA[32:33]) + writer.close() + self.assertEqual(upload.transmit_next_chunk.call_count, 5) + + def test_rejects_invalid_kwargs(self): + blob = mock.Mock() + with self.assertRaises(ValueError): + BlobWriter(blob, invalid_kwarg=1) + + def test_flush_fails(self): + blob = mock.Mock() + writer = BlobWriter(blob) + with self.assertRaises(io.UnsupportedOperation): + writer.flush() + + +class Test_SlidingBuffer(unittest.TestCase): + def test_write_and_read(self): + buff = SlidingBuffer() + + # Write and verify tell() still reports 0 and len is correct. 
+ buff.write(TEST_BINARY_DATA) + self.assertEqual(buff.tell(), 0) + self.assertEqual(len(buff), len(TEST_BINARY_DATA)) + + # Read and verify tell() reports end. + self.assertEqual(buff.read(), TEST_BINARY_DATA) + self.assertEqual(buff.tell(), len(TEST_BINARY_DATA)) + self.assertEqual(len(buff), len(TEST_BINARY_DATA)) + + def test_flush(self): + buff = SlidingBuffer() + + # Write and verify tell() still reports 0 and len is correct. + buff.write(TEST_BINARY_DATA) + self.assertEqual(buff.tell(), 0) + self.assertEqual(len(buff), len(TEST_BINARY_DATA)) + + # Read 8 bytes and verify tell reports correctly. + self.assertEqual(buff.read(8), TEST_BINARY_DATA[:8]) + self.assertEqual(buff.tell(), 8) + self.assertEqual(len(buff), len(TEST_BINARY_DATA)) + + # Flush buffer and verify tell doesn't change but len does. + buff.flush() + self.assertEqual(buff.tell(), 8) + self.assertEqual(len(buff), len(TEST_BINARY_DATA) - 8) + + # Read remainder. + self.assertEqual(buff.read(), TEST_BINARY_DATA[8:]) + self.assertEqual(buff.tell(), len(TEST_BINARY_DATA)) + self.assertEqual(len(buff), len(TEST_BINARY_DATA[8:])) + + def test_seek_fails(self): + buff = SlidingBuffer() + with self.assertRaises(io.UnsupportedOperation): + buff.seek(1) + + def test_close(self): + buff = SlidingBuffer() + buff.close() + with self.assertRaises(ValueError): + buff.read() + + +class TestBlobReaderText(unittest.TestCase): + def test_attributes(self): + blob = mock.Mock() + reader = io.TextIOWrapper(BlobReader(blob)) + self.assertTrue(reader.seekable()) + self.assertTrue(reader.readable()) + self.assertFalse(reader.writable()) + + def test_read(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_TEXT_DATA.encode("utf-8")[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + blob.chunk_size = None + blob.size = len(TEST_TEXT_DATA.encode("utf-8")) + download_kwargs = {"if_metageneration_match": 1} + reader = io.TextIOWrapper(BlobReader(blob, **download_kwargs)) + + # The TextIOWrapper class has an internally defined chunk size which + # will override ours. The wrapper class is not under test. + # Read and trigger the first download of chunk_size. + self.assertEqual(reader.read(1), TEST_TEXT_DATA[0:1]) + blob.download_as_bytes.assert_called_once() + + # Read from buffered data only. + self.assertEqual(reader.read(3), TEST_TEXT_DATA[1:4]) + blob.download_as_bytes.assert_called_once() + + # Read all remaining data. + self.assertEqual(reader.read(), TEST_TEXT_DATA[4:]) + + # Seek to 0 and read all remaining data again. + reader.seek(0) + self.assertEqual(reader.read(), TEST_TEXT_DATA) + + reader.close() + + def test_multibyte_read(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_MULTIBYTE_TEXT_DATA.encode("utf-8")[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + blob.chunk_size = None + blob.size = len(TEST_MULTIBYTE_TEXT_DATA.encode("utf-8")) + download_kwargs = {"if_metageneration_match": 1} + reader = io.TextIOWrapper(BlobReader(blob, **download_kwargs)) + + # The TextIOWrapper class has an internally defined chunk size which + # will override ours. The wrapper class is not under test. + # Read and trigger the first download of chunk_size. + self.assertEqual(reader.read(1), TEST_MULTIBYTE_TEXT_DATA[0:1]) + blob.download_as_bytes.assert_called_once() + + # Read from buffered data only. 
+ self.assertEqual(reader.read(3), TEST_MULTIBYTE_TEXT_DATA[1:4]) + blob.download_as_bytes.assert_called_once() + + # Read all remaining data. + self.assertEqual(reader.read(), TEST_MULTIBYTE_TEXT_DATA[4:]) + + # Seek to 0 and read all remaining data again. + reader.seek(0) + self.assertEqual(reader.read(), TEST_MULTIBYTE_TEXT_DATA) + + reader.close() + + def test_seek(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_TEXT_DATA.encode("utf-8")[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + blob.size = None + blob.chunk_size = None + download_kwargs = {"if_metageneration_match": 1} + reader = io.TextIOWrapper(BlobReader(blob, **download_kwargs)) + + # Seek needs the blob size to work and should call reload() if the size + # is not known. Set a mock to initialize the size if reload() is called. + def initialize_size(**_): + blob.size = len(TEST_TEXT_DATA.encode("utf-8")) + + blob.reload = mock.Mock(side_effect=initialize_size) + + # Seek, forcing a blob reload in order to validate the seek doesn't + # exceed the end of the blob. + self.assertEqual(reader.seek(4), 4) + blob.reload.assert_called_once_with(**download_kwargs) + self.assertEqual(reader.read(4), TEST_TEXT_DATA[4:8]) + self.assertEqual(blob.download_as_bytes.call_count, 1) + + # Seek to beginning. The next read will need to download data again. + self.assertEqual(reader.seek(0), 0) + self.assertEqual(reader.read(), TEST_TEXT_DATA) + self.assertEqual(blob.download_as_bytes.call_count, 2) + + reader.close() + + def test_multibyte_seek(self): + blob = mock.Mock() + + def read_from_fake_data(start=0, end=None, **_): + return TEST_MULTIBYTE_TEXT_DATA.encode("utf-8")[start:end] + + blob.download_as_bytes = mock.Mock(side_effect=read_from_fake_data) + blob.size = None + blob.chunk_size = None + download_kwargs = {"if_metageneration_match": 1} + reader = io.TextIOWrapper(BlobReader(blob, **download_kwargs)) + + # Seek needs the blob size to work and should call reload() if the size + # is not known. Set a mock to initialize the size if reload() is called. + def initialize_size(**_): + blob.size = len(TEST_MULTIBYTE_TEXT_DATA.encode("utf-8")) + + blob.reload = mock.Mock(side_effect=initialize_size) + + # Seek, forcing a blob reload in order to validate the seek doesn't + # exceed the end of the blob. + self.assertEqual(reader.seek(4), 4) + blob.reload.assert_called_once_with(**download_kwargs) + + # Seek to beginning. + self.assertEqual(reader.seek(0), 0) + self.assertEqual(reader.read(), TEST_MULTIBYTE_TEXT_DATA) + self.assertEqual(blob.download_as_bytes.call_count, 1) + + # tell() is an inherited method that uses seek(). + self.assertEqual(reader.tell(), len(TEST_MULTIBYTE_TEXT_DATA.encode("utf-8"))) + + reader.close() + + def test_close(self): + blob = mock.Mock() + reader = BlobReader(blob) + + reader.close() + + with self.assertRaises(ValueError): + reader.read() + + with self.assertRaises(ValueError): + reader.seek(0) + + +class TestBlobWriterText(unittest.TestCase): + def test_write(self): + blob = mock.Mock() + + upload = mock.Mock() + transport = mock.Mock() + + blob._initiate_resumable_upload.return_value = (upload, transport) + + # Create a writer in text mode. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. 
+ unwrapped_writer = BlobWriter( + blob, + chunk_size=chunk_size, + text_mode=True, + num_retries=NUM_RETRIES, + content_type=PLAIN_CONTENT_TYPE, + ) + + writer = io.TextIOWrapper(unwrapped_writer) + + # The transmit_next_chunk method must actually consume bytes from the + # sliding buffer for the flush() feature to work properly. + upload.transmit_next_chunk.side_effect = lambda _: unwrapped_writer._buffer.read( + chunk_size + ) + + # Write under chunk_size. This should be buffered and the upload not + # initiated. + writer.write(TEST_MULTIBYTE_TEXT_DATA[0:2]) + blob.initiate_resumable_upload.assert_not_called() + + # Write all data and close. + writer.write(TEST_MULTIBYTE_TEXT_DATA[2:]) + writer.close() + + blob._initiate_resumable_upload.assert_called_once_with( + blob.bucket.client, + unwrapped_writer._buffer, + PLAIN_CONTENT_TYPE, + None, + NUM_RETRIES, + chunk_size=chunk_size, + ) + upload.transmit_next_chunk.assert_called_with(transport) From e6c6cf15ba829c075c055d5619063e6d1e2fb83a Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 22 Feb 2021 13:20:21 -0800 Subject: [PATCH 2/9] coverage --- tests/unit/test_fileio.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py index 341805edb..aa191c1be 100644 --- a/tests/unit/test_fileio.py +++ b/tests/unit/test_fileio.py @@ -137,6 +137,11 @@ def initialize_size(**_): self.assertEqual(reader.read(2), TEST_BINARY_DATA[10:12]) self.assertEqual(blob.download_as_bytes.call_count, 1) + # Attempt seek past end of file. Position should be at end of file. + self.assertEqual( + reader.seek(len(TEST_BINARY_DATA) + 100), len(TEST_BINARY_DATA) + ) + # Seek to beginning. The next read will need to download data again. self.assertEqual(reader.seek(0), 0) self.assertEqual(reader.read(4), TEST_BINARY_DATA[0:4]) @@ -243,14 +248,14 @@ def test_write(self): writer.close() self.assertEqual(upload.transmit_next_chunk.call_count, 5) - def flush_fails(self): + def test_flush_fails(self): blob = mock.Mock() writer = BlobWriter(blob) with self.assertRaises(io.UnsupportedOperation): writer.flush() - def seek_fails(self): + def test_seek_fails(self): blob = mock.Mock() writer = BlobWriter(blob) From 12d1f3382ab70a7fb87c96f6f0a0fcdf100bfe31 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Mon, 22 Feb 2021 17:24:12 -0800 Subject: [PATCH 3/9] remove accidentally duplicated tests --- tests/unit/test_fileio.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py index aa191c1be..dcb6ca06c 100644 --- a/tests/unit/test_fileio.py +++ b/tests/unit/test_fileio.py @@ -317,12 +317,6 @@ def test_rejects_invalid_kwargs(self): with self.assertRaises(ValueError): BlobWriter(blob, invalid_kwarg=1) - def test_flush_fails(self): - blob = mock.Mock() - writer = BlobWriter(blob) - with self.assertRaises(io.UnsupportedOperation): - writer.flush() - class Test_SlidingBuffer(unittest.TestCase): def test_write_and_read(self): From 40245aac1913dfaa131ebbbc794346edd20b016a Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Thu, 18 Mar 2021 06:51:40 -0700 Subject: [PATCH 4/9] docstrings --- google/cloud/storage/blob.py | 58 ++++++++++++++++++++++++++++++++++ google/cloud/storage/fileio.py | 44 +++++++++++++++++++++++++- 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index bfe0a8ea4..8f0db05a4 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ 
-3419,9 +3419,67 @@ def open( newline=None, **kwargs ): + r"""Create a file handler for file-like I/O to or from this blob. + + :type mode: str + :param mode: + A mode string, as per standard Python `open()` semantics. The first + character must be 'r', to open the blob for reading, or 'w' to open + it for writing. The second character, if present, must be 't' for + (unicode) text mode, or 'b' for bytes mode. If the second character + is omitted, text mode is the default. + + :type chunk_size: long + :param chunk_size: + (Optional) For reads, the minimum number of bytes to read at a time. + If fewer bytes than the chunk_size are requested, the remainder is + buffered. For writes, the maximum number of bytes to buffer before + sending data to the server, and the size of each request when data + is sent. Writes are implemented as a "resumable upload", so + chunk_size for writes must be exactly a multiple of 256KiB as with + other resumable uploads. The default is 10 MiB. + + :type encoding: str + :param encoding: + For text mode only, the name of the encoding that the stream will + be decoded or encoded with. If omitted, it defaults to + locale.getpreferredencoding(False). + + :type errors: str + :param errors: + For text mode only, an optional string that specifies how encoding + and decoding errors are to be handled. Pass 'strict' to raise a + ValueError exception if there is an encoding error (the default of + None has the same effect), or pass 'ignore' to ignore errors. (Note + that ignoring encoding errors can lead to data loss.) Other more + rarely-used options are also available; see the Python 'io' module + documentation for 'io.TextIOWrapper' for a complete list. + + :type newline: str + :param newline: + For text mode only, controls how line endings are handled. It can + be None, '', '\n', '\r', and '\r\n'. If None, reads use "universal + newline mode" and writes use the system default. See the Python + 'io' module documentation for 'io.TextIOWrapper' for details. + + :param kwargs: Keyword arguments to pass to the underlying API calls. + For both uploads and downloads, the following arguments are + supported: "if_generation_match", "if_generation_not_match", + "if_metageneration_match", "if_metageneration_not_match", "timeout". + For uploads only, the following additional arguments are supported: + "content_type", "num_retries", "predefined_acl", "checksum". + + :returns: A 'BlobReader' or 'BlobWriter' from + 'google.cloud.storage.fileio', or an 'io.TextIOWrapper' around one + of those classes, depending on the 'mode' argument. + """ if mode == "rb": + if encoding or errors or newline: + raise ValueError("encoding, errors and newline arguments are for text mode only") return BlobReader(self, chunk_size=chunk_size, **kwargs) elif mode == "wb": + if encoding or errors or newline: + raise ValueError("encoding, errors and newline arguments are for text mode only") return BlobWriter(self, chunk_size=chunk_size, **kwargs) elif mode in ("r", "rt"): return TextIOWrapper( diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index 5678340c9..9044a302e 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -43,6 +43,23 @@ class BlobReader(io.BufferedIOBase): + """A file-like object that reads from a blob. + + :type blob: 'google.cloud.storage.blob.Blob' + :param blob: + The blob to download. + + :type chunk_size: long + :param chunk_size: + (Optional) The minimum number of bytes to read at a time. 
If fewer + bytes than the chunk_size are requested, the remainder is buffered. + The default is 10MiB. + + :param download_kwargs: Keyword arguments to pass to the underlying API + calls. The following arguments are supported: "if_generation_match", + "if_generation_not_match", "if_metageneration_match", + "if_metageneration_not_match", "timeout". + """ def __init__(self, blob, chunk_size=None, **download_kwargs): """docstring note that download_kwargs also used for reload()""" for kwarg in download_kwargs: @@ -146,6 +163,32 @@ def seekable(self): class BlobWriter(io.BufferedIOBase): + """A file-like object that writes to a blob. + + :type blob: 'google.cloud.storage.blob.Blob' + :param blob: + The blob to which to write. + + :type chunk_size: long + :param chunk_size: + (Optional) The maximum number of bytes to buffer before sending data + to the server, and the size of each request when data is sent. + Writes are implemented as a "resumable upload", so chunk_size for + writes must be exactly a multiple of 256KiB as with other resumable + uploads. The default is 10 MiB. + + :type text_mode: boolean + :param text_mode: + Whether this class is wrapped in 'io.TextIOWrapper'. Toggling this + changes the behavior of flush() to conform to TextIOWrapper's + expectations. + + :param upload_kwargs: Keyword arguments to pass to the underlying API + calls. The following arguments are supported: "if_generation_match", + "if_generation_not_match", "if_metageneration_match", + "if_metageneration_not_match", "timeout", "content_type", + "num_retries", "predefined_acl", "checksum". + """ def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs): for kwarg in upload_kwargs: if kwarg not in VALID_UPLOAD_KWARGS: @@ -267,7 +310,6 @@ class SlidingBuffer(object): This class does not attempt to implement the entire Python I/O interface. """ - def __init__(self): self._buffer = io.BytesIO() self._cursor = 0 From ccf0bb49055be2d61ba20ab92a42b51e7f95ed07 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Fri, 19 Mar 2021 18:39:27 -0700 Subject: [PATCH 5/9] Validate chunk size --- google/cloud/storage/blob.py | 29 ++++++-- google/cloud/storage/fileio.py | 119 ++++++++++++++++++++------------- tests/unit/test_fileio.py | 85 ++++++++++++----------- 3 files changed, 146 insertions(+), 87 deletions(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 8f0db05a4..15cb282c9 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -147,7 +147,9 @@ class Blob(_PropertyMixin): :type chunk_size: int :param chunk_size: (Optional) The size of a chunk of data whenever iterating (in bytes). - This must be a multiple of 256 KB per the API specification. + This must be a multiple of 256 KB per the API specification. If not + specified, the chunk_size of the blob itself is used. If that is not + specified, a default value of 40 MB is used. :type encryption_key: bytes :param encryption_key: @@ -3421,6 +3423,9 @@ def open( ): r"""Create a file handler for file-like I/O to or from this blob. + This method can be used as a context manager, just like Python's + built-in 'open()' function. + :type mode: str :param mode: A mode string, as per standard Python `open()` semantics. The first @@ -3437,7 +3442,7 @@ def open( sending data to the server, and the size of each request when data is sent. Writes are implemented as a "resumable upload", so chunk_size for writes must be exactly a multiple of 256KiB as with - other resumable uploads. The default is 10 MiB. 
+ other resumable uploads. The default is 40 MiB. :type encoding: str :param encoding: @@ -3472,14 +3477,30 @@ def open( :returns: A 'BlobReader' or 'BlobWriter' from 'google.cloud.storage.fileio', or an 'io.TextIOWrapper' around one of those classes, depending on the 'mode' argument. + + Example: + Read from a text blob by using open() as context manager. + + >>> from google.cloud import storage + >>> client = storage.Client() + >>> bucket = client.bucket("bucket-name") + + >>> blob = bucket.blob("blob-name.txt") + >>> with blob.open("rt") as f: + >>> print(f.read()) + """ if mode == "rb": if encoding or errors or newline: - raise ValueError("encoding, errors and newline arguments are for text mode only") + raise ValueError( + "encoding, errors and newline arguments are for text mode only" + ) return BlobReader(self, chunk_size=chunk_size, **kwargs) elif mode == "wb": if encoding or errors or newline: - raise ValueError("encoding, errors and newline arguments are for text mode only") + raise ValueError( + "encoding, errors and newline arguments are for text mode only" + ) return BlobWriter(self, chunk_size=chunk_size, **kwargs) elif mode in ("r", "rt"): return TextIOWrapper( diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index 9044a302e..509ad7db6 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -14,8 +14,9 @@ import io -# Resumable uploads require a chunk size of precisely a multiple of 256KiB. -DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024 # 10 MiB (40 times minimum chunk size). +# Resumable uploads require a chunk size of precisely a multiple of 256 KiB. +CHUNK_SIZE_MULTIPLE = 256 * 1024 # 256 KiB +DEFAULT_CHUNK_SIZE = 40 * 1024 * 1024 # 40 MiB # Valid keyword arguments for download methods, and blob.reload() if needed. # Note: Changes here need to be reflected in the blob.open() docstring. @@ -43,23 +44,24 @@ class BlobReader(io.BufferedIOBase): - """A file-like object that reads from a blob. - - :type blob: 'google.cloud.storage.blob.Blob' - :param blob: - The blob to download. - - :type chunk_size: long - :param chunk_size: - (Optional) The minimum number of bytes to read at a time. If fewer - bytes than the chunk_size are requested, the remainder is buffered. - The default is 10MiB. - - :param download_kwargs: Keyword arguments to pass to the underlying API - calls. The following arguments are supported: "if_generation_match", - "if_generation_not_match", "if_metageneration_match", - "if_metageneration_not_match", "timeout". - """ + """A file-like object that reads from a blob. + + :type blob: 'google.cloud.storage.blob.Blob' + :param blob: + The blob to download. + + :type chunk_size: long + :param chunk_size: + (Optional) The minimum number of bytes to read at a time. If fewer + bytes than the chunk_size are requested, the remainder is buffered. + The default is the chunk_size of the blob, or 40MiB. + + :param download_kwargs: Keyword arguments to pass to the underlying API + calls. The following arguments are supported: "if_generation_match", + "if_generation_not_match", "if_metageneration_match", + "if_metageneration_not_match", "timeout". + """ + def __init__(self, blob, chunk_size=None, **download_kwargs): """docstring note that download_kwargs also used for reload()""" for kwarg in download_kwargs: @@ -163,32 +165,33 @@ def seekable(self): class BlobWriter(io.BufferedIOBase): - """A file-like object that writes to a blob. - - :type blob: 'google.cloud.storage.blob.Blob' - :param blob: - The blob to which to write. 
- - :type chunk_size: long - :param chunk_size: - (Optional) The maximum number of bytes to buffer before sending data - to the server, and the size of each request when data is sent. - Writes are implemented as a "resumable upload", so chunk_size for - writes must be exactly a multiple of 256KiB as with other resumable - uploads. The default is 10 MiB. - - :type text_mode: boolean - :param text_mode: - Whether this class is wrapped in 'io.TextIOWrapper'. Toggling this - changes the behavior of flush() to conform to TextIOWrapper's - expectations. - - :param upload_kwargs: Keyword arguments to pass to the underlying API - calls. The following arguments are supported: "if_generation_match", - "if_generation_not_match", "if_metageneration_match", - "if_metageneration_not_match", "timeout", "content_type", - "num_retries", "predefined_acl", "checksum". - """ + """A file-like object that writes to a blob. + + :type blob: 'google.cloud.storage.blob.Blob' + :param blob: + The blob to which to write. + + :type chunk_size: long + :param chunk_size: + (Optional) The maximum number of bytes to buffer before sending data + to the server, and the size of each request when data is sent. + Writes are implemented as a "resumable upload", so chunk_size for + writes must be exactly a multiple of 256KiB as with other resumable + uploads. The default is the chunk_size of the blob, or 40 MiB. + + :type text_mode: boolean + :param text_mode: + Whether this class is wrapped in 'io.TextIOWrapper'. Toggling this + changes the behavior of flush() to conform to TextIOWrapper's + expectations. + + :param upload_kwargs: Keyword arguments to pass to the underlying API + calls. The following arguments are supported: "if_generation_match", + "if_generation_not_match", "if_metageneration_match", + "if_metageneration_not_match", "timeout", "content_type", + "num_retries", "predefined_acl", "checksum". + """ + def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs): for kwarg in upload_kwargs: if kwarg not in VALID_UPLOAD_KWARGS: @@ -206,6 +209,31 @@ def __init__(self, blob, chunk_size=None, text_mode=False, **upload_kwargs): self._text_mode = text_mode self._upload_kwargs = upload_kwargs + @property + def _chunk_size(self): + """Get the blob's default chunk size. + + :rtype: int or ``NoneType`` + :returns: The current blob's chunk size, if it is set. + """ + return self.__chunk_size + + @_chunk_size.setter + def _chunk_size(self, value): + """Set the blob's default chunk size. + + :type value: int + :param value: (Optional) The current blob's chunk size, if it is set. + + :raises: :class:`ValueError` if ``value`` is not ``None`` and is not a + multiple of 256 KiB. + """ + if value is not None and value > 0 and value % CHUNK_SIZE_MULTIPLE != 0: + raise ValueError( + "Chunk size must be a multiple of %d." % CHUNK_SIZE_MULTIPLE + ) + self.__chunk_size = value + def write(self, b): self._checkClosed() # Raises ValueError if closed. @@ -310,6 +338,7 @@ class SlidingBuffer(object): This class does not attempt to implement the entire Python I/O interface. 
""" + def __init__(self): self._buffer = io.BytesIO() self._cursor = 0 diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py index dcb6ca06c..10db27d7c 100644 --- a/tests/unit/test_fileio.py +++ b/tests/unit/test_fileio.py @@ -187,12 +187,18 @@ def test_rejects_invalid_kwargs(self): class TestBlobWriterBinary(unittest.TestCase): def test_attributes(self): blob = mock.Mock() - blob.chunk_size = 256 + blob.chunk_size = 256 * 1024 writer = BlobWriter(blob) self.assertFalse(writer.seekable()) self.assertFalse(writer.readable()) self.assertTrue(writer.writable()) - self.assertEqual(256, writer._chunk_size) + self.assertEqual(256 * 1024, writer._chunk_size) + + def test_reject_wrong_chunk_size(self): + blob = mock.Mock() + blob.chunk_size = 123 + with self.assertRaises(ValueError): + _ = BlobWriter(blob) def test_write(self): blob = mock.Mock() @@ -202,19 +208,20 @@ def test_write(self): blob._initiate_resumable_upload.return_value = (upload, transport) - # Create a writer with (arbitrary) arguments so we can validate the - # arguments are used. - # It would be normal to use a context manager here, but not doing so - # gives us more control over close() for test purposes. - upload_kwargs = {"if_metageneration_match": 1} - chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. - writer = BlobWriter( - blob, - chunk_size=chunk_size, - num_retries=NUM_RETRIES, - content_type=PLAIN_CONTENT_TYPE, - **upload_kwargs - ) + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer with (arbitrary) arguments so we can validate the + # arguments are used. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + upload_kwargs = {"if_metageneration_match": 1} + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. + writer = BlobWriter( + blob, + chunk_size=chunk_size, + num_retries=NUM_RETRIES, + content_type=PLAIN_CONTENT_TYPE, + **upload_kwargs + ) # The transmit_next_chunk method must actually consume bytes from the # sliding buffer for the flush() feature to work properly. @@ -249,14 +256,14 @@ def test_write(self): self.assertEqual(upload.transmit_next_chunk.call_count, 5) def test_flush_fails(self): - blob = mock.Mock() + blob = mock.Mock(chunk_size=None) writer = BlobWriter(blob) with self.assertRaises(io.UnsupportedOperation): writer.flush() def test_seek_fails(self): - blob = mock.Mock() + blob = mock.Mock(chunk_size=None) writer = BlobWriter(blob) with self.assertRaises(io.UnsupportedOperation): @@ -270,16 +277,17 @@ def test_conditional_retries(self): blob._initiate_resumable_upload.return_value = (upload, transport) - # Create a writer. - # It would be normal to use a context manager here, but not doing so - # gives us more control over close() for test purposes. - chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. - writer = BlobWriter( - blob, - chunk_size=chunk_size, - num_retries=None, - content_type=PLAIN_CONTENT_TYPE, - ) + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. 
+ writer = BlobWriter( + blob, + chunk_size=chunk_size, + num_retries=None, + content_type=PLAIN_CONTENT_TYPE, + ) # The transmit_next_chunk method must actually consume bytes from the # sliding buffer for the flush() feature to work properly. @@ -526,17 +534,18 @@ def test_write(self): blob._initiate_resumable_upload.return_value = (upload, transport) - # Create a writer in text mode. - # It would be normal to use a context manager here, but not doing so - # gives us more control over close() for test purposes. - chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. - unwrapped_writer = BlobWriter( - blob, - chunk_size=chunk_size, - text_mode=True, - num_retries=NUM_RETRIES, - content_type=PLAIN_CONTENT_TYPE, - ) + with mock.patch("google.cloud.storage.fileio.CHUNK_SIZE_MULTIPLE", 1): + # Create a writer in text mode. + # It would be normal to use a context manager here, but not doing so + # gives us more control over close() for test purposes. + chunk_size = 8 # Note: Real upload requires a multiple of 256KiB. + unwrapped_writer = BlobWriter( + blob, + chunk_size=chunk_size, + text_mode=True, + num_retries=NUM_RETRIES, + content_type=PLAIN_CONTENT_TYPE, + ) writer = io.TextIOWrapper(unwrapped_writer) From 876d1f602094f819e8b44c19d3d667b791fdc9f3 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Fri, 19 Mar 2021 19:14:16 -0700 Subject: [PATCH 6/9] Enable seek on sliding buffer (backwards only) --- google/cloud/storage/fileio.py | 27 +++++++++++++++++++++++++-- tests/unit/test_blob.py | 4 ++++ tests/unit/test_fileio.py | 24 +++++++++++++++++++++--- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/google/cloud/storage/fileio.py b/google/cloud/storage/fileio.py index 509ad7db6..07f2f11a3 100644 --- a/google/cloud/storage/fileio.py +++ b/google/cloud/storage/fileio.py @@ -377,8 +377,31 @@ def tell(self): """Report how many bytes have been read from the buffer in total.""" return self._cursor - def seek(self, pos, whence=None): - raise io.UnsupportedOperation("seek() is not supported for this class.") + def seek(self, pos): + """Seek to a position (backwards only) within the internal buffer. + + This implementation of seek() verifies that the seek destination is + contained in _buffer. It will raise ValueError if the destination byte + has already been purged from the buffer. + + The "whence" argument is not supported in this implementation. + """ + self._checkClosed() # Raises ValueError if closed. + + buffer_initial_pos = self._buffer.tell() + difference = pos - self._cursor + buffer_seek_result = self._buffer.seek(difference, io.SEEK_CUR) + if ( + not buffer_seek_result - buffer_initial_pos == difference + or pos > self._cursor + ): + # The seek did not arrive at the expected byte because the internal + # buffer does not (or no longer) contains the byte. Reset and raise. 
+ self._buffer.seek(buffer_initial_pos) + raise ValueError("Cannot seek() to that value.") + + self._cursor = pos + return self._cursor def __len__(self): """Determine the size of the buffer by seeking to the end.""" diff --git a/tests/unit/test_blob.py b/tests/unit/test_blob.py index e91f8958b..e8573ce21 100644 --- a/tests/unit/test_blob.py +++ b/tests/unit/test_blob.py @@ -4702,6 +4702,10 @@ def test_open(self): with self.assertRaises(NotImplementedError): blob.open("a") + with self.assertRaises(ValueError): + blob.open("rb", encoding="utf-8") + with self.assertRaises(ValueError): + blob.open("wb", encoding="utf-8") class Test__quote(unittest.TestCase): diff --git a/tests/unit/test_fileio.py b/tests/unit/test_fileio.py index 10db27d7c..09fa58f8d 100644 --- a/tests/unit/test_fileio.py +++ b/tests/unit/test_fileio.py @@ -363,10 +363,28 @@ def test_flush(self): self.assertEqual(buff.tell(), len(TEST_BINARY_DATA)) self.assertEqual(len(buff), len(TEST_BINARY_DATA[8:])) - def test_seek_fails(self): + def test_seek(self): buff = SlidingBuffer() - with self.assertRaises(io.UnsupportedOperation): - buff.seek(1) + buff.write(TEST_BINARY_DATA) + + # Try to seek forward. Verify the tell() doesn't change. + with self.assertRaises(ValueError): + pos = buff.tell() + buff.seek(len(TEST_BINARY_DATA) + 1) + self.assertEqual(pos, buff.tell()) + + # Read 8 bytes, test seek backwards, read again, and flush. + self.assertEqual(buff.read(8), TEST_BINARY_DATA[:8]) + buff.seek(0) + self.assertEqual(buff.read(8), TEST_BINARY_DATA[:8]) + buff.flush() + self.assertEqual(buff.tell(), 8) + + # Try to seek to a byte that has already been flushed. + with self.assertRaises(ValueError): + pos = buff.tell() + buff.seek(0) + self.assertEqual(pos, buff.tell()) def test_close(self): buff = SlidingBuffer() From 55fc90a9606ec2b17924a02aff0835e3c7bb5bad Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Wed, 24 Mar 2021 10:28:15 -0700 Subject: [PATCH 7/9] docstrings --- google/cloud/storage/blob.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 15cb282c9..0a990998b 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -3428,7 +3428,7 @@ def open( :type mode: str :param mode: - A mode string, as per standard Python `open()` semantics. The first + (Optional) A mode string, as per standard Python `open()` semantics.The first character must be 'r', to open the blob for reading, or 'w' to open it for writing. The second character, if present, must be 't' for (unicode) text mode, or 'b' for bytes mode. If the second character @@ -3446,13 +3446,13 @@ def open( :type encoding: str :param encoding: - For text mode only, the name of the encoding that the stream will + (Optional) For text mode only, the name of the encoding that the stream will be decoded or encoded with. If omitted, it defaults to locale.getpreferredencoding(False). :type errors: str :param errors: - For text mode only, an optional string that specifies how encoding + (Optional) For text mode only, an optional string that specifies how encoding and decoding errors are to be handled. Pass 'strict' to raise a ValueError exception if there is an encoding error (the default of None has the same effect), or pass 'ignore' to ignore errors. (Note @@ -3462,7 +3462,7 @@ def open( :type newline: str :param newline: - For text mode only, controls how line endings are handled. It can + (Optional) For text mode only, controls how line endings are handled. 
It can be None, '', '\n', '\r', and '\r\n'. If None, reads use "universal newline mode" and writes use the system default. See the Python 'io' module documentation for 'io.TextIOWrapper' for details. From 1bd04e84d58cdb49d1fc174b84f431566800b23e Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Wed, 24 Mar 2021 11:16:44 -0700 Subject: [PATCH 8/9] docstrings final pass --- google/cloud/storage/blob.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 0a990998b..9e9b53062 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -3426,6 +3426,13 @@ def open( This method can be used as a context manager, just like Python's built-in 'open()' function. + While reading, as with other read methods, if blob.generation is not set + the most recent blob generation will be used. Because the file-like IO + reader downloads progressively in chunks, this could result in data from + multiple versions being mixed together. If this is a concern, use + either bucket.get_blob(), or blob.reload(), which will download the + latest generation number and set it. + :type mode: str :param mode: (Optional) A mode string, as per standard Python `open()` semantics.The first @@ -3485,7 +3492,7 @@ def open( >>> client = storage.Client() >>> bucket = client.bucket("bucket-name") - >>> blob = bucket.blob("blob-name.txt") + >>> blob = bucket.get_blob("blob-name.txt") >>> with blob.open("rt") as f: >>> print(f.read()) From 8cd9ecfc5a433f2b5afe4c08827bc614d586d034 Mon Sep 17 00:00:00 2001 From: Andrew Gorcester Date: Wed, 24 Mar 2021 11:52:07 -0700 Subject: [PATCH 9/9] docstrings final final pass --- google/cloud/storage/blob.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/google/cloud/storage/blob.py b/google/cloud/storage/blob.py index 9e9b53062..52cd9c7d3 100644 --- a/google/cloud/storage/blob.py +++ b/google/cloud/storage/blob.py @@ -3431,7 +3431,8 @@ def open( reader downloads progressively in chunks, this could result in data from multiple versions being mixed together. If this is a concern, use either bucket.get_blob(), or blob.reload(), which will download the - latest generation number and set it. + latest generation number and set it; or, if the generation is known, set + it manually, for instance with bucket.blob(generation=123456). :type mode: str :param mode: @@ -3488,6 +3489,9 @@ def open( Example: Read from a text blob by using open() as context manager. + Using bucket.get_blob() fetches metadata such as the generation, + which prevents race conditions in case the blob is modified. + >>> from google.cloud import storage >>> client = storage.Client() >>> bucket = client.bucket("bucket-name")
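
For reference, a minimal usage sketch of the blob.open() API introduced by this patch series (not part of the patches themselves). It assumes application default credentials and a client library build that includes this feature; the bucket and object names are placeholders and error handling is omitted.

    # Illustrative use of the file-like I/O added by this series.
    # "example-bucket", "data.bin" and "notes.txt" are placeholder names.
    from google.cloud import storage

    client = storage.Client()
    bucket = client.bucket("example-bucket")

    # Binary write: bytes are buffered and sent as a resumable upload in
    # chunk_size pieces, so chunk_size must be a multiple of 256 KiB.
    blob = bucket.blob("data.bin")
    with blob.open("wb", chunk_size=256 * 1024) as f:
        f.write(b"0123456789" * 100000)  # > 256 KiB, triggers chunked upload

    # Text read: "rt" wraps a BlobReader in io.TextIOWrapper. Using
    # get_blob() first fetches metadata (including the generation), so
    # chunked reads do not mix data from different object versions.
    text_blob = bucket.get_blob("notes.txt")
    with text_blob.open("rt", encoding="utf-8") as f:
        first_line = f.readline()
        rest = f.read()

    # Conditions such as if_generation_match are forwarded as keyword
    # arguments to the underlying download calls.
    data_blob = bucket.get_blob("data.bin")
    with data_blob.open("rb", if_generation_match=data_blob.generation) as f:
        head = f.read(100)

Note that, per the patches, flush() on a writer raises io.UnsupportedOperation unless the writer was opened in text mode (where TextIOWrapper requires it to succeed); the upload is finalized only by close(), which the context manager form calls automatically.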