Add build for local components (#207)
GeorgesLorre authored Jun 15, 2023
1 parent 8db3927 commit 1be49dc
Showing 6 changed files with 94 additions and 27 deletions.
7 changes: 6 additions & 1 deletion docs/pipeline.md
@@ -60,6 +60,9 @@ and use it to compile and run the pipeline with the `compile_and_run()` method.

### Docker-Compose

This compiler and runner are mainly aimed at local development and quick iterations. There is no scaling, so using small slices of your data is advised.


The DockerCompiler takes your pipeline and creates a docker-compose.yml file in which every component is added as a service with the correct dependencies, leveraging the `depends_on` functionality and the `service_completed_successfully` status. See the basic example below:

```yaml
@@ -79,9 +82,11 @@ services:
depends_on:
component_2:
condition: service_completed_successfully
image: component_3:latest
build: ./component_3
```
Note that for components that do not come from the registry (local custom components), the compiler adds a `build` subsection (see component_3 in the example above) instead of referring to the image specified in the `component_spec.yaml`. This allows docker-compose to build and rebuild the container used by that component, allowing for quicker iteration.

To compile your pipeline to a `docker-compose` spec, you need to import the `DockerCompiler`:

```python
...
```
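A minimal sketch of that usage, assuming `pipeline` is a `Pipeline` you have already defined and populated; the calls mirror the `DockerCompiler` API exercised in `tests/test_compiler.py` further down in this diff:

```python
from fondant.compiler import DockerCompiler
from fondant.pipeline import Pipeline

# Assumed example values; any Pipeline you have already built works here.
pipeline = Pipeline(
    pipeline_name="my_pipeline",
    pipeline_description="an example pipeline",
    base_path="/foo/bar",
)

# Compile the pipeline to a docker-compose spec at the given output path.
compiler = DockerCompiler()
compiler.compile(pipeline=pipeline, output_path="docker-compose.yml")
```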
11 changes: 9 additions & 2 deletions fondant/compiler.py
@@ -56,7 +56,7 @@ def compile(
self, pipeline: Pipeline, output_path: str = "docker-compose.yml"
) -> None:
"""Compile a pipeline to docker-compose spec and save it to a specified output path."""
logger.info(f"Compiling {pipeline.name} to docker-compose.yml")
logger.info(f"Compiling {pipeline.name} to {output_path}")
spec = self._generate_spec(pipeline=pipeline)
with open(output_path, "w") as outfile:
yaml.safe_dump(spec, outfile)
@@ -126,10 +126,17 @@ def _generate_spec(self, pipeline: Pipeline) -> dict:
volumes = [asdict(volume)] if volume else []

services[safe_component_name] = {
"image": component_op.component_spec.image,
"command": command,
"depends_on": depends_on,
"volumes": volumes,
}

if component_op.local_component:
services[safe_component_name][
"build"
] = f"./{Path(component_op.component_spec_path).parent}"
else:
services[safe_component_name][
"image"
] = component_op.component_spec.image
return {"version": "3.8", "services": services}
3 changes: 3 additions & 0 deletions fondant/pipeline.py
@@ -51,6 +51,7 @@ class ComponentOp:
def __init__(
self,
component_spec_path: t.Union[str, Path],
local_component: bool = True,
*,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
number_of_gpus: t.Optional[int] = None,
@@ -59,6 +60,7 @@ def __init__(
ephemeral_storage_size: t.Optional[str] = None,
):
self.component_spec_path = component_spec_path
self.local_component = local_component
self.arguments = arguments or {}
self.number_of_gpus = number_of_gpus
self.node_pool_name = node_pool_name
@@ -103,6 +105,7 @@ def from_registry(

return ComponentOp(
component_spec_path,
local_component=False,
arguments=arguments,
number_of_gpus=number_of_gpus,
node_pool_name=node_pool_name,
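A hedged sketch of how the flag is set on the two construction paths: a directly constructed `ComponentOp` keeps the default `local_component=True`, while `from_registry` passes `False`. The local spec path below is hypothetical; the registry call mirrors the one in `tests/test_compiler.py`:

```python
from pathlib import Path

from fondant.pipeline import ComponentOp

# Local custom component: local_component defaults to True, so the
# DockerCompiler emits a `build:` entry pointing at the spec's directory.
local_op = ComponentOp(
    Path("components/my_component/component_spec.yaml"),  # hypothetical path
    arguments={"storage_args": "a dummy string arg"},
)

# Reusable component from the registry: from_registry() sets
# local_component=False, so the compiler emits an `image:` entry instead.
registry_op = ComponentOp.from_registry(
    name="image_cropping",
    arguments={"cropping_threshold": 0, "padding": 0},
)
```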
6 changes: 3 additions & 3 deletions tests/example_pipelines/compiled_pipeline/example_1/docker-compose.yml
@@ -1,5 +1,6 @@
services:
first_component:
build: ./tests/example_pipelines/valid_pipeline/example_1
command:
- --metadata
- '{"run_id": "test_pipeline", "base_path": "/foo/bar"}'
@@ -8,9 +9,9 @@ services:
- --storage_args
- a dummy string arg
depends_on: {}
image: example_component:latest
volumes: []
second_component:
build: ./tests/example_pipelines/valid_pipeline/example_1
command:
- --metadata
- '{"run_id": "test_pipeline", "base_path": "/foo/bar"}'
@@ -23,9 +24,9 @@ services:
depends_on:
first_component:
condition: service_completed_successfully
image: example_component:latest
volumes: []
third_component:
build: ./tests/example_pipelines/valid_pipeline/example_1
command:
- --metadata
- '{"run_id": "test_pipeline", "base_path": "/foo/bar"}'
@@ -38,6 +39,5 @@ services:
depends_on:
second_component:
condition: service_completed_successfully
image: example_component:latest
volumes: []
version: '3.8'
30 changes: 30 additions & 0 deletions tests/example_pipelines/compiled_pipeline/example_2/docker-compose.yml
@@ -0,0 +1,30 @@
services:
first_component:
build: ./tests/example_pipelines/valid_pipeline/example_1
command:
- --metadata
- '{"run_id": "test_pipeline", "base_path": "/foo/bar"}'
- --output_manifest_path
- /foo/bar/manifest.txt
- --storage_args
- a dummy string arg
depends_on: {}
volumes: []
image_cropping:
command:
- --metadata
- '{"run_id": "test_pipeline", "base_path": "/foo/bar"}'
- --output_manifest_path
- /foo/bar/manifest.txt
- --cropping_threshold
- '0'
- --padding
- '0'
- --input_manifest_path
- /foo/bar/manifest.txt
depends_on:
first_component:
condition: service_completed_successfully
image: ghcr.io/ml6team/image_cropping:latest
volumes: []
version: '3.8'
64 changes: 43 additions & 21 deletions tests/test_compiler.py
@@ -6,63 +6,84 @@
from fondant.compiler import DockerCompiler
from fondant.pipeline import ComponentOp, Pipeline

COMPONENTS_PATH = Path(__file__).parent / "example_pipelines/valid_pipeline"
COMPONENTS_PATH = Path("./tests/example_pipelines/valid_pipeline")

VALID_DOCKER_PIPELINE = (
Path(__file__).parent / "example_pipelines/compiled_pipeline/docker-compose.yml"
)
VALID_DOCKER_PIPELINE = Path("./tests/example_pipelines/compiled_pipeline/")

TEST_PIPELINES = [
(
"example_1",
["first_component.yaml", "second_component.yaml", "third_component.yaml"],
[
ComponentOp(
Path(COMPONENTS_PATH / "example_1" / "first_component.yaml"),
arguments={"storage_args": "a dummy string arg"},
),
ComponentOp(
Path(COMPONENTS_PATH / "example_1" / "second_component.yaml"),
arguments={"storage_args": "a dummy string arg"},
),
ComponentOp(
Path(COMPONENTS_PATH / "example_1" / "third_component.yaml"),
arguments={"storage_args": "a dummy string arg"},
),
],
),
(
"example_2",
[
ComponentOp(
Path(COMPONENTS_PATH / "example_1" / "first_component.yaml"),
arguments={"storage_args": "a dummy string arg"},
),
ComponentOp.from_registry(
name="image_cropping", arguments={"cropping_threshold": 0, "padding": 0}
),
],
),
]


@pytest.fixture(params=TEST_PIPELINES)
def pipeline(request, tmp_path, monkeypatch):
def setup_pipeline(request, tmp_path, monkeypatch):
pipeline = Pipeline(
pipeline_name="test_pipeline",
pipeline_description="description of the test pipeline",
base_path="/foo/bar",
)
example_dir, component_specs = request.param

component_args = {"storage_args": "a dummy string arg"}
components_path = Path(COMPONENTS_PATH / example_dir)
example_dir, components = request.param

prev_comp = None
for component_spec in component_specs:
component_op = ComponentOp(
Path(components_path / component_spec), arguments=component_args
)
pipeline.add_op(component_op, dependencies=prev_comp)
prev_comp = component_op
for component in components:
pipeline.add_op(component, dependencies=prev_comp)
prev_comp = component

pipeline.compile()

# override the default package_path with a temporary path to avoid the creation of artifacts
monkeypatch.setattr(pipeline, "package_path", str(tmp_path / "test_pipeline.tgz"))

return pipeline
return (example_dir, pipeline)


def test_docker_compiler(pipeline, tmp_path_factory):
def test_docker_compiler(setup_pipeline, tmp_path_factory):
"""Test compiling a pipeline to docker-compose."""
example_dir, pipeline = setup_pipeline
compiler = DockerCompiler()
with tmp_path_factory.mktemp("temp") as fn:
output_path = str(fn / "docker-compose.yml")
compiler.compile(pipeline=pipeline, output_path=output_path)
with open(output_path, "r") as src, open(VALID_DOCKER_PIPELINE, "r") as truth:
with open(output_path, "r") as src, open(
VALID_DOCKER_PIPELINE / example_dir / "docker-compose.yml", "r"
) as truth:
assert src.read() == truth.read()


def test_docker_local_path(pipeline, tmp_path_factory):
def test_docker_local_path(setup_pipeline, tmp_path_factory):
"""Test that a local path is applied correctly as a volume and in the arguments."""
# volumes are only created for local directories that exist
with tmp_path_factory.mktemp("temp") as fn:
# this is the directory mounted in the container
_, pipeline = setup_pipeline
work_dir = f"/{fn.stem}"
pipeline.base_path = str(fn)
compiler = DockerCompiler()
@@ -90,8 +111,9 @@ def test_docker_local_path(pipeline, tmp_path_factory):
assert command in service["command"]


def test_docker_remote_path(pipeline, tmp_path_factory):
def test_docker_remote_path(setup_pipeline, tmp_path_factory):
"""Test that a remote path is applied correctly in the arguments and no volume."""
_, pipeline = setup_pipeline
remote_dir = "gs://somebucket/artifacts"
pipeline.base_path = remote_dir
compiler = DockerCompiler()
