Introduce schema evolution into the IngestProcessor runloop
This commit introduces some interplay between the IngestProcessor and
DataWriter, the latter of which needs to keep track of whether or not it
has a changed schema.

What should be done with that changed schema must necessarily live in
IngestProcessor, since it performs the Delta transaction commits at the
tail end of batch processing.

There are some potential mismatches between the schema in storage and
what the DataWriter has, so this change re-runs the runloop if the
current schema and the evolved schema are incompatible.

Closes #131

Sponsored-by: Raft LLC
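
Below is a minimal, self-contained sketch (not part of this commit) of the retry behavior described above. `Writer` and both functions are illustrative stand-ins for DataWriter and the IngestProcessor runloop, not the crate's API:

struct Writer {
    // Stand-in for the evolved schema the DataWriter has accumulated.
    schema_version: u64,
}

impl Writer {
    // Compatibility check between the writer's schema and the schema in
    // storage, analogous to the can-merge check this commit performs.
    fn is_compatible_with(&self, storage_version: u64) -> bool {
        self.schema_version == storage_version
    }

    // Re-read table metadata and adopt the storage schema before retrying.
    fn rebuild_from_storage(&mut self, storage_version: u64) {
        self.schema_version = storage_version;
    }
}

fn commit_with_retry(writer: &mut Writer, storage_version: u64) {
    // When the current schema and the evolved schema are incompatible, run
    // the loop again so the next pass starts from the schema in storage.
    while !writer.is_compatible_with(storage_version) {
        writer.rebuild_from_storage(storage_version);
    }
    println!("committing Delta transaction with schema v{}", writer.schema_version);
}

fn main() {
    let mut writer = Writer { schema_version: 1 };
    commit_with_retry(&mut writer, 2);
}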
rtyler committed Jan 9, 2024
1 parent 72c2f0b commit e4c0f1e
Showing 5 changed files with 81 additions and 73 deletions.
2 changes: 1 addition & 1 deletion src/dead_letters.rs
@@ -255,7 +255,7 @@ impl DeltaSinkDeadLetterQueue {
dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => std::env::var(env_vars::DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE)
.unwrap_or_else(|_| "kafka_delta_ingest-dead_letters".to_string()),
};
-#[cfg(feature = "azure")]
+#[cfg(all(feature = "azure", not(feature="s3")))]
let opts = HashMap::default();

let table = crate::delta_helpers::load_table(table_uri, opts.clone()).await?;
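
The cfg change above presumably keeps a build with both the azure and s3 features from shadowing the s3 dynamo-lock options with an empty map. A standalone sketch (illustrative functions, not the crate's code, assuming a crate that declares azure and s3 features) of how all(..., not(...)) selects exactly one definition:

// Only compiled when azure is on and s3 is off, matching the diff above.
#[cfg(all(feature = "azure", not(feature = "s3")))]
fn default_opts() -> &'static str {
    "empty azure options"
}

// When s3 is enabled (alone or together with azure), this one wins.
#[cfg(feature = "s3")]
fn default_opts() -> &'static str {
    "dynamo lock options"
}

// Fallback so the sketch compiles with neither feature enabled.
#[cfg(not(any(feature = "azure", feature = "s3")))]
fn default_opts() -> &'static str {
    "no storage backend"
}

fn main() {
    // With --features azure,s3 only the s3 definition compiles; with
    // --features azure alone, the azure definition compiles.
    println!("{}", default_opts());
}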
22 changes: 9 additions & 13 deletions src/lib.rs
@@ -17,7 +17,7 @@ extern crate strum_macros;
extern crate serde_json;

use coercions::CoercionTree;
-use deltalake_core::kernel::{Action, Metadata, Format, StructType};
+use deltalake_core::kernel::{Action, Format, Metadata, StructType};
use deltalake_core::protocol::DeltaOperation;
use deltalake_core::protocol::OutputMode;
use deltalake_core::{DeltaTable, DeltaTableError};
@@ -38,8 +38,8 @@ use std::time::{Duration, Instant};
use std::{collections::HashMap, path::PathBuf};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
-use uuid::Uuid;
use url::Url;
+use uuid::Uuid;

mod coercions;
/// Doc
@@ -980,21 +980,20 @@ impl IngestProcessor {
let mut actions = build_actions(&partition_offsets, self.opts.app_id.as_str(), add);
let delta_metadata = self.table.state.delta_metadata().unwrap();
// Determine whether an attempt to update the delta_writer's schema should be performed
//
// In most cases, this is desired behavior, except when the table is evolving
let mut update_schema = true;

// If schema evolution is enabled, then kafka-delta-ingest must ensure that the new
// `table_schema` is compatible with the evolved schema in the writer
if self.opts.schema_evolution && self.delta_writer.has_schema_changed() {
-if let Ok(arrow_schema) = self.delta_writer.can_merge_with_delta_schema(&delta_metadata.schema) {
+if let Ok(arrow_schema) = self
+.delta_writer
+.can_merge_with_delta_schema(&delta_metadata.schema)
+{
debug!("The schema has changed *AND* the schema is evolving..this transaction will include a Metadata action");
update_schema = false;
-let new_delta_schema: StructType = self
-.delta_writer
-.arrow_schema()
-.clone()
-.try_into()
+let new_delta_schema: StructType = arrow_schema.try_into()
.expect("The delta_writer schema was unable to be coerced into a delta schema, this is fatal!");
let schema_string: String = serde_json::to_string(&new_delta_schema)?;
// TODO: Handle partition columns somehow? Can we even evolve partition columns? Maybe
@@ -1011,10 +1010,7 @@
}
}

-if update_schema && self
-.delta_writer
-.update_schema(delta_metadata)?
-{
+if update_schema && self.delta_writer.update_schema(delta_metadata)? {
info!("Table schema has been updated");
// Update the coercion tree to reflect the new schema
let coercion_tree = coercions::create_coercion_tree(self.table.schema().unwrap());
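
The update_schema flag logic above reduces to a three-way condition: the writer-side schema update is skipped only when evolution is enabled, the writer's schema changed, and the merge check succeeded, in which case a Metadata action is committed instead. A reduced sketch (hypothetical function, not the crate's API):

fn plan_schema_handling(
    schema_evolution: bool,
    writer_schema_changed: bool,
    merge_ok: bool,
) -> &'static str {
    if schema_evolution && writer_schema_changed && merge_ok {
        "commit Metadata action with the evolved schema"
    } else {
        "update the writer's schema from table metadata"
    }
}

fn main() {
    // Evolving table with a mergeable schema: emit a Metadata action.
    assert_eq!(
        "commit Metadata action with the evolved schema",
        plan_schema_handling(true, true, true)
    );
    // Evolution disabled: fall back to updating the writer from storage.
    assert_eq!(
        "update the writer's schema from table metadata",
        plan_schema_handling(false, true, true)
    );
}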
2 changes: 1 addition & 1 deletion src/main.rs
@@ -503,7 +503,7 @@ mod test {
));
}

-#[cfg(feature="avro")]
+#[cfg(feature = "avro")]
#[test]
fn get_avro_argument() {
let schema_registry_url: url::Url = url::Url::parse(SCHEMA_REGISTRY_ADDRESS).unwrap();
1 change: 1 addition & 0 deletions src/serialization/mod.rs
@@ -1,5 +1,6 @@
use async_trait::async_trait;
use serde_json::Value;
+use log::*;

use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};

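
The new `use log::*;` import presumably lets the serialization module emit diagnostics alongside its dead-letter handling; a minimal usage sketch (hypothetical helper, not part of this commit):

use log::warn;

// Hypothetical helper: note a payload that failed to deserialize before it
// is routed to the dead-letter queue.
fn note_bad_payload(len: usize) {
    warn!("failed to deserialize a {} byte payload", len);
}

fn main() {
    // Without an installed logger this is a no-op, but it compiles and runs.
    note_bad_payload(42);
}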
127 changes: 69 additions & 58 deletions src/writer.rs
@@ -281,18 +281,26 @@ impl DataWriter {
}

/// Determine whether the writer's current schema can be merged with the suggested DeltaSchema
-pub fn can_merge_with_delta_schema(&self, suggested_schema: &Schema) -> Result<ArrowSchema, Box<DataWriterError>> {
+pub fn can_merge_with_delta_schema(
+&self,
+suggested_schema: &Schema,
+) -> Result<Arc<ArrowSchema>, Box<DataWriterError>> {
let arrow_schema: ArrowSchema =
-<ArrowSchema as TryFrom<&Schema>>::try_from(&suggested_schema)?;
+<ArrowSchema as TryFrom<&Schema>>::try_from(suggested_schema)?;
self.can_merge_with(&arrow_schema)
}

/// Determine whether the writer's current schema can be merged with `suggested_schema`
-pub fn can_merge_with(&self, suggested_schema: &ArrowSchema) -> Result<ArrowSchema, Box<DataWriterError>> {
+pub fn can_merge_with(
+&self,
+suggested_schema: &ArrowSchema,
+) -> Result<Arc<ArrowSchema>, Box<DataWriterError>> {
ArrowSchema::try_merge(vec![
-suggested_schema.clone(),
-self.arrow_schema_ref.as_ref().clone(),
-]).map_err(|e| e.into())
+suggested_schema.clone(),
+self.arrow_schema_ref.as_ref().clone(),
+])
+.map(Arc::new)
+.map_err(|e| e.into())
}

/// Writes the given values to internal parquet buffers for each represented partition.
@@ -591,74 +599,89 @@ mod datawriter_tests {

async fn get_default_writer() -> (DataWriter, DeltaTable) {
let table = inmemory_table().await;
-(DataWriter::with_options(&table, DataWriterOptions::default()).expect("Failed to make writer"),
-table)
+(
+DataWriter::with_options(&table, DataWriterOptions::default())
+.expect("Failed to make writer"),
+table,
+)
}

#[tokio::test]
async fn test_can_merge_with_simple() {
let (writer, _) = get_default_writer().await;
-let delta_schema = StructType::new(vec![
-StructField::new(
-"vid".to_string(),
-DeltaDataType::Primitive(PrimitiveType::Integer),
-true,
-),
-]);
-let arrow_schema: ArrowSchema =
-<ArrowSchema as TryFrom<&Schema>>::try_from(&delta_schema)
-.expect("Failed to convert arrow schema somehow");
+let delta_schema = StructType::new(vec![StructField::new(
+"vid".to_string(),
+DeltaDataType::Primitive(PrimitiveType::Integer),
+true,
+)]);
+let arrow_schema: ArrowSchema = <ArrowSchema as TryFrom<&Schema>>::try_from(&delta_schema)
+.expect("Failed to convert arrow schema somehow");
let result = writer.can_merge_with(&arrow_schema);
assert_eq!(true, result.is_ok(), "This should be able to merge");
}

#[tokio::test]
async fn test_can_merge_with_diff_column() {
let (writer, _) = get_default_writer().await;
-let delta_schema = StructType::new(vec![
-StructField::new(
-"id".to_string(),
-DeltaDataType::Primitive(PrimitiveType::Integer),
-true,
-),
-]);
-let arrow_schema: ArrowSchema =
-<ArrowSchema as TryFrom<&Schema>>::try_from(&delta_schema)
-.expect("Failed to convert arrow schema somehow");
+let delta_schema = StructType::new(vec![StructField::new(
+"id".to_string(),
+DeltaDataType::Primitive(PrimitiveType::Integer),
+true,
+)]);
+let arrow_schema: ArrowSchema = <ArrowSchema as TryFrom<&Schema>>::try_from(&delta_schema)
+.expect("Failed to convert arrow schema somehow");
let result = writer.can_merge_with(&arrow_schema);
-assert_eq!(true, result.is_err(), "Cannot merge this schema, but DataWriter thinks I can?");
+assert_eq!(
+true,
+result.is_err(),
+"Cannot merge this schema, but DataWriter thinks I can?"
+);
}

#[tokio::test]
async fn test_update_schema() {
let (mut writer, _) = get_default_writer().await;
-let new_schema = StructType::new(vec![
-StructField::new(
-"vid".to_string(),
-DeltaDataType::Primitive(PrimitiveType::Integer),
-true,
-),
-]);
-let metadata = DeltaTableMetaData::new(None, None, None, new_schema, vec![], HashMap::new());
-
-let result = writer.update_schema(&metadata).expect("Failed to execute update_schema");
-assert_eq!(true, result, "Expected that the new schema would have caused an update");
+let new_schema = StructType::new(vec![StructField::new(
+"vid".to_string(),
+DeltaDataType::Primitive(PrimitiveType::Integer),
+true,
+)]);
+let metadata =
+DeltaTableMetaData::new(None, None, None, new_schema, vec![], HashMap::new());
+
+let result = writer
+.update_schema(&metadata)
+.expect("Failed to execute update_schema");
+assert_eq!(
+true, result,
+"Expected that the new schema would have caused an update"
+);
}

#[tokio::test]
async fn test_update_schema_with_new_partition_cols() {
let (mut writer, table) = get_default_writer().await;
let mut metadata = table.state.delta_metadata().unwrap().clone();
metadata.partition_columns = vec!["test".into()];
-let result = writer.update_schema(&metadata).expect("Failed to execute update_schema");
-assert_eq!(true, result, "Expected that the new schema would have caused an update");
+let result = writer
+.update_schema(&metadata)
+.expect("Failed to execute update_schema");
+assert_eq!(
+true, result,
+"Expected that the new schema would have caused an update"
+);
}

#[tokio::test]
async fn test_update_schema_no_changes() {
let (mut writer, table) = get_default_writer().await;
-let result = writer.update_schema(table.state.delta_metadata().unwrap()).expect("Failed to execute update_schema");
-assert_eq!(false, result, "Expected that there would be no schema changes");
+let result = writer
+.update_schema(table.state.delta_metadata().unwrap())
+.expect("Failed to execute update_schema");
+assert_eq!(
+false, result,
+"Expected that there would be no schema changes"
+);
}
}

@@ -1475,7 +1498,6 @@ mod tests {
}

#[tokio::test]
-#[ignore]
async fn test_schema_matching() {
let temp_dir = tempfile::tempdir().unwrap();
let table_path = temp_dir.path();
@@ -1506,21 +1528,10 @@
.into()];
let result = writer.write(rows).await;
assert!(
-result.is_err(),
-"Expected the write of our invalid schema rows to fail!\n{:?}",
+result.is_ok(),
+"Expecting the write of the valid schema to succeed!\n{:?}",
result
);
-match result {
-Ok(_) => unreachable!(),
-//Err(Box<DataWriterError::SchemaMismatch>) => {},
-Err(e) => {
-assert!(
-false,
-"I was expecting a schema mismatch, got this instead: {:?}",
-e
-);
-}
-}
}

#[tokio::test]
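
Behind `can_merge_with` sits `ArrowSchema::try_merge`, as the diff above shows, now wrapping the merged result in an `Arc`. A standalone sketch of try_merge's semantics, using the arrow-schema crate directly (an assumption; kafka-delta-ingest reaches Arrow through deltalake-core's re-export):

use arrow_schema::{DataType, Field, Schema};

fn main() {
    let current = Schema::new(vec![Field::new("vid", DataType::Int32, true)]);

    // Adding a new nullable column is a compatible evolution: try_merge
    // returns the widened schema.
    let widened = Schema::new(vec![
        Field::new("vid", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ]);
    assert!(Schema::try_merge(vec![current.clone(), widened]).is_ok());

    // Re-typing an existing column is not: the same field name with a
    // conflicting type makes try_merge fail, which is the incompatibility
    // the commit message says triggers another pass through the runloop.
    let conflict = Schema::new(vec![Field::new("vid", DataType::Utf8, true)]);
    assert!(Schema::try_merge(vec![current, conflict]).is_err());
}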
