Skip to content

Commit 6dc711d

Browse files
authored
fix: add system test for opening with read_handle (#1672)
fix: add system test for opening with read_handle
1 parent 0e2961b commit 6dc711d

File tree

4 files changed

+120
-22
lines changed

4 files changed

+120
-22
lines changed

google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -193,15 +193,13 @@ async def open(self) -> None:
193193
"""
194194
if self._is_stream_open:
195195
raise ValueError("Underlying bidi-gRPC stream is already open")
196-
197-
if self.read_obj_str is None:
198-
self.read_obj_str = _AsyncReadObjectStream(
199-
client=self.client,
200-
bucket_name=self.bucket_name,
201-
object_name=self.object_name,
202-
generation_number=self.generation_number,
203-
read_handle=self.read_handle,
204-
)
196+
self.read_obj_str = _AsyncReadObjectStream(
197+
client=self.client,
198+
bucket_name=self.bucket_name,
199+
object_name=self.object_name,
200+
generation_number=self.generation_number,
201+
read_handle=self.read_handle,
202+
)
205203
await self.read_obj_str.open()
206204
self._is_stream_open = True
207205
if self.generation_number is None:
@@ -342,6 +340,7 @@ async def close(self):
342340
if not self._is_stream_open:
343341
raise ValueError("Underlying bidi-gRPC stream is not open")
344342
await self.read_obj_str.close()
343+
self.read_obj_str = None
345344
self._is_stream_open = False
346345

347346
@property

google/cloud/storage/_experimental/asyncio/async_read_object_stream.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,6 @@ def __init__(
8484
self.rpc = self.client._client._transport._wrapped_methods[
8585
self.client._client._transport.bidi_read_object
8686
]
87-
self.first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
88-
read_object_spec=_storage_v2.BidiReadObjectSpec(
89-
bucket=self._full_bucket_name, object=object_name
90-
),
91-
)
9287
self.metadata = (("x-goog-request-params", f"bucket={self._full_bucket_name}"),)
9388
self.socket_like_rpc: Optional[AsyncBidiRpc] = None
9489
self._is_stream_open: bool = False
@@ -102,14 +97,21 @@ async def open(self) -> None:
10297
"""
10398
if self._is_stream_open:
10499
raise ValueError("Stream is already open")
100+
self.first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
101+
read_object_spec=_storage_v2.BidiReadObjectSpec(
102+
bucket=self._full_bucket_name,
103+
object=self.object_name,
104+
read_handle=self.read_handle,
105+
),
106+
)
105107
self.socket_like_rpc = AsyncBidiRpc(
106108
self.rpc, initial_request=self.first_bidi_read_req, metadata=self.metadata
107109
)
108110
await self.socket_like_rpc.open() # this is actually 1 send
109111
response = await self.socket_like_rpc.recv()
110112
# populated only in the first response of bidi-stream and when opened
111113
# without using `read_handle`
112-
if response.metadata:
114+
if hasattr(response, "metadata") and response.metadata:
113115
if self.generation_number is None:
114116
self.generation_number = response.metadata.generation
115117
# update persisted size

tests/system/test_zonal.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# py standard imports
2+
import asyncio
23
import os
34
import uuid
45
from io import BytesIO
@@ -27,6 +28,36 @@
2728
_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
2829

2930

31+
async def write_one_appendable_object(
32+
bucket_name: str,
33+
object_name: str,
34+
data: bytes,
35+
) -> None:
36+
"""Helper to write an appendable object."""
37+
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
38+
writer = AsyncAppendableObjectWriter(grpc_client, bucket_name, object_name)
39+
await writer.open()
40+
await writer.append(data)
41+
await writer.close()
42+
43+
44+
@pytest.fixture(scope="function")
45+
def appendable_object(storage_client, blobs_to_delete):
46+
"""Fixture to create and cleanup an appendable object."""
47+
object_name = f"appendable_obj_for_mrd-{str(uuid.uuid4())[:4]}"
48+
asyncio.run(
49+
write_one_appendable_object(
50+
_ZONAL_BUCKET,
51+
object_name,
52+
_BYTES_TO_UPLOAD,
53+
)
54+
)
55+
yield object_name
56+
57+
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
58+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
59+
60+
3061
@pytest.mark.asyncio
3162
@pytest.mark.parametrize(
3263
"attempt_direct_path",
@@ -85,3 +116,25 @@ async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delet
85116

86117
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
87118
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
119+
120+
121+
@pytest.mark.asyncio
122+
async def test_mrd_open_with_read_handle(appendable_object):
123+
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
124+
125+
mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, appendable_object)
126+
await mrd.open()
127+
read_handle = mrd.read_handle
128+
await mrd.close()
129+
130+
# Open a new MRD using the `read_handle` obtained above
131+
new_mrd = AsyncMultiRangeDownloader(
132+
grpc_client, _ZONAL_BUCKET, appendable_object, read_handle=read_handle
133+
)
134+
await new_mrd.open()
135+
# persisted_size not set when opened with read_handle
136+
assert new_mrd.persisted_size is None
137+
buffer = BytesIO()
138+
await new_mrd.download_ranges([(0, 0, buffer)])
139+
await new_mrd.close()
140+
assert buffer.getvalue() == _BYTES_TO_UPLOAD

tests/unit/asyncio/test_async_read_object_stream.py

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
_TEST_GENERATION_NUMBER = 12345
2828
_TEST_OBJECT_SIZE = 1024 * 1024 # 1 MiB
2929
_TEST_READ_HANDLE = b"test-read-handle"
30+
_TEST_READ_HANDLE_NEW = b"test-read-handle-new"
3031

3132

3233
async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open=True):
@@ -54,6 +55,30 @@ async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open
5455
return read_obj_stream
5556

5657

58+
async def instantiate_read_obj_stream_with_read_handle(
59+
mock_client, mock_cls_async_bidi_rpc, open=True
60+
):
61+
"""Helper to create an instance of _AsyncReadObjectStream and open it by default."""
62+
socket_like_rpc = AsyncMock()
63+
mock_cls_async_bidi_rpc.return_value = socket_like_rpc
64+
socket_like_rpc.open = AsyncMock()
65+
66+
recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse)
67+
recv_response.read_handle = _TEST_READ_HANDLE_NEW
68+
socket_like_rpc.recv = AsyncMock(return_value=recv_response)
69+
70+
read_obj_stream = _AsyncReadObjectStream(
71+
client=mock_client,
72+
bucket_name=_TEST_BUCKET_NAME,
73+
object_name=_TEST_OBJECT_NAME,
74+
)
75+
76+
if open:
77+
await read_obj_stream.open()
78+
79+
return read_obj_stream
80+
81+
5782
@mock.patch(
5883
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
5984
)
@@ -67,12 +92,6 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
6792
mock_client._client._transport._wrapped_methods = {
6893
"bidi_read_object_rpc": rpc_sentinel,
6994
}
70-
full_bucket_name = f"projects/_/buckets/{_TEST_BUCKET_NAME}"
71-
first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
72-
read_object_spec=_storage_v2.BidiReadObjectSpec(
73-
bucket=full_bucket_name, object=_TEST_OBJECT_NAME
74-
),
75-
)
7695

7796
# Act
7897
read_obj_stream = _AsyncReadObjectStream(
@@ -88,7 +107,6 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
88107
assert read_obj_stream.object_name == _TEST_OBJECT_NAME
89108
assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER
90109
assert read_obj_stream.read_handle == _TEST_READ_HANDLE
91-
assert read_obj_stream.first_bidi_read_req == first_bidi_read_req
92110
assert read_obj_stream.rpc == rpc_sentinel
93111

94112

@@ -118,6 +136,32 @@ async def test_open(mock_client, mock_cls_async_bidi_rpc):
118136
assert read_obj_stream.is_stream_open
119137

120138

139+
@mock.patch(
140+
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
141+
)
142+
@mock.patch(
143+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
144+
)
145+
@pytest.mark.asyncio
146+
async def test_open_with_read_handle(mock_client, mock_cls_async_bidi_rpc):
147+
# arrange
148+
read_obj_stream = await instantiate_read_obj_stream_with_read_handle(
149+
mock_client, mock_cls_async_bidi_rpc, open=False
150+
)
151+
152+
# act
153+
await read_obj_stream.open()
154+
155+
# assert
156+
read_obj_stream.socket_like_rpc.open.assert_called_once()
157+
read_obj_stream.socket_like_rpc.recv.assert_called_once()
158+
159+
assert read_obj_stream.generation_number is None
160+
assert read_obj_stream.persisted_size is None
161+
assert read_obj_stream.read_handle == _TEST_READ_HANDLE_NEW
162+
assert read_obj_stream.is_stream_open
163+
164+
121165
@mock.patch(
122166
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
123167
)

0 commit comments

Comments
 (0)