Skip to content

Commit

Permalink
Updated example
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <pingsutw@apache.org>
  • Loading branch information
pingsutw committed Dec 6, 2021
1 parent bcda2f6 commit 363684e
Showing 1 changed file with 29 additions and 6 deletions.
35 changes: 29 additions & 6 deletions cookbook/core/type_system/custom_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
This example shows how users can serialize custom JSON-compatible dataclasses between successive tasks using the
excellent `dataclasses_json <https://pypi.org/project/dataclasses-json/>`__ library
"""
import tempfile
import typing
from dataclasses import dataclass

Expand Down Expand Up @@ -76,11 +77,31 @@ def add(x: Datum, y: Datum) -> Datum:


@task
def create_result() -> Result:
    """
    Write a small single-column DataFrame into a FlyteSchema and return it inside a ``Result``,
    alongside a FlyteFile and a FlyteDirectory that reference fixed S3 locations.
    """
    frame = pd.DataFrame(data={"col1": ["a", "b", "c"]})

    typed_schema = FlyteSchema[kwtypes(col1=str)]()
    writer = typed_schema.open()
    writer.write(frame)

    return Result(
        schema=typed_schema,
        file=FlyteFile("s3://my-s3-bucket/key"),
        directory=FlyteDirectory("s3://my-s3-bucket"),
    )
def upload_result() -> Result:
    """
    Create local temporary artifacts and return them wrapped in a ``Result``.

    A DataFrame is written as ``schema.parquet`` into a fresh temporary directory, and a short
    byte string is written to a separate temporary file. The returned ``Result`` references the
    directory (as both FlyteSchema and FlyteDirectory) and the file (as FlyteFile).

    Flytekit will upload the FlyteFile, FlyteDirectory and FlyteSchema to the blob store (GCP, S3).
    """
    df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
    temp_dir = tempfile.mkdtemp(prefix="flyte-")

    schema_path = temp_dir + "/schema.parquet"
    df.to_parquet(schema_path)

    # Write through a context manager so the handle is closed (and the bytes flushed to disk)
    # before the path is handed off; delete=False keeps the file on disk after closing.
    with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
        tmp_file.write(b"Hello world!")

    return Result(
        schema=FlyteSchema(temp_dir),
        file=FlyteFile(tmp_file.name),
        directory=FlyteDirectory(temp_dir),
    )


@task
def download_result(res: Result):
    """
    Verify the schema carried by ``res`` and fetch its file and directory.

    Flytekit loads FlyteFile, FlyteDirectory and FlyteSchema lazily: the underlying data is only
    downloaded once the user calls download() or open().
    """
    expected_frame = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
    # The schema is opened inside the assertion itself, exactly where the comparison happens.
    assert expected_frame.equals(res.schema.open().all())

    res.file.download()
    res.directory.download()


# %%
Expand All @@ -90,7 +111,9 @@ def wf(x: int, y: int) -> (Datum, Result):
"""
Dataclasses (JSON) can be returned from a workflow as well.
"""
return add(x=stringify(x=x), y=stringify(x=y)), create_result()
res = upload_result()
download_result(res=res)
return add(x=stringify(x=x), y=stringify(x=y)), res


if __name__ == "__main__":
Expand Down

0 comments on commit 363684e

Please sign in to comment.