Create load from parquet #474

Merged 4 commits on Sep 28, 2023
23 changes: 23 additions & 0 deletions components/load_from_parquet/Dockerfile
@@ -0,0 +1,23 @@
FROM --platform=linux/amd64 python:3.8-slim

# System dependencies
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install git -y

# Install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Install Fondant
# This is split from other requirements to leverage caching
ARG FONDANT_VERSION=main
RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION}

# Set the working directory to the component folder
WORKDIR /component/src

# Copy over src-files
COPY src/ .

ENTRYPOINT ["fondant", "execute", "main"]
26 changes: 26 additions & 0 deletions components/load_from_parquet/fondant_component.yaml
@@ -0,0 +1,26 @@
name: Load from parquet
description: Component that loads a dataset from a parquet uri
image: ghcr.io/ml6team/load_from_parquet:dev

produces:
  dummy_variable: #TODO: fill in here
    fields:
      data:
        type: binary

args:
  dataset_uri:
    description: The remote path to the parquet file/folder containing the dataset
    type: str
  column_name_mapping:
    description: Mapping of the consumed dataset
    type: dict
    default: None
  n_rows_to_load:
    description: Optional argument that defines the number of rows to load. Useful for testing pipeline runs on a small scale
    type: int
    default: None
  index_column:
    description: Column to set index to in the load component, if not specified a default globally unique index will be set
    type: str
    default: None
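The new component can be wired into a pipeline like any other Fondant component. Below is a minimal, hypothetical sketch, assuming the `fondant.pipeline` import path and `Pipeline` constructor of the Fondant version used here; the pipeline name, base path, dataset URI, and column mapping are illustrative, not values from this PR.

from fondant.pipeline import ComponentOp, Pipeline

pipeline = Pipeline(
    pipeline_name="parquet_demo_pipeline",  # assumed name
    base_path="./fondant_artifacts",  # assumed local base path
)

# Point the op at the component directory added in this PR and pass its arguments.
load_from_parquet_op = ComponentOp(
    component_dir="components/load_from_parquet",
    arguments={
        "dataset_uri": "gs://my-bucket/my-dataset.parquet",  # assumed URI
        "column_name_mapping": {"text": "dummy_variable_data"},  # illustrative mapping
        "n_rows_to_load": 1000,
        "index_column": "uid",  # or omit to get a generated globally unique index
    },
)

pipeline.add_op(load_from_parquet_op)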
97 changes: 97 additions & 0 deletions components/load_from_parquet/src/main.py
@@ -0,0 +1,97 @@
"""This component loads a seed dataset from the hub."""
import logging
import typing as t

import dask
import dask.dataframe as dd
import pandas as pd
from fondant.component import DaskLoadComponent
from fondant.component_spec import ComponentSpec

logger = logging.getLogger(__name__)

dask.config.set({"dataframe.convert-string": False})


class LoadFromParquet(DaskLoadComponent):

    def __init__(self,
                 spec: ComponentSpec,
                 *_,
                 dataset_uri: str,
                 column_name_mapping: t.Optional[dict],
                 n_rows_to_load: t.Optional[int],
                 index_column: t.Optional[str],
                 ) -> None:
        """
        Args:
            spec: the component spec
            dataset_uri: The remote path to the parquet file/folder containing the dataset
            column_name_mapping: Mapping of the consumed dataset to fondant column names
            n_rows_to_load: Optional argument that defines the number of rows to load. Useful
                for testing pipeline runs on a small scale.
            index_column: Column to set as index in the load component. If not specified, a
                default globally unique index will be set.
        """
        self.dataset_uri = dataset_uri
        self.column_name_mapping = column_name_mapping
        self.n_rows_to_load = n_rows_to_load
        self.index_column = index_column
        self.spec = spec

    def load(self) -> dd.DataFrame:
        # 1) Load data, read as a Dask dataframe
        logger.info("Loading dataset from the file...")
        dask_df = dd.read_parquet(self.dataset_uri)

        # 2) Rename columns
        if self.column_name_mapping is not None:
            logger.info("Renaming columns...")
            dask_df = dask_df.rename(columns=self.column_name_mapping)

        # 3) Optional: only return a specific number of rows
        if self.n_rows_to_load is not None:
            partitions_length = 0
            npartitions = 1
            for npartitions, partition in enumerate(dask_df.partitions, start=1):
                if partitions_length >= self.n_rows_to_load:
                    logger.info(
                        f"Required number of partitions to load "
                        f"{self.n_rows_to_load} rows is {npartitions}",
                    )
                    break
                partitions_length += len(partition)
            dask_df = dask_df.head(self.n_rows_to_load, npartitions=npartitions)
            dask_df = dd.from_pandas(dask_df, npartitions=npartitions)

        # 4) Set the index
        if self.index_column is None:
            logger.info(
                "Index column not specified, setting a globally unique index",
            )

            def _set_unique_index(dataframe: pd.DataFrame, partition_info=None):
                """Set a unique index based on the partition and row number."""
                dataframe["id"] = 1
                dataframe["id"] = (
                    str(partition_info["number"])
                    + "_"
                    + (dataframe.id.cumsum()).astype(str)
                )
                dataframe.index = dataframe.pop("id")
                return dataframe

            def _get_meta_df() -> pd.DataFrame:
                meta_dict = {"id": pd.Series(dtype="object")}
                for subset_name, subset in self.spec.produces.items():
                    for field_name, field in subset.fields.items():
                        meta_dict[f"{subset_name}_{field_name}"] = pd.Series(
                            dtype=pd.ArrowDtype(field.type.value),
                        )
                return pd.DataFrame(meta_dict).set_index("id")

            meta = _get_meta_df()
            dask_df = dask_df.map_partitions(_set_unique_index, meta=meta)
        else:
            logger.info(f"Setting `{self.index_column}` as index")
            dask_df = dask_df.set_index(self.index_column, drop=True)

        return dask_df
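To make the globally-unique-index branch of `load()` more concrete, here is a standalone sketch of the same `map_partitions` pattern on toy data; the dataframe contents and column names are made up for illustration.

import dask.dataframe as dd
import pandas as pd

# Toy dataframe split over two partitions.
ddf = dd.from_pandas(pd.DataFrame({"data": list("abcdef")}), npartitions=2)

def set_unique_index(df: pd.DataFrame, partition_info=None) -> pd.DataFrame:
    """Build an index of the form "<partition number>_<row number within partition>"."""
    # Dask passes partition_info per partition; fall back to 0 defensively.
    number = partition_info["number"] if partition_info is not None else 0
    df = df.copy()
    df["id"] = 1
    df["id"] = str(number) + "_" + df["id"].cumsum().astype(str)
    return df.set_index("id")

# Meta mirrors the output schema: an object-dtype "id" index plus the data column.
meta = pd.DataFrame(
    {"id": pd.Series(dtype="object"), "data": pd.Series(dtype="object")},
).set_index("id")

print(ddf.map_partitions(set_unique_index, meta=meta).compute().index.tolist())
# expected: ['0_1', '0_2', '0_3', '1_1', '1_2', '1_3']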
119 changes: 59 additions & 60 deletions examples/pipelines/datacomp/pipeline.py
@@ -41,71 +41,70 @@
"dataset_name": "nielsr/datacomp-small-with-text-embeddings",
"column_name_mapping": load_component_column_mapping,
"index_column": "uid",
"n_rows_to_load": 1000,
},
node_pool_label="node_pool",
node_pool_name="n2-standard-64-pool",
cache=False,
)
download_images_op = ComponentOp.from_registry(
name="download_images",
arguments={
"retries": 2,
"min_image_size": 0,
"max_aspect_ratio": float("inf"),
},
node_pool_label="node_pool",
node_pool_name="n2-standard-64-pool",
input_partition_rows=1000,
cache=False,
)
detect_text_op = ComponentOp(
component_dir="components/detect_text",
arguments={
"batch_size": 2,
},
node_pool_label="node_pool",
node_pool_name="model-inference-mega-pool",
number_of_gpus=1,
cache=False,
)
mask_images_op = ComponentOp(
component_dir="components/mask_images",
node_pool_label="node_pool",
node_pool_name="n2-standard-64-pool",
cache=False,
)
embed_images_op = ComponentOp.from_registry(
name="embed_images",
arguments={
"batch_size": 2,
},
node_pool_label="node_pool",
node_pool_name="model-inference-mega-pool",
number_of_gpus=1,
cache=False,
)
add_clip_score_op = ComponentOp(
component_dir="components/add_clip_score",
node_pool_label="node_pool",
node_pool_name="n2-standard-64-pool",
cache=False,
)
filter_clip_score_op = ComponentOp(
component_dir="components/filter_clip_score",
arguments={
"pct_threshold": 0.5,
# "n_rows_to_load": 1000,
},
node_pool_label="node_pool",
node_pool_name="n2-standard-64-pool",
)
# download_images_op = ComponentOp.from_registry(
# name="download_images",
# arguments={
# "retries": 2,
# "min_image_size": 0,
# "max_aspect_ratio": float("inf"),
# },
# node_pool_label="node_pool",
# node_pool_name="n2-standard-64-pool",
# input_partition_rows=1000,
# cache=False,
# )
# detect_text_op = ComponentOp(
# component_dir="components/detect_text",
# arguments={
# "batch_size": 2,
# },
# node_pool_label="node_pool",
# node_pool_name="model-inference-mega-pool",
# number_of_gpus=1,
# cache=False,
# )
# mask_images_op = ComponentOp(
# component_dir="components/mask_images",
# node_pool_label="node_pool",
# node_pool_name="n2-standard-64-pool",
# cache=False,
# )
# embed_images_op = ComponentOp.from_registry(
# name="embed_images",
# arguments={
# "batch_size": 2,
# },
# node_pool_label="node_pool",
# node_pool_name="model-inference-mega-pool",
# number_of_gpus=1,
# cache=False,
# )
# add_clip_score_op = ComponentOp(
# component_dir="components/add_clip_score",
# node_pool_label="node_pool",
# node_pool_name="n2-standard-64-pool",
# cache=False,
# )
# filter_clip_score_op = ComponentOp(
# component_dir="components/filter_clip_score",
# arguments={
# "pct_threshold": 0.5,
# },
# node_pool_label="node_pool",
# node_pool_name="n2-standard-64-pool",
# )


# add ops to pipeline
pipeline.add_op(load_from_hub_op)
pipeline.add_op(download_images_op, dependencies=load_from_hub_op)
pipeline.add_op(detect_text_op, dependencies=download_images_op)
pipeline.add_op(mask_images_op, dependencies=detect_text_op)
pipeline.add_op(embed_images_op, dependencies=mask_images_op)
pipeline.add_op(add_clip_score_op, dependencies=embed_images_op)
pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op)
# pipeline.add_op(download_images_op, dependencies=load_from_hub_op)
# pipeline.add_op(detect_text_op, dependencies=download_images_op)
# pipeline.add_op(mask_images_op, dependencies=detect_text_op)
# pipeline.add_op(embed_images_op, dependencies=mask_images_op)
# pipeline.add_op(add_clip_score_op, dependencies=embed_images_op)
# pipeline.add_op(filter_clip_score_op, dependencies=add_clip_score_op)
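For context, `load_component_column_mapping` is defined above this hunk and is not shown in the diff. A mapping of the following shape is what the `column_name_mapping` argument expects, renaming source columns to Fondant's `<subset>_<field>` naming; the keys and values below are purely illustrative.

# Hypothetical example only; the real mapping in pipeline.py is not part of this hunk.
load_component_column_mapping = {
    "url": "images_url",
    "clip_l14_embedding": "embeddings_data",
}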