Skip to content

Commit 0e2961b

Browse files
authored
feat: expose persisted size in mrd (#1671)
feat: expose persisted size in MRD (MultiRangeReader)
1 parent b8855b0 commit 0e2961b

File tree

5 files changed

+41
-2
lines changed

5 files changed

+41
-2
lines changed

google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ def __init__(
180180
self._read_id_to_writable_buffer_dict = {}
181181
self._read_id_to_download_ranges_id = {}
182182
self._download_ranges_id_to_pending_read_ids = {}
183+
self.persisted_size: Optional[int] = None # updated after opening the stream
183184

184185
async def open(self) -> None:
185186
"""Opens the bidi-gRPC connection to read from the object.
@@ -206,6 +207,8 @@ async def open(self) -> None:
206207
if self.generation_number is None:
207208
self.generation_number = self.read_obj_str.generation_number
208209
self.read_handle = self.read_obj_str.read_handle
210+
if self.read_obj_str.persisted_size is not None:
211+
self.persisted_size = self.read_obj_str.persisted_size
209212
return
210213

211214
async def download_ranges(

google/cloud/storage/_experimental/asyncio/async_read_object_stream.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def __init__(
9292
self.metadata = (("x-goog-request-params", f"bucket={self._full_bucket_name}"),)
9393
self.socket_like_rpc: Optional[AsyncBidiRpc] = None
9494
self._is_stream_open: bool = False
95+
self.persisted_size: Optional[int] = None
9596

9697
async def open(self) -> None:
9798
"""Opens the bidi-gRPC connection to read from the object.
@@ -106,8 +107,13 @@ async def open(self) -> None:
106107
)
107108
await self.socket_like_rpc.open() # this is actually 1 send
108109
response = await self.socket_like_rpc.recv()
109-
if self.generation_number is None:
110-
self.generation_number = response.metadata.generation
110+
# populated only in the first response of bidi-stream and when opened
111+
# without using `read_handle`
112+
if response.metadata:
113+
if self.generation_number is None:
114+
self.generation_number = response.metadata.generation
115+
# update persisted size
116+
self.persisted_size = response.metadata.size
111117

112118
self.read_handle = response.read_handle
113119

tests/system/test_zonal.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,30 @@ async def test_basic_wrd(storage_client, blobs_to_delete, attempt_direct_path):
5858
await mrd.download_ranges([(0, 0, buffer)])
5959
await mrd.close()
6060
assert buffer.getvalue() == _BYTES_TO_UPLOAD
61+
assert mrd.persisted_size == len(_BYTES_TO_UPLOAD)
62+
63+
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
64+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
65+
66+
67+
@pytest.mark.asyncio
68+
async def test_read_unfinalized_appendable_object(storage_client, blobs_to_delete):
69+
object_name = f"read_unfinalized_appendable_object-{str(uuid.uuid4())[:4]}"
70+
grpc_client = AsyncGrpcClient(attempt_direct_path=True).grpc_client
71+
72+
writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
73+
await writer.open()
74+
await writer.append(_BYTES_TO_UPLOAD)
75+
await writer.flush()
76+
77+
mrd = AsyncMultiRangeDownloader(grpc_client, _ZONAL_BUCKET, object_name)
78+
buffer = BytesIO()
79+
await mrd.open()
80+
assert mrd.persisted_size == len(_BYTES_TO_UPLOAD)
81+
# (0, 0) means read the whole object
82+
await mrd.download_ranges([(0, 0, buffer)])
83+
await mrd.close()
84+
assert buffer.getvalue() == _BYTES_TO_UPLOAD
6185

6286
# Clean up; use json client (i.e. `storage_client` fixture) to delete.
6387
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))

tests/unit/asyncio/test_async_multi_range_downloader.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
_TEST_BUCKET_NAME = "test-bucket"
3232
_TEST_OBJECT_NAME = "test-object"
33+
_TEST_OBJECT_SIZE = 1024 * 1024 # 1 MiB
3334
_TEST_GENERATION_NUMBER = 123456789
3435
_TEST_READ_HANDLE = b"test-handle"
3536

@@ -57,6 +58,7 @@ async def _make_mock_mrd(
5758
mock_stream = mock_cls_async_read_object_stream.return_value
5859
mock_stream.open = AsyncMock()
5960
mock_stream.generation_number = _TEST_GENERATION_NUMBER
61+
mock_stream.persisted_size = _TEST_OBJECT_SIZE
6062
mock_stream.read_handle = _TEST_READ_HANDLE
6163

6264
mrd = await AsyncMultiRangeDownloader.create_mrd(
@@ -106,6 +108,7 @@ async def test_create_mrd(
106108
assert mrd.object_name == _TEST_OBJECT_NAME
107109
assert mrd.generation_number == _TEST_GENERATION_NUMBER
108110
assert mrd.read_handle == _TEST_READ_HANDLE
111+
assert mrd.persisted_size == _TEST_OBJECT_SIZE
109112
assert mrd.is_stream_open
110113

111114
@mock.patch(

tests/unit/asyncio/test_async_read_object_stream.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
_TEST_BUCKET_NAME = "test-bucket"
2626
_TEST_OBJECT_NAME = "test-object"
2727
_TEST_GENERATION_NUMBER = 12345
28+
_TEST_OBJECT_SIZE = 1024 * 1024 # 1 MiB
2829
_TEST_READ_HANDLE = b"test-read-handle"
2930

3031

@@ -37,6 +38,7 @@ async def instantiate_read_obj_stream(mock_client, mock_cls_async_bidi_rpc, open
3738
recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse)
3839
recv_response.metadata = mock.MagicMock(spec=_storage_v2.Object)
3940
recv_response.metadata.generation = _TEST_GENERATION_NUMBER
41+
recv_response.metadata.size = _TEST_OBJECT_SIZE
4042
recv_response.read_handle = _TEST_READ_HANDLE
4143
socket_like_rpc.recv = AsyncMock(return_value=recv_response)
4244

@@ -112,6 +114,7 @@ async def test_open(mock_client, mock_cls_async_bidi_rpc):
112114

113115
assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER
114116
assert read_obj_stream.read_handle == _TEST_READ_HANDLE
117+
assert read_obj_stream.persisted_size == _TEST_OBJECT_SIZE
115118
assert read_obj_stream.is_stream_open
116119

117120

0 commit comments

Comments
 (0)