
Introduce repartitioning #309

Merged: 37 commits, Jul 25, 2023

Commits
2683b93
optimize image download component
PhilippeMoussalli Jul 11, 2023
78a60c2
Merge branch 'main' into improve-imagedownload
PhilippeMoussalli Jul 11, 2023
1b7521c
add dask diagnostics to fondant
PhilippeMoussalli Jul 17, 2023
1c53aa1
add repartitioning strategy to fondant
PhilippeMoussalli Jul 18, 2023
897f5f8
remove changes from component
PhilippeMoussalli Jul 18, 2023
0c48233
Merge branch 'main' into improve-imagedownload
PhilippeMoussalli Jul 18, 2023
2edcd91
change save path diagnostics
PhilippeMoussalli Jul 19, 2023
d63eed7
add client to dataIO
PhilippeMoussalli Jul 19, 2023
51cadfe
remove progressbar
PhilippeMoussalli Jul 19, 2023
678b3e1
move client
PhilippeMoussalli Jul 19, 2023
44d5493
move dataframe visualization
PhilippeMoussalli Jul 19, 2023
592f3ae
add bokeh to dependencies
PhilippeMoussalli Jul 19, 2023
5c56ddf
add bokeh to dependencies
PhilippeMoussalli Jul 19, 2023
8e575c7
add bokeh to dependencies
PhilippeMoussalli Jul 19, 2023
0d1e76b
move client to execute
PhilippeMoussalli Jul 19, 2023
43132f5
silence logs
PhilippeMoussalli Jul 19, 2023
7f38c31
remove log supression
PhilippeMoussalli Jul 19, 2023
d764ad4
add log silencing to cluster
PhilippeMoussalli Jul 19, 2023
d50cc6b
add worker log filter
PhilippeMoussalli Jul 19, 2023
9e1eb31
supress logs
PhilippeMoussalli Jul 19, 2023
f344ea8
supress logs
PhilippeMoussalli Jul 19, 2023
08b74cc
add output_partition_size as an argument and revert diagnosis
PhilippeMoussalli Jul 19, 2023
df19ad3
remove output_partition from user arguments
PhilippeMoussalli Jul 19, 2023
09ef925
enable more control over partitioning
PhilippeMoussalli Jul 19, 2023
6b1e63b
fix test
PhilippeMoussalli Jul 19, 2023
57a8af4
Merge branch 'main' into introduce-repartioning
PhilippeMoussalli Jul 19, 2023
8715369
correct load partitioning
PhilippeMoussalli Jul 19, 2023
c15e24c
address PR feedback
PhilippeMoussalli Jul 24, 2023
9ff1e68
Adjust argument passing
PhilippeMoussalli Jul 24, 2023
4c8d482
debug
PhilippeMoussalli Jul 24, 2023
5bf3a50
Further debugging
PhilippeMoussalli Jul 24, 2023
18e1f79
Further debugging
PhilippeMoussalli Jul 24, 2023
b2bdecc
Revert debug and improve logging
PhilippeMoussalli Jul 24, 2023
951a786
Merge branch 'main' into introduce-repartioning
PhilippeMoussalli Jul 24, 2023
3394fcf
fix indentation
PhilippeMoussalli Jul 24, 2023
02561ab
fix tests
PhilippeMoussalli Jul 24, 2023
99746dc
add docs
PhilippeMoussalli Jul 25, 2023
@@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
ARG FONDANT_VERSION=09ef9254fef5d382d7d60d97b66fa2ac1e0df7e0
Review comment (Member): Should be reverted before merging.

RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

# Set the working directory to the component folder
11 changes: 6 additions & 5 deletions examples/pipelines/controlnet-interior-design/pipeline.py
@@ -17,6 +17,7 @@
generate_prompts_op = ComponentOp(
component_dir="components/generate_prompts",
arguments={"n_rows_to_load": None},
output_partition_size="disable",
)
laion_retrieval_op = ComponentOp.from_registry(
name="prompt_based_laion_retrieval",
@@ -72,8 +73,8 @@
pipeline = Pipeline(pipeline_name=pipeline_name, base_path=PipelineConfigs.BASE_PATH)

pipeline.add_op(generate_prompts_op)
pipeline.add_op(laion_retrieval_op, dependencies=generate_prompts_op)
pipeline.add_op(download_images_op, dependencies=laion_retrieval_op)
pipeline.add_op(caption_images_op, dependencies=download_images_op)
pipeline.add_op(segment_images_op, dependencies=caption_images_op)
pipeline.add_op(write_to_hub_controlnet, dependencies=segment_images_op)
# pipeline.add_op(laion_retrieval_op, dependencies=generate_prompts_op)
# pipeline.add_op(download_images_op, dependencies=laion_retrieval_op)
# pipeline.add_op(caption_images_op, dependencies=download_images_op)
# pipeline.add_op(segment_images_op, dependencies=caption_images_op)
# pipeline.add_op(write_to_hub_controlnet, dependencies=segment_images_op)
Review comment (Member): Should be reverted before merging.

14 changes: 7 additions & 7 deletions src/fondant/component.py
@@ -41,6 +41,13 @@ def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
raise NotImplementedError


class DaskWriteComponent(BaseComponent):
"""Component that accepts a Dask DataFrame and writes its contents."""

def write(self, dataframe: dd.DataFrame) -> None:
raise NotImplementedError


Review comment (Member) on lines +44 to +50: This was just moved in the file? I think both orders can be logical (Dask -> Pandas) or (Read -> Transform -> Write).

class PandasTransformComponent(BaseComponent):
"""Component that transforms the incoming dataset partition per partition as a pandas
DataFrame.
@@ -57,12 +64,5 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
raise NotImplementedError


class DaskWriteComponent(BaseComponent):
"""Component that accepts a Dask DataFrame and writes its contents."""

def write(self, dataframe: dd.DataFrame) -> None:
raise NotImplementedError


Component = t.TypeVar("Component", bound=BaseComponent)
"""Component type which can represents any of the subclasses of BaseComponent"""
9 changes: 9 additions & 0 deletions src/fondant/component_spec.py
@@ -256,6 +256,13 @@ def from_fondant_component_spec(
"type": "JsonObject",
"default": "None",
},
{
"name": "output_partition_size",
"description": "The size of the output partition size, defaults"
" to 250MB. Set to `disable` to disable the automatic partitioning",
"type": "String",
"default": "250MB",
},
Review comment (Member): I don't think it's the output partitioning we need to make dynamic, as this will only impact the following component. I think the user should be able to overwrite the input partitioning, so the partitions can be made small at the start and still fit in memory when the data grows.

Review comment (Member): And I think it would be ideal if the user could specify it in rows instead of MB, but not sure if that's possible.
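Dask's `repartition` takes a byte-based `partition_size` rather than a row count, but a row budget could be translated into bytes if the average row size is known. A minimal sketch of that idea — the function name and the sample numbers are illustrative assumptions, not part of fondant:

```python
# Hypothetical helper: convert a desired rows-per-partition into the
# human-readable byte string that Dask's repartition(partition_size=...) accepts.
def rows_to_partition_size(sample_row_bytes: int, rows_per_partition: int) -> str:
    """Estimate a Dask partition_size string from a desired row count."""
    total_bytes = sample_row_bytes * rows_per_partition
    # Dask accepts size strings such as "64MB"; round down to whole megabytes.
    return f"{max(1, total_bytes // 1_000_000)}MB"

# e.g. rows averaging 500 bytes, 100_000 rows per partition -> "50MB"
```

The estimate is only as good as the row-size sample, which is presumably why the reviewer hedges on whether row-based specification is feasible.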

*(
{
"name": arg.name,
@@ -285,6 +292,8 @@
{"inputValue": "metadata"},
"--component_spec",
{"inputValue": "component_spec"},
"--output_partition_size",
{"inputValue": "output_partition_size"},
*cls._dump_args(fondant_component.args.values()),
"--output_manifest_path",
{"outputPath": "output_manifest_path"},
53 changes: 53 additions & 0 deletions src/fondant/data_io.py
@@ -1,4 +1,5 @@
import logging
import os
import typing as t

import dask.dataframe as dd
@@ -17,6 +18,31 @@ def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None


class DaskDataLoader(DataIO):
def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec):
super().__init__(manifest=manifest, component_spec=component_spec)
Review comment (Member): No need to overwrite this if you're just calling super.


@staticmethod
def partition_loaded_dataframe(dataframe: dd.DataFrame) -> dd.DataFrame:
"""
Function that partitions the loaded dataframe depending on its partitions and the available
workers
Returns:
The partitioned dataframe.
"""
n_partitions = dataframe.npartitions
n_workers = os.cpu_count()
logger.info(
f"The number of partitions of the input dataframe is {n_partitions}. The "
f"available number of workers is {n_workers}.",
)
if n_partitions < n_workers:
Review comment (Member): This would then become:

Suggested change — replace `if n_partitions < n_workers:` with:

    if input_partition_size:
        dataframe.repartition(partition_size=input_partition_size)
    elif n_partitions < n_workers:
dataframe = dataframe.repartition(npartitions=n_workers)
logger.info(
"Repartitioning the data before transforming to maximize worker usage",
)

return dataframe
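The load-side rule above boils down to: if the input has fewer partitions than available workers, scale the partition count up so no worker sits idle; otherwise leave the partitioning alone. A pure-Python mirror of that decision, for illustration only (the function name is an assumption, not fondant's API):

```python
import os
from typing import Optional

def target_partitions(n_partitions: int, n_workers: Optional[int] = None) -> int:
    """Return the partition count a loaded dataframe should be repartitioned to."""
    if n_workers is None:
        # Mirrors the implementation above, which uses os.cpu_count() as the
        # available worker count.
        n_workers = os.cpu_count() or 1
    # Under-partitioned input leaves workers idle, so match the worker count;
    # otherwise keep the existing partitioning.
    return n_workers if n_partitions < n_workers else n_partitions
```

In the actual code this feeds `dataframe.repartition(npartitions=...)`; note that repartitioning only ever scales up here, never down.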

def _load_subset(self, subset_name: str, fields: t.List[str]) -> dd.DataFrame:
"""
Function that loads a subset from the manifest as a Dask dataframe.
@@ -80,12 +106,37 @@ def load_dataframe(self) -> dd.DataFrame:
how="left",
)

dataframe = self.partition_loaded_dataframe(dataframe)

logging.info(f"Columns of dataframe: {list(dataframe.columns)}")

return dataframe


class DaskDataWriter(DataIO):
def __init__(
self,
*,
manifest: Manifest,
component_spec: ComponentSpec,
output_partition_size: t.Optional[str] = None,
):
super().__init__(manifest=manifest, component_spec=component_spec)
self.output_partition_size = output_partition_size

def partition_written_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
"""
Function that partitions the written dataframe to smaller partitions based on a given
partition size.
"""
if self.output_partition_size and self.output_partition_size != "disable":
dataframe = dataframe.repartition(partition_size=self.output_partition_size)
logger.info(
f"Repartitioning the written data such that the size per partition is approx."
f" {self.output_partition_size}",
)
return dataframe
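The write-side rule treats the configured size as a tri-state: a size string applies, the sentinel `"disable"` skips repartitioning, and `None` skips it too. A standalone sketch of just that resolution step, with an illustrative function name:

```python
from typing import Optional

def resolve_output_partition_size(configured: Optional[str]) -> Optional[str]:
    """Return the partition_size to pass to repartition(), or None to skip it."""
    # "disable" and None both mean: leave the dataframe's partitioning as-is.
    if configured and configured != "disable":
        return configured
    return None
```

This matches the guard in `partition_written_dataframe` above; only a truthy, non-`"disable"` value reaches `dataframe.repartition(partition_size=...)`.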

def write_dataframe(self, dataframe: dd.DataFrame) -> None:
write_tasks = []

Expand Down Expand Up @@ -159,6 +210,8 @@ def _write_subset(

schema = {field.name: field.type.value for field in subset_spec.fields.values()}

dataframe = self.partition_written_dataframe(dataframe)
Review comment (Member): Why is this done on a subset level here? I would do it once on the dataframe level.


return self._create_write_task(dataframe, location=location, schema=schema)

@staticmethod
4 changes: 4 additions & 0 deletions src/fondant/exceptions.py
@@ -15,6 +15,10 @@ class InvalidComponentSpec(ValidationError, FondantException):
"""Thrown when a component spec cannot be validated against the schema."""


class InvalidComponentOpDefinition(ValidationError, FondantException):
"""Thrown when a componentOp is invalid."""


class InvalidPipelineDefinition(ValidationError, FondantException):
"""Thrown when a pipeline definition is invalid."""

14 changes: 11 additions & 3 deletions src/fondant/executor.py
@@ -39,13 +39,15 @@ def __init__(
input_manifest_path: t.Union[str, Path],
output_manifest_path: t.Union[str, Path],
metadata: t.Dict[str, t.Any],
user_arguments: t.Dict[str, Argument],
user_arguments: t.Dict[str, t.Any],
output_partition_size: t.Optional[str] = "250MB",
Review comment (Member): Shouldn't this be added to the parser as well, so it's extracted by from_args?
) -> None:
self.spec = spec
self.input_manifest_path = input_manifest_path
self.output_manifest_path = output_manifest_path
self.metadata = metadata
self.user_arguments = user_arguments
self.output_partition_size = output_partition_size

@classmethod
def from_file(
@@ -85,7 +87,7 @@ def from_spec(cls, component_spec: ComponentSpec) -> "Executor":
input_manifest_path = args_dict.pop("input_manifest_path")
output_manifest_path = args_dict.pop("output_manifest_path")
metadata = args_dict.pop("metadata")

output_partition_size = args_dict.pop("output_partition_size")
metadata = json.loads(metadata) if metadata else {}

return cls(
Expand All @@ -94,6 +96,7 @@ def from_spec(cls, component_spec: ComponentSpec) -> "Executor":
output_manifest_path=output_manifest_path,
metadata=metadata,
user_arguments=args_dict,
output_partition_size=output_partition_size,
)
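Two review comments suggest registering `output_partition_size` on the argument parser, with validation attached as an argparse `type`. A hedged sketch of how that could look — the parser setup here is an assumption for illustration, not fondant's actual `from_args` code:

```python
import argparse
import re

def partition_size(value: str) -> str:
    """argparse type callable: accept a size such as '250MB' or the sentinel 'disable'."""
    if not re.match(r"^(?:\d+(?:\.\d+)?(?:KB|MB|GB|TB)|disable)$", value, re.I):
        # argparse converts this into a clean usage error for the caller.
        raise argparse.ArgumentTypeError(
            f"invalid partition size {value!r}; expected e.g. '250MB' or 'disable'",
        )
    return value

parser = argparse.ArgumentParser()
parser.add_argument("--output_partition_size", type=partition_size, default="250MB")
args = parser.parse_args(["--output_partition_size", "10MB"])
```

With this in place, `from_args` could pick the value up like any other parsed argument instead of popping it out of `args_dict` separately.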

@classmethod
@@ -166,7 +169,12 @@

def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest):
"""Create a data writer given a manifest and writes out the index and subsets."""
data_writer = DaskDataWriter(manifest=manifest, component_spec=self.spec)
data_writer = DaskDataWriter(
manifest=manifest,
component_spec=self.spec,
output_partition_size=self.output_partition_size,
)

data_writer.write_dataframe(dataframe)

def execute(self, component_cls: t.Type[Component]) -> None:
40 changes: 38 additions & 2 deletions src/fondant/pipeline.py
@@ -12,7 +12,7 @@
from importlib_resources import files # type: ignore

from fondant.component_spec import ComponentSpec
from fondant.exceptions import InvalidPipelineDefinition
from fondant.exceptions import InvalidComponentOpDefinition, InvalidPipelineDefinition
from fondant.import_utils import is_kfp_available
from fondant.manifest import Manifest

@@ -32,6 +32,8 @@ class ComponentOp:
Arguments:
component_dir: The path to the component directory.
arguments: A dictionary containing the argument name and value for the operation.
output_partition_size: the size of the output written dataset. Defaults to 250MB,
set to "disable" to disable automatic repartitioning of the output,
number_of_gpus: The number of gpus to assign to the operation
node_pool_name: The name of the node pool to which the operation will be assigned.
p_volumes: Collection of persistent volumes in a Kubernetes cluster. Keys are mount paths,
@@ -57,13 +59,15 @@ def __init__(
component_dir: t.Union[str, Path],
*,
arguments: t.Optional[t.Dict[str, t.Any]] = None,
output_partition_size: t.Optional[str] = "250MB",
number_of_gpus: t.Optional[int] = None,
node_pool_name: t.Optional[str] = None,
p_volumes: t.Optional[t.Dict[str, k8s_client.V1Volume]] = None,
ephemeral_storage_size: t.Optional[str] = None,
) -> None:
self.component_dir = Path(component_dir)
self.arguments = arguments or {}
self.output_partitioning_size = output_partition_size
self.arguments = self._set_arguments(arguments)

self.component_spec = ComponentSpec.from_file(
self.component_dir / self.COMPONENT_SPEC_NAME,
@@ -75,6 +79,38 @@
self.p_volumes = p_volumes
self.ephemeral_storage_size = ephemeral_storage_size

def _set_arguments(
self,
arguments: t.Optional[t.Dict[str, t.Any]],
) -> t.Dict[str, t.Any]:
"""Set component arguments based on provided arguments and relevant ComponentOp
parameters.
"""

def _validate_partition_size_arg(file_size):
Review comment (Member): Can we add this as a type when registering it on the parser in the Executor class?

# Define the regular expression pattern to match file size notations: KB, MB, GB or TB
pattern = r"^(?:\d+(?:\.\d+)?(?:KB|MB|GB|TB)|disable)$"

# Use the re.match() function to check if the provided file_size matches the pattern
return bool(re.match(pattern, file_size, re.I))

arguments = arguments or {}

if self.output_partitioning_size is not None:
if not _validate_partition_size_arg(
file_size=str(self.output_partitioning_size),
):
msg = (
f"Invalid partition size defined `{self.output_partitioning_size}`,"
" partition size must be a string followed by a file size notation"
" e.g. ('250MB') or 'disable' to disable the automatic partitioning"
)
raise InvalidComponentOpDefinition(msg)

arguments["output_partition_size"] = self.output_partitioning_size

return arguments
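The pattern used in `_validate_partition_size_arg` accepts an integer or decimal number followed by KB, MB, GB, or TB (case-insensitive via `re.I`), or the literal `disable`. The same check in a small standalone form:

```python
import re

# Same pattern as the PR's _validate_partition_size_arg helper.
PATTERN = r"^(?:\d+(?:\.\d+)?(?:KB|MB|GB|TB)|disable)$"

def is_valid_partition_size(file_size: str) -> bool:
    """True for sizes like '250MB' or '1.5GB' (any case) and for 'disable'."""
    return bool(re.match(PATTERN, file_size, re.I))
```

Note the anchors `^…$` reject trailing garbage like `250MBx`, and a bare number with no unit is rejected as well.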

@property
def dockerfile_path(self) -> t.Optional[Path]:
path = self.component_dir / "Dockerfile"
@@ -9,6 +9,8 @@ services:
- /foo/bar/first_component/manifest.json
- --storage_args
- a dummy string arg
- --output_partition_size
- 250MB
- --component_spec
- '{"name": "First component", "description": "This is an example component",
"image": "example_component:latest", "produces": {"images": {"fields": {"data":
@@ -48,6 +50,8 @@
- a dummy string arg
- --some_list
- '[1, 2, 3]'
- --output_partition_size
- 10MB
- --component_spec
- '{"name": "Third component", "description": "This is an example component",
"image": "example_component:latest", "consumes": {"images": {"fields": {"data":
@@ -63,4 +67,4 @@
second_component:
condition: service_completed_successfully
volumes: []
version: '3.8'
version: '3.8'
@@ -9,6 +9,8 @@ services:
- /foo/bar/first_component/manifest.json
- --storage_args
- a dummy string arg
- --output_partition_size
- 250MB
- --component_spec
- '{"name": "First component", "description": "This is an example component",
"image": "example_component:latest", "produces": {"images": {"fields": {"data":
@@ -26,6 +28,8 @@
- '0'
- --padding
- '0'
- --output_partition_size
- 250MB
- --component_spec
- '{"name": "Image cropping", "description": "Component that removes single-colored
borders around images and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev",
9 changes: 8 additions & 1 deletion tests/example_specs/component_specs/kubeflow_component.yaml
@@ -11,6 +11,11 @@ inputs:
description: The component specification as a dictionary
type: JsonObject
default: None
- name: output_partition_size
description: The size of the output partition size, defaults to 250MB. Set to
`disable` to disable the automatic partitioning
type: String
default: 250MB
- name: storage_args
description: Storage arguments
type: String
@@ -30,7 +35,9 @@
- inputValue: metadata
- --component_spec
- inputValue: component_spec
- --output_partition_size
- inputValue: output_partition_size
- --storage_args
- inputValue: storage_args
- --output_manifest_path
- outputPath: output_manifest_path
- outputPath: output_manifest_path
3 changes: 3 additions & 0 deletions tests/test_compiler.py
@@ -17,17 +17,20 @@
ComponentOp(
Path(COMPONENTS_PATH / "example_1" / "first_component"),
arguments={"storage_args": "a dummy string arg"},
output_partition_size="250MB",
),
ComponentOp(
Path(COMPONENTS_PATH / "example_1" / "second_component"),
arguments={"storage_args": "a dummy string arg"},
output_partition_size=None,
),
ComponentOp(
Path(COMPONENTS_PATH / "example_1" / "fourth_component"),
arguments={
"storage_args": "a dummy string arg",
"some_list": [1, 2, 3],
},
output_partition_size="10MB",
),
],
),