diff --git a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/Dockerfile b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/Dockerfile new file mode 100644 index 000000000..605adc7e9 --- /dev/null +++ b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/Dockerfile @@ -0,0 +1,18 @@ +FROM --platform=linux/amd64 python:3.8-slim + +## System dependencies +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install git -y + +# install requirements +COPY requirements.txt / +RUN pip3 install --no-cache-dir -r requirements.txt + +# Set the working directory to the component folder +WORKDIR /component/src + +# Copy over src-files +COPY src/ . + +ENTRYPOINT ["python", "main.py"] \ No newline at end of file diff --git a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/fondant_component.yaml b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/fondant_component.yaml new file mode 100644 index 000000000..4037b77fa --- /dev/null +++ b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/fondant_component.yaml @@ -0,0 +1,18 @@ +name: Load index file from commoncrawl +description: Component that loads a given index file from commoncrawl +image: ghcr.io/ml6team/load_from_commoncrawl:latest + +produces: + segment: + fields: + path: + type: string + +args: + index_name: + description: Name of index file on commoncrawl + type: str + n_segments_to_load: + description: Number of segments to load from the commoncrawl index file + type: int + default: None \ No newline at end of file diff --git a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt new file mode 100644 index 000000000..3814fd014 --- /dev/null +++ b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/requirements.txt @@ -0,0 +1,2 @@ +boto3==1.26.158 +git+https://github.com/ml6team/fondant@main \ No newline at end of file diff --git a/examples/pipelines/commoncrawl/components/load_from_commoncrawl/src/main.py b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/src/main.py new file mode 100644 index 000000000..99755f1fa --- /dev/null +++ b/examples/pipelines/commoncrawl/components/load_from_commoncrawl/src/main.py @@ -0,0 +1,89 @@ +"""This component loads a dataset from CommonCrawl based on a given index.""" +import logging +import typing as t + +import io +import boto3 +import gzip + +import dask.dataframe as dd +import pandas as pd + +from fondant.component import LoadComponent + +logger = logging.getLogger(__name__) + +S3_COMMONCRAWL_BUCKET = "commoncrawl" + + +def fetch_warc_file_from_s3(s3_bucket: str, s3_key: str) -> bytes: + """Fetches a WARC file from S3 and returns its content as a Dask DataFrame. + Args: + s3_bucket: The name of the S3 bucket. + s3_key: The key of the S3 object to be downloaded. + Returns: + File object containing the WARC file content. + """ + logger.info(f"Fetching WARC file from S3: {s3_bucket}/{s3_key}...") + + s3 = boto3.client("s3") + file_obj = io.BytesIO() + s3.download_fileobj(s3_bucket, s3_key, file_obj) + file_obj.seek(0) + + return file_obj + + +def read_warc_paths_file( + warc_file: bytes, n_segments_to_load: t.Optional[int] = None +) -> dd.DataFrame: + """Reads a WARC file containing a list of segment file paths and returns a Dask DataFrame. + Args: + warc_file: The WARC file to read. + n_segments_to_load: The number of segments to load from the WARC file. + Returns: + A Dask DataFrame containing the segment file paths. + """ + logger.info(f"Reading WARC file...") + warc_paths = [] + with gzip.open(warc_file, mode="rt") as f: + warc_paths = [line.strip() for line in f] + + df = pd.DataFrame(warc_paths, columns=["warc_paths"]) + dask_df = dd.from_pandas(df, npartitions=1) + dask_df = dask_df.rename(columns={"warc_paths": "segment_path"}) + + if n_segments_to_load: + dask_df = dask_df.head(n_segments_to_load) + dask_df = dd.from_pandas(dask_df, npartitions=1) + + return dask_df + + +class LoadFromCommonCrawl(LoadComponent): + def load( + self, index_name: str, n_segments_to_load: t.Optional[int] = None + ) -> dd.DataFrame: + """Loads a dataset of segment file paths from CommonCrawl based on a given index. + Args: + index_name: The name of the CommonCrawl index to load. + n_segments_to_load: The number of segments to load from the index. + Returns: + A Dask DataFrame containing the segment file paths. + """ + logger.info(f"Loading CommonCrawl index {index_name}...") + warc_paths_file_key = f"crawl-data/{index_name}/warc.paths.gz" + warc_paths_file_content = fetch_warc_file_from_s3( + S3_COMMONCRAWL_BUCKET, warc_paths_file_key + ) + + warc_paths_df = read_warc_paths_file( + warc_paths_file_content, n_segments_to_load + ) + + return warc_paths_df + + +if __name__ == "__main__": + component = LoadFromCommonCrawl.from_args() + component.run()