From 94c41f6d4923673b432ea9e970f151da7376c474 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Thu, 5 Aug 2021 13:49:34 -0400 Subject: [PATCH 01/46] add precommit config --- .pre-commit-config.yaml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 .pre-commit-config.yaml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..ee7164d --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,17 @@ +repos: +- repo: https://github.com/psf/black + rev: 20.8b1 + hooks: + - id: black + language_version: python3 + exclude: versioneer.py +- repo: https://gitlab.com/pycqa/flake8 + rev: 3.8.3 + hooks: + - id: flake8 + language_version: python3 +- repo: https://github.com/pycqa/isort + rev: 5.8.0 + hooks: + - id: isort + language_version: python3 \ No newline at end of file From 48becdb0a4c6ece487fcad5dc7f19281ed7469b0 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Thu, 5 Aug 2021 16:44:56 -0400 Subject: [PATCH 02/46] add read_gbq --- dask_bigquery/__init__.py | 1 + dask_bigquery/core.py | 236 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 237 insertions(+) create mode 100644 dask_bigquery/__init__.py create mode 100644 dask_bigquery/core.py diff --git a/dask_bigquery/__init__.py b/dask_bigquery/__init__.py new file mode 100644 index 0000000..80a7c9c --- /dev/null +++ b/dask_bigquery/__init__.py @@ -0,0 +1 @@ +from .core import read_gbq diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py new file mode 100644 index 0000000..a0344e6 --- /dev/null +++ b/dask_bigquery/core.py @@ -0,0 +1,236 @@ +from __future__ import annotations + +import logging +import warnings +from collections.abc import Iterable +from contextlib import contextmanager +from functools import partial + +import dask +import dask.dataframe as dd +import pandas as pd +import pyarrow +from google.cloud import bigquery, bigquery_storage + + +@contextmanager +def bigquery_client(project_id="dask-bigquery", with_storage_api=False): + # Ignore google auth credentials warning + warnings.filterwarnings( + "ignore", "Your application has authenticated using end user credentials" + ) + + bq_storage_client = None + bq_client = bigquery.Client(project_id) + try: + if with_storage_api: + bq_storage_client = bigquery_storage.BigQueryReadClient( + credentials=bq_client._credentials + ) + yield bq_client, bq_storage_client + else: + yield bq_client + finally: + bq_client.close() + + +def _stream_to_dfs(bqs_client, stream_name, schema, timeout): + """Given a Storage API client and a stream name, yield all dataframes.""" + return [ + pyarrow.ipc.read_record_batch( + pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch), + schema, + ).to_pandas() + for message in bqs_client.read_rows(name=stream_name, offset=0, timeout=timeout) + ] + + +@dask.delayed +def _read_rows_arrow( + *, + make_create_read_session_request: callable, + partition_field: str = None, + project_id: str, + stream_name: str = None, + timeout: int, +) -> pd.DataFrame: + """Read a single batch of rows via BQ Storage API, in Arrow binary format. + Args: + project_id: BigQuery project + create_read_session_request: kwargs to pass to `bqs_client.create_read_session` + as `request` + partition_field: BigQuery field for partitions, to be used as Dask index col for + divisions + NOTE: Please set if specifying `row_restriction` filters in TableReadOptions. + stream_name: BigQuery Storage API Stream "name". + NOTE: Please set if reading from Storage API without any `row_restriction`. 
+ https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream + NOTE: `partition_field` and `stream_name` kwargs are mutually exclusive. + Adapted from + https://github.com/googleapis/python-bigquery-storage/blob/a0fc0af5b4447ce8b50c365d4d081b9443b8490e/google/cloud/bigquery_storage_v1/reader.py. + """ + with bigquery_client(project_id, with_storage_api=True) as (bq_client, bqs_client): + session = bqs_client.create_read_session(make_create_read_session_request()) + schema = pyarrow.ipc.read_schema( + pyarrow.py_buffer(session.arrow_schema.serialized_schema) + ) + + if (partition_field is not None) and (stream_name is not None): + raise ValueError( + "The kwargs `partition_field` and `stream_name` are mutually exclusive." + ) + + elif partition_field is not None: + shards = [ + df + for stream in session.streams + for df in _stream_to_dfs( + bqs_client, stream.name, schema, timeout=timeout + ) + ] + # NOTE: if no rows satisfying the row_restriction, then `shards` will be empty list + if len(shards) == 0: + shards = [schema.empty_table().to_pandas()] + shards = [shard.set_index(partition_field, drop=True) for shard in shards] + + elif stream_name is not None: + shards = _stream_to_dfs(bqs_client, stream_name, schema, timeout=timeout) + # NOTE: BQ Storage API can return empty streams + if len(shards) == 0: + shards = [schema.empty_table().to_pandas()] + + else: + raise NotImplementedError( + "Please specify either `partition_field` or `stream_name`." + ) + + return pd.concat(shards) + + +def read_gbq( + project_id: str, + dataset_id: str, + table_id: str, + partition_field: str = None, + partitions: Iterable[str] = None, + row_filter="", + fields: list[str] = (), + read_timeout: int = 3600, +): + """Read table as dask dataframe using BigQuery Storage API via Arrow format. + If `partition_field` and `partitions` are specified, then the resulting dask dataframe + will be partitioned along the same boundaries. Otherwise, partitions will be approximately + balanced according to BigQuery stream allocation logic. + If `partition_field` is specified but not included in `fields` (either implicitly by requesting + all fields, or explicitly by inclusion in the list `fields`), then it will still be included + in the query in order to have it available for dask dataframe indexing. + Args: + project_id: BigQuery project + dataset_id: BigQuery dataset within project + table_id: BigQuery table within dataset + partition_field: to specify filters of form "WHERE {partition_field} = ..." + partitions: all values to select of `partition_field` + fields: names of the fields (columns) to select (default None to "SELECT *") + read_timeout: # of seconds an individual read request has before timing out + Returns: + dask dataframe + See https://github.com/dask/dask/issues/3121 for additional context. + """ + if (partition_field is None) and (partitions is not None): + raise ValueError("Specified `partitions` without `partition_field`.") + + # If `partition_field` is not part of the `fields` filter, fetch it anyway to be able + # to set it as dask dataframe index. We want this to be able to have consistent: + # BQ partitioning + dask divisions + pandas index values + if (partition_field is not None) and fields and (partition_field not in fields): + fields = (partition_field, *fields) + + # These read tasks seems to cause deadlocks (or at least long stuck workers out of touch with + # the scheduler), particularly when mixed with other tasks that execute C code. 
Anecdotally + # annotating the tasks with a higher priority seems to help (but not fully solve) the issue at + # the expense of higher cluster memory usage. + with bigquery_client(project_id, with_storage_api=True) as ( + bq_client, + bqs_client, + ), dask.annotate(priority=1): + table_ref = bq_client.get_table(".".join((dataset_id, table_id))) + if table_ref.table_type == "VIEW": + # Materialize the view since the operations below don't work on views. + logging.warning( + "Materializing view in order to read into dask. This may be expensive." + ) + query = f"SELECT * FROM `{full_id(table_ref)}`" + table_ref, _, _ = execute_query(query) + + # The protobuf types can't be pickled (may be able to tweak w/ copyreg), so instead use a + # generator func. + def make_create_read_session_request(row_filter=""): + return bigquery_storage.types.CreateReadSessionRequest( + max_stream_count=100, # 0 -> use as many streams as BQ Storage will provide + parent=f"projects/{project_id}", + read_session=bigquery_storage.types.ReadSession( + data_format=bigquery_storage.types.DataFormat.ARROW, + read_options=bigquery_storage.types.ReadSession.TableReadOptions( + row_restriction=row_filter, + selected_fields=fields, + ), + table=table_ref.to_bqstorage(), + ), + ) + + # Create a read session in order to detect the schema. + # Read sessions are light weight and will be auto-deleted after 24 hours. + session = bqs_client.create_read_session( + make_create_read_session_request(row_filter=row_filter) + ) + schema = pyarrow.ipc.read_schema( + pyarrow.py_buffer(session.arrow_schema.serialized_schema) + ) + meta = schema.empty_table().to_pandas() + delayed_kwargs = dict(prefix=f"{dataset_id}.{table_id}-") + + if partition_field is not None: + if row_filter: + raise ValueError("Cannot pass both `partition_field` and `row_filter`") + delayed_kwargs["meta"] = meta.set_index(partition_field, drop=True) + + if partitions is None: + logging.info( + "Specified `partition_field` without `partitions`; reading full table." 
+ ) + partitions = pd.read_gbq( + f"SELECT DISTINCT {partition_field} FROM {dataset_id}.{table_id}", + project_id=project_id, + )[partition_field].tolist() + # TODO generalize to ranges (as opposed to discrete values) + + partitions = sorted(partitions) + delayed_kwargs["divisions"] = (*partitions, partitions[-1]) + row_filters = [ + f'{partition_field} = "{partition_value}"' + for partition_value in partitions + ] + delayed_dfs = [ + _read_rows_arrow( + make_create_read_session_request=partial( + make_create_read_session_request, row_filter=row_filter + ), + partition_field=partition_field, + project_id=project_id, + timeout=read_timeout, + ) + for row_filter in row_filters + ] + else: + delayed_kwargs["meta"] = meta + delayed_dfs = [ + _read_rows_arrow( + make_create_read_session_request=make_create_read_session_request, + project_id=project_id, + stream_name=stream.name, + timeout=read_timeout, + ) + for stream in session.streams + ] + + return dd.from_delayed(dfs=delayed_dfs, **delayed_kwargs) From a9342590ea012976f274b22ff790f3fae12a1320 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Thu, 5 Aug 2021 16:45:50 -0400 Subject: [PATCH 03/46] add setup and req --- requirements.txt | 7 +++++++ setup.cfg | 4 ++++ setup.py | 21 +++++++++++++++++++++ 3 files changed, 32 insertions(+) create mode 100644 requirements.txt create mode 100644 setup.cfg create mode 100644 setup.py diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..30f84f1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +dask +distributed +google-cloud-bigquery +google-cloud-bigquery-storage +pandas +pandas-gbq +pyarrow diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..4921ff4 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,4 @@ +[flake8] +exclude = __init__.py +max-line-length = 120 +ignore = F811 \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..48525a0 --- /dev/null +++ b/setup.py @@ -0,0 +1,21 @@ +#!/usr/bin/env python + +from setuptools import setup + +with open("README.md", "r", encoding="utf-8") as f: + long_description = f.read() + +setup( + name="dask-bigquery", + version="0.0.1", + description="Dask + BigQuery intergration", + license="BSD", + packages=["dask_bigquery"], + long_description=long_description, + long_description_content_type="text/markdown", + python_requires=">=3.7", + install_requires=open("requirements.txt").read().strip().split("\n"), + extras_require={"test": ["pytest"]}, + include_package_data=True, + zip_safe=False, +) From 04bdd8041e78517b3669cb5704b226496ca09759 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 6 Aug 2021 11:33:53 -0400 Subject: [PATCH 04/46] modifications suggested by bnaul --- dask_bigquery/core.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index a0344e6..7bfbcd5 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -1,7 +1,6 @@ from __future__ import annotations import logging -import warnings from collections.abc import Iterable from contextlib import contextmanager from functools import partial @@ -14,11 +13,7 @@ @contextmanager -def bigquery_client(project_id="dask-bigquery", with_storage_api=False): - # Ignore google auth credentials warning - warnings.filterwarnings( - "ignore", "Your application has authenticated using end user credentials" - ) +def bigquery_client(project_id=None, with_storage_api=False): bq_storage_client = None bq_client = bigquery.Client(project_id) @@ -34,6 
+29,10 @@ def bigquery_client(project_id="dask-bigquery", with_storage_api=False): bq_client.close() +def full_id(table): + return f"{table.project}.{table.dataset_id}.{table.table_id}" + + def _stream_to_dfs(bqs_client, stream_name, schema, timeout): """Given a Storage API client and a stream name, yield all dataframes.""" return [ From ab16a32f226239fe84dc58b6044149ff3fc8e4c0 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 6 Aug 2021 11:44:01 -0400 Subject: [PATCH 05/46] raise error when table type is VIEW --- dask_bigquery/core.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 7bfbcd5..1ff42b0 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -29,10 +29,6 @@ def bigquery_client(project_id=None, with_storage_api=False): bq_client.close() -def full_id(table): - return f"{table.project}.{table.dataset_id}.{table.table_id}" - - def _stream_to_dfs(bqs_client, stream_name, schema, timeout): """Given a Storage API client and a stream name, yield all dataframes.""" return [ @@ -154,12 +150,7 @@ def read_gbq( ), dask.annotate(priority=1): table_ref = bq_client.get_table(".".join((dataset_id, table_id))) if table_ref.table_type == "VIEW": - # Materialize the view since the operations below don't work on views. - logging.warning( - "Materializing view in order to read into dask. This may be expensive." - ) - query = f"SELECT * FROM `{full_id(table_ref)}`" - table_ref, _, _ = execute_query(query) + raise TypeError("Table type VIEW not supported") # The protobuf types can't be pickled (may be able to tweak w/ copyreg), so instead use a # generator func. From 455f749f55c987b5c50c02671d67536bf519a798 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 6 Aug 2021 11:50:14 -0400 Subject: [PATCH 06/46] add linting github actions --- .github/workflows/pre-commit.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 .github/workflows/pre-commit.yml diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml new file mode 100644 index 0000000..229c576 --- /dev/null +++ b/.github/workflows/pre-commit.yml @@ -0,0 +1,16 @@ +name: Linting + +on: + push: + branches: main + pull_request: + branches: main + +jobs: + checks: + name: "pre-commit hooks" + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-python@v2 + - uses: pre-commit/action@v2.0.0 \ No newline at end of file From c417d5f288064a446443e6ca557f09dc8f9e25b5 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 6 Aug 2021 11:54:59 -0400 Subject: [PATCH 07/46] add comment on context manager related to possible upstram solution --- dask_bigquery/core.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 1ff42b0..61d01e8 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -14,6 +14,11 @@ @contextmanager def bigquery_client(project_id=None, with_storage_api=False): + """This context manager is a temporary solution until there is an + upstream solution to handle this. + See googleapis/google-cloud-python#9457 + and googleapis/gapic-generator-python#575 for reference. 
+ """ bq_storage_client = None bq_client = bigquery.Client(project_id) From 4839bbb81f2b68997ad7acb70951e80269f3ab23 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Wed, 11 Aug 2021 10:54:07 -0400 Subject: [PATCH 08/46] avoid scanning table when creating partitions --- dask_bigquery/core.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 61d01e8..18ada9a 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -193,10 +193,11 @@ def make_create_read_session_request(row_filter=""): logging.info( "Specified `partition_field` without `partitions`; reading full table." ) - partitions = pd.read_gbq( - f"SELECT DISTINCT {partition_field} FROM {dataset_id}.{table_id}", - project_id=project_id, - )[partition_field].tolist() + partitions = [ + p + for p in bq_client.list_partitions(f"{dataset_id}.{table_id}") + if p != "__NULL__" + ] # TODO generalize to ranges (as opposed to discrete values) partitions = sorted(partitions) From 774e79bffb0f87363dda3f00b2f0b36d5aafa673 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Tue, 17 Aug 2021 14:48:07 -0400 Subject: [PATCH 09/46] add first read_gbq test --- dask_bigquery/tests/test_core.py | 66 ++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 dask_bigquery/tests/test_core.py diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py new file mode 100644 index 0000000..cf0aeaf --- /dev/null +++ b/dask_bigquery/tests/test_core.py @@ -0,0 +1,66 @@ +import random + +import pandas as pd +from distributed.utils_test import cluster_fixture # noqa: F401 +from distributed.utils_test import client, loop # noqa: F401 +from google.cloud import bigquery + +from dask_bigquery import read_gbq + +# These tests are run locally and assume the user is already athenticated. +# It also assumes that the user has created a project called dask-bigquery. + + +def gen_data(size=10): + records = [ + { + "name": random.choice(["fred", "wilma", "barney", "betty"]), + "number": random.randint(0, 100), + "idx": i, + } + for i in range(size) + ] + return pd.DataFrame(records) + + +def push_data(): + "Push data to BigQuery using pandas gbq" + df = gen_data() + + pd.DataFrame.to_gbq( + df, + destination_table="dataset_test.table_test", + project_id="dask-bigquery", + chunksize=5, + if_exists="append", + ) + + return df + + +def test_read_gbq(client): + """Test simple read of data pushed to BigQuery using pandas-gbq""" + try: + # delete data set if exists + bq_client = bigquery.Client() + bq_client.delete_dataset( + dataset="dask-bigquery.dataset_test", + delete_contents=True, + ) + bq_client.close() + except: # if data doesn't exisit continue is that Value Error? 
+ pass + # create data + df = push_data() + + ddf = read_gbq( + project_id="dask-bigquery", dataset_id="dataset_test", table_id="table_test" + ) + + assert ddf.columns.tolist() == ["name", "number", "idx"] + assert len(ddf) == 10 + assert ddf.npartitions == 2 + + ddf_comp = ddf.set_index("idx").compute() + # breakpoint() + assert all(ddf_comp == df.set_index("idx")) From 7bdd66a788cf4d74141d7766c67a23fc182cc2b8 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Tue, 17 Aug 2021 15:57:30 -0400 Subject: [PATCH 10/46] add partitioning test --- dask_bigquery/tests/test_core.py | 36 +++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index cf0aeaf..041dc3c 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -1,6 +1,7 @@ import random import pandas as pd +import pytest from distributed.utils_test import cluster_fixture # noqa: F401 from distributed.utils_test import client, loop # noqa: F401 from google.cloud import bigquery @@ -38,6 +39,7 @@ def push_data(): return df +# test simple read def test_read_gbq(client): """Test simple read of data pushed to BigQuery using pandas-gbq""" try: @@ -62,5 +64,37 @@ def test_read_gbq(client): assert ddf.npartitions == 2 ddf_comp = ddf.set_index("idx").compute() - # breakpoint() assert all(ddf_comp == df.set_index("idx")) + + +# test partitioned data: this test requires a copy of the public dataset +# bigquery-public-data.covid19_public_forecasts.county_14d into a the +# project dask-bigquery + + +@pytest.mark.parametrize( + "fields", + ([], ["county_name"], ["county_name", "county_fips_code"]), + ids=["no_fields", "missing_partition_field", "fields"], +) +def test_read_gbq_partitioning(fields, client): + partitions = ["Teton", "Loudoun"] + ddf = read_gbq( + project_id="dask-bigquery", + dataset_id="covid19_public_forecasts", + table_id="county_14d", + partition_field="county_name", + partitions=partitions, + fields=fields, + ) + + assert len(ddf) # check it's not empty + loaded = set(ddf.columns) | {ddf.index.name} + + if fields: + assert loaded == set(fields) | {"county_name"} + else: # all columns loaded + assert loaded >= set(["county_name", "county_fips_code"]) + + assert ddf.npartitions == len(partitions) + assert list(ddf.divisions) == sorted(ddf.divisions) From 31a1253d786ab792a176edf1e815fb1924d13a19 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Wed, 18 Aug 2021 11:58:05 -0400 Subject: [PATCH 11/46] use pytest fixtures --- dask_bigquery/tests/test_core.py | 50 ++++++++++++++++---------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index 041dc3c..4ed9d2e 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -2,6 +2,7 @@ import pandas as pd import pytest +from dask.dataframe.utils import assert_eq from distributed.utils_test import cluster_fixture # noqa: F401 from distributed.utils_test import client, loop # noqa: F401 from google.cloud import bigquery @@ -12,36 +13,24 @@ # It also assumes that the user has created a project called dask-bigquery. 
-def gen_data(size=10): +@pytest.fixture +def df(): records = [ { "name": random.choice(["fred", "wilma", "barney", "betty"]), "number": random.randint(0, 100), "idx": i, } - for i in range(size) + for i in range(10) ] - return pd.DataFrame(records) + yield pd.DataFrame(records) -def push_data(): - "Push data to BigQuery using pandas gbq" - df = gen_data() - - pd.DataFrame.to_gbq( - df, - destination_table="dataset_test.table_test", - project_id="dask-bigquery", - chunksize=5, - if_exists="append", - ) - - return df +@pytest.fixture +def dataset(df): + "Push some data to BigQuery using pandas gbq" -# test simple read -def test_read_gbq(client): - """Test simple read of data pushed to BigQuery using pandas-gbq""" try: # delete data set if exists bq_client = bigquery.Client() @@ -52,19 +41,30 @@ def test_read_gbq(client): bq_client.close() except: # if data doesn't exisit continue is that Value Error? pass - # create data - df = push_data() - ddf = read_gbq( - project_id="dask-bigquery", dataset_id="dataset_test", table_id="table_test" + # push data to gbq + pd.DataFrame.to_gbq( + df, + destination_table="dataset_test.table_test", + project_id="dask-bigquery", + chunksize=5, + if_exists="append", ) + yield "dask-bigquery.dataset_test.table_test" + + +# test simple read +def test_read_gbq(df, dataset, client): + """Test simple read of data pushed to BigQuery using pandas-gbq""" + project_id, dataset_id, table_id = dataset.split(".") + + ddf = read_gbq(project_id=project_id, dataset_id=dataset_id, table_id=table_id) assert ddf.columns.tolist() == ["name", "number", "idx"] assert len(ddf) == 10 assert ddf.npartitions == 2 - ddf_comp = ddf.set_index("idx").compute() - assert all(ddf_comp == df.set_index("idx")) + assert assert_eq(ddf.set_index("idx").compute(), df.set_index("idx")) # test partitioned data: this test requires a copy of the public dataset From db4edb4f0cc65fc7c765f2c99d9cb81fd40ae67f Mon Sep 17 00:00:00 2001 From: ncclementi Date: Wed, 18 Aug 2021 13:24:13 -0400 Subject: [PATCH 12/46] use context manager on test --- dask_bigquery/tests/test_core.py | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index 4ed9d2e..d4b2541 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -31,16 +31,14 @@ def df(): def dataset(df): "Push some data to BigQuery using pandas gbq" - try: - # delete data set if exists - bq_client = bigquery.Client() - bq_client.delete_dataset( - dataset="dask-bigquery.dataset_test", - delete_contents=True, - ) - bq_client.close() - except: # if data doesn't exisit continue is that Value Error? 
- pass + with bigquery.Client() as bq_client: + try: + bq_client.delete_dataset( + dataset="dask-bigquery.dataset_test", + delete_contents=True, + ) + except: # noqa: + pass # push data to gbq pd.DataFrame.to_gbq( From be1efbd4dade5af1d410f78dce1085a3445f408e Mon Sep 17 00:00:00 2001 From: ncclementi Date: Wed, 18 Aug 2021 13:25:17 -0400 Subject: [PATCH 13/46] ignore bare except for now --- dask_bigquery/tests/test_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index d4b2541..de32f7d 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -37,7 +37,7 @@ def dataset(df): dataset="dask-bigquery.dataset_test", delete_contents=True, ) - except: # noqa: + except: # noqa: E722 pass # push data to gbq From 35cbdc62d58a9c9b71bc2a8252dfff85a7b30404 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Wed, 18 Aug 2021 15:53:43 -0400 Subject: [PATCH 14/46] remove prefix from delayed kwargs --- dask_bigquery/core.py | 2 +- dask_bigquery/tests/test_core.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 18ada9a..9a3b6a8 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -182,7 +182,7 @@ def make_create_read_session_request(row_filter=""): pyarrow.py_buffer(session.arrow_schema.serialized_schema) ) meta = schema.empty_table().to_pandas() - delayed_kwargs = dict(prefix=f"{dataset_id}.{table_id}-") + delayed_kwargs = {} if partition_field is not None: if row_filter: diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index de32f7d..7b9c561 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -62,7 +62,7 @@ def test_read_gbq(df, dataset, client): assert len(ddf) == 10 assert ddf.npartitions == 2 - assert assert_eq(ddf.set_index("idx").compute(), df.set_index("idx")) + assert assert_eq(ddf.set_index("idx"), df.set_index("idx")) # test partitioned data: this test requires a copy of the public dataset From 40de1ea899e36943e8bf0be00835a755be1bdd9e Mon Sep 17 00:00:00 2001 From: ncclementi Date: Wed, 18 Aug 2021 17:55:23 -0400 Subject: [PATCH 15/46] make dataset name random, remove annotate --- dask_bigquery/core.py | 2 +- dask_bigquery/tests/test_core.py | 25 ++++++++++++------------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 9a3b6a8..2077411 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -152,7 +152,7 @@ def read_gbq( with bigquery_client(project_id, with_storage_api=True) as ( bq_client, bqs_client, - ), dask.annotate(priority=1): + ): table_ref = bq_client.get_table(".".join((dataset_id, table_id))) if table_ref.table_type == "VIEW": raise TypeError("Table type VIEW not supported") diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index 7b9c561..cfce96a 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -1,4 +1,5 @@ import random +import uuid import pandas as pd import pytest @@ -30,25 +31,23 @@ def df(): @pytest.fixture def dataset(df): "Push some data to BigQuery using pandas gbq" - - with bigquery.Client() as bq_client: - try: - bq_client.delete_dataset( - dataset="dask-bigquery.dataset_test", - delete_contents=True, - ) - except: # noqa: E722 - pass - + dataset_id = uuid.uuid4().hex + project_id = "dask-bigquery" # push data to gbq pd.DataFrame.to_gbq( df, - 
destination_table="dataset_test.table_test", - project_id="dask-bigquery", + destination_table=f"{dataset_id}.table_test", + project_id=project_id, chunksize=5, if_exists="append", ) - yield "dask-bigquery.dataset_test.table_test" + yield f"{project_id}.{dataset_id}.table_test" + + with bigquery.Client() as bq_client: + bq_client.delete_dataset( + dataset=f"{project_id}.{dataset_id}", + delete_contents=True, + ) # test simple read From 45e0004a679eb2563dc64a2338fa95e854234c42 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Wed, 18 Aug 2021 18:08:34 -0400 Subject: [PATCH 16/46] better name for delayed _read_rows_arrow --- dask_bigquery/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 2077411..244ed02 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -46,7 +46,7 @@ def _stream_to_dfs(bqs_client, stream_name, schema, timeout): @dask.delayed -def _read_rows_arrow( +def bigquery_arrow_read( *, make_create_read_session_request: callable, partition_field: str = None, @@ -207,7 +207,7 @@ def make_create_read_session_request(row_filter=""): for partition_value in partitions ] delayed_dfs = [ - _read_rows_arrow( + bigquery_arrow_read( make_create_read_session_request=partial( make_create_read_session_request, row_filter=row_filter ), @@ -220,7 +220,7 @@ def make_create_read_session_request(row_filter=""): else: delayed_kwargs["meta"] = meta delayed_dfs = [ - _read_rows_arrow( + bigquery_arrow_read( make_create_read_session_request=make_create_read_session_request, project_id=project_id, stream_name=stream.name, From de93e88e8ff911cfd5cb0328cbd0df49afec77cd Mon Sep 17 00:00:00 2001 From: ncclementi Date: Thu, 19 Aug 2021 12:55:39 -0400 Subject: [PATCH 17/46] implementation of HLG - wip --- dask_bigquery/core.py | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 244ed02..09ed804 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -5,10 +5,13 @@ from contextlib import contextmanager from functools import partial -import dask import dask.dataframe as dd import pandas as pd import pyarrow +from dask.base import tokenize +from dask.dataframe.core import new_dd_object +from dask.highlevelgraph import HighLevelGraph +from dask.layers import DataFrameIOLayer from google.cloud import bigquery, bigquery_storage @@ -45,7 +48,6 @@ def _stream_to_dfs(bqs_client, stream_name, schema, timeout): ] -@dask.delayed def bigquery_arrow_read( *, make_create_read_session_request: callable, @@ -217,16 +219,31 @@ def make_create_read_session_request(row_filter=""): ) for row_filter in row_filters ] + return dd.from_delayed(dfs=delayed_dfs, **delayed_kwargs) else: - delayed_kwargs["meta"] = meta - delayed_dfs = [ + label = "read-gbq-" + output_name = label + tokenize( + project_id, + dataset_id, + table_id, + partition_field, + partitions, + row_filter, + fields, + read_timeout, + ) + # Create Blockwise layer + layer = DataFrameIOLayer( + output_name, + meta.columns, + [stream.name for stream in session.streams], bigquery_arrow_read( make_create_read_session_request=make_create_read_session_request, project_id=project_id, - stream_name=stream.name, timeout=read_timeout, - ) - for stream in session.streams - ] - - return dd.from_delayed(dfs=delayed_dfs, **delayed_kwargs) + ), + label=label, + ) + divisions = tuple([None] * (len(session.streams) + 1)) + graph = HighLevelGraph({output_name: layer}, {output_name: 
set()}) + return new_dd_object(graph, output_name, meta, divisions) From 3070ae30d4b56ccd8afa35ed04ef168072bd9a5d Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 20 Aug 2021 14:16:05 -0500 Subject: [PATCH 18/46] Slight refactor --- dask_bigquery/core.py | 145 +++++++++++++++++++++++------------------- 1 file changed, 78 insertions(+), 67 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 09ed804..fb435f8 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -5,7 +5,6 @@ from contextlib import contextmanager from functools import partial -import dask.dataframe as dd import pandas as pd import pyarrow from dask.base import tokenize @@ -48,13 +47,12 @@ def _stream_to_dfs(bqs_client, stream_name, schema, timeout): ] -def bigquery_arrow_read( - *, +def bigquery_read_partition_field( make_create_read_session_request: callable, - partition_field: str = None, project_id: str, - stream_name: str = None, timeout: int, + partition_field: str, + row_filter: str, ) -> pd.DataFrame: """Read a single batch of rows via BQ Storage API, in Arrow binary format. Args: @@ -64,6 +62,41 @@ def bigquery_arrow_read( partition_field: BigQuery field for partitions, to be used as Dask index col for divisions NOTE: Please set if specifying `row_restriction` filters in TableReadOptions. + Adapted from + https://github.com/googleapis/python-bigquery-storage/blob/a0fc0af5b4447ce8b50c365d4d081b9443b8490e/google/cloud/bigquery_storage_v1/reader.py. + """ + with bigquery_client(project_id, with_storage_api=True) as (bq_client, bqs_client): + session = bqs_client.create_read_session( + make_create_read_session_request(row_filter=row_filter) + ) + schema = pyarrow.ipc.read_schema( + pyarrow.py_buffer(session.arrow_schema.serialized_schema) + ) + + shards = [ + df + for stream in session.streams + for df in _stream_to_dfs(bqs_client, stream.name, schema, timeout=timeout) + ] + # NOTE: if no rows satisfying the row_restriction, then `shards` will be empty list + if len(shards) == 0: + shards = [schema.empty_table().to_pandas()] + shards = [shard.set_index(partition_field, drop=True) for shard in shards] + + return pd.concat(shards) + + +def bigquery_read( + make_create_read_session_request: callable, + project_id: str, + timeout: int, + stream_name: str, +) -> pd.DataFrame: + """Read a single batch of rows via BQ Storage API, in Arrow binary format. + Args: + project_id: BigQuery project + create_read_session_request: kwargs to pass to `bqs_client.create_read_session` + as `request` stream_name: BigQuery Storage API Stream "name". NOTE: Please set if reading from Storage API without any `row_restriction`. https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream @@ -76,35 +109,10 @@ def bigquery_arrow_read( schema = pyarrow.ipc.read_schema( pyarrow.py_buffer(session.arrow_schema.serialized_schema) ) - - if (partition_field is not None) and (stream_name is not None): - raise ValueError( - "The kwargs `partition_field` and `stream_name` are mutually exclusive." 
- ) - - elif partition_field is not None: - shards = [ - df - for stream in session.streams - for df in _stream_to_dfs( - bqs_client, stream.name, schema, timeout=timeout - ) - ] - # NOTE: if no rows satisfying the row_restriction, then `shards` will be empty list - if len(shards) == 0: - shards = [schema.empty_table().to_pandas()] - shards = [shard.set_index(partition_field, drop=True) for shard in shards] - - elif stream_name is not None: - shards = _stream_to_dfs(bqs_client, stream_name, schema, timeout=timeout) - # NOTE: BQ Storage API can return empty streams - if len(shards) == 0: - shards = [schema.empty_table().to_pandas()] - - else: - raise NotImplementedError( - "Please specify either `partition_field` or `stream_name`." - ) + shards = _stream_to_dfs(bqs_client, stream_name, schema, timeout=timeout) + # NOTE: BQ Storage API can return empty streams + if len(shards) == 0: + shards = [schema.empty_table().to_pandas()] return pd.concat(shards) @@ -184,12 +192,24 @@ def make_create_read_session_request(row_filter=""): pyarrow.py_buffer(session.arrow_schema.serialized_schema) ) meta = schema.empty_table().to_pandas() - delayed_kwargs = {} + + label = "read-gbq-" + output_name = label + tokenize( + project_id, + dataset_id, + table_id, + partition_field, + partitions, + row_filter, + fields, + read_timeout, + ) if partition_field is not None: if row_filter: raise ValueError("Cannot pass both `partition_field` and `row_filter`") - delayed_kwargs["meta"] = meta.set_index(partition_field, drop=True) + + meta = meta.set_index(partition_field, drop=True) if partitions is None: logging.info( @@ -203,47 +223,38 @@ def make_create_read_session_request(row_filter=""): # TODO generalize to ranges (as opposed to discrete values) partitions = sorted(partitions) - delayed_kwargs["divisions"] = (*partitions, partitions[-1]) row_filters = [ f'{partition_field} = "{partition_value}"' for partition_value in partitions ] - delayed_dfs = [ - bigquery_arrow_read( - make_create_read_session_request=partial( - make_create_read_session_request, row_filter=row_filter - ), - partition_field=partition_field, - project_id=project_id, - timeout=read_timeout, - ) - for row_filter in row_filters - ] - return dd.from_delayed(dfs=delayed_dfs, **delayed_kwargs) - else: - label = "read-gbq-" - output_name = label + tokenize( - project_id, - dataset_id, - table_id, - partition_field, - partitions, - row_filter, - fields, - read_timeout, + layer = DataFrameIOLayer( + output_name, + meta.columns, + row_filters, + partial( + bigquery_read_partition_field, + make_create_read_session_request, + project_id, + read_timeout, + partition_field, + ), + label=label, ) - # Create Blockwise layer + divisions = (*partitions, partitions[-1]) + else: layer = DataFrameIOLayer( output_name, meta.columns, [stream.name for stream in session.streams], - bigquery_arrow_read( - make_create_read_session_request=make_create_read_session_request, - project_id=project_id, - timeout=read_timeout, + partial( + bigquery_read, + make_create_read_session_request, + project_id, + read_timeout, ), label=label, ) divisions = tuple([None] * (len(session.streams) + 1)) - graph = HighLevelGraph({output_name: layer}, {output_name: set()}) - return new_dd_object(graph, output_name, meta, divisions) + + graph = HighLevelGraph({output_name: layer}, {output_name: set()}) + return new_dd_object(graph, output_name, meta, divisions) From b43daf621a7b06a2684567146ca3b84a270520c6 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Fri, 20 Aug 2021 14:30:44 -0500 
Subject: [PATCH 19/46] Minor test tweaks --- dask_bigquery/tests/test_core.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index cfce96a..c2ca052 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -31,17 +31,18 @@ def df(): @pytest.fixture def dataset(df): "Push some data to BigQuery using pandas gbq" - dataset_id = uuid.uuid4().hex project_id = "dask-bigquery" + dataset_id = uuid.uuid4().hex + table_id = "table_test" # push data to gbq pd.DataFrame.to_gbq( df, - destination_table=f"{dataset_id}.table_test", + destination_table=f"{dataset_id}.{table_id}", project_id=project_id, chunksize=5, if_exists="append", ) - yield f"{project_id}.{dataset_id}.table_test" + yield (project_id, dataset_id, table_id) with bigquery.Client() as bq_client: bq_client.delete_dataset( @@ -53,14 +54,11 @@ def dataset(df): # test simple read def test_read_gbq(df, dataset, client): """Test simple read of data pushed to BigQuery using pandas-gbq""" - project_id, dataset_id, table_id = dataset.split(".") - + project_id, dataset_id, table_id = dataset ddf = read_gbq(project_id=project_id, dataset_id=dataset_id, table_id=table_id) - assert ddf.columns.tolist() == ["name", "number", "idx"] - assert len(ddf) == 10 + assert list(ddf.columns) == ["name", "number", "idx"] assert ddf.npartitions == 2 - assert assert_eq(ddf.set_index("idx"), df.set_index("idx")) From 50f3c6a488c6f3bb0a08ecfb0ad808610d255c3b Mon Sep 17 00:00:00 2001 From: Naty Clementi Date: Thu, 16 Sep 2021 15:25:35 -0400 Subject: [PATCH 20/46] Update requirements.txt Co-authored-by: James Bourbeau --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 30f84f1..14c5f49 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ dask distributed -google-cloud-bigquery +google-cloud-bigquery >= 2.11.0 google-cloud-bigquery-storage pandas pandas-gbq From f8a578c615446f76abea5118a4e902c516c3da8e Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 17 Sep 2021 12:53:15 -0400 Subject: [PATCH 21/46] use context manager for bq client --- dask_bigquery/core.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index fb435f8..f129406 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -23,17 +23,17 @@ def bigquery_client(project_id=None, with_storage_api=False): """ bq_storage_client = None - bq_client = bigquery.Client(project_id) - try: - if with_storage_api: - bq_storage_client = bigquery_storage.BigQueryReadClient( - credentials=bq_client._credentials - ) - yield bq_client, bq_storage_client - else: - yield bq_client - finally: - bq_client.close() + with bigquery.Client(project_id) as bq_client: + try: + if with_storage_api: + bq_storage_client = bigquery_storage.BigQueryReadClient( + credentials=bq_client._credentials + ) + yield bq_client, bq_storage_client + else: + yield bq_client + finally: + bq_storage_client.transport.grpc_channel.close() def _stream_to_dfs(bqs_client, stream_name, schema, timeout): From a91c73c1c8e1d66f1960e5de2ffa4e38881cfe6b Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 17 Sep 2021 12:56:49 -0400 Subject: [PATCH 22/46] remove with_storage_api since it is always true --- dask_bigquery/core.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/dask_bigquery/core.py 
b/dask_bigquery/core.py index f129406..1db3d6d 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -15,7 +15,7 @@ @contextmanager -def bigquery_client(project_id=None, with_storage_api=False): +def bigquery_client(project_id=None): """This context manager is a temporary solution until there is an upstream solution to handle this. See googleapis/google-cloud-python#9457 @@ -25,13 +25,10 @@ def bigquery_client(project_id=None, with_storage_api=False): bq_storage_client = None with bigquery.Client(project_id) as bq_client: try: - if with_storage_api: - bq_storage_client = bigquery_storage.BigQueryReadClient( - credentials=bq_client._credentials - ) - yield bq_client, bq_storage_client - else: - yield bq_client + bq_storage_client = bigquery_storage.BigQueryReadClient( + credentials=bq_client._credentials + ) + yield bq_client, bq_storage_client finally: bq_storage_client.transport.grpc_channel.close() @@ -65,7 +62,7 @@ def bigquery_read_partition_field( Adapted from https://github.com/googleapis/python-bigquery-storage/blob/a0fc0af5b4447ce8b50c365d4d081b9443b8490e/google/cloud/bigquery_storage_v1/reader.py. """ - with bigquery_client(project_id, with_storage_api=True) as (bq_client, bqs_client): + with bigquery_client(project_id) as (bq_client, bqs_client): session = bqs_client.create_read_session( make_create_read_session_request(row_filter=row_filter) ) @@ -104,7 +101,7 @@ def bigquery_read( Adapted from https://github.com/googleapis/python-bigquery-storage/blob/a0fc0af5b4447ce8b50c365d4d081b9443b8490e/google/cloud/bigquery_storage_v1/reader.py. """ - with bigquery_client(project_id, with_storage_api=True) as (bq_client, bqs_client): + with bigquery_client(project_id) as (bq_client, bqs_client): session = bqs_client.create_read_session(make_create_read_session_request()) schema = pyarrow.ipc.read_schema( pyarrow.py_buffer(session.arrow_schema.serialized_schema) @@ -159,7 +156,7 @@ def read_gbq( # the scheduler), particularly when mixed with other tasks that execute C code. Anecdotally # annotating the tasks with a higher priority seems to help (but not fully solve) the issue at # the expense of higher cluster memory usage. - with bigquery_client(project_id, with_storage_api=True) as ( + with bigquery_client(project_id) as ( bq_client, bqs_client, ): From 548f2fba310ac8d3ec015c09112f7208772e2110 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 17 Sep 2021 13:05:59 -0400 Subject: [PATCH 23/46] remove partition fields option --- dask_bigquery/core.py | 123 ++++--------------------------- dask_bigquery/tests/test_core.py | 33 --------- 2 files changed, 13 insertions(+), 143 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 1db3d6d..f5f1354 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -1,7 +1,5 @@ from __future__ import annotations -import logging -from collections.abc import Iterable from contextlib import contextmanager from functools import partial @@ -44,45 +42,6 @@ def _stream_to_dfs(bqs_client, stream_name, schema, timeout): ] -def bigquery_read_partition_field( - make_create_read_session_request: callable, - project_id: str, - timeout: int, - partition_field: str, - row_filter: str, -) -> pd.DataFrame: - """Read a single batch of rows via BQ Storage API, in Arrow binary format. 
- Args: - project_id: BigQuery project - create_read_session_request: kwargs to pass to `bqs_client.create_read_session` - as `request` - partition_field: BigQuery field for partitions, to be used as Dask index col for - divisions - NOTE: Please set if specifying `row_restriction` filters in TableReadOptions. - Adapted from - https://github.com/googleapis/python-bigquery-storage/blob/a0fc0af5b4447ce8b50c365d4d081b9443b8490e/google/cloud/bigquery_storage_v1/reader.py. - """ - with bigquery_client(project_id) as (bq_client, bqs_client): - session = bqs_client.create_read_session( - make_create_read_session_request(row_filter=row_filter) - ) - schema = pyarrow.ipc.read_schema( - pyarrow.py_buffer(session.arrow_schema.serialized_schema) - ) - - shards = [ - df - for stream in session.streams - for df in _stream_to_dfs(bqs_client, stream.name, schema, timeout=timeout) - ] - # NOTE: if no rows satisfying the row_restriction, then `shards` will be empty list - if len(shards) == 0: - shards = [schema.empty_table().to_pandas()] - shards = [shard.set_index(partition_field, drop=True) for shard in shards] - - return pd.concat(shards) - - def bigquery_read( make_create_read_session_request: callable, project_id: str, @@ -118,10 +77,7 @@ def read_gbq( project_id: str, dataset_id: str, table_id: str, - partition_field: str = None, - partitions: Iterable[str] = None, row_filter="", - fields: list[str] = (), read_timeout: int = 3600, ): """Read table as dask dataframe using BigQuery Storage API via Arrow format. @@ -143,19 +99,7 @@ def read_gbq( dask dataframe See https://github.com/dask/dask/issues/3121 for additional context. """ - if (partition_field is None) and (partitions is not None): - raise ValueError("Specified `partitions` without `partition_field`.") - - # If `partition_field` is not part of the `fields` filter, fetch it anyway to be able - # to set it as dask dataframe index. We want this to be able to have consistent: - # BQ partitioning + dask divisions + pandas index values - if (partition_field is not None) and fields and (partition_field not in fields): - fields = (partition_field, *fields) - # These read tasks seems to cause deadlocks (or at least long stuck workers out of touch with - # the scheduler), particularly when mixed with other tasks that execute C code. Anecdotally - # annotating the tasks with a higher priority seems to help (but not fully solve) the issue at - # the expense of higher cluster memory usage. with bigquery_client(project_id) as ( bq_client, bqs_client, @@ -174,7 +118,6 @@ def make_create_read_session_request(row_filter=""): data_format=bigquery_storage.types.DataFormat.ARROW, read_options=bigquery_storage.types.ReadSession.TableReadOptions( row_restriction=row_filter, - selected_fields=fields, ), table=table_ref.to_bqstorage(), ), @@ -195,63 +138,23 @@ def make_create_read_session_request(row_filter=""): project_id, dataset_id, table_id, - partition_field, - partitions, row_filter, - fields, read_timeout, ) - if partition_field is not None: - if row_filter: - raise ValueError("Cannot pass both `partition_field` and `row_filter`") - - meta = meta.set_index(partition_field, drop=True) - - if partitions is None: - logging.info( - "Specified `partition_field` without `partitions`; reading full table." 
- ) - partitions = [ - p - for p in bq_client.list_partitions(f"{dataset_id}.{table_id}") - if p != "__NULL__" - ] - # TODO generalize to ranges (as opposed to discrete values) - - partitions = sorted(partitions) - row_filters = [ - f'{partition_field} = "{partition_value}"' - for partition_value in partitions - ] - layer = DataFrameIOLayer( - output_name, - meta.columns, - row_filters, - partial( - bigquery_read_partition_field, - make_create_read_session_request, - project_id, - read_timeout, - partition_field, - ), - label=label, - ) - divisions = (*partitions, partitions[-1]) - else: - layer = DataFrameIOLayer( - output_name, - meta.columns, - [stream.name for stream in session.streams], - partial( - bigquery_read, - make_create_read_session_request, - project_id, - read_timeout, - ), - label=label, - ) - divisions = tuple([None] * (len(session.streams) + 1)) + layer = DataFrameIOLayer( + output_name, + meta.columns, + [stream.name for stream in session.streams], + partial( + bigquery_read, + make_create_read_session_request, + project_id, + read_timeout, + ), + label=label, + ) + divisions = tuple([None] * (len(session.streams) + 1)) graph = HighLevelGraph({output_name: layer}, {output_name: set()}) return new_dd_object(graph, output_name, meta, divisions) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index c2ca052..8a737f0 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -60,36 +60,3 @@ def test_read_gbq(df, dataset, client): assert list(ddf.columns) == ["name", "number", "idx"] assert ddf.npartitions == 2 assert assert_eq(ddf.set_index("idx"), df.set_index("idx")) - - -# test partitioned data: this test requires a copy of the public dataset -# bigquery-public-data.covid19_public_forecasts.county_14d into a the -# project dask-bigquery - - -@pytest.mark.parametrize( - "fields", - ([], ["county_name"], ["county_name", "county_fips_code"]), - ids=["no_fields", "missing_partition_field", "fields"], -) -def test_read_gbq_partitioning(fields, client): - partitions = ["Teton", "Loudoun"] - ddf = read_gbq( - project_id="dask-bigquery", - dataset_id="covid19_public_forecasts", - table_id="county_14d", - partition_field="county_name", - partitions=partitions, - fields=fields, - ) - - assert len(ddf) # check it's not empty - loaded = set(ddf.columns) | {ddf.index.name} - - if fields: - assert loaded == set(fields) | {"county_name"} - else: # all columns loaded - assert loaded >= set(["county_name", "county_fips_code"]) - - assert ddf.npartitions == len(partitions) - assert list(ddf.divisions) == sorted(ddf.divisions) From d3ffa79586870b2e9bf5802f42571facac4d0a49 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 17 Sep 2021 15:03:07 -0400 Subject: [PATCH 24/46] add test github actions setup --- .github/workflows/tests.yml | 53 +++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 .github/workflows/tests.yml diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..e34ff1a --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,53 @@ +name: Tests + +on: [push, pull_request] + +# When this workflow is queued, automatically cancel any previous running +# or pending jobs from the same branch +concurrency: + group: ${{ github.ref }} + cancel-in-progress: true + +jobs: + test: + runs-on: ${{ matrix.os }} + defaults: + run: + shell: bash -l {0} + strategy: + fail-fast: false + matrix: + os: ["windows-latest", "ubuntu-latest", 
"macos-latest"] + python-version: ["3.7", "3.8", "3.9"] + + steps: + - name: Checkout source + uses: actions/checkout@v2 + with: + fetch-depth: 0 # Needed by codecov.io + + - name: Setup Conda Environment + uses: conda-incubator/setup-miniconda@v2 + with: + miniforge-version: latest + channel-priority: strict + python-version: ${{ matrix.python-version }} + environment-file: ci/environment-${{ matrix.python-version }}.yaml + activate-environment: test-environment + auto-activate-base: false + + - name: Install dask-bigquery + run: python -m pip install --no-deps -e . + + - name: Set up Cloud SDK + uses: google-github-actions/setup-gcloud@master + with: + project_id: ${{ secrets.GCP_PROJECT_ID }} + service_account_key: ${{ secrets.GCP_SA_KEY }} + export_default_credentials: true + + - name: Use gcloud CLI + run: gcloud info + + - name: Run tests + run: pytest -v dask_bigquery \ No newline at end of file From 44096a13a8e577c3818d263cd43d0c0ec8f6e25b Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 17 Sep 2021 15:03:40 -0400 Subject: [PATCH 25/46] add ci environments --- ci/environment-3.7.yaml | 13 +++++++++++++ ci/environment-3.8.yaml | 13 +++++++++++++ ci/environment-3.9.yaml | 13 +++++++++++++ 3 files changed, 39 insertions(+) create mode 100644 ci/environment-3.7.yaml create mode 100644 ci/environment-3.8.yaml create mode 100644 ci/environment-3.9.yaml diff --git a/ci/environment-3.7.yaml b/ci/environment-3.7.yaml new file mode 100644 index 0000000..013ed5d --- /dev/null +++ b/ci/environment-3.7.yaml @@ -0,0 +1,13 @@ +name: test-environment +channels: + - conda-forge +dependencies: + - python=3.7 + - dask + - distributed + - pandas + - pyarrow + - grpcio + - pandas-gbq + - google-cloud-bigquery + - google-cloud-bigquery-storage \ No newline at end of file diff --git a/ci/environment-3.8.yaml b/ci/environment-3.8.yaml new file mode 100644 index 0000000..27241a5 --- /dev/null +++ b/ci/environment-3.8.yaml @@ -0,0 +1,13 @@ +name: test-environment +channels: + - conda-forge +dependencies: + - python=3.8 + - dask + - distributed + - pandas + - pyarrow + - grpcio + - pandas-gbq + - google-cloud-bigquery + - google-cloud-bigquery-storage \ No newline at end of file diff --git a/ci/environment-3.9.yaml b/ci/environment-3.9.yaml new file mode 100644 index 0000000..37b74d6 --- /dev/null +++ b/ci/environment-3.9.yaml @@ -0,0 +1,13 @@ +name: test-environment +channels: + - conda-forge +dependencies: + - python=3.9 + - dask + - distributed + - pandas + - pyarrow + - grpcio + - pandas-gbq + - google-cloud-bigquery + - google-cloud-bigquery-storage \ No newline at end of file From b19dca4bdc42a5730e51bffc8c1e6ee5530bd7eb Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 17 Sep 2021 15:23:50 -0400 Subject: [PATCH 26/46] trigger ci From 982a5f5ef13cba6b443af71fcfb8d58d0c2720f5 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 17 Sep 2021 15:45:28 -0400 Subject: [PATCH 27/46] trigger ci again From 4292ac3573aa554e5f899c84dead702675f90ca9 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Fri, 17 Sep 2021 16:00:04 -0400 Subject: [PATCH 28/46] add pytest to envs --- ci/environment-3.7.yaml | 1 + ci/environment-3.8.yaml | 1 + ci/environment-3.9.yaml | 1 + 3 files changed, 3 insertions(+) diff --git a/ci/environment-3.7.yaml b/ci/environment-3.7.yaml index 013ed5d..a8f48ce 100644 --- a/ci/environment-3.7.yaml +++ b/ci/environment-3.7.yaml @@ -7,6 +7,7 @@ dependencies: - distributed - pandas - pyarrow + - pytest - grpcio - pandas-gbq - google-cloud-bigquery diff --git a/ci/environment-3.8.yaml 
b/ci/environment-3.8.yaml index 27241a5..ba179fb 100644 --- a/ci/environment-3.8.yaml +++ b/ci/environment-3.8.yaml @@ -7,6 +7,7 @@ dependencies: - distributed - pandas - pyarrow + - pytest - grpcio - pandas-gbq - google-cloud-bigquery diff --git a/ci/environment-3.9.yaml b/ci/environment-3.9.yaml index 37b74d6..c9cec6f 100644 --- a/ci/environment-3.9.yaml +++ b/ci/environment-3.9.yaml @@ -7,6 +7,7 @@ dependencies: - distributed - pandas - pyarrow + - pytest - grpcio - pandas-gbq - google-cloud-bigquery From 14ba56ce12a276eb2ba8d9b848844c8d8a749e7c Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 20 Sep 2021 17:17:05 -0500 Subject: [PATCH 29/46] Only run CI on push events --- .github/workflows/tests.yml | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e34ff1a..5d67d1b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -1,6 +1,6 @@ name: Tests -on: [push, pull_request] +on: push # When this workflow is queued, automatically cancel any previous running # or pending jobs from the same branch @@ -46,8 +46,5 @@ jobs: service_account_key: ${{ secrets.GCP_SA_KEY }} export_default_credentials: true - - name: Use gcloud CLI - run: gcloud info - - name: Run tests run: pytest -v dask_bigquery \ No newline at end of file From 32b668679e3d21620da2cbcc59d07c0e5bcfd547 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 20 Sep 2021 17:18:32 -0500 Subject: [PATCH 30/46] Minor cleanup --- dask_bigquery/core.py | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index f5f1354..966a153 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -19,33 +19,29 @@ def bigquery_client(project_id=None): See googleapis/google-cloud-python#9457 and googleapis/gapic-generator-python#575 for reference. """ - - bq_storage_client = None with bigquery.Client(project_id) as bq_client: - try: - bq_storage_client = bigquery_storage.BigQueryReadClient( - credentials=bq_client._credentials - ) - yield bq_client, bq_storage_client - finally: - bq_storage_client.transport.grpc_channel.close() + bq_storage_client = bigquery_storage.BigQueryReadClient( + credentials=bq_client._credentials + ) + yield bq_client, bq_storage_client + bq_storage_client.transport.grpc_channel.close() -def _stream_to_dfs(bqs_client, stream_name, schema, timeout): +def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs): """Given a Storage API client and a stream name, yield all dataframes.""" return [ pyarrow.ipc.read_record_batch( pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch), schema, ).to_pandas() - for message in bqs_client.read_rows(name=stream_name, offset=0, timeout=timeout) + for message in bqs_client.read_rows(name=stream_name, offset=0, **read_kwargs) ] def bigquery_read( make_create_read_session_request: callable, project_id: str, - timeout: int, + read_kwargs: int, stream_name: str, ) -> pd.DataFrame: """Read a single batch of rows via BQ Storage API, in Arrow binary format. 
@@ -65,7 +61,7 @@ def bigquery_read( schema = pyarrow.ipc.read_schema( pyarrow.py_buffer(session.arrow_schema.serialized_schema) ) - shards = _stream_to_dfs(bqs_client, stream_name, schema, timeout=timeout) + shards = _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs) # NOTE: BQ Storage API can return empty streams if len(shards) == 0: shards = [schema.empty_table().to_pandas()] @@ -78,7 +74,7 @@ def read_gbq( dataset_id: str, table_id: str, row_filter="", - read_timeout: int = 3600, + read_kwargs=None, ): """Read table as dask dataframe using BigQuery Storage API via Arrow format. If `partition_field` and `partitions` are specified, then the resulting dask dataframe @@ -99,12 +95,9 @@ def read_gbq( dask dataframe See https://github.com/dask/dask/issues/3121 for additional context. """ - - with bigquery_client(project_id) as ( - bq_client, - bqs_client, - ): - table_ref = bq_client.get_table(".".join((dataset_id, table_id))) + read_kwargs = read_kwargs or {} + with bigquery_client(project_id) as (bq_client, bqs_client): + table_ref = bq_client.get_table(f"{dataset_id}.{table_id}") if table_ref.table_type == "VIEW": raise TypeError("Table type VIEW not supported") @@ -139,7 +132,7 @@ def make_create_read_session_request(row_filter=""): dataset_id, table_id, row_filter, - read_timeout, + read_kwargs, ) layer = DataFrameIOLayer( @@ -150,7 +143,7 @@ def make_create_read_session_request(row_filter=""): bigquery_read, make_create_read_session_request, project_id, - read_timeout, + read_kwargs, ), label=label, ) From 97b5d21a595a113ae53963b0c2b3b79a50da3233 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 20 Sep 2021 18:01:29 -0500 Subject: [PATCH 31/46] Use mamba --- .github/workflows/tests.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 5d67d1b..788db16 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -29,7 +29,9 @@ jobs: - name: Setup Conda Environment uses: conda-incubator/setup-miniconda@v2 with: + miniforge-variant: Mambaforge miniforge-version: latest + use-mamba: true channel-priority: strict python-version: ${{ matrix.python-version }} environment-file: ci/environment-${{ matrix.python-version }}.yaml From e03e731b127ab8467fa9df2d6128180260c57967 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Mon, 20 Sep 2021 22:57:53 -0400 Subject: [PATCH 32/46] update docstrings --- dask_bigquery/core.py | 62 ++++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 966a153..a9dfdaf 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -41,20 +41,23 @@ def _stream_to_dfs(bqs_client, stream_name, schema, read_kwargs): def bigquery_read( make_create_read_session_request: callable, project_id: str, - read_kwargs: int, + read_kwargs: dict, stream_name: str, ) -> pd.DataFrame: """Read a single batch of rows via BQ Storage API, in Arrow binary format. - Args: - project_id: BigQuery project - create_read_session_request: kwargs to pass to `bqs_client.create_read_session` - as `request` - stream_name: BigQuery Storage API Stream "name". - NOTE: Please set if reading from Storage API without any `row_restriction`. + + Parameters + ---------- + create_read_session_request: callable + kwargs to pass to `bqs_client.create_read_session` as `request` + project_id: str + Name of the BigQuery project. 
+ read_kwargs: dict + kwargs to pass to read_rows() + stream_name: str + BigQuery Storage API Stream "name" + NOTE: Please set if reading from Storage API without any `row_restriction`. https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream - NOTE: `partition_field` and `stream_name` kwargs are mutually exclusive. - Adapted from - https://github.com/googleapis/python-bigquery-storage/blob/a0fc0af5b4447ce8b50c365d4d081b9443b8490e/google/cloud/bigquery_storage_v1/reader.py. """ with bigquery_client(project_id) as (bq_client, bqs_client): session = bqs_client.create_read_session(make_create_read_session_request()) @@ -74,26 +77,25 @@ def read_gbq( dataset_id: str, table_id: str, row_filter="", - read_kwargs=None, + read_kwargs: dict = None, ): """Read table as dask dataframe using BigQuery Storage API via Arrow format. - If `partition_field` and `partitions` are specified, then the resulting dask dataframe - will be partitioned along the same boundaries. Otherwise, partitions will be approximately - balanced according to BigQuery stream allocation logic. - If `partition_field` is specified but not included in `fields` (either implicitly by requesting - all fields, or explicitly by inclusion in the list `fields`), then it will still be included - in the query in order to have it available for dask dataframe indexing. - Args: - project_id: BigQuery project - dataset_id: BigQuery dataset within project - table_id: BigQuery table within dataset - partition_field: to specify filters of form "WHERE {partition_field} = ..." - partitions: all values to select of `partition_field` - fields: names of the fields (columns) to select (default None to "SELECT *") - read_timeout: # of seconds an individual read request has before timing out - Returns: - dask dataframe - See https://github.com/dask/dask/issues/3121 for additional context. + Partitions will be approximately balanced according to BigQuery stream allocation logic. + + Parameters + ---------- + project_id: str + Name of the BigQuery project. + dataset_id: str + BigQuery dataset within project + table_id: str + BigQuery table within dataset + read_kwargs: dict + kwargs to pass to read_rows() + + Returns + ------- + Dask DataFrame """ read_kwargs = read_kwargs or {} with bigquery_client(project_id) as (bq_client, bqs_client): @@ -101,8 +103,8 @@ def read_gbq( if table_ref.table_type == "VIEW": raise TypeError("Table type VIEW not supported") - # The protobuf types can't be pickled (may be able to tweak w/ copyreg), so instead use a - # generator func. + # The protobuf types can't be pickled (may be able to tweak w/ copyreg), + # so instead use a generator func. 
def make_create_read_session_request(row_filter=""): return bigquery_storage.types.CreateReadSessionRequest( max_stream_count=100, # 0 -> use as many streams as BQ Storage will provide From d73b686660159340cc4da5e4009ae7b6e19b0819 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Mon, 20 Sep 2021 23:03:26 -0400 Subject: [PATCH 33/46] missing docstring --- dask_bigquery/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index a9dfdaf..32b862a 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -90,6 +90,8 @@ def read_gbq( BigQuery dataset within project table_id: str BigQuery table within dataset + row_filter: str + SQL text filtering statement to pass to `row_restriction` read_kwargs: dict kwargs to pass to read_rows() @@ -103,8 +105,6 @@ def read_gbq( if table_ref.table_type == "VIEW": raise TypeError("Table type VIEW not supported") - # The protobuf types can't be pickled (may be able to tweak w/ copyreg), - # so instead use a generator func. def make_create_read_session_request(row_filter=""): return bigquery_storage.types.CreateReadSessionRequest( max_stream_count=100, # 0 -> use as many streams as BQ Storage will provide From 3f8e397770bbb0316d328d5a3b801b97a0fef317 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Tue, 21 Sep 2021 11:14:56 -0400 Subject: [PATCH 34/46] trigger ci - testing workflow From 64fe0ec7f109da813fdb81240399fd6880e08a01 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Tue, 21 Sep 2021 13:19:30 -0400 Subject: [PATCH 35/46] use env variable for project id --- .github/workflows/tests.yml | 4 +++- dask_bigquery/tests/test_core.py | 6 ++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 788db16..fd7704d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -47,6 +47,8 @@ jobs: project_id: ${{ secrets.GCP_PROJECT_ID }} service_account_key: ${{ secrets.GCP_SA_KEY }} export_default_credentials: true - + - name: Run tests + env: + DASK_BIGQUERY_PROJECT_ID: "${{ secrets.GCP_PROJECT_ID }}" run: pytest -v dask_bigquery \ No newline at end of file diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index 8a737f0..df9b005 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -1,3 +1,4 @@ +import os import random import uuid @@ -10,9 +11,6 @@ from dask_bigquery import read_gbq -# These tests are run locally and assume the user is already athenticated. -# It also assumes that the user has created a project called dask-bigquery. 
- @pytest.fixture def df(): @@ -31,7 +29,7 @@ def df(): @pytest.fixture def dataset(df): "Push some data to BigQuery using pandas gbq" - project_id = "dask-bigquery" + project_id = os.environ.get("DASK_BIGQUERY_PROJECT_ID", "dask-bigquery") dataset_id = uuid.uuid4().hex table_id = "table_test" # push data to gbq From 6f94825d0bf1f56c351038478967bd739ec38d22 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Tue, 21 Sep 2021 16:31:58 -0400 Subject: [PATCH 36/46] add test for read with row_filter --- dask_bigquery/tests/test_core.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index df9b005..e9aae59 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -58,3 +58,18 @@ def test_read_gbq(df, dataset, client): assert list(ddf.columns) == ["name", "number", "idx"] assert ddf.npartitions == 2 assert assert_eq(ddf.set_index("idx"), df.set_index("idx")) + + +def test_read_row_filter(df, dataset, client): + "Test read data with a row restriction providing `row_filter`" + project_id, dataset_id, table_id = dataset + ddf = read_gbq( + project_id=project_id, + dataset_id=dataset_id, + table_id=table_id, + row_filter="idx < 5", + ) + + assert list(ddf.columns) == ["name", "number", "idx"] + assert ddf.npartitions == 2 + assert assert_eq(ddf.set_index("idx").loc[:4], df.set_index("idx").loc[:4]) From 1a51981d9eee6004e4d580ecb6217ba965c89f64 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Tue, 21 Sep 2021 16:37:19 -0400 Subject: [PATCH 37/46] add test for read with kwargs --- dask_bigquery/tests/test_core.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index e9aae59..f75ff73 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -73,3 +73,17 @@ def test_read_row_filter(df, dataset, client): assert list(ddf.columns) == ["name", "number", "idx"] assert ddf.npartitions == 2 assert assert_eq(ddf.set_index("idx").loc[:4], df.set_index("idx").loc[:4]) + + +def test_read_kwargs(df, dataset, client): + "Test read data with a `read_kwargs`" + project_id, dataset_id, table_id = dataset + ddf = read_gbq( + project_id=project_id, + dataset_id=dataset_id, + table_id=table_id, + read_kwargs={"timeout": 1e-12}, + ) + + with pytest.raises(Exception, match="504 Deadline Exceeded"): + ddf.compute() From acb404e1a8e533dda2519ffc689d314b48edd921 Mon Sep 17 00:00:00 2001 From: Naty Clementi Date: Tue, 21 Sep 2021 19:30:08 -0400 Subject: [PATCH 38/46] Update dask_bigquery/tests/test_core.py Co-authored-by: James Bourbeau --- dask_bigquery/tests/test_core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index f75ff73..919e945 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -49,7 +49,6 @@ def dataset(df): ) -# test simple read def test_read_gbq(df, dataset, client): """Test simple read of data pushed to BigQuery using pandas-gbq""" project_id, dataset_id, table_id = dataset From d78c2a96d81f89ec16547e9cc3b7ebf08c2ea7b1 Mon Sep 17 00:00:00 2001 From: Naty Clementi Date: Tue, 21 Sep 2021 19:31:16 -0400 Subject: [PATCH 39/46] Update dask_bigquery/tests/test_core.py Co-authored-by: James Bourbeau --- dask_bigquery/tests/test_core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index 919e945..741ec9a 100644 --- 
a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -50,7 +50,6 @@ def dataset(df): def test_read_gbq(df, dataset, client): - """Test simple read of data pushed to BigQuery using pandas-gbq""" project_id, dataset_id, table_id = dataset ddf = read_gbq(project_id=project_id, dataset_id=dataset_id, table_id=table_id) From 2b46c4f1a211239587be8197dd315ee930966595 Mon Sep 17 00:00:00 2001 From: Naty Clementi Date: Tue, 21 Sep 2021 19:31:26 -0400 Subject: [PATCH 40/46] Update dask_bigquery/tests/test_core.py Co-authored-by: James Bourbeau --- dask_bigquery/tests/test_core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index 741ec9a..b73807e 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -59,7 +59,6 @@ def test_read_gbq(df, dataset, client): def test_read_row_filter(df, dataset, client): - "Test read data with a row restriction providing `row_filter`" project_id, dataset_id, table_id = dataset ddf = read_gbq( project_id=project_id, From 5ac1358dd5bc4530f134ed2ac021a1d0190f1c52 Mon Sep 17 00:00:00 2001 From: Naty Clementi Date: Tue, 21 Sep 2021 19:31:48 -0400 Subject: [PATCH 41/46] Update dask_bigquery/tests/test_core.py Co-authored-by: James Bourbeau --- dask_bigquery/tests/test_core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index b73807e..6d84cf6 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -73,7 +73,6 @@ def test_read_row_filter(df, dataset, client): def test_read_kwargs(df, dataset, client): - "Test read data with a `read_kwargs`" project_id, dataset_id, table_id = dataset ddf = read_gbq( project_id=project_id, From 216a4e71d4f5995ceaa09f6b14b4a2d83f512c48 Mon Sep 17 00:00:00 2001 From: Naty Clementi Date: Tue, 21 Sep 2021 19:31:58 -0400 Subject: [PATCH 42/46] Update dask_bigquery/tests/test_core.py Co-authored-by: James Bourbeau --- dask_bigquery/tests/test_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index 6d84cf6..5302935 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -72,7 +72,7 @@ def test_read_row_filter(df, dataset, client): assert assert_eq(ddf.set_index("idx").loc[:4], df.set_index("idx").loc[:4]) -def test_read_kwargs(df, dataset, client): +def test_read_kwargs(dataset, client): project_id, dataset_id, table_id = dataset ddf = read_gbq( project_id=project_id, From 46e492316669495d4c2775ae24a2e9af0cdd9f9e Mon Sep 17 00:00:00 2001 From: Naty Clementi Date: Tue, 21 Sep 2021 19:32:11 -0400 Subject: [PATCH 43/46] Update dask_bigquery/tests/test_core.py Co-authored-by: James Bourbeau --- dask_bigquery/tests/test_core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/dask_bigquery/tests/test_core.py b/dask_bigquery/tests/test_core.py index 5302935..6da48c0 100644 --- a/dask_bigquery/tests/test_core.py +++ b/dask_bigquery/tests/test_core.py @@ -28,7 +28,6 @@ def df(): @pytest.fixture def dataset(df): - "Push some data to BigQuery using pandas gbq" project_id = os.environ.get("DASK_BIGQUERY_PROJECT_ID", "dask-bigquery") dataset_id = uuid.uuid4().hex table_id = "table_test" From 3204bc2225dabe1ef014f37ec00f987bedb56db5 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Wed, 22 Sep 2021 11:48:02 -0400 Subject: [PATCH 44/46] tweak on docstrings --- dask_bigquery/core.py | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 32b862a..3f04b01 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -85,7 +85,7 @@ def read_gbq( Parameters ---------- project_id: str - Name of the BigQuery project. + Name of the BigQuery project id. dataset_id: str BigQuery dataset within project table_id: str From f17cfb8d2ce3977fe6d4d6cdc9a246677d9572c6 Mon Sep 17 00:00:00 2001 From: ncclementi Date: Wed, 22 Sep 2021 11:48:51 -0400 Subject: [PATCH 45/46] add readme content --- README.md | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 020194d..e9075b1 100644 --- a/README.md +++ b/README.md @@ -1 +1,30 @@ -# dask-bigquery \ No newline at end of file +# Dask-BigQuery + +[![Tests](https://github.com/coiled/dask-bigquery/actions/workflows/tests.yml/badge.svg)](https://github.com/coiled/dask-bigquery/actions/workflows/tests.yml) [![Linting](https://github.com/coiled/dask-bigquery/actions/workflows/pre-commit.yml/badge.svg)](https://github.com/coiled/dask-bigquery/actions/workflows/pre-commit.yml) + +Read data from Google BigQuery with Dask + +**Note:** This project was based on the contributions from @bnaul, @JacobHayes and @mikss. The intial inspiration can be found in a [dask_bigquery gist](https://gist.github.com/bnaul/4819f045ccbee160b60a530b6cfc0c98#file-dask_bigquery-py) + +## Installation + + +## Example + +`dask-bigquery` assumes that you are already authenticated. + +```python +import dask_bigquery + +ddf = dask_bigquery.read_gbq( + project_id="your_project_id", + dataset_id="your_dataset", + table_id="your_table", + ) + +ddf.head() +``` + +## License + +[BSD-3](LICENSE) \ No newline at end of file From d1398c2c0658044907335af67df9debb9cbc1ab1 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 22 Sep 2021 22:27:11 -0500 Subject: [PATCH 46/46] Minor updates --- README.md | 19 ++++++++++++------- dask_bigquery/core.py | 8 ++++---- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index e9075b1..c0db199 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,8 @@ Read data from Google BigQuery with Dask -**Note:** This project was based on the contributions from @bnaul, @JacobHayes and @mikss. The intial inspiration can be found in a [dask_bigquery gist](https://gist.github.com/bnaul/4819f045ccbee160b60a530b6cfc0c98#file-dask_bigquery-py) - ## Installation - ## Example `dask-bigquery` assumes that you are already authenticated. @@ -17,14 +14,22 @@ Read data from Google BigQuery with Dask import dask_bigquery ddf = dask_bigquery.read_gbq( - project_id="your_project_id", - dataset_id="your_dataset", - table_id="your_table", - ) + project_id="your_project_id", + dataset_id="your_dataset", + table_id="your_table", +) ddf.head() ``` +## History + +This project stems from the discussion in +[this Dask issue](https://github.com/dask/dask/issues/3121) and +[this initial implementation](https://gist.github.com/bnaul/4819f045ccbee160b60a530b6cfc0c98#file-dask_bigquery-py) +developed by [Brett Naul](https://github.com/bnaul), [Jacob Hayes](https://github.com/JacobHayes), +and [Steven Soojin Kim](https://github.com/mikss). 
+ ## License [BSD-3](LICENSE) \ No newline at end of file diff --git a/dask_bigquery/core.py b/dask_bigquery/core.py index 3f04b01..1858180 100644 --- a/dask_bigquery/core.py +++ b/dask_bigquery/core.py @@ -13,10 +13,10 @@ @contextmanager -def bigquery_client(project_id=None): +def bigquery_clients(project_id): """This context manager is a temporary solution until there is an upstream solution to handle this. - See googleapis/google-cloud-python#9457 + See googleapis/google-cloud-python#9457 and googleapis/gapic-generator-python#575 for reference. """ with bigquery.Client(project_id) as bq_client: @@ -59,7 +59,7 @@ def bigquery_read( NOTE: Please set if reading from Storage API without any `row_restriction`. https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta1#stream """ - with bigquery_client(project_id) as (bq_client, bqs_client): + with bigquery_clients(project_id) as (_, bqs_client): session = bqs_client.create_read_session(make_create_read_session_request()) schema = pyarrow.ipc.read_schema( pyarrow.py_buffer(session.arrow_schema.serialized_schema) @@ -100,7 +100,7 @@ def read_gbq( Dask DataFrame """ read_kwargs = read_kwargs or {} - with bigquery_client(project_id) as (bq_client, bqs_client): + with bigquery_clients(project_id) as (bq_client, bqs_client): table_ref = bq_client.get_table(f"{dataset_id}.{table_id}") if table_ref.table_type == "VIEW": raise TypeError("Table type VIEW not supported")
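
Taken together, this series leaves `read_gbq` with two optional knobs that the new tests exercise separately: `row_filter` (a SQL text filter passed to the Storage API `row_restriction`) and `read_kwargs` (a dict forwarded to `read_rows()`). Below is a minimal usage sketch combining both in one call — the project, dataset, and table names are the same placeholders used in the README, the `"idx < 5"` filter mirrors the test fixture's `idx` column, and the 600-second timeout is only an illustrative value, not a recommended default.

```python
import dask_bigquery

# Lazily build a Dask DataFrame backed by BigQuery Storage API streams.
ddf = dask_bigquery.read_gbq(
    project_id="your_project_id",   # placeholder
    dataset_id="your_dataset",      # placeholder
    table_id="your_table",          # placeholder
    row_filter="idx < 5",           # SQL text passed to row_restriction
    read_kwargs={"timeout": 600},   # forwarded to read_rows()
)

print(ddf.npartitions)  # one partition per BigQuery read stream
print(ddf.head())       # reads the first partition
```

Forwarding `read_kwargs` straight to `read_rows()` keeps the `read_gbq` signature small while still letting callers tune per-request behaviour such as timeouts — which is what `test_read_kwargs` relies on to force a `504 Deadline Exceeded` error.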