diff --git a/CHANGES.md b/CHANGES.md index ad93b497e549..8cefc3b01855 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -62,6 +62,7 @@ * Decreased TextSource CPU utilization by 2.3x (Java) ([#23193](https://github.com/apache/beam/issues/23193)). * Fixed bug when using SpannerIO with RuntimeValueProvider options (Java) ([#22146](https://github.com/apache/beam/issues/22146)). * Fixed issue for unicode rendering on WriteToBigQuery ([#10785](https://github.com/apache/beam/issues/10785)) +* Remove obsolete variant of BigQuery Read ([#23559](https://github.com/apache/beam/issues/23559)). * Bumped google-cloud-spanner dependency version to 3.x for Python SDK ([#21198](https://github.com/apache/beam/issues/21198)). ## New Features / Improvements diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index ece4489d777d..0e363bbd7879 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -350,6 +350,7 @@ def chain_after(result): import random import time import uuid +import warnings from dataclasses import dataclass from typing import Dict from typing import List @@ -482,11 +483,6 @@ def RowAsDictJsonCoder(*args, **kwargs): return bigquery_tools.RowAsDictJsonCoder(*args, **kwargs) -@deprecated(since='2.11.0', current="bigquery_tools.BigQueryReader") -def BigQueryReader(*args, **kwargs): - return bigquery_tools.BigQueryReader(*args, **kwargs) - - @deprecated(since='2.11.0', current="bigquery_tools.BigQueryWriter") def BigQueryWriter(*args, **kwargs): return bigquery_tools.BigQueryWriter(*args, **kwargs) @@ -591,158 +587,29 @@ def BigQuerySource( kms_key=None, use_dataflow_native_source=False): if use_dataflow_native_source: - return _BigQuerySource( - table, - dataset, - project, - query, - validate, - coder, - use_standard_sql, - flatten_results, - kms_key) - else: - return ReadFromBigQuery( - table=table, - dataset=dataset, - project=project, - query=query, - validate=validate, - coder=coder, - use_standard_sql=use_standard_sql, - flatten_results=flatten_results, - use_json_exports=True, - kms_key=kms_key) + warnings.warn( + "Native sources no longer implemented; " + "falling back to standard Beam source.") + return ReadFromBigQuery( + table=table, + dataset=dataset, + project=project, + query=query, + validate=validate, + coder=coder, + use_standard_sql=use_standard_sql, + flatten_results=flatten_results, + use_json_exports=True, + kms_key=kms_key) @deprecated(since='2.25.0', current="ReadFromBigQuery") -class _BigQuerySource(dataflow_io.NativeSource): +def _BigQuerySource(*args, **kwargs): """A source based on a BigQuery table.""" - def __init__( - self, - table=None, - dataset=None, - project=None, - query=None, - validate=False, - coder=None, - use_standard_sql=False, - flatten_results=True, - kms_key=None, - temp_dataset=None): - """Initialize a :class:`BigQuerySource`. - - Args: - table (str): The ID of a BigQuery table. If specified all data of the - table will be used as input of the current source. The ID must contain - only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores - ``_``. If dataset and query arguments are :data:`None` then the table - argument must contain the entire table reference specified as: - ``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``. - dataset (str): The ID of the dataset containing this table or - :data:`None` if the table reference is specified entirely by the table - argument or a query is specified. 
- project (str): The ID of the project containing this table or - :data:`None` if the table reference is specified entirely by the table - argument or a query is specified. - query (str): A query to be used instead of arguments table, dataset, and - project. - validate (bool): If :data:`True`, various checks will be done when source - gets initialized (e.g., is table present?). This should be - :data:`True` for most scenarios in order to catch errors as early as - possible (pipeline construction instead of pipeline execution). It - should be :data:`False` if the table is created during pipeline - execution by a previous step. - coder (~apache_beam.coders.coders.Coder): The coder for the table - rows if serialized to disk. If :data:`None`, then the default coder is - :class:`~apache_beam.io.gcp.bigquery_tools.RowAsDictJsonCoder`, - which will interpret every line in a file as a JSON serialized - dictionary. This argument needs a value only in special cases when - returning table rows as dictionaries is not desirable. - use_standard_sql (bool): Specifies whether to use BigQuery's standard SQL - dialect for this query. The default value is :data:`False`. - If set to :data:`True`, the query will use BigQuery's updated SQL - dialect with improved standards compliance. - This parameter is ignored for table inputs. - flatten_results (bool): Flattens all nested and repeated fields in the - query results. The default value is :data:`True`. - kms_key (str): Optional Cloud KMS key name for use when creating new - tables. - temp_dataset (``google.cloud.bigquery.dataset.DatasetReference``): - The dataset in which to create temporary tables when performing file - loads. By default, a new dataset is created in the execution project for - temporary tables. - - Raises: - ValueError: if any of the following is true: - - 1) the table reference as a string does not match the expected format - 2) neither a table nor a query is specified - 3) both a table and a query is specified. - """ - - # Import here to avoid adding the dependency for local running scenarios. - try: - # pylint: disable=wrong-import-order, wrong-import-position - from apitools.base import py # pylint: disable=unused-import - except ImportError: - raise ImportError( - 'Google Cloud IO not available, ' - 'please install apache_beam[gcp]') - - if table is not None and query is not None: - raise ValueError( - 'Both a BigQuery table and a query were specified.' 
- ' Please specify only one of these.') - elif table is None and query is None: - raise ValueError('A BigQuery table or a query must be specified') - elif table is not None: - self.table_reference = bigquery_tools.parse_table_reference( - table, dataset, project) - self.query = None - self.use_legacy_sql = True - else: - self.query = query - # TODO(BEAM-1082): Change the internal flag to be standard_sql - self.use_legacy_sql = not use_standard_sql - self.table_reference = None - - self.validate = validate - self.flatten_results = flatten_results - self.coder = coder or bigquery_tools.RowAsDictJsonCoder() - self.kms_key = kms_key - self.temp_dataset = temp_dataset - - def display_data(self): - if self.query is not None: - res = {'query': DisplayDataItem(self.query, label='Query')} - else: - if self.table_reference.projectId is not None: - tableSpec = '{}:{}.{}'.format( - self.table_reference.projectId, - self.table_reference.datasetId, - self.table_reference.tableId) - else: - tableSpec = '{}.{}'.format( - self.table_reference.datasetId, self.table_reference.tableId) - res = {'table': DisplayDataItem(tableSpec, label='Table')} - - res['validation'] = DisplayDataItem( - self.validate, label='Validation Enabled') - return res - - @property - def format(self): - """Source format name required for remote execution.""" - return 'bigquery' - - def reader(self, test_bigquery_client=None): - return bigquery_tools.BigQueryReader( - source=self, - test_bigquery_client=test_bigquery_client, - use_legacy_sql=self.use_legacy_sql, - flatten_results=self.flatten_results, - kms_key=self.kms_key) + warnings.warn( + "Native sources no longer implemented; " + "falling back to standard Beam source.") + return ReadFromBigQuery(*args, **kwargs) # TODO(https://github.com/apache/beam/issues/21622): remove the serialization diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 73c8ae3905fd..47d0236dc2a2 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -187,120 +187,6 @@ def test_invalid_json_neg_inf(self): self.json_compliance_exception(float('-inf')) -@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') -class TestBigQuerySource(unittest.TestCase): - def test_display_data_item_on_validate_true(self): - source = beam.io.BigQuerySource( - 'dataset.table', validate=True, use_dataflow_native_source=True) - - dd = DisplayData.create_from(source) - expected_items = [ - DisplayDataItemMatcher('validation', True), - DisplayDataItemMatcher('table', 'dataset.table') - ] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - def test_table_reference_display_data(self): - source = beam.io.BigQuerySource( - 'dataset.table', use_dataflow_native_source=True) - dd = DisplayData.create_from(source) - expected_items = [ - DisplayDataItemMatcher('validation', False), - DisplayDataItemMatcher('table', 'dataset.table') - ] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - source = beam.io.BigQuerySource( - 'project:dataset.table', use_dataflow_native_source=True) - dd = DisplayData.create_from(source) - expected_items = [ - DisplayDataItemMatcher('validation', False), - DisplayDataItemMatcher('table', 'project:dataset.table') - ] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - source = beam.io.BigQuerySource( - 'xyz.com:project:dataset.table', use_dataflow_native_source=True) - dd = DisplayData.create_from(source) - 
expected_items = [ - DisplayDataItemMatcher('validation', False), - DisplayDataItemMatcher('table', 'xyz.com:project:dataset.table') - ] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - def test_parse_table_reference(self): - source = beam.io.BigQuerySource( - 'dataset.table', use_dataflow_native_source=True) - self.assertEqual(source.table_reference.datasetId, 'dataset') - self.assertEqual(source.table_reference.tableId, 'table') - - source = beam.io.BigQuerySource( - 'project:dataset.table', use_dataflow_native_source=True) - self.assertEqual(source.table_reference.projectId, 'project') - self.assertEqual(source.table_reference.datasetId, 'dataset') - self.assertEqual(source.table_reference.tableId, 'table') - - source = beam.io.BigQuerySource( - 'xyz.com:project:dataset.table', use_dataflow_native_source=True) - self.assertEqual(source.table_reference.projectId, 'xyz.com:project') - self.assertEqual(source.table_reference.datasetId, 'dataset') - self.assertEqual(source.table_reference.tableId, 'table') - - source = beam.io.BigQuerySource( - query='my_query', use_dataflow_native_source=True) - self.assertEqual(source.query, 'my_query') - self.assertIsNone(source.table_reference) - self.assertTrue(source.use_legacy_sql) - - def test_query_only_display_data(self): - source = beam.io.BigQuerySource( - query='my_query', use_dataflow_native_source=True) - dd = DisplayData.create_from(source) - expected_items = [ - DisplayDataItemMatcher('validation', False), - DisplayDataItemMatcher('query', 'my_query') - ] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - def test_specify_query_sql_format(self): - source = beam.io.BigQuerySource( - query='my_query', - use_standard_sql=True, - use_dataflow_native_source=True) - self.assertEqual(source.query, 'my_query') - self.assertFalse(source.use_legacy_sql) - - def test_specify_query_flattened_records(self): - source = beam.io.BigQuerySource( - query='my_query', - flatten_results=False, - use_dataflow_native_source=True) - self.assertFalse(source.flatten_results) - - def test_specify_query_unflattened_records(self): - source = beam.io.BigQuerySource( - query='my_query', flatten_results=True, use_dataflow_native_source=True) - self.assertTrue(source.flatten_results) - - def test_specify_query_without_table(self): - source = beam.io.BigQuerySource( - query='my_query', use_dataflow_native_source=True) - self.assertEqual(source.query, 'my_query') - self.assertIsNone(source.table_reference) - - def test_date_partitioned_table_name(self): - source = beam.io.BigQuerySource( - 'dataset.table$20030102', - validate=True, - use_dataflow_native_source=True) - dd = DisplayData.create_from(source) - expected_items = [ - DisplayDataItemMatcher('validation', True), - DisplayDataItemMatcher('table', 'dataset.table$20030102') - ] - hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) - - @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestJsonToDictCoder(unittest.TestCase): @staticmethod diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py index 27428aca5335..f087d21128d4 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py @@ -53,7 +53,6 @@ from apache_beam.internal.metrics.metric import ServiceCallMetric from apache_beam.io.gcp import bigquery_avro_tools from apache_beam.io.gcp import resource_identifiers -from apache_beam.io.gcp.bigquery_io_metadata import 
create_bigquery_io_metadata from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.metrics import monitoring_infos from apache_beam.options import value_provider @@ -1342,126 +1341,7 @@ def convert_row_to_dict(self, row, schema): # ----------------------------------------------------------------------------- -# BigQueryReader, BigQueryWriter. - - -class BigQueryReader(dataflow_io.NativeSourceReader): - """A reader for a BigQuery source.""" - def __init__( - self, - source, - test_bigquery_client=None, - use_legacy_sql=True, - flatten_results=True, - kms_key=None, - query_priority=None): - self.source = source - self.test_bigquery_client = test_bigquery_client - if auth.is_running_in_gce: - self.executing_project = auth.executing_project - elif hasattr(source, 'pipeline_options'): - self.executing_project = ( - source.pipeline_options.view_as(GoogleCloudOptions).project) - else: - self.executing_project = None - - # TODO(silviuc): Try to automatically get it from gcloud config info. - if not self.executing_project and test_bigquery_client is None: - raise RuntimeError( - 'Missing executing project information. Please use the --project ' - 'command line option to specify it.') - self.row_as_dict = isinstance(self.source.coder, RowAsDictJsonCoder) - # Schema for the rows being read by the reader. It is initialized the - # first time something gets read from the table. It is not required - # for reading the field values in each row but could be useful for - # getting additional details. - self.schema = None - self.use_legacy_sql = use_legacy_sql - self.flatten_results = flatten_results - self.kms_key = kms_key - self.bigquery_job_labels = {} - self.bq_io_metadata = None - - from apache_beam.io.gcp.bigquery import BigQueryQueryPriority - self.query_priority = query_priority or BigQueryQueryPriority.BATCH - - if self.source.table_reference is not None: - # If table schema did not define a project we default to executing - # project. - project_id = self.source.table_reference.projectId - if not project_id: - project_id = self.executing_project - self.query = 'SELECT * FROM [%s:%s.%s];' % ( - project_id, - self.source.table_reference.datasetId, - self.source.table_reference.tableId) - elif self.source.query is not None: - self.query = self.source.query - else: - # Enforce the "modes" enforced by BigQuerySource.__init__. - # If this exception has been raised, the BigQuerySource "modes" have - # changed and this method will need to be updated as well. - raise ValueError("BigQuerySource must have either a table or query") - - def _get_source_location(self): - """ - Get the source location (e.g. ``"EU"`` or ``"US"``) from either - - - :data:`source.table_reference` - or - - The first referenced table in :data:`source.query` - - See Also: - - :meth:`BigQueryWrapper.get_query_location` - - :meth:`BigQueryWrapper.get_table_location` - - Returns: - Optional[str]: The source location, if any. - """ - if self.source.table_reference is not None: - tr = self.source.table_reference - return self.client.get_table_location( - tr.projectId if tr.projectId is not None else self.executing_project, - tr.datasetId, - tr.tableId) - else: # It's a query source - return self.client.get_query_location( - self.executing_project, self.source.query, self.source.use_legacy_sql) - - def __enter__(self): - self.client = BigQueryWrapper(client=self.test_bigquery_client) - if not self.client.is_user_configured_dataset(): - # Temp dataset was provided by the user so we do not have to create one. 
- self.client.create_temporary_dataset( - self.executing_project, location=self._get_source_location()) - return self - - def __exit__(self, exception_type, exception_value, traceback): - self.client.clean_up_temporary_dataset(self.executing_project) - - def __iter__(self): - if not self.bq_io_metadata: - self.bq_io_metadata = create_bigquery_io_metadata() - for rows, schema in self.client.run_query( - project_id=self.executing_project, query=self.query, - use_legacy_sql=self.use_legacy_sql, - flatten_results=self.flatten_results, - priority=self.query_priority, - job_labels=self.bq_io_metadata.add_additional_bq_job_labels( - self.bigquery_job_labels)): - if self.schema is None: - self.schema = schema - for row in rows: - # return base64 encoded bytes as byte type on python 3 - # which matches the behavior of Beam Java SDK - for i in range(len(row.f)): - if self.schema.fields[i].type == 'BYTES' and row.f[i].v: - row.f[i].v.string_value = row.f[i].v.string_value.encode('utf-8') - - if self.row_as_dict: - yield self.client.convert_row_to_dict(row, schema) - else: - yield row +# BigQueryWriter. class BigQueryWriter(dataflow_io.NativeSinkWriter): diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py index 2ee4f374497d..bd65542d19f6 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_tools_test.py @@ -24,7 +24,6 @@ import logging import math import re -import time import unittest import fastavro @@ -33,9 +32,7 @@ from parameterized import parameterized import apache_beam as beam -from apache_beam.internal.gcp.json_value import to_json_value from apache_beam.io.gcp import resource_identifiers -from apache_beam.io.gcp.bigquery import TableRowJsonCoder from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR from apache_beam.io.gcp.bigquery_tools import AvroRowWriter from apache_beam.io.gcp.bigquery_tools import BigQueryJobTypes @@ -567,300 +564,6 @@ def test_start_query_job_priority_configuration(self): 'INTERACTIVE') -@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') -class TestBigQueryReader(unittest.TestCase): - def get_test_rows(self): - now = time.time() - dt = datetime.datetime.utcfromtimestamp(float(now)) - ts = dt.strftime('%Y-%m-%d %H:%M:%S.%f UTC') - expected_rows = [{ - 'i': 1, - 's': 'abc', - 'f': 2.3, - 'b': True, - 't': ts, - 'dt': '2016-10-31', - 'ts': '22:39:12.627498', - 'dt_ts': '2008-12-25T07:30:00', - 'r': { - 's2': 'b' - }, - 'rpr': [{ - 's3': 'c', 'rpr2': [{ - 'rs': ['d', 'e'], 's4': None - }] - }] - }, - { - 'i': 10, - 's': 'xyz', - 'f': -3.14, - 'b': False, - 'rpr': [], - 't': None, - 'dt': None, - 'ts': None, - 'dt_ts': None, - 'r': None, - }] - - nested_schema = [ - bigquery.TableFieldSchema(name='s2', type='STRING', mode='NULLABLE') - ] - nested_schema_2 = [ - bigquery.TableFieldSchema(name='s3', type='STRING', mode='NULLABLE'), - bigquery.TableFieldSchema( - name='rpr2', - type='RECORD', - mode='REPEATED', - fields=[ - bigquery.TableFieldSchema( - name='rs', type='STRING', mode='REPEATED'), - bigquery.TableFieldSchema( - name='s4', type='STRING', mode='NULLABLE') - ]) - ] - - schema = bigquery.TableSchema( - fields=[ - bigquery.TableFieldSchema( - name='b', type='BOOLEAN', mode='REQUIRED'), - bigquery.TableFieldSchema(name='f', type='FLOAT', mode='REQUIRED'), - bigquery.TableFieldSchema( - name='i', type='INTEGER', mode='REQUIRED'), - bigquery.TableFieldSchema(name='s', type='STRING', mode='REQUIRED'), - 
bigquery.TableFieldSchema( - name='t', type='TIMESTAMP', mode='NULLABLE'), - bigquery.TableFieldSchema(name='dt', type='DATE', mode='NULLABLE'), - bigquery.TableFieldSchema(name='ts', type='TIME', mode='NULLABLE'), - bigquery.TableFieldSchema( - name='dt_ts', type='DATETIME', mode='NULLABLE'), - bigquery.TableFieldSchema( - name='r', type='RECORD', mode='NULLABLE', fields=nested_schema), - bigquery.TableFieldSchema( - name='rpr', - type='RECORD', - mode='REPEATED', - fields=nested_schema_2) - ]) - - table_rows = [ - bigquery.TableRow( - f=[ - bigquery.TableCell(v=to_json_value('true')), - bigquery.TableCell(v=to_json_value(str(2.3))), - bigquery.TableCell(v=to_json_value(str(1))), - bigquery.TableCell(v=to_json_value('abc')), - # For timestamps cannot use str() because it will truncate the - # number representing the timestamp. - bigquery.TableCell(v=to_json_value('%f' % now)), - bigquery.TableCell(v=to_json_value('2016-10-31')), - bigquery.TableCell(v=to_json_value('22:39:12.627498')), - bigquery.TableCell(v=to_json_value('2008-12-25T07:30:00')), - # For record we cannot use dict because it doesn't create nested - # schemas correctly so we have to use this f,v based format - bigquery.TableCell(v=to_json_value({'f': [{ - 'v': 'b' - }]})), - bigquery.TableCell( - v=to_json_value([{ - 'v': { - 'f': [{ - 'v': 'c' - }, - { - 'v': [{ - 'v': { - 'f': [{ - 'v': [{ - 'v': 'd' - }, { - 'v': 'e' - }] - }, { - 'v': None - }] - } - }] - }] - } - }])) - ]), - bigquery.TableRow( - f=[ - bigquery.TableCell(v=to_json_value('false')), - bigquery.TableCell(v=to_json_value(str(-3.14))), - bigquery.TableCell(v=to_json_value(str(10))), - bigquery.TableCell(v=to_json_value('xyz')), - bigquery.TableCell(v=None), - bigquery.TableCell(v=None), - bigquery.TableCell(v=None), - bigquery.TableCell(v=None), - bigquery.TableCell(v=None), - # REPEATED field without any values. 
- bigquery.TableCell(v=None) - ]) - ] - return table_rows, schema, expected_rows - - def test_read_from_table(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference(jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - actual_rows = [] - with beam.io.BigQuerySource( - 'dataset.table', - use_dataflow_native_source=True).reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, expected_rows) - self.assertEqual(schema, reader.schema) - - def test_read_from_query(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference(jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - actual_rows = [] - with beam.io.BigQuerySource( - query='query', - use_dataflow_native_source=True).reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, expected_rows) - self.assertEqual(schema, reader.schema) - self.assertTrue(reader.use_legacy_sql) - self.assertTrue(reader.flatten_results) - - def test_read_from_query_sql_format(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference(jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - actual_rows = [] - with beam.io.BigQuerySource( - query='query', use_standard_sql=True, - use_dataflow_native_source=True).reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, expected_rows) - self.assertEqual(schema, reader.schema) - self.assertFalse(reader.use_legacy_sql) - self.assertTrue(reader.flatten_results) - - def test_read_from_query_unflatten_records(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference(jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - actual_rows = [] - with beam.io.BigQuerySource( - query='query', flatten_results=False, - use_dataflow_native_source=True).reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, expected_rows) - self.assertEqual(schema, reader.schema) - self.assertTrue(reader.use_legacy_sql) - self.assertFalse(reader.flatten_results) - - def test_using_both_query_and_table_fails(self): - with self.assertRaisesRegex( - ValueError, - r'Both a BigQuery table and a query were specified\. 
Please specify ' - r'only one of these'): - beam.io.BigQuerySource( - table='dataset.table', query='query', use_dataflow_native_source=True) - - def test_using_neither_query_nor_table_fails(self): - with self.assertRaisesRegex( - ValueError, r'A BigQuery table or a query must be specified'): - beam.io.BigQuerySource(use_dataflow_native_source=True) - - def test_read_from_table_as_tablerows(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference(jobId='somejob')) - table_rows, schema, _ = self.get_test_rows() - client.jobs.GetQueryResults.return_value = bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - actual_rows = [] - # We set the coder to TableRowJsonCoder, which is a signal that - # the caller wants to see the rows as TableRows. - with beam.io.BigQuerySource( - 'dataset.table', - coder=TableRowJsonCoder, - use_dataflow_native_source=True).reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, table_rows) - self.assertEqual(schema, reader.schema) - - @mock.patch('time.sleep', return_value=None) - def test_read_from_table_and_job_complete_retry(self, patched_time_sleep): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference(jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - # Return jobComplete=False on first call to trigger the code path where - # query needs to handle waiting a bit. - client.jobs.GetQueryResults.side_effect = [ - bigquery.GetQueryResultsResponse(jobComplete=False), - bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - ] - actual_rows = [] - with beam.io.BigQuerySource( - 'dataset.table', - use_dataflow_native_source=True).reader(client) as reader: - for row in reader: - actual_rows.append(row) - self.assertEqual(actual_rows, expected_rows) - - def test_read_from_table_and_multiple_pages(self): - client = mock.Mock() - client.jobs.Insert.return_value = bigquery.Job( - jobReference=bigquery.JobReference(jobId='somejob')) - table_rows, schema, expected_rows = self.get_test_rows() - # Return a pageToken on first call to trigger the code path where - # query needs to handle multiple pages of results. - client.jobs.GetQueryResults.side_effect = [ - bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema, - pageToken='token'), - bigquery.GetQueryResultsResponse( - jobComplete=True, rows=table_rows, schema=schema) - ] - actual_rows = [] - with beam.io.BigQuerySource( - 'dataset.table', - use_dataflow_native_source=True).reader(client) as reader: - for row in reader: - actual_rows.append(row) - # We return expected rows for each of the two pages of results so we - # adjust our expectation below accordingly. - self.assertEqual(actual_rows, expected_rows * 2) - - def test_table_schema_without_project(self): - # Reader should pick executing project by default. 
- source = beam.io.BigQuerySource( - table='mydataset.mytable', use_dataflow_native_source=True) - options = PipelineOptions(flags=['--project', 'myproject']) - source.pipeline_options = options - reader = source.reader() - self.assertEqual( - 'SELECT * FROM [myproject:mydataset.mytable];', reader.query) - - @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestBigQueryWriter(unittest.TestCase): @mock.patch('time.sleep', return_value=None) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 53cce9dbfa4b..f4d91d5d99e5 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1202,47 +1202,6 @@ def run_Read(self, transform_node, options): step.add_property(PropertyNames.SOURCE_STEP_INPUT, source_dict) elif transform.source.format == 'text': step.add_property(PropertyNames.FILE_PATTERN, transform.source.path) - elif transform.source.format == 'bigquery': - if standard_options.streaming: - raise ValueError( - 'BigQuery source is not currently available for use ' - 'in streaming pipelines.') - debug_options = options.view_as(DebugOptions) - use_fn_api = ( - debug_options.experiments and - 'beam_fn_api' in debug_options.experiments) - if use_fn_api: - raise ValueError(BQ_SOURCE_UW_ERROR) - step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO') - # TODO(silviuc): Add table validation if transform.source.validate. - if transform.source.table_reference is not None: - step.add_property( - PropertyNames.BIGQUERY_DATASET, - transform.source.table_reference.datasetId) - step.add_property( - PropertyNames.BIGQUERY_TABLE, - transform.source.table_reference.tableId) - # If project owning the table was not specified then the project owning - # the workflow (current project) will be used. 
- if transform.source.table_reference.projectId is not None: - step.add_property( - PropertyNames.BIGQUERY_PROJECT, - transform.source.table_reference.projectId) - elif transform.source.query is not None: - step.add_property(PropertyNames.BIGQUERY_QUERY, transform.source.query) - step.add_property( - PropertyNames.BIGQUERY_USE_LEGACY_SQL, - transform.source.use_legacy_sql) - step.add_property( - PropertyNames.BIGQUERY_FLATTEN_RESULTS, - transform.source.flatten_results) - else: - raise ValueError( - 'BigQuery source %r must specify either a table or' - ' a query' % transform.source) - if transform.source.kms_key is not None: - step.add_property( - PropertyNames.BIGQUERY_KMS_KEY, transform.source.kms_key) elif transform.source.format == 'pubsub': if not standard_options.streaming: raise ValueError( diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 6f87303cec51..0511fa429aee 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -29,7 +29,6 @@ import apache_beam as beam import apache_beam.transforms as ptransform -from apache_beam.coders import BytesCoder from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.pipeline import AppliedPTransform @@ -270,20 +269,6 @@ def test_streaming_create_translation(self): self.assertEqual(job_dict[u'steps'][1][u'kind'], u'ParallelDo') self.assertEqual(job_dict[u'steps'][2][u'kind'], u'ParallelDo') - def test_biqquery_read_fn_api_fail(self): - remote_runner = DataflowRunner() - for flag in ['beam_fn_api', 'use_unified_worker', 'use_runner_v2']: - self.default_properties.append("--experiments=%s" % flag) - with self.assertRaisesRegex( - ValueError, - 'The Read.BigQuerySource.*is not supported.*' - 'apache_beam.io.gcp.bigquery.ReadFromBigQuery.*'): - with Pipeline(remote_runner, - PipelineOptions(self.default_properties)) as p: - _ = p | beam.io.Read( - beam.io.BigQuerySource( - 'some.table', use_dataflow_native_source=True)) - def test_remote_runner_display_data(self): remote_runner = DataflowRunner() p = Pipeline( @@ -327,20 +312,6 @@ def test_remote_runner_display_data(self): }] self.assertUnhashableCountEqual(disp_data, expected_data) - def test_no_group_by_key_directly_after_bigquery(self): - remote_runner = DataflowRunner() - with self.assertRaises(ValueError, - msg=('Coder for the GroupByKey operation' - '"GroupByKey" is not a key-value coder: ' - 'RowAsDictJsonCoder')): - with beam.Pipeline(runner=remote_runner, - options=PipelineOptions(self.default_properties)) as p: - # pylint: disable=expression-not-assigned - p | beam.io.Read( - beam.io.BigQuerySource( - 'dataset.faketable', - use_dataflow_native_source=True)) | beam.GroupByKey() - def test_group_by_key_input_visitor_with_valid_inputs(self): p = TestPipeline() pcoll1 = PCollection(p) @@ -624,19 +595,6 @@ def test_read_create_translation(self): self.expect_correct_override(runner.job, u'Create/Read', u'ParallelRead') - def test_read_bigquery_translation(self): - runner = DataflowRunner() - - with beam.Pipeline(runner=runner, - options=PipelineOptions(self.default_properties)) as p: - # pylint: disable=expression-not-assigned - p | beam.io.Read( - beam.io.BigQuerySource( - 'some.table', coder=BytesCoder(), - use_dataflow_native_source=True)) - - self.expect_correct_override(runner.job, u'Read', u'ParallelRead') - def 
test_read_pubsub_translation(self): runner = DataflowRunner()
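
Migration sketch for callers of the removed native source: after this change, beam.io.BigQuerySource(..., use_dataflow_native_source=True) only emits a warning and delegates to ReadFromBigQuery, so reads can target ReadFromBigQuery directly. This is a minimal sketch, not part of the diff, assuming a placeholder table spec and that --project and a GCS temp location are supplied via pipeline options:

import apache_beam as beam

with beam.Pipeline() as pipeline:
  rows = (
      pipeline
      # Beam-native replacement for the removed Dataflow-native source.
      # 'my-project:my_dataset.my_table' is a placeholder table spec;
      # use_json_exports=True mirrors what BigQuerySource now passes through.
      | 'ReadTable' >> beam.io.ReadFromBigQuery(
          table='my-project:my_dataset.my_table', use_json_exports=True)
      # Count rows as a stand-in for downstream processing.
      | 'CountRows' >> beam.combiners.Count.Globally())

Query-based reads work the same way by passing query= (with use_standard_sql=True if desired) instead of table=, matching the keyword arguments that BigQuerySource forwards in this change.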