fix(bigquerystorage): resume reader connection on EOS internal error
It's infeasible for the backend to change the status of `EOS on DATA`
internal errors, so instead we check the error message to see whether the
error is resumable. We don't want to try to resume on *all* internal errors,
so inspecting the message is the best we can do.
tswast committed Dec 18, 2019
1 parent 8bb4068 commit d893035
Showing 2 changed files with 73 additions and 5 deletions.
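
To make the approach concrete before the diff, here is a minimal, self-contained sketch of the pattern this commit implements: reconnect only when an `InternalServerError` carries a message known to be safe to resume, and re-raise otherwise. The `read_with_resume` helper and `connect` callable are hypothetical stand-ins for illustration, not the library's actual reader API:

```python
import google.api_core.exceptions

# Hypothetical stand-in for _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES below.
RESUMABLE_MESSAGES = ("unexpected EOS on DATA frame",)


def read_with_resume(connect, offset=0):
    """Yield rows from connect(offset), reconnecting on resumable errors."""
    while True:
        try:
            for row in connect(offset):
                offset += 1
                yield row
            return  # Stream finished cleanly.
        except google.api_core.exceptions.ServiceUnavailable:
            pass  # Transient; reconnect from the last offset.
        except google.api_core.exceptions.InternalServerError as exc:
            if not any(msg in exc.message for msg in RESUMABLE_MESSAGES):
                raise  # Unknown internal error; surface it to the caller.
            # Known-safe condition: fall through and reconnect.
```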
18 changes: 18 additions & 0 deletions bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py
@@ -43,6 +43,16 @@

_STREAM_RESUMPTION_EXCEPTIONS = (google.api_core.exceptions.ServiceUnavailable,)

# The Google API endpoint can unexpectedly close long-running HTTP/2 streams.
# Unfortunately, gRPC surfaces this condition to the caller as an internal
# error. We don't want to resume on all internal errors, so instead we look
# for error messages that we know are caused by problems that are safe to
# reconnect after.
_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
    # See: https://issuetracker.google.com/143292803
    "unexpected EOS on DATA frame",
)

_FASTAVRO_REQUIRED = (
"fastavro is required to parse ReadRowResponse messages with Avro bytes."
)
@@ -131,6 +141,14 @@ def __iter__(self):
yield message

return # Made it through the whole stream.
except google.api_core.exceptions.InternalServerError as exc:
resumable_error = False
for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES:
resumable_error = (
resumable_error or resumable_message in exc.message
)
if not resumable_error:
raise
except _STREAM_RESUMPTION_EXCEPTIONS:
# Transient error, so reconnect to the stream.
pass
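As a quick sanity check of the substring match added above — assuming only that `google-api-core` is installed — note that `InternalServerError` exposes its status text via the `message` attribute, so the classification can be exercised directly:

```python
import google.api_core.exceptions

exc = google.api_core.exceptions.InternalServerError(
    "Received unexpected EOS on DATA frame from server."
)
assert "unexpected EOS on DATA frame" in exc.message  # resumable: reconnect
assert "Got a nonresumable error." not in exc.message  # would re-raise
```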
60 changes: 55 additions & 5 deletions bigquery_storage/tests/unit/test_reader.py
@@ -173,6 +173,22 @@ def _bq_to_arrow_batches(bq_blocks, arrow_schema):
return arrow_batches


def _avro_blocks_w_nonresumable_internal_error(avro_blocks):
for block in avro_blocks:
yield block
raise google.api_core.exceptions.InternalServerError(
"INTERNAL: Got a nonresumable error."
)


def _avro_blocks_w_resumable_internal_error(avro_blocks):
for block in avro_blocks:
yield block
raise google.api_core.exceptions.InternalServerError(
"INTERNAL: Received unexpected EOS on DATA frame from server."
)


def _avro_blocks_w_unavailable(avro_blocks):
for block in avro_blocks:
yield block
@@ -363,6 +379,29 @@ def test_rows_w_timeout(class_under_test, mock_client):
mock_client.read_rows.assert_not_called()


def test_rows_w_nonresumable_internal_error(class_under_test, mock_client):
bq_columns = [{"name": "int_col", "type": "int64"}]
avro_schema = _bq_to_avro_schema(bq_columns)
read_session = _generate_avro_read_session(avro_schema)
bq_blocks = [[{"int_col": 1024}, {"int_col": 512}], [{"int_col": 256}]]
avro_blocks = _avro_blocks_w_nonresumable_internal_error(
_bq_to_avro_blocks(bq_blocks, avro_schema)
)

stream_position = bigquery_storage_v1beta1.types.StreamPosition(
stream={"name": "test"}
)

reader = class_under_test(avro_blocks, mock_client, stream_position, {})

with pytest.raises(
google.api_core.exceptions.InternalServerError, match="nonresumable error"
):
list(reader.rows(read_session))

mock_client.read_rows.assert_not_called()


def test_rows_w_reconnect(class_under_test, mock_client):
bq_columns = [{"name": "int_col", "type": "int64"}]
avro_schema = _bq_to_avro_schema(bq_columns)
@@ -374,13 +413,17 @@ def test_rows_w_reconnect(class_under_test, mock_client):
avro_blocks_1 = _avro_blocks_w_unavailable(
_bq_to_avro_blocks(bq_blocks_1, avro_schema)
)
bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
bq_blocks_2 = [[{"int_col": 1024}, {"int_col": 512}], [{"int_col": 256}]]
avro_blocks_2 = _avro_blocks_w_resumable_internal_error(
_bq_to_avro_blocks(bq_blocks_2, avro_schema)
)
bq_blocks_3 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
avro_blocks_3 = _bq_to_avro_blocks(bq_blocks_3, avro_schema)

-    for block in avro_blocks_2:
for block in avro_blocks_3:
block.status.estimated_row_count = 7

-    mock_client.read_rows.return_value = avro_blocks_2
mock_client.read_rows.side_effect = (avro_blocks_2, avro_blocks_3)
stream_position = bigquery_storage_v1beta1.types.StreamPosition(
stream={"name": "test"}
)
@@ -397,17 +440,24 @@
itertools.chain(
itertools.chain.from_iterable(bq_blocks_1),
itertools.chain.from_iterable(bq_blocks_2),
itertools.chain.from_iterable(bq_blocks_3),
)
)

assert tuple(got) == expected
assert got.total_rows == 7
-    mock_client.read_rows.assert_called_once_with(
mock_client.read_rows.assert_any_call(
bigquery_storage_v1beta1.types.StreamPosition(
stream={"name": "test"}, offset=4
),
metadata={"test-key": "test-value"},
)
mock_client.read_rows.assert_called_with(
bigquery_storage_v1beta1.types.StreamPosition(
stream={"name": "test"}, offset=7
),
metadata={"test-key": "test-value"},
)


def test_rows_w_reconnect_by_page(class_under_test, mock_client):
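One detail worth noting in `test_rows_w_reconnect`: assigning a sequence to `side_effect` makes each successive `read_rows` call return the next item, which is how the test hands the reader a second and then a third stream after each simulated failure. A minimal sketch of that `unittest.mock` behavior:

```python
from unittest import mock

client = mock.Mock()
client.read_rows.side_effect = (iter([1, 2]), iter([3]))

assert list(client.read_rows()) == [1, 2]  # first reconnect consumes stream 2
assert list(client.read_rows()) == [3]  # second reconnect consumes stream 3
```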
