Skip to content

Commit 93ce515

Browse files
authored
feat(experimental): add checksum for bidi reads operation (#1566)
* feat(experimental): add checksum for bidi reads operation * resolving comments
1 parent 5ee391d commit 93ce515

File tree

2 files changed

+89
-3
lines changed

2 files changed

+89
-3
lines changed

google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
# limitations under the License.
1414

1515
from __future__ import annotations
16+
import google_crc32c
17+
from google.api_core import exceptions
18+
from google_crc32c import Checksum
1619

1720
from typing import List, Optional, Tuple
1821

@@ -25,6 +28,7 @@
2528

2629
from io import BytesIO
2730
from google.cloud import _storage_v2
31+
from google.cloud.storage.exceptions import DataCorruption
2832

2933

3034
_MAX_READ_RANGES_PER_BIDI_READ_REQUEST = 100
@@ -153,6 +157,16 @@ def __init__(
153157
:type read_handle: bytes
154158
:param read_handle: (Optional) An existing read handle.
155159
"""
160+
161+
# Verify that the fast, C-accelerated version of crc32c is available.
162+
# If not, raise an error to prevent silent performance degradation.
163+
if google_crc32c.implementation != "c":
164+
raise exceptions.NotFound(
165+
"The google-crc32c package is not installed with C support. "
166+
"Bidi reads require the C extension for data integrity checks."
167+
"For more information, see https://github.com/googleapis/python-crc32c."
168+
)
169+
156170
self.client = client
157171
self.bucket_name = bucket_name
158172
self.object_name = object_name
@@ -248,7 +262,19 @@ async def download_ranges(
248262
if object_data_range.read_range is None:
249263
raise Exception("Invalid response, read_range is None")
250264

251-
data = object_data_range.checksummed_data.content
265+
checksummed_data = object_data_range.checksummed_data
266+
data = checksummed_data.content
267+
server_checksum = checksummed_data.crc32c
268+
269+
client_crc32c = Checksum(data).digest()
270+
client_checksum = int.from_bytes(client_crc32c, "big")
271+
272+
if server_checksum != client_checksum:
273+
raise DataCorruption(response,
274+
f"Checksum mismatch for read_id {object_data_range.read_range.read_id}. "
275+
f"Server sent {server_checksum}, client calculated {client_checksum}."
276+
)
277+
252278
read_id = object_data_range.read_range.read_id
253279
buffer = read_id_to_writable_buffer_dict[read_id]
254280
buffer.write(data)

tests/unit/asyncio/test_async_multi_range_downloader.py

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@
1616
from unittest import mock
1717
from unittest.mock import AsyncMock
1818
from google.cloud import _storage_v2
19+
from google.api_core import exceptions
20+
from google_crc32c import Checksum
1921

2022
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
2123
AsyncMultiRangeDownloader,
2224
)
25+
from google.cloud.storage._experimental.asyncio import async_read_object_stream
2326
from io import BytesIO
27+
from google.cloud.storage.exceptions import DataCorruption
2428

2529

2630
_TEST_BUCKET_NAME = "test-bucket"
@@ -114,6 +118,10 @@ async def test_download_ranges(
114118
self, mock_grpc_client, mock_cls_async_read_object_stream
115119
):
116120
# Arrange
121+
data = b"these_are_18_chars"
122+
crc32c = Checksum(data).digest()
123+
crc32c_int = int.from_bytes(crc32c, "big")
124+
117125
mock_mrd = await self._make_mock_mrd(
118126
mock_grpc_client, mock_cls_async_read_object_stream
119127
)
@@ -123,7 +131,7 @@ async def test_download_ranges(
123131
object_data_ranges=[
124132
_storage_v2.ObjectRangeData(
125133
checksummed_data=_storage_v2.ChecksummedData(
126-
content=b"these_are_18_chars", crc32c=123
134+
content=data, crc32c=crc32c_int
127135
),
128136
range_end=True,
129137
read_range=_storage_v2.ReadRange(
@@ -148,7 +156,7 @@ async def test_download_ranges(
148156
assert len(results) == 1
149157
assert results[0].bytes_requested == 18
150158
assert results[0].bytes_written == 18
151-
assert buffer.getvalue() == b"these_are_18_chars"
159+
assert buffer.getvalue() == data
152160

153161
@mock.patch(
154162
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
@@ -251,3 +259,55 @@ async def test_downloading_without_opening_should_throw_error(
251259
# Assert
252260
assert str(exc.value) == "Underlying bidi-gRPC stream is not open"
253261
assert not mrd.is_stream_open
262+
263+
@mock.patch("google.cloud.storage._experimental.asyncio.async_multi_range_downloader.google_crc32c")
264+
@mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client")
265+
def test_init_raises_if_crc32c_c_extension_is_missing(
266+
self, mock_grpc_client, mock_google_crc32c
267+
):
268+
mock_google_crc32c.implementation = "python"
269+
270+
with pytest.raises(exceptions.NotFound) as exc_info:
271+
AsyncMultiRangeDownloader(
272+
mock_grpc_client, "bucket", "object"
273+
)
274+
275+
assert "The google-crc32c package is not installed with C support" in str(exc_info.value)
276+
277+
@pytest.mark.asyncio
278+
@mock.patch("google.cloud.storage._experimental.asyncio.async_multi_range_downloader.Checksum")
279+
@mock.patch("google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client")
280+
async def test_download_ranges_raises_on_checksum_mismatch(self, mock_client, mock_checksum_class):
281+
mock_stream = mock.AsyncMock(spec=async_read_object_stream._AsyncReadObjectStream)
282+
283+
test_data = b"some-data"
284+
server_checksum = 12345
285+
mock_checksum_instance = mock_checksum_class.return_value
286+
mock_checksum_instance.digest.return_value = (54321).to_bytes(4, "big")
287+
288+
mock_response = _storage_v2.BidiReadObjectResponse(
289+
object_data_ranges=[
290+
_storage_v2.ObjectRangeData(
291+
checksummed_data=_storage_v2.ChecksummedData(
292+
content=test_data, crc32c=server_checksum
293+
),
294+
read_range=_storage_v2.ReadRange(read_id=0),
295+
range_end=True,
296+
)
297+
]
298+
)
299+
300+
mock_stream.recv.side_effect = [mock_response, None]
301+
302+
mrd = AsyncMultiRangeDownloader(
303+
mock_client, "bucket", "object"
304+
)
305+
mrd.read_obj_str = mock_stream
306+
mrd._is_stream_open = True
307+
308+
with pytest.raises(DataCorruption) as exc_info:
309+
await mrd.download_ranges([(0, len(test_data), BytesIO())])
310+
311+
assert "Checksum mismatch" in str(exc_info.value)
312+
mock_checksum_class.assert_called_once_with(test_data)
313+

0 commit comments

Comments
 (0)