Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tutorial and example for handling missing references #716

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions docs/tutorials/9-missing-references.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Handling Job Dependencies that Can Fail\n",
"\n",
"In this tutorial, we will demonstrate how to handle missing references in JobFlow. This is useful when you have jobs that may fail, but you still want to proceed with the workflow.\n",
"\n",
"First, we import the necessary modules and define a job that can fail based on an input parameter.\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from jobflow import Flow, OnMissing, job, run_locally\n",
"\n",
"\n",
"@job\n",
"def func(fail: bool = False):\n",
" \"\"\"Failable function.\"\"\"\n",
" if fail:\n",
" raise ValueError(\"An error occurred.\")\n",
" return 1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next, we define a job that collects the outputs of other jobs. This job can handle missing references.\n",
"\n",
"**Note:** You must explicitly define how missing references are handled in each job.\n",
"setting `on_missing_refs` to `OnMissing.None` will only provide a `None` whenever an output is missing.\n",
"you must handle those `None` values in your job."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"@job\n",
"def collect(job_outputs):\n",
" \"\"\"Job that allows some parents to fail.\"\"\"\n",
" total = 0\n",
" for output in job_outputs:\n",
" if output is None:\n",
" continue\n",
" total += output\n",
" if total < 1:\n",
" raise ValueError(\"No enough finished parents.\")\n",
" return total"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, we create instances of the `func` job, one of which will fail.\n",
"Then, we create an instance of the `collect` job and pass the outputs of the `func` jobs to it.\n",
"\n",
"By setting the `on_missing_refs` parameter to `OnMissing.None`, and handling the `None` values in the `collect` job, we can proceed with the workflow even if some references are missing."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"job1, job2, job3 = func(), func(), func(fail=True)\n",
"job_outputs = [job1.output, job2.output, job3.output]\n",
"collect_job = collect(job_outputs)\n",
"collect_job.config.on_missing_references = OnMissing.NONE"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As the workflow is running, `job1` and `job2` will each return 1, while job3 will fail. \n",
"Since `collect_job` has `on_missing_references` set to `OnMissing.NONE`, it proceeds despite the missing output from `job3`. \n"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"2024-12-13 17:28:10,890 INFO Started executing jobs locally\n",
"2024-12-13 17:28:10,991 INFO Starting job - func (12fc77ef-233e-4bee-a36c-7f61dc5badf9)\n",
"2024-12-13 17:28:10,996 INFO Finished job - func (12fc77ef-233e-4bee-a36c-7f61dc5badf9)\n",
"2024-12-13 17:28:10,997 INFO Starting job - func (626268f0-ecb0-4fa2-bf6d-0bb9dd72586c)\n",
"2024-12-13 17:28:10,999 INFO Finished job - func (626268f0-ecb0-4fa2-bf6d-0bb9dd72586c)\n",
"2024-12-13 17:28:11,000 INFO Starting job - func (ba559709-99f7-4ec3-9fe7-6804a002ff0a)\n",
"2024-12-13 17:28:11,002 INFO func failed with exception:\n",
"Traceback (most recent call last):\n",
" File \"/home/jmmshn/miniconda3/envs/af/lib/python3.10/site-packages/jobflow/managers/local.py\", line 114, in _run_job\n",
" response = job.run(store=store)\n",
" File \"/home/jmmshn/miniconda3/envs/af/lib/python3.10/site-packages/jobflow/core/job.py\", line 600, in run\n",
" response = function(*self.function_args, **self.function_kwargs)\n",
" File \"/tmp/ipykernel_298791/2449992254.py\", line 7, in func\n",
" raise ValueError(\"An error occurred.\")\n",
"ValueError: An error occurred.\n",
"\n",
"2024-12-13 17:28:11,003 INFO Starting job - collect (0becc3b9-532c-4284-9c90-2a890e791ef2)\n",
"2024-12-13 17:28:11,011 INFO Finished job - collect (0becc3b9-532c-4284-9c90-2a890e791ef2)\n",
"2024-12-13 17:28:11,013 INFO Finished executing jobs locally\n"
]
}
],
"source": [
"flow = Flow([job1, job2, job3, collect_job])\n",
"res = run_locally(flow)\n",
"n_finished = 2\n",
"assert res[collect_job.uuid][1].output == n_finished"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "af",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.15"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
36 changes: 36 additions & 0 deletions examples/missing_reference.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""A demonstration of sequential jobs with missing references."""

from jobflow import Flow, OnMissing, job, run_locally


@job
def func(fail: bool = False):
"""Failable function."""
if fail:
raise ValueError("An error occurred.")
return 1


@job
def collect(job_outputs):
"""Job that allows some parents to fail."""
total = 0
for jo in job_outputs:
if jo is None:
continue
total += jo
if total < 1:
raise ValueError("No enough finished parents.")
return total


job1, job2, job3 = func(), func(), func(fail=True)
job_outputs = [job1.output, job2.output, job3.output]
collect_job = collect(job_outputs)
collect_job.config.on_missing_references = OnMissing.NONE
flow = Flow([job1, job2, job3, collect_job])

# run the flow, you can
res = run_locally(flow)
n_finished = 2
assert res[collect_job.uuid][1].output == n_finished
Loading