
A global, shared ExecutionContext #824

Closed · yjshen opened this issue Aug 5, 2021 · 11 comments
Labels: enhancement (New feature or request)

Comments

@yjshen (Member) commented Aug 5, 2021

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

I'd like to propose making an ExecutionContext static and publicly accessible. Several cases would benefit from a global context:

  1. We could have a protocol handler registry that lets users plug in third-party protocol handlers at runtime, so the physical operators' execution logic uses the newly registered handlers to read files from different remote stores, for example S3 or HDFS.
  2. When we want to control the total amount of memory DataFusion can use: all physical operators would register their memory usage with a global context, and either request more memory or spill when the allowed memory is exhausted.

Describe the solution you'd like
A single, global ExecutionContext that is initialized on first use, plus a getter that always returns the same instance.
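
For illustration, a minimal sketch of what this could look like (the once_cell dependency, the Mutex wrapper, and the global_context name are assumptions, not existing DataFusion API):

use datafusion::execution::context::ExecutionContext;
use once_cell::sync::Lazy;
use std::sync::Mutex;

// Process-wide context, created lazily on first access.
static GLOBAL_CTX: Lazy<Mutex<ExecutionContext>> =
    Lazy::new(|| Mutex::new(ExecutionContext::new()));

// Always returns the same instance.
pub fn global_context() -> &'static Mutex<ExecutionContext> {
    &GLOBAL_CTX
}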

Describe alternatives you've considered
Passing a shared state/context to each operator that needs it, for example table scan, join, hash aggregate, and sort.

Additional context
Similar to the shared SparkEnv in each process (driver/executor) of Spark.

@yjshen added the enhancement (New feature or request) label on Aug 5, 2021
@jorgecarleitao (Member)

Thanks for the suggestion.

I do not think we should do this in DataFusion. These use cases, imo, should be addressed by consumers of DataFusion, who decide how they want to manage the Context within their application.

Also, imo, having a singleton like this in a Rust library is an anti-pattern.

@houqp (Member) commented Aug 6, 2021

@yjshen based on the sample code in your IO source PR, it looks like you want the following API as a consumer of DataFusion:

let ctx = ExecutionContext::get();  // returns global context singleton
ExecutionContext::set(ctx);  // updates global context singleton

If so, I think this pattern can be implemented entirely within a self-contained crate published on crates.io. The global singleton context crate could provide a get/set API like so:

let ctx = global_df_ctx::get();
global_df_ctx::set(ctx);

The end-user experience would stay the same, but the code would be better decoupled, and users could opt in to the feature simply by adding the extension crate as a dependency.
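
A minimal sketch of what such an extension crate could contain (the Arc wrapping, the RwLock storage, and the default initialization are illustrative assumptions):

use datafusion::execution::context::ExecutionContext;
use once_cell::sync::Lazy;
use std::sync::{Arc, RwLock};

// Storage for the optional global context; None until the first get() or set().
static CTX: Lazy<RwLock<Option<Arc<ExecutionContext>>>> =
    Lazy::new(|| RwLock::new(None));

// Returns the registered context, creating a default one on first use.
// Arc is used so no Clone impl is required on the context itself.
pub fn get() -> Arc<ExecutionContext> {
    if let Some(ctx) = CTX.read().unwrap().as_ref() {
        return Arc::clone(ctx);
    }
    let mut guard = CTX.write().unwrap();
    Arc::clone(guard.get_or_insert_with(|| Arc::new(ExecutionContext::new())))
}

// Replaces the global context with a caller-supplied one.
pub fn set(ctx: ExecutionContext) {
    *CTX.write().unwrap() = Some(Arc::new(ctx));
}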

@yjshen (Member, Author) commented Aug 6, 2021

If a singleton context is not the preferred way, is it possible to control total memory usage for DataFusion across all physical operators?

  • Should we extend ExecutionPlan to allow setting/getting a context for each operator?
  • Or should I give up on having memory control in DataFusion and implement it in my own library by extending DataFusion?

@houqp (Member) commented Aug 8, 2021

If it's for memory control, perhaps we could extend ExecutionPlan's execute method to take an extra ExecutionContext as a second argument? Then, within each physical operator's implementation, the operator can access the passed-in context for memory control.
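
Roughly the shape of the change being suggested; the trait name here is hypothetical to avoid clashing with the real ExecutionPlan, while Result and SendableRecordBatchStream are DataFusion's existing types:

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use datafusion::physical_plan::SendableRecordBatchStream;
use std::sync::Arc;

#[async_trait]
pub trait ExecutionPlanWithContext: Send + Sync {
    // Today's execute(partition), plus a context handle the operator
    // can consult for memory accounting.
    async fn execute(
        &self,
        partition: usize,
        ctx: Arc<ExecutionContext>,
    ) -> Result<SendableRecordBatchStream>;
}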

@jorgecarleitao @andygrove @alamb @Dandandan for context, @yjshen is working on a native Spark executor using DataFusion: https://the-asf.slack.com/archives/C01QUFS30TD/p1621582734043500.

Executor memory control would be very relevant to Ballista as well, so I am curious whether @andygrove has an opinion on the right abstraction here.

@jorgecarleitao (Member)

Thanks a lot for the context.

I think this takes 2 components:

  • track memory
  • limit execution when memory crosses a threshold

With respect to the first item, what about adding a new argument to PhysicalPlan::execute containing an execution state (which holds an AtomicUsize with the current memory), and modifying every implementation to record memory as it is added and removed? This state is passed in by the execution context when execution starts, and each node is responsible for passing it to the nodes it calls. We can start by tracking the memory of RecordBatches, since they are the bulk of the memory.
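
A minimal sketch of that state, assuming only the AtomicUsize counter described above (the type and method names are hypothetical):

use std::sync::atomic::{AtomicUsize, Ordering};

// Shared per-query state, passed down the plan tree behind an Arc.
#[derive(Default)]
pub struct ExecutionState {
    used_memory: AtomicUsize,
}

impl ExecutionState {
    // Record bytes of newly allocated RecordBatch memory.
    pub fn allocate(&self, bytes: usize) {
        self.used_memory.fetch_add(bytes, Ordering::SeqCst);
    }

    // Record bytes that have been freed.
    pub fn release(&self, bytes: usize) {
        self.used_memory.fetch_sub(bytes, Ordering::SeqCst);
    }

    pub fn used(&self) -> usize {
        self.used_memory.load(Ordering::SeqCst)
    }
}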

With respect to the second, since execute is already async, we can block its execution at any point, e.g. if memory > limit, return Pending.
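
As a rough illustration, a stream wrapper that returns Pending while the query is over budget, reusing the hypothetical ExecutionState sketched above. The self-wake below is a simplification that busy-polls; a real version would park the waker until memory is released:

use futures::stream::Stream;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

pub struct MemoryLimitedStream<S> {
    inner: S,
    state: Arc<ExecutionState>,
    limit: usize,
}

impl<S: Stream + Unpin> Stream for MemoryLimitedStream<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
        let this = self.get_mut();
        if this.state.used() > this.limit {
            // Simplification: ask to be polled again instead of parking
            // the waker until release() frees memory.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Pin::new(&mut this.inner).poll_next(cx)
    }
}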

@houqp (Member) commented Aug 8, 2021

With respect to the first item, what about adding a new argument to PhysicalPlan::execute containing an execution state

By execution state, you are referring to ExecutionContextState specifically, right? If so, I think that would be better than passing in an ExecutionContext directly 👍.

@alamb (Contributor) commented Aug 8, 2021

I am also very interested in the ability to both track and limit memory usage for a DataFusion plan -- there is some additional discussion / context related to this on #587.

@jorgecarleitao 's implementation idea in #824 (comment) sounds like a good start. I don't think we can realistically try and limit memory usage before we can measure it.

As for the single global context, my preference is also that the code using DataFusion manage the contexts. Specifically, it would make sense to me for datafusion-cli to have a single global context that it passes into the datafusion crate, but not to add a global context to the datafusion crate itself.

@andygrove (Member) commented Aug 8, 2021

If it's for memory control, perhaps we could extend ExecutionPlan's execute method to take an extra ExecutionContext as a second argument?

I would not want the execution plan traits to have a dependency on DataFusion's context. Ballista has its own context that also creates execution plans.

@andygrove (Member)

I am very interested in this work. I have been talking about deterministic memory use as being one of the advantages of Rust over JVM for some time and it would be great to see this implemented.

I like the idea of passing in some form of context state with a memory tracker. It would be good if this is not tied specifically to a DataFusion context, so that physical operators can be used in other contexts.

I also think this gets us back into discussing scheduling and I have just added the following note to #587:

We should also discuss creating a scheduler in DataFusion (see #64) since it is related to this work. Rather than try and run all the things at once, it would be better to schedule work based on the available resources (cores / memory). We would still need the ability to track/limit memory use within operators but the scheduler could be aware of this and only allocate tasks if there is memory budget available.

@jorgecarleitao (Member)

Avoiding the tie to DataFusion is easily achieved via a trait and Box<dyn Trait> or some other variation, but I definitely agree with you, @andygrove, that the broader problem is addressed via a scheduler.
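
For example, a sketch of such a trait (all names hypothetical), so operators depend on the abstraction rather than on a DataFusion- or Ballista-specific context:

// Operators see only this trait, not a concrete context type.
pub trait MemoryTracker: Send + Sync {
    // Returns false when the request would exceed the budget.
    fn try_allocate(&self, bytes: usize) -> bool;
    fn release(&self, bytes: usize);
}

// A physical operator can then hold any implementation:
pub struct SortOperator {
    tracker: Box<dyn MemoryTracker>,
    // ...remaining fields elided
}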

E.g. in Ballista, memory is not even on a single node, so which node a task runs on changes what resources are available.

@houqp (Member) commented Aug 10, 2021

Based on the discussion so far, I recommend closing this issue, since it was originally created for a global, shared ExecutionContext and I think we have already reached a consensus on this particular topic.

We can move all ExecutionPlan memory measurement and enforcement discussions to #587 instead.

@alamb closed this as completed Aug 10, 2021