Skip to content

Commit

Permalink
fix: handle AttributeError in bigquery_storage writer (#414)
Browse files Browse the repository at this point in the history
* fix: resolve AttributeError in bigquery_storage writer

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* handle AttributeError to avoid race condition

* actually catch AttributeError

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Steffany Brown <30247553+steffnay@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 24, 2022
1 parent 3369053 commit 2cb641a
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 26 deletions.
40 changes: 27 additions & 13 deletions google/cloud/bigquery_storage_v1/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,21 +182,35 @@ def _open(
# ValueError: Can not send() on an RPC that has never been open()ed.
#
# when they try to send a request.
while not self._rpc.is_active and self._consumer.is_active:
# Avoid 100% CPU while waiting for RPC to be ready.
time.sleep(_WRITE_OPEN_INTERVAL)

# TODO: Check retry.deadline instead of (per-request) timeout.
# Blocked by
# https://github.com/googleapis/python-api-core/issues/262
if timeout is None:
continue
current_time = time.monotonic()
if current_time - start_time > timeout:
break
try:
while not self._rpc.is_active and self._consumer.is_active:
# Avoid 100% CPU while waiting for RPC to be ready.
time.sleep(_WRITE_OPEN_INTERVAL)

# TODO: Check retry.deadline instead of (per-request) timeout.
# Blocked by
# https://github.com/googleapis/python-api-core/issues/262
if timeout is None:
continue
current_time = time.monotonic()
if current_time - start_time > timeout:
break
except AttributeError:
# Handle the AttributeError which can occur if the stream is
# unable to be opened. In that case, self._rpc or self._consumer
# may be None.
pass

try:
is_consumer_active = self._consumer.is_active
except AttributeError:
# Handle the AttributeError which can occur if the stream is
# unable to be opened. In that case, self._consumer
# may be None.
is_consumer_active = False

# Something went wrong when opening the RPC.
if not self._consumer.is_active:
if not is_consumer_active:
# TODO: Share the exception from _rpc.open(). Blocked by
# https://github.com/googleapis/python-api-core/issues/268
request_exception = exceptions.Unknown(
Expand Down
40 changes: 27 additions & 13 deletions google/cloud/bigquery_storage_v1beta2/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,21 +182,35 @@ def _open(
# ValueError: Can not send() on an RPC that has never been open()ed.
#
# when they try to send a request.
while not self._rpc.is_active and self._consumer.is_active:
# Avoid 100% CPU while waiting for RPC to be ready.
time.sleep(_WRITE_OPEN_INTERVAL)

# TODO: Check retry.deadline instead of (per-request) timeout.
# Blocked by
# https://github.com/googleapis/python-api-core/issues/262
if timeout is None:
continue
current_time = time.monotonic()
if current_time - start_time > timeout:
break
try:
while not self._rpc.is_active and self._consumer.is_active:
# Avoid 100% CPU while waiting for RPC to be ready.
time.sleep(_WRITE_OPEN_INTERVAL)

# TODO: Check retry.deadline instead of (per-request) timeout.
# Blocked by
# https://github.com/googleapis/python-api-core/issues/262
if timeout is None:
continue
current_time = time.monotonic()
if current_time - start_time > timeout:
break
except AttributeError:
# Handle the AttributeError which can occur if the stream is
# unable to be opened. In that case, self._rpc or self._consumer
# may be None.
pass

try:
is_consumer_active = self._consumer.is_active
except AttributeError:
# Handle the AttributeError which can occur if the stream is
# unable to be opened. In that case, self._consumer
# may be None.
is_consumer_active = False

# Something went wrong when opening the RPC.
if not self._consumer.is_active:
if not is_consumer_active:
# TODO: Share the exception from _rpc.open(). Blocked by
# https://github.com/googleapis/python-api-core/issues/268
request_exception = exceptions.Unknown(
Expand Down

0 comments on commit 2cb641a

Please sign in to comment.