[WIP] Optimize image download component #288

Closed · PhilippeMoussalli wants to merge 23 commits into main from improve-imagedownload

Commits (23)
2683b93 optimize image download component (PhilippeMoussalli, Jul 11, 2023)
78a60c2 Merge branch 'main' into improve-imagedownload (PhilippeMoussalli, Jul 11, 2023)
1b7521c add dask diagnostics to fondant (PhilippeMoussalli, Jul 17, 2023)
1c53aa1 add repartitioning strategy to fondant (PhilippeMoussalli, Jul 18, 2023)
897f5f8 remove changes from component (PhilippeMoussalli, Jul 18, 2023)
0c48233 Merge branch 'main' into improve-imagedownload (PhilippeMoussalli, Jul 18, 2023)
2edcd91 change save path diagnostics (PhilippeMoussalli, Jul 19, 2023)
d63eed7 add client to dataIO (PhilippeMoussalli, Jul 19, 2023)
51cadfe remove progressbar (PhilippeMoussalli, Jul 19, 2023)
678b3e1 move client (PhilippeMoussalli, Jul 19, 2023)
44d5493 move dataframe visualization (PhilippeMoussalli, Jul 19, 2023)
592f3ae add bokeh to dependencies (PhilippeMoussalli, Jul 19, 2023)
5c56ddf add bokeh to dependencies (PhilippeMoussalli, Jul 19, 2023)
8e575c7 add bokeh to dependencies (PhilippeMoussalli, Jul 19, 2023)
0d1e76b move client to execute (PhilippeMoussalli, Jul 19, 2023)
43132f5 silence logs (PhilippeMoussalli, Jul 19, 2023)
7f38c31 remove log supression (PhilippeMoussalli, Jul 19, 2023)
d764ad4 add log silencing to cluster (PhilippeMoussalli, Jul 19, 2023)
d50cc6b add worker log filter (PhilippeMoussalli, Jul 19, 2023)
9e1eb31 supress logs (PhilippeMoussalli, Jul 19, 2023)
f344ea8 supress logs (PhilippeMoussalli, Jul 19, 2023)
08b74cc add output_partition_size as an argument and revert diagnosis (PhilippeMoussalli, Jul 19, 2023)
df19ad3 remove output_partition from user arguments (PhilippeMoussalli, Jul 19, 2023)
@@ -3,7 +3,7 @@ FROM --platform=linux/amd64 python:3.8-slim
 ## System dependencies
 RUN apt-get update && \
     apt-get upgrade -y && \
-    apt-get install git -y
+    apt-get install git graphviz -y

 # install requirements
 COPY requirements.txt /
@@ -12,7 +12,7 @@ RUN pip3 install --no-cache-dir -r requirements.txt

 # Install Fondant
 # This is split from other requirements to leverage caching
-ARG FONDANT_VERSION=main
+ARG FONDANT_VERSION=d764ad42887b628883fe916a8b0605a13c6b08e0
 RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

 # Set the working directory to the component folder
4 changes: 3 additions & 1 deletion pyproject.toml
@@ -42,7 +42,9 @@ classifiers = [

 [tool.poetry.dependencies]
 python = ">= 3.8"
-dask = {extras = ["dataframe"], version = ">= 2023.4.1"}
+dask = {extras = ["dataframe", "distributed"], version = ">= 2023.4.1"}
+graphviz = ">= 0.20.1"
+bokeh = ">= 3.1.1"
 importlib-resources = { version = ">= 1.3", python = "<3.9" }
 jsonschema = ">= 4.18"
 pyarrow = ">= 11.0.0"
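The new dependencies line up with the diagnostics commits in this PR: dask[distributed] provides the LocalCluster and Client machinery, bokeh powers the cluster's diagnostic dashboard, and graphviz backs task-graph rendering. A minimal standalone sketch of what they enable (not code from this PR; the dashboard port and dataframe contents are made up):

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Start a local cluster; the diagnostic dashboard it serves requires bokeh.
cluster = LocalCluster(dashboard_address=":8787")
client = Client(cluster)
print(client.dashboard_link)  # e.g. http://127.0.0.1:8787/status

# Rendering a task graph to an image requires graphviz.
df = dd.from_pandas(pd.DataFrame({"a": range(10)}), npartitions=2)
df.visualize(filename="graph.png")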
14 changes: 7 additions & 7 deletions src/fondant/component.py
@@ -41,6 +41,13 @@ def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
         raise NotImplementedError


+class DaskWriteComponent(BaseComponent):
+    """Component that accepts a Dask DataFrame and writes its contents."""
+
+    def write(self, dataframe: dd.DataFrame) -> None:
+        raise NotImplementedError
+
+
 class PandasTransformComponent(BaseComponent):
     """Component that transforms the incoming dataset partition per partition as a pandas
     DataFrame.
@@ -57,12 +64,5 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
         raise NotImplementedError


-class DaskWriteComponent(BaseComponent):
-    """Component that accepts a Dask DataFrame and writes its contents."""
-
-    def write(self, dataframe: dd.DataFrame) -> None:
-        raise NotImplementedError
-
-
 Component = t.TypeVar("Component", bound=BaseComponent)
 """Component type which can represent any of the subclasses of BaseComponent"""
9 changes: 9 additions & 0 deletions src/fondant/component_spec.py
@@ -256,6 +256,13 @@ def from_fondant_component_spec(
                     "type": "JsonObject",
                     "default": "None",
                 },
+                {
+                    "name": "output_partition_size",
+                    "description": "The size of the output partitions, defaults"
+                    " to 250MB. Set to None to disable partitioning the output",
+                    "type": "String",
+                    "default": "250MB",
+                },
                 *(
                     {
                         "name": arg.name,
@@ -285,6 +292,8 @@ def from_fondant_component_spec(
                     {"inputValue": "metadata"},
                     "--component_spec",
                     {"inputValue": "component_spec"},
+                    "--output_partition_size",
+                    {"inputValue": "output_partition_size"},
                     *cls._dump_args(fondant_component.args.values()),
                     "--output_manifest_path",
                     {"outputPath": "output_manifest_path"},
56 changes: 56 additions & 0 deletions src/fondant/data_io.py
@@ -1,4 +1,5 @@
 import logging
+import os
 import typing as t

 import dask.dataframe as dd
@@ -14,9 +15,37 @@ class DataIO:
     def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec) -> None:
         self.manifest = manifest
         self.component_spec = component_spec
+        self.diagnostics_path = (
+            f"{self.manifest.base_path}/" f"{self.manifest.component_id}"
+        )


 class DaskDataLoader(DataIO):
+    def __init__(self, *, manifest: Manifest, component_spec: ComponentSpec):
+        super().__init__(manifest=manifest, component_spec=component_spec)
+
+    @staticmethod
+    def partition_loaded_dataframe(dataframe: dd.DataFrame) -> dd.DataFrame:
+        """
+        Partition the loaded dataframe based on its current number of partitions
+        and the number of available workers.
+
+        Returns:
+            The partitioned dataframe.
+        """
+        n_partitions = dataframe.npartitions
+        n_workers = os.cpu_count()
+        logger.info(
+            f"The number of partitions of the input dataframe is {n_partitions}. The "
+            f"available number of workers is {n_workers}.",
+        )
+        if n_partitions < n_workers:
+            dataframe = dataframe.repartition(npartitions=n_workers)
+            logger.info(
+                "Repartitioning the data before transforming to maximize worker usage",
+            )
+
+        return dataframe
+
     def _load_subset(self, subset_name: str, fields: t.List[str]) -> dd.DataFrame:
         """
         Function that loads a subset from the manifest as a Dask dataframe.
@@ -80,12 +109,37 @@ def load_dataframe(self) -> dd.DataFrame:
             how="left",
         )

+        dataframe = self.partition_loaded_dataframe(dataframe)
+
         logging.info(f"Columns of dataframe: {list(dataframe.columns)}")

         return dataframe


 class DaskDataWriter(DataIO):
+    def __init__(
+        self,
+        *,
+        manifest: Manifest,
+        component_spec: ComponentSpec,
+        output_partition_size: t.Optional[str] = None,
+    ):
+        super().__init__(manifest=manifest, component_spec=component_spec)
+        self.output_partition_size = output_partition_size
+
+    def partition_written_dataframe(self, dataframe: dd.DataFrame) -> dd.DataFrame:
+        """
+        Repartition the dataframe to be written into smaller partitions based on
+        the given partition size.
+        """
+        if self.output_partition_size:
+            dataframe = dataframe.repartition(partition_size=self.output_partition_size)
+            logger.info(
+                f"Repartitioning the written data such that the memory per partition is"
+                f" {self.output_partition_size}",
+            )
+        return dataframe
+
     def write_dataframe(self, dataframe: dd.DataFrame) -> None:
         write_tasks = []

@@ -159,6 +213,8 @@ def _write_subset(

         schema = {field.name: field.type.value for field in subset_spec.fields.values()}

+        dataframe = self.partition_written_dataframe(dataframe)
+
         return self._create_write_task(dataframe, location=location, schema=schema)

     @staticmethod
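The two repartitioning strategies above are easy to exercise in isolation: loading widens the dataframe so every worker receives a partition, while writing caps the in-memory size of each output partition. A standalone sketch with assumed values (DaskDataLoader uses os.cpu_count() and DaskDataWriter the configured output_partition_size):

import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(pd.DataFrame({"a": range(1_000)}), npartitions=2)

# Load side: widen to the worker count so all cores receive work.
n_workers = 8  # assumed; the loader uses os.cpu_count()
if df.npartitions < n_workers:
    df = df.repartition(npartitions=n_workers)

# Write side: bound the memory footprint of each written partition.
df = df.repartition(partition_size="250MB")
print(df.npartitions)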
4 changes: 4 additions & 0 deletions src/fondant/exceptions.py
@@ -15,6 +15,10 @@ class InvalidComponentSpec(ValidationError, FondantException):
     """Thrown when a component spec cannot be validated against the schema."""


+class InvalidComponentOpDefinition(ValidationError, FondantException):
+    """Thrown when a ComponentOp is invalid."""
+
+
 class InvalidPipelineDefinition(ValidationError, FondantException):
     """Thrown when a pipeline definition is invalid."""
28 changes: 25 additions & 3 deletions src/fondant/executor.py
@@ -28,6 +28,20 @@

 logger = logging.getLogger(__name__)

+DASK_LOGS_TO_SUPPRESS = [
+    "distributed.process",
+    "distributed.scheduler",
+    "distributed.nanny",
+    "distributed.worker",
+    "distributed.core",
+    "distributed.comm.tcp",
+    "distributed.batched",
+]
+
+for logger_name in DASK_LOGS_TO_SUPPRESS:
+    dask_logger = logging.getLogger(logger_name)
+    dask_logger.setLevel(logging.WARNING)
+

 class Executor(t.Generic[Component]):
     """An executor executes a Component."""
@@ -39,13 +53,15 @@ def __init__(
         input_manifest_path: t.Union[str, Path],
         output_manifest_path: t.Union[str, Path],
         metadata: t.Dict[str, t.Any],
-        user_arguments: t.Dict[str, Argument],
+        user_arguments: t.Dict[str, t.Any],
+        output_partition_size: t.Optional[str] = "250MB",
     ) -> None:
         self.spec = spec
         self.input_manifest_path = input_manifest_path
         self.output_manifest_path = output_manifest_path
         self.metadata = metadata
         self.user_arguments = user_arguments
+        self.output_partition_size = output_partition_size

     @classmethod
     def from_file(
@@ -85,7 +101,7 @@ def from_spec(cls, component_spec: ComponentSpec) -> "Executor":
         input_manifest_path = args_dict.pop("input_manifest_path")
         output_manifest_path = args_dict.pop("output_manifest_path")
         metadata = args_dict.pop("metadata")
-
+        output_partition_size = args_dict.pop("output_partition_size")
         metadata = json.loads(metadata) if metadata else {}

         return cls(
@@ -94,6 +110,7 @@ def from_spec(cls, component_spec: ComponentSpec) -> "Executor":
             output_manifest_path=output_manifest_path,
             metadata=metadata,
             user_arguments=args_dict,
+            output_partition_size=output_partition_size,
         )

     @classmethod
@@ -166,7 +183,12 @@ def _execute_component(

     def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest):
         """Creates a data writer given a manifest and writes out the index and subsets."""
-        data_writer = DaskDataWriter(manifest=manifest, component_spec=self.spec)
+        data_writer = DaskDataWriter(
+            manifest=manifest,
+            component_spec=self.spec,
+            output_partition_size=self.output_partition_size,
+        )
+
         data_writer.write_dataframe(dataframe)

     def execute(self, component_cls: t.Type[Component]) -> None:
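The module-level loop above raises the threshold of the noisy distributed loggers so that anything below WARNING is dropped. The mechanism can be checked with the standard logging module alone (standalone sketch, not part of the PR):

import logging

logging.basicConfig(level=logging.INFO)

# Same mechanism as DASK_LOGS_TO_SUPPRESS: raise the per-logger threshold.
logging.getLogger("distributed.worker").setLevel(logging.WARNING)

logging.getLogger("distributed.worker").info("dropped")         # suppressed
logging.getLogger("distributed.worker").warning("still shown")  # emitted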
38 changes: 36 additions & 2 deletions src/fondant/pipeline.py
@@ -12,7 +12,7 @@

 from importlib_resources import files  # type: ignore

 from fondant.component_spec import ComponentSpec
-from fondant.exceptions import InvalidPipelineDefinition
+from fondant.exceptions import InvalidComponentOpDefinition, InvalidPipelineDefinition
 from fondant.import_utils import is_kfp_available
 from fondant.manifest import Manifest

@@ -32,6 +32,8 @@ class ComponentOp:
     Arguments:
         component_dir: The path to the component directory.
         arguments: A dictionary containing the argument name and value for the operation.
+        output_partition_size: The size of the partitions of the written output dataset.
+            Defaults to 250MB; set to None to disable partitioning the output.
         number_of_gpus: The number of gpus to assign to the operation
         node_pool_name: The name of the node pool to which the operation will be assigned.
         p_volumes: Collection of persistent volumes in a Kubernetes cluster. Keys are mount paths,
@@ -57,13 +59,15 @@ def __init__(
         component_dir: t.Union[str, Path],
         *,
         arguments: t.Optional[t.Dict[str, t.Any]] = None,
+        output_partition_size: t.Optional[str] = "250MB",
         number_of_gpus: t.Optional[int] = None,
         node_pool_name: t.Optional[str] = None,
         p_volumes: t.Optional[t.Dict[str, k8s_client.V1Volume]] = None,
         ephemeral_storage_size: t.Optional[str] = None,
     ) -> None:
         self.component_dir = Path(component_dir)
-        self.arguments = arguments or {}
+        self.output_partitioning_size = output_partition_size
+        self.arguments = self._set_arguments(arguments)

         self.component_spec = ComponentSpec.from_file(
             self.component_dir / self.COMPONENT_SPEC_NAME,
@@ -75,6 +79,36 @@ def __init__(
         self.p_volumes = p_volumes
         self.ephemeral_storage_size = ephemeral_storage_size

+    def _set_arguments(
+        self,
+        arguments: t.Optional[t.Dict[str, t.Any]],
+    ) -> t.Dict[str, t.Any]:
+        """Set component arguments based on the provided arguments and the relevant
+        ComponentOp parameters.
+        """
+
+        def _validate_file_size(file_size):
+            # Regular expression matching file size notations: KB, MB, GB or TB
+            pattern = r"^\d+(?:\.\d+)?(?:KB|MB|GB|TB)$"
+
+            # Check whether the provided file_size matches the pattern
+            return bool(re.match(pattern, file_size, re.I))
+
+        arguments = arguments or {}
+
+        if self.output_partitioning_size is not None:
+            if not _validate_file_size(file_size=str(self.output_partitioning_size)):
+                msg = (
+                    f"Invalid partition size `{self.output_partitioning_size}`:"
+                    " partition size must be a number followed by a file size"
+                    " notation, e.g. '250MB'"
+                )
+                raise InvalidComponentOpDefinition(msg)
+
+            arguments["output_partition_size"] = self.output_partitioning_size
+
+        return arguments
+
     @property
     def dockerfile_path(self) -> t.Optional[Path]:
         path = self.component_dir / "Dockerfile"
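With this validation in place, a pipeline author passes the partition size when defining the operation, and _set_arguments forwards it to the component as the output_partition_size argument. A hypothetical usage sketch (the component path and arguments are made up):

from fondant.exceptions import InvalidComponentOpDefinition
from fondant.pipeline import ComponentOp

# Valid: "250MB" matches the size pattern (the regex is case-insensitive).
op = ComponentOp(
    component_dir="components/download_images",  # assumed path
    arguments={"storage_args": "a dummy string arg"},
    output_partition_size="250MB",
)

# Invalid: a bare number lacks a size suffix, so validation raises
# before the component spec is even loaded.
try:
    ComponentOp(
        component_dir="components/download_images",
        output_partition_size="250",
    )
except InvalidComponentOpDefinition as exc:
    print(exc)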
docker-compose test fixture (file name not shown in the capture)
@@ -9,6 +9,8 @@ services:
       - /foo/bar/first_component/manifest.json
       - --storage_args
       - a dummy string arg
+      - --output_partition_size
+      - 250MB
       - --component_spec
       - '{"name": "First component", "description": "This is an example component",
         "image": "example_component:latest", "produces": {"images": {"fields": {"data":
@@ -48,6 +50,8 @@ services:
       - a dummy string arg
       - --some_list
       - '[1, 2, 3]'
+      - --output_partition_size
+      - 10MB
       - --component_spec
       - '{"name": "Third component", "description": "This is an example component",
         "image": "example_component:latest", "consumes": {"images": {"fields": {"data":
@@ -63,4 +67,4 @@
       second_component:
         condition: service_completed_successfully
 volumes: []
-version: '3.8'
\ No newline at end of file
+version: '3.8'
docker-compose test fixture (file name not shown in the capture)
@@ -9,6 +9,8 @@ services:
       - /foo/bar/first_component/manifest.json
       - --storage_args
       - a dummy string arg
+      - --output_partition_size
+      - 250MB
       - --component_spec
       - '{"name": "First component", "description": "This is an example component",
         "image": "example_component:latest", "produces": {"images": {"fields": {"data":
@@ -26,6 +28,8 @@ services:
       - '0'
       - --padding
       - '0'
+      - --output_partition_size
+      - 250MB
       - --component_spec
       - '{"name": "Image cropping", "description": "Component that removes single-colored
         borders around images and crops them appropriately", "image": "ghcr.io/ml6team/image_cropping:dev",
9 changes: 8 additions & 1 deletion tests/example_specs/component_specs/kubeflow_component.yaml
@@ -11,6 +11,11 @@ inputs:
     description: The component specification as a dictionary
     type: JsonObject
     default: None
+  - name: output_partition_size
+    description: The size of the output partitions, defaults to 250MB. Set to
+      None to disable partitioning the output
+    type: String
+    default: 250MB
   - name: storage_args
     description: Storage arguments
     type: String
@@ -30,7 +35,9 @@ implementation:
       - inputValue: metadata
      - --component_spec
       - inputValue: component_spec
+      - --output_partition_size
+      - inputValue: output_partition_size
       - --storage_args
       - inputValue: storage_args
       - --output_manifest_path
-      - outputPath: output_manifest_path
\ No newline at end of file
+      - outputPath: output_manifest_path