feat: introduce schema evolution on RecordBatchWriter #2024

Merged (1 commit, Feb 5, 2024)
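Before the file-by-file diff, here is a hedged sketch of the behaviour this change is building toward: a `RecordBatchWriter` caller opting into schema evolution through the new `write_with_mode` entry point. The `WriteMode::MergeSchema` variant, the `deltalake_core` import paths, and the `for_table`/`flush_and_commit` usage are assumptions for illustration; the diff below only shows `WriteMode::Default` explicitly.

```rust
// Illustrative only: WriteMode::MergeSchema and the exact import paths are
// assumptions; this diff itself only shows the WriteMode::Default fallback.
use arrow_array::RecordBatch;
use deltalake_core::writer::{DeltaWriter, RecordBatchWriter, WriteMode};
use deltalake_core::{DeltaTable, DeltaTableError};

async fn append_with_evolution(
    table: &mut DeltaTable,
    batch: RecordBatch,
) -> Result<(), DeltaTableError> {
    let mut writer = RecordBatchWriter::for_table(table)?;
    // Default mode keeps today's behaviour: a batch whose schema differs from
    // the table schema is rejected. MergeSchema (assumed) would instead widen
    // the table schema to accommodate the new columns.
    writer.write_with_mode(batch, WriteMode::MergeSchema).await?;
    writer.flush_and_commit(table).await?;
    Ok(())
}
```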
1 change: 0 additions & 1 deletion crates/core/src/operations/write.rs
@@ -1,4 +1,3 @@
//! Used to write [RecordBatch]es into a delta table.
//!
//! New Table Semantics
//! - The schema of the [RecordBatch] is used to initialize the table.
111 changes: 91 additions & 20 deletions crates/core/src/operations/writer.rs
@@ -66,6 +66,7 @@ impl From<WriteError> for DeltaTableError {
}

/// Configuration to write data into Delta tables
#[derive(Debug)]
pub struct WriterConfig {
/// Schema of the delta table
table_schema: ArrowSchemaRef,
@@ -112,6 +113,7 @@ impl WriterConfig {
}
}

#[derive(Debug)]
/// A parquet writer implementation tailored to the needs of writing data to a delta table.
pub struct DeltaWriter {
/// An object store pointing at Delta table root
@@ -212,6 +214,7 @@ impl DeltaWriter {
}
}

#[derive(Debug)]
pub(crate) struct PartitionWriterConfig {
/// Schema of the data written to disk
file_schema: ArrowSchemaRef,
@@ -257,6 +260,7 @@ impl PartitionWriterConfig {
}
}

#[derive(Debug)]
pub(crate) struct PartitionWriter {
object_store: ObjectStoreRef,
writer_id: uuid::Uuid,
@@ -396,12 +400,47 @@ impl PartitionWriter {
mod tests {
use super::*;
use crate::storage::utils::flatten_list_stream as list;
use crate::writer::test_utils::get_record_batch;
use crate::writer::test_utils::*;
use crate::DeltaTableBuilder;
use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use std::sync::Arc;

fn get_delta_writer(
object_store: ObjectStoreRef,
batch: &RecordBatch,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
) -> DeltaWriter {
let config = WriterConfig::new(
batch.schema(),
vec![],
writer_properties,
target_file_size,
write_batch_size,
);
DeltaWriter::new(object_store, config)
}

fn get_partition_writer(
object_store: ObjectStoreRef,
batch: &RecordBatch,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
) -> PartitionWriter {
let config = PartitionWriterConfig::try_new(
batch.schema(),
IndexMap::new(),
writer_properties,
target_file_size,
write_batch_size,
)
.unwrap();
PartitionWriter::try_with_config(object_store, config).unwrap()
}

#[tokio::test]
async fn test_write_partition() {
let log_store = DeltaTableBuilder::from_uri("memory://")
@@ -411,7 +450,7 @@
let batch = get_record_batch(None, false);

// write single un-partitioned batch
let mut writer = get_writer(object_store.clone(), &batch, None, None, None);
let mut writer = get_partition_writer(object_store.clone(), &batch, None, None, None);
writer.write(&batch).await.unwrap();
let files = list(object_store.as_ref(), None).await.unwrap();
assert_eq!(files.len(), 0);
@@ -443,8 +482,9 @@
let properties = WriterProperties::builder()
.set_max_row_group_size(1024)
.build();
// configure small target file size and row group size so we can observe multiple files written
let mut writer = get_writer(object_store, &batch, Some(properties), Some(10_000), None);
// configure small target file size and row group size so we can observe multiple files written
let mut writer =
get_partition_writer(object_store, &batch, Some(properties), Some(10_000), None);
writer.write(&batch).await.unwrap();

// check that we have written more than one file, and no more than 1 is below target size
@@ -471,7 +511,7 @@
.unwrap()
.object_store();
// configure small target file size so we can observe multiple files written
let mut writer = get_writer(object_store, &batch, None, Some(10_000), None);
let mut writer = get_partition_writer(object_store, &batch, None, Some(10_000), None);
writer.write(&batch).await.unwrap();

// check that we have written more than one file, and no more than 1 is below target size
@@ -499,28 +539,59 @@
.object_store();
// configure high batch size and low file size to observe one file written and flushed immediately
// upon writing the batch, then ensure the buffer is empty upon closing the writer
let mut writer = get_writer(object_store, &batch, None, Some(9000), Some(10000));
let mut writer = get_partition_writer(object_store, &batch, None, Some(9000), Some(10000));
writer.write(&batch).await.unwrap();

let adds = writer.close().await.unwrap();
assert!(adds.len() == 1);
}

fn get_writer(
object_store: ObjectStoreRef,
batch: &RecordBatch,
writer_properties: Option<WriterProperties>,
target_file_size: Option<usize>,
write_batch_size: Option<usize>,
) -> PartitionWriter {
let config = PartitionWriterConfig::try_new(
batch.schema(),
IndexMap::new(),
writer_properties,
target_file_size,
write_batch_size,
)
.unwrap();
PartitionWriter::try_with_config(object_store, config).unwrap()
}

#[tokio::test]
async fn test_write_mismatched_schema() {
let log_store = DeltaTableBuilder::from_uri("memory://")
.build_storage()
.unwrap();
let object_store = log_store.object_store();
let batch = get_record_batch(None, false);

// write single un-partitioned batch
let mut writer = get_delta_writer(object_store.clone(), &batch, None, None, None);
writer.write(&batch).await.unwrap();
// Ensure the write hasn't been flushed
let files = list(object_store.as_ref(), None).await.unwrap();
assert_eq!(files.len(), 0);

// Create a second batch with a different schema
let second_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, true),
Field::new("name", DataType::Utf8, true),
]));
let second_batch = RecordBatch::try_new(
second_schema,
vec![
Arc::new(Int32Array::from(vec![Some(1), Some(2)])),
Arc::new(StringArray::from(vec![Some("will"), Some("robert")])),
],
)
.unwrap();

let result = writer.write(&second_batch).await;
assert!(result.is_err());

match result {
Ok(_) => {
assert!(false, "Should not have successfully written");
}
Err(e) => {
match e {
DeltaTableError::SchemaMismatch { .. } => {
// this is expected
}
others => {
assert!(false, "Got the wrong error: {others:?}");
}
}
}
};
}
}
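The new test_write_mismatched_schema test above pins down the strict default path: a second batch with a different schema must surface as DeltaTableError::SchemaMismatch rather than being flushed. As a rough sketch (not part of this diff), downstream code could branch on that variant the same way the test does; the module path used for DeltaWriter below is an assumption, since the struct is defined in crates/core/src/operations/writer.rs.

```rust
// Rough sketch, not from this PR: reacting to the SchemaMismatch error that
// test_write_mismatched_schema expects. The module path of DeltaWriter is an
// assumption; in this diff it is defined in crates/core/src/operations/writer.rs.
use arrow_array::RecordBatch;
use deltalake_core::operations::writer::DeltaWriter;
use deltalake_core::DeltaTableError;

async fn try_append(
    writer: &mut DeltaWriter,
    batch: &RecordBatch,
) -> Result<(), DeltaTableError> {
    match writer.write(batch).await {
        Ok(()) => Ok(()),
        Err(e @ DeltaTableError::SchemaMismatch { .. }) => {
            // The batch was rejected before anything was flushed; the caller can
            // decide whether to cast the batch or fail the whole job.
            eprintln!("schema mismatch, batch not written: {e}");
            Err(e)
        }
        Err(other) => Err(other),
    }
}
```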
1 change: 0 additions & 1 deletion crates/core/src/protocol/checkpoints.rs
@@ -531,7 +531,6 @@ mod tests {
use serde_json::json;

use super::*;
use crate::kernel::Format;
use crate::kernel::StructType;
use crate::operations::DeltaOps;
use crate::protocol::Metadata;
115 changes: 113 additions & 2 deletions crates/core/src/writer/json.rs
@@ -22,7 +22,7 @@ use super::utils::{
arrow_schema_without_partitions, next_data_path, record_batch_from_message,
record_batch_without_partitions,
};
use super::{DeltaWriter, DeltaWriterError};
use super::{DeltaWriter, DeltaWriterError, WriteMode};
use crate::errors::DeltaTableError;
use crate::kernel::{Add, PartitionsExt, Scalar, StructType};
use crate::table::builder::DeltaTableBuilder;
@@ -286,8 +286,20 @@ impl JsonWriter {

#[async_trait::async_trait]
impl DeltaWriter<Vec<Value>> for JsonWriter {
/// Writes the given values to internal parquet buffers for each represented partition.
/// Write a chunk of values into the internal write buffers with the default write mode
async fn write(&mut self, values: Vec<Value>) -> Result<(), DeltaTableError> {
self.write_with_mode(values, WriteMode::Default).await
}

/// Writes the given values to internal parquet buffers for each represented partition.
async fn write_with_mode(
&mut self,
values: Vec<Value>,
mode: WriteMode,
) -> Result<(), DeltaTableError> {
if mode != WriteMode::Default {
warn!("The JsonWriter does not currently support non-default write modes, falling back to default mode");
}
let mut partial_writes: Vec<(Value, ParquetError)> = Vec::new();
let arrow_schema = self.arrow_schema();
let divided = self.divide_by_partition_values(values)?;
@@ -544,4 +556,103 @@ mod tests {
})
));
}

// The following sets of tests are related to #1386 and mergeSchema support
// <https://github.com/delta-io/delta-rs/issues/1386>
mod schema_evolution {
use super::*;

#[tokio::test]
async fn test_json_write_mismatched_values() {
let table_dir = tempfile::tempdir().unwrap();
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();

let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
let mut writer = JsonWriter::try_new(
path.clone(),
Arc::new(arrow_schema),
Some(vec!["modified".to_string()]),
None,
)
.unwrap();

let data = serde_json::json!(
{
"id" : "A",
"value": 42,
"modified": "2021-02-01"
}
);

writer.write(vec![data]).await.unwrap();
let add_actions = writer.flush().await.unwrap();
assert_eq!(add_actions.len(), 1);

let second_data = serde_json::json!(
{
"id" : 1,
"name" : "Ion"
}
);

match writer.write(vec![second_data]).await {
Ok(_) => {
assert!(false, "Should not have successfully written");
}
_ => {}
}
}

#[tokio::test]
async fn test_json_write_mismatched_schema() {
use crate::operations::create::CreateBuilder;
let table_dir = tempfile::tempdir().unwrap();
let schema = get_delta_schema();
let path = table_dir.path().to_str().unwrap().to_string();

let mut table = CreateBuilder::new()
.with_location(&path)
.with_table_name("test-table")
.with_comment("A table for running tests")
.with_columns(schema.fields().clone())
.await
.unwrap();
table.load().await.expect("Failed to load table");
assert_eq!(table.version(), 0);

let arrow_schema = <ArrowSchema as TryFrom<&StructType>>::try_from(&schema).unwrap();
let mut writer = JsonWriter::try_new(
path.clone(),
Arc::new(arrow_schema),
Some(vec!["modified".to_string()]),
None,
)
.unwrap();

let data = serde_json::json!(
{
"id" : "A",
"value": 42,
"modified": "2021-02-01"
}
);

writer.write(vec![data]).await.unwrap();
let add_actions = writer.flush().await.unwrap();
assert_eq!(add_actions.len(), 1);

let second_data = serde_json::json!(
{
"postcode" : 1,
"name" : "Ion"
}
);

// TODO This should fail because we haven't asked to evolve the schema
writer.write(vec![second_data]).await.unwrap();
writer.flush_and_commit(&mut table).await.unwrap();
assert_eq!(table.version(), 1);
}
}
}
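The TODO in test_json_write_mismatched_schema marks the gap this change leaves open for JsonWriter: write_with_mode accepts a WriteMode but warns and falls back to the default, so a row with unknown columns is still written. Below is a hedged sketch of what the explicit opt-in could look like once JsonWriter honours non-default modes; WriteMode::MergeSchema and the import paths are assumptions, not API shown in this diff.

```rust
// Hedged sketch of where the TODO above points: once JsonWriter stops falling
// back to the default mode, schema evolution would be an explicit request.
// WriteMode::MergeSchema and the deltalake_core paths are assumed here.
use deltalake_core::writer::{DeltaWriter, JsonWriter, WriteMode};
use deltalake_core::DeltaTableError;
use serde_json::json;

async fn append_row_with_new_column(writer: &mut JsonWriter) -> Result<(), DeltaTableError> {
    let row = json!({ "postcode": 1, "name": "Ion" });
    // With WriteMode::Default this row should eventually be rejected (per the
    // TODO above); MergeSchema would instead add the new columns to the table schema.
    writer.write_with_mode(vec![row], WriteMode::MergeSchema).await
}
```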