What happened?
Problem
We run a batch job that reads data from GCS, transforms it, and writes the output to BigQuery. We use the Python SDK with the Dataflow runner and the apache-beam==2.39.0 package.
It seems like we are silently losing data during the write to BigQuery. In the job view in the GCP console we can see over 1.7 billion records going into the WriteToBigQuery step.
However, the output table contains only roughly 266 million records.
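For reference, the output count can be double-checked with a simple query against the table; the project and table names below are placeholders, not the real ones.

```python
# Hypothetical row-count check; project and table names are placeholders.
from google.cloud import bigquery

client = bigquery.Client(project="my-project")
query = "SELECT COUNT(*) AS n FROM `my-project.my_dataset.output_table`"
row_count = next(iter(client.query(query).result())).n
print(f"Rows in output table: {row_count}")
```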
Although the job finished successfully, there are two error logs from the workers. They may be related to the missing data:
Error message from worker: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1458, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 561, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 1730, in apache_beam.runners.common._OutputHandler.finish_bundle_outputs
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 273, in finish_bundle
writer.close()
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1567, in close
self._file_handle.close()
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/filesystemio.py", line 215, in close
self._uploader.finish()
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", line 788, in finish
raise self._upload_thread.last_error # pylint: disable=raising-bad-type
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", line 761, in _start_upload
self._client.objects.Insert(self._insert_request, upload=self._upload)
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1152, in Insert
return self._RunMethod(
File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
raise exceptions.HttpError.FromResponse(
apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/uni_test_bucket/o?alt=json&name=tmp5%2Fbq_load%2Fc412cea93eed45ef80d00bb57c14642e%2Funi-flifo-pipelines-dev.uni_flifo_pipelines_dev.flifo-parse-2022-08-22-12-11%2F0e61d8ac-f66f-47d9-b96e-93bdbd661743&uploadType=resumable&upload_id=ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': 'ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW', 'content-length': '0', 'date': 'Mon, 22 Aug 2022 15:17:50 GMT', 'server': 'UploadServer', 'status': '503'}>, content <>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 284, in _execute
response = task()
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 357, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
return getattr(self, request_type)(
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 635, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1009, in process_bundle
op.finish()
File "apache_beam/runners/worker/operations.py", line 939, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 942, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 943, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/common.py", line 1479, in apache_beam.runners.common.DoFnRunner.finish
File "apache_beam/runners/common.py", line 1460, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1458, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 561, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 1730, in apache_beam.runners.common._OutputHandler.finish_bundle_outputs
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_file_loads.py", line 273, in finish_bundle
writer.close()
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 1567, in close
self._file_handle.close()
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/filesystemio.py", line 215, in close
self._uploader.finish()
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", line 788, in finish
raise self._upload_thread.last_error # pylint: disable=raising-bad-type
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/gcsio.py", line 761, in _start_upload
self._client.objects.Insert(self._insert_request, upload=self._upload)
File "/usr/local/lib/python3.9/site-packages/apache_beam/io/gcp/internal/clients/storage/storage_v1_client.py", line 1152, in Insert
return self._RunMethod(
File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.9/site-packages/apitools/base/py/base_api.py", line 603, in __ProcessHttpResponse
raise exceptions.HttpError.FromResponse(
RuntimeError: apitools.base.py.exceptions.HttpError: HttpError accessing <https://www.googleapis.com/resumable/upload/storage/v1/b/uni_test_bucket/o?alt=json&name=tmp5%2Fbq_load%2Fc412cea93eed45ef80d00bb57c14642e%2Funi-flifo-pipelines-dev.uni_flifo_pipelines_dev.flifo-parse-2022-08-22-12-11%2F0e61d8ac-f66f-47d9-b96e-93bdbd661743&uploadType=resumable&upload_id=ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW>: response: <{'content-type': 'text/plain; charset=utf-8', 'x-guploader-uploadid': 'ADPycdte7hA5_nNO5_d_UwvPf0ji0yy5JlexiECnvdX4vjY7cDQTZWp61h0oCyezYArTfnlIkfcETdxiqPqkj8I7qilTuBztTRLW', 'content-length': '0', 'date': 'Mon, 22 Aug 2022 15:17:50 GMT', 'server': 'UploadServer', 'status': '503'}>, content <> [while running 'Save FLIFO updates/Write to BQ/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)-ptransform-119']
It looks like a problem with access to the GCS bucket that we use as both the tmp and staging location. The error appears only occasionally, and so far it has occurred only at a certain data volume (although I am not sure whether that is related).
Unfortunately, I am not allowed to share exact reproducible code from my employer, but the transforms are fairly simple: reading from GCS, XML parsing, filtering out broken records, mapping to JSON, and uploading to BQ. The schema is auto-generated from Pydantic objects, so we are quite certain that this part works.
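The overall shape of the pipeline is roughly the sketch below; every identifier, bucket, table name, and helper function here is a placeholder rather than our actual code.

```python
# Minimal sketch of the pipeline shape only; parse_xml, is_valid and all
# identifiers below are placeholders, not the real implementation.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def parse_xml(line):
    # Placeholder: the real job parses XML into a dict matching the schema.
    return {"raw": line}


def is_valid(record):
    # Placeholder: the real job drops records that fail validation.
    return record is not None


options = PipelineOptions(
    runner="DataflowRunner",
    project="my-project",                       # placeholder project
    region="europe-west1",                      # placeholder region
    temp_location="gs://my-bucket/tmp",         # the same bucket serves as
    staging_location="gs://my-bucket/staging",  # tmp and staging location
)

with beam.Pipeline(options=options) as pipeline:
    (pipeline
     | "Read from GCS" >> beam.io.ReadFromText("gs://my-bucket/input/*.xml")
     | "Parse XML" >> beam.Map(parse_xml)
     | "Filter broken records" >> beam.Filter(is_valid)
     | "Write to BQ" >> beam.io.WriteToBigQuery(
         "my-project:my_dataset.output_table",  # placeholder table
         schema="raw:STRING",                   # real schema comes from Pydantic models
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
```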
From what I found in the code, the default write method for batch pipelines is WriteToBigQuery.Method.FILE_LOADS, which means that WriteToBigQuery uses BigQueryBatchFileLoads. Unlike _StreamToBigQuery in streaming pipelines, which applies a retry strategy, BigQueryBatchFileLoads does not take an argument for a failure strategy. At this point it is hard for me to debug further without going deep into the implementation.
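For illustration, this is roughly how the two write methods are configured; as far as I can tell, insert_retry_strategy only takes effect with streaming inserts (the table name is a placeholder).

```python
import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

# Batch default: file loads. There is no failure-strategy argument here.
write_batch = beam.io.WriteToBigQuery(
    "my-project:my_dataset.output_table",  # placeholder table
    method=beam.io.WriteToBigQuery.Method.FILE_LOADS)

# Streaming inserts accept a retry strategy for failed rows.
write_streaming = beam.io.WriteToBigQuery(
    "my-project:my_dataset.output_table",  # placeholder table
    method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS,
    insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR)
```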
Any help will be appreciated.
Issue Priority
Priority: 2
Issue Component
Component: io-py-gcp
Looks like the error is thrown when the temp files are finished being written, at writer.close(), and Beam tries to upload them to GCS. I'm not sure about the underlying reason for your HTTP errors, but ideally this should be retried. Temp files are written to GCS, and looking at the upload code in gcsio.py, I see there is a @retry.no_retries decorator. @silviulica I saw your TODO tag there; do you have any insight on this by any chance? (I know this was committed a long time ago.)
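To illustrate what I mean, a retried version could look roughly like the sketch below; upload_chunk is only a hypothetical stand-in for the upload call in gcsio.py, not a proposed patch. I assume retries were originally avoided because the resumable upload consumes a stream whose position would need to be handled, so whether this is actually safe is a separate question.

```python
# Sketch only: how the upload call in gcsio.py could be wrapped so that
# transient server errors (like the 503 above) are retried with backoff.
# upload_chunk is a hypothetical stand-in, not the actual Beam method.
from apache_beam.utils import retry


@retry.with_exponential_backoff(
    num_retries=5,
    retry_filter=retry.retry_on_server_errors_and_timeout_filter)
def upload_chunk(client, insert_request, upload):
    # Stand-in for the objects.Insert call that currently runs under
    # @retry.no_retries and therefore surfaces a 503 immediately.
    return client.objects.Insert(insert_request, upload=upload)
```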