From 6e5373eca0233c3032414b59997cf45a35f77391 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Fri, 3 Feb 2023 17:19:51 +0000
Subject: [PATCH 01/25] Add databricks deployment check and automatic DBFS path addition

Signed-off-by: Jannic Holzer
---
 .../kedro_datasets/spark/spark_dataset.py | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 7a2b54eef..36ba9ee50 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -1,6 +1,7 @@
 """``AbstractVersionedDataSet`` implementation to access Spark dataframes using
 ``pyspark``
 """
+import os
 import json
 from copy import deepcopy
 from fnmatch import fnmatch
@@ -269,6 +270,8 @@ def __init__(  # pylint: disable=too-many-arguments
         """
         credentials = deepcopy(credentials) or {}
         fs_prefix, filepath = _split_filepath(filepath)
+        if not fs_prefix and not self._deployed_on_databricks():
+            filepath = self._build_dbfs_path(filepath)
         exists_function = None
         glob_function = None
@@ -415,3 +418,13 @@ def _handle_delta_format(self) -> None:
                 f"with mode '{write_mode}' on 'SparkDataSet'. "
                 f"Please use 'spark.DeltaTableDataSet' instead."
             )
+
+    @staticmethod
+    def _deployed_on_databricks() -> bool:
+        return "DATABRICKS_RUNTIME_VERSION" in os.environ
+
+    @staticmethod
+    def _build_dbfs_path(filepath: str) -> str:
+        if filepath.startswith("/dbfs"):
+            return filepath
+        return f"/dbfs/{filepath}"
\ No newline at end of file

From bdb6958f2504c8fc862d18d018c2ec962598b4fb Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Fri, 3 Feb 2023 17:21:51 +0000
Subject: [PATCH 02/25] Add newline at end of file

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 36ba9ee50..e6f8770db 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -427,4 +427,4 @@ def _deployed_on_databricks() -> bool:
     def _build_dbfs_path(filepath: str) -> str:
         if filepath.startswith("/dbfs"):
             return filepath
-        return f"/dbfs/{filepath}"
\ No newline at end of file
+        return f"/dbfs/{filepath}"

From 50c72d37e9c98fadcacc6e38113cdc6880ae2e47 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Mon, 6 Feb 2023 08:18:44 +0100
Subject: [PATCH 03/25] Remove spurious 'not'

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index e6f8770db..7e59f0d0d 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -270,7 +270,7 @@ def __init__(  # pylint: disable=too-many-arguments
         """
         credentials = deepcopy(credentials) or {}
         fs_prefix, filepath = _split_filepath(filepath)
-        if not fs_prefix and not self._deployed_on_databricks():
+        if not fs_prefix and self._deployed_on_databricks():
            filepath = self._build_dbfs_path(filepath)
         exists_function = None
         glob_function = None
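
Review note (PATCHES 01-03): the deployment check hinges on the DATABRICKS_RUNTIME_VERSION environment variable, which Databricks sets on its cluster nodes and which is normally absent elsewhere. PATCH 01 shipped with an inverted condition, so the path was only rewritten when not on Databricks; PATCH 03 removes the spurious `not`. A minimal sketch of how the check behaves and how it can be exercised with pytest's monkeypatch fixture; the test function here is illustrative and not part of the series:

    import os

    def _deployed_on_databricks() -> bool:
        # Databricks exports this variable on its clusters, e.g. "7.3".
        return "DATABRICKS_RUNTIME_VERSION" in os.environ

    def test_deployed_on_databricks(monkeypatch):
        # Absent locally, so the check reports False.
        monkeypatch.delenv("DATABRICKS_RUNTIME_VERSION", raising=False)
        assert not _deployed_on_databricks()
        # Present, as on a Databricks cluster, so the check reports True.
        monkeypatch.setenv("DATABRICKS_RUNTIME_VERSION", "7.3")
        assert _deployed_on_databricks()
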
From 6fc8ca67b0dadb0a9b13a27d9d9c1e3beab60286 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Mon, 6 Feb 2023 09:01:35 +0100
Subject: [PATCH 04/25] Move dbfs utility functions from SparkDataSet

Signed-off-by: Jannic Holzer
---
 .../kedro_datasets/spark/spark_dataset.py | 24 +++++++++----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 7e59f0d0d..2cec5a45a 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -115,6 +115,16 @@ def _dbfs_exists(pattern: str, dbutils: Any) -> bool:
         return False
 
 
+def _deployed_on_databricks() -> bool:
+    return "DATABRICKS_RUNTIME_VERSION" in os.environ
+
+
+def _build_dbfs_path(filepath: str) -> str:
+    if filepath.startswith("/dbfs"):
+        return filepath
+    return f"/dbfs/{filepath}"
+
+
 class KedroHdfsInsecureClient(InsecureClient):
     """Subclasses ``hdfs.InsecureClient`` and implements ``hdfs_exists``
     and ``hdfs_glob`` methods required by ``SparkDataSet``"""
@@ -270,8 +280,8 @@ def __init__(  # pylint: disable=too-many-arguments
         """
         credentials = deepcopy(credentials) or {}
         fs_prefix, filepath = _split_filepath(filepath)
-        if not fs_prefix and self._deployed_on_databricks():
-            filepath = self._build_dbfs_path(filepath)
+        if not fs_prefix and _deployed_on_databricks():
+            filepath = _build_dbfs_path(filepath)
         exists_function = None
         glob_function = None
@@ -418,13 +428,3 @@ def _handle_delta_format(self) -> None:
             f"with mode '{write_mode}' on 'SparkDataSet'. "
             f"Please use 'spark.DeltaTableDataSet' instead."
         )
-
-    @staticmethod
-    def _deployed_on_databricks() -> bool:
-        return "DATABRICKS_RUNTIME_VERSION" in os.environ
-
-    @staticmethod
-    def _build_dbfs_path(filepath: str) -> str:
-        if filepath.startswith("/dbfs"):
-            return filepath
-        return f"/dbfs/{filepath}"

From 868913c935a4911ebe5c337650b16b859232ce5f Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Mon, 6 Feb 2023 12:54:10 +0100
Subject: [PATCH 05/25] Add edge case logic to _build_dbfs_path

Signed-off-by: Jannic Holzer
---
 .../kedro_datasets/spark/spark_dataset.py | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 2cec5a45a..bf1eb2147 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -116,13 +116,26 @@ def _dbfs_exists(pattern: str, dbutils: Any) -> bool:
 
 
 def _deployed_on_databricks() -> bool:
+    """Check if running on Databricks."""
     return "DATABRICKS_RUNTIME_VERSION" in os.environ
 
 
 def _build_dbfs_path(filepath: str) -> str:
+    """Modify a file path so that it points to a location in DBFS.
+
+    Args:
+        filepath: Filepath to modify.
+
+    Returns:
+        Modified filepath.
+    """
     if filepath.startswith("/dbfs"):
         return filepath
-    return f"/dbfs/{filepath}"
+    if filepath.startswith("dbfs"):
+        return "/" + filepath
+    if filepath.startswith("/"):
+        return "/dbfs" + filepath
+    return "/dbfs/" + filepath
@@ -281,6 +294,7 @@ def __init__(  # pylint: disable=too-many-arguments
         credentials = deepcopy(credentials) or {}
         fs_prefix, filepath = _split_filepath(filepath)
         if not fs_prefix and _deployed_on_databricks():
+            print("HERE")
             filepath = _build_dbfs_path(filepath)
         exists_function = None
         glob_function = None

From 85c5862f5b8f713ddf3ddad2702bc2ea71d0ab1f Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Mon, 6 Feb 2023 12:54:23 +0100
Subject: [PATCH 06/25] Add test for dbfs path construction

Signed-off-by: Jannic Holzer
---
 kedro-datasets/tests/spark/test_spark_dataset.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index 4567d6fc9..e94cd5674 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -1,3 +1,4 @@
+import os
 import re
 import sys
 import tempfile
@@ -439,6 +440,14 @@ def test_copy(self):
         assert spark_dataset_copy._file_format == "csv"
         assert spark_dataset_copy._save_args == {"mode": "overwrite"}
 
+    @pytest.mark.parametrize(
+        "filepath", ["data", "/data", "dbfs/data","/dbfs/data"]
+    )
+    def test_dbfs_filepath_prefix(self, filepath, mocker):
+        mocker.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "7.3"})
+        spark_dataset = SparkDataSet(filepath=filepath)
+        assert spark_dataset._filepath == PurePosixPath("/dbfs/data")
+

From 7f5bcdaf5e8f0551174a62418b26ecee3cd63421 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Mon, 6 Feb 2023 12:56:22 +0100
Subject: [PATCH 07/25] Linting

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 2 +-
 kedro-datasets/tests/spark/test_spark_dataset.py     | 4 +---
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index bf1eb2147..05a422cb9 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -1,8 +1,8 @@
 """``AbstractVersionedDataSet`` implementation to access Spark dataframes using
 ``pyspark``
 """
-import os
 import json
+import os
 from copy import deepcopy
 from fnmatch import fnmatch

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index e94cd5674..faa58a928 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -440,9 +440,7 @@ def test_copy(self):
         assert spark_dataset_copy._file_format == "csv"
         assert spark_dataset_copy._save_args == {"mode": "overwrite"}
 
-    @pytest.mark.parametrize(
-        "filepath", ["data", "/data", "dbfs/data","/dbfs/data"]
-    )
+    @pytest.mark.parametrize("filepath", ["data", "/data", "dbfs/data", "/dbfs/data"])
     def test_dbfs_filepath_prefix(self, filepath, mocker):
         mocker.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "7.3"})
         spark_dataset = SparkDataSet(filepath=filepath)
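
Review note (PATCH 05): the edge-case logic normalises the four plausible spellings of a DBFS location onto the /dbfs/ mount point that POSIX file APIs see on a Databricks driver. A sketch of the mapping, with the helper copied from the patch so the cases can be checked locally; note the checks are purely string-based, so an unrelated path such as "dbfs_backup/data" would also be treated as a DBFS spelling:

    def _build_dbfs_path(filepath: str) -> str:
        # Normalise the common spellings of a DBFS path onto /dbfs/...
        if filepath.startswith("/dbfs"):
            return filepath
        if filepath.startswith("dbfs"):
            return "/" + filepath
        if filepath.startswith("/"):
            return "/dbfs" + filepath
        return "/dbfs/" + filepath

    # The four cases exercised by the parametrised test in PATCH 06:
    assert _build_dbfs_path("data") == "/dbfs/data"
    assert _build_dbfs_path("/data") == "/dbfs/data"
    assert _build_dbfs_path("dbfs/data") == "/dbfs/data"
    assert _build_dbfs_path("/dbfs/data") == "/dbfs/data"
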
From f6f60c1bf1918688fff30f8b53c1f910a10d6cdf Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Mon, 6 Feb 2023 12:58:38 +0100
Subject: [PATCH 08/25] Remove spurious print statement :)

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 05a422cb9..8529ea9af 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -294,7 +294,6 @@ def __init__(  # pylint: disable=too-many-arguments
         credentials = deepcopy(credentials) or {}
         fs_prefix, filepath = _split_filepath(filepath)
         if not fs_prefix and _deployed_on_databricks():
-            print("HERE")
             filepath = _build_dbfs_path(filepath)
         exists_function = None
         glob_function = None

From da9bb601342d7447764bc30474c3a3185e24e30d Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Mon, 6 Feb 2023 15:55:37 +0100
Subject: [PATCH 09/25] Add pylint disable too-many-public-methods

Signed-off-by: Jannic Holzer
---
 kedro-datasets/tests/spark/test_spark_dataset.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index faa58a928..a751650e2 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -161,6 +161,7 @@ def isDir(self):
         return "." not in self.path.split("/")[-1]
 
 
+# pylint: disable=too-many-public-methods
 class TestSparkDataSet:
     def test_load_parquet(self, tmp_path, sample_pandas_df):
         temp_path = (tmp_path / "data").as_posix()

From 3d5116cc58bf7646b408aefcfe92e509c354ddc4 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Wed, 15 Feb 2023 19:04:07 +0000
Subject: [PATCH 10/25] Move tests into single method to appease linter

Signed-off-by: Jannic Holzer
---
 .../tests/spark/test_spark_dataset.py | 21 +++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index 22e68fd8f..9017a826c 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -442,11 +442,24 @@ def test_copy(self):
         assert spark_dataset_copy._file_format == "csv"
         assert spark_dataset_copy._save_args == {"mode": "overwrite"}
 
-    @pytest.mark.parametrize("filepath", ["data", "/data", "dbfs/data", "/dbfs/data"])
-    def test_dbfs_filepath_prefix(self, filepath, mocker):
+    def test_dbfs_prefix_warning(self, mocker, caplog):
+        expected_message = (
+            "Using SparkDataSet on Databricks without `/dbfs` prefix in filepath "
+            "will prevent versioning from working."
+        )
+
+        # test that warning is not raised when not on Databricks
+        SparkDataSet(filepath="my_project/data/02_intermediate/processed_data")
+        assert expected_message not in caplog.text
+
+        # test that warning is not raised when on Databricks and filepath has /dbfs prefix
         mocker.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "7.3"})
-        spark_dataset = SparkDataSet(filepath=filepath)
-        assert spark_dataset._filepath == PurePosixPath("/dbfs/data")
+        SparkDataSet(filepath = "/dbfs/my_project/data/02_intermediate/processed_data")
+        assert expected_message not in caplog.text
+
+        # test that warning is raised when on Databricks and filepath does not have /dbfs prefix
+        SparkDataSet(filepath="my_project/data/02_intermediate/processed_data")
+        assert expected_message in caplog.text
 
 
 class TestSparkDataSetVersionedLocal:
     def test_no_version(self, versioned_dataset_local):

From d43a53f9836e1b90a069e95b08074177c93990d1 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Wed, 15 Feb 2023 19:04:25 +0000
Subject: [PATCH 11/25] Modify prefix check to /dbfs/

Signed-off-by: Jannic Holzer
---
 .../kedro_datasets/spark/spark_dataset.py | 28 ++++++++-----------
 1 file changed, 12 insertions(+), 16 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 1e3b1a8cd..b3b26e97f 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -2,6 +2,7 @@
 ``pyspark``
 """
 import json
+import logging
 import os
 from copy import deepcopy
 from fnmatch import fnmatch
@@ -24,6 +25,8 @@
 from pyspark.sql.utils import AnalysisException
 from s3fs import S3FileSystem
 
+logger = logging.getLogger(__name__)
+
 
 def _parse_glob_pattern(pattern: str) -> str:
     special = ("*", "?", "[")
@@ -120,22 +123,13 @@ def _deployed_on_databricks() -> bool:
     return "DATABRICKS_RUNTIME_VERSION" in os.environ
 
 
-def _build_dbfs_path(filepath: str) -> str:
-    """Modify a file path so that it points to a location in DBFS.
+def _path_has_dbfs_prefix(path: str) -> bool:
+    """Check if a file path has a valid dbfs prefix.
 
     Args:
-        filepath: Filepath to modify.
-
-    Returns:
-        Modified filepath.
+        path: File path to check.
     """
-    if filepath.startswith("/dbfs"):
-        return filepath
-    if filepath.startswith("dbfs"):
-        return "/" + filepath
-    if filepath.startswith("/"):
-        return "/dbfs" + filepath
-    return "/dbfs/" + filepath
+    return path.startswith("/dbfs/")
 
 
 class KedroHdfsInsecureClient(InsecureClient):
@@ -295,8 +289,6 @@ def __init__(  # pylint: disable=too-many-arguments
         """
         credentials = deepcopy(credentials) or {}
         fs_prefix, filepath = _split_filepath(filepath)
-        if not fs_prefix and _deployed_on_databricks():
-            filepath = _build_dbfs_path(filepath)
         exists_function = None
         glob_function = None
@@ -330,7 +322,11 @@ def __init__(  # pylint: disable=too-many-arguments
         else:
             path = PurePosixPath(filepath)
-
+        if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
+            logger.warning(
+                "Using SparkDataSet on Databricks without `/dbfs` prefix in filepath "
+                "will prevent versioning from working."
+            )
         if filepath.startswith("/dbfs"):
             dbutils = _get_dbutils(self._get_spark())
             if dbutils:

From ee432777d83bd33777c708f56bfdbcda5ade6f6c Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Wed, 15 Feb 2023 19:04:25 +0000
Subject: [PATCH 12/25] Modify prefix check to /dbfs/

Signed-off-by: Jannic Holzer
---
 .../kedro_datasets/spark/spark_dataset.py | 28 ++++++++-----------
 .../tests/spark/test_spark_dataset.py     |  4 ++--
 2 files changed, 14 insertions(+), 18 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 1e3b1a8cd..b3b26e97f 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -2,6 +2,7 @@
 ``pyspark``
 """
 import json
+import logging
 import os
 from copy import deepcopy
 from fnmatch import fnmatch
@@ -24,6 +25,8 @@
 from pyspark.sql.utils import AnalysisException
 from s3fs import S3FileSystem
 
+logger = logging.getLogger(__name__)
+
 
 def _parse_glob_pattern(pattern: str) -> str:
     special = ("*", "?", "[")
@@ -120,22 +123,13 @@ def _deployed_on_databricks() -> bool:
     return "DATABRICKS_RUNTIME_VERSION" in os.environ
 
 
-def _build_dbfs_path(filepath: str) -> str:
-    """Modify a file path so that it points to a location in DBFS.
+def _path_has_dbfs_prefix(path: str) -> bool:
+    """Check if a file path has a valid dbfs prefix.
 
     Args:
-        filepath: Filepath to modify.
-
-    Returns:
-        Modified filepath.
+        path: File path to check.
     """
-    if filepath.startswith("/dbfs"):
-        return filepath
-    if filepath.startswith("dbfs"):
-        return "/" + filepath
-    if filepath.startswith("/"):
-        return "/dbfs" + filepath
-    return "/dbfs/" + filepath
+    return path.startswith("/dbfs/")
 
 
 class KedroHdfsInsecureClient(InsecureClient):
@@ -295,8 +289,6 @@ def __init__(  # pylint: disable=too-many-arguments
         """
         credentials = deepcopy(credentials) or {}
         fs_prefix, filepath = _split_filepath(filepath)
-        if not fs_prefix and _deployed_on_databricks():
-            filepath = _build_dbfs_path(filepath)
         exists_function = None
         glob_function = None
@@ -330,7 +322,11 @@ def __init__(  # pylint: disable=too-many-arguments
         else:
             path = PurePosixPath(filepath)
-
+        if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
+            logger.warning(
+                "Using SparkDataSet on Databricks without `/dbfs` prefix in filepath "
+                "will prevent versioning from working."
+            )
         if filepath.startswith("/dbfs"):
             dbutils = _get_dbutils(self._get_spark())
             if dbutils:

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index 9017a826c..ff6367c8f 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -442,7 +442,7 @@ def test_copy(self):
         assert spark_dataset_copy._file_format == "csv"
         assert spark_dataset_copy._save_args == {"mode": "overwrite"}
 
-    def test_dbfs_prefix_warning(self, mocker, caplog):
+    def test_dbfs_prefix_warning(self, monkeypatch, caplog):
         expected_message = (
             "Using SparkDataSet on Databricks without `/dbfs` prefix in filepath "
             "will prevent versioning from working."
@@ -453,7 +453,7 @@ def test_dbfs_prefix_warning(self, monkeypatch, caplog):
         assert expected_message not in caplog.text
 
         # test that warning is not raised when on Databricks and filepath has /dbfs prefix
-        mocker.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "7.3"})
+        monkeypatch.setenv("DATABRICKS_RUNTIME_VERSION", "7.3")
         SparkDataSet(filepath = "/dbfs/my_project/data/02_intermediate/processed_data")
         assert expected_message not in caplog.text

From 271c76cddf80e9ec3c18c069c14068531a66d72f Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Wed, 15 Feb 2023 19:33:22 +0000
Subject: [PATCH 13/25] Make warning message clearer

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 2 +-
 kedro-datasets/tests/spark/test_spark_dataset.py     | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index b3b26e97f..4124930fc 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -325,7 +325,7 @@ def __init__(  # pylint: disable=too-many-arguments
         if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
             logger.warning(
                 "Using SparkDataSet on Databricks without `/dbfs` prefix in filepath "
-                "will prevent versioning from working."
+                "will raise an error when using versioning. Add this prefix to fix the error."
             )
         if filepath.startswith("/dbfs"):
             dbutils = _get_dbutils(self._get_spark())

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index ff6367c8f..e77c3b3c7 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -445,7 +445,8 @@ def test_copy(self):
     def test_dbfs_prefix_warning(self, monkeypatch, caplog):
         expected_message = (
             "Using SparkDataSet on Databricks without `/dbfs` prefix in filepath "
-            "will prevent versioning from working."
+            "will raise an error with versioning enabled. Add this prefix to "
+            "fix the error."
         )
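
Review note (PATCHES 10-13): from PATCH 11 onwards the series stops rewriting the filepath and instead logs a warning when a bare path is used on Databricks, and the tests observe that warning through pytest's built-in caplog fixture. PATCHES 11 and 12 share a subject and an identical spark_dataset.py hunk, down to the index line 1e3b1a8cd..b3b26e97f; PATCH 12 additionally migrates the test from mocker.patch.dict to monkeypatch.setenv, so the pair reads as one logical change. Also, the message asserted in PATCH 13's test ("with versioning enabled") drifts from the message actually logged ("when using versioning"); PATCH 15 brings the two back into line. A self-contained sketch of the caplog pattern, assuming only pytest and a stand-in for the real dataset constructor:

    import logging

    logger = logging.getLogger(__name__)

    def create_dataset(filepath: str) -> None:
        # Stand-in for SparkDataSet.__init__: warn when the prefix is missing.
        if not filepath.startswith("/dbfs/"):
            logger.warning("Filepath %s is missing the `/dbfs/` prefix.", filepath)

    def test_prefix_warning(caplog):
        create_dataset("/dbfs/data")
        assert caplog.text == ""  # no warning for a prefixed path
        create_dataset("data")
        assert "missing the `/dbfs/` prefix" in caplog.text
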
From c1cce4258f397757e5fd19886c5ada6993d013e5 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Wed, 15 Feb 2023 19:53:29 +0000
Subject: [PATCH 14/25] Add release note

Signed-off-by: Jannic Holzer
---
 kedro-datasets/RELEASE.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md
index 9c6deef45..317ddea3a 100755
--- a/kedro-datasets/RELEASE.md
+++ b/kedro-datasets/RELEASE.md
@@ -1,6 +1,9 @@
 # Upcoming Release:
 
+## Bug fixes and other changes
+* Added a warning when the user tries to use `SparkDataSet` on Databricks without specifying a file path with the `/dbfs/` prefix.
+
 # Release 1.0.2:
 
 ## Bug fixes and other changes

From 40b90f73d054e2aaff39731fe3400408d6483a66 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Wed, 15 Feb 2023 19:53:51 +0000
Subject: [PATCH 15/25] Fix linting

Signed-off-by: Jannic Holzer
---
 kedro-datasets/tests/spark/test_spark_dataset.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index e77c3b3c7..62a63c781 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -1,4 +1,4 @@
-import os
+# pylint: disable=too-many-lines
 import re
 import sys
 import tempfile
@@ -444,9 +444,8 @@ def test_copy(self):
     def test_dbfs_prefix_warning(self, monkeypatch, caplog):
         expected_message = (
-            "Using SparkDataSet on Databricks without `/dbfs` prefix in filepath "
-            "will raise an error with versioning enabled. Add this prefix to "
-            "fix the error."
+            "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
+            "filepath will raise an error. Add this prefix to fix the error."
         )
 
         # test that warning is not raised when not on Databricks
@@ -455,7 +454,7 @@ def test_dbfs_prefix_warning(self, monkeypatch, caplog):
         # test that warning is not raised when on Databricks and filepath has /dbfs prefix
         monkeypatch.setenv("DATABRICKS_RUNTIME_VERSION", "7.3")
-        SparkDataSet(filepath = "/dbfs/my_project/data/02_intermediate/processed_data")
+        SparkDataSet(filepath="/dbfs/my_project/data/02_intermediate/processed_data")
         assert expected_message not in caplog.text

From 086fa0864a1984aad53d7b62a03059acd5453c57 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Wed, 15 Feb 2023 20:37:47 +0000
Subject: [PATCH 16/25] Update warning message

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 4124930fc..0510cbe34 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -324,8 +324,8 @@ def __init__(  # pylint: disable=too-many-arguments
         if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
             logger.warning(
-                "Using SparkDataSet on Databricks without `/dbfs` prefix in filepath "
-                "will raise an error when using versioning. Add this prefix to fix the error."
+                "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
+                "filepath will raise an error. Add this prefix to fix the error."
             )
         if filepath.startswith("/dbfs"):
             dbutils = _get_dbutils(self._get_spark())

From 307f8c4a855b092c482a5759de6749f118cef209 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Wed, 15 Feb 2023 20:53:54 +0000
Subject: [PATCH 17/25] Modify log warning level to error

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 4 ++--
 kedro-datasets/tests/spark/test_spark_dataset.py     | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 0510cbe34..b6b7ac85b 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -323,9 +323,9 @@ def __init__(  # pylint: disable=too-many-arguments
         else:
             path = PurePosixPath(filepath)
         if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
-            logger.warning(
+            logger.error(
                 "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
-                "filepath will raise an error. Add this prefix to fix the error."
+                "filepath raises an error. Add this prefix to fix the error."
             )
         if filepath.startswith("/dbfs"):
             dbutils = _get_dbutils(self._get_spark())

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index 62a63c781..5ea61631a 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -445,7 +445,7 @@ def test_copy(self):
     def test_dbfs_prefix_warning(self, monkeypatch, caplog):
         expected_message = (
             "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
-            "filepath will raise an error. Add this prefix to fix the error."
+            "filepath raises an error. Add this prefix to fix the error."
         )

From d3a0a80bccbd7306180edca7211617bb81294eeb Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Thu, 16 Feb 2023 10:17:48 +0000
Subject: [PATCH 18/25] Modify message back to warning, refer to undefined behaviour

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 4 ++--
 kedro-datasets/tests/spark/test_spark_dataset.py     | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index b6b7ac85b..04e2cad8f 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -323,9 +323,9 @@ def __init__(  # pylint: disable=too-many-arguments
         else:
             path = PurePosixPath(filepath)
         if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
-            logger.error(
+            logger.warning(
                 "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
-                "filepath raises an error. Add this prefix to fix the error."
+                "filepath causes undefined behaviour. You must add this prefix."
             )
         if filepath.startswith("/dbfs"):
             dbutils = _get_dbutils(self._get_spark())

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index 5ea61631a..7e053fa81 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -445,7 +445,7 @@ def test_copy(self):
     def test_dbfs_prefix_warning(self, monkeypatch, caplog):
         expected_message = (
             "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
-            "filepath raises an error. Add this prefix to fix the error."
+            "filepath causes undefined behaviour. You must add this prefix."
         )

From c349fa8adf33fd8c47c1fbf2a932dfe92250aca9 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Thu, 16 Feb 2023 10:19:03 +0000
Subject: [PATCH 19/25] Modify required prefix to /dbfs/

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 2 +-
 kedro-datasets/tests/spark/test_spark_dataset.py     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 04e2cad8f..1a1bd6c2d 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -324,7 +324,7 @@ def __init__(  # pylint: disable=too-many-arguments
         if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
             logger.warning(
-                "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
+                "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
                 "filepath causes undefined behaviour. You must add this prefix."
             )

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index 7e053fa81..212ff3c11 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -444,7 +444,7 @@ def test_copy(self):
     def test_dbfs_prefix_warning(self, monkeypatch, caplog):
         expected_message = (
-            "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
+            "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
             "filepath causes undefined behaviour. You must add this prefix."
         )

From d882b7b4a8b01bbaec134e0f5261d25db22d78a3 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Thu, 16 Feb 2023 10:23:33 +0000
Subject: [PATCH 20/25] Modify doc string

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 1a1bd6c2d..98dbb3fd4 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -258,9 +258,7 @@ def __init__(  # pylint: disable=too-many-arguments
 
         Args:
             filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks
-                and working with data written to mount path points,
-                specify ``filepath``s for (versioned) ``SparkDataSet``s
-                starting with ``/dbfs/mnt``.
+                specify ``filepath``s starting with ``/dbfs/``.
             file_format: File format used during load and save
                 operations. These are formats supported by the running
                 SparkContext include parquet, csv, delta. For a list of supported
From 44b16f5ceb2aa10429da4d9d0ac79b26d271d406 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Thu, 16 Feb 2023 15:49:33 +0000
Subject: [PATCH 21/25] Modify warning message

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 2 +-
 kedro-datasets/tests/spark/test_spark_dataset.py     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 98dbb3fd4..c96641f60 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -323,7 +323,7 @@ def __init__(  # pylint: disable=too-many-arguments
         if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
             logger.warning(
                 "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
-                "filepath causes undefined behaviour. You must add this prefix."
+                "filepath is a known source of error. You must add this prefix."
             )

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index 212ff3c11..fb243e9db 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -445,7 +445,7 @@ def test_copy(self):
     def test_dbfs_prefix_warning(self, monkeypatch, caplog):
         expected_message = (
             "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
-            "filepath causes undefined behaviour. You must add this prefix."
+            "filepath is a known source of error. You must add this prefix."
         )
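
Review note (PATCHES 19-21): tightening the required prefix from `/dbfs` to `/dbfs/` means the check only accepts paths that are genuinely under the DBFS mount point, rather than any path that merely begins with those five characters. A quick sketch of the difference, reusing the predicate from PATCH 11 with the stricter prefix; the example paths are hypothetical:

    def _path_has_dbfs_prefix(path: str) -> bool:
        # Requiring the trailing separator keeps lookalike names out.
        return path.startswith("/dbfs/")

    assert _path_has_dbfs_prefix("/dbfs/my_project/data")
    assert not _path_has_dbfs_prefix("/dbfs-backup/data")  # lookalike directory
    assert not _path_has_dbfs_prefix("my_project/data")    # relative path
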
From 017d3fa00c6aa60817e133117d81b8f9759d1760 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Mon, 20 Feb 2023 16:33:15 +0000
Subject: [PATCH 22/25] Split tests and add filepath to warning

Signed-off-by: Jannic Holzer
---
 .../kedro_datasets/spark/spark_dataset.py |  2 +-
 .../tests/spark/test_spark_dataset.py     | 25 ++++++++++++++-----
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index c96641f60..db822f5f3 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -323,7 +323,7 @@ def __init__(  # pylint: disable=too-many-arguments
         if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
             logger.warning(
                 "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
-                "filepath is a known source of error. You must add this prefix."
+                f"filepath is a known source of error. You must add this prefix to {filepath}."
             )
         if filepath.startswith("/dbfs"):
             dbutils = _get_dbutils(self._get_spark())

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index fb243e9db..d6bd8f341 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -442,23 +442,36 @@ def test_copy(self):
         assert spark_dataset_copy._file_format == "csv"
         assert spark_dataset_copy._save_args == {"mode": "overwrite"}
 
-    def test_dbfs_prefix_warning(self, monkeypatch, caplog):
+    def test_dbfs_prefix_warning_no_databricks(self, caplog):
+        # test that warning is not raised when not on Databricks
+        filepath = "my_project/data/02_intermediate/processed_data"
         expected_message = (
             "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
-            "filepath is a known source of error. You must add this prefix."
+            f"filepath is a known source of error. You must add this prefix to {filepath}."
         )
-
-        # test that warning is not raised when not on Databricks
         SparkDataSet(filepath="my_project/data/02_intermediate/processed_data")
         assert expected_message not in caplog.text
 
+    def test_dbfs_prefix_warning_on_databricks_no_prefix(self, monkeypatch, caplog):
         # test that warning is not raised when on Databricks and filepath has /dbfs prefix
+        filepath = "/dbfs/my_project/data/02_intermediate/processed_data"
+        expected_message = (
+            "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
+            f"filepath is a known source of error. You must add this prefix to {filepath}"
+        )
         monkeypatch.setenv("DATABRICKS_RUNTIME_VERSION", "7.3")
-        SparkDataSet(filepath="/dbfs/my_project/data/02_intermediate/processed_data")
+        SparkDataSet(filepath=filepath)
         assert expected_message not in caplog.text
 
+    def test_dbfs_prefix_warning_on_databricks_with_prefix(self, monkeypatch, caplog):
         # test that warning is raised when on Databricks and filepath does not have /dbfs prefix
-        SparkDataSet(filepath="my_project/data/02_intermediate/processed_data")
+        filepath = "my_project/data/02_intermediate/processed_data"
+        expected_message = (
+            "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
+            f"filepath is a known source of error. You must add this prefix to {filepath}."
+        )
+        monkeypatch.setenv("DATABRICKS_RUNTIME_VERSION", "7.3")
+        SparkDataSet(filepath=filepath)
         assert expected_message in caplog.text

From b4482971e5bfc426185b04457e2cb359ab469ca4 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Mon, 20 Feb 2023 16:43:45 +0000
Subject: [PATCH 23/25] Modify f string in logging call

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index db822f5f3..5b0610697 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -323,7 +323,8 @@ def __init__(  # pylint: disable=too-many-arguments
         if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
             logger.warning(
                 "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
-                f"filepath is a known source of error. You must add this prefix to {filepath}."
+                "filepath is a known source of error. You must add this prefix to %s",
+                filepath
             )
         if filepath.startswith("/dbfs"):
             dbutils = _get_dbutils(self._get_spark())

From 37d1bddbc203c2ac817bd552f2ecb6c911990e62 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Mon, 20 Feb 2023 17:08:53 +0000
Subject: [PATCH 24/25] Fix tests

Signed-off-by: Jannic Holzer
---
 kedro-datasets/tests/spark/test_spark_dataset.py | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index d6bd8f341..74c5ee2bf 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -452,23 +452,19 @@ def test_dbfs_prefix_warning_no_databricks(self, caplog):
         SparkDataSet(filepath="my_project/data/02_intermediate/processed_data")
         assert expected_message not in caplog.text
 
-    def test_dbfs_prefix_warning_on_databricks_no_prefix(self, monkeypatch, caplog):
+    def test_dbfs_prefix_warning_on_databricks_with_prefix(self, monkeypatch, caplog):
         # test that warning is not raised when on Databricks and filepath has /dbfs prefix
         filepath = "/dbfs/my_project/data/02_intermediate/processed_data"
-        expected_message = (
-            "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
-            f"filepath is a known source of error. You must add this prefix to {filepath}"
-        )
         monkeypatch.setenv("DATABRICKS_RUNTIME_VERSION", "7.3")
         SparkDataSet(filepath=filepath)
-        assert expected_message not in caplog.text
+        assert caplog.text == ""
 
-    def test_dbfs_prefix_warning_on_databricks_with_prefix(self, monkeypatch, caplog):
+    def test_dbfs_prefix_warning_on_databricks_no_prefix(self, monkeypatch, caplog):
         # test that warning is raised when on Databricks and filepath does not have /dbfs prefix
         filepath = "my_project/data/02_intermediate/processed_data"
         expected_message = (
             "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
-            f"filepath is a known source of error. You must add this prefix to {filepath}."
+            f"filepath is a known source of error. You must add this prefix to {filepath}"
         )
         monkeypatch.setenv("DATABRICKS_RUNTIME_VERSION", "7.3")
         SparkDataSet(filepath=filepath)
         assert expected_message in caplog.text

From 8c5f92ca210d2383da834aafaba6e218fab26f13 Mon Sep 17 00:00:00 2001
From: Jannic Holzer
Date: Mon, 20 Feb 2023 17:14:13 +0000
Subject: [PATCH 25/25] Lint

Signed-off-by: Jannic Holzer
---
 kedro-datasets/kedro_datasets/spark/spark_dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kedro-datasets/kedro_datasets/spark/spark_dataset.py b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
index 5b0610697..d366eae08 100644
--- a/kedro-datasets/kedro_datasets/spark/spark_dataset.py
+++ b/kedro-datasets/kedro_datasets/spark/spark_dataset.py
@@ -324,7 +324,7 @@ def __init__(  # pylint: disable=too-many-arguments
             logger.warning(
                 "Using SparkDataSet on Databricks without the `/dbfs/` prefix in the "
                 "filepath is a known source of error. You must add this prefix to %s",
-                filepath
+                filepath,
             )
         if filepath.startswith("/dbfs"):
             dbutils = _get_dbutils(self._get_spark())
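
Review note (PATCHES 23 and 25): swapping the f-string for %s-style arguments is the idiomatic fix for pylint's logging-fstring-interpolation check (W1203), presumably what the linter flagged here. With %s-style arguments the logging module interpolates only if the record is actually emitted, and every record carries a constant message template. A minimal sketch of the two spellings, using only the standard library:

    import logging

    logger = logging.getLogger(__name__)
    filepath = "my_project/data/02_intermediate/processed_data"

    # Eager: the f-string is built even if WARNING records are filtered out.
    logger.warning(f"You must add the `/dbfs/` prefix to {filepath}")

    # Lazy: formatting is deferred until a handler actually emits the record.
    logger.warning("You must add the `/dbfs/` prefix to %s", filepath)
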