Skip to content

Commit

Permalink
Merge branch 'main' into enhancement/delta_exist_function_should_not_log_error
Browse files Browse the repository at this point in the history
  • Loading branch information
dannymeijer authored Oct 4, 2024
2 parents 3ef2bf7 + 4a6a1d7 commit 5764890
Showing 1 changed file with 15 additions and 1 deletion.
16 changes: 15 additions & 1 deletion src/koheesio/integrations/spark/tableau/hyper.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ class HyperFileWriter(HyperFile):
"""

path: PurePath = Field(
default=TemporaryDirectory().name, description="Path to the Hyper file", examples=["PurePath(/tmp/hyper/)"]
default=TemporaryDirectory().name,
description="Path to the Hyper file, if executing in Databricks "
"set the path manually and ensure to specify the scheme `dbfs:/`.",
examples=["PurePath(/tmp/hyper/)", "PurePath(dbfs:/tmp/hyper/)"],
)
name: str = Field(default="extract", description="Name of the Hyper file")
table_definition: TableDefinition = Field(
Expand Down Expand Up @@ -316,6 +319,13 @@ class HyperFileDataFrameWriter(HyperFileWriter):
name="test",
).execute()
# or in Databricks
hw = HyperFileDataFrameWriter(
df=spark.createDataFrame([(1, "foo"), (2, "bar")], ["id", "name"]),
name="test",
path="dbfs:/tmp/hyper/",
).execute()
# do something with the returned file path
hw.hyper_path
```
Expand Down Expand Up @@ -436,6 +446,10 @@ def write_parquet(self):
.parquet(_path.as_posix())
)

if _path.as_posix().startswith("dbfs:"):
_path = PurePath(_path.as_posix().replace("dbfs:", "/dbfs"))
            self.log.debug("Parquet location on DBFS: %s", _path)

for _, _, files in os.walk(_path):
for file in files:
if file.endswith(".parquet"):
Expand Down

0 comments on commit 5764890

Please sign in to comment.