From 0abc5973f82c13b3deca544cd80f9b697e55a14a Mon Sep 17 00:00:00 2001 From: SATISH J Date: Wed, 26 Jul 2023 18:46:37 +0530 Subject: [PATCH] Add code for reusable load from files component #290 (#296) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR contains code for load from files component related to #290 --------- Co-authored-by: Robbe Sneyders Co-authored-by: Matthias Richter Co-authored-by: NielsRogge <48327001+NielsRogge@users.noreply.github.com> Co-authored-by: Philippe Moussalli Co-authored-by: Georges Lorré <35808396+GeorgesLorre@users.noreply.github.com> Co-authored-by: Sharon Grundmann --- components/load_from_files/Dockerfile | 23 ++ components/load_from_files/README.md | 36 ++ .../load_from_files/fondant_component.yaml | 16 + components/load_from_files/requirements.txt | 3 + components/load_from_files/src/main.py | 318 ++++++++++++++++++ .../tests/test_load_from_files_component.py | 295 ++++++++++++++++ 6 files changed, 691 insertions(+) create mode 100644 components/load_from_files/Dockerfile create mode 100644 components/load_from_files/README.md create mode 100644 components/load_from_files/fondant_component.yaml create mode 100644 components/load_from_files/requirements.txt create mode 100644 components/load_from_files/src/main.py create mode 100644 components/load_from_files/tests/test_load_from_files_component.py diff --git a/components/load_from_files/Dockerfile b/components/load_from_files/Dockerfile new file mode 100644 index 000000000..abfa9a414 --- /dev/null +++ b/components/load_from_files/Dockerfile @@ -0,0 +1,23 @@ +FROM --platform=linux/amd64 python:3.8-slim + +# System dependencies +RUN apt-get update && \ + apt-get upgrade -y && \ + apt-get install git -y + +# Install requirements +COPY requirements.txt / +RUN pip3 install --no-cache-dir -r requirements.txt + +# Install Fondant +# This is split from other requirements to leverage caching +ARG FONDANT_VERSION=main +RUN pip3 install fondant[aws,azure,gcp]@git+https://github.com/ml6team/fondant@${FONDANT_VERSION} + +# Set the working directory to the component folder +WORKDIR /component/src + +# Copy over src-files +COPY src/ . + +ENTRYPOINT ["python", "main.py"] \ No newline at end of file diff --git a/components/load_from_files/README.md b/components/load_from_files/README.md new file mode 100644 index 000000000..ae5f40684 --- /dev/null +++ b/components/load_from_files/README.md @@ -0,0 +1,36 @@ +# Load from files + +## Description +This component is based on the `DaskLoadComponent` and is used to load dataset from files within a directory. 
+It allows you to load datasets which +- Have files within a local data directory +- Have compressed files like .zip, gzip, tar or tar.gz within the data directory +- Are hosted on remote locations like AWS S3 bucket, Azure's Blob storage or GCP's cloud storage + +And returns a dataframe with two columns +- file_filename(containing the file name in string format) +- file_content (containing the respective file content in bytes format) + +Here is an illustration of how to use this component in your pipeline +on a local directory with zip files + +```python +from fondant.pipeline import Pipeline, ComponentOp + +my_pipeline = Pipeline( + pipeline_name="my_pipeline", + base_path="./", # TODO: update this + pipeline_description="This is my pipeline", +) + +load_from_files = ComponentOp( + component_dir="components/load_from_files", + arguments={ + "directory_uri": "./data.zip", # change this to your + # directory_uri, remote or local + }, + output_partition_size="10MB", +) + +my_pipeline.add_op(load_from_files, dependencies=[]) +``` \ No newline at end of file diff --git a/components/load_from_files/fondant_component.yaml b/components/load_from_files/fondant_component.yaml new file mode 100644 index 000000000..2673e13db --- /dev/null +++ b/components/load_from_files/fondant_component.yaml @@ -0,0 +1,16 @@ +name: Load from files +description: Component that loads a dataset from files +image: ghcr.io/ml6team/load_from_files:dev + +produces: + file: + fields: + filename: + type: string + content: + type: binary + +args: + directory_uri: + description: Local or remote path to the directory containing the files + type: str \ No newline at end of file diff --git a/components/load_from_files/requirements.txt b/components/load_from_files/requirements.txt new file mode 100644 index 000000000..29736026f --- /dev/null +++ b/components/load_from_files/requirements.txt @@ -0,0 +1,3 @@ +fsspec==2023.6.0 +pandas==2.0.3 +dask==2023.5.0 \ No newline at end of file diff --git a/components/load_from_files/src/main.py b/components/load_from_files/src/main.py new file mode 100644 index 000000000..f2ebc8c77 --- /dev/null +++ b/components/load_from_files/src/main.py @@ -0,0 +1,318 @@ +""" +This component loads dataset from files in a directory, these +files can be either in local directory or in remote location. +""" +from __future__ import annotations + +import gzip +import logging +import os +import tarfile +import zipfile +from abc import ABC, abstractmethod +from io import BytesIO +from pathlib import Path +from typing import Generator + +import dask.dataframe as dd +import fsspec +import pandas as pd +from dask import delayed +from fondant.component import DaskLoadComponent +from fondant.executor import DaskLoadExecutor + +logger = logging.getLogger(__name__) + + +class AbstractFileHandler(ABC): + """Abstract base class for file handlers.""" + + def __init__( + self, + filepath: str, + fs: fsspec.AbstractFileSystem | None = None, + ) -> None: + """ + Initiate a new AbstractFileHandler with filepath and filesystem (fs). + + Args: + filepath : Path to the file to be read. + fs : Filesystem to use (default is local filesystem). + """ + self.filepath = filepath + self.fs = fs if fs else fsspec.filesystem("file") + + @abstractmethod + def read(self) -> Generator[tuple[str, BytesIO], None, None]: + """Abstract method to read a file. 
Must be overridden by subclasses.""" + + +class FileHandler(AbstractFileHandler): + """Handler for reading files.""" + + def read(self) -> Generator[tuple[str, BytesIO], None, None]: + """ + Reads files and yields the contents of the file. + + Yields: + Tuple consisting of filename and content of the file. + """ + logger.debug(f"Reading file {self.filepath}....") + with self.fs.open(self.filepath, "rb") as f: + yield self.filepath.split("/")[-1], BytesIO(f.read()) + + +class GzipFileHandler(AbstractFileHandler): + """Handler for reading gzip compressed files.""" + + def read(self) -> Generator[tuple[str, BytesIO], None, None]: + """ + Reads gzip compressed files and yields the contents of the file. + + Yields: + Tuple consisting of filename and content of the gzipped file. + """ + logger.debug(f"Uncompressing {Path(self.filepath).name}......") + with self.fs.open(self.filepath, "rb") as buffer, gzip.GzipFile( + fileobj=buffer, + ) as gz: + yield self.filepath.split("/")[-1], BytesIO(gz.read()) + + +class ZipFileHandler(AbstractFileHandler): + """Handler for reading zip compressed files.""" + + def read(self) -> Generator[tuple[str, BytesIO], None, None]: + """ + Reads zip compressed files and yields the content of each file in the archive. + + Yields: + Tuple consisting of filename and content of each file within the zipped archive. + """ + logger.info(f"Uncompressing {Path(self.filepath).name}......") + with self.fs.open(self.filepath, "rb") as buffer, zipfile.ZipFile(buffer) as z: + for filename in z.namelist(): + with z.open(filename) as file_buffer: + buffer_content = file_buffer.read() + if not buffer_content: # The buffer is empty. + continue + yield filename.split("/")[-1], BytesIO(buffer_content) + + +class TarFileHandler(AbstractFileHandler): + """Handler for reading tar archived files.""" + + def read(self) -> Generator[tuple[str, BytesIO], None, None]: + """ + Reads tar archived files and yields the content of each file in the archive. + + Yields: + Tuple consisting of filename and content of each file within the tar archive. + """ + logger.info(f"Uncompressing {Path(self.filepath).name}......") + with self.fs.open(self.filepath, "rb") as buffer, tarfile.open( + fileobj=buffer, + ) as tar: + for tarinfo in tar: + if tarinfo.isfile(): + file = tar.extractfile(tarinfo) + if file is not None: + yield tarinfo.name.split("/")[-1], BytesIO(file.read()) + + +class DirectoryHandler(AbstractFileHandler): + """Handler for reading a directory of files.""" + + def read(self) -> Generator[tuple[str, BytesIO], None, None]: + """ + Reads a directory of files and yields the content of each file in the directory. + + Yields: + Tuple consisting of filename and content of each file within the directory. + """ + logger.info(f"Loading files from {self.filepath} ......") + filenames = self.fs.glob(os.path.join(self.filepath, "*")) + handler: AbstractFileHandler + for filename in filenames: + if filename.endswith(".gz"): + handler = GzipFileHandler(filename, self.fs) + elif filename.endswith(".zip"): + handler = ZipFileHandler(filename, self.fs) + elif filename.endswith(".tar"): + handler = TarFileHandler(filename, self.fs) + else: + handler = FileHandler(filename, self.fs) + yield from handler.read() + + +def get_file_handler( + filepath: str, + fs: fsspec.spec.AbstractFileSystem, +) -> AbstractFileHandler: + """ + This function returns an appropriate file handler based on the file extension + of the input file. + + It supports .gz (gzip), .zip and .tar files. 
+    For any other file type, it defaults to a DirectoryHandler.
+
+    Args:
+        filepath: The file path (including name) to be processed. Paths ending in a
+            supported archive extension get a matching archive handler; any other
+            path is treated as a directory.
+        fs: An instance of an fsspec filesystem. This
+            filesystem will be used to read the file.
+
+    Returns:
+        AbstractFileHandler: One of GzipFileHandler, ZipFileHandler, TarFileHandler
+        or DirectoryHandler depending on the file extension.
+    """
+    if filepath.endswith((".tar", ".tar.gz")):
+        return TarFileHandler(filepath=filepath, fs=fs)
+    if filepath.endswith(".gz"):
+        return GzipFileHandler(filepath=filepath, fs=fs)
+    if filepath.endswith(".zip"):
+        return ZipFileHandler(filepath=filepath, fs=fs)
+
+    # Fall back to treating the path as a directory, reusing the same filesystem.
+    return DirectoryHandler(filepath, fs)
+
+
+class FilesToDaskConverter:
+    """This class is responsible for converting file contents to a Dask DataFrame."""
+
+    def __init__(self, handler: AbstractFileHandler) -> None:
+        """
+        Constructs all the necessary attributes for the file converter object.
+
+        Args:
+            handler: Handles the files and reads their content.
+                Should have a 'read' method that yields tuples (file_name, file_content).
+        """
+        self.handler = handler
+
+    @staticmethod
+    @delayed
+    def create_record(file_name: str, file_content: BytesIO) -> pd.DataFrame:
+        """
+        Static helper method that creates a single-row dataframe holding a file's
+        name and its binary content.
+
+        Args:
+            file_name : Name of the file.
+            file_content : The content of the file.
+
+        Returns:
+            A pandas dataframe with a 'file_filename' column and a 'file_content'
+            column holding the file content as bytes.
+        """
+        if isinstance(file_content, tuple):
+            file_content = file_content[0]
+        if isinstance(file_content, BytesIO):
+            # Store raw bytes so the column matches the component's binary field.
+            file_content = file_content.getvalue()
+        return pd.DataFrame(
+            data={"file_filename": [file_name], "file_content": [file_content]},
+        )
+
+    def to_dask_dataframe(self, chunksize: int = 1000) -> dd.DataFrame:
+        """
+        This method converts the read file content to binary form and returns a
+        Dask DataFrame.
+
+        Args:
+            chunksize: Number of records to accumulate before flushing them into
+                the main list of delayed records.
+
+        Returns:
+            The created Dask DataFrame with a 'file_filename' column and the file
+            content in binary form in the 'file_content' column.
+        """
+        # Initialize an empty list to hold all our records as 'delayed' objects.
+        records = []
+        temp_records = []
+
+        # Iterate over each file handled by the handler
+        for i, file_data in enumerate(self.handler.read()):
+            file_name, file_content = file_data
+            record = self.create_record(file_name, file_content)
+            temp_records.append(record)
+
+            if (i + 1) % chunksize == 0:
+                # When we hit the chunk size, flush the accumulated records
+                # into the main list of delayed records.
+                records.extend(temp_records)
+                temp_records = []
+
+        # Take care of any remaining records
+        if temp_records:
+            records.extend(temp_records)
+
+        # Create an empty pandas dataframe with correct column names and types as meta
+        metadata = pd.DataFrame(
+            data={
+                "file_filename": pd.Series([], dtype="object"),
+                "file_content": pd.Series([], dtype="bytes"),
+            },
+        )
+
+        # Use the delayed objects to create a Dask DataFrame.
+        return dd.from_delayed(records, meta=metadata)
+
+
+def get_filesystem(path_uri: str) -> fsspec.spec.AbstractFileSystem | None:
+    """Function to create an fsspec filesystem based on path_uri.
+
+    Creates an abstract handle using fsspec.filesystem to
+    remote or local directories so that files can be read
+    as if they were on the local device.
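+
+    Example (illustrative; the bucket URI below is a hypothetical placeholder):
+
+        fs = get_filesystem("s3://my-bucket/dataset")
+        if fs is not None:
+            handler = get_file_handler("s3://my-bucket/dataset", fs=fs)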
+
+    Args:
+        path_uri: Can be either a local or a remote directory/file path.
+
+    Returns:
+        An fsspec filesystem (if path_uri is either local or belongs to
+        one of the supported cloud sources: s3, gcs or azure blob storage)
+        or None if path_uri has an unsupported scheme.
+    """
+    scheme = fsspec.utils.get_protocol(path_uri)
+
+    if scheme == "file":
+        return fsspec.filesystem("file")
+    if scheme == "s3":
+        return fsspec.filesystem("s3")
+    if scheme == "gs":
+        return fsspec.filesystem("gcs")
+    if scheme == "abfs":
+        return fsspec.filesystem("abfs")
+
+    logger.warning(
+        f"""Unable to create fsspec filesystem object
+        because of unsupported scheme: {scheme}""",
+    )
+    return None
+
+
+class LoadFromFiles(DaskLoadComponent):
+    """Component that loads datasets from files."""
+
+    def __init__(self, *_, directory_uri: str) -> None:
+        self.directory_uri = directory_uri
+
+    def load(self) -> dd.DataFrame:
+        """Loads dataset by reading all files in directory_uri."""
+        fs = get_filesystem(self.directory_uri)
+        if fs:
+            # create a handler to read files from directory
+            handler = get_file_handler(self.directory_uri, fs=fs)
+
+            # convert files to dask dataframe
+            converter = FilesToDaskConverter(handler)
+            dataframe = converter.to_dask_dataframe()
+            return dataframe
+        logger.error(
+            f"Could not load data from {self.directory_uri} because \
+                directory_uri doesn't belong to the currently supported \
+                schemes: file, s3, gcs, abfs",
+        )
+        return None
+
+
+if __name__ == "__main__":
+    executor = DaskLoadExecutor.from_args()
+    executor.execute(LoadFromFiles)
diff --git a/components/load_from_files/tests/test_load_from_files_component.py b/components/load_from_files/tests/test_load_from_files_component.py
new file mode 100644
index 000000000..fec4ec2f1
--- /dev/null
+++ b/components/load_from_files/tests/test_load_from_files_component.py
@@ -0,0 +1,295 @@
+import gzip
+import os
+import tarfile
+import zipfile
+from io import BytesIO
+
+import dask
+import fsspec
+import pytest
+from fsspec.spec import AbstractFileSystem
+from load_from_files.src.main import (
+    AbstractFileHandler,
+    DirectoryHandler,
+    FileHandler,
+    FilesToDaskConverter,
+    GzipFileHandler,
+    TarFileHandler,
+    ZipFileHandler,
+    get_file_handler,
+    get_filesystem,
+)
+
+
+class TestFileHandler:
+    @pytest.fixture(autouse=True)
+    def __setup_and_teardown(self):
+        """Create dummy file before test and remove it after."""
+        self.file_content = "This is a test content"
+        self.file_name = "test.txt"
+        self.file_path = "/tmp/" + self.file_name
+
+        with open(self.file_path, "w") as f:
+            f.write(self.file_content)
+
+        self.file_handler = FileHandler(self.file_path)
+        yield
+        os.remove(self.file_path)
+
+    def test_read(self):
+        """Test the read method."""
+        file_name, file_content = next(self.file_handler.read())
+        assert isinstance(file_name, str)
+        assert isinstance(file_content, BytesIO)
+        result = (file_name, file_content.getvalue())
+        expected_result = (
+            self.file_name,
+            BytesIO(self.file_content.encode()).getvalue(),
+        )
+
+        assert result == expected_result
+
+
+class TestGzipFileHandler:
+    @pytest.fixture(autouse=True)
+    def __setup_and_teardown(self):
+        print("Setup...")
+        self.filename = "test.gz"
+        self.filepath = "/tmp/" + self.filename
+        self.file_content = b"Some test content"
+        with gzip.open(self.filepath, "wb") as f:
+            f.write(self.file_content)
+
+        self.handler = GzipFileHandler(self.filepath)
+        yield
+        print("Teardown...")
+        os.remove(self.filepath)
+
+    def test_read(self):
+        result = list(self.handler.read())
+        assert len(result) > 0, "No data read from gzip file"
+
+        filename, content = result[0]
+        assert isinstance(filename, str), "Filename should be a string"
+        assert isinstance(content, BytesIO), "Content should be a BytesIO object"
+
+        assert filename == self.filename
+        assert content.getvalue() == self.file_content
+
+
+class TestZipFileHandler:
+    @pytest.fixture(autouse=True)
+    def __setup_method(self):
+        print("Setting up...")
+        self.filepath = "test.zip"
+
+        with zipfile.ZipFile(self.filepath, "w") as zipf:
+            zipf.writestr("test1.txt", b"some content")
+
+        self.handler = ZipFileHandler(self.filepath)
+
+        yield
+        print("Tearing down...")
+        os.remove(self.filepath)
+
+    def test_read_normal_zipfile(self):
+        filename, content = next(self.handler.read())
+        assert isinstance(filename, str), "Filename should be a string"
+        assert isinstance(content, BytesIO), "Content should be a BytesIO object"
+
+        assert filename == "test1.txt"
+        assert content.read() == b"some content"
+
+
+class TestTarFileHandler:
+    @pytest.fixture(autouse=True)
+    def __setup_method(self):
+        print("Setting up...")
+        self.filepath = "test.tar.gz"
+
+        with tarfile.open(self.filepath, "w:gz") as tarf:
+            data = b"some content"
+            info = tarfile.TarInfo(name="test1.txt")
+            info.size = len(data)
+            tarf.addfile(info, BytesIO(data))
+
+        self.handler = TarFileHandler(self.filepath)
+
+        yield
+        print("Tearing down...")
+        os.remove(self.filepath)
+
+    def test_read_normal_tarfile(self):
+        filename, content = next(self.handler.read())
+        assert isinstance(filename, str), "Filename should be a string"
+        assert isinstance(content, BytesIO), "Content should be a BytesIO object"
+
+        assert filename == "test1.txt"
+        assert content.read() == b"some content"
+
+
+class TestDirectoryHandler:
+    @pytest.fixture(autouse=True)
+    def __setup_method(self, tmpdir):
+        print("Setting up...")
+        self.num_files = 4
+        # Setting up some test data
+        self.test_dir = str(tmpdir.mkdir("sub"))
+
+        # Create normal text file
+        self.filepath = os.path.join(self.test_dir, "test1.txt")
+        with open(self.filepath, "w") as f:
+            f.write("some content")
+
+        # Create gzipped file
+        self.gz_filepath = os.path.join(self.test_dir, "test2.txt.gz")
+        with gzip.open(self.gz_filepath, "wb") as f:
+            f.write(b"some gzipped content")
+
+        # Create zip file
+        self.zip_filepath = os.path.join(self.test_dir, "test3.zip")
+        with zipfile.ZipFile(self.zip_filepath, "w") as zp:
+            zp.writestr("text_file_inside.zip", "zip content")
+
+        # Create tar file
+        self.tar_filepath = os.path.join(self.test_dir, "test4.tar")
+        with tarfile.open(self.tar_filepath, "w") as tp:
+            tp.add(self.filepath, arcname="text_file_inside.tar")
+
+        self.directory_handler = DirectoryHandler(self.test_dir)
+
+    def test_read_directory_with_compressed_files(self):
+        file_list = list(self.directory_handler.read())
+
+        assert len(file_list) == self.num_files, "Four files should be read"
+        print("filelist: ", file_list)
+        txt_file = next((f for f in file_list if f[0] == "test1.txt"), None)
+        gz_file = next((f for f in file_list if f[0] == "test2.txt.gz"), None)
+        zip_file = next((f for f in file_list if f[0] == "text_file_inside.zip"), None)
+        tar_file = next((f for f in file_list if f[0] == "text_file_inside.tar"), None)
+
+        assert txt_file is not None, "txt file not read correctly"
+        assert gz_file is not None, "gz file not read correctly"
+        assert zip_file is not None, "zip file not read correctly"
+        assert tar_file is not None, "tar file not read correctly"
+
+        # Further assertions to verify the content of the read files
+        with open(self.filepath) as f:
+            assert f.read() == txt_file[1].getvalue().decode(
+                "utf-8",
+            ), "Text file content does not match"
+
+        with gzip.open(self.gz_filepath, "rb") as f:
+            assert (
+                f.read() == gz_file[1].getvalue()
+            ), "Gzipped file content does not match"
+
+        with zipfile.ZipFile(self.zip_filepath, "r") as zp:
+            assert (
+                zp.read("text_file_inside.zip") == zip_file[1].getvalue()
+            ), "Zip file content does not match"
+
+        with tarfile.open(self.tar_filepath, "r") as tp:
+            assert (
+                tp.extractfile("text_file_inside.tar").read() == tar_file[1].getvalue()
+            ), "Tar file content does not match"
+
+
+def test_get_file_handler():
+    fs = fsspec.filesystem("file")  # setting up a local file system
+
+    # Test case 1: For .gz file
+    handler = get_file_handler("testfile.gz", fs)
+    assert isinstance(
+        handler, GzipFileHandler,
+    ), "For .gz file, the handler should be of type GzipFileHandler."
+
+    # Test case 2: For .tar file
+    handler = get_file_handler("testfile.tar", fs)
+    assert isinstance(
+        handler, TarFileHandler,
+    ), "For .tar file, the handler should be of type TarFileHandler."
+
+    # Test case 3: For .tar.gz file
+    handler = get_file_handler("testfile.tar.gz", fs)
+    assert isinstance(
+        handler, TarFileHandler,
+    ), "For .tar.gz file, the handler should be of type TarFileHandler."
+
+    # Test case 4: For .zip file
+    handler = get_file_handler("testfile.zip", fs)
+    assert isinstance(
+        handler, ZipFileHandler,
+    ), "For .zip file, the handler should be of type ZipFileHandler."
+
+    # Test case 5: For unsupported file type
+    handler = get_file_handler("testfile.txt", fs)
+    assert isinstance(
+        handler, DirectoryHandler,
+    ), "For unsupported file types, the handler should default to DirectoryHandler."
+
+
+def test_files_to_dask_converter():
+    # Use a mock file handler
+    class MockFileHandler(AbstractFileHandler):
+        def read(self):
+            yield "file1", BytesIO(b"content1")
+            yield "file2", BytesIO(b"content2")
+
+    # Test case 1: Testing FilesToDaskConverter with two simple files
+    handler = MockFileHandler("/path", fsspec.filesystem("file"))
+    converter = FilesToDaskConverter(handler)
+    ddf = converter.to_dask_dataframe(chunksize=2)
+    num_records = 2
+    assert ddf.npartitions == num_records, "Each file should end up in its own partition."
+
+    result = dask.compute(ddf)[0]
+
+    assert len(result) == num_records, "The dataframe should contain one row per file."
+    assert list(result["file_filename"]) == [
+        "file1",
+        "file2",
+    ], "The 'file_filename' column should contain the file names."
+    assert list(result["file_content"]) == [
+        b"content1",
+        b"content2",
+    ], "The 'file_content' column should contain the file contents."
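+
+
+def test_zip_file_handler_on_in_memory_filesystem():
+    """Illustrative sketch: the handlers accept any fsspec filesystem, demonstrated
+    here with fsspec's built-in in-memory filesystem as a stand-in for a remote
+    store. The archive and file names are arbitrary example values.
+    """
+    fs = fsspec.filesystem("memory")
+
+    # Build a small zip archive in memory and store it on the memory filesystem.
+    archive = BytesIO()
+    with zipfile.ZipFile(archive, "w") as zipf:
+        zipf.writestr("remote.txt", b"remote content")
+    with fs.open("/archive.zip", "wb") as f:
+        f.write(archive.getvalue())
+
+    handler = ZipFileHandler("/archive.zip", fs)
+    filename, content = next(handler.read())
+
+    assert filename == "remote.txt"
+    assert content.read() == b"remote content"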
+ + +def test_get_filesystem_local(mocker): + """Test local file system.""" + mock_fs = mocker.patch("fsspec.filesystem", return_value=AbstractFileSystem()) + fs = get_filesystem("file:///path/to/file") + assert isinstance(fs, AbstractFileSystem) + mock_fs.assert_called_once_with("file") + + +def test_get_filesystem_s3(mocker): + """Test S3 file system.""" + mock_fs = mocker.patch("fsspec.filesystem", return_value=AbstractFileSystem()) + fs = get_filesystem("s3://bucket/key") + assert isinstance(fs, AbstractFileSystem) + mock_fs.assert_called_once_with("s3") + + +def test_get_filesystem_gcs(mocker): + """Test Google Cloud Storage file system.""" + mock_fs = mocker.patch("fsspec.filesystem", return_value=AbstractFileSystem()) + fs = get_filesystem("gs://bucket/key") + assert isinstance(fs, AbstractFileSystem) + mock_fs.assert_called_once_with("gcs") + + +def test_get_filesystem_abfs(mocker): + """Test Azure Blob Storage file system.""" + mock_fs = mocker.patch("fsspec.filesystem", return_value=AbstractFileSystem()) + fs = get_filesystem("abfs://container/path") + assert isinstance(fs, AbstractFileSystem) + mock_fs.assert_called_once_with("abfs") + + +def test_get_filesystem_unsupported_scheme(mocker): + """Test unsupported scheme.""" + mocker.patch("fsspec.filesystem", return_value=AbstractFileSystem()) + fs = get_filesystem("unsupported://bucket/key") + assert fs is None
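+
+
+def test_load_from_files_component_end_to_end_sketch(tmpdir):
+    """Illustrative sketch of the full component flow on a local directory: write one
+    plain file into a temporary directory and let LoadFromFiles.load() turn it into a
+    dask dataframe. The directory and file names are arbitrary example values.
+    """
+    # Imported locally to keep this sketch self-contained.
+    from load_from_files.src.main import LoadFromFiles
+
+    data_dir = str(tmpdir.mkdir("data"))
+    with open(os.path.join(data_dir, "example.txt"), "w") as f:
+        f.write("example content")
+
+    component = LoadFromFiles(directory_uri=data_dir)
+    dataframe = component.load().compute()
+
+    assert list(dataframe.columns) == ["file_filename", "file_content"]
+    assert dataframe["file_filename"].tolist() == ["example.txt"]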