From 62f43a970ebfa2d923a3399c59b7abfa8591e9f4 Mon Sep 17 00:00:00 2001 From: Jimmy Shen Date: Fri, 13 Dec 2024 18:17:59 -0800 Subject: [PATCH 1/2] added tutorial with text info --- docs/tutorials/9-missing-references.ipynb | 158 ++++++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 docs/tutorials/9-missing-references.ipynb diff --git a/docs/tutorials/9-missing-references.ipynb b/docs/tutorials/9-missing-references.ipynb new file mode 100644 index 00000000..fcce26af --- /dev/null +++ b/docs/tutorials/9-missing-references.ipynb @@ -0,0 +1,158 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Handling Job Dependencies that Can Fail\n", + "\n", + "In this tutorial, we will demonstrate how to handle missing references in JobFlow. This is useful when you have jobs that may fail, but you still want to proceed with the workflow.\n", + "\n", + "First, we import the necessary modules and define a job that can fail based on an input parameter.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "from jobflow import Flow, OnMissing, job, run_locally\n", + "\n", + "\n", + "@job\n", + "def func(fail: bool = False):\n", + " \"\"\"Failable function.\"\"\"\n", + " if fail:\n", + " raise ValueError(\"An error occurred.\")\n", + " return 1" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, we define a job that collects the outputs of other jobs. This job can handle missing references.\n", + "\n", + "**Note:** You must explicitly define how missing references are handled in each job.\n", + "setting `on_missing_refs` to `OnMissing.None` will only provide a `None` whenever an output is missing.\n", + "you must handle those `None` values in your job." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "@job\n", + "def collect(job_outputs):\n", + " \"\"\"Job that allows some parents to fail.\"\"\"\n", + " total = 0\n", + " for output in job_outputs:\n", + " if output is None:\n", + " continue\n", + " total += output\n", + " if total < 1:\n", + " raise ValueError(\"No enough finished parents.\")\n", + " return total" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now, we create instances of the `func` job, one of which will fail.\n", + "Then, we create an instance of the `collect` job and pass the outputs of the `func` jobs to it.\n", + "\n", + "By setting the `on_missing_refs` parameter to `OnMissing.None`, and handling the `None` values in the `collect` job, we can proceed with the workflow even if some references are missing." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "job1, job2, job3 = func(), func(), func(fail=True)\n", + "job_outputs = [job1.output, job2.output, job3.output]\n", + "collect_job = collect(job_outputs)\n", + "collect_job.config.on_missing_references = OnMissing.NONE" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "As the workflow is running, `job1` and `job2` will each return 1, while job3 will fail. \n", + "Since `collect_job` has `on_missing_references` set to `OnMissing.NONE`, it proceeds despite the missing output from `job3`. \n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2024-12-13 17:28:10,890 INFO Started executing jobs locally\n", + "2024-12-13 17:28:10,991 INFO Starting job - func (12fc77ef-233e-4bee-a36c-7f61dc5badf9)\n", + "2024-12-13 17:28:10,996 INFO Finished job - func (12fc77ef-233e-4bee-a36c-7f61dc5badf9)\n", + "2024-12-13 17:28:10,997 INFO Starting job - func (626268f0-ecb0-4fa2-bf6d-0bb9dd72586c)\n", + "2024-12-13 17:28:10,999 INFO Finished job - func (626268f0-ecb0-4fa2-bf6d-0bb9dd72586c)\n", + "2024-12-13 17:28:11,000 INFO Starting job - func (ba559709-99f7-4ec3-9fe7-6804a002ff0a)\n", + "2024-12-13 17:28:11,002 INFO func failed with exception:\n", + "Traceback (most recent call last):\n", + " File \"/home/jmmshn/miniconda3/envs/af/lib/python3.10/site-packages/jobflow/managers/local.py\", line 114, in _run_job\n", + " response = job.run(store=store)\n", + " File \"/home/jmmshn/miniconda3/envs/af/lib/python3.10/site-packages/jobflow/core/job.py\", line 600, in run\n", + " response = function(*self.function_args, **self.function_kwargs)\n", + " File \"/tmp/ipykernel_298791/2449992254.py\", line 7, in func\n", + " raise ValueError(\"An error occurred.\")\n", + "ValueError: An error occurred.\n", + "\n", + "2024-12-13 17:28:11,003 INFO Starting job - collect (0becc3b9-532c-4284-9c90-2a890e791ef2)\n", + "2024-12-13 17:28:11,011 INFO Finished job - collect (0becc3b9-532c-4284-9c90-2a890e791ef2)\n", + "2024-12-13 17:28:11,013 INFO Finished executing jobs locally\n" + ] + } + ], + "source": [ + "flow = Flow([job1, job2, job3, collect_job])\n", + "res = run_locally(flow)\n", + "n_finished = 2\n", + "assert res[collect_job.uuid][1].output == n_finished" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "af", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.15" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From af107c7158991b75dff0fcb16c37fa741888ee78 Mon Sep 17 00:00:00 2001 From: Jimmy Shen Date: Fri, 13 Dec 2024 17:19:02 -0800 Subject: [PATCH 2/2] added example for missing ref. --- examples/missing_reference.py | 36 +++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 examples/missing_reference.py diff --git a/examples/missing_reference.py b/examples/missing_reference.py new file mode 100644 index 00000000..d0f2aeb4 --- /dev/null +++ b/examples/missing_reference.py @@ -0,0 +1,36 @@ +"""A demonstration of sequential jobs with missing references.""" + +from jobflow import Flow, OnMissing, job, run_locally + + +@job +def func(fail: bool = False): + """Failable function.""" + if fail: + raise ValueError("An error occurred.") + return 1 + + +@job +def collect(job_outputs): + """Job that allows some parents to fail.""" + total = 0 + for jo in job_outputs: + if jo is None: + continue + total += jo + if total < 1: + raise ValueError("No enough finished parents.") + return total + + +job1, job2, job3 = func(), func(), func(fail=True) +job_outputs = [job1.output, job2.output, job3.output] +collect_job = collect(job_outputs) +collect_job.config.on_missing_references = OnMissing.NONE +flow = Flow([job1, job2, job3, collect_job]) + +# run the flow, you can +res = run_locally(flow) +n_finished = 2 +assert res[collect_job.uuid][1].output == n_finished