From d27ae215f9ed392358b081c8c7dabb96d845ca18 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Sun, 26 Nov 2023 08:27:19 -0700 Subject: [PATCH 1/7] feat(datasets): support versioning data partitions Signed-off-by: Deepyaman Datta --- kedro-datasets/RELEASE.md | 5 +- .../partitions/incremental_dataset.py | 15 ++-- .../partitions/partitioned_dataset.py | 25 ++++--- .../partitions/test_incremental_dataset.py | 66 ++++++++++++++++- .../partitions/test_partitioned_dataset.py | 74 ++++++++++++++++--- 5 files changed, 159 insertions(+), 26 deletions(-) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index cb3332edb..7eb678397 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -1,8 +1,9 @@ # Upcoming Release ## Major features and improvements -* Removed support for Python 3.7 -* Spark and Databricks based datasets now support [databricks-connect>=13.0](https://docs.databricks.com/en/dev-tools/databricks-connect-ref.html) +* Removed support for Python 3.7. +* Spark and Databricks based datasets now support [databricks-connect>=13.0](https://docs.databricks.com/en/dev-tools/databricks-connect-ref.html). +* `PartitionedDataSet` and `IncrementalDataSet` now both support versioning of the underlying dataset. ## Bug fixes and other changes * Fixed bug with loading models saved with `TensorFlowModelDataset`. diff --git a/kedro-datasets/kedro_datasets/partitions/incremental_dataset.py b/kedro-datasets/kedro_datasets/partitions/incremental_dataset.py index cab476611..edc351928 100644 --- a/kedro-datasets/kedro_datasets/partitions/incremental_dataset.py +++ b/kedro-datasets/kedro_datasets/partitions/incremental_dataset.py @@ -22,7 +22,11 @@ from kedro.io.data_catalog import CREDENTIALS_KEY from kedro.utils import load_obj -from .partitioned_dataset import KEY_PROPAGATION_WARNING, PartitionedDataset +from .partitioned_dataset import ( + KEY_PROPAGATION_WARNING, + PartitionedDataset, + _grandparent, +) class IncrementalDataset(PartitionedDataset): @@ -125,7 +129,7 @@ def __init__( # noqa: PLR0913 This is ignored by Kedro, but may be consumed by users or external plugins. Raises: - DatasetError: If versioning is enabled for the underlying dataset. + DatasetError: If versioning is enabled for the checkpoint dataset. 
""" super().__init__( @@ -185,6 +189,7 @@ def _list_partitions(self) -> list[str]: checkpoint_path = self._filesystem._strip_protocol( self._checkpoint_config[self._filepath_arg] ) + dataset_is_versioned = VERSION_KEY in self._dataset_config def _is_valid_partition(partition) -> bool: if not partition.endswith(self._filename_suffix): @@ -198,9 +203,9 @@ def _is_valid_partition(partition) -> bool: return self._comparison_func(partition_id, checkpoint) return sorted( - part - for part in self._filesystem.find(self._normalized_path, **self._load_args) - if _is_valid_partition(part) + _grandparent(path) if dataset_is_versioned else path + for path in self._filesystem.find(self._normalized_path, **self._load_args) + if _is_valid_partition(path) ) @property diff --git a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py index d991ddb76..19a2c53a6 100644 --- a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py +++ b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py @@ -5,6 +5,7 @@ import operator from copy import deepcopy +from pathlib import PurePosixPath from typing import Any, Callable, Dict from urllib.parse import urlparse from warnings import warn @@ -13,7 +14,6 @@ from cachetools import Cache, cachedmethod from kedro.io.core import ( VERSION_KEY, - VERSIONED_FLAG_KEY, AbstractDataset, DatasetError, parse_dataset_definition, @@ -28,6 +28,18 @@ S3_PROTOCOLS = ("s3", "s3a", "s3n") +def _grandparent(path: str) -> str: + path_obj = PurePosixPath(path) + grandparent = path_obj.parents[1] + if grandparent.name != path_obj.name: + last_three_parts = path_obj.relative_to(*path_obj.parts[:-3]) + raise DatasetError( + f"`{path}` is not a well-formed versioned path ending with " + f"`filename/timestamp/filename` (got `{last_three_parts}`)." + ) + return str(grandparent) + + class PartitionedDataset(AbstractDataset[Dict[str, Any], Dict[str, Callable[[], Any]]]): """``PartitionedDataset`` loads and saves partitioned file-like data using the underlying dataset definition. For filesystem level operations it uses `fsspec`: @@ -171,7 +183,7 @@ def __init__( # noqa: PLR0913 load_args: Keyword arguments to be passed into ``find()`` method of the filesystem implementation. fs_args: Extra arguments to pass into underlying filesystem class constructor - (e.g. `{"project": "my-project"}` for ``GCSFileSystem``) + (e.g. `{"project": "my-project"}` for ``GCSFileSystem``). overwrite: If True, any existing partitions will be removed. metadata: Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins. @@ -192,12 +204,6 @@ def __init__( # noqa: PLR0913 dataset = dataset if isinstance(dataset, dict) else {"type": dataset} self._dataset_type, self._dataset_config = parse_dataset_definition(dataset) - if VERSION_KEY in self._dataset_config: - raise DatasetError( - f"'{self.__class__.__name__}' does not support versioning of the " - f"underlying dataset. Please remove '{VERSIONED_FLAG_KEY}' flag from " - f"the dataset definition." 
- ) if credentials: if CREDENTIALS_KEY in self._dataset_config: @@ -245,8 +251,9 @@ def _normalized_path(self) -> str: @cachedmethod(cache=operator.attrgetter("_partition_cache")) def _list_partitions(self) -> list[str]: + dataset_is_versioned = VERSION_KEY in self._dataset_config return [ - path + _grandparent(path) if dataset_is_versioned else path for path in self._filesystem.find(self._normalized_path, **self._load_args) if path.endswith(self._filename_suffix) ] diff --git a/kedro-datasets/tests/partitions/test_incremental_dataset.py b/kedro-datasets/tests/partitions/test_incremental_dataset.py index 539ab0a66..826e591ce 100644 --- a/kedro-datasets/tests/partitions/test_incremental_dataset.py +++ b/kedro-datasets/tests/partitions/test_incremental_dataset.py @@ -259,11 +259,75 @@ def test_checkpoint_type( ), ], ) - def test_version_not_allowed(self, tmp_path, checkpoint_config, error_pattern): + def test_checkpoint_versioning_not_allowed( + self, tmp_path, checkpoint_config, error_pattern + ): """Test that invalid checkpoint configurations raise expected errors""" with pytest.raises(DatasetError, match=re.escape(error_pattern)): IncrementalDataset(str(tmp_path), DATASET, checkpoint=checkpoint_config) + @pytest.mark.parametrize("dataset_config", [{"type": DATASET, "versioned": True}]) + @pytest.mark.parametrize( + "suffix,expected_num_parts", [("", 5), (".csv", 5), ("bad", 0)] + ) + def test_versioned_dataset_save_and_load( + self, + mocker, + tmp_path, + partitioned_data_pandas, + dataset_config, + suffix, + expected_num_parts, + ): + """Test that saved and reloaded data matches the original one for + the versioned dataset.""" + save_version = "2020-01-01T00.00.00.000Z" + mock_ts = mocker.patch( + "kedro.io.core.generate_timestamp", return_value=save_version + ) + IncrementalDataset(str(tmp_path), dataset_config).save(partitioned_data_pandas) + mock_ts.assert_called_once() + + dataset = IncrementalDataset( + str(tmp_path), dataset_config, filename_suffix=suffix + ) + loaded_partitions = dataset.load() + + assert len(loaded_partitions) == expected_num_parts + + actual_save_versions = set() + for part in loaded_partitions: + partition_dir = tmp_path / (part + suffix) + actual_save_versions |= {each.name for each in partition_dir.iterdir()} + assert partition_dir.is_dir() + assert_frame_equal( + loaded_partitions[part], partitioned_data_pandas[part + suffix] + ) + + if expected_num_parts: + # all partitions were saved using the same version string + assert actual_save_versions == {save_version} + + def test_malformed_versioned_path(self, tmp_path): + local_dir = tmp_path / "files" + local_dir.mkdir() + + path = local_dir / "path/to/folder/new/partition/version/partition/file" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("content") + + dataset = IncrementalDataset( + str(local_dir / "path/to/folder"), + {"type": "pandas.CSVDataset", "versioned": True}, + ) + + pattern = re.escape( + f"`{path.as_posix()}` is not a well-formed versioned path ending with " + f"`filename/timestamp/filename` (got `version/partition/file`)." 
+ ) + with pytest.raises(DatasetError, match=pattern): + dataset.load() + @pytest.mark.parametrize( "pds_config,fs_creds,dataset_creds,checkpoint_creds", [ diff --git a/kedro-datasets/tests/partitions/test_partitioned_dataset.py b/kedro-datasets/tests/partitions/test_partitioned_dataset.py index 4feb79ac4..0791fa325 100644 --- a/kedro-datasets/tests/partitions/test_partitioned_dataset.py +++ b/kedro-datasets/tests/partitions/test_partitioned_dataset.py @@ -28,7 +28,7 @@ def partitioned_data_pandas(): @pytest.fixture def local_csvs(tmp_path, partitioned_data_pandas): - local_dir = Path(str(tmp_path / "csvs")) + local_dir = local_dir = tmp_path / "csvs" local_dir.mkdir() for k, data in partitioned_data_pandas.items(): @@ -38,6 +38,11 @@ def local_csvs(tmp_path, partitioned_data_pandas): return local_dir +@pytest.fixture +def filepath_csvs(tmp_path): + return str(tmp_path / "csvs") + + LOCAL_DATASET_DEFINITION = [ "pandas.CSVDataset", "kedro_datasets.pandas.CSVDataset", @@ -279,17 +284,68 @@ def test_invalid_dataset_config(self, dataset_config, error_pattern): @pytest.mark.parametrize( "dataset_config", [ - {"type": CSVDataset, "versioned": True}, - {"type": "pandas.CSVDataset", "versioned": True}, + {**ds_config, "versioned": True} + for ds_config in LOCAL_DATASET_DEFINITION + if isinstance(ds_config, dict) ], ) - def test_versioned_dataset_not_allowed(self, dataset_config): - pattern = ( - "'PartitionedDataset' does not support versioning of the underlying " - "dataset. Please remove 'versioned' flag from the dataset definition." + @pytest.mark.parametrize( + "suffix,expected_num_parts", [("", 5), (".csv", 3), ("p4", 1)] + ) + def test_versioned_dataset_save_and_load( + self, + mocker, + filepath_csvs, + dataset_config, + suffix, + expected_num_parts, + partitioned_data_pandas, + ): + """Test that saved and reloaded data matches the original one for + the versioned dataset.""" + save_version = "2020-01-01T00.00.00.000Z" + mock_ts = mocker.patch( + "kedro.io.core.generate_timestamp", return_value=save_version ) - with pytest.raises(DatasetError, match=re.escape(pattern)): - PartitionedDataset(str(Path.cwd()), dataset_config) + PartitionedDataset(filepath_csvs, dataset_config).save(partitioned_data_pandas) + mock_ts.assert_called_once() + + pds = PartitionedDataset(filepath_csvs, dataset_config, filename_suffix=suffix) + loaded_partitions = pds.load() + + assert len(loaded_partitions) == expected_num_parts + actual_save_versions = set() + for partition_id, load_func in loaded_partitions.items(): + partition_dir = Path(filepath_csvs, partition_id + suffix) + actual_save_versions |= {each.name for each in partition_dir.iterdir()} + df = load_func() + assert_frame_equal(df, partitioned_data_pandas[partition_id + suffix]) + if suffix: + assert not partition_id.endswith(suffix) + + if expected_num_parts: + # all partitions were saved using the same version string + assert actual_save_versions == {save_version} + + def test_malformed_versioned_path(self, tmp_path): + local_dir = tmp_path / "files" + local_dir.mkdir() + + path = local_dir / "path/to/folder/new/partition/version/partition/file" + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("content") + + pds = PartitionedDataset( + str(local_dir / "path/to/folder"), + {"type": "pandas.CSVDataset", "versioned": True}, + ) + + pattern = re.escape( + f"`{path.as_posix()}` is not a well-formed versioned path ending with " + f"`filename/timestamp/filename` (got `version/partition/file`)." 
+ ) + with pytest.raises(DatasetError, match=pattern): + pds.load() def test_no_partitions(self, tmpdir): pds = PartitionedDataset(str(tmpdir), "pandas.CSVDataset") From 965ac49cb3e7308ec18a4f2efe1b178939b34385 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Tue, 28 Nov 2023 19:39:09 -0700 Subject: [PATCH 2/7] Remove unused import Signed-off-by: Deepyaman Datta --- kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py index c146113d4..cb51f6607 100644 --- a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py +++ b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py @@ -6,7 +6,7 @@ import operator from copy import deepcopy from pathlib import PurePosixPath -from typing import Any, Callable, Dict +from typing import Any, Callable from urllib.parse import urlparse from warnings import warn From 14dd0947db2c5a7a2f741405d5a03826562bb2b6 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Tue, 28 Nov 2023 19:44:01 -0700 Subject: [PATCH 3/7] chore(datasets): use keyword arguments when needed Signed-off-by: Deepyaman Datta --- .../tests/partitions/test_incremental_dataset.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kedro-datasets/tests/partitions/test_incremental_dataset.py b/kedro-datasets/tests/partitions/test_incremental_dataset.py index 4fb775e3b..f292ee9c0 100644 --- a/kedro-datasets/tests/partitions/test_incremental_dataset.py +++ b/kedro-datasets/tests/partitions/test_incremental_dataset.py @@ -297,11 +297,13 @@ def test_versioned_dataset_save_and_load( mock_ts = mocker.patch( "kedro.io.core.generate_timestamp", return_value=save_version ) - IncrementalDataset(str(tmp_path), dataset_config).save(partitioned_data_pandas) + IncrementalDataset(path=str(tmp_path), dataset=dataset_config).save( + partitioned_data_pandas + ) mock_ts.assert_called_once() dataset = IncrementalDataset( - str(tmp_path), dataset_config, filename_suffix=suffix + path=str(tmp_path), dataset=dataset_config, filename_suffix=suffix ) loaded_partitions = dataset.load() @@ -329,8 +331,8 @@ def test_malformed_versioned_path(self, tmp_path): path.write_text("content") dataset = IncrementalDataset( - str(local_dir / "path/to/folder"), - {"type": "pandas.CSVDataset", "versioned": True}, + path=str(local_dir / "path/to/folder"), + dataset={"type": "pandas.CSVDataset", "versioned": True}, ) pattern = re.escape( From 848d88066c5b6d92a7f0f82b514a350b7274b9ec Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Thu, 30 Nov 2023 10:21:05 -0700 Subject: [PATCH 4/7] Apply suggestions from code review Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Signed-off-by: Deepyaman Datta --- kedro-datasets/RELEASE.md | 2 +- kedro-datasets/tests/partitions/test_partitioned_dataset.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index 8d0e996da..bfd811642 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -3,7 +3,7 @@ ## Major features and improvements * Removed support for Python 3.7 and 3.8. * Spark and Databricks based datasets now support [databricks-connect>=13.0](https://docs.databricks.com/en/dev-tools/databricks-connect-ref.html). -* `PartitionedDataSet` and `IncrementalDataSet` now both support versioning of the underlying dataset. 
+* `PartitionedDataset` and `IncrementalDataset` now both support versioning of the underlying dataset. ## Bug fixes and other changes * Fixed bug with loading models saved with `TensorFlowModelDataset`. diff --git a/kedro-datasets/tests/partitions/test_partitioned_dataset.py b/kedro-datasets/tests/partitions/test_partitioned_dataset.py index 0791fa325..49e403992 100644 --- a/kedro-datasets/tests/partitions/test_partitioned_dataset.py +++ b/kedro-datasets/tests/partitions/test_partitioned_dataset.py @@ -28,7 +28,7 @@ def partitioned_data_pandas(): @pytest.fixture def local_csvs(tmp_path, partitioned_data_pandas): - local_dir = local_dir = tmp_path / "csvs" + local_dir = tmp_path / "csvs" local_dir.mkdir() for k, data in partitioned_data_pandas.items(): From 00626307c75b2ace97f6fdfafc32fac8533a83a5 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Thu, 30 Nov 2023 10:55:14 -0700 Subject: [PATCH 5/7] Update kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py Signed-off-by: Deepyaman Datta --- kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py index 8f91ba2e7..4e7812559 100644 --- a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py +++ b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py @@ -29,6 +29,7 @@ def _grandparent(path: str) -> str: + "Validate that the parent's parent has the same name, and return.""" path_obj = PurePosixPath(path) grandparent = path_obj.parents[1] if grandparent.name != path_obj.name: From e90b1c1da85540932bffba354bfa97f4e77a8478 Mon Sep 17 00:00:00 2001 From: Deepyaman Datta Date: Thu, 30 Nov 2023 13:54:22 -0700 Subject: [PATCH 6/7] Fix docstring Signed-off-by: Deepyaman Datta --- kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py index 4e7812559..fffdfd4ef 100644 --- a/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py +++ b/kedro-datasets/kedro_datasets/partitions/partitioned_dataset.py @@ -29,7 +29,7 @@ def _grandparent(path: str) -> str: - "Validate that the parent's parent has the same name, and return.""" + """Check and return the logical parent of the parent of the path.""" path_obj = PurePosixPath(path) grandparent = path_obj.parents[1] if grandparent.name != path_obj.name: From 0bf1a3940f104dbbf531702e10cd8e8d7cc0615f Mon Sep 17 00:00:00 2001 From: Merel Theisen Date: Mon, 11 Dec 2023 11:56:42 +0000 Subject: [PATCH 7/7] Fix test Signed-off-by: Merel Theisen --- .../tests/partitions/test_partitioned_dataset.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/kedro-datasets/tests/partitions/test_partitioned_dataset.py b/kedro-datasets/tests/partitions/test_partitioned_dataset.py index 6aa3fb9cc..587ccd52d 100644 --- a/kedro-datasets/tests/partitions/test_partitioned_dataset.py +++ b/kedro-datasets/tests/partitions/test_partitioned_dataset.py @@ -319,10 +319,14 @@ def test_versioned_dataset_save_and_load( mock_ts = mocker.patch( "kedro.io.core.generate_timestamp", return_value=save_version ) - PartitionedDataset(filepath_csvs, dataset_config).save(partitioned_data_pandas) + PartitionedDataset(path=filepath_csvs, dataset=dataset_config).save( + partitioned_data_pandas + ) 
mock_ts.assert_called_once() - pds = PartitionedDataset(filepath_csvs, dataset_config, filename_suffix=suffix) + pds = PartitionedDataset( + path=filepath_csvs, dataset=dataset_config, filename_suffix=suffix + ) loaded_partitions = pds.load() assert len(loaded_partitions) == expected_num_parts @@ -348,8 +352,8 @@ def test_malformed_versioned_path(self, tmp_path): path.write_text("content") pds = PartitionedDataset( - str(local_dir / "path/to/folder"), - {"type": "pandas.CSVDataset", "versioned": True}, + path=str(local_dir / "path/to/folder"), + dataset={"type": "pandas.CSVDataset", "versioned": True}, ) pattern = re.escape(
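
---

The behaviour this series adds hinges on one rule: when the underlying dataset is versioned, Kedro lays each partition out on disk as `filename/timestamp/filename`, so `_list_partitions` must map every file that `fsspec`'s `find()` returns back to its partition via the new `_grandparent` helper. Below is a minimal, self-contained sketch of that helper (its body is copied from PATCH 1/7, with the final docstring from PATCH 6/7); note that `DatasetError` is stubbed as a plain exception here so the snippet runs without Kedro installed, and the example paths are illustrative rather than taken from the test fixtures.

```python
from pathlib import PurePosixPath


class DatasetError(Exception):
    """Stand-in for kedro.io.core.DatasetError, so this sketch runs without Kedro."""


def _grandparent(path: str) -> str:
    """Check and return the logical parent of the parent of the path."""
    path_obj = PurePosixPath(path)
    grandparent = path_obj.parents[1]
    if grandparent.name != path_obj.name:
        last_three_parts = path_obj.relative_to(*path_obj.parts[:-3])
        raise DatasetError(
            f"`{path}` is not a well-formed versioned path ending with "
            f"`filename/timestamp/filename` (got `{last_three_parts}`)."
        )
    return str(grandparent)


# A versioned dataset saves each partition as `<partition>/<timestamp>/<partition>`,
# so the partition id is the grandparent of every file that find() returns:
print(_grandparent("csvs/p1.csv/2020-01-01T00.00.00.000Z/p1.csv"))  # csvs/p1.csv

# Anything that does not follow that three-level layout is rejected, which is
# exactly what the two test_malformed_versioned_path tests above assert:
try:
    _grandparent("path/to/folder/new/partition/version/partition/file")
except DatasetError as exc:
    print(exc)  # ... (got `version/partition/file`).
```

This also explains why the name check compares `grandparent.name` against `path_obj.name`: for a well-formed versioned save, the outer directory and the inner file share the same filename, with only the timestamp directory between them.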