Skip to content

Commit fc8461b

Browse files
feat(zb-experimental): Add all other methods open close send recv in _AsyncReadObjectStream (#1549)
* add AsyncAbstractObjectStream this will be the parent class for AsyncReadObjectStream and AsyncWriteObjectStream * keep _AsyncAbstractObjectStream private * Add _AsyncReadObjectStream and it's stubs * complete __init__ for read_obj_str * remove unuseful comments * add methods open close send recv * change read_handle type from 'str' to 'bytes' * fix doc strings, add licence and type hints * pass abstract methods * add handle param * include handle in tests * remove unit tests for abstract class * edit doc string for _AsyncReadObjectStream * refactor unit tests for async_read_object_stream * bucket_name and object_name cannot be NONE * bucket_name and object_name cannot be None * simplyfy tests for open * simply tests for send recv and close * minor edit - add bidi-stream in doc string * add checks for invalid inputs * remove duplicated import * remove unused import * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * remove unused import --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent c31a516 commit fc8461b

File tree

2 files changed

+144
-24
lines changed

2 files changed

+144
-24
lines changed

google/cloud/storage/_experimental/asyncio/async_read_object_stream.py

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
2323
"""
2424

25-
from typing import Any, Optional
25+
from typing import Optional
2626
from google.cloud import _storage_v2
2727
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
2828
from google.cloud.storage._experimental.asyncio.async_abstract_object_stream import (
@@ -94,15 +94,42 @@ def __init__(
9494
)
9595

9696
async def open(self) -> None:
97-
pass
97+
"""Opens the bidi-gRPC connection to read from the object.
98+
99+
This method sends an initial request to start the stream and receives
100+
the first response containing metadata and a read handle.
101+
"""
102+
await self.socket_like_rpc.open() # this is actually 1 send
103+
response = await self.socket_like_rpc.recv()
104+
if self.generation_number is None:
105+
self.generation_number = response.metadata.generation
106+
107+
self.read_handle = response.read_handle
98108

99109
async def close(self) -> None:
100-
pass
110+
"""Closes the bidi-gRPC connection."""
111+
await self.socket_like_rpc.close()
101112

102113
async def send(
103114
self, bidi_read_object_request: _storage_v2.BidiReadObjectRequest
104115
) -> None:
105-
pass
106-
107-
async def recv(self) -> Any:
108-
pass
116+
"""Sends a request message on the stream.
117+
118+
Args:
119+
bidi_read_object_request (:class:`~google.cloud._storage_v2.types.BidiReadObjectRequest`):
120+
The request message to send. This is typically used to specify
121+
the read offset and limit.
122+
"""
123+
await self.socket_like_rpc.send(bidi_read_object_request)
124+
125+
async def recv(self) -> _storage_v2.BidiReadObjectResponse:
126+
"""Receives a response from the stream.
127+
128+
This method waits for the next message from the server, which could
129+
contain object data or metadata.
130+
131+
Returns:
132+
:class:`~google.cloud._storage_v2.types.BidiReadObjectResponse`:
133+
The response message from the server.
134+
"""
135+
return await self.socket_like_rpc.recv()

tests/unit/asyncio/test_async_read_object_stream.py

Lines changed: 110 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@
1414

1515
import pytest
1616
from unittest import mock
17+
from unittest.mock import AsyncMock
1718
from google.cloud import _storage_v2
1819

1920
from google.cloud.storage._experimental.asyncio.async_read_object_stream import (
2021
_AsyncReadObjectStream,
2122
)
2223

24+
_TEST_BUCKET_NAME = "test-bucket"
25+
_TEST_OBJECT_NAME = "test-object"
26+
_TEST_GENERATION_NUMBER = 12345
27+
_TEST_READ_HANDLE = b"test-read-handle"
28+
2329

2430
@mock.patch(
2531
"google.cloud.storage._experimental.asyncio.async_read_object_stream.AsyncBidiRpc"
@@ -28,27 +34,24 @@
2834
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
2935
)
3036
def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
31-
# initialize with bucket, object_name and generation number. & client.
32-
bucket_name = "test-bucket"
33-
object_name = "test-object"
34-
generation_number = 12345
3537
mock_client._client._transport.bidi_read_object = "bidi_read_object_rpc"
3638
mock_client._client._transport._wrapped_methods = {
3739
"bidi_read_object_rpc": mock.sentinel.A
3840
}
39-
40-
read_obj_stream = _AsyncReadObjectStream(
41-
client=mock_client,
42-
bucket_name=bucket_name,
43-
object_name=object_name,
44-
generation_number=generation_number,
45-
)
46-
full_bucket_name = f"projects/_/buckets/{bucket_name}"
41+
full_bucket_name = f"projects/_/buckets/{_TEST_BUCKET_NAME}"
4742
first_bidi_read_req = _storage_v2.BidiReadObjectRequest(
4843
read_object_spec=_storage_v2.BidiReadObjectSpec(
49-
bucket=full_bucket_name, object=object_name
44+
bucket=full_bucket_name, object=_TEST_OBJECT_NAME
5045
),
5146
)
47+
48+
read_obj_stream = _AsyncReadObjectStream(
49+
client=mock_client,
50+
bucket_name=_TEST_BUCKET_NAME,
51+
object_name=_TEST_OBJECT_NAME,
52+
generation_number=_TEST_GENERATION_NUMBER,
53+
)
54+
5255
mock_async_bidi_rpc.assert_called_once_with(
5356
mock.sentinel.A,
5457
initial_request=first_bidi_read_req,
@@ -57,8 +60,98 @@ def test_init_with_bucket_object_generation(mock_client, mock_async_bidi_rpc):
5760
assert read_obj_stream.socket_like_rpc is mock_async_bidi_rpc.return_value
5861

5962

60-
def test_init_with_invalid_parameters():
61-
"""Test the constructor of _AsyncReadObjectStream with invalid params."""
63+
@mock.patch(
64+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
65+
)
66+
@pytest.mark.asyncio
67+
async def test_open(mock_client):
68+
# arrange
69+
read_obj_stream = _AsyncReadObjectStream(
70+
client=mock_client,
71+
bucket_name=_TEST_BUCKET_NAME,
72+
object_name=_TEST_OBJECT_NAME,
73+
)
74+
read_obj_stream.socket_like_rpc.open = AsyncMock()
75+
76+
recv_response = mock.MagicMock(spec=_storage_v2.BidiReadObjectResponse)
77+
recv_response.metadata = mock.MagicMock(spec=_storage_v2.Object)
78+
recv_response.metadata.generation = _TEST_GENERATION_NUMBER
79+
recv_response.read_handle = _TEST_READ_HANDLE
80+
read_obj_stream.socket_like_rpc.recv = AsyncMock(return_value=recv_response)
81+
82+
# act
83+
await read_obj_stream.open()
84+
85+
# assert
86+
read_obj_stream.socket_like_rpc.open.assert_called_once()
87+
read_obj_stream.socket_like_rpc.recv.assert_called_once()
88+
89+
assert read_obj_stream.generation_number == _TEST_GENERATION_NUMBER
90+
assert read_obj_stream.read_handle == _TEST_READ_HANDLE
91+
92+
93+
@mock.patch(
94+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
95+
)
96+
@pytest.mark.asyncio
97+
async def test_close(mock_client):
98+
# arrange
99+
read_obj_stream = _AsyncReadObjectStream(
100+
client=mock_client,
101+
bucket_name=_TEST_BUCKET_NAME,
102+
object_name=_TEST_OBJECT_NAME,
103+
)
104+
read_obj_stream.socket_like_rpc.close = AsyncMock()
105+
106+
# act
107+
await read_obj_stream.close()
108+
109+
# assert
110+
read_obj_stream.socket_like_rpc.close.assert_called_once()
111+
112+
113+
@mock.patch(
114+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
115+
)
116+
@pytest.mark.asyncio
117+
async def test_send(mock_client):
118+
# arrange
119+
read_obj_stream = _AsyncReadObjectStream(
120+
client=mock_client,
121+
bucket_name=_TEST_BUCKET_NAME,
122+
object_name=_TEST_OBJECT_NAME,
123+
)
124+
read_obj_stream.socket_like_rpc.send = AsyncMock()
125+
126+
# act
127+
bidi_read_object_request = _storage_v2.BidiReadObjectRequest()
128+
await read_obj_stream.send(bidi_read_object_request)
129+
130+
# assert
131+
read_obj_stream.socket_like_rpc.send.assert_called_once_with(
132+
bidi_read_object_request
133+
)
134+
135+
136+
@mock.patch(
137+
"google.cloud.storage._experimental.asyncio.async_grpc_client.AsyncGrpcClient.grpc_client"
138+
)
139+
@pytest.mark.asyncio
140+
async def test_recv(mock_client):
141+
# arrange
142+
read_obj_stream = _AsyncReadObjectStream(
143+
client=mock_client,
144+
bucket_name=_TEST_BUCKET_NAME,
145+
object_name=_TEST_OBJECT_NAME,
146+
)
147+
bidi_read_object_response = _storage_v2.BidiReadObjectResponse()
148+
read_obj_stream.socket_like_rpc.recv = AsyncMock(
149+
return_value=bidi_read_object_response
150+
)
151+
152+
# act
153+
response = await read_obj_stream.recv()
62154

63-
with pytest.raises(ValueError):
64-
_AsyncReadObjectStream(None, bucket_name=None, object_name=None)
155+
# assert
156+
read_obj_stream.socket_like_rpc.recv.assert_called_once()
157+
assert response == bidi_read_object_response

0 commit comments

Comments
 (0)