Add initial read_gbq wrapper #22616

Merged Sep 1, 2022 · 40 commits (changes shown from 27 commits)

Commits
1f82af4  initial commit (svetakvsundhar, Aug 5, 2022)
a1ac3e9  unit + integration tests for wrapper fn. (svetakvsundhar, Aug 6, 2022)
73987e9  unit + integration tests for wrapper fn. (svetakvsundhar, Aug 6, 2022)
fe07f19  lint (svetakvsundhar, Aug 6, 2022)
9519660  added test (svetakvsundhar, Aug 7, 2022)
b284638  added test (svetakvsundhar, Aug 7, 2022)
583d8cd  try to fix unittest errors (svetakvsundhar, Aug 7, 2022)
1c49622  lint corrected (svetakvsundhar, Aug 7, 2022)
68396e9  add error handling for unsupported parameter. (svetakvsundhar, Aug 7, 2022)
88b078a  wrapper read_gbq function. (svetakvsundhar, Aug 9, 2022)
ffdd87a  lint (svetakvsundhar, Aug 9, 2022)
87cd8e5  edits (svetakvsundhar, Aug 12, 2022)
69925a1  edits + lint (svetakvsundhar, Aug 12, 2022)
dd473b3  edits + lint (svetakvsundhar, Aug 12, 2022)
8094bbc  edits + lint + unit tests (svetakvsundhar, Aug 12, 2022)
9c53709  edits + lint + unit tests (svetakvsundhar, Aug 12, 2022)
7e42abf  modifications (svetakvsundhar, Aug 15, 2022)
ea9013c  without GCP Deps error -- should fail unit tests (svetakvsundhar, Aug 16, 2022)
73bef37  without GCP Deps error -- should fail unit tests (svetakvsundhar, Aug 16, 2022)
d2c1260  should pass unittests (svetakvsundhar, Aug 17, 2022)
a17f508  should pass unittests (svetakvsundhar, Aug 18, 2022)
5739fd9  might fail? (svetakvsundhar, Aug 22, 2022)
d4b0dbd  might fail? (svetakvsundhar, Aug 24, 2022)
c324c7f  might fail? (svetakvsundhar, Aug 24, 2022)
fbcba75  should pass (svetakvsundhar, Aug 24, 2022)
9f4bbd0  gcp deps error (svetakvsundhar, Aug 24, 2022)
7f3c452  formatting (svetakvsundhar, Aug 24, 2022)
25770a7  check rat and indent (svetakvsundhar, Aug 26, 2022)
81c04e9  check rat (svetakvsundhar, Aug 26, 2022)
5ba1806  edits made, RAT might fail? (svetakvsundhar, Aug 26, 2022)
f990db8  edits made, RAT might fail? (svetakvsundhar, Aug 26, 2022)
a9d8657  edits made, RAT might fail? (svetakvsundhar, Aug 26, 2022)
e1eea91  unit tests should pass (svetakvsundhar, Aug 27, 2022)
e106cdc  unittests and rat should pass (svetakvsundhar, Aug 27, 2022)
47560b0  passing rat (svetakvsundhar, Aug 27, 2022)
2d8be2c  fixed int tests (svetakvsundhar, Aug 30, 2022)
2c7a6c6  should work (svetakvsundhar, Aug 30, 2022)
b80e74d  lets see? (svetakvsundhar, Aug 30, 2022)
13865e2  fixes (svetakvsundhar, Aug 30, 2022)
3d17f62  fixes (svetakvsundhar, Aug 30, 2022)
84 changes: 84 additions & 0 deletions sdks/python/apache_beam/dataframe/io.py
@@ -58,6 +58,34 @@
_DEFAULT_BYTES_CHUNKSIZE = 1 << 20


def read_gbq(
    table, dataset=None, project_id=None, use_bqstorage_api=False, **kwargs):
  """Reads data from a BigQuery table and produces a
  :class:`~apache_beam.dataframe.frames.DeferredDataFrame`.

  Args:
    table (str): The table to read. This may be specified in the format
      'PROJECT:dataset.table', in which case the parameters below
      can be omitted.
    dataset (str): The dataset to read from
      (can be omitted if table was specified as 'PROJECT:dataset.table').
    project_id (str): The project ID
      (can be omitted if table was specified as 'PROJECT:dataset.table').
    use_bqstorage_api (bool): Set to true to use the BigQuery Storage API
      in ReadFromBigQuery; otherwise set to false or leave it unspecified.
  """
  if table is None:
    raise ValueError("Please specify a BigQuery table to read from.")
  elif len(kwargs) > 0:
    raise ValueError(
        "Unsupported parameter entered in read_gbq. Please enter only "
        "supported parameters 'table', 'dataset', "
        "'project_id', 'use_bqstorage_api'.")
Member: nit: maybe print the contents of kwargs.keys() instead, to identify the specific unsupported parameters?

Member: What about using kwargs.keys() as suggested previously?
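A minimal sketch of what the reviewers' kwargs.keys() suggestion might look like (illustrative only; the exact message wording is an assumption, and the matching regex in test_ReadGbq_unsupported_param would need the same update):

  elif len(kwargs) > 0:
    # Name the offending parameters rather than only listing the supported ones.
    raise ValueError(
        "Unsupported parameter(s) entered in read_gbq: %s. Please use only "
        "the supported parameters 'table', 'dataset', "
        "'project_id', 'use_bqstorage_api'." % ', '.join(sorted(kwargs)))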

  return _ReadGbq(table, dataset, project_id, use_bqstorage_api)


@frame_base.with_docs_from(pd)
def read_csv(path, *args, splittable=False, **kwargs):
"""If your files are large and records do not contain quoted newlines, you may
@@ -756,3 +784,59 @@ def expand(self, pcoll):
| beam.Map(lambda file_result: file_result.file_name).with_output_types(
str)
}


class _ReadGbq(beam.PTransform):
  """Read data from BigQuery with output type 'BEAM_ROW',
  then convert it into a deferred dataframe.

  This PTransform wraps the Python ReadFromBigQuery PTransform,
  and sets the output_type as 'BEAM_ROW' to convert
  into a Beam Schema. Once applied to a pipeline object,
  it is passed into the to_dataframe() function to convert the
  PCollection into a deferred dataframe.

  This PTransform currently does not support queries.

  Args:
    table (str): The ID of the table. The ID must contain only
      letters ``a-z``, ``A-Z``, numbers ``0-9``,
      underscores ``_`` or white spaces.
      Note that the table argument must contain the entire table
      reference specified as: ``'PROJECT:DATASET.TABLE'``.
    use_bqstorage_api (bool): Selects the method used to read from
      BigQuery. If unspecified or set to false, the current default
      ('EXPORT') is used, which invokes a BigQuery export request
      (https://cloud.google.com/bigquery/docs/exporting-data).
      If set to true, 'DIRECT_READ' is used, which reads directly
      from BigQuery storage using the BigQuery Read API
      (https://cloud.google.com/bigquery/docs/reference/storage)."""
Member: nit: I think the second line should be indented here.

Suggested change: indent the continuation lines under each Args entry of the docstring; the text itself is unchanged.

  def __init__(
      self,
      table=None,
      dataset_id=None,
      project_id=None,
      use_bqstorage_api=None):

    self.table = table
    self.dataset_id = dataset_id
    self.project_id = project_id
    self.use_bqstorage_api = use_bqstorage_api

  def expand(self, root):
    from apache_beam.dataframe import convert  # avoid circular import
    if self.use_bqstorage_api:
      method = 'DIRECT_READ'
    else:
      method = 'EXPORT'
    return convert.to_dataframe(
        root
        | beam.io.ReadFromBigQuery(
            table=self.table,
            dataset=self.dataset_id,
            project=self.project_id,
            method=method,
            output_type='BEAM_ROW'))
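For orientation, a minimal usage sketch of the new wrapper (the table name here is hypothetical):

import apache_beam as beam
from apache_beam.dataframe import convert
from apache_beam.dataframe import io

with beam.Pipeline() as p:
  # read_gbq returns a deferred dataframe; dataframe operations can be
  # applied to it before converting back to a PCollection.
  df = p | io.read_gbq(table='my-project:my_dataset.my_table')
  pcoll = convert.to_pcollection(df)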
123 changes: 123 additions & 0 deletions sdks/python/apache_beam/dataframe/io_it_test.py
@@ -0,0 +1,123 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
Member (on lines +1 to +7):

Suggested change: remove the #!/usr/bin/env python and # -*- coding: utf-8 -*- lines so the file begins directly with the Apache license header.

I think this will make the RAT check happy. To confirm you could run ./gradlew rat locally and inspect the output (build/reports/rat/index.html).

# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Integration tests for Dataframe sources and sinks."""
# pytype: skip-file

import logging
import unittest

import pytest

import apache_beam as beam
import apache_beam.io.gcp.bigquery
from apache_beam.io.gcp import bigquery_read_it_test
from apache_beam.io.gcp import bigquery_schema_tools
from apache_beam.io.gcp import bigquery_tools
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

_LOGGER = logging.getLogger(__name__)


class ReadUsingReadGbqTests(bigquery_read_it_test.BigQueryReadIntegrationTests):
Member:

Suggested change:
class ReadUsingReadGbqTests(unittest.TestCase):

Can we avoid inheriting from the BQ test here? It looks like maybe we do it to get access to self.args, but I don't think that should be necessary. If you use with TestPipeline instead of with Pipeline it should handle passing through test_pipeline_options.
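A sketch of the reviewer's suggestion (assuming the same table and expected rows as the tests below; TestPipeline reads test_pipeline_options itself, so self.args is no longer needed):

from apache_beam.testing.test_pipeline import TestPipeline

class ReadUsingReadGbqTests(unittest.TestCase):
  @pytest.mark.it_postcommit
  def test_ReadGbq(self):
    from apache_beam.dataframe import convert
    with TestPipeline(is_integration_test=True) as p:
      actual_df = p | apache_beam.dataframe.io.read_gbq(
          table="apache-beam-testing:beam_bigquery_io_test."
          "dfsqltable_3c7d6fd5_16e0460dfd0")
      assert_that(
          convert.to_pcollection(actual_df),
          equal_to([(3, 'customer1', 'test'), (1, 'customer1', 'test'),
                    (2, 'customer2', 'test'), (4, 'customer2', 'test')]))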

  @pytest.mark.it_postcommit
  def test_ReadGbq(self):
    from apache_beam.dataframe import convert
    with beam.Pipeline(argv=self.args) as p:
      actual_df = p | apache_beam.dataframe.io.read_gbq(
          table="apache-beam-testing:beam_bigquery_io_test."
          "dfsqltable_3c7d6fd5_16e0460dfd0",
          use_bqstorage_api=False)
      assert_that(
          convert.to_pcollection(actual_df),
          equal_to([(3, 'customer1', 'test'), (1, 'customer1', 'test'),
                    (2, 'customer2', 'test'), (4, 'customer2', 'test')]))

  def test_ReadGbq_export_with_project(self):
    from apache_beam.dataframe import convert
    with beam.Pipeline(argv=self.args) as p:
      actual_df = p | apache_beam.dataframe.io.read_gbq(
          table="dfsqltable_3c7d6fd5_16e0460dfd0",
          dataset="beam_bigquery_io_test",
          project_id="apache-beam-testing",
          use_bqstorage_api=False)
      assert_that(
          convert.to_pcollection(actual_df),
          equal_to([(3, 'customer1', 'test'), (1, 'customer1', 'test'),
                    (2, 'customer2', 'test'), (4, 'customer2', 'test')]))

  @pytest.mark.it_postcommit
  def test_ReadGbq_direct_read(self):
    from apache_beam.dataframe import convert
    with beam.Pipeline(argv=self.args) as p:
      actual_df = p | apache_beam.dataframe.io.read_gbq(
          table="apache-beam-testing:beam_bigquery_io_test."
          "dfsqltable_3c7d6fd5_16e0460dfd0",
          use_bqstorage_api=True)
      assert_that(
          convert.to_pcollection(actual_df),
          equal_to([(3, 'customer1', 'test'), (1, 'customer1', 'test'),
                    (2, 'customer2', 'test'), (4, 'customer2', 'test')]))

  @pytest.mark.it_postcommit
  def test_ReadGbq_direct_read_with_project(self):
    from apache_beam.dataframe import convert
    with beam.Pipeline(argv=self.args) as p:
      actual_df = p | apache_beam.dataframe.io.read_gbq(
          table="dfsqltable_3c7d6fd5_16e0460dfd0",
          dataset="beam_bigquery_io_test",
          project_id="apache-beam-testing",
          use_bqstorage_api=True)
      assert_that(
          convert.to_pcollection(actual_df),
          equal_to([(3, 'customer1', 'test'), (1, 'customer1', 'test'),
                    (2, 'customer2', 'test'), (4, 'customer2', 'test')]))

  @pytest.mark.it_postcommit
  def test_ReadGbq_with_computation(self):
    the_table = bigquery_tools.BigQueryWrapper().get_table(
        project_id="apache-beam-testing",
        dataset_id="beam_bigquery_io_test",
        table_id="dfsqltable_3c7d6fd5_16e0460dfd0")
    table = the_table.schema
    utype = bigquery_schema_tools.generate_user_type_from_bq_schema(table)
    from apache_beam.dataframe import convert
    with beam.Pipeline(argv=self.args) as p:
      beam_df = p | apache_beam.dataframe.io.read_gbq(
          table="dfsqltable_3c7d6fd5_16e0460dfd0",
          dataset="beam_bigquery_io_test",
          project_id="apache-beam-testing")
      actual_df = beam_df.groupby('id').count()
      assert_that(
          convert.to_pcollection(actual_df, include_indexes=True),
          equal_to([
              utype(id=1, name=1, type=1),
              utype(id=2, name=1, type=1),
              utype(id=3, name=1, type=1),
              utype(id=4, name=1, type=1)
          ]))

Member: Can we also avoid using utype in this test?
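One way to address this, offered as a sketch rather than the merged resolution: map the result rows to plain tuples before asserting, so no generated user type is needed:

      assert_that(
          convert.to_pcollection(actual_df, include_indexes=True)
          | beam.Map(lambda row: (row.id, row.name, row.type)),
          equal_to([(1, 1, 1), (2, 1, 1), (3, 1, 1), (4, 1, 1)]))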


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()
57 changes: 57 additions & 0 deletions sdks/python/apache_beam/dataframe/io_test.py
@@ -27,6 +27,7 @@
from io import BytesIO
from io import StringIO

import mock
import pandas as pd
import pandas.testing
import pyarrow
@@ -35,12 +36,20 @@
from parameterized import parameterized

import apache_beam as beam
import apache_beam.io.gcp.bigquery
from apache_beam.dataframe import convert
from apache_beam.dataframe import io
from apache_beam.io import restriction_trackers
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

try:
  from apitools.base.py.exceptions import HttpError
except ImportError:
  HttpError = None

# Get major, minor version
PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
PYARROW_VERSION = tuple(map(int, pyarrow.__version__.split('.')[0:2]))
@@ -410,5 +419,53 @@ def test_double_write(self):
set(self.read_all_lines(output + 'out2.csv*')))


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
Member: What happens with read_gbq if GCP dependencies are not installed? Maybe we should not define it in that case?

Contributor (author): I tried without this, and the tests passed locally but failed on Jenkins, and thus I wasn't able to easily reproduce the issue. Can discuss this offline if you think it's worth looking further into.

Member: Could you do an experiment to see what the error looks like if you try to use read_gbq without gcp dependencies installed? It should be sufficient to just make a new virtualenv, and install beam with python -m pip install -e '.[test,dataframe]' (note there's no gcp extra).

I'd like to make sure that the user gets a helpful error directing them to install GCP deps.

Even better would be to add a test that confirms this, it could be skipped unless GCP deps are not installed.
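A sketch of the guard test the reviewer describes (hypothetical: it runs only when GCP deps are absent, and the exception type and message are assumptions, since the actual behavior was still being investigated):

@unittest.skipIf(HttpError is not None, 'GCP dependencies are installed')
class ReadGbqWithoutGcpDepsTest(unittest.TestCase):
  def test_read_gbq_raises_helpful_error(self):
    # Ideally the error directs the user to install the gcp extra,
    # e.g. pip install 'apache-beam[gcp]'.
    with self.assertRaises(Exception):
      p = beam.Pipeline()
      _ = p | beam.dataframe.io.read_gbq(table='dataset.sample_table')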

class ReadGbqTransformTests(unittest.TestCase):
  @mock.patch.object(BigQueryWrapper, 'get_table')
  def test_bad_schema_public_api_direct_read(self, get_table):
    try:
      bigquery.TableFieldSchema
    except AttributeError:
      raise ValueError('Please install GCP Dependencies.')
    fields = [
        bigquery.TableFieldSchema(name='stn', type='DOUBLE', mode="NULLABLE"),
        bigquery.TableFieldSchema(name='temp', type='FLOAT64', mode="REPEATED"),
        bigquery.TableFieldSchema(name='count', type='INTEGER', mode=None)
    ]
    schema = bigquery.TableSchema(fields=fields)
    table = apache_beam.io.gcp.internal.clients.bigquery.\
        bigquery_v2_messages.Table(schema=schema)
    get_table.return_value = table

    with self.assertRaisesRegex(ValueError,
                                "Encountered an unsupported type: 'DOUBLE'"):
      p = apache_beam.Pipeline()
      _ = p | apache_beam.dataframe.io.read_gbq(
          table="dataset.sample_table", use_bqstorage_api=True)

  def test_unsupported_callable(self):
    def filterTable(table):
      if table is not None:
        return table

    res = filterTable
    with self.assertRaisesRegex(TypeError,
                                'ReadFromBigQuery: table must be of type string'
                                '; got a callable instead'):
      p = beam.Pipeline()
      _ = p | beam.dataframe.io.read_gbq(table=res)

  def test_ReadGbq_unsupported_param(self):
    with self.assertRaisesRegex(
        ValueError,
        "Unsupported parameter entered in read_gbq. Please enter only "
        "supported parameters 'table', 'dataset', "
        "'project_id', 'use_bqstorage_api'."):
      p = beam.Pipeline()
      _ = p | beam.dataframe.io.read_gbq(
          table="table", use_bqstorage_api=False, reauth="true_config")


if __name__ == '__main__':
  unittest.main()