Skip to content

Commit

Permalink
Fix DataFrame::cache errors with `Plan("Mismatch between schema and…
Browse files Browse the repository at this point in the history
… batches")` (#8510)

* type cast

* add test

* use physical plan

* logic optimization
  • Loading branch information
Asura7969 authored Dec 13, 2023
1 parent 4578f3d commit 2bc67ef
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1276,11 +1276,12 @@ impl DataFrame {
/// ```
pub async fn cache(self) -> Result<DataFrame> {
let context = SessionContext::new_with_state(self.session_state.clone());
let mem_table = MemTable::try_new(
SchemaRef::from(self.schema().clone()),
self.collect_partitioned().await?,
)?;

// The schema is consistent with the output
let plan = self.clone().create_physical_plan().await?;
let schema = plan.schema();
let task_ctx = Arc::new(self.task_ctx());
let partitions = collect_partitioned(plan, task_ctx).await?;
let mem_table = MemTable::try_new(schema, partitions)?;
context.read_table(Arc::new(mem_table))
}
}
Expand Down Expand Up @@ -2638,6 +2639,17 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_cache_mismatch() -> Result<()> {
let ctx = SessionContext::new();
let df = ctx
.sql("SELECT CASE WHEN true THEN NULL ELSE 1 END")
.await?;
let cache_df = df.cache().await;
assert!(cache_df.is_ok());
Ok(())
}

#[tokio::test]
async fn cache_test() -> Result<()> {
let df = test_table()
Expand Down

0 comments on commit 2bc67ef

Please sign in to comment.