Set target_partitions on table scan in physical planner #972
Conversation
@houqp PTAL, thanks!
cc @alamb since this will impact iox.
I am not sure this is the way we should go. I think this should be a setting on the session that limits the number of partitions in each execution node. This property should be used throughout the code, instead of using the number of CPU cores. It's OK to use the number of CPU cores by default in DataFusion (as is currently the case), but it should be possible to override this, e.g. from Ballista.
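As a rough illustration of the kind of session-level knob being described here (names are hypothetical, not DataFusion's actual API; assumes the `num_cpus` crate), the setting could default to the CPU core count and allow a downstream system such as Ballista to override it:

```rust
// Hypothetical sketch only: a session-level setting that limits the number of
// partitions, defaulting to the CPU core count but overridable by the caller.
struct SessionConfig {
    target_partitions: usize,
}

impl SessionConfig {
    fn new() -> Self {
        // Default to the number of CPU cores, as DataFusion currently does.
        Self {
            target_partitions: num_cpus::get(),
        }
    }

    fn with_target_partitions(mut self, n: usize) -> Self {
        self.target_partitions = n;
        self
    }
}

fn main() {
    // A distributed engine such as Ballista could override the default:
    let config = SessionConfig::new().with_target_partitions(64);
    println!("target_partitions = {}", config.target_partitions);
}
```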
@Dandandan if I read the code correctly, it's only using CPU cores in tests. For actual execution, the partition count is read from the context config in the physical planner when creating the table scan node:
datafusion/src/datasource/memory.rs
Outdated
```diff
@@ -115,7 +115,7 @@ impl MemTable {
         output_partitions: Option<usize>,
     ) -> Result<Self> {
         let schema = t.schema();
-        let exec = t.scan(&None, batch_size, &[], None)?;
+        let exec = t.scan(&None, batch_size, &[], None, num_cpus::get())?;
```
This should not use the number of CPUs here, but the default number of partitions.
If I read the code correctly, `MemTable::load` is only used by the benchmarks. What's more, `target_partitions` in `MemTable::scan` is unused. cc @houqp
I agree with @Dandandan on this one. Even though `MemTable::scan` is not using `target_partitions`, the passed-in table provider `t`'s `scan` method might actually use it. Given that `load` is defined as a pub method, I am concerned that there is downstream code from other projects calling this method already. If it were a private method only used in tests, then I think it would be fine to hard-code the partition count to the CPU core count here.

In short, I recommend adding the partition value as an argument to the `load` method and letting the caller handle the value resolution.
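A minimal sketch of what that suggestion could look like (simplified stand-in types so the snippet is self-contained, not the exact DataFusion API): `load` accepts the partition count and simply forwards it to the provider's `scan`, leaving the resolution (config value, CPU count, etc.) to the caller.

```rust
use std::sync::Arc;

// Placeholder result/trait types for illustration only.
type DfResult<T> = Result<T, String>;

trait TableProvider {
    fn scan(&self, batch_size: usize, target_partitions: usize) -> DfResult<usize>;
}

struct MemTable;

impl MemTable {
    // The caller resolves the value (e.g. from its session config or from
    // num_cpus::get()) and passes it in; load no longer hard-codes it.
    pub fn load(
        t: Arc<dyn TableProvider>,
        batch_size: usize,
        target_partitions: usize,
    ) -> DfResult<Self> {
        let _partition_count = t.scan(batch_size, target_partitions)?;
        // ... read the scanned partitions into memory here ...
        Ok(MemTable)
    }
}
```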
got it!
Thanks, you're right. I skimmed the PR too quickly. Two suggestions I have for the PR:
Thanks @xudong963 for your work! @Dandandan for me
You can find an interesting reference on datasource partitioning in Spark according to configs here. Here is a summary of some interesting points:
My conclusion would be the following:
Whatever decision we make, I have a final remark, mostly about code hygiene: the
Thank you for the PR @xudong963.

I looked at the code and I see that adding `target_partitions` to `scan` basically follows the same pattern as options such as `batch_size`, and thus I think it is a reasonable thing to do.

I think @rdettai also has some good points, which I will address in a second comment.
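For reference, the shape of the change under discussion, reconstructed from the call site in the diff above (placeholder types, not the exact trait definition):

```rust
use std::sync::Arc;

// Stand-ins so the sketch stands alone; in DataFusion these are the real
// Expr, ExecutionPlan and error types.
struct Expr;
trait ExecutionPlan {}
type DfResult<T> = Result<T, String>;

// Sketch of TableProvider::scan with the new target_partitions argument
// added alongside batch_size.
trait TableProvider {
    fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        batch_size: usize,
        filters: &[Expr],
        limit: Option<usize>,
        target_partitions: usize, // newly threaded through by this PR
    ) -> DfResult<Arc<dyn ExecutionPlan>>;
}
```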
```diff
         true,
     )?;
     LogicalPlanBuilder::scan(
         &scan.table_name,
         Arc::new(parquet_table),
         projection,
-    )? //TODO remove hard-coded max_partitions
+    )?
```
🎉
I think of So a system that uses DataFusion would set
I think I would phrase it differently as "keep the choice of distribution of data across partitions at the
100% agree -- can you file a ticket for this? It is likely a fairly straightforward mechanical change that might be a good project for a new contributor.
Thanks @rdettai for the detailed write-up. Based on how max_partitions is used in the parquet scan, I think
Yes, I will do it!
Fixed, PTAL~
LGTM!
Thanks @alamb and @houqp for your insights and @xudong963 for quickly reacting to this feedback! But I am still not 100% convinced 😃
Sorry for not participating in the discussion on the relationship among parallelism, number of partitions, target concurrencies, target partition size, etc. at an early stage. I intended to keep the original term. Since we are getting into a more generalized design discussion here, I want to share my understanding.

My core idea is to partition the dataset referred to in each query based on a required size in bytes, i.e., our users should set the desired partition size in bytes (or tune the partition size parameter step by step). In other words, users choose execution granularity based on the amount of work performed by each task. This has several implications:
Therefore, I'd like to have
Besides, I'm aware that the scope is too much for this PR. Let's stop where it's appropriate, and we could move to something more sophisticated later.
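To make the size-based idea concrete, here is a small illustrative sketch (not part of this PR, names are made up) that packs files into partitions of roughly a target byte size:

```rust
// Illustrative only: group (path, size) pairs into partitions whose total
// size stays close to `target_partition_bytes`.
fn partition_by_size(files: &[(String, u64)], target_partition_bytes: u64) -> Vec<Vec<String>> {
    let mut partitions: Vec<Vec<String>> = Vec::new();
    let mut current: Vec<String> = Vec::new();
    let mut current_bytes = 0u64;

    for (path, size) in files {
        // Start a new partition when adding this file would exceed the target.
        if !current.is_empty() && current_bytes + size > target_partition_bytes {
            partitions.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current.push(path.clone());
        current_bytes += size;
    }
    if !current.is_empty() {
        partitions.push(current);
    }
    partitions
}

fn main() {
    let files = vec![
        ("a.parquet".to_string(), 80),
        ("b.parquet".to_string(), 50),
        ("c.parquet".to_string(), 120),
    ];
    // With a 150-byte target this yields [["a.parquet", "b.parquet"], ["c.parquet"]].
    println!("{:?}", partition_by_size(&files, 150));
}
```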
I learned a lot from this PR.
@rdettai -- this is a good discussion: I think one core challenge in our understanding is that the term
I think the parameter in this PR is referring to the second, even though the first would likely be a better direction to take DataFusion in the longer term. Perhaps related to #64
I can see how specifying the size of each output stream may make sense
I think it may also be related to Spark's scheduler, which I think can control the number of concurrently "active" partitions. DataFusion just starts them all at once.
I agree
I think that is a reasonable idea too. The
Yes, I am very much on board with this idea (related to #64, I think). All in all, I think improving the abstractions around "partitions" in DataFusion and decoupling the execution concurrency from the number of distinct data streams would be very helpful and a good direction to head. I think this is aligned with what @rdettai is saying as well.
In my mind, DataFusion already supports any form of scheduling of the partitions. It is just the
I do think we agree that whatever layout we have for the data in storage, we can map it to any number of partitions in the
So what does
Proposed outcome: If there is too much ambiguity around the
@rdettai how opposed are you to this PR's change? What do you think we should do with this PR? I agree there are several problems with DataFusion's use of
Sorry for the delay, still trying to catch up with all the interesting design discussions across different PRs. It looks like we all agree that it's better to drive the partitioning logic in the table scan using

For this particular PR, I agree with @alamb that it is an improvement and doesn't make anything worse. I think the newly added ScanConfig also lays out a good foundation for us to add more scan config in the future, including

I also agree with @rdettai that we should avoid unnecessary API churn if possible. However, I do think the table
Conflict fixed.
Thanks for your feedback.
Having specific configurations for each kind of datasource would also have another great benefit: we could provide much better API documentation. Currently, if someone specifies
Thanks @rdettai for the clarification. After thinking more on this, I agree with your conclusion that specifying these configs at the TableProvider constructor level would be a more flexible design. But for a different reason ;)

With regards to table specific configs like

But passing in scan configuration at the

Just to make sure we are on the same page here, the proposed short-term design looks like the following:
Then we can work on follow-up PRs to migrate the code base over to centering the scanning logic around partition size instead of partition count.

NOTE: with regards to file-format-specific scan config, I think we might need to come up with a more dynamic design instead of using static fields in
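As a very rough sketch of what constructor-level scan configuration might look like (hypothetical names, not an agreed API), the provider would carry its scan settings instead of receiving them on every `scan` call:

```rust
// Hypothetical sketch only: scan settings are passed when the table provider
// is constructed rather than at each scan() call site.
struct ParquetScanOptions {
    target_partitions: usize,
    batch_size: usize,
}

struct ParquetTable {
    path: String,
    options: ParquetScanOptions,
}

impl ParquetTable {
    fn try_new(path: impl Into<String>, options: ParquetScanOptions) -> Result<Self, String> {
        Ok(Self {
            path: path.into(),
            options,
        })
    }
}

fn main() -> Result<(), String> {
    let table = ParquetTable::try_new(
        "data/my_table",
        ParquetScanOptions {
            target_partitions: 8,
            batch_size: 8192,
        },
    )?;
    println!("{} -> {} target partitions", table.path, table.options.target_partitions);
    Ok(())
}
```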
No worries, take your time @xudong963. If you need any help, feel free to ping us here.
Feel free to close this one 😉
Which issue does this PR close?
Closes #708
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?