[KED-1466] Sped up initialization of SparkHiveDataSet #281

Closed · wants to merge 7 commits
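For context, here is a minimal usage sketch of the behaviour this PR changes, assuming the `kedro.extras.datasets.spark` import path touched by the diff below; the database, table, column and DataFrame names are illustrative, not taken from this PR. Constructing a `SparkHiveDataSet` no longer queries Hive for the table's columns, so catalog initialization stays fast; the existence and primary-key checks are deferred until `save()` is called.

```python
# Sketch only: names are hypothetical; import paths assume the modules modified in this PR.
from pyspark.sql import SparkSession
from kedro.extras.datasets.spark import SparkHiveDataSet

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
spark_df = spark.createDataFrame([(1, "Alex")], ["id", "name"])

hive_ds = SparkHiveDataSet(
    database="default_1",      # hypothetical Hive database
    table="table_1",
    write_mode="upsert",
    table_pk=["id"],           # hypothetical primary-key column
)

# Before this PR: the constructor above already called _exists()/_load() to
# fetch the table's columns and validate table_pk against them.
# After this PR: no Spark/Hive call happens until the data is saved.
hive_ds.save(spark_df)
```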
3 changes: 2 additions & 1 deletion RELEASE.md
@@ -8,6 +8,7 @@
* Documentation improvements.
* Updated contribution process in `CONTRIBUTING.md` - added Developer Workflow.
* Fixed a bug where `PartitionedDataSet` and `IncrementalDataSet` were not working with `s3a` or `s3n` protocol.
* Sped up initialization of `SparkHiveDataSet`

## Breaking changes to the API
* Made `invalidate_cache` method on datasets private.
@@ -16,7 +17,7 @@
* The `release()` method on datasets extending ``AbstractVersionedDataSet`` clears the cached load and save version. All custom datasets must call `super()._release()` inside `_release()`.

## Thanks for supporting contributions
[@foolsgold](https://github.com/foolsgold), [Mani Sarkar](https://github.com/neomatrix369), [Priyanka Shanbhag](https://github.com/priyanka1414), [Luis Blanche](https://github.com/LuisBlanche)
[@foolsgold](https://github.com/foolsgold), [Mani Sarkar](https://github.com/neomatrix369), [Priyanka Shanbhag](https://github.com/priyanka1414), [Luis Blanche](https://github.com/LuisBlanche), [Miguel Rodriguez Gutierrez](https://github.com/MigQ2)

# 0.15.8

39 changes: 21 additions & 18 deletions kedro/contrib/io/pyspark/spark_hive_data_set.py
@@ -172,23 +172,8 @@ def __init__(
if self._write_mode == "upsert" and not table_pk:
raise DataSetError("table_pk must be set to utilise upsert read mode")
self._table_pk = table_pk

self._table_columns = self._load().columns if self._exists() else None

if (
self._table_pk
and self._exists()
and set(self._table_pk) - set(self._table_columns)
):
raise DataSetError(
"columns [{colnames}] selected as PK not found in table {database}.{table}".format(
colnames=", ".join(
sorted(set(self._table_pk) - set(self._table_columns))
),
database=self._database,
table=self._table,
)
)
# self._table_columns is set up in _save() to speed up initialization
self._table_columns = [] # type: List[str]

@staticmethod
def _get_spark() -> SparkSession:
@@ -221,7 +206,25 @@ def _load(self) -> DataFrame:
)

def _save(self, data: DataFrame) -> None:
if not self._exists():
table_exists = self._exists()
self._table_columns = self._load().columns if table_exists else None

if (
self._table_pk
and table_exists
and set(self._table_pk) - set(self._table_columns)
):
raise DataSetError(
"columns [{colnames}] selected as PK not found in table {database}.{table}".format(
colnames=", ".join(
sorted(set(self._table_pk) - set(self._table_columns))
),
database=self._database,
table=self._table,
)
)

if not table_exists:
self._create_empty_hive_table(data)
self._table_columns = data.columns
self._validate_save(data)
38 changes: 20 additions & 18 deletions kedro/extras/datasets/spark/spark_hive_dataset.py
@@ -174,23 +174,8 @@ def __init__(
if self._write_mode == "upsert" and not table_pk:
raise DataSetError("table_pk must be set to utilise upsert read mode")
self._table_pk = table_pk

self._table_columns = self._load().columns if self._exists() else None

if (
self._table_pk
and self._exists()
and set(self._table_pk) - set(self._table_columns)
):
raise DataSetError(
"columns [{colnames}] selected as PK not found in table {database}.{table}".format(
colnames=", ".join(
sorted(set(self._table_pk) - set(self._table_columns))
),
database=self._database,
table=self._table,
)
)
# self._table_columns is set up in _save() to speed up initialization
self._table_columns = [] # type: List[str]

def _describe(self) -> Dict[str, Any]:
return dict(
@@ -232,7 +217,24 @@ def _load(self) -> DataFrame:
)

def _save(self, data: DataFrame) -> None:
if not self._exists():
table_exists = self._exists()
self._table_columns = self._load().columns if table_exists else None
if (
self._table_pk
and table_exists
and set(self._table_pk) - set(self._table_columns)
):
raise DataSetError(
"columns [{colnames}] selected as PK not found in table {database}.{table}".format(
colnames=", ".join(
sorted(set(self._table_pk) - set(self._table_columns))
),
database=self._database,
table=self._table,
)
)

if not table_exists:
self._create_empty_hive_table(data)
self._table_columns = data.columns
self._validate_save(data)
2 changes: 1 addition & 1 deletion tests/contrib/io/pyspark/test_spark_hive_data_set.py
@@ -264,7 +264,7 @@ def test_invalid_pk_provided(self):
table="table_1",
write_mode="upsert",
table_pk=["column_doesnt_exist"],
)
).save(_generate_spark_df_one())

def test_invalid_write_mode_provided(self):
with pytest.raises(
2 changes: 1 addition & 1 deletion tests/extras/datasets/spark/test_spark_hive_dataset.py
@@ -264,7 +264,7 @@ def test_invalid_pk_provided(self):
table="table_1",
write_mode="upsert",
table_pk=["column_doesnt_exist"],
)
).save(_generate_spark_df_one())

def test_invalid_write_mode_provided(self):
with pytest.raises(
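Because the existence and primary-key validation now happens in `_save()` rather than `__init__`, both tests above chain `.save(_generate_spark_df_one())` onto the constructor so the expected `DataSetError` is still raised inside the `pytest.raises` block. A hedged sketch of the resulting test shape (the `match` pattern, database name, and standalone function layout are assumptions, not copied from the test modules):

```python
import pytest
from kedro.extras.datasets.spark import SparkHiveDataSet
from kedro.io import DataSetError

def test_invalid_pk_raises_on_save():
    # The message matched here comes from the PK check in _save(); the exact
    # regex used by the real tests may differ.
    with pytest.raises(DataSetError, match="not found in table"):
        SparkHiveDataSet(
            database="default_1",                  # hypothetical test database
            table="table_1",
            write_mode="upsert",
            table_pk=["column_doesnt_exist"],
        ).save(_generate_spark_df_one())           # helper defined in the test module
```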