Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,28 @@ async def append(self, data: bytes) -> None:
self.offset += chunk_size
bytes_to_flush += chunk_size
if bytes_to_flush >= _MAX_BUFFER_SIZE_BYTES:
await self.flush()
await self.simple_flush()
bytes_to_flush = 0
start_idx = end_idx

async def simple_flush(self) -> None:
"""Flushes the data to the server.
Please note: Unlike `flush` it does not do `state_lookup`

:rtype: None

:raises ValueError: If the stream is not open (i.e., `open()` has not
been called).
"""
if not self._is_stream_open:
raise ValueError("Stream is not open. Call open() before simple_flush().")

await self.write_obj_stream.send(
_storage_v2.BidiWriteObjectRequest(
flush=True,
)
)

async def flush(self) -> int:
"""Flushes the data to the server.

Expand Down
48 changes: 40 additions & 8 deletions tests/unit/asyncio/test_async_appendable_object_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,38 @@ async def test_flush_without_open_raises_value_error(mock_client):
await writer.flush()


@pytest.mark.asyncio
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
)
async def test_simple_flush(mock_write_object_stream, mock_client):
"""Test that flush sends the correct request and updates state."""
# Arrange
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
writer._is_stream_open = True
mock_stream = mock_write_object_stream.return_value
mock_stream.send = mock.AsyncMock()

# Act
await writer.simple_flush()

# Assert
mock_stream.send.assert_awaited_once_with(
_storage_v2.BidiWriteObjectRequest(flush=True)
)


@pytest.mark.asyncio
async def test_simple_flush_without_open_raises_value_error(mock_client):
"""Test that flush raises an error if the stream is not open."""
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
with pytest.raises(
ValueError,
match="Stream is not open. Call open\\(\\) before simple_flush\\(\\).",
):
await writer.simple_flush()


@pytest.mark.asyncio
@mock.patch(
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
Expand Down Expand Up @@ -369,7 +401,7 @@ async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client
writer.persisted_size = 100
mock_stream = mock_write_object_stream.return_value
mock_stream.send = mock.AsyncMock()
writer.flush = mock.AsyncMock()
writer.simple_flush = mock.AsyncMock()

data = b"a" * (_MAX_CHUNK_SIZE_BYTES + 1)
await writer.append(data)
Expand All @@ -387,7 +419,7 @@ async def test_append_sends_data_in_chunks(mock_write_object_stream, mock_client
assert len(second_call[0][0].checksummed_data.content) == 1

assert writer.offset == 100 + len(data)
writer.flush.assert_not_awaited()
writer.simple_flush.assert_not_awaited()


@pytest.mark.asyncio
Expand All @@ -407,12 +439,12 @@ async def test_append_flushes_when_buffer_is_full(
writer.persisted_size = 0
mock_stream = mock_write_object_stream.return_value
mock_stream.send = mock.AsyncMock()
writer.flush = mock.AsyncMock()
writer.simple_flush = mock.AsyncMock()

data = b"a" * _MAX_BUFFER_SIZE_BYTES
await writer.append(data)

writer.flush.assert_awaited_once()
writer.simple_flush.assert_awaited_once()


@pytest.mark.asyncio
Expand All @@ -430,12 +462,12 @@ async def test_append_handles_large_data(mock_write_object_stream, mock_client):
writer.persisted_size = 0
mock_stream = mock_write_object_stream.return_value
mock_stream.send = mock.AsyncMock()
writer.flush = mock.AsyncMock()
writer.simple_flush = mock.AsyncMock()

data = b"a" * (_MAX_BUFFER_SIZE_BYTES * 2 + 1)
await writer.append(data)

assert writer.flush.await_count == 2
assert writer.simple_flush.await_count == 2


@pytest.mark.asyncio
Expand All @@ -453,7 +485,7 @@ async def test_append_data_two_times(mock_write_object_stream, mock_client):
writer.persisted_size = 0
mock_stream = mock_write_object_stream.return_value
mock_stream.send = mock.AsyncMock()
writer.flush = mock.AsyncMock()
writer.simple_flush = mock.AsyncMock()

data1 = b"a" * (_MAX_CHUNK_SIZE_BYTES + 10)
await writer.append(data1)
Expand All @@ -463,4 +495,4 @@ async def test_append_data_two_times(mock_write_object_stream, mock_client):

total_data_length = len(data1) + len(data2)
assert writer.offset == total_data_length
assert writer.flush.await_count == 0
assert writer.simple_flush.await_count == 0