bidi.BackgroundConsumer.start() eats exception when _bidi_rpc.open() fails #268

Closed
tswast opened this issue Aug 31, 2021 · 3 comments · Fixed by #357
Assignees
Labels
type: cleanup An internal cleanup or hygiene concern.

Comments

@tswast
Contributor

tswast commented Aug 31, 2021

Environment details

  • OS type and version:
  • Python version: python --version

Python 3.9.5

  • pip version: pip --version
conda 4.9.2
pip 21.2.1 from /usr/local/Caskroom/miniconda/base/envs/dev-3.9/lib/python3.9/site-packages/pip (python 3.9)
  • google-api-core version: pip show google-api-core
conda list google-api-core
# packages in environment at /usr/local/Caskroom/miniconda/base/envs/dev-3.9:
#
# Name                    Version                   Build  Channel
google-api-core           2.0.1                     dev_0    <develop @ dcb6ebd9994fddcb1729150df1675ebf8c503a73>

Steps to reproduce

  1. Construct a BackgroundConsumer and BidiRpc.
  2. Intentionally make the initial request one that will fail.
  3. Observe that the RPC "done callback" is never called, and that no notification is sent to on_response either.
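
The steps above can be modeled without any network access. The following is a minimal sketch using only the standard library; FakeRpc, FakeConsumer, and the logger name "bidi_model" are hypothetical stand-ins written for illustration, not the actual google.api_core.bidi classes. It assumes, as the DEBUG log further down suggests, that the helper thread catches the error from open() and only logs it.

```python
import logging
import threading

_LOGGER = logging.getLogger("bidi_model")  # hypothetical logger name


class FakeRpc:
    """Stand-in for a bidirectional RPC whose initial request is rejected."""

    def __init__(self):
        self.call = None  # stays None because open() never succeeds
        self._done_callbacks = []

    def add_done_callback(self, callback):
        # In this model, callbacks would only fire once a call object exists.
        self._done_callbacks.append(callback)

    def open(self):
        raise ValueError("400 Invalid stream name. Entity: invalid")


class FakeConsumer:
    """Stand-in for a consumer that opens the RPC on a helper thread."""

    def __init__(self, rpc, on_response):
        self._rpc = rpc
        self._on_response = on_response
        self._thread = None

    def start(self):
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.start()

    def _thread_main(self):
        try:
            self._rpc.open()
        except Exception:
            # The error is only logged; nothing is handed back to the caller.
            _LOGGER.debug("caught error and will exit", exc_info=True)

    @property
    def is_active(self):
        return self._thread is not None and self._thread.is_alive()

    def join(self):
        self._thread.join()


events = []
rpc = FakeRpc()
rpc.add_done_callback(lambda future: events.append("done"))
consumer = FakeConsumer(rpc, lambda response: events.append("response"))
consumer.start()
consumer.join()

print(events)              # neither callback was invoked
print(consumer.is_active)  # the helper thread has already exited
```

In this model the only trace of the failure is a DEBUG log record, which matches the symptom described in step 3.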

Code example

        start_time = time.monotonic()
        self._inital_request = initial_request
        self._stream_name = initial_request.write_stream

        inital_response_future = AppendRowsFuture()
        self._futures_queue.put(inital_response_future)

        self._rpc = bidi.BidiRpc(
            self._client.append_rows,
            initial_request=self._inital_request,
            # TODO: pass in retry and timeout. Blocked by
            # https://github.com/googleapis/python-api-core/issues/262
            metadata=tuple(
                itertools.chain(
                    self._metadata,
                    # This header is required so that the BigQuery Storage API
                    # knows which region to route the request to.
                    (("x-goog-request-params", f"write_stream={self._stream_name}"),),
                )
            ),
        )
        self._rpc.add_done_callback(self._on_rpc_done)

        self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
        self._consumer.start()

        # Make sure RPC has started before returning.
        # Without this, consumers may get:
        #
        # ValueError: Can not send() on an RPC that has never been open()ed.
        #
        # when they try to send a request.
        while not self._rpc.is_active and self._consumer.is_active:
            # Avoid 100% CPU while waiting for RPC to be ready.
            time.sleep(_WRITE_OPEN_INTERVAL)

            # TODO: Check retry.deadline instead of (per-request) timeout.
            # Blocked by
            # https://github.com/googleapis/python-api-core/issues/262
            if timeout is None:
                continue
            current_time = time.monotonic()
            if current_time - start_time > timeout:
                break

        return inital_response_future
...

    request = types.AppendRowsRequest()
    request.write_stream = "invalid"
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows

...

    request.offset = 0
    # requests = generate_sample_data(write_stream.name)
    append_rows_stream, response_future_1 = write_client.append_rows(request)

See googleapis/python-bigquery-storage#284

Stack trace

The problem is that there is no stack trace, but here is the output with DEBUG logging turned on:

$ pytest 'samples/snippets/append_rows_proto2_test.py::test_append_rows_proto2[US]' --log-cli-level=DEBUG
================================ test session starts ================================
platform darwin -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
rootdir: /Users/swast/src/github.com/googleapis/python-bigquery-storage
plugins: cov-2.12.1, asyncio-0.15.1, requests-mock-1.9.3
collected 1 item                                                                    

samples/snippets/append_rows_proto2_test.py::test_append_rows_proto2[US] 
---------------------------------- live log setup -----------------------------------
DEBUG    google.auth._default:_default.py:206 Checking None for explicit credentials as part of auth process...
DEBUG    google.auth._default:_default.py:181 Checking Cloud SDK credentials as part of auth process...
DEBUG    google.cloud.bigquery.opentelemetry_tracing:opentelemetry_tracing.py:66 This service is instrumented using OpenTelemetry. OpenTelemetry could not be imported; please add opentelemetry-api and opentelemetry-instrumentation packages in order to get BigQuery Tracing data.
DEBUG    urllib3.util.retry:retry.py:333 Converted retries value: 3 -> Retry(total=3, connect=None, read=None, redirect=None, status=None)
DEBUG    google.auth.transport.requests:requests.py:182 Making request: POST https://oauth2.googleapis.com/token
DEBUG    urllib3.connectionpool:connectionpool.py:971 Starting new HTTPS connection (1): oauth2.googleapis.com:443
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://oauth2.googleapis.com:443 "POST /token HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:971 Starting new HTTPS connection (1): bigquery.googleapis.com:443
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "GET /bigquery/v2/projects/swast-scratch/datasets?prettyPrint=false HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "POST /bigquery/v2/projects/swast-scratch/datasets?prettyPrint=false HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "POST /bigquery/v2/projects/swast-scratch/datasets?prettyPrint=false HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "POST /bigquery/v2/projects/swast-scratch/datasets/python_bigquery_storage_samples_snippets_20210831210924_9f81bc/tables?prettyPrint=false HTTP/1.1" 200 None
----------------------------------- live log call -----------------------------------
DEBUG    google.auth._default:_default.py:206 Checking None for explicit credentials as part of auth process...
DEBUG    google.auth._default:_default.py:181 Checking Cloud SDK credentials as part of auth process...
DEBUG    google.auth.transport.requests:requests.py:182 Making request: POST https://oauth2.googleapis.com/token
DEBUG    urllib3.connectionpool:connectionpool.py:971 Starting new HTTPS connection (1): oauth2.googleapis.com:443
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://oauth2.googleapis.com:443 "POST /token HTTP/1.1" 200 None
DEBUG    google.api_core.bidi:bidi.py:695 Started helper thread Thread-ConsumeBidirectionalStream
DEBUG    google.api_core.bidi:bidi.py:660 Thread-ConsumeBidirectionalStream caught error 400 Invalid stream name. Entity: invalid and will exit. Generally this is due to the RPC itself being cancelled and the error will be surfaced to the calling code.
Traceback (most recent call last):
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/grpc_helpers.py", line 156, in error_remapped_callable
    return _StreamingResponseIterator(
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/grpc_helpers.py", line 82, in __init__
    self._stored_first_result = next(self._wrapped)
  File "/usr/local/Caskroom/miniconda/base/envs/dev-3.9/lib/python3.9/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/usr/local/Caskroom/miniconda/base/envs/dev-3.9/lib/python3.9/site-packages/grpc/_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.INVALID_ARGUMENT
        details = "Invalid stream name. Entity: invalid"
        debug_error_string = "{"created":"@1630444168.199761000","description":"Error received from peer ipv6:[2607:f8b0:4009:819::200a]:443","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Invalid stream name. Entity: invalid","grpc_status":3}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/bidi.py", line 636, in _thread_main
    self._bidi_rpc.open()
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/bidi.py", line 279, in open
    call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
  File "/Users/swast/src/github.com/googleapis/python-bigquery-storage/google/cloud/bigquery_storage_v1beta2/services/big_query_write/client.py", line 522, in append_rows
    response = rpc(requests, retry=retry, timeout=timeout, metadata=metadata,)
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/gapic_v1/method.py", line 142, in __call__
    return wrapped_func(*args, **kwargs)
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/retry.py", line 283, in retry_wrapped_func
    return retry_target(
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/retry.py", line 190, in retry_target
    return target()
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/grpc_helpers.py", line 160, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.InvalidArgument: 400 Invalid stream name. Entity: invalid
INFO     google.api_core.bidi:bidi.py:676 Thread-ConsumeBidirectionalStream exiting
FAILED                                                                        [100%]
--------------------------------- live log teardown ---------------------------------
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "DELETE /bigquery/v2/projects/swast-scratch/datasets/python_bigquery_storage_samples_snippets_20210831210924_9f81bc/tables/append_rows_proto2_9096?prettyPrint=false HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "DELETE /bigquery/v2/projects/swast-scratch/datasets/python_bigquery_storage_samples_snippets_20210831210924_5c5f49?deleteContents=true&prettyPrint=false HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "DELETE /bigquery/v2/projects/swast-scratch/datasets/python_bigquery_storage_samples_snippets_20210831210924_9f81bc?deleteContents=true&prettyPrint=false HTTP/1.1" 200 None


===================================== FAILURES ======================================
____________________________ test_append_rows_proto2[US] ____________________________

capsys = <_pytest.capture.CaptureFixture object at 0x110c67370>
bigquery_client = <google.cloud.bigquery.client.Client object at 0x110c6de20>
sample_data_table = 'swast-scratch.python_bigquery_storage_samples_snippets_20210831210924_9f81bc.append_rows_proto2_9096'

>   ???

/Users/swast/src/python-bigquery-storage/samples/snippets/append_rows_proto2_test.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
samples/snippets/append_rows_proto2.py:180: in append_rows_proto2
    response_future_2 = append_rows_stream.send(request)
google/cloud/bigquery_storage_v1beta2/writer.py:206: in send
    self._rpc.send(request)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <google.api_core.bidi.BidiRpc object at 0x110d7b760>
request = offset {
  value: 6
}
proto_rows {
  rows {
    serialized_rows: "0\243\223\001\200\001\007"
    serialized_rows: ":\0...ed_rows: "Z\01711:07:48.123456\200\001\013"
    serialized_rows: "`\206\341\307\343\357\253\362\002\200\001\014"
  }
}


    def send(self, request):
        """Queue a message to be sent on the stream.
    
        Send is non-blocking.
    
        If the underlying RPC has been closed, this will raise.
    
        Args:
            request (protobuf.Message): The request to send.
        """
        if self.call is None:
>           raise ValueError("Can not send() on an RPC that has never been open()ed.")
E           ValueError: Can not send() on an RPC that has never been open()ed.

../python-api-core/google/api_core/bidi.py:315: ValueError
-------------------------------- Captured log setup ---------------------------------
DEBUG    google.auth._default:_default.py:206 Checking None for explicit credentials as part of auth process...
DEBUG    google.auth._default:_default.py:181 Checking Cloud SDK credentials as part of auth process...
DEBUG    google.cloud.bigquery.opentelemetry_tracing:opentelemetry_tracing.py:66 This service is instrumented using OpenTelemetry. OpenTelemetry could not be imported; please add opentelemetry-api and opentelemetry-instrumentation packages in order to get BigQuery Tracing data.
DEBUG    urllib3.util.retry:retry.py:333 Converted retries value: 3 -> Retry(total=3, connect=None, read=None, redirect=None, status=None)
DEBUG    google.auth.transport.requests:requests.py:182 Making request: POST https://oauth2.googleapis.com/token
DEBUG    urllib3.connectionpool:connectionpool.py:971 Starting new HTTPS connection (1): oauth2.googleapis.com:443
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://oauth2.googleapis.com:443 "POST /token HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:971 Starting new HTTPS connection (1): bigquery.googleapis.com:443
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "GET /bigquery/v2/projects/swast-scratch/datasets?prettyPrint=false HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "POST /bigquery/v2/projects/swast-scratch/datasets?prettyPrint=false HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "POST /bigquery/v2/projects/swast-scratch/datasets?prettyPrint=false HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "POST /bigquery/v2/projects/swast-scratch/datasets/python_bigquery_storage_samples_snippets_20210831210924_9f81bc/tables?prettyPrint=false HTTP/1.1" 200 None
--------------------------------- Captured log call ---------------------------------
DEBUG    google.auth._default:_default.py:206 Checking None for explicit credentials as part of auth process...
DEBUG    google.auth._default:_default.py:181 Checking Cloud SDK credentials as part of auth process...
DEBUG    google.auth.transport.requests:requests.py:182 Making request: POST https://oauth2.googleapis.com/token
DEBUG    urllib3.connectionpool:connectionpool.py:971 Starting new HTTPS connection (1): oauth2.googleapis.com:443
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://oauth2.googleapis.com:443 "POST /token HTTP/1.1" 200 None
DEBUG    google.api_core.bidi:bidi.py:695 Started helper thread Thread-ConsumeBidirectionalStream
DEBUG    google.api_core.bidi:bidi.py:660 Thread-ConsumeBidirectionalStream caught error 400 Invalid stream name. Entity: invalid and will exit. Generally this is due to the RPC itself being cancelled and the error will be surfaced to the calling code.
Traceback (most recent call last):
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/grpc_helpers.py", line 156, in error_remapped_callable
    return _StreamingResponseIterator(
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/grpc_helpers.py", line 82, in __init__
    self._stored_first_result = next(self._wrapped)
  File "/usr/local/Caskroom/miniconda/base/envs/dev-3.9/lib/python3.9/site-packages/grpc/_channel.py", line 426, in __next__
    return self._next()
  File "/usr/local/Caskroom/miniconda/base/envs/dev-3.9/lib/python3.9/site-packages/grpc/_channel.py", line 826, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.INVALID_ARGUMENT
        details = "Invalid stream name. Entity: invalid"
        debug_error_string = "{"created":"@1630444168.199761000","description":"Error received from peer ipv6:[2607:f8b0:4009:819::200a]:443","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Invalid stream name. Entity: invalid","grpc_status":3}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/bidi.py", line 636, in _thread_main
    self._bidi_rpc.open()
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/bidi.py", line 279, in open
    call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
  File "/Users/swast/src/github.com/googleapis/python-bigquery-storage/google/cloud/bigquery_storage_v1beta2/services/big_query_write/client.py", line 522, in append_rows
    response = rpc(requests, retry=retry, timeout=timeout, metadata=metadata,)
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/gapic_v1/method.py", line 142, in __call__
    return wrapped_func(*args, **kwargs)
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/retry.py", line 283, in retry_wrapped_func
    return retry_target(
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/retry.py", line 190, in retry_target
    return target()
  File "/Users/swast/src/github.com/googleapis/python-api-core/google/api_core/grpc_helpers.py", line 160, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.InvalidArgument: 400 Invalid stream name. Entity: invalid
INFO     google.api_core.bidi:bidi.py:676 Thread-ConsumeBidirectionalStream exiting
------------------------------- Captured log teardown -------------------------------
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "DELETE /bigquery/v2/projects/swast-scratch/datasets/python_bigquery_storage_samples_snippets_20210831210924_9f81bc/tables/append_rows_proto2_9096?prettyPrint=false HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "DELETE /bigquery/v2/projects/swast-scratch/datasets/python_bigquery_storage_samples_snippets_20210831210924_5c5f49?deleteContents=true&prettyPrint=false HTTP/1.1" 200 None
DEBUG    urllib3.connectionpool:connectionpool.py:452 https://bigquery.googleapis.com:443 "DELETE /bigquery/v2/projects/swast-scratch/datasets/python_bigquery_storage_samples_snippets_20210831210924_9f81bc?deleteContents=true&prettyPrint=false HTTP/1.1" 200 None
============================== short test summary info ==============================
FAILED samples/snippets/append_rows_proto2_test.py::test_append_rows_proto2[US] - ...
================================= 1 failed in 7.33s =================================
@tswast
Contributor Author

tswast commented Aug 31, 2021

Thankfully, I can check whether self._consumer.is_active is false to see if something went wrong, but I'd like a way to get at the original exception.

busunkim96 added the "priority: p2" and "type: bug" labels on Sep 1, 2021
yoshi-automation added and removed the "🚨 This issue needs some love." label on Nov 30, 2021
yoshi-automation added the "🚨 This issue needs some love." label on Feb 27, 2022
parthea self-assigned this on Mar 14, 2022
parthea added the "priority: p3" label and removed the "priority: p2" and "🚨 This issue needs some love." labels on Apr 29, 2022
@parthea
Collaborator

parthea commented Apr 29, 2022

Moving to p3 as there is a solution already in place (see #268 (comment)).

parthea added the "type: cleanup" label and removed the "type: bug" and "priority: p3" labels on Apr 29, 2022
@yirutang

yirutang commented Mar 6, 2024

I am still blocked because I cannot get the error message. I tried various ways to turn on debug logging but could not work out how. I am running the Python program on Borg and don't know how to set the log level when it is a packaged binary.
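
One possible approach that needs no command-line flag is to configure the logger from inside the program. The sketch below uses only the standard logging module. The logger name "google.api_core.bidi" is taken from the DEBUG output earlier in this issue; LastExceptionHandler is a hypothetical helper, and the approach assumes the library keeps logging the failure on that logger with the exception attached, which is an implementation detail that may change. The try/except at the end simulates the library's log call so that the example runs on its own.

```python
import logging


class LastExceptionHandler(logging.Handler):
    """Remember the most recent exception attached to a log record."""

    def __init__(self):
        super().__init__(level=logging.DEBUG)
        self.last_exception = None

    def emit(self, record):
        # record.exc_info is a (type, value, traceback) tuple when the
        # caller logged with exc_info=True inside an except block.
        if record.exc_info and record.exc_info[1] is not None:
            self.last_exception = record.exc_info[1]


bidi_logger = logging.getLogger("google.api_core.bidi")
bidi_logger.setLevel(logging.DEBUG)
handler = LastExceptionHandler()
bidi_logger.addHandler(handler)

# Simulated stand-in for the library's own DEBUG message.
try:
    raise ValueError("400 Invalid stream name. Entity: invalid")
except ValueError:
    bidi_logger.debug("caught error and will exit", exc_info=True)

print(repr(handler.last_exception))
```

Because the level and handler are set in code at startup, this also works in a packaged binary. After the consumer becomes inactive, handler.last_exception can be inspected or re-raised.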
