
Add initial read_gbq wrapper #22616

Merged (40 commits) on Sep 1, 2022
Conversation

@svetakvsundhar (Contributor) commented Aug 6, 2022

Fixes #20810

This PR adds a wrapper function 'read_gbq' that wraps a PTransform, 'ReadGbq'. 'ReadGbq' lets users read from BigQuery, convert the table schema into a Beam schema, and plumb the result into a deferred DataFrame.
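For illustration, a minimal usage sketch of the API this PR adds (the table path and the print step are placeholders, not taken from the PR):

import apache_beam as beam
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.io import read_gbq

with beam.Pipeline() as p:
  # Read a BigQuery table into a deferred DataFrame.
  beam_df = p | read_gbq(table='my-project:my_dataset.my_table')
  # Convert back to a PCollection of rows for downstream Beam transforms.
  _ = to_pcollection(beam_df) | beam.Map(print)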


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

codecov bot commented Aug 6, 2022

Codecov Report

Merging #22616 (3d17f62) into master (720ee14) will increase coverage by 0.02%.
The diff coverage is 94.44%.

@@            Coverage Diff             @@
##           master   #22616      +/-   ##
==========================================
+ Coverage   74.23%   74.26%   +0.02%     
==========================================
  Files         707      710       +3     
  Lines       93295    93885     +590     
==========================================
+ Hits        69257    69719     +462     
- Misses      22770    22898     +128     
  Partials     1268     1268              
Flag     Coverage Δ
python   83.57% <94.44%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files   Coverage Δ
sdks/python/apache_beam/dataframe/io.py 89.53% <94.44%> (+0.75%) ⬆️
sdks/python/apache_beam/typehints/__init__.py 77.77% <0.00%> (-22.23%) ⬇️
.../python/apache_beam/testing/test_stream_service.py 88.09% <0.00%> (-4.77%) ⬇️
...examples/inference/sklearn_mnist_classification.py 43.75% <0.00%> (-3.75%) ⬇️
sdks/python/apache_beam/utils/subprocess_server.py 56.54% <0.00%> (-2.20%) ⬇️
...n/apache_beam/ml/gcp/recommendations_ai_test_it.py 73.46% <0.00%> (-2.05%) ⬇️
...che_beam/runners/interactive/interactive_runner.py 90.06% <0.00%> (-1.33%) ⬇️
sdks/python/apache_beam/runners/direct/executor.py 96.46% <0.00%> (-0.55%) ⬇️
sdks/python/apache_beam/typehints/schemas.py 93.84% <0.00%> (-0.48%) ⬇️
sdks/python/apache_beam/coders/coders.py 88.00% <0.00%> (-0.35%) ⬇️
... and 32 more


github-actions bot commented Aug 6, 2022

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @pabloem for label python.
R: @johnjcasey for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@svetakvsundhar (Contributor Author) commented Aug 7, 2022

R: @TheNeuralBit this is ready for an initial pass, PTAL.

@TheNeuralBit TheNeuralBit self-requested a review August 9, 2022 16:38
@TheNeuralBit TheNeuralBit changed the title BEAM-11587 Wrapper PTransform Add initial read_gbq wrapper Aug 9, 2022
self.max_results is not None or self.progress_bar_type is not None):
raise ValueError(
"Unsupported parameter entered in ReadGbq. "
"Please enter only supported parameters.")
Member:

Instead of checking each unsupported parameter explicitly, could we capture all of these in **kwargs (so the argument list becomes (table=None, use_bqstorage_api=False, **kwargs)) and then raise this error if kwargs is non-empty? Then it could also include a listing of the unsupported arguments, like auth_local_webserver, progress_bar_type, ...

Member:

I'd also suggest moving this to read_gbq, and having _ReadGbq just accept supported params.
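A rough sketch of the pattern being suggested, with the check hoisted into read_gbq (the signature and error message here are illustrative, assuming the parameter names discussed later in this review, not the merged code):

def read_gbq(table=None, dataset=None, project_id=None,
             use_bqstorage_api=False, **kwargs):
  if kwargs:
    # Name the specific unsupported arguments in the error.
    raise ValueError(
        'Unsupported parameter(s) passed to read_gbq: %s. Please use only '
        'table, dataset, project_id, and use_bqstorage_api.' %
        ', '.join(sorted(kwargs.keys())))
  return _ReadGbq(table, dataset, project_id, use_bqstorage_api)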


This PTransform currently does not support queries.
Note that all Python ReadFromBigQuery args can be passed in
to this PTransform, in addition to the args listed below.
Member:

I don't think this is right; I don't see any logic to pass the ReadFromBigQuery args through.

"""If you would like to use the 'DIRECT_READ' method ins ReadFromBigQuery,
please set use_bq_storage_api to True.
Otherwise, if you would like to use the 'EXPORT' method, please set
use_bq_storage_api to False, or leave it unspecified."""
Member:

I could be swayed, but I think it would be preferable not to document how this is working under the hood (e.g. that it relies on ReadFromBigQuery), and instead just present the user-facing API.

Contributor Author:

Hm, I disagree. I'm assuming that users who want to read from BQ might already be used to the way the ReadFromBQ PTransform works, and thus it would be useful to let them know how to utilize 'DIRECT_READ' and 'EXPORT' via the ReadGbq PTransform.

What are the benefits of not mentioning 'DIRECT_READ' and 'EXPORT'?

Member:

From the perspective of the users of this API, the mode is fully controlled by the use_bqstorage_api parameter. It just confuses things to reveal that it maps to mode={DIRECT_READ, EXPORT} under the hood.

If we want to reference the mode in the docstring, we should just have a mode parameter, and not bother with a use_bqstorage_api parameter. To be clear, I think that's an acceptable alternative, but I'd like to pick one or the other.

I have a slight preference to avoid using Beam terminology though - a goal of the DataFrame API is to provide an API that feels natural to developers who are already familiar with pandas, and not familiar with Beam.

Contributor Author:

Ah ok, thanks for sharing. I agree with your perspective on this and will use the pandas terminology.
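For context, the under-the-hood mapping this thread refers to is roughly the following (a sketch; the helper name is hypothetical, but ReadFromBigQuery does expose a Method enum):

from apache_beam.io.gcp.bigquery import ReadFromBigQuery

def _bigquery_read_method(use_bqstorage_api):
  # True selects the BigQuery Storage API read; False (the default)
  # selects the export-based read.
  if use_bqstorage_api:
    return ReadFromBigQuery.Method.DIRECT_READ
  return ReadFromBigQuery.Method.EXPORT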

@@ -58,6 +58,17 @@
_DEFAULT_BYTES_CHUNKSIZE = 1 << 20


@frame_base.with_docs_from(pd)
Member:

I'm not sure it makes sense to pull the docs from pandas in this case. We're diverging from pandas pretty significantly here: there are a lot of arguments we don't need (e.g. auth related ones), and we prefer a table argument rather than query. I'd suggest writing a whole new docstring (possibly drawing heavily from the pandas one).

Contributor Author:

Sure-- for clarity here, should the new docstring be documenting the args to read_gbq, as well as the ReadGbq PTransform (e.g. some of the comments written in the ReadGbq PTransform documentation)?

Member:

Yeah, the new docstring here should be all of the user-facing documentation. _ReadGbq doesn't need to have a docstring since it's a private implementation detail (it could still have one to document its purpose, but this is for Beam devs, not for users).

Comment on lines 440 to 442
pipeline = p | apache_beam.dataframe.io.read_gbq(
table="dataset.sample_table", use_bqstorage_api=True)
pipeline
Member:

Note the output here should be a deferred dataframe object, so it's confusing to name it pipeline (p is actually a Pipeline). Regardless, I'd suggest assigning this to _ so pylint doesn't complain about an unassigned output or an unused variable:

Suggested change
pipeline = p | apache_beam.dataframe.io.read_gbq(
table="dataset.sample_table", use_bqstorage_api=True)
pipeline
_ = p | apache_beam.dataframe.io.read_gbq(
table="dataset.sample_table", use_bqstorage_api=True)

@@ -259,6 +259,43 @@ def test_table_schema_retrieve_with_direct_read(self):
]))


class ReadUsingReadGbqTests(BigQueryReadIntegrationTests):
Member:

I'd suggest making a new dataframe.io_it_test for these, rather than adding them here. Will that work?

table="apache-beam-testing:"
"beam_bigquery_io_test."
"dfsqltable_3c7d6fd5_16e0460dfd0")
assert_that(expected_df, equal_to(actual_df))
Member:

It's a bit odd to have two pipelines like this and verify them against each other. I also wouldn't expect this to work with a deferred dataframe instance. Is it actually working?

I think it would be preferable to just have an explicit expected value, as in the other ITs. Can we do that instead?

@@ -410,5 +419,50 @@ def test_double_write(self):
set(self.read_all_lines(output + 'out2.csv*')))


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
Member:

What happens with read_gbq if GCP dependencies are not installed? Maybe we should not define it in that case?
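One common way to realize this is a guarded import that raises a helpful error at call time (a sketch; the message text is an assumption, not the merged code):

try:
  from apache_beam.io.gcp.bigquery import ReadFromBigQuery
except ImportError:
  ReadFromBigQuery = None

def read_gbq(table=None, dataset=None, project_id=None,
             use_bqstorage_api=False, **kwargs):
  if ReadFromBigQuery is None:
    # Point the user at the missing extra instead of failing obscurely.
    raise ImportError(
        'Google Cloud BigQuery is not available. Please install the GCP '
        "dependencies: pip install 'apache-beam[gcp]'")
  ...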

Contributor Author:

I tried without this; the tests passed locally but failed on Jenkins, so I wasn't able to easily reproduce the issue. Can discuss this offline if you think it's worth looking further into.

Member:

Could you do an experiment to see what the error looks like if you try to use read_gbq without gcp dependencies installed? It should be sufficient to just make a new virtualenv, and install beam with python -m pip install -e '.[test,dataframe]' (note there's no gcp extra).

I'd like to make sure that the user gets a helpful error directing them to install GCP deps.

Even better would be to add a test that confirms this, it could be skipped unless GCP deps are not installed.
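Such a test might look roughly like this (a sketch: it inverts the existing HttpError guard, and the assumption that read_gbq surfaces an ImportError here is unverified):

import unittest

try:
  from apitools.base.py.exceptions import HttpError
except ImportError:
  HttpError = None

from apache_beam.dataframe import io


@unittest.skipIf(HttpError is not None, 'GCP dependencies are installed')
class ReadGbqMissingDepsTest(unittest.TestCase):
  def test_read_gbq_without_gcp_deps(self):
    # The user should get an error directing them to install the gcp extra.
    with self.assertRaises(ImportError):
      io.read_gbq(table='dataset.sample_table')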

@svetakvsundhar (Contributor Author) commented:

Thanks for the suggestions @TheNeuralBit, I think this is ready for another pass.

@svetakvsundhar (Contributor Author) commented:

Run Python_PVR_Flink PreCommit

@TheNeuralBit (Member) commented:

Run Python 3.8 PostCommit

@TheNeuralBit (Member) left a review:

Thanks! I just have a few more requests.

Please also run one of the PostCommits and double-check that this new test is running there (if you're not sure how to do this I can help with it when I'm back the week after next).

utype(id=1, name='customer1', type='test'),
utype(id=2, name='customer2', type='test'),
utype(id=4, name='customer2', type='test')
]))
Member:

It could be good to add one more test that actually applies some computation on the dataframe, just to test it end-to-end, e.g.:

beam_df = read_gbq(...)
actual_df = beam_df.groupby('name').count()

(Note in that case the 'name' column will be in the index, so you'll have to use to_pcollection(actual_df, include_indexes=True))

Alternatively we could build a nice example pipeline that uses read_gbq with a computation and put it in apache_beam.examples.dataframe.
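Expanding the snippet above into a runnable shape (a sketch; the table and the print step are placeholders):

import apache_beam as beam
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.io import read_gbq

with beam.Pipeline() as p:
  beam_df = p | read_gbq(table='dataset.sample_table')
  # A deferred computation applied end-to-end.
  counts = beam_df.groupby('name').count()
  # 'name' lands in the index, so it must be included explicitly.
  _ = to_pcollection(counts, include_indexes=True) | beam.Map(print)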

Contributor Author:

Sounds good. I'll add an integration test here and make a separate PR to add an example pipeline?

Comment on lines 61 to 66
equal_to([
utype(id=3, name='customer1', type='test'),
utype(id=1, name='customer1', type='test'),
utype(id=2, name='customer2', type='test'),
utype(id=4, name='customer2', type='test')
]))
Member:

I think you should be able to just compare the result to tuple instances, then you wouldn't need to generate the utype above. Will this work?

Suggested change
equal_to([
utype(id=3, name='customer1', type='test'),
utype(id=1, name='customer1', type='test'),
utype(id=2, name='customer2', type='test'),
utype(id=4, name='customer2', type='test')
]))
equal_to([
(3, 'customer1', 'test'),
(1, 'customer1', 'test'),
(2, 'customer2', 'test'),
(4, 'customer2', 'test')
]))


# Protect against environments where bigquery library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
# pylint: enable=wrong-import-order, wrong-import-position
Member:

These comments look out of place

# limitations under the License.
#

"""Unit tests for Dataframe sources and sinks."""
Member:

nit:

Suggested change
"""Unit tests for Dataframe sources and sinks."""
"""Integration tests for Dataframe sources and sinks."""

Member:

Another nit: I'd prefer we put this in apache_beam.dataframe.io_it_test

_LOGGER = logging.getLogger(__name__)


class ReadUsingReadGbqTests(bigquery_read_it_test.BigQueryReadIntegrationTests):
Member:

Why is this inheriting from BigQueryReadIntegrationTests?

Contributor Author:

This is to set up and tear down the requests to BQ.

Comment on lines 63 to 65
"""This function reads data from a BigQuery source and outputs it into
a Beam deferred dataframe
(https://beam.apache.org/documentation/dsls/dataframes/overview/)
Member:

Suggested change
"""This function reads data from a BigQuery source and outputs it into
a Beam deferred dataframe
(https://beam.apache.org/documentation/dsls/dataframes/overview/)
"""This function reads data from a BigQuery table and produces a :class:`~apache_beam.dataframe.frames.DeferredDataFrame`.

(this will make a link in the generated documentation, like you see here)

Comment on lines 66 to 72
Please specify a table in the format 'PROJECT:dataset.table'
or use the table, dataset, and project_id args
to specify the table. If you would like to utilize the BigQuery
Storage API in ReadFromBigQuery,
please set use_bq_storage_api to True.
Otherwise, please set the flag to false or
leave it unspecified."""
Member:

nit: Could you format this with "Args:" instead?

Suggested change
Please specify a table in the format 'PROJECT:dataset.table'
or use the table, dataset, and project_id args
to specify the table. If you would like to utilize the BigQuery
Storage API in ReadFromBigQuery,
please set use_bq_storage_api to True.
Otherwise, please set the flag to false or
leave it unspecified."""
Args:
table: ...
dataset: ...
project_id: ...
use_bqstorage_api: ...
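As an illustration only (the argument descriptions below are assumptions drawn from the docstring quoted above, not the merged text), the filled-in version might read:

Args:
  table: The table to read, as a 'PROJECT:dataset.table' string, or just
    the table name when dataset and project_id are given separately.
  dataset: The ID of the dataset containing the table.
  project_id: The Google Cloud project that owns the table.
  use_bqstorage_api: If True, read using the BigQuery Storage API;
    otherwise (the default), read via a BigQuery export.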


raise ValueError(
"Unsupported parameter entered in read_gbq. Please enter only "
"supported parameters 'table', 'dataset', "
"'project_id', 'use_bqstorage_api'.")
Member:

nit: maybe print the contents of kwargs.keys() instead, to identify the specific unsupported parameters?

Member:

What about using kwargs.keys() as suggested previously?

@svetakvsundhar (Contributor Author) commented:

Run Python 3.8 PostCommit

@svetakvsundhar (Contributor Author) commented:

Run Python PreCommit

@svetakvsundhar (Contributor Author) commented:

@TheNeuralBit ptal and lmk if the io_it_test looks viable

@TheNeuralBit (Member) commented:

Run Python PreCommit

@TheNeuralBit (Member) commented:

> @TheNeuralBit ptal and lmk if the io_it_test looks viable

Looks great! I can merge when we get CI green

@svetakvsundhar (Contributor Author) commented:

Run Python PreCommit

1 similar comment
@svetakvsundhar (Contributor Author) commented:

Run Python PreCommit

@TheNeuralBit TheNeuralBit merged commit 7153c21 into apache:master Sep 1, 2022
Linked issue: Support pd.read_gbq and DataFrame.to_gbq (#20810)