[KED-2891] Implement spark.DeltaTable dataset #964

Merged: 56 commits, Dec 3, 2021

Changes from 55 commits

Commits
3c3e774
branch init
Oct 14, 2021
bcb7ed1
non-working draft that leads to strat pattern resolution on _save method
Oct 14, 2021
603f7bd
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 14, 2021
3d85fe3
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 15, 2021
462d26b
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 15, 2021
fd37c4d
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 20, 2021
cdbeabd
commenting and removal of redundancy
Oct 20, 2021
17ea2ac
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 22, 2021
e703a66
linting, fixes, removal
Oct 22, 2021
be6106c
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 22, 2021
beed054
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 25, 2021
763ab92
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 26, 2021
82ad4c3
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 26, 2021
8984385
added RELEASE.md
Oct 26, 2021
7b1cd03
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Oct 29, 2021
566dd57
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 2, 2021
b6cbb25
Merge branch 'master' into feature/databricks-deltatable-dataset
jiriklein Nov 3, 2021
00d9e0f
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 3, 2021
65e0da0
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 4, 2021
3f6f449
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 5, 2021
88ab9af
reworking and cleanup - delegated decent amount of logic to SparkDataSet
Nov 5, 2021
dbea396
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 5, 2021
c92c883
logging message multiline
Nov 5, 2021
a1925af
Merge branch 'master' of github.com:quantumblacklabs/kedro into featu…
Nov 8, 2021
b8a0282
removal of licencing
Nov 8, 2021
155bc64
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 11, 2021
aa92baf
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 12, 2021
058be40
Add requirements, remove confirms(), linting fixes
Nov 12, 2021
721e046
Add options
Nov 12, 2021
0747ad9
Add test file
Nov 12, 2021
01397a4
Fix tests
Nov 12, 2021
1ae97f3
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 16, 2021
abd1740
Fix tests properly
Nov 16, 2021
c6efb74
Write minimal unit test
Nov 17, 2021
66f17ac
Remove delta_options
Nov 17, 2021
2b697f5
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 17, 2021
7bf65cf
Add some docstrings
Nov 17, 2021
1ed7ed5
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 23, 2021
a2559aa
Make deltatable dataset test pass
Nov 23, 2021
2792538
Merge branch 'master' into feature/databricks-deltatable-dataset
Nov 23, 2021
f8005b6
Define delta spark session locally
Nov 23, 2021
cfb0f9e
More robust test
Nov 23, 2021
cfba1db
Add unit tests
Nov 23, 2021
799658d
Inherit from AbstractDataSet directly
Nov 23, 2021
7ca691a
Add more tests
Nov 24, 2021
d3f3e39
Use DataFrameWriter.options instead of .option
Nov 24, 2021
da00700
Fix linkchecker
Nov 24, 2021
b135a92
Move test
Nov 24, 2021
29d116f
Try fix tests that fail only on CCI
Nov 24, 2021
78789e6
Add some rough documentation
Nov 24, 2021
56ea747
Apply suggestions from code review
Nov 24, 2021
c837288
Try this on CI
Nov 29, 2021
55a9b2d
Run sparksession before
Dec 2, 2021
5bbd157
Merge branch 'master' into feature/databricks-deltatable-dataset
limdauto Dec 3, 2021
933feef
Fix tests (#1088)
limdauto Dec 3, 2021
75ee172
Apply suggestions from code review and general cleanup
Dec 3, 2021
14 changes: 14 additions & 0 deletions .circleci/config.yml
@@ -7,12 +7,15 @@ executors:
py36:
docker:
- image: 350138855857.dkr.ecr.eu-west-2.amazonaws.com/kedro-builder:3.6
resource_class: medium+
py37:
docker:
- image: 350138855857.dkr.ecr.eu-west-2.amazonaws.com/kedro-builder:3.7
resource_class: medium+
py38:
docker:
- image: 350138855857.dkr.ecr.eu-west-2.amazonaws.com/kedro-builder:3.8
resource_class: medium+

commands:
setup_conda:
@@ -38,6 +41,14 @@ commands:
- run:
name: Install requirements and test requirements
command: pip install --upgrade -r test_requirements.txt
- run:
# this is needed to fix java cacerts so
# spark can automatically download packages from mvn
# https://stackoverflow.com/a/50103533/1684058
name: Fix cacerts
command: |
sudo rm /etc/ssl/certs/java/cacerts
sudo update-ca-certificates -f
- run:
# Since recently Spark installation for some reason does not have enough permissions to execute
# /home/circleci/miniconda/envs/kedro_builder/lib/python3.X/site-packages/pyspark/bin/spark-class.
@@ -67,6 +78,9 @@ commands:
- checkout
- setup_conda
- setup_requirements
- run:
name: Set up SparkSession
command: echo """from pyspark.sql import SparkSession;spark = SparkSession.builder.appName('tmp').getOrCreate();""" | python
- run:
name: Run unit tests
command: make test
3 changes: 2 additions & 1 deletion RELEASE.md
@@ -5,13 +5,14 @@
* Enabled overriding nested parameters with `params` in CLI, i.e. `kedro run --params="model.model_tuning.booster:gbtree"` updates parameters to `{"model": {"model_tuning": {"booster": "gbtree"}}}`.
* Added option to `pandas.SQLQueryDataSet` to specify a `filepath` with a SQL query, in addition to the current method of supplying the query itself in the `sql` argument.
* Extended `ExcelDataSet` to support saving Excel files with multiple sheets.
* Added the following new dataset (see ([Issue #839](https://github.com/quantumblacklabs/kedro/issues/839)):
* Added the following new datasets (see ([Issue #839](https://github.com/quantumblacklabs/kedro/issues/839)):

| Type | Description | Location |
| --------------------------- | ---------------------------------------------------- | --------------------------------- |
| `plotly.JSONDataSet` | Works with plotly graph object Figures (saves as json file) | `kedro.extras.datasets.plotly` |
| `pandas.GenericDataSet` | Provides a 'best effort' facility to read / write any format provided by the `pandas` library | `kedro.extras.datasets.pandas` |
| `pandas.GBQQueryDataSet` | Loads data from a Google Bigquery table using provided SQL query | `kedro.extras.datasets.pandas` |
| `spark.DeltaTableDataSet` | Dataset designed to handle Delta Lake Tables and their CRUD-style operations, including `update`, `merge` and `delete` | `kedro.extras.datasets.spark` |

## Bug fixes and other changes
* Fixed an issue where `kedro new --config config.yml` was ignoring the config file when `prompts.yml` didn't exist.
76 changes: 76 additions & 0 deletions docs/source/11_tools_integration/01_pyspark.md
@@ -77,6 +77,7 @@ CONTEXT_CLASS = CustomContext

We recommend using Kedro's built-in Spark datasets to load raw data into Spark's [DataFrame](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html), as well as to write them back to storage. Some of our built-in Spark datasets include:

* [spark.DeltaTableDataSet](/kedro.extras.datasets.spark.DeltaTableDataSet)
* [spark.SparkDataSet](/kedro.extras.datasets.spark.SparkDataSet)
* [spark.SparkJDBCDataSet](/kedro.extras.datasets.spark.SparkJDBCDataSet)
* [spark.SparkHiveDataSet](/kedro.extras.datasets.spark.SparkHiveDataSet)
@@ -115,6 +116,81 @@ df = catalog.load("weather")
assert isinstance(df, pyspark.sql.DataFrame)
```

## Spark and Delta Lake interaction

[Delta Lake](https://delta.io/) is an open-source project that enables building a Lakehouse architecture on top of data lakes. It provides ACID transactions and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS.
To set up PySpark with Delta Lake, have a look at [the recommendations in Delta Lake's documentation](https://docs.delta.io/latest/quick-start.html#python).
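As a minimal sketch of that setup (the app name is a placeholder and the configuration follows the Delta Lake quick start rather than anything prescribed by Kedro), a local session could be created like this:

```python
# Sketch of a local PySpark + Delta Lake session, following the Delta Lake quick start.
# `configure_spark_with_delta_pip` adds the matching io.delta:delta-core package to
# spark.jars.packages; the app name below is an arbitrary placeholder.
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("kedro-delta-example")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
```

In a Kedro project, the same `spark.*` options would typically live wherever the project initialises its `SparkSession` (for example, the `spark.yml`-based configuration described earlier in this guide).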

We recommend the following workflow, which makes use of the [Transcoding](../05_data/01_data_catalog.md) feature in Kedro.

```yaml
temperature:
  type: spark.SparkDataSet
  filepath: data/01_raw/data.csv
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

weather@spark:
  type: spark.SparkDataSet
  filepath: s3a://my_bucket/03_primary/weather
  file_format: delta
  save_args:
    mode: "overwrite"
    dfwriter_options:
      versionAsOf: 0

weather@delta:
  type: spark.DeltaTableDataSet
  filepath: s3a://my_bucket/03_primary/weather
```

The `DeltaTableDataSet` does not support the `save()` operation; instead, updates happen in place inside the node function, through `DeltaTable.update()`, `DeltaTable.delete()` or `DeltaTable.merge()`.


> Since the save operation happens within the `node` via the DeltaTable API, the Kedro `before_dataset_saved` hook will not be triggered.
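As an illustrative sketch only (the update condition, the column names and the return value are assumptions, not code from this PR), a node such as `update_meteorological_state` might look like this:

```python
# Sketch of a node that "saves" by mutating the Delta table in place.
# Column names, the update condition and the return marker are illustrative assumptions.
from delta.tables import DeltaTable
from pyspark.sql import functions as F


def update_meteorological_state(weather: DeltaTable) -> str:
    # The write happens here through the DeltaTable API, not via data_set.save().
    weather.update(
        condition=F.col("temperature") > 40,
        set={"state": F.lit("heatwave")},
    )
    # Return a small marker so downstream nodes can depend on this operation.
    return "update complete"
```

The pipeline below then wires three such nodes together: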

```python
Pipeline(
    [
        node(
            func=process_barometer_data, inputs="temperature", outputs="weather@spark"
        ),
        node(
            func=update_meteorological_state,
            inputs="weather@delta",
            outputs="first_operation_complete",
        ),
        node(
            func=estimate_weather_trend,
            inputs=["first_operation_complete", "weather@delta"],
            outputs="second_operation_complete",
        ),
    ]
)
```

`first_operation_complete` is a `MemoryDataSet` that signals that any Delta operations which occur "outside" the Kedro DAG are complete. It can be used as an input to a downstream node to preserve the shape of the DAG. If no downstream nodes need to run afterwards, the node can simply return nothing:

```python
Pipeline(
    [
        node(func=..., inputs="temperature", outputs="weather@spark"),
        node(func=..., inputs="weather@delta", outputs=None),
    ]
)
```

The following diagram is the visual representation of the workflow explained above:

![Spark and Delta Lake workflow](../meta/images/spark_delta_workflow.png)

> Note: This pattern of creating "dummy" datasets to preserve the data flow also applies to other "out of DAG" execution operations such as SQL operations within a node.

## Use `MemoryDataSet` for intermediary `DataFrame`

For nodes operating on `DataFrame`s that don't need to perform Spark actions, such as writing the `DataFrame` to storage, we recommend using the default `MemoryDataSet` to hold the `DataFrame`. In other words, there is no need to specify such datasets in the `DataCatalog` or `catalog.yml`. This allows you to take advantage of Spark's optimiser and lazy evaluation.
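For example (a sketch with made-up node functions; only `temperature` and `weather@spark` come from the catalog example above), the intermediate `cleaned_temperature` dataset needs no catalog entry and is held in a `MemoryDataSet` automatically:

```python
# Sketch only: the node functions are placeholders. "cleaned_temperature" is not
# declared in catalog.yml, so Kedro keeps it in a MemoryDataSet between the nodes
# and Spark can still plan both transformations lazily.
from kedro.pipeline import Pipeline, node


def clean(df):
    return df.dropna()  # lazy transformation, no Spark action triggered here


def to_weather(df):
    return df.withColumnRenamed("temp", "temperature")  # illustrative column rename


Pipeline(
    [
        node(clean, inputs="temperature", outputs="cleaned_temperature"),
        node(to_weather, inputs="cleaned_temperature", outputs="weather@spark"),
    ]
)
```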
1 change: 1 addition & 0 deletions docs/source/15_api_docs/kedro.extras.datasets.rst
@@ -36,6 +36,7 @@ kedro.extras.datasets
kedro.extras.datasets.pillow.ImageDataSet
kedro.extras.datasets.plotly.JSONDataSet
kedro.extras.datasets.plotly.PlotlyDataSet
kedro.extras.datasets.spark.DeltaTableDataSet
kedro.extras.datasets.spark.SparkDataSet
kedro.extras.datasets.spark.SparkHiveDataSet
kedro.extras.datasets.spark.SparkJDBCDataSet
Binary file added docs/source/meta/images/spark_delta_workflow.png
4 changes: 3 additions & 1 deletion kedro/extras/datasets/spark/__init__.py
@@ -1,6 +1,6 @@
"""Provides I/O modules for Apache Spark."""

__all__ = ["SparkDataSet", "SparkHiveDataSet", "SparkJDBCDataSet"]
__all__ = ["SparkDataSet", "SparkHiveDataSet", "SparkJDBCDataSet", "DeltaTableDataSet"]

from contextlib import suppress

@@ -10,3 +10,5 @@
from .spark_hive_dataset import SparkHiveDataSet
with suppress(ImportError):
from .spark_jdbc_dataset import SparkJDBCDataSet
with suppress(ImportError):
from .deltatable_dataset import DeltaTableDataSet
116 changes: 116 additions & 0 deletions kedro/extras/datasets/spark/deltatable_dataset.py
@@ -0,0 +1,116 @@
"""``AbstractVersionedDataSet`` implementation to access DeltaTables using
``delta-spark``
"""
import logging
from copy import deepcopy
from pathlib import PurePosixPath
from typing import Any, Dict

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

from kedro.extras.datasets.spark.spark_dataset import (
_split_filepath,
_strip_dbfs_prefix,
)
from kedro.io.core import AbstractDataSet

logger = logging.getLogger(__name__)


class DeltaTableDataSet(AbstractDataSet):
"""``DeltaTableDataSet`` loads data into DeltaTable objects.

Example adding a catalog entry with
`YAML API <https://kedro.readthedocs.io/en/stable/05_data/\
01_data_catalog.html#using-the-data-catalog-with-the-yaml-api>`_:

.. code-block:: yaml

>>> weather@spark:
>>> type: spark.SparkDataSet
>>> filepath: data/02_intermediate/data.parquet
>>> file_format: "delta"
>>>
>>> weather@delta:
>>> type: spark.DeltaTableDataSet
>>> filepath: data/02_intermediate/data.parquet

Example using Python API:
::

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import (StructField, StringType,
>>> IntegerType, StructType)
>>>
>>> from kedro.extras.datasets.spark import DeltaTableDataSet, SparkDataSet
>>>
>>> schema = StructType([StructField("name", StringType(), True),
>>> StructField("age", IntegerType(), True)])
>>>
>>> data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]
>>>
>>> spark_df = SparkSession.builder.getOrCreate().createDataFrame(data, schema)
>>>
>>> data_set = SparkDataSet(filepath="test_data", file_format="delta")
>>> data_set.save(spark_df)
>>> deltatable_dataset = DeltaTableDataSet(filepath="test_data")
>>> delta_table = deltatable_dataset.load()
>>>
>>> delta_table.update()
"""

# this dataset cannot be used with ``ParallelRunner``,
# therefore it has the attribute ``_SINGLE_PROCESS = True``
# for parallelism within a Spark pipeline please consider
# ``ThreadRunner`` instead
_SINGLE_PROCESS = True

def __init__(self, filepath: str, credentials: Dict[str, Any] = None) -> None:
"""Creates a new instance of ``DeltaTableDataSet``.

Args:
filepath: Filepath in POSIX format to a Spark dataframe. When using Databricks
and working with data written to mount path points,
specify ``filepath``s for (versioned) ``SparkDataSet``s
starting with ``/dbfs/mnt``.
credentials: Credentials to access the S3 bucket, such as
``key``, ``secret``, if ``filepath`` prefix is ``s3a://`` or ``s3n://``.
Optional keyword arguments passed to ``hdfs.client.InsecureClient``
if ``filepath`` prefix is ``hdfs://``. Ignored otherwise.
"""
credentials = deepcopy(credentials) or {} # do we need these anywhere??
Reviewer comment (Contributor) on lines +75 to +80:

I see that we don't pass credentials to either SparkHiveDataSet or SparkJDBCDataSet. We only use them in SparkDataSet, to be able to do versioning later (we instantiate the filesystem and get the glob_function and exist_function). Can I assume that configuration for DeltaTable will be done separately in a spark.yml?
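If that assumption holds, a hedged sketch of configuring access on the Spark side instead of via the dataset's `credentials` argument (the `fs.s3a.*` keys come from hadoop-aws; the values are placeholders) might be:

```python
# Sketch only: configures S3 access on the SparkSession itself rather than passing
# `credentials` to DeltaTableDataSet. Key and secret values are placeholders.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("delta-credentials-example")
    .config("spark.hadoop.fs.s3a.access.key", "<AWS_ACCESS_KEY_ID>")
    .config("spark.hadoop.fs.s3a.secret.key", "<AWS_SECRET_ACCESS_KEY>")
    .getOrCreate()
)
```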

fs_prefix, filepath = _split_filepath(filepath)

self._fs_prefix = fs_prefix
self._filepath = PurePosixPath(filepath)

@staticmethod
def _get_spark():
return SparkSession.builder.getOrCreate()

def _load(self) -> DeltaTable:
load_path = self._fs_prefix + str(self._filepath)
return DeltaTable.forPath(self._get_spark(), load_path)

def _save(self, data: Any) -> None:
logger.info(
"Saving was performed on `DeltaTable` object within the context of the node function"
)
# raise DataSetError(f"{self.__class__.__name__} is a read only dataset type")
Reviewer comment (Contributor):

I'm tempted to turn this into an actual error instead of a log message. I think that's more consistent with what we have on other datasets (e.g. APIDataSet), and it also signals to users that saving doesn't actually do anything (the operation is already done in the node).
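A sketch of what that suggestion could look like (method body only, reusing the `DataSetError` already used elsewhere in this PR; this is not the code that was merged):

```python
# Sketch of the reviewer's suggestion: raise instead of logging.
from typing import Any

from kedro.io.core import DataSetError


def _save(self, data: Any) -> None:
    raise DataSetError(
        f"{self.__class__.__name__} is a read-only dataset type. "
        "Perform updates through the DeltaTable API inside the node instead."
    )
```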


def _exists(self) -> bool:
load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._filepath))

try:
self._get_spark().read.load(path=load_path, format="delta")
except AnalysisException as exception:
if "is not a Delta table" in exception.desc:
return False
raise

return True

def _describe(self):
return dict(filepath=str(self._filepath), fs_prefix=self._fs_prefix)
29 changes: 24 additions & 5 deletions kedro/extras/datasets/spark/spark_dataset.py
@@ -1,4 +1,4 @@
"""``AbstractDataSet`` implementation to access Spark dataframes using
"""``AbstractVersionedDataSet`` implementation to access Spark dataframes using
``pyspark``
"""
from copy import deepcopy
@@ -13,7 +13,7 @@
from pyspark.sql.utils import AnalysisException
from s3fs import S3FileSystem

from kedro.io.core import AbstractVersionedDataSet, Version
from kedro.io.core import AbstractVersionedDataSet, DataSetError, Version


def _parse_glob_pattern(pattern: str) -> str:
@@ -223,7 +223,7 @@ def __init__( # pylint: disable=too-many-arguments
starting with ``/dbfs/mnt``.
file_format: File format used during load and save
operations. These are formats supported by the running
SparkContext include parquet, csv. For a list of supported
SparkContext include parquet, csv, delta. For a list of supported
formats please refer to Apache Spark documentation at
https://spark.apache.org/docs/latest/sql-programming-guide.html
load_args: Load args passed to Spark DataFrameReader load method.
@@ -304,9 +304,13 @@ def __init__( # pylint: disable=too-many-arguments
if save_args is not None:
self._save_args.update(save_args)

### would they be relevant on load_args / on read as well?
self._dfwriter_options = self._save_args.pop("dfwriter_options", {}) or {}
self._file_format = file_format
self._fs_prefix = fs_prefix

self._handle_delta_format()

def _describe(self) -> Dict[str, Any]:
return dict(
filepath=self._fs_prefix + str(self._filepath),
@@ -329,15 +333,30 @@ def _load(self) -> DataFrame:

def _save(self, data: DataFrame) -> None:
save_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get_save_path()))
data.write.save(save_path, self._file_format, **self._save_args)
data.write.options(**self._dfwriter_options).save(
Reviewer comment (Contributor):

I've discovered `.options` on `DataFrameWriter`, which does the `.option().option()...` chaining for us, so I picked it as it's cleaner. Previously in the PR we were doing the chaining ourselves.
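A short sketch of the two equivalent spellings (the `mergeSchema`/`overwriteSchema` options are just examples, not options used by this PR):

```python
# Both forms configure the same DataFrameWriter; .options(**kwargs) avoids manual chaining.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("Alex", 31)], ["name", "age"])

chained = df.write.option("mergeSchema", "true").option("overwriteSchema", "true")
bulk = df.write.options(mergeSchema="true", overwriteSchema="true")
```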

save_path, self._file_format, **self._save_args
)

def _exists(self) -> bool:
load_path = _strip_dbfs_prefix(self._fs_prefix + str(self._get_load_path()))

try:
self._get_spark().read.load(load_path, self._file_format)
except AnalysisException as exception:
if exception.desc.startswith("Path does not exist:"):
if (
exception.desc.startswith("Path does not exist:")
or "is not a Delta table" in exception.desc
):
return False
raise
return True

def _handle_delta_format(self) -> None:
unsupported_modes = {"merge", "delete", "update"}
write_mode = self._save_args.get("mode") or ""
if self._file_format == "delta" and write_mode.lower() in unsupported_modes:
raise DataSetError(
f"It is not possible to perform `save()` for file format `delta` "
f"with mode `{write_mode}` on `SparkDataSet`. "
f"Please use `spark.DeltaTableDataSet` instead."
)
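As a usage illustration of the guard above (the filepath is a placeholder), constructing a `SparkDataSet` with `file_format: delta` and an unsupported write mode fails immediately:

```python
# Sketch: _handle_delta_format() runs in __init__, so the error surfaces at
# construction time rather than at save time. The filepath is a placeholder.
from kedro.extras.datasets.spark import SparkDataSet
from kedro.io.core import DataSetError

try:
    SparkDataSet(
        filepath="data/02_intermediate/weather",
        file_format="delta",
        save_args={"mode": "merge"},
    )
except DataSetError as error:
    print(error)  # suggests using spark.DeltaTableDataSet instead
```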
1 change: 1 addition & 0 deletions setup.py
@@ -93,6 +93,7 @@ def _collect_requirements(requires):
"spark.SparkDataSet": [SPARK, HDFS, S3FS],
"spark.SparkHiveDataSet": [SPARK, HDFS, S3FS],
"spark.SparkJDBCDataSet": [SPARK, HDFS, S3FS],
"spark.DeltaTableDataSet": [SPARK, HDFS, S3FS, "delta-spark~=1.0"],
}
tensorflow_required = {
"tensorflow.TensorflowModelDataset": [
2 changes: 2 additions & 0 deletions test_requirements.txt
@@ -8,7 +8,9 @@ blacken-docs==1.9.2
compress-pickle[lz4]~=1.2.0
dask>=2021.10.0, <2022.01; python_version > '3.6' # not directly required, pinned by Snyk to avoid a vulnerability
dask[complete]~=2.6; python_version == '3.6'
delta-spark~=1.0
dill~=0.3.1
filelock>=3.4.0, <4.0
gcsfs>=2021.04, <2022.01 # Upper bound set arbitrarily, to be reassessed in early 2022
geopandas>=0.6.0, <1.0
hdfs>=2.5.8, <3.0