
Alternate example of using two thread pools to run DataFusion IO and CPU operations #13690

Closed · wants to merge 12 commits

Conversation

@alamb (Contributor) commented Dec 8, 2024

Which issue does this PR close?

Rationale for this change

See Rationale on

@tustvold has (legitimate) concerns with the approach taken in #13424: #13424 (comment)

At the risk of repeating myself from datafusion-contrib/datafusion-dft#248 (comment), I would strongly discourage overloading the ObjectStore trait as some sort of IO/CPU boundary.

Not only is this not what the trait is designed for, but it is overly pessimistic. Tokio is designed to handle some CPU bound work, e.g. interleaved CSV processing or similar; it just can't handle tasks stalling for seconds at a time.

This PR explores what his suggestion in #13424 (comment) would look like.

What changes are included in this PR?

I tried to create DedicatedExecutor::spawn_io / DedicatedExecutor::spawn_cpu and call them internally within DataFusion.

From a user perspective I think this is much nicer, as all you would have to do is:

// register DedicatedExecutor with runtime
let runtime_env = builder.with_dedicated_executor(dedicated_executor).build()?;
// build SessionContext from runtime_env
let ctx = ...;

// run APIs without worrying about what pool:
let results = ctx.sql("select * from foo").await?.collect().await?;
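For concreteness, here is a minimal sketch of the general two-runtime pattern (written for this description, not the actual code in this PR): the executor owns a second multi-threaded tokio runtime reserved for CPU-bound work and forwards 'static futures to it.

use std::future::Future;
use tokio::runtime::{Builder, Runtime};

/// Sketch only -- not this PR's DedicatedExecutor. Owns a second
/// multi-threaded runtime that is reserved for CPU-bound work.
pub struct DedicatedExecutor {
    cpu_runtime: Runtime,
}

impl DedicatedExecutor {
    pub fn try_new() -> std::io::Result<Self> {
        Ok(Self {
            cpu_runtime: Builder::new_multi_thread().enable_all().build()?,
        })
    }

    /// Run a CPU-heavy future on the dedicated runtime and wait for its result.
    /// (The spawn_cpu in this PR returns a Result and handles shutdown; this
    /// sketch just propagates panics.)
    pub async fn spawn_cpu<F>(&self, fut: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.cpu_runtime.spawn(fut).await.expect("CPU task panicked")
    }
}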

However, I am not sure this is feasible to actually implement inside DataFusion, for reasons described in #13690 (comment). TL;DR: many existing async function calls capture references (e.g. to &self), and thus cannot easily be sent to other threads.

Are these changes tested?

N/A

Are there any user-facing changes?

@github-actions bot added the physical-expr (Physical Expressions), core (Core DataFusion crate), and execution (Related to the execution crate) labels on Dec 8, 2024
@alamb changed the title from "Example trying to move DataFusion IO operations to another runtime" to "Alternate example of using two thread pools to run DataFusion IO and CPU operations" on Dec 8, 2024
//let http_store = dedicated_executor.wrap_object_store(http_store);

// we must also register the dedicated executor with the runtime
let runtime_env = RuntimeEnvBuilder::new()
@alamb (Contributor Author):

In this example the DedicatedExecutor is registered directly with the RuntimeEnv

I actually think this is a much nicer API than having to wrap various calls with dedicated executors

};

// Do remote catalog operations on a different runtime
runtime_env
@alamb (Contributor Author) commented Dec 8, 2024:
This shows what annotating the I/O call sites in DataFusion would look like (calling spawn_io internally).

This call could be converted easily because it doesn't pass in &self or capture any fields.

@@ -608,10 +608,14 @@ impl SessionContext {
        sql: &str,
        options: SQLOptions,
    ) -> Result<DataFrame> {
        let plan = self.state().create_logical_plan(sql).await?;
        options.verify_plan(&plan)?;
        self.runtime_env()
@alamb (Contributor Author):

And here is an example of how I would like to be able to call high-level DataFusion APIs to run CPU-bound work, but I can't get the lifetimes to work out.

Comment on lines +202 to +213
/// Runs the specified work on the dedicated executor and returns the result
///
/// Note the future is not 'static (aka it can have internal references)
pub fn spawn_cpu2<'a, T>(&self, task: T) -> impl Future<Output = T::Output>
where
    T: Future + Send + 'a,
    T::Output: Send,
{
    // If we can figure out how to make this work, then
    // we could integrate it nicely into DataFusion
    async { todo!() }
}
@alamb (Contributor Author) commented Dec 8, 2024:

If we can figure out a way to implement this function, the "annotate all call sites" strategy is plausible. However, I could not make it work.

The key difference between spawn_cpu and spawn_cpu2 is:

-        T: Future + Send + 'static,
+        T: Future + Send + 'a,

The idea is that DataFusion contains a bunch of async function calls like this:

                self
                    .list_files_for_scan(
                        &session_state,
                        &partition_filters,
                        limit,
                    )
                    .await

Rust can somehow figure out that when this future runs in place on the same runtime, the references it captures remain valid for long enough, so it compiles.

However, when I tried to run this kind of function on a different runtime (the whole point of this PR), Rust can't figure out that the future body doesn't escape the current function, and thus I can't get it to compile. The only way to pass the future to a different runtime is if the lifetime is 'static.

I believe the root cause is that all the functions for spawning / running tasks on a tokio runtime require that the futures are 'static -- that is, that they hold no borrowed references internally. For example:
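For reference, the relevant signature, paraphrased here from the tokio documentation (Runtime::spawn and Handle::spawn carry the same bounds):

pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
    F: Future + Send + 'static,   // the future may not borrow from the caller's stack
    F::Output: Send + 'static,
{
    // ...
}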

The only way I could make it work was to clone all the arguments (we could likely avoid this by restructuring the code to be much more careful about which arguments are passed and need to be cloned, etc.), but doing so would be a pretty large undertaking and likely require significantly more copying, like:

        let self_captured = self.clone(); // clone so we can pass the future to the other pool
        // (session_state_captured and partition_filters_captured are clones made the same way)
        let (mut partitioned_file_lists, statistics) = state
            .runtime_env()
            .spawn_io(async move {
                self_captured
                    .list_files_for_scan(
                        &session_state_captured,
                        &partition_filters_captured,
                        limit,
                    )
                    .await
            })
            .await?;

@tustvold (Contributor):

I don't think the lifetimes you propose are possible, for the reasons you've articulated: tokio doesn't provide structured concurrency.

but doing so would be a pretty large undertaking and likely require significantly more copying, like:

In most cases this should be a case of extracting the shared IO state onto a separate struct that can be Arc-wrapped. Whilst this is work, separating the IO component explicitly in this way is IMO actually a good idea anyway
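As a rough illustration of that suggestion (hypothetical names only, not code from ListingTable or this PR), the IO-relevant state could live behind an Arc so the spawned future owns everything it touches:

use std::sync::Arc;
use tokio::runtime::Handle;

/// Hypothetical: just the state the IO path needs, shared via Arc.
struct ListingTableIo {
    store_url: String,
}

impl ListingTableIo {
    async fn list_files(&self, limit: Option<usize>) -> Vec<String> {
        // placeholder for the object store listing call
        let _ = (&self.store_url, limit);
        vec![]
    }
}

/// Hypothetical table that keeps its IO state Arc-wrapped.
struct ListingTable {
    io: Arc<ListingTableIo>,
}

impl ListingTable {
    async fn scan(&self, io_runtime: &Handle, limit: Option<usize>) -> Vec<String> {
        // Cloning the Arc is cheap and makes the future 'static, so it can be
        // sent to the IO runtime without borrowing &self.
        let io = Arc::clone(&self.io);
        io_runtime
            .spawn(async move { io.list_files(limit).await })
            .await
            .expect("IO task panicked")
    }
}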

@alamb (Contributor Author) commented Dec 8, 2024:

In most cases this should be a case of extracting the shared IO state onto a separate struct that can be Arc-wrapped. Whilst this is work, separating the IO component explicitly in this way is IMO actually a good idea anyway

That seems reasonable. I think the challenge is going to be restructuring the various ListingTable APIs in this way. Maybe once we have a good pattern we can then apply it more broadly.

I think if we can show how to make it work with ListingTable, we can make the approach work for anything else in DataFusion's codebase.

        let (mut partitioned_file_lists, statistics) = self
            .list_files_for_scan(session_state, &partition_filters, limit)
        // TODO avoid these clones when possible.
        let session_state_captured = session_state.clone();
@alamb (Contributor Author):

There are quite a few more call sites like this that would need to be annotated for the correct runtime. However, I didn't bother to do them until we could figure out how to make the rest of the API reasonable (aka run_cpu, etc.).

/// thread pool
///
/// See [`DedicatedExecutor`] for more details
pub async fn spawn_cpu<Fut>(&self, fut: Fut) -> Result<Fut::Output>
@alamb (Contributor Author):

Maybe the mistake in this PR was to also try to handle spawning CPU work explicitly 🤔

Maybe a better paradigm would be to only annotate IO (rather than also CPU).

This would be slightly more awkward to use, given my assumption that people would want to use the runtime created by tokio for IO tasks (and a dedicated one for CPU tasks) 🤔

But maybe it is ok
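A sketch of what "only annotate IO" might look like (my assumption about the shape, not this PR's code): remember a Handle to the runtime that should perform IO, and have call sites wrap just their IO futures:

use std::future::Future;
use tokio::runtime::Handle;

/// Hypothetical: remembers which runtime should run IO; CPU work stays where it is.
#[derive(Clone)]
pub struct IoRuntime {
    handle: Handle,
}

impl IoRuntime {
    /// Capture the current runtime (e.g. the one created by #[tokio::main]).
    /// Panics if called outside a runtime context.
    pub fn from_current() -> Self {
        Self {
            handle: Handle::current(),
        }
    }

    /// Run an IO-bound future on the captured runtime.
    pub async fn spawn_io<F>(&self, fut: F) -> F::Output
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.handle.spawn(fut).await.expect("IO task panicked")
    }
}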

@tustvold (Contributor):

I think it becomes a question of who is going to do the work. I agree a solution that makes DF spawn CPU-bound work to a dedicated pool is the superior solution, but it would require a much bigger community-wide investment.

Spawning IO is likely the only practical option.

@tustvold (Contributor) commented Dec 8, 2024:

Actually, I think this might be a better way to approach this problem: whilst CPU-bound tasks are a lot more common in DF, they're also a lot easier to work with... I wrote the idea up here 🤔 - #13692

@alamb (Contributor Author) commented Dec 13, 2024:

Let's keep working on the other approach for now

@alamb alamb closed this Dec 13, 2024
Labels: core (Core DataFusion crate), execution (Related to the execution crate), physical-expr (Physical Expressions)

Projects: None yet

Successfully merging this pull request may close these issues:

Document DataFusion Threading / tokio runtimes (how to separate IO and CPU bound work)

2 participants