Skip to content

Commit

Permalink
chore: emit a warning if return_immediately is set (#355)
Browse files Browse the repository at this point in the history
This flag is deprecated and should always be set to `False` when
pulling messages synchronously.
  • Loading branch information
plamut authored Mar 31, 2021
1 parent b035b86 commit 20f0712
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 1 deletion.
7 changes: 7 additions & 0 deletions google/pubsub_v1/services/subscriber/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
Type,
Union,
)
import warnings
import pkg_resources

import google.api_core.client_options as ClientOptions # type: ignore
Expand Down Expand Up @@ -935,6 +936,12 @@ async def pull(
if max_messages is not None:
request.max_messages = max_messages

if request.return_immediately:
warnings.warn(
"The return_immediately flag is deprecated and should be set to False.",
category=DeprecationWarning,
)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = gapic_v1.method_async.wrap_method(
Expand Down
7 changes: 7 additions & 0 deletions google/pubsub_v1/services/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
Type,
Union,
)
import warnings
import pkg_resources

from google.api_core import client_options as client_options_lib # type: ignore
Expand Down Expand Up @@ -1124,6 +1125,12 @@ def pull(
if max_messages is not None:
request.max_messages = max_messages

if request.return_immediately:
warnings.warn(
"The return_immediately flag is deprecated and should be set to False.",
category=DeprecationWarning,
)

# Wrap the RPC method; this adds retry and timeout information,
# and friendly error handling.
rpc = self._transport._wrapped_methods[self._transport.pull]
Expand Down
31 changes: 30 additions & 1 deletion synth.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
flags=re.MULTILINE | re.DOTALL,
)

if count < 18:
if count < 15:
raise Exception("Expected replacements for gRPC channel options not made.")

# If the emulator is used, force an insecure gRPC channel to avoid SSL errors.
Expand Down Expand Up @@ -141,6 +141,35 @@
\g<0>""",
)

# Emit deprecation warning if return_immediately flag is set with synchronous pull.
count = s.replace(
"google/pubsub_v1/services/subscriber/*client.py",
r"import pkg_resources",
"import warnings\n\g<0>",
)
count = s.replace(
"google/pubsub_v1/services/subscriber/*client.py",
r"""
([^\n\S]+(?:async\ )?def\ pull\(.*?->\ pubsub\.PullResponse:.*?)
((?P<indent>[^\n\S]+)\#\ Wrap\ the\ RPC\ method)
""",
textwrap.dedent(
"""
\g<1>
\g<indent>if request.return_immediately:
\g<indent> warnings.warn(
\g<indent> "The return_immediately flag is deprecated and should be set to False.",
\g<indent> category=DeprecationWarning,
\g<indent> )
\g<2>"""
),
flags=re.MULTILINE | re.DOTALL | re.VERBOSE,
)

if count != 2:
raise Exception("Too many or too few replacements in pull() methods.")

# Make sure that client library version is present in user agent header.
s.replace(
[
Expand Down
43 changes: 43 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_subscriber_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings

from google.auth import credentials
import grpc
import mock
import pytest

from google.api_core.gapic_v1.client_info import METRICS_METADATA_KEY
from google.cloud.pubsub_v1 import subscriber
Expand Down Expand Up @@ -217,3 +220,43 @@ def test_streaming_pull_gapic_monkeypatch():
transport = client.api._transport
assert hasattr(transport.streaming_pull, "_prefetch_first_result_")
assert not transport.streaming_pull._prefetch_first_result_


def test_sync_pull_warning_if_return_immediately():
client = subscriber.Client()
subscription_path = "projects/foo/subscriptions/bar"

with mock.patch.object(
client.api._transport, "_wrapped_methods"
), warnings.catch_warnings(record=True) as warned:
client.pull(subscription=subscription_path, return_immediately=True)

# Setting the deprecated return_immediately flag to True should emit a warning.
assert len(warned) == 1
assert issubclass(warned[0].category, DeprecationWarning)
warning_msg = str(warned[0].message)
assert "return_immediately" in warning_msg
assert "deprecated" in warning_msg


@pytest.mark.asyncio
async def test_sync_pull_warning_if_return_immediately_async():
from google.pubsub_v1.services.subscriber.async_client import SubscriberAsyncClient

client = SubscriberAsyncClient()
subscription_path = "projects/foo/subscriptions/bar"

patcher = mock.patch(
"google.pubsub_v1.services.subscriber.async_client.gapic_v1.method_async.wrap_method",
new=mock.AsyncMock,
)

with patcher, warnings.catch_warnings(record=True) as warned:
await client.pull(subscription=subscription_path, return_immediately=True)

# Setting the deprecated return_immediately flag to True should emit a warning.
assert len(warned) == 1
assert issubclass(warned[0].category, DeprecationWarning)
warning_msg = str(warned[0].message)
assert "return_immediately" in warning_msg
assert "deprecated" in warning_msg

0 comments on commit 20f0712

Please sign in to comment.