Add save_artifact method to _Step #871

Merged · 11 commits · Aug 14, 2024
10 changes: 10 additions & 0 deletions src/distilabel/constants.py
@@ -25,6 +25,16 @@
CONVERGENCE_STEP_ATTR_NAME: Final[str] = "convergence_step"
LAST_BATCH_SENT_FLAG: Final[str] = "last_batch_sent"

# Data paths constants
STEPS_OUTPUTS_PATH = "steps_outputs"
STEPS_ARTIFACTS_PATH = "steps_artifacts"

# Distiset related constants
DISTISET_CONFIG_FOLDER: Final[str] = "distiset_configs"
DISTISET_ARTIFACTS_FOLDER: Final[str] = "artifacts"
PIPELINE_CONFIG_FILENAME: Final[str] = "pipeline.yaml"
PIPELINE_LOG_FILENAME: Final[str] = "pipeline.log"


__all__ = [
"STEP_ATTR_NAME",
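For orientation, a sketch of the directory layout these constants imply, based on how they are used in `distiset.py` below (the root paths are illustrative, not part of the PR):

# Pipeline data directory (hypothetical root):
#
#   path/to/pipeline/data/
#   ├── steps_outputs/     <- STEPS_OUTPUTS_PATH: batches written by each step
#   └── steps_artifacts/   <- STEPS_ARTIFACTS_PATH: artifacts saved by the steps
#
# Distiset saved to disk (hypothetical root):
#
#   my-distiset/
#   ├── distiset_configs/  <- DISTISET_CONFIG_FOLDER (pipeline.yaml, pipeline.log)
#   └── artifacts/         <- DISTISET_ARTIFACTS_FOLDER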
123 changes: 103 additions & 20 deletions src/distilabel/distiset.py
@@ -12,24 +12,34 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import os.path as posixpath
import re
import sys
from collections import defaultdict
from os import PathLike
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Union

import fsspec
import yaml
from datasets import Dataset, load_dataset, load_from_disk
from datasets.filesystems import is_remote_filesystem
from huggingface_hub import DatasetCardData, HfApi, upload_file
from huggingface_hub import DatasetCardData, HfApi, upload_file, upload_folder
from huggingface_hub.file_download import hf_hub_download
from pyarrow.lib import ArrowInvalid
from typing_extensions import Self

from distilabel.constants import STEP_ATTR_NAME
from distilabel.constants import (
DISTISET_ARTIFACTS_FOLDER,
DISTISET_CONFIG_FOLDER,
PIPELINE_CONFIG_FILENAME,
PIPELINE_LOG_FILENAME,
STEP_ATTR_NAME,
STEPS_ARTIFACTS_PATH,
STEPS_OUTPUTS_PATH,
)
from distilabel.utils.card.dataset_card import (
DistilabelDatasetCard,
size_categories_parser,
@@ -42,24 +52,25 @@
from distilabel.pipeline._dag import DAG


DISTISET_CONFIG_FOLDER: Final[str] = "distiset_configs"
PIPELINE_CONFIG_FILENAME: Final[str] = "pipeline.yaml"
PIPELINE_LOG_FILENAME: Final[str] = "pipeline.log"


class Distiset(dict):
"""Convenient wrapper around `datasets.Dataset` to push to the Hugging Face Hub.

It's a dictionary where the keys correspond to the different leaf_steps from the internal
`DAG` and the values are `datasets.Dataset`.

Attributes:
pipeline_path: Optional path to the pipeline.yaml file that generated the dataset.
log_filename_path: Optional path to the pipeline.log file that was written by the
pipeline.
_pipeline_path: Optional path to the `pipeline.yaml` file that generated the dataset.
Defaults to `None`.
_artifacts_path: Optional path to the directory containing the generated artifacts
by the pipeline steps. Defaults to `None`.
_log_filename_path: Optional path to the `pipeline.log` file that was written
by the pipeline. Defaults to `None`.
_citations: Optional list containing citations that will be included in the dataset
card. Defaults to `None`.
"""

_pipeline_path: Optional[Path] = None
_artifacts_path: Optional[Path] = None
_log_filename_path: Optional[Path] = None
_citations: Optional[List[str]] = None

@@ -121,14 +132,24 @@ def push_to_hub(
**kwargs,
)

if self.artifacts_path:
upload_folder(
repo_id=repo_id,
folder_path=self.artifacts_path,
path_in_repo="artifacts",
token=token,
repo_type="dataset",
commit_message="Include pipeline artifacts",
)

if include_script and script_path.exists():
upload_file(
path_or_fileobj=script_path,
path_in_repo=filename_py,
repo_id=repo_id,
repo_type="dataset",
token=token,
commit_message="Include pipeline script.",
commit_message="Include pipeline script",
)

if generate_card:
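With this change, pushing a `Distiset` whose `artifacts_path` is set also uploads the artifacts folder to the dataset repo. A minimal usage sketch (the repo id, token, and data path are placeholders, not from this PR):

from distilabel.distiset import create_distiset

# Load the data generated by a pipeline run (placeholder path):
distiset = create_distiset("path/to/pipeline/data")

# If the steps saved artifacts, `distiset.artifacts_path` is set and the
# folder is uploaded under `artifacts/` in the dataset repository.
distiset.push_to_hub(
    repo_id="my-org/my-distiset",  # placeholder repo id
    token="hf_...",                # placeholder token
)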
@@ -185,11 +206,38 @@ def _get_card(
sample_records=sample_records,
include_script=include_script,
filename_py=filename_py,
artifacts=self._get_artifacts_metadata(),
references=self.citations,
)

return card

def _get_artifacts_metadata(self) -> Dict[str, List[Dict[str, Any]]]:
"""Gets a dictionary with the metadata of the artifacts generated by the pipeline steps.

Returns:
A dictionary in which the key is the name of the step and the value is a list
of dictionaries, each of them containing the name and metadata of the step artifact.
"""
if not self.artifacts_path:
return {}

def iterdir_ignore_hidden(path: Path) -> Generator[Path, None, None]:
return (f for f in Path(path).iterdir() if not f.name.startswith("."))

artifacts_metadata = defaultdict(list)
for step_artifacts_dir in iterdir_ignore_hidden(self.artifacts_path):
step_name = step_artifacts_dir.stem
for artifact_dir in iterdir_ignore_hidden(step_artifacts_dir):
artifact_name = artifact_dir.stem
metadata_path = artifact_dir / "metadata.json"
metadata = json.loads(metadata_path.read_text())
artifacts_metadata[step_name].append(
{"name": artifact_name, "metadata": metadata}
)

return dict(artifacts_metadata)
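For reference, a sketch of the on-disk layout this method reads and the shape of the dictionary it returns (step and artifact names are illustrative):

# artifacts_path/
# └── my_step/                 <- one folder per step
#     └── my_artifact/         <- one folder per artifact
#         ├── metadata.json    <- parsed with `json.loads`
#         └── ...              <- the artifact files themselves
#
# => {"my_step": [{"name": "my_artifact", "metadata": {...}}]}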

def _extract_readme_metadata(
self, repo_id: str, token: Optional[str]
) -> Dict[str, Any]:
@@ -243,6 +291,7 @@ def _generate_card(
repo_type="dataset",
token=token,
)

if self.pipeline_path:
# If the pipeline.yaml is available, upload it to the Hugging Face Hub as well.
HfApi().upload_file(
@@ -252,6 +301,7 @@
repo_type="dataset",
token=token,
)

if self.log_filename_path:
# The same we had with "pipeline.yaml" but with the log file.
HfApi().upload_file(
@@ -360,6 +410,12 @@ def save_to_disk(
)
fs.makedirs(distiset_config_folder, exist_ok=True)

if self.artifacts_path:
distiset_artifacts_folder = posixpath.join(
distiset_path, DISTISET_ARTIFACTS_FOLDER
)
fs.copy(str(self.artifacts_path), distiset_artifacts_folder, recursive=True)

if save_card:
# NOTE: Currently the card is not the same if we write to disk or push to the HF hub,
# as we aren't generating the README copying/updating the data from the dataset repo.
@@ -415,7 +471,7 @@ def load_from_disk(
original_distiset_path = str(distiset_path)

fs: fsspec.AbstractFileSystem
fs, _, [distiset_path] = fsspec.get_fs_token_paths(
fs, _, [distiset_path] = fsspec.get_fs_token_paths( # type: ignore
original_distiset_path, storage_options=storage_options
)
dest_distiset_path = distiset_path
@@ -425,26 +481,31 @@
), "`distiset_path` must be a `PathLike` object pointing to a folder or a URI of a remote filesystem."

has_config = False
has_artifacts = False
distiset = cls()

if is_remote_filesystem(fs):
src_dataset_path = distiset_path
if download_dir:
dest_distiset_path = download_dir
else:
dest_distiset_path = Dataset._build_local_temp_path(src_dataset_path)
fs.download(src_dataset_path, dest_distiset_path.as_posix(), recursive=True)
dest_distiset_path = Dataset._build_local_temp_path(src_dataset_path) # type: ignore
fs.download(src_dataset_path, dest_distiset_path.as_posix(), recursive=True) # type: ignore

# Now we should have the distiset locally, so we can read those files
for folder in Path(dest_distiset_path).iterdir():
if folder.stem == DISTISET_CONFIG_FOLDER:
has_config = True
continue
elif folder.stem == DISTISET_ARTIFACTS_FOLDER:
has_artifacts = True
continue
distiset[folder.stem] = load_from_disk(
str(folder),
keep_in_memory=keep_in_memory,
)
# From the config folder we just need to point to the files. Once downloaded we set the path

# From the config folder we just need to point to the files. Once downloaded we set the path
# to wherever they are.
if has_config:
distiset_config_folder = posixpath.join(
@@ -463,6 +524,11 @@
if Path(log_filename_path).exists():
distiset.log_filename_path = Path(log_filename_path)

if has_artifacts:
distiset.artifacts_path = Path(
posixpath.join(dest_distiset_path, DISTISET_ARTIFACTS_FOLDER)
)

return distiset
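A round-trip sketch of the new behavior (paths are placeholders, `distiset` as obtained earlier via `create_distiset`): artifacts copied by `save_to_disk` are detected again on load, which restores `artifacts_path`:

from distilabel.distiset import Distiset

distiset.save_to_disk("my-distiset")  # copies artifacts into "my-distiset/artifacts"
loaded = Distiset.load_from_disk("my-distiset")
assert loaded.artifacts_path is not None  # points at the local "artifacts" folder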

@property
@@ -474,6 +540,16 @@ def pipeline_path(self) -> Union[Path, None]:
def pipeline_path(self, path: PathLike) -> None:
self._pipeline_path = Path(path)

@property
def artifacts_path(self) -> Union[Path, None]:
"""Returns the path to the directory containing the artifacts generated by the steps
of the pipeline."""
return self._artifacts_path

@artifacts_path.setter
def artifacts_path(self, path: PathLike) -> None:
self._artifacts_path = Path(path)

@property
def log_filename_path(self) -> Union[Path, None]:
"""Returns the path to the `pipeline.log` file that generated the `Pipeline`."""
@@ -540,10 +616,10 @@ def create_distiset( # noqa: C901

logger = logging.getLogger("distilabel.distiset")

data_dir = Path(data_dir)
steps_outputs_dir = data_dir / STEPS_OUTPUTS_PATH

distiset = Distiset()
for file in data_dir.iterdir():
for file in steps_outputs_dir.iterdir():
if file.is_file():
continue

@@ -569,19 +645,26 @@
if len(distiset.keys()) == 1:
distiset["default"] = distiset.pop(list(distiset.keys())[0])

# If there's any artifact set the `artifacts_path` so they can be uploaded
steps_artifacts_dir = data_dir / STEPS_ARTIFACTS_PATH
if any(steps_artifacts_dir.rglob("*")):
distiset.artifacts_path = steps_artifacts_dir

# Include `pipeline.yaml` if exists
if pipeline_path:
distiset.pipeline_path = pipeline_path
else:
# If the pipeline path is not provided, try to find it in the parent directory
# and assume that's the wanted file.
pipeline_path = data_dir.parent / "pipeline.yaml"
pipeline_path = steps_outputs_dir.parent / "pipeline.yaml"
if pipeline_path.exists():
distiset.pipeline_path = pipeline_path

# Include `pipeline.log` if exists
if log_filename_path:
distiset.log_filename_path = log_filename_path
else:
log_filename_path = data_dir.parent / "pipeline.log"
log_filename_path = steps_outputs_dir.parent / "pipeline.log"
if log_filename_path.exists():
distiset.log_filename_path = log_filename_path
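Note that `create_distiset` now expects `data_dir` to be the directory containing `steps_outputs/` (and optionally `steps_artifacts/`), rather than the step output folders directly. A hedged sketch of the new call (the path is a placeholder):

from distilabel.distiset import create_distiset

# `data_dir` must now contain a `steps_outputs/` subfolder; if anything
# exists under `steps_artifacts/`, `artifacts_path` is set automatically.
distiset = create_distiset("path/to/pipeline/data")
if distiset.artifacts_path:
    print(f"Steps artifacts available at: {distiset.artifacts_path}")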
