-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix DataFrame::cache
errors with Plan("Mismatch between schema and batches")
#8510
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this contribution @Asura7969 -- I think we can do this slightly more efficiently / consistenly, but the overall idea is 👍
datafusion/core/src/dataframe/mod.rs
Outdated
@@ -1271,8 +1273,13 @@ impl DataFrame { | |||
/// ``` | |||
pub async fn cache(self) -> Result<DataFrame> { | |||
let context = SessionContext::new_with_state(self.session_state.clone()); | |||
// type cast |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than explcilty running (only) coercion (which is also run as part of execution) I suggest you use the schema that comes directly from the output stream
Something like (copied from Self::collect
): https://docs.rs/datafusion/latest/src/datafusion/dataframe/mod.rs.html#756-760
let task_ctx = Arc::new(self.task_ctx());
let plan = self.create_physical_plan().await?;
let schema = plan.schema();
collect(plan, task_ctx).await
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your advice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot @Asura7969 !
datafusion/core/src/dataframe/mod.rs
Outdated
// The schema is consistent with the output | ||
let physical_plan = self.clone().create_physical_plan().await?; | ||
let mem_table = | ||
MemTable::try_new(physical_plan.schema(), self.collect_partitioned().await?)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW this now runs the planner twice -- we could make it more efficient by calling collect_partitioned()
directly on the physical plan rather than Self::collect_partitioned
that will replan (and recollect)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @Asura7969
Which issue does this PR close?
Closes #8476.
Rationale for this change
#8476 (comment)
What changes are included in this PR?
Are these changes tested?
test_cache_mismatch
Are there any user-facing changes?