Create load from parquet (#474)
PR that creates a generic component to load data stored as parquet files
from remote storage.

Largely based on the load-from-hub component, with some modifications.
Currently needed for the datacomp pipeline.

Future modifications could include loading data from local storage, though
we'd need to make sure that the data is mounted locally.
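
For illustration, a minimal sketch of how the new component could be wired into a pipeline via ComponentOp. This is an assumption-laden example: the pipeline name, base path, dataset URI, and column mapping are placeholders, while the argument names come from the component spec added in this PR.

from fondant.pipeline import ComponentOp, Pipeline

# Hypothetical pipeline; the name and base_path are placeholders.
pipeline = Pipeline(
    pipeline_name="example-parquet-pipeline",
    base_path="gs://my-bucket/fondant-artifacts",
)

load_from_parquet_op = ComponentOp(
    component_dir="components/load_from_parquet",
    arguments={
        # Placeholder URI pointing to a remote parquet file or folder.
        "dataset_uri": "gs://my-bucket/datasets/my_dataset.parquet",
        # Rename source columns to the "<subset>_<field>" names the spec produces.
        "column_name_mapping": {"text": "dummy_variable_data"},
        # Load only a small sample while testing the pipeline.
        "n_rows_to_load": 1000,
    },
)

pipeline.add_op(load_from_parquet_op)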
PhilippeMoussalli authored Sep 28, 2023
1 parent c8bd3b7 commit 7d3b881
Showing 4 changed files with 205 additions and 60 deletions.
23 changes: 23 additions & 0 deletions components/load_from_parquet/Dockerfile
@@ -0,0 +1,23 @@
FROM --platform=linux/amd64 python:3.8-slim

# System dependencies
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install git -y

# Install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

# Set the working directory to the component folder
WORKDIR /component/src

# Copy over src-files
COPY src/ .

ENTRYPOINT ["fondant", "execute", "main"]
26 changes: 26 additions & 0 deletions components/load_from_parquet/fondant_component.yaml
@@ -0,0 +1,26 @@
name: Load from parquet
description: Component that loads a dataset from a parquet URI
image: ghcr.io/ml6team/load_from_parquet:dev

produces:
  dummy_variable: #TODO: fill in here
    fields:
      data:
        type: binary

args:
  dataset_uri:
    description: The remote path to the parquet file/folder containing the dataset
    type: str
  column_name_mapping:
    description: Mapping of the consumed dataset column names to Fondant column names
    type: dict
    default: None
  n_rows_to_load:
    description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
    type: int
    default: None
  index_column:
    description: Column to set as the index in the load component; if not specified, a default globally unique index will be set
    type: str
    default: None
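
Fondant flattens each produced subset and field into a "<subset>_<field>" dataframe column (see _get_meta_df in main.py below), so with the placeholder spec above the loaded dataframe is expected to expose a dummy_variable_data column. A small sketch of what column_name_mapping does in that context, using a made-up source column name:

import pandas as pd

# Made-up source data with a single binary column called "raw_bytes".
source = pd.DataFrame({"raw_bytes": [b"\x00\x01", b"\x02\x03"]})

# column_name_mapping renames source columns to the "<subset>_<field>"
# names declared under `produces` in the component spec.
column_name_mapping = {"raw_bytes": "dummy_variable_data"}
renamed = source.rename(columns=column_name_mapping)

print(list(renamed.columns))  # ['dummy_variable_data']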
97 changes: 97 additions & 0 deletions components/load_from_parquet/src/main.py
@@ -0,0 +1,97 @@
"""This component loads a seed dataset from the hub."""
import logging
import typing as t

import dask
import dask.dataframe as dd
import pandas as pd
from fondant.component import DaskLoadComponent
from fondant.component_spec import ComponentSpec

logger = logging.getLogger(__name__)

dask.config.set({"dataframe.convert-string": False})


class LoadFromParquet(DaskLoadComponent):

    def __init__(self,
                 spec: ComponentSpec,
                 *_,
                 dataset_uri: str,
                 column_name_mapping: t.Optional[dict],
                 n_rows_to_load: t.Optional[int],
                 index_column: t.Optional[str],
                 ) -> None:
        """
        Args:
            spec: the component spec
            dataset_uri: The remote path to the parquet file/folder containing the dataset
            column_name_mapping: Mapping of the consumed dataset to fondant column names
            n_rows_to_load: optional argument that defines the number of rows to load. Useful
                for testing pipeline runs on a small scale.
            index_column: Column to set index to in the load component, if not specified a
                default globally unique index will be set.
        """
        self.dataset_uri = dataset_uri
        self.column_name_mapping = column_name_mapping
        self.n_rows_to_load = n_rows_to_load
        self.index_column = index_column
        self.spec = spec

    def load(self) -> dd.DataFrame:
        # 1) Load data, read as Dask dataframe
        logger.info("Loading dataset from the file...")
        dask_df = dd.read_parquet(self.dataset_uri)

        # 2) Rename columns
        if self.column_name_mapping is not None:
            logger.info("Renaming columns...")
            dask_df = dask_df.rename(columns=self.column_name_mapping)

        # 3) Optional: only return a specific number of rows
        if self.n_rows_to_load is not None:
            # Count partitions until we have at least n_rows_to_load rows,
            # then take the head across only those partitions.
            partitions_length = 0
            npartitions = 1
            for npartitions, partition in enumerate(dask_df.partitions, start=1):
                if partitions_length >= self.n_rows_to_load:
                    logger.info(
                        f"Required number of partitions to load "
                        f"{self.n_rows_to_load} rows is {npartitions}",
                    )
                    break
                partitions_length += len(partition)
            dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
            dask_df = dd.from_pandas(dask_df, npartitions=npartitions)

        # 4) Set the index
        if self.index_column is None:
            logger.info(
                "Index column not specified, setting a globally unique index",
            )

            def _set_unique_index(dataframe: pd.DataFrame, partition_info=None):
                """Function that sets a unique index based on the partition and row number."""
                dataframe["id"] = 1
                dataframe["id"] = (
                    str(partition_info["number"])
                    + "_"
                    + (dataframe.id.cumsum()).astype(str)
                )
                dataframe.index = dataframe.pop("id")
                return dataframe

            def _get_meta_df() -> pd.DataFrame:
                meta_dict = {"id": pd.Series(dtype="object")}
                for subset_name, subset in self.spec.produces.items():
                    for field_name, field in subset.fields.items():
                        meta_dict[f"{subset_name}_{field_name}"] = pd.Series(
                            dtype=pd.ArrowDtype(field.type.value),
                        )
                return pd.DataFrame(meta_dict).set_index("id")

            meta = _get_meta_df()
            dask_df = dask_df.map_partitions(_set_unique_index, meta=meta)
        else:
            logger.info(f"Setting `{self.index_column}` as index")
            dask_df = dask_df.set_index(self.index_column, drop=True)

        return dask_df
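
To make the default indexing behaviour concrete, here is a standalone sketch of the same "partition number plus per-partition row counter" scheme on a toy Dask dataframe; the data and column names are made up:

import dask.dataframe as dd
import pandas as pd


def set_unique_index(dataframe: pd.DataFrame, partition_info=None) -> pd.DataFrame:
    """Same idea as _set_unique_index above: index values like "<partition>_<row>"."""
    dataframe["id"] = 1
    dataframe["id"] = (
        str(partition_info["number"]) + "_" + dataframe["id"].cumsum().astype(str)
    )
    dataframe.index = dataframe.pop("id")
    return dataframe


df = dd.from_pandas(pd.DataFrame({"value": range(6)}), npartitions=2)

# Empty meta frame mirroring _get_meta_df: an object-typed "id" index
# plus the expected output columns.
meta = pd.DataFrame(
    {"id": pd.Series(dtype="object"), "value": pd.Series(dtype="int64")},
).set_index("id")

df = df.map_partitions(set_unique_index, meta=meta)
print(df.compute().index.tolist())  # e.g. ['0_1', '0_2', '0_3', '1_1', '1_2', '1_3']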
119 changes: 59 additions & 60 deletions examples/pipelines/datacomp/pipeline.py
@@ -41,71 +41,70 @@
"dataset_name": "nielsr/datacomp-small-with-text-embeddings",
"column_name_mapping": load_component_column_mapping,
"index_column": "uid",
"n_rows_to_load": 1000,
},
node_pool_label="node_pool",
node_pool_name="n2-standard-64-pool",
cache=False,
)
download_images_op = ComponentOp.from_registry(
name="download_images",
arguments={
"retries": 2,
"min_image_size": 0,
"max_aspect_ratio": float("inf"),
},
node_pool_label="node_pool",
node_pool_name="n2-standard-64-pool",
input_partition_rows=1000,
cache=False,
)
detect_text_op = ComponentOp(
component_dir="components/detect_text",
arguments={
"batch_size": 2,
},
node_pool_label="node_pool",
node_pool_name="model-inference-mega-pool",
number_of_gpus=1,
cache=False,
)
mask_images_op = ComponentOp(
component_dir="components/mask_images",
node_pool_label="node_pool",
node_pool_name="n2-standard-64-pool",
cache=False,
)
embed_images_op = ComponentOp.from_registry(
name="embed_images",
arguments={
"batch_size": 2,
},
node_pool_label="node_pool",
node_pool_name="model-inference-mega-pool",
number_of_gpus=1,
cache=False,
)
add_clip_score_op = ComponentOp(
component_dir="components/add_clip_score",
node_pool_label="node_pool",
node_pool_name="n2-standard-64-pool",
cache=False,
)
filter_clip_score_op = ComponentOp(
component_dir="components/filter_clip_score",
arguments={
"pct_threshold": 0.5,
# "n_rows_to_load": 1000,
},
node_pool_label="node_pool",
node_pool_name="n2-standard-64-pool",
)
# download_images_op = ComponentOp.from_registry(
# name="download_images",
# arguments={
# "retries": 2,
# "min_image_size": 0,
# "max_aspect_ratio": float("inf"),
# },
# node_pool_label="node_pool",
# node_pool_name="n2-standard-64-pool",
# input_partition_rows=1000,
# cache=False,
# )
# detect_text_op = ComponentOp(
# component_dir="components/detect_text",
# arguments={
# "batch_size": 2,
# },
# node_pool_label="node_pool",
# node_pool_name="model-inference-mega-pool",
# number_of_gpus=1,
# cache=False,
# )
# mask_images_op = ComponentOp(
# component_dir="components/mask_images",
# node_pool_label="node_pool",
# node_pool_name="n2-standard-64-pool",
# cache=False,
# )
# embed_images_op = ComponentOp.from_registry(
# name="embed_images",
# arguments={
# "batch_size": 2,
# },
# node_pool_label="node_pool",
# node_pool_name="model-inference-mega-pool",
# number_of_gpus=1,
# cache=False,
# )
# add_clip_score_op = ComponentOp(
# component_dir="components/add_clip_score",
# node_pool_label="node_pool",
# node_pool_name="n2-standard-64-pool",
# cache=False,
# )
# filter_clip_score_op = ComponentOp(
# component_dir="components/filter_clip_score",
# arguments={
# "pct_threshold": 0.5,
# },
# node_pool_label="node_pool",
# node_pool_name="n2-standard-64-pool",
# )


# add ops to pipeline
pipeline.add_op(load_from_hub_op)
pipeline.add_op(download_images_op, dependencies=load_from_hub_op)
pipeline.add_op(detect_text_op, dependencies=download_images_op)
pipeline.add_op(mask_images_op, dependencies=detect_text_op)
pipeline.add_op(embed_images_op, dependencies=mask_images_op)
pipeline.add_op(add_clip_score_op, dependencies=embed_images_op)
pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op)
# pipeline.add_op(download_images_op, dependencies=load_from_hub_op)
# pipeline.add_op(detect_text_op, dependencies=download_images_op)
# pipeline.add_op(mask_images_op, dependencies=detect_text_op)
# pipeline.add_op(embed_images_op, dependencies=mask_images_op)
# pipeline.add_op(add_clip_score_op, dependencies=embed_images_op)
# pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op)
