Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to run lightweight python components in docker runner #786

Merged
merged 2 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions examples/sample_pipeline/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# This file contains a sample pipeline. It loads data from a parquet file using
# the load_from_parquet component, chains a custom dummy component, and applies
# the reusable chunking component.
import os
from pathlib import Path

import pandas as pd
import pyarrow as pa

from fondant.pipeline import Pipeline
from fondant.component import PandasTransformComponent
from fondant.pipeline import Pipeline, lightweight_component

# Absolute path of the local `.artifacts` directory (resolved against the current
# working directory). The directory, including missing parents, is created as a
# side effect of importing this module.
# NOTE(review): presumably used as the pipeline base path — usage is not visible here.
BASE_PATH = Path("./.artifacts").resolve()
BASE_PATH.mkdir(parents=True, exist_ok=True)
Expand All @@ -32,8 +35,28 @@
"./components/dummy_component",
)

dataset.apply(
dataset = dataset.apply(
"chunk_text",
arguments={"chunk_size": 10, "chunk_overlap": 2},
consumes={"text": "text_data"},
)


@lightweight_component(
    base_image="python:3.8",
    extra_requires=[
        # Install Fondant from the git ref given by FONDANT_VERSION (default: `main`).
        "fondant[component]@git+https://github.com/ml6team/fondant@"
        + os.environ.get("FONDANT_VERSION", "main"),
    ],
)
class CalculateChunkLength(PandasTransformComponent):
    """Lightweight component that records the length of every text value."""

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Add a `chunk_length` column holding `len()` of each `text` entry.

        The incoming dataframe is extended in place and returned.
        """
        text_lengths = dataframe["text"].apply(len)
        dataframe["chunk_length"] = text_lengths
        return dataframe


# Apply the lightweight component defined above to the dataset. It consumes the
# `text` field as a string and produces a new `chunk_length` field typed as int32.
# The returned dataset is not used any further, hence the `_` binding.
_ = dataset.apply(
    ref=CalculateChunkLength,
    consumes={"text": pa.string()},
    produces={"chunk_length": pa.int32()},
)
2 changes: 2 additions & 0 deletions examples/sample_pipeline/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ trap cleanup EXIT
# Bind local data directory to pipeline
data_dir=$(readlink -f "data")

export FONDANT_VERSION=$GIT_HASH
mrchtr marked this conversation as resolved.
Show resolved Hide resolved

# Run pipeline
poetry run fondant run local pipeline.py \
--extra-volumes $data_dir:/data --build-arg FONDANT_VERSION=$GIT_HASH
36 changes: 36 additions & 0 deletions src/fondant/pipeline/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import shlex
import tempfile
import textwrap
import typing as t
from abc import ABC, abstractmethod
from dataclasses import asdict
Expand All @@ -16,6 +17,7 @@
from fondant.pipeline import (
VALID_ACCELERATOR_TYPES,
VALID_VERTEX_ACCELERATOR_TYPES,
Image,
Pipeline,
)

Expand Down Expand Up @@ -45,6 +47,37 @@ def log_unused_configurations(self, **kwargs):
f" for runner `{self.__class__.__name__}`.",
)

@staticmethod
def _build_entrypoint(image: "Image") -> t.List[str]:
    """Build the entrypoint to execute the provided image.

    Args:
        image: Image description of the component. Only its `script` and
            `extra_requires` attributes are read.

    Returns:
        The default `fondant execute main` entrypoint when the image carries no
        script. Otherwise a `sh -ec` invocation which optionally writes and installs
        a `requirements.txt`, writes the script to `main.py` and runs
        `fondant execute main`, forwarding every argument that follows the
        trailing `--`.
    """
    # NOTE(review): reportedly only exercised via the integration test so far.
    if not image.script:
        # Not a lightweight python component
        return ["fondant", "execute", "main"]

    def write_file(content: str, filename: str) -> str:
        # `printf '%s' <content>` writes the content verbatim. Using the content
        # itself as the format operand would make printf interpret every `%` and
        # backslash it contains and silently corrupt the written file.
        return f"printf '%s' {shlex.quote(content)} > '{filename}'"

    steps = []
    if image.extra_requires:
        steps.append(write_file("\n".join(image.extra_requires), "requirements.txt"))
        steps.append("python3 -m pip install -r requirements.txt")

    steps.append(write_file(image.script, "main.py"))
    steps.append('fondant execute main "$@"')

    return [
        "sh",
        "-ec",
        "\n".join(steps) + "\n",
        "--",  # All arguments provided after this will be passed to `fondant execute main`
    ]


class DockerCompiler(Compiler):
"""Compiler that creates a docker-compose spec from a pipeline."""
Expand Down Expand Up @@ -157,6 +190,8 @@ def _generate_spec(

logger.info(f"Compiling service for {component_name}")

entrypoint = self._build_entrypoint(component_op.image)

# add metadata argument to command
command = ["--metadata", metadata.to_json()]

Expand Down Expand Up @@ -204,6 +239,7 @@ def _generate_spec(
)

services[component_name] = {
"entrypoint": entrypoint,
"command": command,
"depends_on": depends_on,
"volumes": volumes,
Expand Down
66 changes: 60 additions & 6 deletions src/fondant/pipeline/lightweight_component.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import inspect
import itertools
import textwrap
import typing as t
from dataclasses import dataclass
from functools import wraps

from fondant.component import Component


@dataclass
class Image:
    """Image specification of a component.

    `base_image` may explicitly be passed as `None`; it is then replaced by the
    default image in `__post_init__`, so it always holds a string afterwards.
    """

    # Name of the base image; `None` is accepted on construction (see `__post_init__`).
    base_image: t.Optional[str] = "fondant:latest"
    # Optional list of extra requirement specifiers.
    extra_requires: t.Optional[t.List[str]] = None
    # Optional source code of the python script belonging to the image.
    script: t.Optional[str] = None

    def __post_init__(self) -> None:
        if self.base_image is None:
            # TODO: link to Fondant version
            self.base_image = "fondant:latest"


class PythonComponent:
@classmethod
Expand All @@ -23,12 +33,12 @@ def lightweight_component(
"""Decorator to enable a python component."""

def wrapper(cls):
kwargs = {}
if base_image:
kwargs["base_image"] = base_image
if extra_requires:
kwargs["extra_requires"] = extra_requires
image = Image(**kwargs)
script = build_python_script(cls)
image = Image(
base_image=base_image,
extra_requires=extra_requires,
script=script,
)

# updated=() is needed to prevent an attempt to update the class's __dict__
@wraps(cls, updated=())
Expand All @@ -40,3 +50,47 @@ def image(cls) -> Image:
return AppliedPythonComponent

return wrapper


def build_python_script(component_cls: "t.Type[Component]") -> str:
    """Build a self-contained python script for the provided component class, which will act as
    the `src/main.py` script to execute the component.

    Args:
        component_cls: The component class whose source is embedded in the script.

    Returns:
        The script source: a fixed block of imports followed by the class definition,
        with its surrounding indentation and any leading decorator lines removed.

    Raises:
        ValueError: If no line starting a class definition is found in the source.
    """
    imports_source = textwrap.dedent(
        """\
        from typing import *
        import typing as t

        import dask.dataframe as dd
        import fondant
        import pandas as pd
        from fondant.component import *
        from fondant.core import *
        """,
    )

    # The class may be defined in a nested scope, so strip the common indentation first.
    component_source = textwrap.dedent(inspect.getsource(component_cls))

    # Removing possible decorators (can be multiline) until the class
    # definition is found
    component_source_lines = list(
        itertools.dropwhile(
            lambda line: not line.startswith("class"),
            component_source.split("\n"),
        ),
    )

    if not component_source_lines:
        msg = (
            f'Failed to dedent and clean up the source of class "{component_cls.__name__}". '
            f"It's probably not properly indented."
        )
        raise ValueError(msg)

    return "\n\n".join([imports_source, "\n".join(component_source_lines)])
11 changes: 5 additions & 6 deletions src/fondant/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,11 @@ def _configure_caching_from_image_tag(

return cache

def dockerfile_path(self, path: t.Union[str, Path]) -> t.Optional[Path]:
if self._is_custom_component(path):
component_dir = Path(path)
else:
component_dir = self._get_registry_path(str(path))
docker_path = component_dir / "Dockerfile"
@property
def dockerfile_path(self) -> t.Optional[Path]:
    """Location of the `Dockerfile` inside the component directory, if any.

    Returns:
        The path `<component_dir>/Dockerfile` when it exists, `None` when
        `component_dir` is falsy or no such path exists.
    """
    component_dir = self.component_dir
    if component_dir:
        candidate = component_dir / "Dockerfile"
        if candidate.exists():
            return candidate
    return None

@staticmethod
Expand Down
52 changes: 46 additions & 6 deletions tests/pipeline/test_python_component.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,62 @@
import textwrap

import dask.dataframe as dd
import pandas as pd
import pyarrow as pa
from fondant.component import DaskLoadComponent, PandasTransformComponent
from fondant.pipeline import Pipeline, lightweight_component


def test_lightweight_component():
pipeline = Pipeline(name="dummy-pipeline", base_path="./data")
def test_build_python_script():
    """The generated script is the fixed import header followed by the undecorated class."""

    @lightweight_component()
    class CreateData(DaskLoadComponent):
        def load(self) -> dd.DataFrame:
            df = pd.DataFrame(
                {
                    "x": [1, 2, 3],
                    "y": [4, 5, 6],
                },
                index=pd.Index(["a", "b", "c"], name="id"),
            )
            return dd.from_pandas(df, npartitions=1)

    # The class body above must stay textually identical to the one in the expected
    # script: the script is built from the class source, not from its behavior.
    expected_script = textwrap.dedent(
        """\
        from typing import *
        import typing as t

        import dask.dataframe as dd
        import fondant
        import pandas as pd
        from fondant.component import *
        from fondant.core import *


        class CreateData(DaskLoadComponent):
            def load(self) -> dd.DataFrame:
                df = pd.DataFrame(
                    {
                        "x": [1, 2, 3],
                        "y": [4, 5, 6],
                    },
                    index=pd.Index(["a", "b", "c"], name="id"),
                )
                return dd.from_pandas(df, npartitions=1)
        """,
    )
    generated_script = CreateData.image().script

    assert generated_script == expected_script


def test_lightweight_component(tmp_path_factory):
pipeline = Pipeline(
name="dummy-pipeline",
base_path="/home/robbe/workspace/fondant/tests/pipeline/data",
RobbeSneyders marked this conversation as resolved.
Show resolved Hide resolved
)

@lightweight_component(
base_image="python:3.8-slim-buster",
extra_requires=["pandas", "dask"],
)
class CreateData(DaskLoadComponent):
def __init__(self, **kwargs):
pass

def load(self) -> dd.DataFrame:
df = pd.DataFrame(
{
Expand Down Expand Up @@ -44,5 +85,4 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
ref=AddN,
produces={"x": pa.int32(), "y": pa.int32()},
consumes={"x": pa.int32(), "y": pa.int32()},
arguments={"n": 1},
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to integrate #763 to support arguments.

)
Loading