fix: schema issue within writebuilder #2106

Merged (4 commits) on Jan 25, 2024
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/operations/write.rs
@@ -403,14 +403,13 @@ impl std::future::IntoFuture for WriteBuilder {
                 Ok(this.partition_columns.unwrap_or_default())
             }?;
 
-            let mut schema: ArrowSchemaRef = arrow_schema::Schema::empty().into();
             let plan = if let Some(plan) = this.input {
                 Ok(plan)
             } else if let Some(batches) = this.batches {
                 if batches.is_empty() {
                     Err(WriteError::MissingData)
                 } else {
-                    schema = batches[0].schema();
+                    let schema = batches[0].schema();
 
                     if let Some(snapshot) = &this.snapshot {
                         let table_schema = snapshot
@@ -460,6 +459,7 @@ impl std::future::IntoFuture for WriteBuilder {
             } else {
                 Err(WriteError::MissingData)
             }?;
+            let schema = plan.schema();
 
             let state = match this.state {
                 Some(state) => state,
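For readers skimming the diff: the old code initialized `schema` to a mutable empty Arrow schema and reassigned it only inside the `batches` branch, so a write driven by an input execution plan (the path hit in issue #2105) carried the empty placeholder forward. The fix takes the schema from the resolved plan instead. Below is a minimal, self-contained sketch of that control-flow change; the types are simplified stand-ins, not the actual delta-rs or DataFusion ones.

```rust
use std::sync::Arc;

// Simplified stand-ins for arrow_schema::Schema, RecordBatch, and the physical plan.
#[derive(Debug, Clone, PartialEq)]
struct Schema(Vec<&'static str>);

#[allow(dead_code)]
struct RecordBatch {
    schema: Arc<Schema>,
}

struct ExecutionPlan {
    schema: Arc<Schema>,
}

impl ExecutionPlan {
    fn schema(&self) -> Arc<Schema> {
        Arc::clone(&self.schema)
    }
}

#[derive(Debug)]
enum WriteError {
    MissingData,
}

fn resolve_plan(
    input: Option<Arc<ExecutionPlan>>,
    batches: Option<Vec<RecordBatch>>,
) -> Result<(Arc<ExecutionPlan>, Arc<Schema>), WriteError> {
    // Old shape: `let mut schema = <empty>` lived here and was reassigned only in
    // the `batches` branch, so the `input` branch left it empty.
    let plan = if let Some(plan) = input {
        Ok(plan)
    } else if let Some(batches) = batches {
        if batches.is_empty() {
            Err(WriteError::MissingData)
        } else {
            // Build a plan from the in-memory batches (heavily simplified).
            let schema = Arc::clone(&batches[0].schema);
            Ok(Arc::new(ExecutionPlan { schema }))
        }
    } else {
        Err(WriteError::MissingData)
    }?;

    // New shape: the schema always comes from whichever plan was resolved above.
    let schema = plan.schema();
    Ok((plan, schema))
}

fn main() {
    let plan = Arc::new(ExecutionPlan {
        schema: Arc::new(Schema(vec!["id"])),
    });
    let (_plan, schema) = resolve_plan(Some(plan), None).expect("plan should resolve");
    // Before the fix, this path would have reported the empty placeholder schema.
    assert_eq!(*schema, Schema(vec!["id"]));
}
```

The key point is that `schema` is derived after the `if let` chain has produced a plan, so every branch, including the pre-built execution plan, agrees on the schema that gets written.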
59 changes: 58 additions & 1 deletion crates/deltalake-core/tests/integration_datafusion.rs
@@ -45,7 +45,8 @@ use std::error::Error;
 
 mod local {
     use datafusion::common::stats::Precision;
-    use deltalake_core::writer::JsonWriter;
+    use deltalake_core::{logstore::default_logstore, writer::JsonWriter};
+    use object_store::local::LocalFileSystem;
 
     use super::*;
     #[tokio::test]
@@ -1071,6 +1072,62 @@ mod local {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_issue_2105() -> Result<()> {
+        use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
+        let path = tempfile::tempdir().unwrap();
+        let path = path.into_path();
+
+        let file_store = LocalFileSystem::new_with_prefix(path.clone()).unwrap();
+        let log_store = default_logstore(
+            Arc::new(file_store),
+            &Url::from_file_path(path.clone()).unwrap(),
+            &Default::default(),
+        );
+
+        let tbl = CreateBuilder::new()
+            .with_log_store(log_store.clone())
+            .with_save_mode(SaveMode::Overwrite)
+            .with_table_name("test")
+            .with_column(
+                "id",
+                DataType::Primitive(PrimitiveType::Integer),
+                true,
+                None,
+            );
+        let tbl = tbl.await.unwrap();
+        let ctx = SessionContext::new();
+        let plan = ctx
+            .sql("SELECT 1 as id")
+            .await
+            .unwrap()
+            .create_physical_plan()
+            .await
+            .unwrap();
+        let write_builder = WriteBuilder::new(log_store, tbl.state);
+        let _ = write_builder
+            .with_input_execution_plan(plan)
+            .with_save_mode(SaveMode::Overwrite)
+            .await
+            .unwrap();
+
+        let table = open_table(path.to_str().unwrap()).await.unwrap();
+        let prov: Arc<dyn TableProvider> = Arc::new(table);
+        ctx.register_table("test", prov).unwrap();
+        let mut batches = ctx
+            .sql("SELECT * FROM test")
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+        let batch = batches.pop().unwrap();
+
+        let expected_schema = Schema::new(vec![Field::new("id", ArrowDataType::Int32, true)]);
+        assert_eq!(batch.schema().as_ref(), &expected_schema);
+        Ok(())
+    }
 }
 
 async fn test_datafusion(context: &IntegrationContext) -> TestResult {
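The new test ends by comparing Arrow schemas by value. As a standalone reference for that assertion pattern (plain arrow-rs, outside delta-rs; the `arrow_array`/`arrow_schema` crate names are an assumption about how the dependencies are pulled in), a sketch:

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The schema the test expects after the round trip: one nullable Int32 column "id".
    let expected = Schema::new(vec![Field::new("id", DataType::Int32, true)]);

    // A stand-in batch carrying that schema, playing the role of the batch returned
    // by `SELECT * FROM test` in the integration test.
    let id_column: ArrayRef = Arc::new(Int32Array::from(vec![Some(1)]));
    let batch = RecordBatch::try_new(Arc::new(expected.clone()), vec![id_column])?;

    // Arrow schemas compare by value, which is what the test's final assert relies on.
    assert_eq!(batch.schema().as_ref(), &expected);
    Ok(())
}
```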