This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

allow_failure() and quote() wrappers don't work with DaskTaskRunner #43

Closed
david-elliott-deliveroo opened this issue Oct 19, 2022 · 9 comments · Fixed by PrefectHQ/prefect#7248 or PrefectHQ/prefect#8037

Comments

@david-elliott-deliveroo

david-elliott-deliveroo commented Oct 19, 2022

Issue

When trying to use the allow_failure() and quote() wrappers in a Prefect flow executed with the DaskTaskRunner, it hits the error TypeError: cannot pickle '_asyncio.Future' object.

Versions

prefect==2.6.1
prefect-dask==0.2.0.post1

MRE for allow_failure()

from prefect import flow, task, allow_failure
from prefect_dask.task_runners import DaskTaskRunner
import time

def sim_failure(task_name: str):
    if task_name == "task_2":
        raise Exception("Failure")

@task()
def sql_task(task_name: str):
    print(f"Running task: {task_name}")
    sim_failure(task_name)
    time.sleep(3)

@flow(name="demo_4",task_runner=DaskTaskRunner())
def demo_4():
    task_1 = sql_task.with_options(name="task_1").submit(task_name="task_1", wait_for=[])
    task_2 = sql_task.with_options(name="task_2").submit(task_name="task_2", wait_for=[allow_failure(task_1)])
    task_3 = sql_task.with_options(name="task_3").submit(task_name="task_3", wait_for=[allow_failure(task_2)])

if __name__ == "__main__":
    demo_4()

Findings

Have tested with both the local DaskTaskRunner, and with ephemeral Dask spun up on k8s.
Task 1 is submitted fine, but submitting tasks 2 and 3 fails with TypeError: cannot pickle '_asyncio.Future' object.

Same underlying issue for quote() wrapper.
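
For context, the error message itself can be reproduced with the standard library alone. The sketch below is not Prefect or Dask code; it only shows that an asyncio future cannot be pickled, which is presumably what happens when a wrapped Prefect future reaches the Dask serializer unconverted. The helper name try_pickle_asyncio_future is made up for illustration.

```python
import asyncio
import pickle


def try_pickle_asyncio_future():
    """Return the error message raised when pickling an asyncio future.

    Hypothetical helper for illustration only; returns None if pickling
    unexpectedly succeeds.
    """
    loop = asyncio.new_event_loop()
    try:
        future = loop.create_future()
        try:
            pickle.dumps(future)
        except TypeError as exc:
            return str(exc)
        return None
    finally:
        loop.close()


message = try_pickle_asyncio_future()
print(message)
```

On the CPython versions in this thread the printed message should resemble the one in the traceback, although the exact wording may vary between Python versions.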

@zanieb
Contributor

zanieb commented Oct 19, 2022

Thanks for the report!

This is an issue with conversion of Prefect futures -> Dask futures. It'll need to support checking for annotated objects like this.

I do not think quote will work since we explicitly never descend into quoted objects. However, we should be able to add support in visit_collection for allow_failure.
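
A rough sketch of the idea, for readers following along: a collection visitor that descends into an allow_failure-style wrapper and converts the future inside it, but returns a quote-style wrapper untouched. Everything here is a hypothetical stand-in (FakeFuture, AllowFailure, Quote, convert, to_runner_future); it is not Prefect's visit_collection and not the code in the linked PR.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class FakeFuture:
    """Hypothetical stand-in for a Prefect future."""
    name: str


@dataclass
class AllowFailure:
    """Hypothetical stand-in for the allow_failure annotation."""
    value: Any


@dataclass
class Quote:
    """Hypothetical stand-in for the quote annotation."""
    value: Any


def convert(expr: Any, visit: Callable[[Any], Any]) -> Any:
    """Apply visit to every leaf, preserving containers and annotations."""
    if isinstance(expr, Quote):
        # Quoted objects are never descended into, so they come back as-is.
        return expr
    if isinstance(expr, AllowFailure):
        # Unwrap, convert the inner value, then re-wrap the annotation.
        return AllowFailure(convert(expr.value, visit))
    if isinstance(expr, (list, tuple)):
        return type(expr)(convert(item, visit) for item in expr)
    if isinstance(expr, dict):
        return {key: convert(value, visit) for key, value in expr.items()}
    return visit(expr)


def to_runner_future(obj: Any) -> Any:
    """Pretend conversion from a Prefect-like future to a runner future."""
    if isinstance(obj, FakeFuture):
        return f"dask-future:{obj.name}"
    return obj


wait_for = [
    AllowFailure(FakeFuture("task_1")),
    Quote(FakeFuture("task_2")),
    FakeFuture("task_3"),
]
converted = convert(wait_for, to_runner_future)
print(converted)
```

In this sketch the future inside AllowFailure is converted while the one inside Quote is left alone, which mirrors the distinction drawn above between the two wrappers.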

@david-elliott-deliveroo
Author

Thanks @madkinsz! I actually only need the allow_failure() functionality, so if there were a fix for that it would be amazing! (I assumed it'd be a similar thing for quote(), which is why I included it, but I don't actually need that.) This is sadly blocking our 2.0 migration currently.

@zanieb
Contributor

zanieb commented Oct 19, 2022

@david-elliott-deliveroo Can you give PrefectHQ/prefect#7248 a try?

@david-elliott-deliveroo
Author

Yeah, that seems to be working locally @madkinsz! 🙏 I haven't tested on k8s yet, but it is working with the local DaskTaskRunner.

@david-elliott-deliveroo
Author

@madkinsz actually sorry, I spoke too soon. It fixes the future serialization error, but it causes a different bug whereby Dask no longer respects wait_for and just runs all tasks at once (not in the order they should be run in). The Prefect UI shows the order / lineage between the tasks, but from the logs I can see they're all running at the same time.

MRE below - on prefect==2.6.1, it runs the 3 tasks sequentially (when you remove the allow_failure wrapper), however under the above PR's version, all 3 tasks run simultaneously (local Dask).

from prefect import flow, task, allow_failure
from prefect_dask.task_runners import DaskTaskRunner
import time

def sim_failure(task_name: str):
    for x in range(0,5):
        print(f"{task_name} - {x}")
        time.sleep(1)
    if task_name == "task_2":
        raise Exception("Failure")

@task()
def sql_task(task_name: str):
    print(f"Running task: {task_name}")
    sim_failure(task_name)

@flow(name="demo_6",task_runner=DaskTaskRunner())
def demo_6():
    task_1 = sql_task.with_options(name="task_1").submit(task_name="task_1", wait_for=[])
    task_2 = sql_task.with_options(name="task_2").submit(task_name="task_2", wait_for=[allow_failure(task_1)])
    task_3 = sql_task.with_options(name="task_3").submit(task_name="task_3", wait_for=[allow_failure(task_2)])

if __name__ == "__main__":
    demo_6()
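
To make the expected behaviour concrete, here is a standard-library analogy of what wait_for=[allow_failure(...)] is meant to do: each task still waits for its upstream to finish, but runs even if the upstream failed. This uses concurrent.futures only, not Prefect or Dask, and run_task / run_flow are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def run_task(name, upstream, log):
    # Block until every upstream future is done, whether it succeeded or failed.
    wait(upstream)
    log.append(("start", name))
    time.sleep(0.1)
    log.append(("end", name))
    if name == "task_2":
        raise Exception("Failure")


def run_flow():
    log = []
    with ThreadPoolExecutor(max_workers=3) as pool:
        task_1 = pool.submit(run_task, "task_1", [], log)
        task_2 = pool.submit(run_task, "task_2", [task_1], log)
        task_3 = pool.submit(run_task, "task_3", [task_2], log)
    # Leaving the with-block waits for all three tasks to finish.
    return log, [task_1, task_2, task_3]


log, futures = run_flow()
for entry in log:
    print(entry)
```

In this analogy the start/end entries never interleave and task_3 still runs after task_2 raises. Under the PR build described above, the equivalent Prefect log lines from all three tasks were interleaved instead.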

@david-elliott-deliveroo
Author

@madkinsz I don't think this is fixed? I just installed main (git+https://github.com/PrefectHQ/prefect) and ran the MRE below locally, and it's still erroring.

prefect version
Version:             2.6.5+32.gf2f716e2f
API version:         0.8.3
Python version:      3.8.7
Git commit:          f2f716e2
Built:               Tue, Nov 1, 2022 11:24 AM
OS/Arch:             darwin/x86_64
Profile:             bi-p
Server type:         cloud

from prefect import flow, task, allow_failure, get_run_logger
from prefect_dask.task_runners import DaskTaskRunner
import time

@task()
def sql_task(task_name: str):
    logger = get_run_logger()
    logger.info(f"RUNNING: {task_name}")
    time.sleep(3)
    if task_name == "task_2":
        raise Exception("failed task")

@flow(name="demo_11", task_runner=DaskTaskRunner())
def demo_11():
    logger = get_run_logger()
    logger.info(f"Flow now starting")
    task_1 = sql_task.with_options(name='task_1').submit(task_name='task_1', wait_for=[])
    task_2 = sql_task.with_options(name='task_2').submit(task_name='task_2', wait_for=[allow_failure(task_1)])
    task_3 = sql_task.with_options(name='task_3').submit(task_name='task_3', wait_for=[allow_failure(task_2)])

if __name__ == "__main__":
    demo_11()

full log + traceback attached below
out.log

@zanieb
Contributor

zanieb commented Nov 1, 2022

@david-elliott-deliveroo
Author

Hey @ahuang11 / @madkinsz , wondering if you have any update on this one please? 🙂

@zanieb
Contributor

zanieb commented Dec 13, 2022

We're waiting on a change in Dask itself :) The changes to support this are weirdly complicated since they need to know how to serialize/deserialize these annotations.
