Add pandas.GBQQueryDataSet (#1039)
simonpicard authored Nov 23, 2021
1 parent 7998c3e commit ce58b4d
Showing 7 changed files with 296 additions and 4 deletions.
4 changes: 3 additions & 1 deletion RELEASE.md
@@ -11,6 +11,7 @@
| --------------------------- | ---------------------------------------------------- | --------------------------------- |
| `plotly.JSONDataSet` | Works with plotly graph object Figures (saves as json file) | `kedro.extras.datasets.plotly` |
| `pandas.GenericDataSet` | Provides a 'best effort' facility to read / write any format provided by the `pandas` library | `kedro.extras.datasets.pandas` |
| `pandas.GBQQueryDataSet` | Loads data from a Google BigQuery table using a provided SQL query | `kedro.extras.datasets.pandas` |
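
For a quick feel of the new dataset, here is a minimal Python sketch (hedged: the table and project names are illustrative only, borrowed from the class docstring examples; fuller YAML and Python examples live in `kedro/extras/datasets/pandas/gbq_dataset.py` below):

from kedro.extras.datasets.pandas import GBQQueryDataSet

# Execute an ad-hoc SQL query on BigQuery and load the result as a DataFrame.
data_set = GBQQueryDataSet(
    sql="SELECT shuttle, shuttle_id FROM spaceflights.shuttles",  # illustrative table
    project="my-project",  # illustrative GCP project ID
)
df = data_set.load()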

## Bug fixes and other changes
* Fixed an issue where `kedro new --config config.yml` was ignoring the config file when `prompts.yml` didn't exist.
@@ -48,7 +49,8 @@
[Simon Brugman](https://github.com/sbrugman),
[Kiyo Kunii](https://github.com/921kiyo),
[Benjamin Levy](https://github.com/BenjaminLevyQB),
[Louis de Charsonville](https://github.com/louisdecharson),
[Simon Picard](https://github.com/simonpicard)

# Release 0.17.5

1 change: 1 addition & 0 deletions docs/source/15_api_docs/kedro.extras.datasets.rst
@@ -24,6 +24,7 @@ kedro.extras.datasets
   kedro.extras.datasets.pandas.ExcelDataSet
   kedro.extras.datasets.pandas.AppendableExcelDataSet
   kedro.extras.datasets.pandas.FeatherDataSet
   kedro.extras.datasets.pandas.GBQQueryDataSet
   kedro.extras.datasets.pandas.GBQTableDataSet
   kedro.extras.datasets.pandas.GenericDataSet
   kedro.extras.datasets.pandas.HDFDataSet
3 changes: 2 additions & 1 deletion kedro/extras/datasets/pandas/__init__.py
@@ -5,6 +5,7 @@
"ExcelDataSet",
"FeatherDataSet",
"GBQTableDataSet",
"GBQQueryDataSet",
"ExcelDataSet",
"AppendableExcelDataSet",
"HDFDataSet",
@@ -26,7 +27,7 @@
with suppress(ImportError):
    from .feather_dataset import FeatherDataSet
with suppress(ImportError):
    from .gbq_dataset import GBQQueryDataSet, GBQTableDataSet
with suppress(ImportError):
    from .hdf_dataset import HDFDataSet
with suppress(ImportError):
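
Because each import in this module is wrapped in ``suppress(ImportError)``, a dataset name only exists in ``kedro.extras.datasets.pandas`` when its optional dependencies are installed. A consumer-side sketch of the consequence (an assumption about the caller's environment, not part of the commit):

# If pandas-gbq / google-cloud-bigquery are missing, the guarded import above
# fails silently, so importing the name from the package raises ImportError.
try:
    from kedro.extras.datasets.pandas import GBQQueryDataSet
except ImportError:
    GBQQueryDataSet = None  # optional dependencies not installed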
151 changes: 150 additions & 1 deletion kedro/extras/datasets/pandas/gbq_dataset.py
@@ -3,14 +3,22 @@
"""

import copy
from pathlib import PurePosixPath
from typing import Any, Dict, Union

import fsspec
import pandas as pd
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.oauth2.credentials import Credentials

from kedro.io.core import AbstractDataSet, DataSetError, validate_on_forbidden_chars
from kedro.io.core import (
    AbstractDataSet,
    DataSetError,
    get_filepath_str,
    get_protocol_and_path,
    validate_on_forbidden_chars,
)


class GBQTableDataSet(AbstractDataSet):
@@ -161,3 +169,144 @@ def _validate_location(self):
"to be the same for save and load args. "
"Details: https://cloud.google.com/bigquery/docs/locations"
)


class GBQQueryDataSet(AbstractDataSet):
    """``GBQQueryDataSet`` loads data from Google BigQuery by executing a
    provided SQL query. It uses ``pandas.read_gbq``, which itself relies on
    ``pandas-gbq`` internally, and therefore supports all load options that
    ``read_gbq`` allows.

    Example adding a catalog entry with the
    `YAML API <https://kedro.readthedocs.io/en/stable/05_data/01_data_catalog.html#using-the-data-catalog-with-the-yaml-api>`_:

    .. code-block:: yaml

        >>> vehicles:
        >>>   type: pandas.GBQQueryDataSet
        >>>   sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
        >>>   project: my-project
        >>>   credentials: gbq-creds
        >>>   load_args:
        >>>     reauth: True

    Example using the Python API:
    ::

        >>> from kedro.extras.datasets.pandas import GBQQueryDataSet
        >>>
        >>> sql = "SELECT * FROM dataset_1.table_a"
        >>>
        >>> data_set = GBQQueryDataSet(sql, project="my-project")
        >>> sql_data = data_set.load()
    """

    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        sql: str = None,
        project: str = None,
        credentials: Union[Dict[str, Any], Credentials] = None,
        load_args: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
        filepath: str = None,
    ) -> None:
        """Creates a new instance of ``GBQQueryDataSet``.

        Args:
            sql: The sql query statement.
            project: Google BigQuery Account project ID.
                Optional when available from the environment.
                https://cloud.google.com/resource-manager/docs/creating-managing-projects
            credentials: Credentials for accessing Google APIs.
                Either a ``google.auth.credentials.Credentials`` object or a dictionary
                with the parameters required to instantiate
                ``google.oauth2.credentials.Credentials``. All accepted arguments are
                listed here:
                https://google-auth.readthedocs.io/en/latest/reference/google.oauth2.credentials.html
            load_args: Pandas options for loading a BigQuery table into a DataFrame.
                All available arguments are listed here:
                https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_gbq.html
                All defaults are preserved.
            fs_args: Extra arguments to pass into the underlying filesystem class
                constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``)
                used for reading the SQL query from the filepath.
            filepath: A path to a file with a sql query statement.

        Raises:
            DataSetError: When the ``sql`` and ``filepath`` parameters are either both
                empty or both provided, as well as when the `save()` method is invoked.
        """
        if sql and filepath:
            raise DataSetError(
                "`sql` and `filepath` arguments cannot both be provided. "
                "Please only provide one."
            )

        if not (sql or filepath):
            raise DataSetError(
                "`sql` and `filepath` arguments cannot both be empty. "
                "Please provide a sql query or path to a sql query file."
            )

        # Handle default load arguments
        self._load_args = copy.deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)

        self._project_id = project

        # Accept either a ready-made Credentials object or a dict of its kwargs
        if isinstance(credentials, dict):
            credentials = Credentials(**credentials)

        self._credentials = credentials
        self._client = bigquery.Client(
            project=self._project_id,
            credentials=self._credentials,
            location=self._load_args.get("location"),
        )

        # Load the sql query from the arg or from a file
        if sql:
            self._load_args["query"] = sql
            self._filepath = None
        else:
            # Filesystem for loading the sql file
            _fs_args = copy.deepcopy(fs_args) or {}
            _fs_credentials = _fs_args.pop("credentials", {})
            protocol, path = get_protocol_and_path(str(filepath))

            self._protocol = protocol
            self._fs = fsspec.filesystem(self._protocol, **_fs_credentials, **_fs_args)
            self._filepath = path
    def _describe(self) -> Dict[str, Any]:
        load_args = copy.deepcopy(self._load_args)
        desc = {}
        desc["sql"] = str(load_args.pop("query", None))
        desc["filepath"] = str(self._filepath)
        desc["load_args"] = str(load_args)

        return desc

    def _load(self) -> pd.DataFrame:
        load_args = copy.deepcopy(self._load_args)

        if self._filepath:
            load_path = get_filepath_str(PurePosixPath(self._filepath), self._protocol)
            with self._fs.open(load_path, mode="r") as fs_file:
                load_args["query"] = fs_file.read()

        return pd.read_gbq(
            project_id=self._project_id,
            credentials=self._credentials,
            **load_args,
        )

    def _save(self, data: pd.DataFrame) -> None:
        raise DataSetError("`save` is not supported on GBQQueryDataSet")
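
Putting the two construction paths together, a hedged usage sketch (the GCS path, project ID and ``fs_args`` below are hypothetical illustrations, not part of the commit):

from kedro.extras.datasets.pandas import GBQQueryDataSet

# Path 1: pass the SQL statement directly; it is stored in load_args["query"].
inline_ds = GBQQueryDataSet(
    sql="SELECT * FROM dataset_1.table_a",
    project="my-project",  # hypothetical GCP project ID
)

# Path 2: point at a .sql file; _load() reads it through fsspec at load time.
file_ds = GBQQueryDataSet(
    filepath="gs://my-bucket/queries/table_a.sql",  # hypothetical GCS object
    project="my-project",
    fs_args={"project": "my-project"},  # forwarded to GCSFileSystem
)

df = file_ds.load()  # reads the file, then delegates to pandas.read_gbq(query=...)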
1 change: 1 addition & 0 deletions setup.py
@@ -76,6 +76,7 @@ def _collect_requirements(requires):
"pandas.AppendableExcelDataSet": [PANDAS, "openpyxl>=3.0.3, <4.0"],
"pandas.FeatherDataSet": [PANDAS],
"pandas.GBQTableDataSet": [PANDAS, "pandas-gbq>=0.12.0, <1.0"],
"pandas.GBQQueryDataSet": [PANDAS, "pandas-gbq>=0.12.0, <1.0"],
"pandas.HDFDataSet": [PANDAS, "tables~=3.6"],
"pandas.JSONDataSet": [PANDAS],
"pandas.ParquetDataSet": [PANDAS, "pyarrow>=1.0, <7.0"],
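
These keys double as pip extras; assuming kedro's usual optional-dependency convention, the new dataset's requirements can be installed with `pip install "kedro[pandas.GBQQueryDataSet]"`.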
1 change: 1 addition & 0 deletions static/jsonschema/kedro-catalog-0.17.json
@@ -38,6 +38,7 @@
"pandas.CSVDataSet",
"pandas.ExcelDataSet",
"pandas.GBQTableDataSet",
"pandas.GBQQueryDataSet",
"pandas.GenericDataSet"
]
}
139 changes: 138 additions & 1 deletion tests/extras/datasets/pandas/test_gbq_dataset.py
@@ -1,14 +1,17 @@
from pathlib import PosixPath

import pandas as pd
import pytest
from google.cloud.exceptions import NotFound
from pandas.testing import assert_frame_equal

from kedro.extras.datasets.pandas import GBQQueryDataSet, GBQTableDataSet
from kedro.io.core import DataSetError

DATASET = "dataset"
TABLE_NAME = "table_name"
PROJECT = "project"
SQL_QUERY = "SELECT * FROM table_a"


@pytest.fixture
@@ -36,6 +39,35 @@ def gbq_dataset(
    )


@pytest.fixture(params=[{}])
def gbq_sql_dataset(load_args, mock_bigquery_client):  # pylint: disable=unused-argument
    return GBQQueryDataSet(
        sql=SQL_QUERY,
        project=PROJECT,
        credentials=None,
        load_args=load_args,
    )


@pytest.fixture
def sql_file(tmp_path: PosixPath):
    file = tmp_path / "test.sql"
    file.write_text(SQL_QUERY)
    return file.as_posix()


@pytest.fixture(params=[{}])
def gbq_sql_file_dataset(
    load_args, sql_file, mock_bigquery_client
):  # pylint: disable=unused-argument
    return GBQQueryDataSet(
        filepath=sql_file,
        project=PROJECT,
        credentials=None,
        load_args=load_args,
    )


class TestGBQDataSet:
    def test_exists(self, mock_bigquery_client):
        """Test `exists` method invocation."""
@@ -178,3 +210,108 @@ def test_credentials_propagation(self, mocker):
        mocked_bigquery.Client.assert_called_once_with(
            project=PROJECT, credentials=credentials_obj, location=None
        )


class TestGBQQueryDataSet:
    def test_empty_query_error(self):
        """Check the error when instantiating with an empty query and file"""
        pattern = (
            r"`sql` and `filepath` arguments cannot both be empty\. "
            r"Please provide a sql query or path to a sql query file\."
        )
        with pytest.raises(DataSetError, match=pattern):
            GBQQueryDataSet(sql="", filepath="", credentials=None)

    @pytest.mark.parametrize(
        "load_args", [{"k1": "v1", "index": "value"}], indirect=True
    )
    def test_load_extra_params(self, gbq_sql_dataset, load_args):
        """Test overriding the default load arguments."""
        for key, value in load_args.items():
            assert gbq_sql_dataset._load_args[key] == value

    def test_credentials_propagation(self, mocker):
        credentials = {"token": "my_token"}
        credentials_obj = "credentials"
        mocked_credentials = mocker.patch(
            "kedro.extras.datasets.pandas.gbq_dataset.Credentials",
            return_value=credentials_obj,
        )
        mocked_bigquery = mocker.patch(
            "kedro.extras.datasets.pandas.gbq_dataset.bigquery"
        )

        data_set = GBQQueryDataSet(
            sql=SQL_QUERY,
            credentials=credentials,
            project=PROJECT,
        )

        assert data_set._credentials == credentials_obj
        mocked_credentials.assert_called_once_with(**credentials)
        mocked_bigquery.Client.assert_called_once_with(
            project=PROJECT, credentials=credentials_obj, location=None
        )

    def test_load(self, mocker, gbq_sql_dataset, dummy_dataframe):
        """Test `load` method invocation"""
        mocked_read_gbq = mocker.patch(
            "kedro.extras.datasets.pandas.gbq_dataset.pd.read_gbq"
        )
        mocked_read_gbq.return_value = dummy_dataframe

        loaded_data = gbq_sql_dataset.load()

        mocked_read_gbq.assert_called_once_with(
            project_id=PROJECT, credentials=None, query=SQL_QUERY
        )

        assert_frame_equal(dummy_dataframe, loaded_data)

    def test_load_query_file(self, mocker, gbq_sql_file_dataset, dummy_dataframe):
        """Test `load` method invocation using a file as the input query"""
        mocked_read_gbq = mocker.patch(
            "kedro.extras.datasets.pandas.gbq_dataset.pd.read_gbq"
        )
        mocked_read_gbq.return_value = dummy_dataframe

        loaded_data = gbq_sql_file_dataset.load()

        mocked_read_gbq.assert_called_once_with(
            project_id=PROJECT, credentials=None, query=SQL_QUERY
        )

        assert_frame_equal(dummy_dataframe, loaded_data)

    def test_save_error(self, gbq_sql_dataset, dummy_dataframe):
        """Check the error when trying to save to the data set"""
        pattern = r"`save` is not supported on GBQQueryDataSet"
        with pytest.raises(DataSetError, match=pattern):
            gbq_sql_dataset.save(dummy_dataframe)

    def test_str_representation_sql(self, gbq_sql_dataset, sql_file):
        """Test the data set instance string representation"""
        str_repr = str(gbq_sql_dataset)
        assert (
            f"GBQQueryDataSet(filepath=None, load_args={{}}, sql={SQL_QUERY})"
            in str_repr
        )
        assert sql_file not in str_repr

    def test_str_representation_filepath(self, gbq_sql_file_dataset, sql_file):
        """Test the data set instance string representation with the filepath arg."""
        str_repr = str(gbq_sql_file_dataset)
        assert (
            f"GBQQueryDataSet(filepath={str(sql_file)}, load_args={{}}, sql=None)"
            in str_repr
        )
        assert SQL_QUERY not in str_repr

    def test_sql_and_filepath_args(self, sql_file):
        """Test that an error is raised when both `sql` and `filepath` args are given."""
        pattern = (
            r"`sql` and `filepath` arguments cannot both be provided\. "
            r"Please only provide one\."
        )
        with pytest.raises(DataSetError, match=pattern):
            GBQQueryDataSet(sql=SQL_QUERY, filepath=sql_file)
