Skip to content

Commit 2d5a7b1

Browse files
authored
fix(experimental): no state lookup while opening bidi-write stream (#1636)
fix(experimental): no state lookup while opening bidi-write stream
1 parent 4a609a4 commit 2d5a7b1

File tree

5 files changed

+57
-20
lines changed

5 files changed

+57
-20
lines changed

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,11 @@ def __init__(
114114
write_handle=self.write_handle,
115115
)
116116
self._is_stream_open: bool = False
117+
# `offset` is the latest size of the object without staleness.
117118
self.offset: Optional[int] = None
119+
# `persisted_size` is the total_bytes persisted in the GCS server.
120+
# Please note: `offset` and `persisted_size` are the same when the stream is
121+
# opened.
118122
self.persisted_size: Optional[int] = None
119123

120124
async def state_lookup(self) -> int:
@@ -152,17 +156,17 @@ async def open(self) -> None:
152156
if self.generation is None:
153157
self.generation = self.write_obj_stream.generation_number
154158
self.write_handle = self.write_obj_stream.write_handle
155-
156-
# Update self.persisted_size
157-
_ = await self.state_lookup()
159+
self.persisted_size = self.write_obj_stream.persisted_size
158160

159161
async def append(self, data: bytes) -> None:
160162
"""Appends data to the Appendable object.
161163
162-
This method sends the provided data to the GCS server in chunks. It
163-
maintains an internal threshold `_MAX_BUFFER_SIZE_BYTES` and will
164-
automatically flush the data to make it visible to readers when that
165-
threshold has reached.
164+
calling `self.append` will append bytes at the end of the current size
165+
i.e. `self.offset` bytes relative to the beginning of the object.
166+
167+
This method sends the provided `data` to the GCS server in chunks,
168+
and persists data in GCS at every `_MAX_BUFFER_SIZE_BYTES` bytes by
169+
calling `self.simple_flush`.
166170
167171
:type data: bytes
168172
:param data: The bytes to append to the object.

google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ async def download_ranges(
216216
217217
:type read_ranges: List[Tuple[int, int, "BytesIO"]]
218218
:param read_ranges: A list of tuples, where each tuple represents a
219-
byte range (start_byte, bytes_to_read, writeable_buffer). Buffer has
219+
combination of byte range and writeable buffer in format -
220+
(`start_byte`, `bytes_to_read`, `writeable_buffer`). Buffer has
220221
to be provided by the user, and user has to make sure appropriate
221222
memory is available in the application to avoid out-of-memory crash.
222223

google/cloud/storage/_experimental/asyncio/async_write_object_stream.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ async def open(self) -> None:
117117
object=self.object_name,
118118
generation=self.generation_number,
119119
),
120-
state_lookup=True,
121120
)
122121

123122
self.socket_like_rpc = AsyncBidiRpc(
@@ -136,11 +135,17 @@ async def open(self) -> None:
136135
raise ValueError(
137136
"Failed to obtain object generation after opening the stream"
138137
)
139-
self.generation_number = response.resource.generation
140138

141139
if not response.write_handle:
142140
raise ValueError("Failed to obtain write_handle after opening the stream")
143141

142+
if not response.resource.size:
143+
# Appending to a 0 byte appendable object.
144+
self.persisted_size = 0
145+
else:
146+
self.persisted_size = response.resource.size
147+
148+
self.generation_number = response.resource.generation
144149
self.write_handle = response.write_handle
145150

146151
async def close(self) -> None:

tests/unit/asyncio/test_async_appendable_object_writer.py

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -133,15 +133,10 @@ async def test_open_appendable_object_writer(mock_write_object_stream, mock_clie
133133
writer = AsyncAppendableObjectWriter(mock_client, BUCKET, OBJECT)
134134
mock_stream = mock_write_object_stream.return_value
135135
mock_stream.open = mock.AsyncMock()
136-
mock_stream.send = mock.AsyncMock()
137-
mock_stream.recv = mock.AsyncMock()
138-
139-
mock_state_response = mock.MagicMock()
140-
mock_state_response.persisted_size = 1024
141-
mock_stream.recv.return_value = mock_state_response
142136

143137
mock_stream.generation_number = GENERATION
144138
mock_stream.write_handle = WRITE_HANDLE
139+
mock_stream.persisted_size = 0
145140

146141
# Act
147142
await writer.open()
@@ -151,11 +146,37 @@ async def test_open_appendable_object_writer(mock_write_object_stream, mock_clie
151146
assert writer._is_stream_open
152147
assert writer.generation == GENERATION
153148
assert writer.write_handle == WRITE_HANDLE
149+
assert writer.persisted_size == 0
154150

155-
expected_request = _storage_v2.BidiWriteObjectRequest(state_lookup=True)
156-
mock_stream.send.assert_awaited_once_with(expected_request)
157-
mock_stream.recv.assert_awaited_once()
158-
assert writer.persisted_size == 1024
151+
152+
@pytest.mark.asyncio
153+
@mock.patch(
154+
"google.cloud.storage._experimental.asyncio.async_appendable_object_writer._AsyncWriteObjectStream"
155+
)
156+
async def test_open_appendable_object_writer_existing_object(
157+
mock_write_object_stream, mock_client
158+
):
159+
"""Test the open method."""
160+
# Arrange
161+
writer = AsyncAppendableObjectWriter(
162+
mock_client, BUCKET, OBJECT, generation=GENERATION
163+
)
164+
mock_stream = mock_write_object_stream.return_value
165+
mock_stream.open = mock.AsyncMock()
166+
167+
mock_stream.generation_number = GENERATION
168+
mock_stream.write_handle = WRITE_HANDLE
169+
mock_stream.persisted_size = PERSISTED_SIZE
170+
171+
# Act
172+
await writer.open()
173+
174+
# Assert
175+
mock_stream.open.assert_awaited_once()
176+
assert writer._is_stream_open
177+
assert writer.generation == GENERATION
178+
assert writer.write_handle == WRITE_HANDLE
179+
assert writer.persisted_size == PERSISTED_SIZE
159180

160181

161182
@pytest.mark.asyncio

tests/unit/asyncio/test_async_write_object_stream.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ async def instantiate_write_obj_stream(mock_client, mock_cls_async_bidi_rpc, ope
5555
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
5656
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
5757
mock_response.resource.generation = GENERATION
58+
mock_response.resource.size = 0
5859
mock_response.write_handle = WRITE_HANDLE
5960
socket_like_rpc.recv = AsyncMock(return_value=mock_response)
6061

@@ -129,6 +130,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
129130
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
130131
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
131132
mock_response.resource.generation = GENERATION
133+
mock_response.resource.size = 0
132134
mock_response.write_handle = WRITE_HANDLE
133135
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
134136

@@ -143,6 +145,7 @@ async def test_open_for_new_object(mock_async_bidi_rpc, mock_client):
143145
socket_like_rpc.recv.assert_called_once()
144146
assert stream.generation_number == GENERATION
145147
assert stream.write_handle == WRITE_HANDLE
148+
assert stream.persisted_size == 0
146149

147150

148151
@pytest.mark.asyncio
@@ -158,6 +161,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client):
158161

159162
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
160163
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
164+
mock_response.resource.size = 1024
161165
mock_response.resource.generation = GENERATION
162166
mock_response.write_handle = WRITE_HANDLE
163167
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
@@ -175,6 +179,7 @@ async def test_open_for_existing_object(mock_async_bidi_rpc, mock_client):
175179
socket_like_rpc.recv.assert_called_once()
176180
assert stream.generation_number == GENERATION
177181
assert stream.write_handle == WRITE_HANDLE
182+
assert stream.persisted_size == 1024
178183

179184

180185
@pytest.mark.asyncio
@@ -191,6 +196,7 @@ async def test_open_when_already_open_raises_error(mock_async_bidi_rpc, mock_cli
191196
mock_response = mock.MagicMock(spec=_storage_v2.BidiWriteObjectResponse)
192197
mock_response.resource = mock.MagicMock(spec=_storage_v2.Object)
193198
mock_response.resource.generation = GENERATION
199+
mock_response.resource.size = 0
194200
mock_response.write_handle = WRITE_HANDLE
195201
socket_like_rpc.recv = mock.AsyncMock(return_value=mock_response)
196202

0 commit comments

Comments
 (0)