diff --git a/CHANGES.md b/CHANGES.md index 3a1e50b738c7..eaebdf5c3cf7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -56,6 +56,7 @@ ## I/Os * ReadFromMongoDB can now be used with MongoDB Atlas (Python) ([BEAM-11266](https://issues.apache.org/jira/browse/BEAM-11266).) * Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* There is a new transform `ReadAllFromBigQuery` that can receive multiple requests to read data from BigQuery at pipeline runtime. See [PR 13170](https://github.com/apache/beam/pull/13170), and [BEAM-9650](https://issues.apache.org/jira/browse/BEAM-9650). ## New Features / Improvements diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py index e0866cf5c544..1814ae95ac82 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery.py +++ b/sdks/python/apache_beam/io/gcp/bigquery.py @@ -79,6 +79,44 @@ `ReadFromBigQuery`, you can use the flag `use_json_exports` to export data as JSON, and receive base64-encoded bytes. +ReadAllFromBigQuery +------------------- +Beam 2.27.0 introduces a new transform called `ReadAllFromBigQuery` which +allows you to define table and query reads from BigQuery at pipeline +runtime:: + + read_requests = p | beam.Create([ + ReadFromBigQueryRequest(query='SELECT * FROM mydataset.mytable'), + ReadFromBigQueryRequest(table='myproject.mydataset.mytable')]) + results = read_requests | ReadAllFromBigQuery() + +A good application for this transform is in streaming pipelines to +refresh a side input coming from BigQuery. 
This would work like so:: + + side_input = ( + p + | 'PeriodicImpulse' >> PeriodicImpulse( + first_timestamp, last_timestamp, interval, True) + | 'MapToReadRequest' >> beam.Map( + lambda x: ReadFromBigQueryRequest(table='dataset.table')) + | beam.io.ReadAllFromBigQuery()) + main_input = ( + p + | 'MpImpulse' >> beam.Create(sample_main_input_elements) + | + 'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src)) + | 'WindowMpInto' >> beam.WindowInto( + window.FixedWindows(main_input_windowing_interval))) + result = ( + main_input + | 'ApplyCrossJoin' >> beam.FlatMap( + cross_join, rights=beam.pvalue.AsIter(side_input))) + +**Note**: This transform is supported on Portable and Dataflow v2 runners. + +**Note**: This transform does not currently clean up temporary datasets +created for its execution. (BEAM-11359) + Writing Data to BigQuery ======================== @@ -234,7 +272,6 @@ def compute_table_name(row): from __future__ import absolute_import import collections -import decimal import itertools import json import logging @@ -243,6 +280,8 @@ def compute_table_name(row): import uuid from builtins import object from builtins import zip +from typing import Dict +from typing import Union from future.utils import itervalues from past.builtins import unicode @@ -257,12 +296,15 @@ def compute_table_name(row): from apache_beam.io.filesystems import FileSystems from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata +from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit +from apache_beam.io.gcp.bigquery_read_internal import _JsonToDictCoder from apache_beam.io.gcp.bigquery_read_internal import _PassThroughThenCleanup from apache_beam.io.gcp.bigquery_read_internal import bigquery_export_destination_uri from apache_beam.io.gcp.bigquery_tools import RetryStrategy from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.io.iobase import BoundedSource 
from apache_beam.io.iobase import RangeTracker +from apache_beam.io.iobase import SDFBoundedSourceReader from apache_beam.io.iobase import SourceBundle from apache_beam.io.textio import _TextSource as TextSource from apache_beam.metrics import Metrics @@ -284,6 +326,14 @@ def compute_table_name(row): from apache_beam.transforms.window import GlobalWindows from apache_beam.utils import retry from apache_beam.utils.annotations import deprecated +from apache_beam.utils.annotations import experimental + +try: + from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference + from apache_beam.io.gcp.internal.clients.bigquery import TableReference +except ImportError: + DatasetReference = None + TableReference = None __all__ = [ 'TableRowJsonCoder', @@ -292,6 +342,8 @@ def compute_table_name(row): 'BigQuerySink', 'WriteToBigQuery', 'ReadFromBigQuery', + 'ReadFromBigQueryRequest', + 'ReadAllFromBigQuery', 'SCHEMA_AUTODETECT', ] @@ -591,84 +643,6 @@ def reader(self, test_bigquery_client=None): kms_key=self.kms_key) -FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type') - - -class _JsonToDictCoder(coders.Coder): - """A coder for a JSON string to a Python dict.""" - def __init__(self, table_schema): - self.fields = self._convert_to_tuple(table_schema.fields) - self._converters = { - 'INTEGER': int, - 'INT64': int, - 'FLOAT': float, - 'FLOAT64': float, - 'NUMERIC': self._to_decimal, - 'BYTES': self._to_bytes, - } - - @staticmethod - def _to_decimal(value): - return decimal.Decimal(value) - - @staticmethod - def _to_bytes(value): - """Converts value from str to bytes on Python 3.x. Does nothing on - Python 2.7.""" - return value.encode('utf-8') - - @classmethod - def _convert_to_tuple(cls, table_field_schemas): - """Recursively converts the list of TableFieldSchema instances to the - list of tuples to prevent errors when pickling and unpickling - TableFieldSchema instances. 
- """ - if not table_field_schemas: - return [] - - return [ - FieldSchema(cls._convert_to_tuple(x.fields), x.mode, x.name, x.type) - for x in table_field_schemas - ] - - def decode(self, value): - value = json.loads(value.decode('utf-8')) - return self._decode_with_schema(value, self.fields) - - def _decode_with_schema(self, value, schema_fields): - for field in schema_fields: - if field.name not in value: - # The field exists in the schema, but it doesn't exist in this row. - # It probably means its value was null, as the extract to JSON job - # doesn't preserve null fields - value[field.name] = None - continue - - if field.type == 'RECORD': - nested_values = value[field.name] - if field.mode == 'REPEATED': - for i, nested_value in enumerate(nested_values): - nested_values[i] = self._decode_with_schema( - nested_value, field.fields) - else: - value[field.name] = self._decode_with_schema( - nested_values, field.fields) - else: - try: - converter = self._converters[field.type] - value[field.name] = converter(value[field.name]) - except KeyError: - # No need to do any conversion - pass - return value - - def is_deterministic(self): - return True - - def to_type_hint(self): - return dict - - class _CustomBigQuerySource(BoundedSource): def __init__( self, @@ -720,7 +694,7 @@ def __init__( self.bigquery_job_labels = bigquery_job_labels or {} self.use_json_exports = use_json_exports self.temp_dataset = temp_dataset - self._job_name = job_name or 'AUTOMATIC_JOB_NAME' + self._job_name = job_name or 'BQ_EXPORT_JOB' self._step_name = step_name self._source_uuid = unique_id @@ -1666,7 +1640,7 @@ def _compute_method(self, experiments, is_streaming_pipeline): def expand(self, pcoll): p = pcoll.pipeline - if (isinstance(self.table_reference, bigquery.TableReference) and + if (isinstance(self.table_reference, TableReference) and self.table_reference.projectId is None): self.table_reference.projectId = pcoll.pipeline.options.view_as( GoogleCloudOptions).project @@ -1878,6 +1852,7 
@@ class ReadFromBigQuery(PTransform): https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro\ #avro_conversions """ + COUNTER = 0 def __init__(self, gcs_location=None, *args, **kwargs): @@ -1897,7 +1872,7 @@ def __init__(self, gcs_location=None, *args, **kwargs): self._kwargs = kwargs def expand(self, pcoll): - unique_id = str(uuid.uuid4())[0:10] + # TODO(BEAM-11115): Make ReadFromBQ rely on ReadAllFromBQ implementation. temp_location = pcoll.pipeline.options.view_as( GoogleCloudOptions).temp_location job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name @@ -1931,3 +1906,132 @@ def file_path_to_remove(unused_elm): *self._args, **self._kwargs)) | _PassThroughThenCleanup(files_to_remove_pcoll)) + + +class ReadFromBigQueryRequest: + """ + Class that defines data to read from BQ. + """ + def __init__( + self, + query: str = None, + use_standard_sql: bool = True, + table: Union[str, TableReference] = None, + flatten_results: bool = False): + """ + Only one of query or table should be specified. + + :param query: SQL query to fetch data. + :param use_standard_sql: + Specifies whether to use BigQuery's standard SQL dialect for this query. + The default value is :data:`True`. If set to :data:`False`, + the query will use BigQuery's legacy SQL dialect. + This parameter is ignored for table inputs. + :param table: + The ID of the table to read. The ID must contain only letters + ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should + define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``). + :param flatten_results: + Flattens all nested and repeated fields in the query results. + The default value is :data:`False`. + """ + self.flatten_results = flatten_results + self.query = query + self.use_standard_sql = use_standard_sql + self.table = table + self.validate() + + # We use this internal object ID to generate BigQuery export directories. 
+ self.obj_id = random.randint(0, 100000) + + def validate(self): + if self.table is not None and self.query is not None: + raise ValueError( + 'Both a BigQuery table and a query were specified.' + ' Please specify only one of these.') + elif self.table is None and self.query is None: + raise ValueError('A BigQuery table or a query must be specified') + if self.table is not None: + if isinstance(self.table, str): + assert '.' in self.table, ( + 'Expected a table reference ' + '(PROJECT:DATASET.TABLE or DATASET.TABLE) instead of %s' + % self.table) + + +@experimental() +class ReadAllFromBigQuery(PTransform): + """Read data from BigQuery. + + PTransform:ReadFromBigQueryRequest->Rows + + This PTransform uses a BigQuery export job to take a snapshot of the table + on GCS, and then reads from each produced file. Data is exported into + a new subdirectory for each export using UUIDs generated in + `ReadFromBigQueryRequest` objects. + + It is recommended not to use this PTransform for streaming jobs on + GlobalWindow, since it will not be able to clean up snapshots. + + Args: + gcs_location (str): The name of the Google Cloud Storage + bucket where the extracted table should be written as a string. If + :data:`None`, then the temp_location parameter is used. + validate (bool): If :data:`True`, various checks will be done when source + gets initialized (e.g., is table present?). + kms_key (str): Experimental. Optional Cloud KMS key name for use when + creating new temporary tables. 
+ """ + COUNTER = 0 + + def __init__( + self, + gcs_location: Union[str, ValueProvider] = None, + validate: bool = False, + kms_key: str = None, + temp_dataset: Union[str, DatasetReference] = None, + bigquery_job_labels: Dict[str, str] = None): + if gcs_location: + if not isinstance(gcs_location, (str, ValueProvider)): + raise TypeError( + '%s: gcs_location must be of type string' + ' or ValueProvider; got %r instead' % + (self.__class__.__name__, type(gcs_location))) + + self.gcs_location = gcs_location + self.validate = validate + self.kms_key = kms_key + self.bigquery_job_labels = bigquery_job_labels + self.temp_dataset = temp_dataset + + def expand(self, pcoll): + job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name + project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project + unique_id = str(uuid.uuid4())[0:10] + + try: + step_name = self.label + except AttributeError: + step_name = 'ReadAllFromBigQuery_%d' % ReadAllFromBigQuery.COUNTER + ReadAllFromBigQuery.COUNTER += 1 + + sources_to_read, cleanup_locations = ( + pcoll + | beam.ParDo( + _BigQueryReadSplit( + options=pcoll.pipeline.options, + gcs_location=self.gcs_location, + bigquery_job_labels=self.bigquery_job_labels, + job_name=job_name, + step_name=step_name, + unique_id=unique_id, + kms_key=self.kms_key, + project=project, + temp_dataset=self.temp_dataset)).with_outputs( + "location_to_cleanup", main="files_to_read") + ) + + return ( + sources_to_read + | SDFBoundedSourceReader() + | _PassThroughThenCleanup(beam.pvalue.AsIter(cleanup_locations))) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py index d3a71f1d99a6..394e435a3ace 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_internal.py @@ -20,15 +20,42 @@ NOTHING IN THIS FILE HAS BACKWARDS COMPATIBILITY GUARANTEES. 
""" +import collections +import decimal +import json import logging +import random import uuid +from typing import TYPE_CHECKING +from typing import Dict +from typing import Iterable from typing import Optional +from typing import Union import apache_beam as beam +from apache_beam.coders import coders +from apache_beam.io.avroio import _create_avro_source +from apache_beam.io.filesystem import CompressionTypes from apache_beam.io.filesystems import FileSystems +from apache_beam.io.gcp import bigquery_tools +from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata +from apache_beam.io.iobase import BoundedSource +from apache_beam.io.textio import _TextSource +from apache_beam.options.pipeline_options import GoogleCloudOptions +from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import ValueProvider from apache_beam.transforms import PTransform +if TYPE_CHECKING: + from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest + +try: + from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference + from apache_beam.io.gcp.internal.clients.bigquery import TableReference +except ImportError: + DatasetReference = None + TableReference = None + _LOGGER = logging.getLogger(__name__) @@ -100,3 +127,265 @@ def process(self, unused_element, unused_signal, gcs_locations): ) return main_output + + +class _BigQueryReadSplit(beam.transforms.DoFn): + """Starts the process of reading from BigQuery. + + This transform will start a BigQuery export job, and output a number of + file sources that are consumed downstream. 
+ """ + def __init__( + self, + options: PipelineOptions, + gcs_location: Union[str, ValueProvider] = None, + use_json_exports: bool = False, + bigquery_job_labels: Dict[str, str] = None, + step_name: str = None, + job_name: str = None, + unique_id: str = None, + kms_key: str = None, + project: str = None, + temp_dataset: Union[str, DatasetReference] = None): + self.options = options + self.use_json_exports = use_json_exports + self.gcs_location = gcs_location + self.bigquery_job_labels = bigquery_job_labels or {} + self._step_name = step_name + self._job_name = job_name or 'BQ_READ_SPLIT' + self._source_uuid = unique_id + self.kms_key = kms_key + self.project = project + self.temp_dataset = temp_dataset or 'bq_read_all_%s' % uuid.uuid4().hex + self.bq_io_metadata = None + + def display_data(self): + return { + 'use_json_exports': str(self.use_json_exports), + 'gcs_location': str(self.gcs_location), + 'bigquery_job_labels': json.dumps(self.bigquery_job_labels), + 'kms_key': str(self.kms_key), + 'project': str(self.project), + 'temp_dataset': str(self.temp_dataset) + } + + def _get_temp_dataset(self): + if isinstance(self.temp_dataset, str): + return DatasetReference( + datasetId=self.temp_dataset, projectId=self._get_project()) + else: + return self.temp_dataset + + def process(self, + element: 'ReadFromBigQueryRequest') -> Iterable[BoundedSource]: + bq = bigquery_tools.BigQueryWrapper( + temp_dataset_id=self._get_temp_dataset().datasetId) + # TODO(BEAM-11359): Clean up temp dataset at pipeline completion. 
+ + if element.query is not None: + self._setup_temporary_dataset(bq, element) + table_reference = self._execute_query(bq, element) + else: + assert element.table + table_reference = bigquery_tools.parse_table_reference( + element.table, project=self._get_project()) + + if not table_reference.projectId: + table_reference.projectId = self._get_project() + + schema, metadata_list = self._export_files(bq, element, table_reference) + + for metadata in metadata_list: + yield self._create_source(metadata.path, schema) + + if element.query is not None: + bq._delete_table( + table_reference.projectId, + table_reference.datasetId, + table_reference.tableId) + + def _get_bq_metadata(self): + if not self.bq_io_metadata: + self.bq_io_metadata = create_bigquery_io_metadata(self._step_name) + return self.bq_io_metadata + + def _create_source(self, path, schema): + if not self.use_json_exports: + return _create_avro_source(path, use_fastavro=True) + else: + return _TextSource( + path, + min_bundle_size=0, + compression_type=CompressionTypes.UNCOMPRESSED, + strip_trailing_newlines=True, + coder=_JsonToDictCoder(schema)) + + def _setup_temporary_dataset( + self, + bq: bigquery_tools.BigQueryWrapper, + element: 'ReadFromBigQueryRequest'): + location = bq.get_query_location( + self._get_project(), element.query, not element.use_standard_sql) + bq.create_temporary_dataset(self._get_project(), location) + + def _execute_query( + self, + bq: bigquery_tools.BigQueryWrapper, + element: 'ReadFromBigQueryRequest'): + query_job_name = bigquery_tools.generate_bq_job_name( + self._job_name, + self._source_uuid, + bigquery_tools.BigQueryJobTypes.QUERY, + random.randint(0, 1000)) + job = bq._start_query_job( + self._get_project(), + element.query, + not element.use_standard_sql, + element.flatten_results, + job_id=query_job_name, + kms_key=self.kms_key, + job_labels=self._get_bq_metadata().add_additional_bq_job_labels( + self.bigquery_job_labels)) + job_ref = job.jobReference + 
bq.wait_for_bq_job(job_ref, max_retries=0) + return bq._get_temp_table(self._get_project()) + + def _export_files( + self, + bq: bigquery_tools.BigQueryWrapper, + element: 'ReadFromBigQueryRequest', + table_reference: TableReference): + """Runs a BigQuery export job. + + Returns: + bigquery.TableSchema instance, a list of FileMetadata instances + """ + job_labels = self._get_bq_metadata().add_additional_bq_job_labels( + self.bigquery_job_labels) + export_job_name = bigquery_tools.generate_bq_job_name( + self._job_name, + self._source_uuid, + bigquery_tools.BigQueryJobTypes.EXPORT, + element.obj_id) + temp_location = self.options.view_as(GoogleCloudOptions).temp_location + gcs_location = bigquery_export_destination_uri( + self.gcs_location, + temp_location, + '%s%s' % (self._source_uuid, element.obj_id)) + if self.use_json_exports: + job_ref = bq.perform_extract_job([gcs_location], + export_job_name, + table_reference, + bigquery_tools.FileFormat.JSON, + project=self._get_project(), + job_labels=job_labels, + include_header=False) + else: + job_ref = bq.perform_extract_job([gcs_location], + export_job_name, + table_reference, + bigquery_tools.FileFormat.AVRO, + project=self._get_project(), + include_header=False, + job_labels=job_labels, + use_avro_logical_types=True) + bq.wait_for_bq_job(job_ref) + metadata_list = FileSystems.match([gcs_location])[0].metadata_list + + if isinstance(table_reference, ValueProvider): + table_ref = bigquery_tools.parse_table_reference( + element.table, project=self._get_project()) + else: + table_ref = table_reference + table = bq.get_table( + table_ref.projectId, table_ref.datasetId, table_ref.tableId) + + return table.schema, metadata_list + + def _get_project(self): + """Returns the project that queries and exports will be billed to.""" + + project = self.options.view_as(GoogleCloudOptions).project + if isinstance(project, ValueProvider): + project = project.get() + if not project: + project = self.project + return project + + 
+FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type') + + +class _JsonToDictCoder(coders.Coder): + """A coder for a JSON string to a Python dict.""" + def __init__(self, table_schema): + self.fields = self._convert_to_tuple(table_schema.fields) + self._converters = { + 'INTEGER': int, + 'INT64': int, + 'FLOAT': float, + 'FLOAT64': float, + 'NUMERIC': self._to_decimal, + 'BYTES': self._to_bytes, + } + + @staticmethod + def _to_decimal(value): + return decimal.Decimal(value) + + @staticmethod + def _to_bytes(value): + """Converts value from str to bytes on Python 3.x. Does nothing on + Python 2.7.""" + return value.encode('utf-8') + + @classmethod + def _convert_to_tuple(cls, table_field_schemas): + """Recursively converts the list of TableFieldSchema instances to the + list of tuples to prevent errors when pickling and unpickling + TableFieldSchema instances. + """ + if not table_field_schemas: + return [] + + return [ + FieldSchema(cls._convert_to_tuple(x.fields), x.mode, x.name, x.type) + for x in table_field_schemas + ] + + def decode(self, value): + value = json.loads(value.decode('utf-8')) + return self._decode_with_schema(value, self.fields) + + def _decode_with_schema(self, value, schema_fields): + for field in schema_fields: + if field.name not in value: + # The field exists in the schema, but it doesn't exist in this row. 
+ # It probably means its value was null, as the extract to JSON job + # doesn't preserve null fields + value[field.name] = None + continue + + if field.type == 'RECORD': + nested_values = value[field.name] + if field.mode == 'REPEATED': + for i, nested_value in enumerate(nested_values): + nested_values[i] = self._decode_with_schema( + nested_value, field.fields) + else: + value[field.name] = self._decode_with_schema( + nested_values, field.fields) + else: + try: + converter = self._converters[field.type] + value[field.name] = converter(value[field.name]) + except KeyError: + # No need to do any conversion + pass + return value + + def is_deterministic(self): + return True + + def to_type_hint(self): + return dict diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py index ce82b3174a2b..4586f046f79b 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py @@ -298,6 +298,109 @@ def test_iobase_source(self): assert_that(result, equal_to(self.get_expected_data(native=False))) +class ReadAllBQTests(BigQueryReadIntegrationTests): + TABLE_DATA_1 = [{ + 'number': 1, 'str': 'abc' + }, { + 'number': 2, 'str': 'def' + }, { + 'number': 3, 'str': u'你好' + }, { + 'number': 4, 'str': u'привет' + }] + + TABLE_DATA_2 = [{ + 'number': 10, 'str': 'abcd' + }, { + 'number': 20, 'str': 'defg' + }, { + 'number': 30, 'str': u'你好' + }, { + 'number': 40, 'str': u'привет' + }] + + TABLE_DATA_3 = [{'number': 10, 'str': 'abcde', 'extra': 3}] + + @classmethod + def setUpClass(cls): + super(ReadAllBQTests, cls).setUpClass() + cls.SCHEMA_BQ = cls.create_bq_schema() + cls.SCHEMA_BQ_WITH_EXTRA = cls.create_bq_schema(True) + + cls.table_name1 = 'python_rd_table_1' + cls.table_schema1 = cls.create_table( + cls.table_name1, cls.TABLE_DATA_1, cls.SCHEMA_BQ) + table_id1 = '{}.{}'.format(cls.dataset_id, cls.table_name1) + cls.query1 = 'SELECT number, str FROM 
`%s`' % table_id1 + + cls.table_name2 = 'python_rd_table_2' + cls.table_schema2 = cls.create_table( + cls.table_name2, cls.TABLE_DATA_2, cls.SCHEMA_BQ) + table_id2 = '{}.{}'.format(cls.dataset_id, cls.table_name2) + cls.query2 = 'SELECT number, str FROM %s' % table_id2 + + cls.table_name3 = 'python_rd_table_3' + cls.table_schema3 = cls.create_table( + cls.table_name3, cls.TABLE_DATA_3, cls.SCHEMA_BQ_WITH_EXTRA) + table_id3 = '{}.{}'.format(cls.dataset_id, cls.table_name3) + cls.query3 = 'SELECT number, str, extra FROM `%s`' % table_id3 + + @classmethod + def create_table(cls, table_name, data, table_schema): + table = bigquery.Table( + tableReference=bigquery.TableReference( + projectId=cls.project, datasetId=cls.dataset_id, + tableId=table_name), + schema=table_schema) + request = bigquery.BigqueryTablesInsertRequest( + projectId=cls.project, datasetId=cls.dataset_id, table=table) + cls.bigquery_client.client.tables.Insert(request) + cls.bigquery_client.insert_rows( + cls.project, cls.dataset_id, table_name, data) + return table_schema + + @classmethod + def create_bq_schema(cls, with_extra=False): + table_schema = bigquery.TableSchema() + table_field = bigquery.TableFieldSchema() + table_field.name = 'number' + table_field.type = 'INTEGER' + table_field.mode = 'NULLABLE' + table_schema.fields.append(table_field) + table_field = bigquery.TableFieldSchema() + table_field.name = 'str' + table_field.type = 'STRING' + table_field.mode = 'NULLABLE' + table_schema.fields.append(table_field) + if with_extra: + table_field = bigquery.TableFieldSchema() + table_field.name = 'extra' + table_field.type = 'INTEGER' + table_field.mode = 'NULLABLE' + table_schema.fields.append(table_field) + return table_schema + + @skip(['PortableRunner', 'FlinkRunner']) + @attr('IT') + def test_read_queries(self): + # TODO(BEAM-11311): Remove experiment when tests run on r_v2. 
+ args = self.args + ["--experiments=use_runner_v2"] + with beam.Pipeline(argv=args) as p: + result = ( + p + | beam.Create([ + beam.io.ReadFromBigQueryRequest(query=self.query1), + beam.io.ReadFromBigQueryRequest( + query=self.query2, use_standard_sql=False), + beam.io.ReadFromBigQueryRequest( + table='%s.%s' % (self.dataset_id, self.table_name3)) + ]) + | beam.io.ReadAllFromBigQuery()) + assert_that( + result, + equal_to(self.TABLE_DATA_1 + self.TABLE_DATA_2 + self.TABLE_DATA_3)) + + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main() diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index da3f34f8c947..a8290624d94d 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -46,9 +46,9 @@ from apache_beam.io.gcp import bigquery_tools from apache_beam.io.gcp.bigquery import TableRowJsonCoder from apache_beam.io.gcp.bigquery import WriteToBigQuery -from apache_beam.io.gcp.bigquery import _JsonToDictCoder from apache_beam.io.gcp.bigquery import _StreamToBigQuery from apache_beam.io.gcp.bigquery_file_loads_test import _ELEMENTS +from apache_beam.io.gcp.bigquery_read_internal import _JsonToDictCoder from apache_beam.io.gcp.bigquery_read_internal import bigquery_export_destination_uri from apache_beam.io.gcp.bigquery_tools import JSON_COMPLIANCE_ERROR from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index 5501a8e77ecc..ec9db8a4176b 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -1498,7 +1498,8 @@ def __init__(self, restriction): if not isinstance(restriction, _SDFBoundedSourceRestriction): raise ValueError( 'Initializing SDFBoundedSourceRestrictionTracker' - ' requires a _SDFBoundedSourceRestriction') + ' requires a _SDFBoundedSourceRestriction. Got %s instead.' 
% + restriction) self.restriction = restriction def current_progress(self): @@ -1545,13 +1546,12 @@ def _check_source(self, src): 'SDFBoundedSourceRestrictionProvider can only utilize BoundedSource') def initial_restriction(self, element_source: BoundedSource): - src = element_source - self._check_source(src) - range_tracker = src.get_range_tracker(None, None) + self._check_source(element_source) + range_tracker = element_source.get_range_tracker(None, None) return _SDFBoundedSourceRestriction( SourceBundle( None, - src, + element_source, range_tracker.start_position(), range_tracker.stop_position())) @@ -1615,3 +1615,6 @@ def expand(self, pvalue): def get_windowing(self, unused_inputs): return core.Windowing(window.GlobalWindows()) + + def display_data(self): + return self._data_to_display