Commit
Fixing tests
mrchtr committed Mar 20, 2024
1 parent 7dd6dfa commit 3402571
Showing 7 changed files with 43 additions and 30 deletions.
18 changes: 11 additions & 7 deletions examples/sample_pipeline/pipeline.py
@@ -8,35 +8,35 @@
import pyarrow as pa

from fondant.component import PandasTransformComponent
from fondant.pipeline import Pipeline, lightweight_component
from fondant.dataset import Workspace, lightweight_component, Dataset

BASE_PATH = Path("./.artifacts").resolve()

# Define pipeline
pipeline = Pipeline(name="dummy-pipeline", base_path=str(BASE_PATH))
workspace = Workspace(name="dummy-pipeline", base_path=str(BASE_PATH))

# Load from hub component
load_component_column_mapping = {
"text": "text_data",
}

dataset = pipeline.read(
dataset = Dataset.read(
"load_from_parquet",
arguments={
"dataset_uri": "/data/sample.parquet",
"column_name_mapping": load_component_column_mapping,
},
produces={"text_data": pa.string()},
workspace=workspace,
)

dataset = dataset.apply(
"./components/dummy_component",
)
dataset = dataset.apply("./components/dummy_component", workspace=workspace)

dataset = dataset.apply(
"chunk_text",
arguments={"chunk_size": 10, "chunk_overlap": 2},
consumes={"text": "text_data"},
workspace=workspace,
)


@@ -60,8 +60,12 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
ref=CalculateChunkLength,
produces={"chunk_length": pa.int32()},
arguments={"arg_x": "value_x"},
workspace=workspace,
)

dataset.write(
ref="write_to_file", arguments={"path": "/data/export"}, consumes={"text": "text"}
ref="write_to_file",
arguments={"path": "/data/export"},
consumes={"text": "text"},
workspace=workspace,
)
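
Taken together, the hunks above migrate the example from the Pipeline API to the Workspace/Dataset API. A rough sketch of how the script reads after this commit, assembled only from the visible hunks (the collapsed regions, including the lightweight CalculateChunkLength component, are elided; the pathlib import is assumed from the hidden top of the file):

from pathlib import Path  # assumed; hidden in the collapsed top of the file

import pyarrow as pa

from fondant.dataset import Workspace, lightweight_component, Dataset

BASE_PATH = Path("./.artifacts").resolve()

# A Workspace now holds the name/base_path that the Pipeline object used to carry.
workspace = Workspace(name="dummy-pipeline", base_path=str(BASE_PATH))

load_component_column_mapping = {"text": "text_data"}

# Reads start from the Dataset class and carry the workspace explicitly.
dataset = Dataset.read(
    "load_from_parquet",
    arguments={
        "dataset_uri": "/data/sample.parquet",
        "column_name_mapping": load_component_column_mapping,
    },
    produces={"text_data": pa.string()},
    workspace=workspace,
)

dataset = dataset.apply("./components/dummy_component", workspace=workspace)

dataset = dataset.apply(
    "chunk_text",
    arguments={"chunk_size": 10, "chunk_overlap": 2},
    consumes={"text": "text_data"},
    workspace=workspace,
)

# ... collapsed in the diff: a lightweight CalculateChunkLength component is
# defined and applied here with produces={"chunk_length": pa.int32()} ...

dataset.write(
    ref="write_to_file",
    arguments={"path": "/data/export"},
    consumes={"text": "text"},
    workspace=workspace,
)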
21 changes: 12 additions & 9 deletions src/fondant/cli.py
@@ -607,10 +607,15 @@ def run_local(args):
from fondant.dataset.runner import DockerRunner

extra_volumes = []
workspace = Workspace(
name="dummy_workspace",
base_path="",
) # TODO: handle in #887 -> retrieve global workspace, or cli command
# use workspace from cli command
# if args.workspace exists

workspace = getattr(args, "workspace", None)
if workspace is None:
workspace = Workspace(
name="dummy_workspace",
base_path=".artifacts",
) # TODO: handle in #887 -> retrieve global workspace or init default one

if args.extra_volumes:
extra_volumes.extend(args.extra_volumes)
@@ -875,16 +880,14 @@ def dataset_from_module(module_str: str) -> Dataset:
msg = f"No dataset found in module {module_str}"
raise DatasetImportError(msg)

# TODO: now there might be several dataset instances in a single module? how to handle?
# Skip this one and choose the first dataset instance?
if len(dataset_instances) > 1:
msg = (
f"Found multiple instantiated workspaces in {module_str}. Only one workspace "
f"can be present"
f"Found multiple instantiated datasets in {module_str}. Use the first dataset to start "
f"the execution."
)
raise DatasetImportError(msg)
logger.info(msg)

logger.info(f"Dataset found in module {module_str}")
return dataset_instances[0]


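The two cli.py hunks make run_local take the workspace from the parsed CLI arguments when one is present, falling back to a local ".artifacts" workspace, and make dataset_from_module log and use the first dataset instead of raising when a module defines several. A minimal sketch of the fallback behaviour, assuming only the Workspace import shown in the example above (the Namespace stands in for the parsed CLI args):

from argparse import Namespace

from fondant.dataset import Workspace


def resolve_workspace(args: Namespace) -> Workspace:
    # Prefer a workspace handed over by the CLI; otherwise build the same
    # ".artifacts" default that the updated run_local falls back to.
    workspace = getattr(args, "workspace", None)
    if workspace is None:
        workspace = Workspace(name="dummy_workspace", base_path=".artifacts")
    return workspace


# No workspace on the args -> the default is created.
print(resolve_workspace(Namespace()).name)  # dummy_workspace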
6 changes: 3 additions & 3 deletions src/fondant/dataset/dataset.py
@@ -463,22 +463,22 @@ def _validate_workspace_name(name: str) -> str:
return name

def get_run_id(self) -> str:
"""Get a unique run ID for the pipeline."""
# TODO: eager execution: every execution a single run? or use the latest one?
"""Get a unique run ID for the workspace."""
timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
return f"{self.name}-{timestamp}"


class Dataset:
# TODO: prevent calling the init?
def __init__(
self,
name: t.Optional[str] = None,
description: t.Optional[str] = None,
manifest: t.Optional[Manifest] = None,
):
if name is not None:
self.name = self._validate_dataset_name(name)

self.description = description
self._graph: t.OrderedDict[str, t.Any] = OrderedDict()
self.task_without_dependencies_added = False
self.manifest = manifest
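The dataset.py hunk rewords the run-id docstring in terms of the workspace and keeps an optional manifest on Dataset, which the compiler test below now passes by keyword. A small illustration of the run-id format, assuming get_run_id sits on the Workspace class as the test usage workspace.get_run_id() suggests:

import datetime

from fondant.dataset import Workspace

workspace = Workspace(name="dummy-pipeline", base_path=".artifacts")

# get_run_id() appends a %Y%m%d%H%M%S timestamp to the workspace name,
# e.g. "dummy-pipeline-20240320153045".
print(workspace.get_run_id())

# For comparison, the same format built by hand:
print(f"{workspace.name}-{datetime.datetime.now().strftime('%Y%m%d%H%M%S')}")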
15 changes: 9 additions & 6 deletions src/fondant/dataset/runner.py
@@ -205,6 +205,7 @@ def run(
compiler = KubeFlowCompiler()
compiler.compile(
dataset,
workspace,
output_path=output_path,
)
self._run(output_path, experiment_name=experiment_name)
@@ -269,29 +270,30 @@ def __init__(

def run(
self,
input: t.Union[Dataset, str],
dataset: t.Union[Dataset, str],
workspace: Workspace,
):
"""Run a pipeline, either from a compiled vertex spec or from a fondant pipeline.
Args:
input: the pipeline to compile or a path to a already compiled sagemaker spec
dataset: the dataset to compile or a path to an already compiled sagemaker spec
workspace: workspace to operate in
"""
if isinstance(input, Dataset):
if isinstance(dataset, Dataset):
os.makedirs(".fondant", exist_ok=True)
output_path = ".fondant/vertex-pipeline.yaml"
logging.info(
"Found reference to un-compiled pipeline... compiling",
)
compiler = VertexCompiler()
compiler.compile(
input,
dataset,
workspace,
output_path=output_path,
)
self._run(output_path)
else:
self._run(input)
self._run(dataset)

def _run(self, input_spec: str, *args, **kwargs):
job = self.aip.PipelineJob(
@@ -353,7 +355,8 @@ def run(
)
compiler = SagemakerCompiler()
compiler.compile(
dataset,
dataset=dataset,
workspace=workspace,
output_path=output_path,
role_arn=role_arn,
)
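In runner.py the run parameter is renamed from input to dataset and a workspace is threaded through to the compilers. A sketch of the renamed VertexRunner call, mirroring the updated tests below (the project and region values are placeholders, and actually submitting the job requires Vertex AI credentials):

from fondant.dataset import Dataset, Workspace
from fondant.dataset.runner import VertexRunner

workspace = Workspace(name="dummy-pipeline", base_path=".artifacts")
runner = VertexRunner(project_id="some_project", region="some_region")

# Either hand over a path to an already compiled spec ...
runner.run(dataset=".fondant/vertex-pipeline.yaml", workspace=workspace)

# ... or hand over a Dataset, which gets compiled on the fly first.
runner.run(dataset=Dataset(), workspace=workspace)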
2 changes: 1 addition & 1 deletion tests/pipeline/test_compiler.py
@@ -141,7 +141,7 @@ def setup_pipeline(request, tmp_path, monkeypatch):
base_path=workspace.base_path,
run_id=workspace.get_run_id(),
)
dataset = Dataset(manifest)
dataset = Dataset(manifest=manifest)
cache_dict = {}
example_dir, components = request.param
for component_dict in components:
10 changes: 6 additions & 4 deletions tests/pipeline/test_runner.py
@@ -201,6 +201,7 @@ class MockKubeFlowCompiler:
def compile(
self,
dataset,
workspace,
output_path,
) -> None:
with open(output_path, "w") as f:
@@ -236,15 +237,15 @@ def test_vertex_runner():
"google.cloud.aiplatform.PipelineJob",
):
runner = VertexRunner(project_id="some_project", region="some_region")
runner.run(input=input_spec_path, workspace=WORKSPACE)
runner.run(dataset=input_spec_path, workspace=WORKSPACE)

# test with service account
runner2 = VertexRunner(
project_id="some_project",
region="some_region",
service_account="some_account",
)
runner2.run(input=input_spec_path, workspace=WORKSPACE)
runner2.run(dataset=input_spec_path, workspace=WORKSPACE)


def test_vertex_runner_from_pipeline():
@@ -257,7 +258,7 @@ def test_vertex_runner_from_pipeline():
):
runner = VertexRunner(project_id="some_project", region="some_region")
runner.run(
input=Dataset(),
dataset=Dataset(),
workspace=WORKSPACE,
)

@@ -324,7 +325,8 @@ def test_sagemaker_runner(tmp_path_factory):
class MockSagemakerCompiler:
def compile(
self,
pipeline,
dataset,
workspace,
output_path,
*,
role_arn,
1 change: 1 addition & 0 deletions tests/test_cli.py
@@ -339,6 +339,7 @@ def test_local_run_cloud_credentials(mock_docker_installation):
credentials=None,
extra_volumes=[],
build_arg=[],
workspace=TEST_WORKSPACE,
)
run_local(args)

