Skip to content

Commit

Permalink
fix: schema issue within writebuilder (#2106)
Browse files Browse the repository at this point in the history
# Description
The schema when using `with_input_execution_plan` wasn't being applied.

# Related Issue(s)
closes #2105

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
universalmind303 authored Jan 25, 2024
1 parent 1ec4612 commit 354fda3
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 3 deletions.
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,14 +403,13 @@ impl std::future::IntoFuture for WriteBuilder {
Ok(this.partition_columns.unwrap_or_default())
}?;

let mut schema: ArrowSchemaRef = arrow_schema::Schema::empty().into();
let plan = if let Some(plan) = this.input {
Ok(plan)
} else if let Some(batches) = this.batches {
if batches.is_empty() {
Err(WriteError::MissingData)
} else {
schema = batches[0].schema();
let schema = batches[0].schema();

if let Some(snapshot) = &this.snapshot {
let table_schema = snapshot
Expand Down Expand Up @@ -460,6 +459,7 @@ impl std::future::IntoFuture for WriteBuilder {
} else {
Err(WriteError::MissingData)
}?;
let schema = plan.schema();

let state = match this.state {
Some(state) => state,
Expand Down
59 changes: 58 additions & 1 deletion crates/deltalake-core/tests/integration_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ use std::error::Error;

mod local {
use datafusion::common::stats::Precision;
use deltalake_core::writer::JsonWriter;
use deltalake_core::{logstore::default_logstore, writer::JsonWriter};
use object_store::local::LocalFileSystem;

use super::*;
#[tokio::test]
Expand Down Expand Up @@ -1071,6 +1072,62 @@ mod local {

Ok(())
}

#[tokio::test]
async fn test_issue_2105() -> Result<()> {
use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
let path = tempfile::tempdir().unwrap();
let path = path.into_path();

let file_store = LocalFileSystem::new_with_prefix(path.clone()).unwrap();
let log_store = default_logstore(
Arc::new(file_store),
&Url::from_file_path(path.clone()).unwrap(),
&Default::default(),
);

let tbl = CreateBuilder::new()
.with_log_store(log_store.clone())
.with_save_mode(SaveMode::Overwrite)
.with_table_name("test")
.with_column(
"id",
DataType::Primitive(PrimitiveType::Integer),
true,
None,
);
let tbl = tbl.await.unwrap();
let ctx = SessionContext::new();
let plan = ctx
.sql("SELECT 1 as id")
.await
.unwrap()
.create_physical_plan()
.await
.unwrap();
let write_builder = WriteBuilder::new(log_store, tbl.state);
let _ = write_builder
.with_input_execution_plan(plan)
.with_save_mode(SaveMode::Overwrite)
.await
.unwrap();

let table = open_table(path.to_str().unwrap()).await.unwrap();
let prov: Arc<dyn TableProvider> = Arc::new(table);
ctx.register_table("test", prov).unwrap();
let mut batches = ctx
.sql("SELECT * FROM test")
.await
.unwrap()
.collect()
.await
.unwrap();
let batch = batches.pop().unwrap();

let expected_schema = Schema::new(vec![Field::new("id", ArrowDataType::Int32, true)]);
assert_eq!(batch.schema().as_ref(), &expected_schema);
Ok(())
}
}

async fn test_datafusion(context: &IntegrationContext) -> TestResult {
Expand Down

0 comments on commit 354fda3

Please sign in to comment.