From fa326eec1193a762b9197b1d8c9334eea73406b6 Mon Sep 17 00:00:00 2001
From: Sharon Grundmann
Date: Wed, 5 Jul 2023 11:37:04 +0200
Subject: [PATCH 1/3] Add component load_from_commoncrawl

---
 .../load_from_commoncrawl/Dockerfile          | 18 +++++
 .../fondant_component.yaml                    | 18 +++++
 .../load_from_commoncrawl/requirements.txt    |  3 +
 .../load_from_commoncrawl/src/main.py         | 71 +++++++++++++++++++
 4 files changed, 110 insertions(+)
 create mode 100644 examples/pipelines/commoncrawl/components/load_from_commoncrawl/Dockerfile
 create mode 100644 examples/pipelines/commoncrawl/components/load_from_commoncrawl/fondant_component.yaml
 create mode 100644 examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt
 create mode 100644 examples/pipelines/commoncrawl/components/load_from_commoncrawl/src/main.py

diff --git a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/Dockerfile b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/Dockerfile
new file mode 100644
index 000000000..605adc7e9
--- /dev/null
+++ b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/Dockerfile
@@ -0,0 +1,18 @@
+FROM --platform=linux/amd64 python:3.8-slim
+
+## System dependencies
+RUN apt-get update && \
+    apt-get upgrade -y && \
+    apt-get install git -y
+
+# install requirements
+COPY requirements.txt /
+RUN pip3 install --no-cache-dir -r requirements.txt
+
+# Set the working directory to the component folder
+WORKDIR /component/src
+
+# Copy over src-files
+COPY src/ .
+
+ENTRYPOINT ["python", "main.py"]
\ No newline at end of file
diff --git a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/fondant_component.yaml b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/fondant_component.yaml
new file mode 100644
index 000000000..4037b77fa
--- /dev/null
+++ b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/fondant_component.yaml
@@ -0,0 +1,18 @@
+name: Load index file from commoncrawl
+description: Component that loads a given index file from commoncrawl
+image: ghcr.io/ml6team/load_from_commoncrawl:latest
+
+produces:
+  segment:
+    fields:
+      path:
+        type: string
+
+args:
+  index_name:
+    description: Name of index file on commoncrawl
+    type: str
+  n_segments_to_load:
+    description: Number of segments to load from the commoncrawl index file
+    type: int
+    default: None
\ No newline at end of file
diff --git a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt
new file mode 100644
index 000000000..57f8b2336
--- /dev/null
+++ b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt
@@ -0,0 +1,3 @@
+boto3==1.26.158
+fondant
+pyarrow>=7.0
\ No newline at end of file
diff --git a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/src/main.py b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/src/main.py
new file mode 100644
index 000000000..5a532eb5e
--- /dev/null
+++ b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/src/main.py
@@ -0,0 +1,71 @@
+"""This component loads a dataset from CommonCrawl based on a given index."""
+import logging
+import typing as t
+
+import io
+import boto3
+import gzip
+
+import dask.dataframe as dd
+import pandas as pd
+
+from fondant.component import LoadComponent
+
+logger = logging.getLogger(__name__)
+
+S3_BASE_URL = "s3://commoncrawl/crawl-data"
+S3_COMMONCRAWL_BUCKET = "commoncrawl"
+
+
+def fetch_warc_file_from_s3(s3_bucket: str, s3_key) -> dd.DataFrame:
+    """Fetches a WARC file from S3 and returns its content as a Dask DataFrame."""
+    logger.info(f"Fetching WARC file from S3: {s3_bucket}/{s3_key}...")
+
+    s3 = boto3.client("s3")
+    file_obj = io.BytesIO()
+    s3.download_fileobj(s3_bucket, s3_key, file_obj)
+    file_obj.seek(0)
+
+    return file_obj
+
+
+def read_warc_paths_file(
+    warc_file: bytes, n_segments_to_load: t.Optional[int] = None
+) -> dd.DataFrame:
+    """Reads a WARC file and returns its content as a Dask DataFrame."""
+    logger.info(f"Reading WARC file...")
+    warc_paths = []
+    with gzip.open(warc_file, mode="rt") as f:
+        warc_paths = [line.strip() for line in f]
+
+    df = pd.DataFrame(warc_paths, columns=["warc_paths"])
+    dask_df = dd.from_pandas(df, npartitions=1)
+    dask_df = dask_df.rename(columns={"warc_paths": "segment_path"})
+
+    if n_segments_to_load:
+        dask_df = dask_df.head(n_segments_to_load)
+        dask_df = dd.from_pandas(dask_df, npartitions=1)
+
+    return dask_df
+
+
+class LoadFromCommonCrawl(LoadComponent):
+    def load(
+        self, index_name: str, n_segments_to_load: t.Optional[int] = None
+    ) -> dd.DataFrame:
+        logger.info(f"Loading CommonCrawl index {index_name}...")
+        warc_paths_file_key = f"crawl-data/{index_name}/warc.paths.gz"
+        warc_paths_file_content = fetch_warc_file_from_s3(
+            S3_COMMONCRAWL_BUCKET, warc_paths_file_key
+        )
+
+        warc_paths_df = read_warc_paths_file(
+            warc_paths_file_content, n_segments_to_load
+        )
+
+        return warc_paths_df
+
+
+if __name__ == "__main__":
+    component = LoadFromCommonCrawl.from_args()
+    component.run()

From 367ae8639737262cb43a04504e5b3e2d1a00c2f6 Mon Sep 17 00:00:00 2001
From: Sharon Grundmann
Date: Wed, 5 Jul 2023 14:18:48 +0200
Subject: [PATCH 2/3] Add component load_from_commoncrawl

---
 .../load_from_commoncrawl/requirements.txt |  3 +--
 .../load_from_commoncrawl/src/main.py      | 26 ++++++++++++++++---
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt
index 57f8b2336..9bfb0cb3d 100644
--- a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt
+++ b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt
@@ -1,3 +1,2 @@
 boto3==1.26.158
-fondant
-pyarrow>=7.0
\ No newline at end of file
+fondant
\ No newline at end of file
diff --git a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/src/main.py b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/src/main.py
index 5a532eb5e..99755f1fa 100644
--- a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/src/main.py
+++ b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/src/main.py
@@ -13,12 +13,17 @@
 
 logger = logging.getLogger(__name__)
 
-S3_BASE_URL = "s3://commoncrawl/crawl-data"
 S3_COMMONCRAWL_BUCKET = "commoncrawl"
 
 
-def fetch_warc_file_from_s3(s3_bucket: str, s3_key) -> dd.DataFrame:
-    """Fetches a WARC file from S3 and returns its content as a Dask DataFrame."""
+def fetch_warc_file_from_s3(s3_bucket: str, s3_key: str) -> bytes:
+    """Fetches a WARC file from S3 and returns its content as a file object.
+    Args:
+        s3_bucket: The name of the S3 bucket.
+        s3_key: The key of the S3 object to be downloaded.
+    Returns:
+        File object containing the WARC file content.
+    """
     logger.info(f"Fetching WARC file from S3: {s3_bucket}/{s3_key}...")
 
     s3 = boto3.client("s3")
@@ -32,7 +37,13 @@ def fetch_warc_file_from_s3(s3_bucket: str, s3_key) -> dd.DataFrame:
 def read_warc_paths_file(
     warc_file: bytes, n_segments_to_load: t.Optional[int] = None
 ) -> dd.DataFrame:
-    """Reads a WARC file and returns its content as a Dask DataFrame."""
+    """Reads a WARC file containing a list of segment file paths and returns a Dask DataFrame.
+    Args:
+        warc_file: The WARC file to read.
+        n_segments_to_load: The number of segments to load from the WARC file.
+    Returns:
+        A Dask DataFrame containing the segment file paths.
+    """
     logger.info(f"Reading WARC file...")
     warc_paths = []
     with gzip.open(warc_file, mode="rt") as f:
@@ -53,6 +64,13 @@ class LoadFromCommonCrawl(LoadComponent):
     def load(
         self, index_name: str, n_segments_to_load: t.Optional[int] = None
     ) -> dd.DataFrame:
+        """Loads a dataset of segment file paths from CommonCrawl based on a given index.
+        Args:
+            index_name: The name of the CommonCrawl index to load.
+            n_segments_to_load: The number of segments to load from the index.
+        Returns:
+            A Dask DataFrame containing the segment file paths.
+        """
         logger.info(f"Loading CommonCrawl index {index_name}...")
         warc_paths_file_key = f"crawl-data/{index_name}/warc.paths.gz"
         warc_paths_file_content = fetch_warc_file_from_s3(

From 64e88ae25069b6c7e481df8852c9146572698035 Mon Sep 17 00:00:00 2001
From: Sharon Grundmann
Date: Wed, 5 Jul 2023 14:45:07 +0200
Subject: [PATCH 3/3] Add component load_from_commoncrawl

---
 .../components/load_from_commoncrawl/requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt
index 9bfb0cb3d..3814fd014 100644
--- a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt
+++ b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt
@@ -1,2 +1,2 @@
 boto3==1.26.158
-fondant
\ No newline at end of file
+git+https://github.com/ml6team/fondant@main
\ No newline at end of file
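
For context, a minimal sketch of how this component might be wired into a pipeline definition. It is not part of the patch series: the Pipeline, ComponentOp and add_op names and their pipeline_name/base_path/component_dir/arguments parameters are assumptions about the Fondant pipeline API of this period and may differ between versions, and the pipeline name, base path and argument values are placeholders.

    # pipeline.py -- hypothetical usage sketch, assuming the mid-2023 Fondant pipeline API
    from fondant.pipeline import ComponentOp, Pipeline

    # Placeholder pipeline name and artifact location.
    pipeline = Pipeline(
        pipeline_name="commoncrawl-pipeline",
        base_path="gs://my-bucket/fondant-artifacts",
    )

    # Point the op at the component directory added by this patch series
    # and pass the two arguments declared in fondant_component.yaml.
    load_from_commoncrawl_op = ComponentOp(
        component_dir="components/load_from_commoncrawl",
        arguments={
            "index_name": "CC-MAIN-2023-14",  # an example Common Crawl crawl index
            "n_segments_to_load": 10,         # optional cap on the number of segment paths
        },
    )

    pipeline.add_op(load_from_commoncrawl_op)

Wired up this way, the component's load() returns a Dask DataFrame with a single "segment_path" column, which downstream components can consume as the "segment" subset with its "path" field declared in fondant_component.yaml.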