Enable caching for write components #813

Closed
PhilippeMoussalli opened this issue Jan 24, 2024 · 0 comments · Fixed by #814

Write components do not write a new manifest, since they don't alter the data. However, cache estimation works by checking whether a manifest already exists for a component's cache key, so write components can never be cached. We need to enable this, either by writing a dummy manifest (propagating the previous one) or by handling the cache logic for write components separately.
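To make the failure mode concrete, here is a minimal, self-contained sketch of manifest-based cache lookup. It is a simplified model, not fondant's implementation: the helpers `manifest_path`, `lookup_cache` and `run_write_component`, the `cache/<key>.json` layout, the `propagate` flag and the cache key strings are all hypothetical. It shows why a component that writes no manifest always misses the cache, and how storing a copy of the input manifest under the write component's cache key (the "dummy manifest" option above) turns the second run into a cache hit.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, List, Optional

# Hypothetical, simplified model of manifest-based caching (not fondant's API).


def manifest_path(base: Path, cache_key: str) -> Path:
    """Where a component's manifest is stored for a given cache key."""
    return base / "cache" / f"{cache_key}.json"


def lookup_cache(base: Path, cache_key: str) -> Optional[dict]:
    """Return the stored manifest for this key, or None on a cache miss."""
    path = manifest_path(base, cache_key)
    if path.exists():
        return json.loads(path.read_text())
    return None


def run_write_component(
    base: Path,
    cache_key: str,
    input_manifest: dict,
    write_fn: Callable[[], None],
    propagate: bool,
) -> bool:
    """Run a write component unless its cache key already has a manifest.

    Returns True if the component actually executed.
    """
    if lookup_cache(base, cache_key) is not None:
        return False  # cache hit: skip execution
    write_fn()
    if propagate:
        # Dummy manifest: a copy of the input manifest, stored under this
        # component's cache key so the next lookup finds it.
        path = manifest_path(base, cache_key)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(input_manifest))
    return True


calls: List[str] = []


def write() -> None:
    """Stand-in for the real write step; records that it ran."""
    calls.append("write")


manifest = {"fields": {"x": "int32", "y": "int32", "one": "int32"}}
results = []
with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    # Current behaviour: no manifest is written, so both runs execute.
    results.append(run_write_component(base, "no-manifest", manifest, write, propagate=False))
    results.append(run_write_component(base, "no-manifest", manifest, write, propagate=False))
    # Propagating the input manifest turns the second run into a cache hit.
    results.append(run_write_component(base, "propagated", manifest, write, propagate=True))
    results.append(run_write_component(base, "propagated", manifest, write, propagate=True))

print(results)  # [True, True, True, False]
```

The alternative mentioned in the issue, handling the logic separately, would replace the copied manifest with some other marker keyed on the cache key; the sketch only models the propagation option.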

Reproducible example:

```python
import dask.dataframe as dd
import pandas as pd
import pyarrow as pa

from fondant.component import (
    DaskLoadComponent,
    DaskWriteComponent,
    PandasTransformComponent,
)
from fondant.pipeline import Pipeline, lightweight_component

pipeline = Pipeline(
    name="all-bio-is-just-chemistry",
    base_path="/Users/georgeslorre/ML6/internal/express/sweet-pipelines/pure/artifacts",
)


@lightweight_component(
    base_image="fndnt/fondant-base:dc8d9b208802ee85796238ff9a903c8bf66906e8-python3.8"
)
class CreateData(DaskLoadComponent):
    def load(self) -> dd.DataFrame:
        df = pd.DataFrame(
            {
                "x": [1, 2, 3],
                "y": [4, 5, 6],
            },
            index=pd.Index(["a", "b", "c"], name="id"),
        )
        return dd.from_pandas(df, npartitions=1)


dataset = pipeline.read(
    ref=CreateData,
    produces={"x": pa.int32(), "y": pa.int32()},
)


@lightweight_component(
    base_image="fndnt/fondant-base:dc8d9b208802ee85796238ff9a903c8bf66906e8-python3.8"
)
class FeatOne(PandasTransformComponent):
    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        dataframe["one"] = dataframe["x"].map(lambda x: x + 1)
        return dataframe


f1 = dataset.apply(
    ref=FeatOne,
    produces={"x": pa.int32(), "y": pa.int32(), "one": pa.int32()},
    consumes={"x": pa.int32(), "y": pa.int32()},
)


# WriteOut only consumes data and produces no new manifest, so the
# manifest-based cache check never finds a hit for this step.
@lightweight_component(
    base_image="fndnt/fondant-base:dc8d9b208802ee85796238ff9a903c8bf66906e8-python3.8"
)
class WriteOut(DaskWriteComponent):
    def write(self, dataframe: dd.DataFrame) -> None:
        dataframe.to_parquet("./artifacts/out")


_ = f1.write(
    ref=WriteOut,
    consumes={"x": pa.int32(), "y": pa.int32(), "one": pa.int32()},
)
```