diff --git a/RELEASE.md b/RELEASE.md
index 740ba99643..7a6fb7ac98 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -11,6 +11,7 @@
 | --------------------------- | ---------------------------------------------------- | --------------------------------- |
 | `plotly.JSONDataSet` | Works with plotly graph object Figures (saves as json file) | `kedro.extras.datasets.plotly` |
 | `pandas.GenericDataSet` | Provides a 'best effort' facility to read / write any format provided by the `pandas` library | `kedro.extras.datasets.pandas` |
+| `pandas.GBQQueryDataSet` | Loads data from a Google BigQuery table using a provided SQL query | `kedro.extras.datasets.pandas` |

 ## Bug fixes and other changes
 * Fixed an issue where `kedro new --config config.yml` was ignoring the config file when `prompts.yml` didn't exist.
@@ -48,7 +49,8 @@
 [Simon Brugman](https://github.com/sbrugman),
 [Kiyo Kunii](https://github.com/921kiyo),
 [Benjamin Levy](https://github.com/BenjaminLevyQB),
-[Louis de Charsonville](https://github.com/louisdecharson)
+[Louis de Charsonville](https://github.com/louisdecharson),
+[Simon Picard](https://github.com/simonpicard)

 # Release 0.17.5
diff --git a/docs/source/15_api_docs/kedro.extras.datasets.rst b/docs/source/15_api_docs/kedro.extras.datasets.rst
index 1c4b89334a..6150ca44eb 100644
--- a/docs/source/15_api_docs/kedro.extras.datasets.rst
+++ b/docs/source/15_api_docs/kedro.extras.datasets.rst
@@ -24,6 +24,7 @@ kedro.extras.datasets
    kedro.extras.datasets.pandas.ExcelDataSet
    kedro.extras.datasets.pandas.AppendableExcelDataSet
    kedro.extras.datasets.pandas.FeatherDataSet
+   kedro.extras.datasets.pandas.GBQQueryDataSet
    kedro.extras.datasets.pandas.GBQTableDataSet
    kedro.extras.datasets.pandas.GenericDataSet
    kedro.extras.datasets.pandas.HDFDataSet
diff --git a/kedro/extras/datasets/pandas/__init__.py b/kedro/extras/datasets/pandas/__init__.py
index 6c04154140..8cdceaf1ac 100644
--- a/kedro/extras/datasets/pandas/__init__.py
+++ b/kedro/extras/datasets/pandas/__init__.py
@@ -5,6 +5,7 @@
     "ExcelDataSet",
     "FeatherDataSet",
     "GBQTableDataSet",
+    "GBQQueryDataSet",
     "ExcelDataSet",
     "AppendableExcelDataSet",
     "HDFDataSet",
@@ -26,7 +27,7 @@
 with suppress(ImportError):
     from .feather_dataset import FeatherDataSet
 with suppress(ImportError):
-    from .gbq_dataset import GBQTableDataSet
+    from .gbq_dataset import GBQQueryDataSet, GBQTableDataSet
 with suppress(ImportError):
     from .hdf_dataset import HDFDataSet
 with suppress(ImportError):
diff --git a/kedro/extras/datasets/pandas/gbq_dataset.py b/kedro/extras/datasets/pandas/gbq_dataset.py
index d1786a9b3d..8882fef61c 100644
--- a/kedro/extras/datasets/pandas/gbq_dataset.py
+++ b/kedro/extras/datasets/pandas/gbq_dataset.py
@@ -3,14 +3,22 @@
 """
 import copy
+from pathlib import PurePosixPath
 from typing import Any, Dict, Union

+import fsspec
 import pandas as pd
 from google.cloud import bigquery
 from google.cloud.exceptions import NotFound
 from google.oauth2.credentials import Credentials

-from kedro.io.core import AbstractDataSet, DataSetError, validate_on_forbidden_chars
+from kedro.io.core import (
+    AbstractDataSet,
+    DataSetError,
+    get_filepath_str,
+    get_protocol_and_path,
+    validate_on_forbidden_chars,
+)


 class GBQTableDataSet(AbstractDataSet):
@@ -161,3 +169,144 @@ def _validate_location(self):
                 "to be the same for save and load args. "
                 "Details: https://cloud.google.com/bigquery/docs/locations"
             )
+
+
+class GBQQueryDataSet(AbstractDataSet):
+    """``GBQQueryDataSet`` loads data from Google BigQuery using the provided
+    SQL query. It uses ``pandas.read_gbq``, which itself uses ``pandas-gbq``
+    internally, to read from a BigQuery table. Therefore it supports all
+    pandas options allowed on ``read_gbq``.
+
+    Example adding a catalog entry with
+    `YAML API <https://kedro.readthedocs.io/en/stable/05_data/01_data_catalog.html#using-the-data-catalog-with-the-yaml-api>`_:
+
+    .. code-block:: yaml
+
+        >>> vehicles:
+        >>>   type: pandas.GBQQueryDataSet
+        >>>   sql: "select shuttle, shuttle_id from spaceflights.shuttles;"
+        >>>   project: my-project
+        >>>   credentials: gbq-creds
+        >>>   load_args:
+        >>>     reauth: True
+
+    Example using Python API:
+    ::
+
+        >>> from kedro.extras.datasets.pandas import GBQQueryDataSet
+        >>>
+        >>> sql = "SELECT * FROM dataset_1.table_a"
+        >>>
+        >>> data_set = GBQQueryDataSet(sql, project='my-project')
+        >>>
+        >>> sql_data = data_set.load()
+    """
+
+    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
+
+    # pylint: disable=too-many-arguments
+    def __init__(
+        self,
+        sql: str = None,
+        project: str = None,
+        credentials: Union[Dict[str, Any], Credentials] = None,
+        load_args: Dict[str, Any] = None,
+        fs_args: Dict[str, Any] = None,
+        filepath: str = None,
+    ) -> None:
+        """Creates a new instance of ``GBQQueryDataSet``.
+
+        Args:
+            sql: The SQL query statement.
+            project: Google BigQuery Account project ID.
+                Optional when available from the environment.
+                https://cloud.google.com/resource-manager/docs/creating-managing-projects
+            credentials: Credentials for accessing Google APIs.
+                Either a ``google.auth.credentials.Credentials`` object or a dictionary
+                with the parameters required to instantiate
+                ``google.oauth2.credentials.Credentials``.
+                All accepted arguments are listed here:
+                https://google-auth.readthedocs.io/en/latest/reference/google.oauth2.credentials.html
+            load_args: Pandas options for loading a BigQuery table into a DataFrame.
+                All available arguments are listed here:
+                https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_gbq.html
+                All defaults are preserved.
+            fs_args: Extra arguments to pass into the underlying filesystem class
+                constructor (e.g. `{"project": "my-project"}` for ``GCSFileSystem``)
+                used for reading the SQL query from ``filepath``.
+            filepath: A path to a file containing the SQL query statement.
+
+        Raises:
+            DataSetError: When the ``sql`` and ``filepath`` parameters are either both
+                empty or both provided, as well as when the `save()` method is invoked.
+        """
+        if sql and filepath:
+            raise DataSetError(
+                "`sql` and `filepath` arguments cannot both be provided. "
+                "Please only provide one."
+            )
+
+        if not (sql or filepath):
+            raise DataSetError(
+                "`sql` and `filepath` arguments cannot both be empty. "
+                "Please provide a sql query or path to a sql query file."
+            )
+
+        # Handle default load arguments
+        self._load_args = copy.deepcopy(self.DEFAULT_LOAD_ARGS)
+        if load_args is not None:
+            self._load_args.update(load_args)
+
+        self._project_id = project
+
+        if isinstance(credentials, dict):
+            credentials = Credentials(**credentials)
+
+        self._credentials = credentials
+        self._client = bigquery.Client(
+            project=self._project_id,
+            credentials=self._credentials,
+            location=self._load_args.get("location"),
+        )
+
+        # Take the SQL query from the `sql` arg, or set up a filesystem
+        # so it can be read from `filepath` at load time
+        if sql:
+            self._load_args["query"] = sql
+            self._filepath = None
+        else:
+            # Filesystem for loading the SQL file
+            _fs_args = copy.deepcopy(fs_args) or {}
+            _fs_credentials = _fs_args.pop("credentials", {})
+            protocol, path = get_protocol_and_path(str(filepath))
+
+            self._protocol = protocol
+            self._fs = fsspec.filesystem(self._protocol, **_fs_credentials, **_fs_args)
+            self._filepath = path
+
+    def _describe(self) -> Dict[str, Any]:
+        load_args = copy.deepcopy(self._load_args)
+        desc = {}
+        desc["sql"] = str(load_args.pop("query", None))
+        desc["filepath"] = str(self._filepath)
+        desc["load_args"] = str(load_args)
+
+        return desc
+
+    def _load(self) -> pd.DataFrame:
+        load_args = copy.deepcopy(self._load_args)
+
+        if self._filepath:
+            load_path = get_filepath_str(PurePosixPath(self._filepath), self._protocol)
+            with self._fs.open(load_path, mode="r") as fs_file:
+                load_args["query"] = fs_file.read()
+
+        return pd.read_gbq(
+            project_id=self._project_id,
+            credentials=self._credentials,
+            **load_args,
+        )
+
+    def _save(self, data: pd.DataFrame) -> None:
+        raise DataSetError("`save` is not supported on GBQQueryDataSet")
diff --git a/setup.py b/setup.py
index cbf6c8201e..262df43429 100644
--- a/setup.py
+++ b/setup.py
@@ -76,6 +76,7 @@ def _collect_requirements(requires):
     "pandas.AppendableExcelDataSet": [PANDAS, "openpyxl>=3.0.3, <4.0"],
     "pandas.FeatherDataSet": [PANDAS],
     "pandas.GBQTableDataSet": [PANDAS, "pandas-gbq>=0.12.0, <1.0"],
+    "pandas.GBQQueryDataSet": [PANDAS, "pandas-gbq>=0.12.0, <1.0"],
     "pandas.HDFDataSet": [PANDAS, "tables~=3.6"],
     "pandas.JSONDataSet": [PANDAS],
     "pandas.ParquetDataSet": [PANDAS, "pyarrow>=1.0, <7.0"],
diff --git a/static/jsonschema/kedro-catalog-0.17.json b/static/jsonschema/kedro-catalog-0.17.json
index 4bf3f8aa8f..cbed415121 100644
--- a/static/jsonschema/kedro-catalog-0.17.json
+++ b/static/jsonschema/kedro-catalog-0.17.json
@@ -38,6 +38,7 @@
                 "pandas.CSVDataSet",
                 "pandas.ExcelDataSet",
                 "pandas.GBQTableDataSet",
+                "pandas.GBQQueryDataSet",
                 "pandas.GenericDataSet"
             ]
         }
diff --git a/tests/extras/datasets/pandas/test_gbq_dataset.py b/tests/extras/datasets/pandas/test_gbq_dataset.py
index e7d49da9fc..47743085f9 100644
--- a/tests/extras/datasets/pandas/test_gbq_dataset.py
+++ b/tests/extras/datasets/pandas/test_gbq_dataset.py
@@ -1,14 +1,17 @@
+from pathlib import PosixPath
+
 import pandas as pd
 import pytest
 from google.cloud.exceptions import NotFound
 from pandas.testing import assert_frame_equal

-from kedro.extras.datasets.pandas import GBQTableDataSet
+from kedro.extras.datasets.pandas import GBQQueryDataSet, GBQTableDataSet
 from kedro.io.core import DataSetError

 DATASET = "dataset"
 TABLE_NAME = "table_name"
 PROJECT = "project"
+SQL_QUERY = "SELECT * FROM table_a"


 @pytest.fixture
@@ -36,6 +39,35 @@ def gbq_dataset(
     )


+@pytest.fixture(params=[{}])
+def gbq_sql_dataset(load_args, mock_bigquery_client):  # pylint: disable=unused-argument
+    return GBQQueryDataSet(
+        sql=SQL_QUERY,
+        project=PROJECT,
+        credentials=None,
+        load_args=load_args,
+    )
+
+
+@pytest.fixture
+def sql_file(tmp_path: PosixPath):
+    file = tmp_path / "test.sql"
+    file.write_text(SQL_QUERY)
+    return file.as_posix()
+
+
+@pytest.fixture(params=[{}])
+def gbq_sql_file_dataset(
+    load_args, sql_file, mock_bigquery_client
+):  # pylint: disable=unused-argument
+    return GBQQueryDataSet(
+        filepath=sql_file,
+        project=PROJECT,
+        credentials=None,
+        load_args=load_args,
+    )
+
+
 class TestGBQDataSet:
     def test_exists(self, mock_bigquery_client):
         """Test `exists` method invocation."""
@@ -178,3 +210,108 @@ def test_credentials_propagation(self, mocker):
         mocked_bigquery.Client.assert_called_once_with(
             project=PROJECT, credentials=credentials_obj, location=None
         )
+
+
+class TestGBQQueryDataSet:
+    def test_empty_query_error(self):
+        """Check the error when instantiating with an empty query and filepath."""
+        pattern = (
+            r"`sql` and `filepath` arguments cannot both be empty\. "
+            r"Please provide a sql query or path to a sql query file\."
+        )
+        with pytest.raises(DataSetError, match=pattern):
+            GBQQueryDataSet(sql="", filepath="", credentials=None)
+
+    @pytest.mark.parametrize(
+        "load_args", [{"k1": "v1", "index": "value"}], indirect=True
+    )
+    def test_load_extra_params(self, gbq_sql_dataset, load_args):
+        """Test overriding the default load arguments."""
+        for key, value in load_args.items():
+            assert gbq_sql_dataset._load_args[key] == value
+
+    def test_credentials_propagation(self, mocker):
+        credentials = {"token": "my_token"}
+        credentials_obj = "credentials"
+        mocked_credentials = mocker.patch(
+            "kedro.extras.datasets.pandas.gbq_dataset.Credentials",
+            return_value=credentials_obj,
+        )
+        mocked_bigquery = mocker.patch(
+            "kedro.extras.datasets.pandas.gbq_dataset.bigquery"
+        )
+
+        data_set = GBQQueryDataSet(
+            sql=SQL_QUERY,
+            credentials=credentials,
+            project=PROJECT,
+        )
+
+        assert data_set._credentials == credentials_obj
+        mocked_credentials.assert_called_once_with(**credentials)
+        mocked_bigquery.Client.assert_called_once_with(
+            project=PROJECT, credentials=credentials_obj, location=None
+        )
+
+    def test_load(self, mocker, gbq_sql_dataset, dummy_dataframe):
+        """Test `load` method invocation."""
+        mocked_read_gbq = mocker.patch(
+            "kedro.extras.datasets.pandas.gbq_dataset.pd.read_gbq"
+        )
+        mocked_read_gbq.return_value = dummy_dataframe
+
+        loaded_data = gbq_sql_dataset.load()
+
+        mocked_read_gbq.assert_called_once_with(
+            project_id=PROJECT, credentials=None, query=SQL_QUERY
+        )
+
+        assert_frame_equal(dummy_dataframe, loaded_data)
+
+    def test_load_query_file(self, mocker, gbq_sql_file_dataset, dummy_dataframe):
+        """Test `load` method invocation using a file as the input query."""
+        mocked_read_gbq = mocker.patch(
+            "kedro.extras.datasets.pandas.gbq_dataset.pd.read_gbq"
+        )
+        mocked_read_gbq.return_value = dummy_dataframe
+
+        loaded_data = gbq_sql_file_dataset.load()
+
+        mocked_read_gbq.assert_called_once_with(
+            project_id=PROJECT, credentials=None, query=SQL_QUERY
+        )
+
+        assert_frame_equal(dummy_dataframe, loaded_data)
+
+    def test_save_error(self, gbq_sql_dataset, dummy_dataframe):
+        """Check the error when trying to save to the data set."""
+        pattern = r"`save` is not supported on GBQQueryDataSet"
+        with pytest.raises(DataSetError, match=pattern):
+            gbq_sql_dataset.save(dummy_dataframe)
+
+    def test_str_representation_sql(self, gbq_sql_dataset, sql_file):
+        """Test the data set instance string representation."""
+        str_repr = str(gbq_sql_dataset)
+        assert (
+            f"GBQQueryDataSet(filepath=None, load_args={{}}, sql={SQL_QUERY})"
+            in str_repr
+        )
+        assert sql_file not in str_repr
+
+    def test_str_representation_filepath(self, gbq_sql_file_dataset, sql_file):
+        """Test the data set instance string representation with the filepath arg."""
+        str_repr = str(gbq_sql_file_dataset)
+        assert (
+            f"GBQQueryDataSet(filepath={str(sql_file)}, load_args={{}}, sql=None)"
+            in str_repr
+        )
+        assert SQL_QUERY not in str_repr
+
+    def test_sql_and_filepath_args(self, sql_file):
+        """Test that an error is raised when both `sql` and `filepath` args are given."""
+        pattern = (
+            r"`sql` and `filepath` arguments cannot both be provided\. "
+            r"Please only provide one\."
+        )
+        with pytest.raises(DataSetError, match=pattern):
+            GBQQueryDataSet(sql=SQL_QUERY, filepath=sql_file)
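
For reviewers who want to try the new dataset locally, here is a minimal usage sketch of both entry points it adds. The project ID, the table name, and the local `.sql` path below are illustrative only (they are not taken from this changeset), and running it assumes `pandas-gbq` is installed and Google credentials are discoverable from the environment::

    # Minimal sketch of GBQQueryDataSet usage; all names here are hypothetical.
    from kedro.extras.datasets.pandas import GBQQueryDataSet

    # 1) Inline SQL: the query string is forwarded to pandas.read_gbq as-is.
    inline_ds = GBQQueryDataSet(
        sql="SELECT shuttle, shuttle_id FROM spaceflights.shuttles",
        project="my-project",  # hypothetical GCP project ID
    )
    df = inline_ds.load()

    # 2) SQL from a file: the query text is read through fsspec at load time,
    #    so remote protocols (e.g. gcs://) work as well as local paths.
    file_ds = GBQQueryDataSet(
        filepath="queries/shuttles.sql",  # hypothetical path to a .sql file
        project="my-project",
    )
    df = file_ds.load()

    # The dataset is read-only by design: calling save() raises DataSetError.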