diff --git a/src/dead_letters.rs b/src/dead_letters.rs
index e7f6e1f..a4a1c7c 100644
--- a/src/dead_letters.rs
+++ b/src/dead_letters.rs
@@ -255,7 +255,7 @@ impl DeltaSinkDeadLetterQueue {
             dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => std::env::var(env_vars::DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE)
                 .unwrap_or_else(|_| "kafka_delta_ingest-dead_letters".to_string()),
         };
-        #[cfg(feature = "azure")]
+        #[cfg(all(feature = "azure", not(feature = "s3")))]
         let opts = HashMap::default();
 
         let table = crate::delta_helpers::load_table(table_uri, opts.clone()).await?;
diff --git a/src/lib.rs b/src/lib.rs
index 6711fd9..2af55a6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -17,7 +17,7 @@ extern crate strum_macros;
 extern crate serde_json;
 
 use coercions::CoercionTree;
-use deltalake_core::kernel::{Action, Metadata, Format, StructType};
+use deltalake_core::kernel::{Action, Format, Metadata, StructType};
 use deltalake_core::protocol::DeltaOperation;
 use deltalake_core::protocol::OutputMode;
 use deltalake_core::{DeltaTable, DeltaTableError};
@@ -38,8 +38,8 @@ use std::time::{Duration, Instant};
 use std::{collections::HashMap, path::PathBuf};
 use tokio::sync::RwLock;
 use tokio_util::sync::CancellationToken;
-use uuid::Uuid;
-use url::Url;
+use url::Url;
+use uuid::Uuid;
 
 mod coercions;
 /// Doc
@@ -980,21 +980,20 @@ impl IngestProcessor {
         let mut actions = build_actions(&partition_offsets, self.opts.app_id.as_str(), add);
         let delta_metadata = self.table.state.delta_metadata().unwrap();
         // Determine whether an attempt to update the delta_writer's schema should be performed
-        // 
+        //
         // In most cases, this is desired behavior, except when the table is evolving
         let mut update_schema = true;
 
         // If schema evolution is enabled, then kafka-delta-ingest must ensure that the new
         // `table_schema` is compatible with the evolved schema in the writer
         if self.opts.schema_evolution && self.delta_writer.has_schema_changed() {
-            if let Ok(arrow_schema) = self.delta_writer.can_merge_with_delta_schema(&delta_metadata.schema) {
+            if let Ok(arrow_schema) = self
+                .delta_writer
+                .can_merge_with_delta_schema(&delta_metadata.schema)
+            {
                 debug!("The schema has changed *AND* the schema is evolving..this transaction will include a Metadata action");
                 update_schema = false;
-                let new_delta_schema: StructType = self
-                    .delta_writer
-                    .arrow_schema()
-                    .clone()
-                    .try_into()
+                let new_delta_schema: StructType = arrow_schema.try_into()
                     .expect("The delta_writer schema was unable to be coerced into a delta schema, this is fatal!");
                 let schema_string: String = serde_json::to_string(&new_delta_schema)?;
                 // TODO: Handle partition columns somehow? Can we even evolve partition columns? Maybe
@@ -1011,10 +1010,7 @@
             }
         }
 
-        if update_schema && self
-            .delta_writer
-            .update_schema(delta_metadata)?
-        {
+        if update_schema && self.delta_writer.update_schema(delta_metadata)? {
             info!("Table schema has been updated");
             // Update the coercion tree to reflect the new schema
             let coercion_tree = coercions::create_coercion_tree(self.table.schema().unwrap());
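Note on the lib.rs hunk above: when the writer's schema has evolved and is still merge-compatible, the commit carries a Metadata action whose schema string is simply the merged schema serialized with serde_json. A standalone sketch of that serialization step, using the same kernel constructors this diff's tests use (the DeltaDataType alias is an assumption here, standing for deltalake_core::kernel::DataType as it does in writer.rs):

    use deltalake_core::kernel::{DataType as DeltaDataType, PrimitiveType, StructField, StructType};

    fn main() -> Result<(), serde_json::Error> {
        let new_delta_schema = StructType::new(vec![StructField::new(
            "vid".to_string(),
            DeltaDataType::Primitive(PrimitiveType::Integer),
            true,
        )]);
        // This JSON string is what lands in the transaction's Metadata action.
        let schema_string: String = serde_json::to_string(&new_delta_schema)?;
        println!("{schema_string}");
        Ok(())
    }
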
diff --git a/src/main.rs b/src/main.rs
index bd265dc..c4ec806 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -503,7 +503,7 @@ mod test {
         ));
     }
 
-    #[cfg(feature="avro")]
+    #[cfg(feature = "avro")]
    #[test]
     fn get_avro_argument() {
         let schema_registry_url: url::Url = url::Url::parse(SCHEMA_REGISTRY_ADDRESS).unwrap();
diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs
index 9187eaf..136b80b 100644
--- a/src/serialization/mod.rs
+++ b/src/serialization/mod.rs
@@ -1,5 +1,6 @@
 use async_trait::async_trait;
+use log::*;
 use serde_json::Value;
 
 use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};
diff --git a/src/writer.rs b/src/writer.rs
index 90b455e..7b3339b 100644
--- a/src/writer.rs
+++ b/src/writer.rs
@@ -281,18 +281,26 @@ impl DataWriter {
     }
 
     /// Determine whether the writer's current schema can be merged with the suggested DeltaSchema
-    pub fn can_merge_with_delta_schema(&self, suggested_schema: &Schema) -> Result<ArrowSchema, Box<DataWriterError>> {
+    pub fn can_merge_with_delta_schema(
+        &self,
+        suggested_schema: &Schema,
+    ) -> Result<Arc<ArrowSchema>, Box<DataWriterError>> {
         let arrow_schema: ArrowSchema =
-            <ArrowSchema as TryFrom<&Schema>>::try_from(&suggested_schema)?;
+            <ArrowSchema as TryFrom<&Schema>>::try_from(suggested_schema)?;
         self.can_merge_with(&arrow_schema)
     }
 
     /// Determine whether the writer's current schema can be merged with `suggested_schema`
-    pub fn can_merge_with(&self, suggested_schema: &ArrowSchema) -> Result<ArrowSchema, Box<DataWriterError>> {
+    pub fn can_merge_with(
+        &self,
+        suggested_schema: &ArrowSchema,
+    ) -> Result<Arc<ArrowSchema>, Box<DataWriterError>> {
         ArrowSchema::try_merge(vec![
-            suggested_schema.clone(),
-            self.arrow_schema_ref.as_ref().clone(),
-        ]).map_err(|e| e.into())
+            suggested_schema.clone(),
+            self.arrow_schema_ref.as_ref().clone(),
+        ])
+        .map(Arc::new)
+        .map_err(|e| e.into())
     }
 
     /// Writes the given values to internal parquet buffers for each represented partition.
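Note on the hunk above: can_merge_with now hands the merged schema back behind an Arc (via .map(Arc::new)) instead of a bare ArrowSchema, so the lib.rs caller can keep it without another clone. A minimal sketch of the underlying merge semantics, assuming the standalone arrow crate (kafka-delta-ingest actually reaches ArrowSchema through deltalake_core's re-exports):

    use std::sync::Arc;
    use arrow::datatypes::{DataType, Field, Schema};

    fn main() {
        let current = Schema::new(vec![Field::new("id", DataType::Int32, true)]);
        let incoming = Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]);
        // Same shape as can_merge_with: merge, then share the result behind an Arc.
        let merged: Arc<Schema> = Schema::try_merge(vec![incoming, current])
            .map(Arc::new)
            .expect("adding a nullable column merges cleanly");
        assert_eq!(merged.fields().len(), 2);

        // A conflicting type on an existing column is what makes try_merge fail.
        let current = Schema::new(vec![Field::new("id", DataType::Int32, true)]);
        let conflict = Schema::new(vec![Field::new("id", DataType::Utf8, true)]);
        assert!(Schema::try_merge(vec![conflict, current]).is_err());
    }
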
@@ -591,23 +599,23 @@ mod datawriter_tests {
 
     async fn get_default_writer() -> (DataWriter, DeltaTable) {
         let table = inmemory_table().await;
-        (DataWriter::with_options(&table, DataWriterOptions::default()).expect("Failed to make writer"),
-            table)
+        (
+            DataWriter::with_options(&table, DataWriterOptions::default())
+                .expect("Failed to make writer"),
+            table,
+        )
     }
 
     #[tokio::test]
     async fn test_can_merge_with_simple() {
         let (writer, _) = get_default_writer().await;
-        let delta_schema = StructType::new(vec![
-            StructField::new(
-                "vid".to_string(),
-                DeltaDataType::Primitive(PrimitiveType::Integer),
-                true,
-            ),
-        ]);
-        let arrow_schema: ArrowSchema =
-            <ArrowSchema as TryFrom<&StructType>>::try_from(&delta_schema)
-            .expect("Failed to convert arrow schema somehow");
+        let delta_schema = StructType::new(vec![StructField::new(
+            "vid".to_string(),
+            DeltaDataType::Primitive(PrimitiveType::Integer),
+            true,
+        )]);
+        let arrow_schema: ArrowSchema = <ArrowSchema as TryFrom<&StructType>>::try_from(&delta_schema)
+            .expect("Failed to convert arrow schema somehow");
         let result = writer.can_merge_with(&arrow_schema);
         assert_eq!(true, result.is_ok(), "This should be able to merge");
     }
@@ -615,34 +623,39 @@ mod datawriter_tests {
     #[tokio::test]
     async fn test_can_merge_with_diff_column() {
         let (writer, _) = get_default_writer().await;
-        let delta_schema = StructType::new(vec![
-            StructField::new(
-                "id".to_string(),
-                DeltaDataType::Primitive(PrimitiveType::Integer),
-                true,
-            ),
-        ]);
-        let arrow_schema: ArrowSchema =
-            <ArrowSchema as TryFrom<&StructType>>::try_from(&delta_schema)
-            .expect("Failed to convert arrow schema somehow");
+        let delta_schema = StructType::new(vec![StructField::new(
+            "id".to_string(),
+            DeltaDataType::Primitive(PrimitiveType::Integer),
+            true,
+        )]);
+        let arrow_schema: ArrowSchema = <ArrowSchema as TryFrom<&StructType>>::try_from(&delta_schema)
+            .expect("Failed to convert arrow schema somehow");
         let result = writer.can_merge_with(&arrow_schema);
-        assert_eq!(true, result.is_err(), "Cannot merge this schema, but DataWriter thinks I can?");
+        assert_eq!(
+            true,
+            result.is_err(),
+            "Cannot merge this schema, but DataWriter thinks I can?"
+        );
     }
 
     #[tokio::test]
     async fn test_update_schema() {
         let (mut writer, _) = get_default_writer().await;
-        let new_schema = StructType::new(vec![
-            StructField::new(
-                "vid".to_string(),
-                DeltaDataType::Primitive(PrimitiveType::Integer),
-                true,
-            ),
-        ]);
-        let metadata = DeltaTableMetaData::new(None, None, None, new_schema, vec![], HashMap::new());
-
-        let result = writer.update_schema(&metadata).expect("Failed to execute update_schema");
-        assert_eq!(true, result, "Expected that the new schema would have caused an update");
+        let new_schema = StructType::new(vec![StructField::new(
+            "vid".to_string(),
+            DeltaDataType::Primitive(PrimitiveType::Integer),
+            true,
+        )]);
+        let metadata =
+            DeltaTableMetaData::new(None, None, None, new_schema, vec![], HashMap::new());
+
+        let result = writer
+            .update_schema(&metadata)
+            .expect("Failed to execute update_schema");
+        assert_eq!(
+            true, result,
+            "Expected that the new schema would have caused an update"
+        );
     }
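Note on these tests: update_schema returns a bool, and the cases here and below pin down when it must report true (a new column, changed partition columns) and when it must report false (identical metadata). A sketch of how a caller is expected to consume that flag, mirroring the lib.rs hunk earlier in this diff (writer and table are stand-ins, not fixtures from this repo):

    // `writer: DataWriter` and `table: DeltaTable` are assumed to be in scope.
    let metadata = table.state.delta_metadata().unwrap();
    if writer.update_schema(metadata)? {
        // The schema actually changed, so schema-derived caches such as the
        // coercion tree must be rebuilt before the next write.
        let coercion_tree = coercions::create_coercion_tree(table.schema().unwrap());
    }
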
@@ -650,15 +663,25 @@ mod datawriter_tests {
         let (mut writer, table) = get_default_writer().await;
         let mut metadata = table.state.delta_metadata().unwrap().clone();
         metadata.partition_columns = vec!["test".into()];
-        let result = writer.update_schema(&metadata).expect("Failed to execute update_schema");
-        assert_eq!(true, result, "Expected that the new schema would have caused an update");
+        let result = writer
+            .update_schema(&metadata)
+            .expect("Failed to execute update_schema");
+        assert_eq!(
+            true, result,
+            "Expected that the new schema would have caused an update"
+        );
     }
 
     #[tokio::test]
     async fn test_update_schema_no_changes() {
         let (mut writer, table) = get_default_writer().await;
-        let result = writer.update_schema(table.state.delta_metadata().unwrap()).expect("Failed to execute update_schema");
-        assert_eq!(false, result, "Expected that there would be no schema changes");
+        let result = writer
+            .update_schema(table.state.delta_metadata().unwrap())
+            .expect("Failed to execute update_schema");
+        assert_eq!(
+            false, result,
+            "Expected that there would be no schema changes"
+        );
     }
 }
@@ -1475,7 +1498,6 @@ mod tests {
     }
 
     #[tokio::test]
-    #[ignore]
     async fn test_schema_matching() {
         let temp_dir = tempfile::tempdir().unwrap();
         let table_path = temp_dir.path();
@@ -1506,21 +1528,10 @@ mod tests {
             .into()];
         let result = writer.write(rows).await;
         assert!(
-            result.is_err(),
-            "Expected the write of our invalid schema rows to fail!\n{:?}",
+            result.is_ok(),
+            "Expecting the write of the valid schema to succeed!\n{:?}",
             result
         );
-        match result {
-            Ok(_) => unreachable!(),
-            //Err(Box) => {},
-            Err(e) => {
-                assert!(
-                    false,
-                    "I was expecting a schema mismatch, got this instead: {:?}",
-                    e
-                );
-            }
-        }
     }
 
     #[tokio::test]
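Note on test_schema_matching: the expectation flips because, with the evolution path above, a write whose rows extend the table schema is now supposed to succeed rather than error, and dropping #[ignore] means CI exercises it. A caller that still wants the old strict behavior could gate the write on the merge check explicitly; a sketch against the signatures above (write_strict is hypothetical, not part of this diff):

    // Hypothetical helper; `DataWriter`, `DataWriterError`, and `ArrowSchema`
    // are the types used in src/writer.rs above.
    async fn write_strict(
        writer: &mut DataWriter,
        schema_of_rows: &ArrowSchema,
        rows: Vec<serde_json::Value>,
    ) -> Result<(), Box<DataWriterError>> {
        // Err here means the row schema conflicts outright with the writer's.
        writer.can_merge_with(schema_of_rows)?;
        writer.write(rows).await
    }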