diff --git a/google/cloud/bigquery_storage_v1/writer.py b/google/cloud/bigquery_storage_v1/writer.py index 2268debc..fbcdfe36 100644 --- a/google/cloud/bigquery_storage_v1/writer.py +++ b/google/cloud/bigquery_storage_v1/writer.py @@ -182,21 +182,35 @@ def _open( # ValueError: Can not send() on an RPC that has never been open()ed. # # when they try to send a request. - while not self._rpc.is_active and self._consumer.is_active: - # Avoid 100% CPU while waiting for RPC to be ready. - time.sleep(_WRITE_OPEN_INTERVAL) - - # TODO: Check retry.deadline instead of (per-request) timeout. - # Blocked by - # https://github.com/googleapis/python-api-core/issues/262 - if timeout is None: - continue - current_time = time.monotonic() - if current_time - start_time > timeout: - break + try: + while not self._rpc.is_active and self._consumer.is_active: + # Avoid 100% CPU while waiting for RPC to be ready. + time.sleep(_WRITE_OPEN_INTERVAL) + + # TODO: Check retry.deadline instead of (per-request) timeout. + # Blocked by + # https://github.com/googleapis/python-api-core/issues/262 + if timeout is None: + continue + current_time = time.monotonic() + if current_time - start_time > timeout: + break + except AttributeError: + # Handle the AttributeError which can occur if the stream is + # unable to be opened. In that case, self._rpc or self._consumer + # may be None. + pass + + try: + is_consumer_active = self._consumer.is_active + except AttributeError: + # Handle the AttributeError which can occur if the stream is + # unable to be opened. In that case, self._consumer + # may be None. + is_consumer_active = False # Something went wrong when opening the RPC. - if not self._consumer.is_active: + if not is_consumer_active: # TODO: Share the exception from _rpc.open(). Blocked by # https://github.com/googleapis/python-api-core/issues/268 request_exception = exceptions.Unknown( diff --git a/google/cloud/bigquery_storage_v1beta2/writer.py b/google/cloud/bigquery_storage_v1beta2/writer.py index 5cf101ba..20019afe 100644 --- a/google/cloud/bigquery_storage_v1beta2/writer.py +++ b/google/cloud/bigquery_storage_v1beta2/writer.py @@ -182,21 +182,35 @@ def _open( # ValueError: Can not send() on an RPC that has never been open()ed. # # when they try to send a request. - while not self._rpc.is_active and self._consumer.is_active: - # Avoid 100% CPU while waiting for RPC to be ready. - time.sleep(_WRITE_OPEN_INTERVAL) - - # TODO: Check retry.deadline instead of (per-request) timeout. - # Blocked by - # https://github.com/googleapis/python-api-core/issues/262 - if timeout is None: - continue - current_time = time.monotonic() - if current_time - start_time > timeout: - break + try: + while not self._rpc.is_active and self._consumer.is_active: + # Avoid 100% CPU while waiting for RPC to be ready. + time.sleep(_WRITE_OPEN_INTERVAL) + + # TODO: Check retry.deadline instead of (per-request) timeout. + # Blocked by + # https://github.com/googleapis/python-api-core/issues/262 + if timeout is None: + continue + current_time = time.monotonic() + if current_time - start_time > timeout: + break + except AttributeError: + # Handle the AttributeError which can occur if the stream is + # unable to be opened. In that case, self._rpc or self._consumer + # may be None. + pass + + try: + is_consumer_active = self._consumer.is_active + except AttributeError: + # Handle the AttributeError which can occur if the stream is + # unable to be opened. In that case, self._consumer + # may be None. + is_consumer_active = False # Something went wrong when opening the RPC. - if not self._consumer.is_active: + if not is_consumer_active: # TODO: Share the exception from _rpc.open(). Blocked by # https://github.com/googleapis/python-api-core/issues/268 request_exception = exceptions.Unknown(