Remove obsolete and deprecated bigquery native read. #23557

Merged: 6 commits, Oct 11, 2022

Changes from all commits
1 change: 1 addition & 0 deletions CHANGES.md
@@ -62,6 +62,7 @@
 * Decreased TextSource CPU utilization by 2.3x (Java) ([#23193](https://github.com/apache/beam/issues/23193)).
 * Fixed bug when using SpannerIO with RuntimeValueProvider options (Java) ([#22146](https://github.com/apache/beam/issues/22146)).
 * Fixed issue for unicode rendering on WriteToBigQuery ([#10785](https://github.com/apache/beam/issues/10785))
+* Removed obsolete variant of BigQuery Read ([#23559](https://github.com/apache/beam/issues/23559)).
 * Bumped google-cloud-spanner dependency version to 3.x for Python SDK ([#21198](https://github.com/apache/beam/issues/21198)).

 ## New Features / Improvements
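For pipelines migrating off the removed native read, a minimal sketch of the replacement (the table name is hypothetical; assumes apache_beam[gcp] is installed and a GCP project/temp location is configured via pipeline options):

import apache_beam as beam

with beam.Pipeline() as p:
  # ReadFromBigQuery is the standard Beam source that the removed
  # native path now falls back to.
  rows = p | beam.io.ReadFromBigQuery(table='my_dataset.my_table')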
173 changes: 20 additions & 153 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -350,6 +350,7 @@ def chain_after(result):
 import random
 import time
 import uuid
+import warnings
 from dataclasses import dataclass
 from typing import Dict
 from typing import List
@@ -482,11 +483,6 @@ def RowAsDictJsonCoder(*args, **kwargs):
   return bigquery_tools.RowAsDictJsonCoder(*args, **kwargs)


-@deprecated(since='2.11.0', current="bigquery_tools.BigQueryReader")
-def BigQueryReader(*args, **kwargs):
-  return bigquery_tools.BigQueryReader(*args, **kwargs)
-
-
 @deprecated(since='2.11.0', current="bigquery_tools.BigQueryWriter")
 def BigQueryWriter(*args, **kwargs):
   return bigquery_tools.BigQueryWriter(*args, **kwargs)
@@ -591,158 +587,29 @@ def BigQuerySource(
     kms_key=None,
     use_dataflow_native_source=False):
   if use_dataflow_native_source:
-    return _BigQuerySource(
-        table,
-        dataset,
-        project,
-        query,
-        validate,
-        coder,
-        use_standard_sql,
-        flatten_results,
-        kms_key)
-  else:
-    return ReadFromBigQuery(
-        table=table,
-        dataset=dataset,
-        project=project,
-        query=query,
-        validate=validate,
-        coder=coder,
-        use_standard_sql=use_standard_sql,
-        flatten_results=flatten_results,
-        use_json_exports=True,
-        kms_key=kms_key)
+    warnings.warn(
+        "Native sources no longer implemented; "
+        "falling back to standard Beam source.")
+  return ReadFromBigQuery(
+      table=table,
+      dataset=dataset,
+      project=project,
+      query=query,
+      validate=validate,
+      coder=coder,
+      use_standard_sql=use_standard_sql,
+      flatten_results=flatten_results,
+      use_json_exports=True,
+      kms_key=kms_key)
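After this change the use_dataflow_native_source flag only emits a warning; both branches construct the same transform. A caller-side sketch (the table name is hypothetical; assumes beam.io still re-exports BigQuerySource):

import warnings

import apache_beam as beam

with warnings.catch_warnings(record=True) as caught:
  warnings.simplefilter('always')
  src = beam.io.BigQuerySource(
      'my_dataset.my_table', use_dataflow_native_source=True)

# src is a ReadFromBigQuery transform; caught includes the
# "Native sources no longer implemented" fallback warning.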


 @deprecated(since='2.25.0', current="ReadFromBigQuery")
-class _BigQuerySource(dataflow_io.NativeSource):
+def _BigQuerySource(*args, **kwargs):
   """A source based on a BigQuery table."""
-  def __init__(
-      self,
-      table=None,
-      dataset=None,
-      project=None,
-      query=None,
-      validate=False,
-      coder=None,
-      use_standard_sql=False,
-      flatten_results=True,
-      kms_key=None,
-      temp_dataset=None):
-    """Initialize a :class:`BigQuerySource`.
-
-    Args:
-      table (str): The ID of a BigQuery table. If specified all data of the
-        table will be used as input of the current source. The ID must contain
-        only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores
-        ``_``. If dataset and query arguments are :data:`None` then the table
-        argument must contain the entire table reference specified as:
-        ``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``.
-      dataset (str): The ID of the dataset containing this table or
-        :data:`None` if the table reference is specified entirely by the table
-        argument or a query is specified.
-      project (str): The ID of the project containing this table or
-        :data:`None` if the table reference is specified entirely by the table
-        argument or a query is specified.
-      query (str): A query to be used instead of arguments table, dataset, and
-        project.
-      validate (bool): If :data:`True`, various checks will be done when source
-        gets initialized (e.g., is table present?). This should be
-        :data:`True` for most scenarios in order to catch errors as early as
-        possible (pipeline construction instead of pipeline execution). It
-        should be :data:`False` if the table is created during pipeline
-        execution by a previous step.
-      coder (~apache_beam.coders.coders.Coder): The coder for the table
-        rows if serialized to disk. If :data:`None`, then the default coder is
-        :class:`~apache_beam.io.gcp.bigquery_tools.RowAsDictJsonCoder`,
-        which will interpret every line in a file as a JSON serialized
-        dictionary. This argument needs a value only in special cases when
-        returning table rows as dictionaries is not desirable.
-      use_standard_sql (bool): Specifies whether to use BigQuery's standard SQL
-        dialect for this query. The default value is :data:`False`.
-        If set to :data:`True`, the query will use BigQuery's updated SQL
-        dialect with improved standards compliance.
-        This parameter is ignored for table inputs.
-      flatten_results (bool): Flattens all nested and repeated fields in the
-        query results. The default value is :data:`True`.
-      kms_key (str): Optional Cloud KMS key name for use when creating new
-        tables.
-      temp_dataset (``google.cloud.bigquery.dataset.DatasetReference``):
-        The dataset in which to create temporary tables when performing file
-        loads. By default, a new dataset is created in the execution project
-        for temporary tables.
-
-    Raises:
-      ValueError: if any of the following is true:
-
-      1) the table reference as a string does not match the expected format
-      2) neither a table nor a query is specified
-      3) both a table and a query is specified.
-    """
-
-    # Import here to avoid adding the dependency for local running scenarios.
-    try:
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apitools.base import py  # pylint: disable=unused-import
-    except ImportError:
-      raise ImportError(
-          'Google Cloud IO not available, '
-          'please install apache_beam[gcp]')
-
-    if table is not None and query is not None:
-      raise ValueError(
-          'Both a BigQuery table and a query were specified.'
-          ' Please specify only one of these.')
-    elif table is None and query is None:
-      raise ValueError('A BigQuery table or a query must be specified')
-    elif table is not None:
-      self.table_reference = bigquery_tools.parse_table_reference(
-          table, dataset, project)
-      self.query = None
-      self.use_legacy_sql = True
-    else:
-      self.query = query
-      # TODO(BEAM-1082): Change the internal flag to be standard_sql
-      self.use_legacy_sql = not use_standard_sql
-      self.table_reference = None
-
-    self.validate = validate
-    self.flatten_results = flatten_results
-    self.coder = coder or bigquery_tools.RowAsDictJsonCoder()
-    self.kms_key = kms_key
-    self.temp_dataset = temp_dataset
-
-  def display_data(self):
-    if self.query is not None:
-      res = {'query': DisplayDataItem(self.query, label='Query')}
-    else:
-      if self.table_reference.projectId is not None:
-        tableSpec = '{}:{}.{}'.format(
-            self.table_reference.projectId,
-            self.table_reference.datasetId,
-            self.table_reference.tableId)
-      else:
-        tableSpec = '{}.{}'.format(
-            self.table_reference.datasetId, self.table_reference.tableId)
-      res = {'table': DisplayDataItem(tableSpec, label='Table')}
-
-    res['validation'] = DisplayDataItem(
-        self.validate, label='Validation Enabled')
-    return res
-
-  @property
-  def format(self):
-    """Source format name required for remote execution."""
-    return 'bigquery'
-
-  def reader(self, test_bigquery_client=None):
-    return bigquery_tools.BigQueryReader(
-        source=self,
-        test_bigquery_client=test_bigquery_client,
-        use_legacy_sql=self.use_legacy_sql,
-        flatten_results=self.flatten_results,
-        kms_key=self.kms_key)
+  warnings.warn(
+      "Native sources no longer implemented; "
+      "falling back to standard Beam source.")
+  return ReadFromBigQuery(*args, **kwargs)
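The removed docstring still describes the keyword surface that the shim forwards to ReadFromBigQuery, as the rewritten BigQuerySource body above confirms. A sketch under that assumption (the query text is illustrative):

from apache_beam.io.gcp.bigquery import ReadFromBigQuery

read = ReadFromBigQuery(
    query='SELECT 1 AS x',  # used instead of table/dataset/project
    use_standard_sql=True,  # replaces the removed use_legacy_sql inversion
    flatten_results=False,
    validate=False)
# Table specs keep the formats documented above, passed via table=:
# 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.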


# TODO(https://github.com/apache/beam/issues/21622): remove the serialization
114 changes: 0 additions & 114 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -187,120 +187,6 @@ def test_invalid_json_neg_inf(self):
     self.json_compliance_exception(float('-inf'))


-@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
-class TestBigQuerySource(unittest.TestCase):
-  def test_display_data_item_on_validate_true(self):
-    source = beam.io.BigQuerySource(
-        'dataset.table', validate=True, use_dataflow_native_source=True)
-
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation', True),
-        DisplayDataItemMatcher('table', 'dataset.table')
-    ]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_table_reference_display_data(self):
-    source = beam.io.BigQuerySource(
-        'dataset.table', use_dataflow_native_source=True)
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation', False),
-        DisplayDataItemMatcher('table', 'dataset.table')
-    ]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-    source = beam.io.BigQuerySource(
-        'project:dataset.table', use_dataflow_native_source=True)
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation', False),
-        DisplayDataItemMatcher('table', 'project:dataset.table')
-    ]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-    source = beam.io.BigQuerySource(
-        'xyz.com:project:dataset.table', use_dataflow_native_source=True)
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation', False),
-        DisplayDataItemMatcher('table', 'xyz.com:project:dataset.table')
-    ]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_parse_table_reference(self):
-    source = beam.io.BigQuerySource(
-        'dataset.table', use_dataflow_native_source=True)
-    self.assertEqual(source.table_reference.datasetId, 'dataset')
-    self.assertEqual(source.table_reference.tableId, 'table')
-
-    source = beam.io.BigQuerySource(
-        'project:dataset.table', use_dataflow_native_source=True)
-    self.assertEqual(source.table_reference.projectId, 'project')
-    self.assertEqual(source.table_reference.datasetId, 'dataset')
-    self.assertEqual(source.table_reference.tableId, 'table')
-
-    source = beam.io.BigQuerySource(
-        'xyz.com:project:dataset.table', use_dataflow_native_source=True)
-    self.assertEqual(source.table_reference.projectId, 'xyz.com:project')
-    self.assertEqual(source.table_reference.datasetId, 'dataset')
-    self.assertEqual(source.table_reference.tableId, 'table')
-
-    source = beam.io.BigQuerySource(
-        query='my_query', use_dataflow_native_source=True)
-    self.assertEqual(source.query, 'my_query')
-    self.assertIsNone(source.table_reference)
-    self.assertTrue(source.use_legacy_sql)
-
-  def test_query_only_display_data(self):
-    source = beam.io.BigQuerySource(
-        query='my_query', use_dataflow_native_source=True)
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation', False),
-        DisplayDataItemMatcher('query', 'my_query')
-    ]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-  def test_specify_query_sql_format(self):
-    source = beam.io.BigQuerySource(
-        query='my_query',
-        use_standard_sql=True,
-        use_dataflow_native_source=True)
-    self.assertEqual(source.query, 'my_query')
-    self.assertFalse(source.use_legacy_sql)
-
-  def test_specify_query_flattened_records(self):
-    source = beam.io.BigQuerySource(
-        query='my_query',
-        flatten_results=False,
-        use_dataflow_native_source=True)
-    self.assertFalse(source.flatten_results)
-
-  def test_specify_query_unflattened_records(self):
-    source = beam.io.BigQuerySource(
-        query='my_query', flatten_results=True, use_dataflow_native_source=True)
-    self.assertTrue(source.flatten_results)
-
-  def test_specify_query_without_table(self):
-    source = beam.io.BigQuerySource(
-        query='my_query', use_dataflow_native_source=True)
-    self.assertEqual(source.query, 'my_query')
-    self.assertIsNone(source.table_reference)
-
-  def test_date_partitioned_table_name(self):
-    source = beam.io.BigQuerySource(
-        'dataset.table$20030102',
-        validate=True,
-        use_dataflow_native_source=True)
-    dd = DisplayData.create_from(source)
-    expected_items = [
-        DisplayDataItemMatcher('validation', True),
-        DisplayDataItemMatcher('table', 'dataset.table$20030102')
-    ]
-    hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
-
-
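The deleted tests covered display data and table-spec parsing on the native source. The parsing behavior they pinned down lives in bigquery_tools.parse_table_reference, which the removed class delegated to; a sketch of an equivalent check (assumes GCP dependencies are installed):

import unittest

from apache_beam.io.gcp.bigquery_tools import parse_table_reference

class ParseTableReferenceTest(unittest.TestCase):
  def test_full_table_spec(self):
    # Mirrors the removed test_parse_table_reference expectations.
    ref = parse_table_reference('xyz.com:project:dataset.table')
    self.assertEqual(ref.projectId, 'xyz.com:project')
    self.assertEqual(ref.datasetId, 'dataset')
    self.assertEqual(ref.tableId, 'table')

if __name__ == '__main__':
  unittest.main()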


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
class TestJsonToDictCoder(unittest.TestCase):
@staticmethod