Commit d470c4f

Merge branch 'bq_si_final_change' of https://github.com/pabloem/beam into bq_si_final_change

pabloem committed Nov 21, 2020
2 parents 4fc9b65 + d5b718b
Showing 3 changed files with 60 additions and 56 deletions.
52 changes: 51 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery.py
@@ -293,7 -293,6 @@ def compute_table_name(row):
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.gcp.bigquery_read_internal import ReadFromBigQueryRequest
from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit
from apache_beam.io.gcp.bigquery_read_internal import _JsonToDictCoder
from apache_beam.io.gcp.bigquery_read_internal import _PassThroughThenCleanup
@@ -1906,6 +1905,57 @@ def file_path_to_remove(unused_elm):
| _PassThroughThenCleanup(files_to_remove_pcoll))


class ReadFromBigQueryRequest:
  """
  Class that defines the data to read from BigQuery.
  """
  def __init__(
      self,
      query: str = None,
      use_standard_sql: bool = True,
      table: Union[str, TableReference] = None,
      flatten_results: bool = False):
    """
    Only one of query or table should be specified.

    :param query: SQL query to fetch data.
    :param use_standard_sql:
      Specifies whether to use BigQuery's standard SQL dialect for this query.
      The default value is :data:`True`. If set to :data:`False`,
      the query will use BigQuery's legacy SQL dialect.
      This parameter is ignored for table inputs.
    :param table:
      The ID of the table to read. The ID must contain only letters
      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``, and the
      reference must include the project and dataset
      (e.g. ``'PROJECT:DATASET.TABLE'``).
    :param flatten_results:
      Flattens all nested and repeated fields in the query results.
      The default value is :data:`False`.
    """
    self.flatten_results = flatten_results
    self.query = query
    self.use_standard_sql = use_standard_sql
    self.table = table
    self.validate()

    # We use this internal object ID to generate BigQuery export directories.
    self.obj_id = random.randint(0, 100000)

  def validate(self):
    if self.table is not None and self.query is not None:
      raise ValueError(
          'Both a BigQuery table and a query were specified.'
          ' Please specify only one of these.')
    elif self.table is None and self.query is None:
      raise ValueError('A BigQuery table or a query must be specified.')
    if self.table is not None:
      if isinstance(self.table, str):
        # A string table reference must contain at least one dot
        # (PROJECT:DATASET.TABLE or DATASET.TABLE).
        assert '.' in self.table, (
            'Expected a table reference '
            '(PROJECT:DATASET.TABLE or DATASET.TABLE) instead of %s' %
            self.table)
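
For reference, a minimal usage sketch of the request class added above, feeding a collection of requests into the ReadAllFromBigQuery transform defined just below. This is not part of the diff, and the project, dataset, and table names are illustrative:

import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadAllFromBigQuery
from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest

with beam.Pipeline() as p:
  # Each request describes one table or one query; a single
  # ReadAllFromBigQuery may consume a mix of both kinds.
  requests = [
      ReadFromBigQueryRequest(table='my-project:my_dataset.my_table'),
      ReadFromBigQueryRequest(
          query='SELECT name FROM `my-project.my_dataset.other_table`'),
  ]
  rows = (
      p
      | beam.Create(requests)
      | ReadAllFromBigQuery())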


@experimental()
class ReadAllFromBigQuery(PTransform):
  """Read data from BigQuery.
63 changes: 8 additions & 55 deletions sdks/python/apache_beam/io/gcp/bigquery_read_internal.py
@@ -26,6 +26,7 @@
import logging
import random
import uuid
from typing import TYPE_CHECKING
from typing import Dict
from typing import Iterable
from typing import Optional
@@ -45,6 +46,9 @@
from apache_beam.options.value_provider import ValueProvider
from apache_beam.transforms import PTransform

if TYPE_CHECKING:
  from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest

try:
  from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
  from apache_beam.io.gcp.internal.clients.bigquery import TableReference
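
The TYPE_CHECKING guard added above breaks an import cycle: bigquery.py now defines ReadFromBigQueryRequest while itself importing helpers from this module, so the back-import here runs only under static type checking, and the annotations below refer to the class by the string 'ReadFromBigQueryRequest'. A minimal sketch of the idiom, with hypothetical module and class names:

from typing import TYPE_CHECKING

if TYPE_CHECKING:
  # Executed only by type checkers (e.g. mypy), never at runtime,
  # so no circular import occurs.
  from mypackage.requests_module import Request  # hypothetical

def handle(element: 'Request') -> None:  # string annotation defers lookup
  print(element)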
@@ -125,57 +129,6 @@ def process(self, unused_element, unused_signal, gcs_locations):
return main_output


class ReadFromBigQueryRequest:
  """
  Class that defines data to read from BQ.
  """
  def __init__(
      self,
      query: str = None,
      use_standard_sql: bool = True,
      table: Union[str, TableReference] = None,
      flatten_results: bool = False):
    """
    Only one of query or table should be specified.
    :param query: SQL query to fetch data.
    :param use_standard_sql:
      Specifies whether to use BigQuery's standard SQL dialect for this query.
      The default value is :data:`True`. If set to :data:`False`,
      the query will use BigQuery's legacy SQL dialect.
      This parameter is ignored for table inputs.
    :param table:
      The ID of the table to read. The ID must contain only letters
      ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
      define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
    :param flatten_results:
      Flattens all nested and repeated fields in the query results.
      The default value is :data:`False`.
    """
    self.flatten_results = flatten_results
    self.query = query
    self.use_standard_sql = use_standard_sql
    self.table = table
    self.validate()

    # We use this internal object ID to generate BigQuery export directories.
    self.obj_id = random.randint(0, 100000)

  def validate(self):
    if self.table is not None and self.query is not None:
      raise ValueError(
          'Both a BigQuery table and a query were specified.'
          ' Please specify only one of these.')
    elif self.table is None and self.query is None:
      raise ValueError('A BigQuery table or a query must be specified')
    if self.table is not None:
      if isinstance(self.table, str):
        assert self.table.find('.'), (
            'Expected a table reference '
            '(PROJECT:DATASET.TABLE or DATASET.TABLE) instead of %s'
            % self.table)


class _BigQueryReadSplit(beam.transforms.DoFn):
  """Starts the process of reading from BigQuery.
@@ -223,7 +176,7 @@ def _get_temp_dataset(self):
    else:
      return self.temp_dataset

  def process(self, element: ReadFromBigQueryRequest, *args,
  def process(self, element: 'ReadFromBigQueryRequest', *args,
              **kwargs) -> Iterable[BoundedSource]:
    bq = bigquery_tools.BigQueryWrapper(
        temp_dataset_id=self._get_temp_dataset().datasetId)
@@ -269,15 +222,15 @@ def _create_source(self, path, schema):
  def _setup_temporary_dataset(
      self,
      bq: bigquery_tools.BigQueryWrapper,
      element: ReadFromBigQueryRequest):
      element: 'ReadFromBigQueryRequest'):
    location = bq.get_query_location(
        self._get_project(), element.query, not element.use_standard_sql)
    bq.create_temporary_dataset(self._get_project(), location)

  def _execute_query(
      self,
      bq: bigquery_tools.BigQueryWrapper,
      element: ReadFromBigQueryRequest):
      element: 'ReadFromBigQueryRequest'):
    query_job_name = bigquery_tools.generate_bq_job_name(
        self._job_name,
        self._source_uuid,
@@ -299,7 +252,7 @@ def _execute_query(
  def _export_files(
      self,
      bq: bigquery_tools.BigQueryWrapper,
      element: ReadFromBigQueryRequest,
      element: 'ReadFromBigQueryRequest',
      table_reference: TableReference):
    """Runs a BigQuery export job.
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -383,6 +383,7 @@ def create_bq_schema(cls, with_extra=False):
  @skip(['PortableRunner', 'FlinkRunner'])
  @attr('IT')
  def test_read_queries(self):
    # TODO(BEAM-11311): Remove experiment when tests run on r_v2.
    args = self.args + ["--experiments=use_runner_v2"]
    with beam.Pipeline(argv=args) as p:
      result = (
