Add Flow.submit interface for subflows #6689

Open
3 tasks done
zanieb opened this issue Sep 2, 2022 · 20 comments
Labels
enhancement An improvement of an existing feature needs:design Blocked by a need for an implementation outline

Comments

@zanieb
Contributor

zanieb commented Sep 2, 2022

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar request and didn't find it.
  • I searched the Prefect documentation for this feature.

Prefect Version

2.x

Describe the current behavior

Currently, subflows can only be run in the main process and only tasks can be submitted to other infrastructure and run in the background.

Describe the proposed behavior

Flows should be submittable to run in the background. Calling Flow.submit should return a future that can be used to wait for the flow run's completion.

As a stretch and perhaps future goal: Flows should be submittable to external infrastructure without a deployment.

All of the features available for flow calls should be available for submission. We may limit this in the first iteration to get the feature into users' hands faster.

Example Use

from prefect import flow

@flow
def bar():
    return 1

@flow
def foo():
    future = bar.submit()  # run in a worker thread
    x = future.result()

As a stretch goal:

from prefect import flow
from prefect.infrastructure import KubernetesJob

@flow(infrastructure=KubernetesJob(...))  # proposed flow-level infrastructure declaration
def bar():
    return 1

@flow
def foo():
    future = bar.submit()
    x = future.result()

Additional context

An extension of #6688

This feature will require expert changes to Prefect internals.

For running in a background thread:

  • Futures need to be updated to support flows
  • Waiting for flow run completion from a future needs to be performant
  • It needs to be clear where the flow is being submitted to
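
To make the background-thread case concrete, here is a minimal sketch of a callable that gains a `.submit` method returning a future. It uses only the standard library; the `submittable` decorator and the shared `_pool` are hypothetical stand-ins, not Prefect's flow decorator or its future type.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from functools import wraps

# Hypothetical shared pool; a real implementation would decide where work runs.
_pool = ThreadPoolExecutor(max_workers=4)


def submittable(fn):
    """Attach a `.submit` method that runs `fn` in a worker thread."""

    @wraps(fn)
    def wrapper(*args, **kwargs):
        # A direct call still runs in the caller's thread and blocks.
        return fn(*args, **kwargs)

    def submit(*args, **kwargs) -> Future:
        # Returns immediately with a future for the eventual result.
        return _pool.submit(fn, *args, **kwargs)

    wrapper.submit = submit
    return wrapper


@submittable
def bar():
    return 1


def foo():
    future = bar.submit()   # scheduled on a worker thread
    return future.result()  # blocks until bar has finished


result = foo()
```

The sketch leaves out everything the bullets above flag as open: state tracking, performant waiting, and making the submission target visible to the user.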

For external infrastructure:

  • Infrastructure declaration needs to be added at the flow decorator level
  • Flow result retrieval from a future will require design
    • We may need to require result handling to be configured to use this feature
    • We may be able to pass results back via cloudpickle and stdout or a mounted file
  • For container based infrastructure, we will need to determine what image the flow should run in
    • Since the user is calling it from the current flow we may be able to use the parent's image in some cases
  • We need to determine how to get the flow to the infrastructure, i.e. serialization
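
The idea of passing results back over stdout can be sketched roughly as follows. This is an assumption-laden illustration: it uses the standard library's `pickle` in place of cloudpickle, a local subprocess in place of external infrastructure, and the names `CHILD_SOURCE` and `run_in_subprocess` are hypothetical.

```python
import pickle
import subprocess
import sys

# Source for the child process: it runs the "flow" and writes the pickled
# return value to stdout as raw bytes.
CHILD_SOURCE = """
import pickle, sys

def bar():
    return 1

sys.stdout.buffer.write(pickle.dumps(bar()))
"""


def run_in_subprocess(source: str):
    """Run `source` in a fresh interpreter and unpickle what it wrote to stdout."""
    completed = subprocess.run(
        [sys.executable, "-c", source],
        capture_output=True,
        check=True,  # raise CalledProcessError if the child exits non-zero
    )
    return pickle.loads(completed.stdout)


value = run_in_subprocess(CHILD_SOURCE)
```

Anything else the child prints to stdout would corrupt the payload, which is one reason a mounted file or configured result storage may be the more robust channel.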
@zanieb zanieb added enhancement An improvement of an existing feature status:triage and removed status:triage labels Sep 2, 2022
@polivbr

polivbr commented Sep 12, 2022

100% yes.

We have use cases where part of a flow needs to be run on Windows. Ideally we could set up a separate work queue and submit the subflow to this queue and have it be picked up by agents running on the Windows boxes, but there currently isn't a way to do this (at least not that I've found).

@zanieb
Contributor Author

zanieb commented Oct 26, 2022

@polivbr This interface would likely avoid the use of agents. I believe your use case is addressed by run_deployment (#7047).

@anna-geller
Contributor

cc @anticorrelator - tagging you because I know you wanted to write a raft doc about it

use cases we should consider:

  • being able to run downstream flows or tasks even when one subflow fails
  • being able to submit several child flows for concurrent execution
from datetime import date
from prefect import flow
from flows.transformation.jaffle_shop.dbt_run_from_manifest import dbt_jaffle_shop
from flows.ingestion.ingest_jaffle_shop import raw_data_jaffle_shop
from flows.analytics.dashboards import dashboards
from flows.ml.sales_forecast import sales_forecast, run_critical_work


@flow
def jaffle_shop_ingest_transform(
    start_date: date = date(2022, 11, 1),  # parametrized for backfills
    end_date: date = date.today(),
    dataset_size: int = 10_000,
):
    raw_data_jaffle_shop(start_date, end_date, dataset_size)
    dbt_jaffle_shop()
    dashboards()
    sales_forecast()
    run_critical_work()

I want to run the dashboards and sales_forecast subflows concurrently and I want to run run_critical_work only when those two subflows are complete, even if they fail. Currently, if any subflow run fails, run_critical_work cannot be executed. This would be possible with the allow_failure annotation added in PR 7120 but it's only applicable to tasks.
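
The desired control flow can be sketched with the standard library alone. The functions below are plain-Python stand-ins for the subflows named above (not Prefect flows), and `sales_forecast` is made to fail on purpose to show that the final step still runs.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def dashboards():
    return "dashboards refreshed"


def sales_forecast():
    raise RuntimeError("forecast failed")  # simulate a failing subflow


def run_critical_work():
    return "critical work done"


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = {
        "dashboards": pool.submit(dashboards),
        "sales_forecast": pool.submit(sales_forecast),
    }
    # wait() returns once both have finished, whether or not they raised.
    wait(futures.values())

# Future.exception() reports a failure without re-raising it.
failures = {
    name: future.exception()
    for name, future in futures.items()
    if future.exception() is not None
}

# Runs regardless of upstream failures.
critical = run_critical_work()
```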

@anna-geller
Contributor

another user reporting a similar request: https://discourse.prefect.io/t/running-subflows-in-parallel-with-downstream-dependency/1903/2

for this user, this is something that makes migration difficult

@zanieb
Contributor Author

zanieb commented Nov 11, 2022

@anna-geller

Currently, if any subflow run fails, run_critical_work cannot be executed.

If you run your subflows with return_state=True, failed states will not be raised and a downstream flow or task can still be run.

I agree we should definitely mirror all the task futures here though.

@billpalombi billpalombi added the needs:design Blocked by a need for an implementation outline label Nov 14, 2022
@kpweiler

kpweiler commented Jan 5, 2023

If/when this is implemented, will it require the use of async def on the subflow functions (and then, transitively, the task functions)?

@zanieb
Contributor Author

zanieb commented Jan 5, 2023

@kpweiler no, this would be available as a sync and async interface.

@jmesterh

FWIW I am trying to build a POC of a 10,000 table ETL using Prefect and just ran into this issue. The first attempt looked like this (using task tags to set concurrency limits):

from prefect import flow, task

@task
def extract(table): ...

@task
def transform(extracted): ...

@task
def load(transformed): ...

@flow
def main():
    load_futures = []
    # ETL 10,000 tables in parallel
    for table in tables:
        extracted = extract.submit(table)
        # Distinct names so the `transform` and `load` tasks are not shadowed
        transformed = transform.submit(extracted)
        load_futures.append(load.submit(transformed))
    for future in load_futures:
        result = future.result()
        print("load complete for " + result)

This works, but it creates 30,000 tasks waiting for their dependencies to complete. What I would like to do is call each task sequentially (per table) while processing the tables in parallel:

from prefect import flow, task

@task
def extract(table): ...

@task
def transform(extracted): ...

@task
def load(transformed): ...

@flow
def etl(table):
    return load(transform(extract(table)))

@flow
def main():
    etl_futures = []
    # ETL 10,000 tables in parallel
    for table in tables:
        etl_futures.append(etl.submit(table))  # the interface proposed in this issue
    for future in etl_futures:
        result = future.result()
        print("load complete for " + result)

I see there is an async workaround, but I can't use async as there is no async driver for Oracle or MSSQL in SQLAlchemy. There is also the issue of not being able to run multiple instances of the same sub-flow. If these could be fixed so that the example above works that would be awesome.
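
For illustration, the shape being asked for (sequential steps per table, tables processed in parallel under a concurrency cap) looks like this with a plain thread pool. This is a standard-library sketch with placeholder `extract`/`transform`/`load` functions and an arbitrary `max_workers=5`; it does not give per-table flow runs or Prefect's orchestration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def extract(table):
    return f"{table}:extracted"


def transform(extracted):
    return f"{extracted}:transformed"


def load(transformed):
    return f"{transformed}:loaded"


def etl(table):
    # The three steps run sequentially within one worker thread.
    return load(transform(extract(table)))


tables = [f"table_{i}" for i in range(20)]

# At most 5 tables are processed at once; the rest wait in the pool's queue.
with ThreadPoolExecutor(max_workers=5) as pool:
    pending = [pool.submit(etl, table) for table in tables]
    # as_completed yields futures in completion order, not submission order.
    results = [future.result() for future in as_completed(pending)]
```

Because the calls here are synchronous, blocking database drivers work unchanged inside each worker thread.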

@rsampaths16
Contributor

only tasks can be submitted to external infrastructure and run in the background

Hi @madkinsz, can you please provide an example of this? I tried looking in the docs but couldn't find a way to do it. Thanks.

@zanieb zanieb changed the title Add Flow.submit interface for subflows on external infrastructure Add Flow.submit interface for subflows May 22, 2023
@daniel-oh-sunpower

Just a gentle question: is this feature still in active development? If not, anyone else interested in seeing this implemented? (please up-vote if so!)

@nicolasiltis

I'm very interested in seeing this feature implemented! It would be awesome for orchestration.
Just adding an idea here to open the discussion:
We have TaskRunner, so why can't we have FlowRunner in the same way? (SequentialFlowRunner [default], ConcurrentFlowRunner, DaskFlowRunner, RayFlowRunner, ...)
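
As a rough sketch of what such a pluggable runner could look like, the two classes below share a `submit` method and differ only in where the work runs. The class names come from the suggestion above, but the interface is entirely hypothetical and is not an existing Prefect API.

```python
from concurrent.futures import Future, ThreadPoolExecutor


class SequentialFlowRunner:
    """Runs the callable immediately and returns an already-resolved future."""

    def submit(self, fn, *args, **kwargs) -> Future:
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            future.set_exception(exc)
        return future


class ConcurrentFlowRunner:
    """Hands the callable to a thread pool and returns a pending future."""

    def __init__(self, max_workers: int = 4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, fn, *args, **kwargs) -> Future:
        return self._pool.submit(fn, *args, **kwargs)


def square(x):
    return x * x


def run_all(runner, values):
    # The calling code is identical for every runner.
    futures = [runner.submit(square, value) for value in values]
    return [future.result() for future in futures]


sequential = run_all(SequentialFlowRunner(), [1, 2, 3])
threaded = run_all(ConcurrentFlowRunner(), [1, 2, 3])
```

Dask- or Ray-backed variants would presumably implement the same `submit` method on top of their own executors.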

@ricosaurus

Chiming in that it would be great to know if this is still under development -- for me it is currently a major obstacle for using prefect. I had naively assumed I could run subflows in parallel (without async) and am considering next steps.

@williamjamir
Contributor

williamjamir commented Nov 8, 2023

I'm excited about this feature too! It would make the Prefect API more consistent, in my view, since flows and tasks will share common functionalities; big win for simplicity.

My use case is running flows with different parameters, like hyperparameters.
It would make experiment executions simpler :)

@wakatara

wakatara commented Jan 25, 2024

Have to echo @ricosaurus here. I had assumed that concurrently running a subflow (mine just takes a list of files to be processed and then submits them to a flow), up to the concurrency limit for the "subflow" it submits, would be fairly trivial, so right now this is completely blocking me from migrating over to Prefect from Airflow (which I'd like to migrate from, but which supports concurrency within DAGs).

Can we understand when we may see this? I feel Prefect is the superior product and like the way it works much better than AF (I also feel you can code modularly), but no parallel execution of sub/flows is kinda a deal killer.

Can we get an update on where this is as a feature release? Or if it is already supported but just not clearly documented?

@snowdrop4

snowdrop4 commented Feb 14, 2024

The docs on flows, I feel, are quite misleading. They make it seem like parallel execution of sub-flows is already supported:

Subflow runs behave like normal flow runs. There is a full representation of the flow run in the backend as if it had been called separately

The above makes it sound like subflows behave like normal flows (literally). But really, they aren't comparable, as you can launch multiple normal flows and have them run in parallel, but this isn't the case for subflows.

Subflows will block execution of the parent flow until completion. However, asynchronous subflows can be run in parallel by using AnyIO task groups or asyncio.gather.

And then the above makes it sound like all you need to do is use asyncio.gather, and then the sub-flows will run in parallel. This also isn't true.

@kevingrismore
Contributor

@snowdrop4 I think this is a fair point. Docs will be updated in the next release to word that section slightly differently. In-process subflow runs can be concurrent and take advantage of async execution, but are not parallel in the way separate, simultaneous deployment runs are.
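
The concurrent-versus-parallel distinction can be seen in plain asyncio, independent of Prefect. In this sketch (all names hypothetical), coroutines passed to `asyncio.gather` interleave only when they await something. A coroutine that makes a blocking call holds the event loop until it finishes. This is one general reason `asyncio.gather` alone may not yield overlap, and it says nothing about how Prefect schedules subflows internally.

```python
import asyncio
import time


async def cooperative(name, log):
    log.append(f"{name} start")
    await asyncio.sleep(0.05)  # yields control to the event loop
    log.append(f"{name} end")


async def blocking(name, log):
    log.append(f"{name} start")
    time.sleep(0.05)  # blocks the event loop thread; nothing else can run
    log.append(f"{name} end")


async def run_pair(worker):
    log = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


cooperative_log = asyncio.run(run_pair(cooperative))
blocking_log = asyncio.run(run_pair(blocking))
```

In `cooperative_log`, both workers start before either ends. In `blocking_log`, worker `a` runs to completion before `b` starts.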

@snowdrop4

Thank you 🙏

@thomasfrederikhoeck
Contributor

Hi @kevingrismore. Is this something that is actively being worked on?

@daviddwlee84

In my scenario, I have to process data for different dates/items.
Each day/item is independent and can be run in parallel.
A single day's work is too complex to be a single task (it involves sequential dependencies, parallel calculations, caching of intermediate results, etc.).
In this scenario, submitting a subflow is required to run these subflows in parallel.
#7322
#12563

@glesperance
Contributor

This feature would go a long way in making Prefect more intuitive to adopt. Moreover, since we can't run multiple flows at the same time, one has to wrap them in dummy tasks to achieve parallelism.
