From 6adecd438790d8c1b5182043db16232b68ff7a98 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 11 Oct 2022 23:13:33 -0700 Subject: [PATCH] Remove obsolete and deprecated bigquery native write. #23557 (#23558) These have been deprecated for two years; unconditionally fall back to the Beam-native variant now. --- CHANGES.md | 3 +- .../gcp/big_query_query_to_table_it_test.py | 65 ------- sdks/python/apache_beam/io/gcp/bigquery.py | 176 +----------------- .../apache_beam/io/gcp/bigquery_test.py | 52 +----- .../apache_beam/io/gcp/bigquery_tools.py | 67 ------- .../apache_beam/io/gcp/bigquery_tools_test.py | 159 ---------------- .../runners/dataflow/dataflow_runner.py | 13 +- .../runners/dataflow/dataflow_runner_test.py | 63 ------- .../runners/dataflow/ptransform_overrides.py | 135 -------------- 9 files changed, 21 insertions(+), 712 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 6ec430ea84e9..be4cf30bc3af 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -62,7 +62,8 @@ * Decreased TextSource CPU utilization by 2.3x (Java) ([#23193](https://github.com/apache/beam/issues/23193)). * Fixed bug when using SpannerIO with RuntimeValueProvider options (Java) ([#22146](https://github.com/apache/beam/issues/22146)). * Fixed issue for unicode rendering on WriteToBigQuery ([#10785](https://github.com/apache/beam/issues/10785)) -* Remove obsolete variant of BigQuery Read ([#23559](https://github.com/apache/beam/issues/23559)). +* Remove obsolete variants of BigQuery Read and Write, always using Beam-native variant + ([#23564](https://github.com/apache/beam/issues/23564) and [#23559](https://github.com/apache/beam/issues/23559)). * Bumped google-cloud-spanner dependency version to 3.x for Python SDK ([#21198](https://github.com/apache/beam/issues/21198)). 
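Illustration of the user-visible effect of the entry above (not part of CHANGES.md or of this patch): after this change `beam.io.BigQuerySink` survives only as a deprecated alias that warns and returns the Beam-native `WriteToBigQuery` transform, so both spellings below take the same code path. The project, dataset, table, and schema names are placeholders and would need to point at a real table to actually run.

    import apache_beam as beam

    with beam.Pipeline() as p:
        rows = p | beam.Create([{'name': 'a', 'n': 1}])

        # Preferred spelling: the Beam-native sink, unchanged by this patch.
        rows | 'WriteNew' >> beam.io.WriteToBigQuery(
            'myproject:mydataset.mytable', schema='name:STRING,n:INTEGER')

        # Deprecated spelling: BigQuerySink(...) now emits a warning and simply
        # returns a WriteToBigQuery transform, so it can be applied directly.
        rows | 'WriteOld' >> beam.io.BigQuerySink(
            'myproject:mydataset.mytable', schema='name:STRING,n:INTEGER')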
## New Features / Improvements diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py index ba1349281d69..eede4248ea04 100644 --- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py +++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py @@ -35,7 +35,6 @@ from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.io.gcp.tests.bigquery_matcher import BigqueryMatcher -from apache_beam.runners.direct.test_direct_runner import TestDirectRunner from apache_beam.testing import test_utils from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher from apache_beam.testing.test_pipeline import TestPipeline @@ -201,42 +200,6 @@ def test_big_query_standard_sql(self): options = self.test_pipeline.get_full_options_as_args(**extra_opts) big_query_query_to_table_pipeline.run_bq_pipeline(options) - @pytest.mark.it_postcommit - def test_big_query_standard_sql_kms_key_native(self): - if isinstance(self.test_pipeline.runner, TestDirectRunner): - self.skipTest("This test doesn't work on DirectRunner.") - verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table - expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED) - pipeline_verifiers = [ - PipelineStateMatcher(), - BigqueryMatcher( - project=self.project, - query=verify_query, - checksum=expected_checksum) - ] - kms_key = self.test_pipeline.get_option('kms_key_name') - self.assertTrue(kms_key) - extra_opts = { - 'query': STANDARD_QUERY, - 'output': self.output_table, - 'output_schema': DIALECT_OUTPUT_SCHEMA, - 'use_standard_sql': True, - 'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS, - 'on_success_matcher': all_of(*pipeline_verifiers), - 'kms_key': kms_key, - 'native': True, - 'experiments': 'use_legacy_bq_sink', - } - options = self.test_pipeline.get_full_options_as_args(**extra_opts) - big_query_query_to_table_pipeline.run_bq_pipeline(options) - - table = self.bigquery_client.get_table( - self.project, self.dataset_id, 'output_table') - self.assertIsNotNone( - table.encryptionConfiguration, - 'No encryption configuration found: %s' % table) - self.assertEqual(kms_key, table.encryptionConfiguration.kmsKeyName) - @pytest.mark.it_postcommit def test_big_query_new_types(self): expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED) @@ -284,34 +247,6 @@ def test_big_query_new_types_avro(self): options = self.test_pipeline.get_full_options_as_args(**extra_opts) big_query_query_to_table_pipeline.run_bq_pipeline(options) - @pytest.mark.it_postcommit - def test_big_query_new_types_native(self): - expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED) - verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table - pipeline_verifiers = [ - PipelineStateMatcher(), - BigqueryMatcher( - project=self.project, - query=verify_query, - checksum=expected_checksum, - timeout_secs=30, - ) - ] - self._setup_new_types_env() - extra_opts = { - 'query': NEW_TYPES_QUERY % (self.dataset_id, NEW_TYPES_INPUT_TABLE), - 'output': self.output_table, - 'output_schema': NEW_TYPES_OUTPUT_SCHEMA, - 'use_standard_sql': False, - 'native': True, - 'use_json_exports': True, - 'wait_until_finish_duration': WAIT_UNTIL_FINISH_DURATION_MS, - 'on_success_matcher': all_of(*pipeline_verifiers), - 'experiments': 'use_legacy_bq_sink', - } - options = self.test_pipeline.get_full_options_as_args(**extra_opts) - 
big_query_query_to_table_pipeline.run_bq_pipeline(options) - if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG) diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index 0e363bbd7879..bad20f69243f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -392,7 +392,6 @@ def chain_after(result): from apache_beam.options.value_provider import ValueProvider from apache_beam.options.value_provider import check_accessible from apache_beam.pvalue import PCollection -from apache_beam.runners.dataflow.native_io import iobase as dataflow_io from apache_beam.transforms import DoFn from apache_beam.transforms import ParDo from apache_beam.transforms import PTransform @@ -483,11 +482,6 @@ def RowAsDictJsonCoder(*args, **kwargs): return bigquery_tools.RowAsDictJsonCoder(*args, **kwargs) -@deprecated(since='2.11.0', current="bigquery_tools.BigQueryWriter") -def BigQueryWriter(*args, **kwargs): - return bigquery_tools.BigQueryWriter(*args, **kwargs) - - @deprecated(since='2.11.0', current="bigquery_tools.BigQueryWrapper") def BigQueryWrapper(*args, **kwargs): return bigquery_tools.BigQueryWrapper(*args, **kwargs) @@ -1281,167 +1275,12 @@ def __next__(self): @deprecated(since='2.11.0', current="WriteToBigQuery") -class BigQuerySink(dataflow_io.NativeSink): - """A sink based on a BigQuery table. - - This BigQuery sink triggers a Dataflow native sink for BigQuery - that only supports batch pipelines. - Instead of using this sink directly, please use WriteToBigQuery - transform that works for both batch and streaming pipelines. - """ - def __init__( - self, - table, - dataset=None, - project=None, - schema=None, - create_disposition=BigQueryDisposition.CREATE_IF_NEEDED, - write_disposition=BigQueryDisposition.WRITE_EMPTY, - validate=False, - coder=None, - kms_key=None): - """Initialize a BigQuerySink. - - Args: - table (str): The ID of the table. If **dataset** argument is :data:`None` - then the table argument must contain the entire table reference - specified as: ``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``. - dataset (str): The ID of the dataset containing this table or - :data:`None` if the table reference is specified entirely by the table - argument. - project (str): The ID of the project containing this table or - :data:`None` if the table reference is specified entirely by the table - argument. - schema (str): The schema to be used if the BigQuery table to write has - to be created. This can be either specified as a - :class:`~apache_beam.io.gcp.internal.clients.bigquery.\ -bigquery_v2_messages.TableSchema` object or a single string of the form - ``'field1:type1,field2:type2,field3:type3'`` that defines a comma - separated list of fields. Here ``'type'`` should specify the BigQuery - type of the field. Single string based schemas do not support nested - fields, repeated fields, or specifying a BigQuery mode for fields (mode - will always be set to ``'NULLABLE'``). - create_disposition (BigQueryDisposition): A string describing what - happens if the table does not exist. Possible values are: - - * :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not - exist. - * :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not - exist. - - write_disposition (BigQueryDisposition): A string describing what - happens if the table has already some data. Possible values are: - - * :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows. 
- * :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows. - * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not - empty. - - validate (bool): If :data:`True`, various checks will be done when sink - gets initialized (e.g., is table present given the disposition - arguments?). This should be :data:`True` for most scenarios in order to - catch errors as early as possible (pipeline construction instead of - pipeline execution). It should be :data:`False` if the table is created - during pipeline execution by a previous step. - coder (~apache_beam.coders.coders.Coder): The coder for the - table rows if serialized to disk. If :data:`None`, then the default - coder is :class:`~apache_beam.io.gcp.bigquery_tools.RowAsDictJsonCoder`, - which will interpret every element written to the sink as a dictionary - that will be JSON serialized as a line in a file. This argument needs a - value only in special cases when writing table rows as dictionaries is - not desirable. - kms_key (str): Optional Cloud KMS key name for use when creating new - tables. - - Raises: - TypeError: if the schema argument is not a :class:`str` or a - :class:`~apache_beam.io.gcp.internal.clients.bigquery.\ -bigquery_v2_messages.TableSchema` object. - ValueError: if the table reference as a string does not - match the expected format. - """ - # Import here to avoid adding the dependency for local running scenarios. - try: - # pylint: disable=wrong-import-order, wrong-import-position - from apitools.base import py # pylint: disable=unused-import - except ImportError: - raise ImportError( - 'Google Cloud IO not available, ' - 'please install apache_beam[gcp]') - - self.table_reference = bigquery_tools.parse_table_reference( - table, dataset, project) - # Transform the table schema into a bigquery.TableSchema instance. - if isinstance(schema, str): - # TODO(silviuc): Should add a regex-based validation of the format. - table_schema = bigquery.TableSchema() - schema_list = [s.strip(' ') for s in schema.split(',')] - for field_and_type in schema_list: - field_name, field_type = field_and_type.split(':') - field_schema = bigquery.TableFieldSchema() - field_schema.name = field_name - field_schema.type = field_type - field_schema.mode = 'NULLABLE' - table_schema.fields.append(field_schema) - self.table_schema = table_schema - elif schema is None: - # TODO(silviuc): Should check that table exists if no schema specified. - self.table_schema = schema - elif isinstance(schema, bigquery.TableSchema): - self.table_schema = schema - else: - raise TypeError('Unexpected schema argument: %s.' 
% schema) - - self.create_disposition = BigQueryDisposition.validate_create( - create_disposition) - self.write_disposition = BigQueryDisposition.validate_write( - write_disposition) - self.validate = validate - self.coder = coder or bigquery_tools.RowAsDictJsonCoder() - self.kms_key = kms_key - - def display_data(self): - res = {} - if self.table_reference is not None: - tableSpec = '{}.{}'.format( - self.table_reference.datasetId, self.table_reference.tableId) - if self.table_reference.projectId is not None: - tableSpec = '{}:{}'.format(self.table_reference.projectId, tableSpec) - res['table'] = DisplayDataItem(tableSpec, label='Table') - - res['validation'] = DisplayDataItem( - self.validate, label="Validation Enabled") - return res - - def schema_as_json(self): - """Returns the TableSchema associated with the sink as a JSON string.""" - def schema_list_as_object(schema_list): - """Returns a list of TableFieldSchema objects as a list of dicts.""" - fields = [] - for f in schema_list: - fs = {'name': f.name, 'type': f.type} - if f.description is not None: - fs['description'] = f.description - if f.mode is not None: - fs['mode'] = f.mode - if f.type.lower() == 'record': - fs['fields'] = schema_list_as_object(f.fields) - fields.append(fs) - return fields - - return json.dumps( - {'fields': schema_list_as_object(self.table_schema.fields)}) - - @property - def format(self): - """Sink format name required for remote execution.""" - return 'bigquery' - - def writer(self, test_bigquery_client=None, buffer_size=None): - return bigquery_tools.BigQueryWriter( - sink=self, - test_bigquery_client=test_bigquery_client, - buffer_size=buffer_size) +def BigQuerySink(*args, validate=False, **kwargs): + """A deprecated alias for WriteToBigQuery.""" + warnings.warn( + "Native sinks no longer implemented; " + "falling back to standard Beam sink.") + return WriteToBigQuery(*args, validate=validate, **kwargs) _KNOWN_TABLES = set() @@ -2265,6 +2104,9 @@ def display_data(self): if self.table_reference.projectId is not None: tableSpec = '{}:{}'.format(self.table_reference.projectId, tableSpec) res['table'] = DisplayDataItem(tableSpec, label='Table') + + res['validation'] = DisplayDataItem( + self._validate, label="Validation Enabled") return res def to_runner_api_parameter(self, context): diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 47d0236dc2a2..02e6fb04c47a 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -495,8 +495,8 @@ def test_parse_schema_descriptor(self): self.assertEqual(sink.table_reference.datasetId, 'dataset') self.assertEqual(sink.table_reference.tableId, 'table') result_schema = { - field.name: field.type - for field in sink.table_schema.fields + field['name']: field['type'] + for field in sink.schema['fields'] } self.assertEqual({'n': 'INTEGER', 's': 'STRING'}, result_schema) @@ -509,54 +509,6 @@ def test_project_table_display_data(self): ] hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - def test_simple_schema_as_json(self): - sink = beam.io.BigQuerySink( - 'project:dataset.table', schema='s:STRING, n:INTEGER') - self.assertEqual( - json.dumps({ - 'fields': [{ - 'name': 's', 'type': 'STRING', 'mode': 'NULLABLE' - }, { - 'name': 'n', 'type': 'INTEGER', 'mode': 'NULLABLE' - }] - }), - sink.schema_as_json()) - - def test_nested_schema_as_json(self): - string_field = bigquery.TableFieldSchema( - name='s', type='STRING', mode='NULLABLE', 
description='s description') - number_field = bigquery.TableFieldSchema( - name='n', type='INTEGER', mode='REQUIRED', description='n description') - record_field = bigquery.TableFieldSchema( - name='r', - type='RECORD', - mode='REQUIRED', - description='r description', - fields=[string_field, number_field]) - schema = bigquery.TableSchema(fields=[record_field]) - sink = beam.io.BigQuerySink('dataset.table', schema=schema) - self.assertEqual({ - 'fields': [{ - 'name': 'r', - 'type': 'RECORD', - 'mode': 'REQUIRED', - 'description': 'r description', - 'fields': [{ - 'name': 's', - 'type': 'STRING', - 'mode': 'NULLABLE', - 'description': 's description' - }, - { - 'name': 'n', - 'type': 'INTEGER', - 'mode': 'REQUIRED', - 'description': 'n description' - }] - }] - }, - json.loads(sink.schema_as_json())) - @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestWriteToBigQuery(unittest.TestCase): diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index f087d21128d4..0bec4664777f 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -56,8 +56,6 @@ from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.metrics import monitoring_infos from apache_beam.options import value_provider -from apache_beam.options.pipeline_options import GoogleCloudOptions -from apache_beam.runners.dataflow.native_io import iobase as dataflow_io from apache_beam.transforms import DoFn from apache_beam.typehints.typehints import Any from apache_beam.utils import retry @@ -1340,71 +1338,6 @@ def convert_row_to_dict(self, row, schema): return result -# ----------------------------------------------------------------------------- -# BigQueryWriter. - - -class BigQueryWriter(dataflow_io.NativeSinkWriter): - """The sink writer for a BigQuerySink.""" - def __init__(self, sink, test_bigquery_client=None, buffer_size=None): - self.sink = sink - self.test_bigquery_client = test_bigquery_client - self.row_as_dict = isinstance(self.sink.coder, RowAsDictJsonCoder) - # Buffer used to batch written rows so we reduce communication with the - # BigQuery service. - self.rows_buffer = [] - self.rows_buffer_flush_threshold = buffer_size or 1000 - # Figure out the project, dataset, and table used for the sink. - self.project_id = self.sink.table_reference.projectId - - # If table schema did not define a project we default to executing project. - if self.project_id is None and hasattr(sink, 'pipeline_options'): - self.project_id = ( - sink.pipeline_options.view_as(GoogleCloudOptions).project) - - assert self.project_id is not None - - self.dataset_id = self.sink.table_reference.datasetId - self.table_id = self.sink.table_reference.tableId - - def _flush_rows_buffer(self): - if self.rows_buffer: - _LOGGER.info( - 'Writing %d rows to %s:%s.%s table.', - len(self.rows_buffer), - self.project_id, - self.dataset_id, - self.table_id) - passed, errors = self.client.insert_rows( - project_id=self.project_id, dataset_id=self.dataset_id, - table_id=self.table_id, rows=self.rows_buffer) - self.rows_buffer = [] - if not passed: - raise RuntimeError( - 'Could not successfully insert rows to BigQuery' - ' table [%s:%s.%s]. 
Errors: %s' % - (self.project_id, self.dataset_id, self.table_id, errors)) - - def __enter__(self): - self.client = BigQueryWrapper(client=self.test_bigquery_client) - self.client.get_or_create_table( - self.project_id, - self.dataset_id, - self.table_id, - self.sink.table_schema, - self.sink.create_disposition, - self.sink.write_disposition) - return self - - def __exit__(self, exception_type, exception_value, traceback): - self._flush_rows_buffer() - - def Write(self, row): - self.rows_buffer.append(row) - if len(self.rows_buffer) > self.rows_buffer_flush_threshold: - self._flush_rows_buffer() - - class RowAsDictJsonCoder(coders.Coder): """A coder for a table row (represented as a dict) to/from a JSON string. diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index bd65542d19f6..cf533265d7be 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -45,7 +45,6 @@ from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.metrics import monitoring_infos from apache_beam.metrics.execution import MetricsEnvironment -from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider # Protect against environments where bigquery library is not available. @@ -564,164 +563,6 @@ def test_start_query_job_priority_configuration(self): 'INTERACTIVE') -@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') -class TestBigQueryWriter(unittest.TestCase): - @mock.patch('time.sleep', return_value=None) - def test_no_table_and_create_never(self, patched_time_sleep): - client = mock.Mock() - client.tables.Get.side_effect = HttpError( - response={'status': '404'}, url='', content='') - create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER - with self.assertRaisesRegex( - RuntimeError, - r'Table project:dataset\.table not found but create ' - r'disposition is CREATE_NEVER'): - with beam.io.BigQuerySink( - 'project:dataset.table', - create_disposition=create_disposition).writer(client): - pass - - def test_no_table_and_create_if_needed(self): - client = mock.Mock() - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tables.Get.side_effect = HttpError( - response={'status': '404'}, url='', content='') - client.tables.Insert.return_value = table - create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED - with beam.io.BigQuerySink( - 'project:dataset.table', - schema='somefield:INTEGER', - create_disposition=create_disposition).writer(client): - pass - self.assertTrue(client.tables.Get.called) - self.assertTrue(client.tables.Insert.called) - - @mock.patch('time.sleep', return_value=None) - def test_no_table_and_create_if_needed_and_no_schema( - self, patched_time_sleep): - client = mock.Mock() - client.tables.Get.side_effect = HttpError( - response={'status': '404'}, url='', content='') - create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED - with self.assertRaisesRegex( - RuntimeError, - r'Table project:dataset\.table requires a schema\. 
None ' - r'can be inferred because the table does not exist'): - with beam.io.BigQuerySink( - 'project:dataset.table', - create_disposition=create_disposition).writer(client): - pass - - @mock.patch('time.sleep', return_value=None) - def test_table_not_empty_and_write_disposition_empty( - self, patched_time_sleep): - client = mock.Mock() - client.tables.Get.return_value = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tabledata.List.return_value = bigquery.TableDataList(totalRows=1) - write_disposition = beam.io.BigQueryDisposition.WRITE_EMPTY - with self.assertRaisesRegex( - RuntimeError, - r'Table project:dataset\.table is not empty but write ' - r'disposition is WRITE_EMPTY'): - with beam.io.BigQuerySink( - 'project:dataset.table', - write_disposition=write_disposition).writer(client): - pass - - def test_table_empty_and_write_disposition_empty(self): - client = mock.Mock() - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tables.Get.return_value = table - client.tabledata.List.return_value = bigquery.TableDataList(totalRows=0) - client.tables.Insert.return_value = table - write_disposition = beam.io.BigQueryDisposition.WRITE_EMPTY - with beam.io.BigQuerySink( - 'project:dataset.table', - write_disposition=write_disposition).writer(client): - pass - self.assertTrue(client.tables.Get.called) - self.assertTrue(client.tabledata.List.called) - self.assertFalse(client.tables.Delete.called) - self.assertFalse(client.tables.Insert.called) - - @mock.patch('time.sleep', return_value=None) - def test_table_with_write_disposition_truncate(self, _patched_sleep): - client = mock.Mock() - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tables.Get.return_value = table - client.tables.Insert.return_value = table - write_disposition = beam.io.BigQueryDisposition.WRITE_TRUNCATE - with beam.io.BigQuerySink( - 'project:dataset.table', - write_disposition=write_disposition).writer(client): - pass - self.assertTrue(client.tables.Get.called) - self.assertTrue(client.tables.Delete.called) - self.assertTrue(client.tables.Insert.called) - - def test_table_with_write_disposition_append(self): - client = mock.Mock() - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tables.Get.return_value = table - client.tables.Insert.return_value = table - write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND - with beam.io.BigQuerySink( - 'project:dataset.table', - write_disposition=write_disposition).writer(client): - pass - self.assertTrue(client.tables.Get.called) - self.assertFalse(client.tables.Delete.called) - self.assertFalse(client.tables.Insert.called) - - def test_rows_are_written(self): - client = mock.Mock() - table = bigquery.Table( - tableReference=bigquery.TableReference( - projectId='project', datasetId='dataset', tableId='table'), - schema=bigquery.TableSchema()) - client.tables.Get.return_value = table - write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND - - client.insert_rows_json.return_value = [] - - with beam.io.BigQuerySink( - 'project:dataset.table', - write_disposition=write_disposition).writer(client) as writer: - 
writer.Write({'i': 1, 'b': True, 's': 'abc', 'f': 3.14}) - - sample_row = {'i': 1, 'b': True, 's': 'abc', 'f': 3.14} - client.insert_rows_json.assert_called_with( - '%s.%s.%s' % ('project', 'dataset', 'table'), - json_rows=[sample_row], - row_ids=['_1'], - skip_invalid_rows=False, - timeout=120, - ignore_unknown_values=False) - - def test_table_schema_without_project(self): - # Writer should pick executing project by default. - sink = beam.io.BigQuerySink(table='mydataset.mytable') - options = PipelineOptions(flags=['--project', 'myproject']) - sink.pipeline_options = options - writer = sink.writer() - self.assertEqual('myproject', writer.project_id) - - @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestRowAsDictJsonCoder(unittest.TestCase): def test_row_as_dict(self): diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index f4d91d5d99e5..cf209ad7247f 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -28,6 +28,7 @@ import threading import time import traceback +import warnings from collections import defaultdict from subprocess import DEVNULL from typing import TYPE_CHECKING @@ -448,12 +449,14 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): # contain any added PTransforms. pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES) - from apache_beam.runners.dataflow.ptransform_overrides import WriteToBigQueryPTransformOverride + if debug_options.lookup_experiment('use_legacy_bq_sink'): + warnings.warn( + "Native sinks no longer implemented; " + "ignoring use_legacy_bq_sink.") + from apache_beam.runners.dataflow.ptransform_overrides import GroupIntoBatchesWithShardedKeyPTransformOverride - pipeline.replace_all([ - WriteToBigQueryPTransformOverride(pipeline, options), - GroupIntoBatchesWithShardedKeyPTransformOverride(self, options) - ]) + pipeline.replace_all( + [GroupIntoBatchesWithShardedKeyPTransformOverride(self, options)]) if use_fnapi and not apiclient._use_unified_worker(options): pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 0511fa429aee..c91737d9d177 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -645,69 +645,6 @@ def test_gbk_translation(self): self.assertEqual( gbk_step[u'properties']['output_info'], expected_output_info) - def test_write_bigquery_translation(self): - runner = DataflowRunner() - - self.default_properties.append('--experiments=use_legacy_bq_sink') - with beam.Pipeline(runner=runner, - options=PipelineOptions(self.default_properties)) as p: - # pylint: disable=expression-not-assigned - p | beam.Create([1]) | beam.io.WriteToBigQuery('some.table') - - job_dict = json.loads(str(runner.job)) - - expected_step = { - "kind": "ParallelWrite", - "name": "s2", - "properties": { - "create_disposition": "CREATE_IF_NEEDED", - "dataset": "some", - "display_data": [], - "encoding": { - "@type": "kind:windowed_value", - "component_encodings": [{ - "component_encodings": [], - "pipeline_proto_coder_id": "ref_Coder_RowAsDictJsonCoder_4" - }, { - "@type": "kind:global_window" - }], - "is_wrapper": True - }, - "format": "bigquery", - "parallel_input": { - "@type": "OutputReference", - 
"output_name": "out", - "step_name": "s1" - }, - "table": "table", - "user_name": "WriteToBigQuery/Write/NativeWrite", - "write_disposition": "WRITE_APPEND" - } - } - job_dict = json.loads(str(runner.job)) - write_step = [ - s for s in job_dict[u'steps'] - if s[u'properties'][u'user_name'].startswith('WriteToBigQuery') - ][0] - - # Delete the @type field because in this case it is a hash which may change - # depending on the pickling version. - step_encoding = write_step[u'properties'][u'encoding'] - del step_encoding[u'component_encodings'][0][u'@type'] - self.assertEqual(expected_step, write_step) - - def test_write_bigquery_failed_translation(self): - """Tests that WriteToBigQuery cannot have any consumers if replaced.""" - runner = DataflowRunner() - - self.default_properties.append('--experiments=use_legacy_bq_sink') - with self.assertRaises(Exception): - with beam.Pipeline(runner=runner, - options=PipelineOptions(self.default_properties)) as p: - # pylint: disable=expression-not-assigned - out = p | beam.Create([1]) | beam.io.WriteToBigQuery('some.table') - out['destination_file_pairs'] | 'MyTransform' >> beam.Map(lambda _: _) - @unittest.skip( 'https://github.com/apache/beam/issues/18716: enable once ' 'CombineFnVisitor is fixed') diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py index e8a660c0ccf9..1f215fd02bdd 100644 --- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py +++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py @@ -19,7 +19,6 @@ # pytype: skip-file -from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.pipeline import PTransformOverride @@ -196,140 +195,6 @@ def expand(self, pbegin): ptransform.source.coder.to_type_hint()) -class WriteToBigQueryPTransformOverride(PTransformOverride): - def __init__(self, pipeline, options): - super().__init__() - self.options = options - self.outputs = [] - - self._check_bq_outputs(pipeline) - - def _check_bq_outputs(self, pipeline): - """Checks that there are no consumers if the transform will be replaced. - - The WriteToBigQuery replacement is the native BigQuerySink which has an - output of a PDone. The original transform, however, returns a dict. The user - may be inadvertantly using the dict output which will have no side-effects - or fail pipeline construction esoterically. This checks the outputs and - gives a user-friendsly error. - """ - # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order, wrong-import-position, unused-import - from apache_beam.pipeline import PipelineVisitor - from apache_beam.io import WriteToBigQuery - - # First, retrieve all the outpts from all the WriteToBigQuery transforms - # that will be replaced. Later, we will use these to make sure no one - # consumes these. - class GetWriteToBqOutputsVisitor(PipelineVisitor): - def __init__(self, matches): - self.matches = matches - self.outputs = set() - - def enter_composite_transform(self, transform_node): - # Only add outputs that are going to be replaced. - if self.matches(transform_node): - self.outputs.update(set(transform_node.outputs.values())) - - outputs_visitor = GetWriteToBqOutputsVisitor(self.matches) - pipeline.visit(outputs_visitor) - - # Finally, verify that there are no consumers to the previously found - # outputs. 
- class VerifyWriteToBqOutputsVisitor(PipelineVisitor): - def __init__(self, outputs): - self.outputs = outputs - - def enter_composite_transform(self, transform_node): - self.visit_transform(transform_node) - - def visit_transform(self, transform_node): - # Internal consumers of the outputs we're overriding are expected. - # We only error out on non-internal consumers. - if ('BigQueryBatchFileLoads' not in transform_node.full_label and - [o for o in self.outputs if o in transform_node.inputs]): - raise ValueError( - 'WriteToBigQuery was being replaced with the native ' - 'BigQuerySink, but the transform "{}" has an input which will be ' - 'replaced with a PDone. To fix, please remove all transforms ' - 'that read from any WriteToBigQuery transforms.'.format( - transform_node.full_label)) - - pipeline.visit(VerifyWriteToBqOutputsVisitor(outputs_visitor.outputs)) - - def matches(self, applied_ptransform): - # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam import io - transform = applied_ptransform.transform - if (not isinstance(transform, io.WriteToBigQuery) or - getattr(transform, 'override', False)): - return False - - experiments = self.options.view_as(DebugOptions).experiments or [] - if 'use_legacy_bq_sink' not in experiments: - return False - - if transform.schema == io.gcp.bigquery.SCHEMA_AUTODETECT: - raise RuntimeError( - 'Schema auto-detection is not supported on the native sink') - - # The replacement is only valid for Batch. - standard_options = self.options.view_as(StandardOptions) - if standard_options.streaming: - if transform.write_disposition == io.BigQueryDisposition.WRITE_TRUNCATE: - raise RuntimeError('Can not use write truncation mode in streaming') - return False - - self.outputs = list(applied_ptransform.outputs.keys()) - return True - - def get_replacement_transform(self, ptransform): - # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam import io - - class WriteToBigQuery(io.WriteToBigQuery): - override = True - - def __init__(self, transform, outputs): - self.transform = transform - self.outputs = outputs - - def __getattr__(self, name): - """Returns the given attribute from the parent. - - This allows this transform to act like a WriteToBigQuery transform - without having to construct a new WriteToBigQuery transform. - """ - return self.transform.__getattribute__(name) - - def expand(self, pcoll): - from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json - import json - - schema = None - if self.schema: - schema = parse_table_schema_from_json(json.dumps(self.schema)) - - out = pcoll | io.Write( - io.BigQuerySink( - self.table_reference.tableId, - self.table_reference.datasetId, - self.table_reference.projectId, - schema, - self.create_disposition, - self.write_disposition, - kms_key=self.kms_key)) - - # The WriteToBigQuery can have different outputs depending on if it's - # Batch or Streaming. This retrieved the output keys from the node and - # is replacing them here to be consistent. - return {key: out for key in self.outputs} - - return WriteToBigQuery(ptransform, self.outputs) - - class GroupIntoBatchesWithShardedKeyPTransformOverride(PTransformOverride): """A ``PTransformOverride`` for ``GroupIntoBatches.WithShardedKey``.
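For context on the override machinery this patch removes: a `PTransformOverride` pairs a `matches()` predicate with `get_replacement_transform()`, and the runner swaps matched transforms via `pipeline.replace_all()`, as dataflow_runner.py still does for `GroupIntoBatchesWithShardedKeyPTransformOverride`. The sketch below is a minimal, hypothetical illustration of that contract with made-up transform names; it mirrors the method signatures used by the removed `WriteToBigQueryPTransformOverride` and is not taken from the Beam codebase.

    import apache_beam as beam
    from apache_beam.pipeline import PTransformOverride


    class _Double(beam.PTransform):
        """Replacement transform used by the override below (illustrative)."""
        def expand(self, pcoll):
            return pcoll | beam.Map(lambda x: x * 2)


    class DoubleInsteadOfTripleOverride(PTransformOverride):
        """Swaps any transform labeled 'Triple' for _Double."""
        def matches(self, applied_ptransform):
            # Called for every node in the pipeline graph.
            return applied_ptransform.full_label == 'Triple'

        def get_replacement_transform(self, ptransform):
            # Receives the matched transform; returns its replacement.
            return _Double()


    p = beam.Pipeline()
    _ = p | beam.Create([1, 2, 3]) | 'Triple' >> beam.Map(lambda x: x * 3)
    # Runners invoke replace_all() before translating the graph; calling it
    # here only demonstrates the mechanism.
    p.replace_all([DoubleInsteadOfTripleOverride()])
    p.run().wait_until_finish()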