Skip to content

Commit e26888f

Browse files
authored
feat(zb-experimental): implement download_ranges (#1551)
Downloads multiple byte ranges from the object into the buffers provided by the user. This interface provides an option to download multiple ranges of a GCS `Object` concurrently. Example usage: ```python client = AsyncGrpcClient().grpc_client mrd = await AsyncMultiRangeDownloader.create_mrd( client, bucket_name="chandrasiri-rs", object_name="test_open9" ) my_buff1 = open('my_fav_file.txt', 'wb') my_buff2 = BytesIO() my_buff3 = BytesIO() my_buff4 = any_object_which_provides_BytesIO_like_interface() results_arr, error_obj = await mrd.download_ranges( [ (0, 100, my_buff1), (100, 20, my_buff2), (200, 123, my_buff3), (300, 789, my_buff4), ] ) if error_obj: print("Error occurred: ") print(error_obj) print( "please issue call to `download_ranges` with updated" "`read_ranges` based on diff of (bytes_requested - bytes_written)" ) for result in results_arr: print("downloaded bytes", result) ```
1 parent 4766164 commit e26888f

File tree

2 files changed

+233
-57
lines changed

2 files changed

+233
-57
lines changed

google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py

Lines changed: 109 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
from __future__ import annotations
1616

17-
from typing import Any, List, Optional, Tuple
17+
from typing import List, Optional, Tuple
1818

1919
from google.cloud.storage._experimental.asyncio.async_read_object_stream import (
2020
_AsyncReadObjectStream,
@@ -24,6 +24,38 @@
2424
)
2525

2626
from io import BytesIO
27+
from google.cloud import _storage_v2
28+
29+
30+
_MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100
31+
32+
33+
class Result:
    """Per-range outcome returned by ``download_ranges``.

    One instance is created and returned for each `read_range` supplied to
    ``download_ranges``. It records how many bytes were asked for and how
    many have been written to the caller's buffer so far.
    """

    def __init__(self, bytes_requested: int):
        # The requested size is fixed at construction time: it is exposed
        # through a read-only property (no setter is defined for it).
        self._bytes_requested: int = bytes_requested
        # Starts at zero and is increased as data is written.
        self._bytes_written: int = 0

    def __repr__(self):
        return (
            f"bytes_requested: {self._bytes_requested}, "
            f"bytes_written: {self._bytes_written}"
        )

    @property
    def bytes_requested(self) -> int:
        """Number of bytes originally requested for this range (read-only)."""
        return self._bytes_requested

    @property
    def bytes_written(self) -> int:
        """Number of bytes written to the buffer for this range so far."""
        return self._bytes_written

    @bytes_written.setter
    def bytes_written(self, count: int):
        self._bytes_written = count
2759

2860

2961
class AsyncMultiRangeDownloader:
@@ -38,21 +70,30 @@ class AsyncMultiRangeDownloader:
3870
mrd = await AsyncMultiRangeDownloader.create_mrd(
3971
client, bucket_name="chandrasiri-rs", object_name="test_open9"
4072
)
41-
my_buff1 = BytesIO()
73+
my_buff1 = open('my_fav_file.txt', 'wb')
4274
my_buff2 = BytesIO()
4375
my_buff3 = BytesIO()
44-
my_buff4 = BytesIO()
45-
buffers = [my_buff1, my_buff2, my_buff3, my_buff4]
46-
await mrd.download_ranges(
76+
my_buff4 = any_object_which_provides_BytesIO_like_interface()
77+
results_arr, error_obj = await mrd.download_ranges(
4778
[
79+
# (start_byte, bytes_to_read, writeable_buffer)
4880
(0, 100, my_buff1),
49-
(100, 200, my_buff2),
50-
(200, 300, my_buff3),
51-
(300, 400, my_buff4),
81+
(100, 20, my_buff2),
82+
(200, 123, my_buff3),
83+
(300, 789, my_buff4),
5284
]
5385
)
54-
for buff in buffers:
55-
print("downloaded bytes", buff.getbuffer().nbytes)
86+
if error_obj:
87+
print("Error occurred: ")
88+
print(error_obj)
89+
print(
90+
"please issue call to `download_ranges` with updated"
91+
"`read_ranges` based on diff of (bytes_requested - bytes_written)"
92+
)
93+
94+
for result in results_arr:
95+
print("downloaded bytes", result)
96+
5697
5798
"""
5899

@@ -148,18 +189,70 @@ async def open(self) -> None:
148189
self.read_handle = self.read_obj_str.read_handle
149190
return
150191

151-
async def download_ranges(
    self, read_ranges: List[Tuple[int, int, BytesIO]]
) -> List[Result]:
    """Downloads multiple byte ranges from the object into the buffers
    provided by user.

    The ranges are sent on the read stream in requests of at most
    ``_MAX_READ_RANGES_PER_BIDI_READ_REQUEST`` ranges each. Responses are
    then read from the stream until every requested range has been
    reported with ``range_end`` set.

    :type read_ranges: List[Tuple[int, int, "BytesIO"]]
    :param read_ranges: A list of tuples, where each tuple represents a
        byte range (start_byte, bytes_to_read, writeable_buffer). Buffer has
        to be provided by the user, and user has to make sure appropriate
        memory is available in the application to avoid out-of-memory crash.

    :rtype: List[:class:`~google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Result`]
    :returns: A list of ``Result`` objects, where each object corresponds
        to a requested range. ``results[k]`` describes ``read_ranges[k]``.

    :raises ValueError: If more than 1000 ranges are passed.
    :raises Exception: If the stream returns ``None`` instead of a
        response, or a returned data range has no ``read_range``.
    """
    if len(read_ranges) > 1000:
        raise ValueError(
            "Invalid input - length of read_ranges cannot be more than 1000"
        )

    # Maps read_id -> writable buffer for every range not yet finished.
    # The read_id of a range is its index in ``read_ranges``, which is also
    # its index in ``results``.
    pending_buffers = {}
    results = []
    for batch_start in range(
        0, len(read_ranges), _MAX_READ_RANGES_PER_BIDI_READ_REQUEST
    ):
        batch = read_ranges[
            batch_start : batch_start + _MAX_READ_RANGES_PER_BIDI_READ_REQUEST
        ]

        bidi_read_ranges = []
        for read_id, (read_offset, bytes_requested, buffer) in enumerate(
            batch, start=batch_start
        ):
            pending_buffers[read_id] = buffer
            results.append(Result(bytes_requested))
            bidi_read_ranges.append(
                _storage_v2.ReadRange(
                    read_offset=read_offset,
                    read_length=bytes_requested,
                    read_id=read_id,
                )
            )
        await self.read_obj_str.send(
            _storage_v2.BidiReadObjectRequest(read_ranges=bidi_read_ranges)
        )

    # Keep receiving until every range has been marked as ended.
    while pending_buffers:
        response = await self.read_obj_str.recv()

        if response is None:
            raise Exception("None response received, something went wrong.")

        for object_data_range in response.object_data_ranges:
            read_range = object_data_range.read_range
            if read_range is None:
                raise Exception("Invalid response, read_range is None")

            data = object_data_range.checksummed_data.content
            read_id = read_range.read_id
            # A read_id that is not pending raises KeyError on this lookup.
            pending_buffers[read_id].write(data)
            results[read_id].bytes_written += len(data)

            if object_data_range.range_end:
                del pending_buffers[read_id]
    return results

tests/unit/asyncio/test_async_multi_range_downloader.py

Lines changed: 124 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import pytest
1616
from unittest import mock
1717
from unittest.mock import AsyncMock
18+
from google.cloud import _storage_v2
1819

1920
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
2021
AsyncMultiRangeDownloader,
@@ -28,52 +29,134 @@
2829
_TEST_READ_HANDLE = b"test-handle"
2930

3031

31-
@mock.patch(
32-
"google.cloud.storage._experimental.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
33-
)
34-
@mock.patch(
35-
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
36-
)
37-
@pytest.mark.asyncio
38-
async def test_create_mrd(mock_async_grpc_client, async_read_object_stream):
39-
# Arrange
40-
mock_stream_instance = async_read_object_stream.return_value
41-
mock_stream_instance.open = AsyncMock()
42-
mock_stream_instance.generation_number = _TEST_GENERATION_NUMBER
43-
mock_stream_instance.read_handle = _TEST_READ_HANDLE
44-
45-
# act
46-
mrd = await AsyncMultiRangeDownloader.create_mrd(
47-
mock_async_grpc_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME
48-
)
49-
50-
# Assert
51-
async_read_object_stream.assert_called_once_with(
52-
client=mock_async_grpc_client,
32+
class TestAsyncMultiRangeDownloader:
    """Unit tests for ``AsyncMultiRangeDownloader``."""

    async def _make_mock_mrd(
        self,
        mock_grpc_client,
        mock_cls_async_read_object_stream,
        bucket_name=_TEST_BUCKET_NAME,
        object_name=_TEST_OBJECT_NAME,
        generation_number=_TEST_GENERATION_NUMBER,
        read_handle=_TEST_READ_HANDLE,
    ):
        """Helper: create a downloader through ``create_mrd``.

        The instance returned by the patched stream class gets an async
        ``open`` plus the test generation number and read handle before
        ``create_mrd`` is awaited. This is not a test itself, so it carries
        no pytest marker.
        """
        mock_stream = mock_cls_async_read_object_stream.return_value
        mock_stream.open = AsyncMock()
        mock_stream.generation_number = _TEST_GENERATION_NUMBER
        mock_stream.read_handle = _TEST_READ_HANDLE

        mrd = await AsyncMultiRangeDownloader.create_mrd(
            mock_grpc_client, bucket_name, object_name, generation_number, read_handle
        )

        return mrd

    def create_read_ranges(self, num_ranges):
        """Helper: build ``num_ranges`` one-byte ranges.

        Each entry is a ``(start_byte, bytes_to_read, writeable_buffer)``
        tuple, i.e. the input shape ``download_ranges`` documents.
        """
        return [(i, 1, BytesIO()) for i in range(num_ranges)]

    @mock.patch(
        "google.cloud.storage._experimental.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
    )
    @mock.patch(
        "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
    )
    @pytest.mark.asyncio
    async def test_create_mrd(
        self, mock_grpc_client, mock_cls_async_read_object_stream
    ):
        # Arrange & Act
        mrd = await self._make_mock_mrd(
            mock_grpc_client, mock_cls_async_read_object_stream
        )

        # Assert
        mock_cls_async_read_object_stream.assert_called_once_with(
            client=mock_grpc_client,
            bucket_name=_TEST_BUCKET_NAME,
            object_name=_TEST_OBJECT_NAME,
            generation_number=_TEST_GENERATION_NUMBER,
            read_handle=_TEST_READ_HANDLE,
        )

        mrd.read_obj_str.open.assert_called_once()

        assert mrd.client == mock_grpc_client
        assert mrd.bucket_name == _TEST_BUCKET_NAME
        assert mrd.object_name == _TEST_OBJECT_NAME
        assert mrd.generation_number == _TEST_GENERATION_NUMBER
        assert mrd.read_handle == _TEST_READ_HANDLE

    @mock.patch(
        "google.cloud.storage._experimental.asyncio.async_multi_range_downloader._AsyncReadObjectStream"
    )
    @mock.patch(
        "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
    )
    @pytest.mark.asyncio
    async def test_download_ranges(
        self, mock_grpc_client, mock_cls_async_read_object_stream
    ):
        # Arrange
        mock_mrd = await self._make_mock_mrd(
            mock_grpc_client, mock_cls_async_read_object_stream
        )
        mock_mrd.read_obj_str.send = AsyncMock()
        mock_mrd.read_obj_str.recv = AsyncMock()
        # A single response that delivers the whole range and ends it.
        mock_mrd.read_obj_str.recv.return_value = _storage_v2.BidiReadObjectResponse(
            object_data_ranges=[
                _storage_v2.ObjectRangeData(
                    checksummed_data=_storage_v2.ChecksummedData(
                        content=b"these_are_18_chars", crc32c=123
                    ),
                    range_end=True,
                    read_range=_storage_v2.ReadRange(
                        read_offset=0, read_length=18, read_id=0
                    ),
                )
            ],
        )

        # Act
        buffer = BytesIO()
        results = await mock_mrd.download_ranges([(0, 18, buffer)])

        # Assert
        mock_mrd.read_obj_str.send.assert_called_once_with(
            _storage_v2.BidiReadObjectRequest(
                read_ranges=[
                    _storage_v2.ReadRange(read_offset=0, read_length=18, read_id=0)
                ]
            )
        )
        assert len(results) == 1
        assert results[0].bytes_requested == 18
        assert results[0].bytes_written == 18
        assert buffer.getvalue() == b"these_are_18_chars"

    @mock.patch(
        "google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
    )
    @pytest.mark.asyncio
    async def test_downloading_ranges_with_more_than_1000_should_throw_error(
        self, mock_grpc_client
    ):
        # Arrange
        mrd = AsyncMultiRangeDownloader(
            mock_grpc_client, _TEST_BUCKET_NAME, _TEST_OBJECT_NAME
        )

        # Act + Assert
        with pytest.raises(ValueError) as exc:
            await mrd.download_ranges(self.create_read_ranges(1001))

        # Assert
        assert (
            str(exc.value)
            == "Invalid input - length of read_ranges cannot be more than 1000"
        )

0 commit comments

Comments
 (0)