Skip to content

Commit

Permalink
feat(pipeline): update run_pipeline method
Browse files Browse the repository at this point in the history
  • Loading branch information
entelecheia committed Jun 30, 2023
1 parent efbae91 commit 1498128
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions src/hyfi/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class PipelineConfig(BaseRunConfig):
"""Pipeline Configuration"""

steps: Optional[List[Union[str, Dict]]] = []
initial_object: Optional[Any] = None
use_self_as_initial_object: bool = False

@validator("steps", pre=True)
def steps_to_list(cls, v):
Expand Down Expand Up @@ -88,7 +90,7 @@ class PIPELINEs:
@staticmethod
def run_pipeline(
config: Union[Dict, PipelineConfig],
initial_obj: Optional[Any] = None,
initial_object: Optional[Any] = None,
task: Optional[TaskConfig] = None,
) -> Any:
"""
Expand All @@ -106,17 +108,19 @@ def run_pipeline(
if not isinstance(config, PipelineConfig):
config = PipelineConfig(**Composer.to_dict(config))
pipes = config.get_pipes(task)
if initial_object is None and config.initial_object is not None:
initial_object = config.initial_object
# Return initial object for the initial object
if not pipes:
logger.warning("No pipes specified")
return initial_obj
return initial_object

logger.info("Applying %s pipes", len(pipes))
# Run the task in the current directory.
if task is not None:
with change_directory(task.root_dir):
return reduce(PIPELINEs.run_pipe, pipes, initial_obj)
return reduce(PIPELINEs.run_pipe, pipes, initial_obj)
return reduce(PIPELINEs.run_pipe, pipes, initial_object)
return reduce(PIPELINEs.run_pipe, pipes, initial_object)

@staticmethod
def run_pipe(
Expand Down Expand Up @@ -225,10 +229,10 @@ def run_task(task: TaskConfig, project: Optional[ProjectConfig] = None):
task.project = project
# Run all pipelines in the pipeline.
for pipeline in PIPELINEs.get_pipelines(task):
# Run the pipeline if verbose is true
if task.verbose:
logger.info("Running pipeline: %s", pipeline.dict())
PIPELINEs.run_pipeline(pipeline, task=task)
initial_object = task if pipeline.use_self_as_initial_object else None
PIPELINEs.run_pipeline(pipeline, initial_object, task)

@staticmethod
def run_workflow(workflow: WorkflowConfig):
Expand Down

0 comments on commit 1498128

Please sign in to comment.