Skip to content

Commit

Permalink
Merge pull request #13170 from [BEAM-9650] Adding support for ReadAll…
Browse files Browse the repository at this point in the history
… from BigQuery transform

* Moving ReadFromBQ components to internal module

* Fix lint and formatting

* testing ReadFromBQ from ReadAllFromBQ

* Implementing Python Bounded Source Reader DoFn

* Adding annotations

* fixing import issue

* fix lint

* Adding ReadAllFromBQ

* fix issue

* Fixup

* Fix precommit

* fixup

* fixup

* fixup

* Fix tests

* fix formatting

* Moving test to runner_v2

* Add documentation and changes for ReadAllFromBigQuery

* fix docs test

* Addressing comments

* fixup

* fixup

* fixup

* fixup

* fixup

* Moving RFBQRequest to public file

* fixup

* Fixup

* comment
  • Loading branch information
pabloem authored Nov 30, 2020
1 parent bee6602 commit a1fac1d
Show file tree
Hide file tree
Showing 6 changed files with 588 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
## I/Os
* ReadFromMongoDB can now be used with MongoDB Atlas (Python) ([BEAM-11266](https://issues.apache.org/jira/browse/BEAM-11266).)
* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* There is a new transform `ReadAllFromBigQuery` that can receive multiple requests to read data from BigQuery at pipeline runtime. See [PR 13170](https://github.com/apache/beam/pull/13170), and [BEAM-9650](https://issues.apache.org/jira/browse/BEAM-9650).

## New Features / Improvements

Expand Down
268 changes: 186 additions & 82 deletions sdks/python/apache_beam/io/gcp/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,44 @@
`ReadFromBigQuery`, you can use the flag `use_json_exports` to export
data as JSON, and receive base64-encoded bytes.
ReadAllFromBigQuery
-------------------
Beam 2.27.0 introduces a new transform called `ReadAllFromBigQuery` which
allows you to define table and query reads from BigQuery at pipeline
runtime.:::
read_requests = p | beam.Create([
ReadFromBigQueryRequest(query='SELECT * FROM mydataset.mytable'),
ReadFromBigQueryRequest(table='myproject.mydataset.mytable')])
results = read_requests | ReadAllFromBigQuery()
A good application for this transform is in streaming pipelines to
refresh a side input coming from BigQuery. This would work like so:::
side_input = (
p
| 'PeriodicImpulse' >> PeriodicImpulse(
first_timestamp, last_timestamp, interval, True)
| 'MapToReadRequest' >> beam.Map(
lambda x: ReadFromBigQueryRequest(table='dataset.table'))
| beam.io.ReadAllFromBigQuery())
main_input = (
p
| 'MpImpulse' >> beam.Create(sample_main_input_elements)
|
'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
| 'WindowMpInto' >> beam.WindowInto(
window.FixedWindows(main_input_windowing_interval)))
result = (
main_input
| 'ApplyCrossJoin' >> beam.FlatMap(
cross_join, rights=beam.pvalue.AsIter(side_input)))
**Note**: This transform is supported on Portable and Dataflow v2 runners.
**Note**: This transform does not currently clean up temporary datasets
created for its execution. (BEAM-11359)
Writing Data to BigQuery
========================
Expand Down Expand Up @@ -234,7 +272,6 @@ def compute_table_name(row):
from __future__ import absolute_import

import collections
import decimal
import itertools
import json
import logging
Expand All @@ -243,6 +280,8 @@ def compute_table_name(row):
import uuid
from builtins import object
from builtins import zip
from typing import Dict
from typing import Union

from future.utils import itervalues
from past.builtins import unicode
Expand All @@ -257,12 +296,15 @@ def compute_table_name(row):
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery_io_metadata import create_bigquery_io_metadata
from apache_beam.io.gcp.bigquery_read_internal import _BigQueryReadSplit
from apache_beam.io.gcp.bigquery_read_internal import _JsonToDictCoder
from apache_beam.io.gcp.bigquery_read_internal import _PassThroughThenCleanup
from apache_beam.io.gcp.bigquery_read_internal import bigquery_export_destination_uri
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.io.iobase import BoundedSource
from apache_beam.io.iobase import RangeTracker
from apache_beam.io.iobase import SDFBoundedSourceReader
from apache_beam.io.iobase import SourceBundle
from apache_beam.io.textio import _TextSource as TextSource
from apache_beam.metrics import Metrics
Expand All @@ -284,6 +326,14 @@ def compute_table_name(row):
from apache_beam.transforms.window import GlobalWindows
from apache_beam.utils import retry
from apache_beam.utils.annotations import deprecated
from apache_beam.utils.annotations import experimental

try:
from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
from apache_beam.io.gcp.internal.clients.bigquery import TableReference
except ImportError:
DatasetReference = None
TableReference = None

__all__ = [
'TableRowJsonCoder',
Expand All @@ -292,6 +342,8 @@ def compute_table_name(row):
'BigQuerySink',
'WriteToBigQuery',
'ReadFromBigQuery',
'ReadFromBigQueryRequest',
'ReadAllFromBigQuery',
'SCHEMA_AUTODETECT',
]

Expand Down Expand Up @@ -591,84 +643,6 @@ def reader(self, test_bigquery_client=None):
kms_key=self.kms_key)


FieldSchema = collections.namedtuple('FieldSchema', 'fields mode name type')


class _JsonToDictCoder(coders.Coder):
"""A coder for a JSON string to a Python dict."""
def __init__(self, table_schema):
self.fields = self._convert_to_tuple(table_schema.fields)
self._converters = {
'INTEGER': int,
'INT64': int,
'FLOAT': float,
'FLOAT64': float,
'NUMERIC': self._to_decimal,
'BYTES': self._to_bytes,
}

@staticmethod
def _to_decimal(value):
return decimal.Decimal(value)

@staticmethod
def _to_bytes(value):
"""Converts value from str to bytes on Python 3.x. Does nothing on
Python 2.7."""
return value.encode('utf-8')

@classmethod
def _convert_to_tuple(cls, table_field_schemas):
"""Recursively converts the list of TableFieldSchema instances to the
list of tuples to prevent errors when pickling and unpickling
TableFieldSchema instances.
"""
if not table_field_schemas:
return []

return [
FieldSchema(cls._convert_to_tuple(x.fields), x.mode, x.name, x.type)
for x in table_field_schemas
]

def decode(self, value):
value = json.loads(value.decode('utf-8'))
return self._decode_with_schema(value, self.fields)

def _decode_with_schema(self, value, schema_fields):
for field in schema_fields:
if field.name not in value:
# The field exists in the schema, but it doesn't exist in this row.
# It probably means its value was null, as the extract to JSON job
# doesn't preserve null fields
value[field.name] = None
continue

if field.type == 'RECORD':
nested_values = value[field.name]
if field.mode == 'REPEATED':
for i, nested_value in enumerate(nested_values):
nested_values[i] = self._decode_with_schema(
nested_value, field.fields)
else:
value[field.name] = self._decode_with_schema(
nested_values, field.fields)
else:
try:
converter = self._converters[field.type]
value[field.name] = converter(value[field.name])
except KeyError:
# No need to do any conversion
pass
return value

def is_deterministic(self):
return True

def to_type_hint(self):
return dict


class _CustomBigQuerySource(BoundedSource):
def __init__(
self,
Expand Down Expand Up @@ -720,7 +694,7 @@ def __init__(
self.bigquery_job_labels = bigquery_job_labels or {}
self.use_json_exports = use_json_exports
self.temp_dataset = temp_dataset
self._job_name = job_name or 'AUTOMATIC_JOB_NAME'
self._job_name = job_name or 'BQ_EXPORT_JOB'
self._step_name = step_name
self._source_uuid = unique_id

Expand Down Expand Up @@ -1666,7 +1640,7 @@ def _compute_method(self, experiments, is_streaming_pipeline):
def expand(self, pcoll):
p = pcoll.pipeline

if (isinstance(self.table_reference, bigquery.TableReference) and
if (isinstance(self.table_reference, TableReference) and
self.table_reference.projectId is None):
self.table_reference.projectId = pcoll.pipeline.options.view_as(
GoogleCloudOptions).project
Expand Down Expand Up @@ -1878,6 +1852,7 @@ class ReadFromBigQuery(PTransform):
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro\
#avro_conversions
"""

COUNTER = 0

def __init__(self, gcs_location=None, *args, **kwargs):
Expand All @@ -1897,7 +1872,7 @@ def __init__(self, gcs_location=None, *args, **kwargs):
self._kwargs = kwargs

def expand(self, pcoll):
unique_id = str(uuid.uuid4())[0:10]
# TODO(BEAM-11115): Make ReadFromBQ rely on ReadAllFromBQ implementation.
temp_location = pcoll.pipeline.options.view_as(
GoogleCloudOptions).temp_location
job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name
Expand Down Expand Up @@ -1931,3 +1906,132 @@ def file_path_to_remove(unused_elm):
*self._args,
**self._kwargs))
| _PassThroughThenCleanup(files_to_remove_pcoll))


class ReadFromBigQueryRequest:
"""
Class that defines data to read from BQ.
"""
def __init__(
self,
query: str = None,
use_standard_sql: bool = True,
table: Union[str, TableReference] = None,
flatten_results: bool = False):
"""
Only one of query or table should be specified.
:param query: SQL query to fetch data.
:param use_standard_sql:
Specifies whether to use BigQuery's standard SQL dialect for this query.
The default value is :data:`True`. If set to :data:`False`,
the query will use BigQuery's legacy SQL dialect.
This parameter is ignored for table inputs.
:param table:
The ID of the table to read. The ID must contain only letters
``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. Table should
define project and dataset (ex.: ``'PROJECT:DATASET.TABLE'``).
:param flatten_results:
Flattens all nested and repeated fields in the query results.
The default value is :data:`False`.
"""
self.flatten_results = flatten_results
self.query = query
self.use_standard_sql = use_standard_sql
self.table = table
self.validate()

# We use this internal object ID to generate BigQuery export directories.
self.obj_id = random.randint(0, 100000)

def validate(self):
if self.table is not None and self.query is not None:
raise ValueError(
'Both a BigQuery table and a query were specified.'
' Please specify only one of these.')
elif self.table is None and self.query is None:
raise ValueError('A BigQuery table or a query must be specified')
if self.table is not None:
if isinstance(self.table, str):
assert self.table.find('.'), (
'Expected a table reference '
'(PROJECT:DATASET.TABLE or DATASET.TABLE) instead of %s'
% self.table)


@experimental()
class ReadAllFromBigQuery(PTransform):
"""Read data from BigQuery.
PTransform:ReadFromBigQueryRequest->Rows
This PTransform uses a BigQuery export job to take a snapshot of the table
on GCS, and then reads from each produced file. Data is exported into
a new subdirectory for each export using UUIDs generated in
`ReadFromBigQueryRequest` objects.
It is recommended not to use this PTransform for streaming jobs on
GlobalWindow, since it will not be able to cleanup snapshots.
Args:
gcs_location (str): The name of the Google Cloud Storage
bucket where the extracted table should be written as a string. If
:data:`None`, then the temp_location parameter is used.
validate (bool): If :data:`True`, various checks will be done when source
gets initialized (e.g., is table present?).
kms_key (str): Experimental. Optional Cloud KMS key name for use when
creating new temporary tables.
"""
COUNTER = 0

def __init__(
self,
gcs_location: Union[str, ValueProvider] = None,
validate: bool = False,
kms_key: str = None,
temp_dataset: Union[str, DatasetReference] = None,
bigquery_job_labels: Dict[str, str] = None):
if gcs_location:
if not isinstance(gcs_location, (str, ValueProvider)):
raise TypeError(
'%s: gcs_location must be of type string'
' or ValueProvider; got %r instead' %
(self.__class__.__name__, type(gcs_location)))

self.gcs_location = gcs_location
self.validate = validate
self.kms_key = kms_key
self.bigquery_job_labels = bigquery_job_labels
self.temp_dataset = temp_dataset

def expand(self, pcoll):
job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name
project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
unique_id = str(uuid.uuid4())[0:10]

try:
step_name = self.label
except AttributeError:
step_name = 'ReadAllFromBigQuery_%d' % ReadAllFromBigQuery.COUNTER
ReadAllFromBigQuery.COUNTER += 1

sources_to_read, cleanup_locations = (
pcoll
| beam.ParDo(
_BigQueryReadSplit(
options=pcoll.pipeline.options,
gcs_location=self.gcs_location,
bigquery_job_labels=self.bigquery_job_labels,
job_name=job_name,
step_name=step_name,
unique_id=unique_id,
kms_key=self.kms_key,
project=project,
temp_dataset=self.temp_dataset)).with_outputs(
"location_to_cleanup", main="files_to_read")
)

return (
sources_to_read
| SDFBoundedSourceReader()
| _PassThroughThenCleanup(beam.pvalue.AsIter(cleanup_locations)))
Loading

0 comments on commit a1fac1d

Please sign in to comment.