diff --git a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
index 90dabc829..5c3e54d71 100644
--- a/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
+++ b/google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py
@@ -21,7 +21,7 @@
 if you want to use these Rapid Storage APIs.
 """
 
-from typing import Optional
+from typing import Optional, Union
 from google.cloud import _storage_v2
 from google.cloud.storage._experimental.asyncio.async_grpc_client import (
     AsyncGrpcClient,
@@ -31,6 +31,10 @@
 )
 
 
+_MAX_CHUNK_SIZE_BYTES = 2 * 1024 * 1024  # 2 MiB
+_MAX_BUFFER_SIZE_BYTES = 16 * 1024 * 1024  # 16 MiB
+
+
 class AsyncAppendableObjectWriter:
     """Class for appending data to a GCS Appendable Object asynchronously."""
 
@@ -118,7 +122,13 @@ async def state_lookup(self) -> int:
 
         :rtype: int
         :returns: persisted size.
+
+        :raises ValueError: If the stream is not open (i.e., `open()` has not
+            been called).
         """
+        if not self._is_stream_open:
+            raise ValueError("Stream is not open. Call open() before state_lookup().")
+
         await self.write_obj_stream.send(
             _storage_v2.BidiWriteObjectRequest(
                 state_lookup=True,
@@ -129,7 +139,11 @@
         return self.persisted_size
 
     async def open(self) -> None:
-        """Opens the underlying bidi-gRPC stream."""
+        """Opens the underlying bidi-gRPC stream.
+
+        :raises ValueError: If the stream is already open.
+
+        """
         if self._is_stream_open:
             raise ValueError("Underlying bidi-gRPC stream is already open")
 
@@ -142,15 +156,65 @@
         # Update self.persisted_size
         _ = await self.state_lookup()
 
-    async def append(self, data: bytes):
-        raise NotImplementedError("append is not implemented yet.")
+    async def append(self, data: bytes) -> None:
+        """Appends data to the Appendable object.
+
+        This method sends the provided data to the GCS server in chunks. It
+        buffers up to `_MAX_BUFFER_SIZE_BYTES` bytes and automatically flushes
+        the data to make it visible to readers once that threshold is
+        reached.
+
+        :type data: bytes
+        :param data: The bytes to append to the object.
+
+        :rtype: None
+
+        :raises ValueError: If the stream is not open (i.e., `open()` has not
+            been called).
+        """
+
+        if not self._is_stream_open:
+            raise ValueError("Stream is not open. Call open() before append().")
+        total_bytes = len(data)
+        if total_bytes == 0:
+            # TODO: add warning.
+            return
+        if self.offset is None:
+            assert self.persisted_size is not None
+            self.offset = self.persisted_size
+
+        start_idx = 0
+        bytes_to_flush = 0
+        while start_idx < total_bytes:
+            end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
+            await self.write_obj_stream.send(
+                _storage_v2.BidiWriteObjectRequest(
+                    write_offset=self.offset,
+                    checksummed_data=_storage_v2.ChecksummedData(
+                        content=data[start_idx:end_idx]
+                    ),
+                )
+            )
+            chunk_size = end_idx - start_idx
+            self.offset += chunk_size
+            bytes_to_flush += chunk_size
+            if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES:
+                await self.flush()
+                bytes_to_flush = 0
+            start_idx = end_idx
 
     async def flush(self) -> int:
         """Flushes the data to the server.
 
         :rtype: int
         :returns: The persisted size after flush.
+
+        :raises ValueError: If the stream is not open (i.e., `open()` has not
+            been called).
         """
+        if not self._is_stream_open:
+            raise ValueError("Stream is not open. Call open() before flush().")
+
         await self.write_obj_stream.send(
             _storage_v2.BidiWriteObjectRequest(
                 flush=True,
@@ -162,14 +226,34 @@
         self.offset = self.persisted_size
         return self.persisted_size
 
-    async def close(self, finalize_on_close=False) -> int:
-        """Returns persisted_size"""
+    async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object]:
+        """Closes the underlying bidi-gRPC stream.
+
+        :type finalize_on_close: bool
+        :param finalize_on_close: If True, finalizes the Appendable Object;
+            no more data can be appended afterwards.
+
+        :rtype: Union[int, _storage_v2.Object]
+        :returns: The updated `self.persisted_size` after closing the
+            bidi-gRPC stream or, if `finalize_on_close=True` is passed, the
+            finalized object resource.
+
+        :raises ValueError: If the stream is not open (i.e., `open()` has not
+            been called).
+
+        """
+        if not self._is_stream_open:
+            raise ValueError("Stream is not open. Call open() before close().")
+
         if finalize_on_close:
             await self.finalize()
+        else:
+            await self.flush()
+            await self.write_obj_stream.close()
 
-        await self.write_obj_stream.close()
         self._is_stream_open = False
         self.offset = None
+        return self.object_resource if finalize_on_close else self.persisted_size
 
     async def finalize(self) -> _storage_v2.Object:
         """Finalizes the Appendable Object.
@@ -178,12 +262,20 @@
 
         rtype: google.cloud.storage_v2.types.Object
         returns: The finalized object resource.
+
+        :raises ValueError: If the stream is not open (i.e., `open()` has not
+            been called).
         """
+        if not self._is_stream_open:
+            raise ValueError("Stream is not open. Call open() before finalize().")
+
         await self.write_obj_stream.send(
             _storage_v2.BidiWriteObjectRequest(finish_write=True)
         )
         response = await self.write_obj_stream.recv()
         self.object_resource = response.resource
+        self.persisted_size = self.object_resource.size
+        return self.object_resource
 
     # helper methods.
     async def append_from_string(self, data: str):
diff --git a/tests/unit/asyncio/test_async_appendable_object_writer.py b/tests/unit/asyncio/test_async_appendable_object_writer.py
index 18f7a8826..902ed5f8b 100644
--- a/tests/unit/asyncio/test_async_appendable_object_writer.py
+++ b/tests/unit/asyncio/test_async_appendable_object_writer.py
@@ -93,6 +93,7 @@ async def test_state_lookup(mock_write_object_stream, mock_client):
     """Test state_lookup method."""
     # Arrange
     writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    writer._is_stream_open = True
     mock_stream = mock_write_object_stream.return_value
     mock_stream.send = mock.AsyncMock()
     mock_stream.recv = mock.AsyncMock(
@@ -111,6 +112,17 @@
     assert response == PERSISTED_SIZE
 
 
+@pytest.mark.asyncio
+async def test_state_lookup_without_open_raises_value_error(mock_client):
+    """Test that state_lookup raises an error if the stream is not open."""
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    with pytest.raises(
+        ValueError,
+        match="Stream is not open. Call open\\(\\) before state_lookup\\(\\).",
+    ):
+        await writer.state_lookup()
+
+
 @pytest.mark.asyncio
 @mock.patch(
     "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
 )
@@ -168,9 +180,6 @@ async def test_unimplemented_methods_raise_error(mock_client):
     """Test that all currently unimplemented methods raise NotImplementedError."""
     writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
 
-    with pytest.raises(NotImplementedError):
-        await writer.append(b"data")
-
     with pytest.raises(NotImplementedError):
         await writer.append_from_string("data")
 
@@ -188,6 +197,7 @@
 async def test_flush(mock_write_object_stream, mock_client):
     """Test that flush sends the correct request and updates state."""
     writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    writer._is_stream_open = True
     mock_stream = mock_write_object_stream.return_value
     mock_stream.send = mock.AsyncMock()
     mock_stream.recv = mock.AsyncMock(
@@ -204,46 +214,79 @@
     assert persisted_size == 1024
 
 
+@pytest.mark.asyncio
+async def test_flush_without_open_raises_value_error(mock_client):
+    """Test that flush raises an error if the stream is not open."""
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    with pytest.raises(
+        ValueError, match="Stream is not open. Call open\\(\\) before flush\\(\\)."
+    ):
+        await writer.flush()
+
+
 @pytest.mark.asyncio
 @mock.patch(
     "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
 )
-async def test_close_without_finalize(mock_write_object_stream, mock_client):
-    """Test close without finalizing."""
+async def test_close(mock_write_object_stream, mock_client):
     writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
     writer._is_stream_open = True
     writer.offset = 1024
     mock_stream = mock_write_object_stream.return_value
+    mock_stream.send = mock.AsyncMock()
+    mock_stream.recv = mock.AsyncMock(
+        return_value=_storage_v2.BidiWriteObjectResponse(persisted_size=1024)
+    )
     mock_stream.close = mock.AsyncMock()
     writer.finalize = mock.AsyncMock()
 
-    await writer.close(finalize_on_close=False)
+    persisted_size = await writer.close()
 
     writer.finalize.assert_not_awaited()
     mock_stream.close.assert_awaited_once()
-    assert not writer._is_stream_open
     assert writer.offset is None
+    assert persisted_size == 1024
+    assert not writer._is_stream_open
+
+
+@pytest.mark.asyncio
+async def test_close_without_open_raises_value_error(mock_client):
+    """Test that close raises an error if the stream is not open."""
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    with pytest.raises(
+        ValueError, match="Stream is not open. Call open\\(\\) before close\\(\\)."
+    ):
+        await writer.close()
 
 
 @pytest.mark.asyncio
 @mock.patch(
     "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
 )
-async def test_close_with_finalize(mock_write_object_stream, mock_client):
+async def test_finalize_on_close(mock_write_object_stream, mock_client):
     """Test close with finalizing."""
+    # Arrange
+    mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET, size=2048)
     writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
     writer._is_stream_open = True
     writer.offset = 1024
     mock_stream = mock_write_object_stream.return_value
+    mock_stream.send = mock.AsyncMock()
+    mock_stream.recv = mock.AsyncMock(
+        return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource)
+    )
     mock_stream.close = mock.AsyncMock()
-    writer.finalize = mock.AsyncMock()
 
-    await writer.close(finalize_on_close=True)
+    # Act
+    result = await writer.close(finalize_on_close=True)
 
-    writer.finalize.assert_awaited_once()
-    mock_stream.close.assert_awaited_once()
+    # Assert
+    mock_stream.close.assert_not_awaited()  # close() skips stream.close() when finalizing
     assert not writer._is_stream_open
     assert writer.offset is None
+    assert writer.object_resource == mock_resource
+    assert writer.persisted_size == 2048
+    assert result == mock_resource
 
 
 @pytest.mark.asyncio
@@ -253,17 +296,171 @@ async def test_close_with_finalize(mock_write_object_stream, mock_client):
 async def test_finalize(mock_write_object_stream, mock_client):
     """Test that finalize sends the correct request and updates state."""
     writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    writer._is_stream_open = True
+    mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET, size=123)
     mock_stream = mock_write_object_stream.return_value
     mock_stream.send = mock.AsyncMock()
-    mock_resource = _storage_v2.Object(name=OBJECT, bucket=BUCKET)
     mock_stream.recv = mock.AsyncMock(
         return_value=_storage_v2.BidiWriteObjectResponse(resource=mock_resource)
     )
 
-    await writer.finalize()
+    gcs_object = await writer.finalize()
 
     mock_stream.send.assert_awaited_once_with(
         _storage_v2.BidiWriteObjectRequest(finish_write=True)
     )
     mock_stream.recv.assert_awaited_once()
     assert writer.object_resource == mock_resource
+    assert writer.persisted_size == 123
+    assert gcs_object == mock_resource
+
+
+@pytest.mark.asyncio
+async def test_finalize_without_open_raises_value_error(mock_client):
+    """Test that finalize raises an error if the stream is not open."""
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    with pytest.raises(
+        ValueError, match="Stream is not open. Call open\\(\\) before finalize\\(\\)."
+    ):
+        await writer.finalize()
+
+
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
+)
+async def test_append_raises_error_if_not_open(mock_write_object_stream, mock_client):
+    """Test that append raises an error if the stream is not open."""
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    with pytest.raises(
+        ValueError, match="Stream is not open. Call open\\(\\) before append\\(\\)."
+    ):
+        await writer.append(b"some data")
+
+
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
+)
+async def test_append_with_empty_data(mock_write_object_stream, mock_client):
+    """Test that append does nothing if data is empty."""
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    writer._is_stream_open = True
+    mock_stream = mock_write_object_stream.return_value
+    mock_stream.send = mock.AsyncMock()
+
+    await writer.append(b"")
+
+    mock_stream.send.assert_not_awaited()
+
+
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
+)
+async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client):
+    """Test that append sends data in chunks and updates offset."""
+    from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
+        _MAX_CHUNK_SIZE_BYTES,
+    )
+
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    writer._is_stream_open = True
+    writer.persisted_size = 100
+    mock_stream = mock_write_object_stream.return_value
+    mock_stream.send = mock.AsyncMock()
+    writer.flush = mock.AsyncMock()
+
+    data = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1)
+    await writer.append(data)
+
+    assert mock_stream.send.await_count == 2
+    first_call = mock_stream.send.await_args_list[0]
+    second_call = mock_stream.send.await_args_list[1]
+
+    # First chunk
+    assert first_call[0][0].write_offset == 100
+    assert len(first_call[0][0].checksummed_data.content) == _MAX_CHUNK_SIZE_BYTES
+
+    # Second chunk
+    assert second_call[0][0].write_offset == 100 + _MAX_CHUNK_SIZE_BYTES
+    assert len(second_call[0][0].checksummed_data.content) == 1
+
+    assert writer.offset == 100 + len(data)
+    writer.flush.assert_not_awaited()
+
+
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
+)
+async def test_append_flushes_when_buffer_is_full(
+    mock_write_object_stream, mock_client
+):
+    """Test that append flushes the stream when the buffer size is reached."""
+    from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
+        _MAX_BUFFER_SIZE_BYTES,
+    )
+
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    writer._is_stream_open = True
+    writer.persisted_size = 0
+    mock_stream = mock_write_object_stream.return_value
+    mock_stream.send = mock.AsyncMock()
+    writer.flush = mock.AsyncMock()
+
+    data = b"a" * _MAX_BUFFER_SIZE_BYTES
+    await writer.append(data)
+
+    writer.flush.assert_awaited_once()
+
+
+@pytest.mark.asyncio
+@mock.patch(
+    "google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
+)
+async def test_append_handles_large_data(mock_write_object_stream, mock_client):
+    """Test that append handles data larger than the buffer size."""
+    from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import (
+        _MAX_BUFFER_SIZE_BYTES,
+    )
+
+    writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
+    writer._is_stream_open = True
+    writer.persisted_size = 0
+    mock_stream = mock_write_object_stream.return_value
+    mock_stream.send = mock.AsyncMock()
+    writer.flush = mock.AsyncMock()
+
+    data = b"a" * (_MAX_BUFFER_SIZE_BYTES * 2 + 1)
+    await writer.append(data)
+
+    assert writer.flush.await_count == 2
+
+
+@pytest.mark.asyncio
+@mock.patch(
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream" +) +async def test_append_data_two_times(mock_write_object_stream, mock_client): + """Test that append sends data correctly when called multiple times.""" + from google.cloud.storage._experimental.asyncio.async_appendable_object_writer import ( + _MAX_CHUNK_SIZE_BYTES, + ) + + writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT) + writer._is_stream_open = True + writer.persisted_size = 0 + mock_stream = mock_write_object_stream.return_value + mock_stream.send = mock.AsyncMock() + writer.flush = mock.AsyncMock() + + data1 = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10) + await writer.append(data1) + + data2 = b"b" * (_MAX_CHUNK_SIZE_BYTES + 20) + await writer.append(data2) + + total_data_length = len(data1) + len(data2) + assert writer.offset == total_data_length + assert writer.flush.await_count == 0