Add warning when SparkDataSet is used on Databricks without a valid file path (#114)
SparkDataSet implementation (diff):

@@ -2,6 +2,8 @@
 ``pyspark``
 """
 import json
+import logging
+import os
 from copy import deepcopy
 from fnmatch import fnmatch
 from functools import partial
@@ -23,6 +25,8 @@
 from pyspark.sql.utils import AnalysisException
 from s3fs import S3FileSystem

+logger = logging.getLogger(__name__)
+

 def _parse_glob_pattern(pattern: str) -> str:
     special = ("*", "?", "[")
@@ -114,6 +118,20 @@ def _dbfs_exists(pattern: str, dbutils: Any) -> bool:
     return False


+def _deployed_on_databricks() -> bool:
+    """Check if running on Databricks."""
+    return "DATABRICKS_RUNTIME_VERSION" in os.environ
+
+
+def _path_has_dbfs_prefix(path: str) -> bool:
+    """Check if a file path has a valid dbfs prefix.
+
+    Args:
+        path: File path to check.
+    """
+    return path.startswith("/dbfs/")
+
+
 class KedroHdfsInsecureClient(InsecureClient):
     """Subclasses ``hdfs.InsecureClient`` and implements ``hdfs_exists``
     and ``hdfs_glob`` methods required by ``SparkDataSet``"""
@@ -304,7 +322,11 @@ def __init__(  # pylint: disable=too-many-arguments
         else:
             path = PurePosixPath(filepath)

+        if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
+            logger.error(
+                "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
+                "filepath raises an error. Add this prefix to fix the error."
+            )
         if filepath.startswith("/dbfs"):
             dbutils = _get_dbutils(self._get_spark())
             if dbutils:

Review thread on the logger.error call:

* Should this be a ...

* A DataSetError gets raised on trying to save with versioning enabled, but it is not clear from the message why this is. This message is supposed to inform the user. Do you think we should raise an error with a clearer message instead?

* Yes, I think if this is an invalid code path and we know that it's going to fail, we should raise the error earlier; what's the point of delaying until later?

* I changed this back to a warning in d3a0a80 and refer to 'undefined behaviour' instead. I also modified the docstring to make it clearer to users that they need to specify this.

* Ah sorry, I didn't see this reply before I wrote my last message. I would agree with you, but there is one subtlety, which is that if you specify a version in ... The alternative would be to add another condition to the ...

* I see, in that case I agree we should not break the existing behaviors, and I think the current solution with a warning is fine.
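For readers outside the diff view, the following is a self-contained sketch of the check this PR adds. The helper bodies mirror the diff above; the wrapper name warn_if_bad_dbfs_path is hypothetical (the PR inlines this logic in SparkDataSet.__init__), and the commented-out raise shows the fail-fast alternative the review thread considered and rejected.

import logging
import os

logger = logging.getLogger(__name__)


def _deployed_on_databricks() -> bool:
    """Databricks runtimes export DATABRICKS_RUNTIME_VERSION into the environment."""
    return "DATABRICKS_RUNTIME_VERSION" in os.environ


def _path_has_dbfs_prefix(path: str) -> bool:
    """Local-style DBFS paths are mounted under /dbfs/."""
    return path.startswith("/dbfs/")


def warn_if_bad_dbfs_path(filepath: str) -> None:
    """Hypothetical wrapper: log, rather than raise, on a suspicious path."""
    if _deployed_on_databricks() and not _path_has_dbfs_prefix(filepath):
        # Logging instead of raising keeps versioned datasets working, since a
        # version can legitimately resolve to a path the constructor cannot predict.
        logger.error(
            "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
            "filepath raises an error. Add this prefix to fix the error."
        )
        # Fail-fast alternative discussed and rejected in review:
        # raise DataSetError(f"`{filepath}` needs the `/dbfs/` prefix on Databricks.")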
SparkDataSet tests (diff):

@@ -1,3 +1,4 @@
+# pylint: disable=too-many-lines
 import re
 import sys
 import tempfile
@@ -161,6 +162,7 @@ def isDir(self):
         return "." not in self.path.split("/")[-1]


+# pylint: disable=too-many-public-methods
 class TestSparkDataSet:
     def test_load_parquet(self, tmp_path, sample_pandas_df):
         temp_path = (tmp_path / "data").as_posix()
@@ -440,6 +442,25 @@ def test_copy(self):
         assert spark_dataset_copy._file_format == "csv"
         assert spark_dataset_copy._save_args == {"mode": "overwrite"}

+    def test_dbfs_prefix_warning(self, monkeypatch, caplog):
+        expected_message = (
+            "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
+            "filepath raises an error. Add this prefix to fix the error."
+        )
+
+        # test that warning is not raised when not on Databricks
+        SparkDataSet(filepath="my_project/data/02_intermediate/processed_data")
+        assert expected_message not in caplog.text
+
+        # test that warning is not raised when on Databricks and filepath has /dbfs prefix
+        monkeypatch.setenv("DATABRICKS_RUNTIME_VERSION", "7.3")
+        SparkDataSet(filepath="/dbfs/my_project/data/02_intermediate/processed_data")
+        assert expected_message not in caplog.text
+
+        # test that warning is raised when on Databricks and filepath does not have /dbfs prefix
+        SparkDataSet(filepath="my_project/data/02_intermediate/processed_data")
+        assert expected_message in caplog.text
+
+
 class TestSparkDataSetVersionedLocal:
     def test_no_version(self, versioned_dataset_local):

Review thread on test_dbfs_prefix_warning:

* A bit fussy, but I think this should be 3 separate tests or 3 separate cases of a parameterised test. Even though they're small, each ...

* You are right about this, though the linter complains that the file is too long when these verbose tests are added (I originally had three separate tests). I would turn the warning off, but honestly I am a bit tired of ignoring the linter; there's not much point in using it if we ignore it every time it tells us off. My solution would be to add this slimmer method for now, as this is quite urgent, then open an issue to split the test file. What do you think?

* Hmm, a better solution would be to split this back into three tests and add an ignore directive, then split the file in a new issue and remove the ignore directive 🤔. This is what I will do.

* @jmholzer agree with this. Maybe we should just add more global ignores to the linter configuration for tests to make it less strict, if you're finding that you continually need to add inline ignores.
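For reference, the parameterised variant discussed in the thread above might look like the sketch below. It is a standalone function rather than a method, it assumes pytest and the test module's existing SparkDataSet import, and the scenario tuples are taken directly from the three cases in the merged test.

import pytest


@pytest.mark.parametrize(
    "runtime_version, filepath, expect_warning",
    [
        # not on Databricks: no warning
        (None, "my_project/data/02_intermediate/processed_data", False),
        # on Databricks with /dbfs prefix: no warning
        ("7.3", "/dbfs/my_project/data/02_intermediate/processed_data", False),
        # on Databricks without /dbfs prefix: warning
        ("7.3", "my_project/data/02_intermediate/processed_data", True),
    ],
)
def test_dbfs_prefix_warning(monkeypatch, caplog, runtime_version, filepath, expect_warning):
    if runtime_version is None:
        # make sure a CI host that happens to set this variable does not skew the case
        monkeypatch.delenv("DATABRICKS_RUNTIME_VERSION", raising=False)
    else:
        monkeypatch.setenv("DATABRICKS_RUNTIME_VERSION", runtime_version)
    SparkDataSet(filepath=filepath)
    expected_message = (
        "Using SparkDataSet on Databricks without the `/dbfs` prefix in the "
        "filepath raises an error. Add this prefix to fix the error."
    )
    assert (expected_message in caplog.text) == expect_warning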
Review thread on _deployed_on_databricks:

* https://github.com/kedro-org/kedro/blob/1c6d108b56e405364be3cf1d9cfca65fe6606096/kedro/framework/project/__init__.py#LL217-L218C87
  This should be a function now to determine whether the project is run in Databricks.

* Do you mean we should add a function to Kedro in a separate PR and import that here?

* Yes, since they are doing the exact same thing, we should just keep the logic consistent. If this is a more pressing bug fix, I think it's fine to keep this in the PR, but we should still refactor it in kedro later.

* While I am generally very much pro-DRY, in this case I would prefer we just kept this duplicated since ... Overall, I'd go for the principle that "duplication is cheaper than getting it wrong" here and so just leave it duplicated.
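If the duplication were ever consolidated, the shared helper proposed in the thread could look like the sketch below. The name is_databricks and its home module are assumptions, not an existing kedro API; the body is the same environment check used both in kedro/framework/project/__init__.py and in this PR.

import os


def is_databricks() -> bool:
    """Return True when the code is running inside a Databricks runtime.

    Databricks clusters set DATABRICKS_RUNTIME_VERSION in the environment,
    so its presence is a reliable signal for both kedro and SparkDataSet.
    """
    return "DATABRICKS_RUNTIME_VERSION" in os.environ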