From 2f43a7ece8665e1421d05710bc937a8a5daa9199 Mon Sep 17 00:00:00 2001
From: Qingping Hou
Date: Sat, 4 Feb 2023 18:41:54 -0800
Subject: [PATCH] improve debuggability of json ser/de errors

---
 rust/src/action/mod.rs             |  21 +++++-
 rust/src/delta.rs                  | 100 ++++++++++++++++++++---------
 rust/src/operations/transaction.rs |  16 +++--
 rust/src/schema.rs                 |  17 +++--
 rust/src/table_state_arrow.rs      |   2 +-
 5 files changed, 110 insertions(+), 46 deletions(-)

diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs
index 4bd6916527..978a485836 100644
--- a/rust/src/action/mod.rs
+++ b/rust/src/action/mod.rs
@@ -8,7 +8,7 @@ mod parquet_read;
 #[cfg(feature = "parquet2")]
 pub mod parquet2_read;
 
-use crate::{schema::*, DeltaTableMetaData};
+use crate::{schema::*, DeltaTableError, DeltaTableMetaData};
 use percent_encoding::percent_decode;
 use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
@@ -332,6 +332,25 @@ impl MetaData {
     }
 }
 
+impl TryFrom<DeltaTableMetaData> for MetaData {
+    type Error = DeltaTableError;
+
+    fn try_from(metadata: DeltaTableMetaData) -> Result<Self, Self::Error> {
+        let schema_string = serde_json::to_string(&metadata.schema)
+            .map_err(|e| DeltaTableError::SerializeSchemaJson { json_err: e })?;
+        Ok(Self {
+            id: metadata.id,
+            name: metadata.name,
+            description: metadata.description,
+            format: metadata.format,
+            schema_string,
+            partition_columns: metadata.partition_columns,
+            created_time: metadata.created_time,
+            configuration: metadata.configuration,
+        })
+    }
+}
+
 /// Represents a tombstone (deleted file) in the Delta log.
 /// This is a top-level action in Delta log entries.
 #[derive(Serialize, Deserialize, Clone, Eq, Debug, Default)]
diff --git a/rust/src/delta.rs b/rust/src/delta.rs
index 3f311ff8e3..f3b95fe899 100644
--- a/rust/src/delta.rs
+++ b/rust/src/delta.rs
@@ -115,11 +115,28 @@ pub enum DeltaTableError {
     },
 
     /// Error returned when the log record has an invalid JSON.
-    #[error("Invalid JSON in log record: {}", .source)]
-    InvalidJson {
-        /// JSON error details returned when the log record has an invalid JSON.
-        #[from]
-        source: serde_json::error::Error,
+    #[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
+    InvalidJsonLog {
+        /// JSON error details returned when parsing the record JSON.
+        json_err: serde_json::error::Error,
+        /// Invalid log entry content.
+        line: String,
+        /// Corresponding table version for the log file.
+        version: DeltaDataTypeVersion,
+    },
+    /// Error returned when the log contains invalid stats JSON.
+    #[error("Invalid JSON in file stats: {}", .json_err)]
+    InvalidStatsJson {
+        /// JSON error details returned when parsing the stats JSON.
+        json_err: serde_json::error::Error,
+    },
+    /// Error returned when the log contains an invalid invariant expression JSON.
+    #[error("Invalid JSON in invariant expression, line=`{line}`, err=`{json_err}`")]
+    InvalidInvariantJson {
+        /// JSON error details returned when parsing the invariant expression JSON.
+        json_err: serde_json::error::Error,
+        /// Invariant expression.
+        line: String,
     },
     /// Error returned when the DeltaTable has an invalid version.
#[error("Invalid table version: {0}")] @@ -211,6 +228,18 @@ pub enum DeltaTableError { #[error("Cannot infer storage location from: {0}")] InvalidTableLocation(String), /// Generic Delta Table error + #[error("Log JSON serialization error: {json_err}")] + SerializeLogJson { + /// JSON serialization error + json_err: serde_json::error::Error, + }, + /// Generic Delta Table error + #[error("Schema JSON serialization error: {json_err}")] + SerializeSchemaJson { + /// JSON serialization error + json_err: serde_json::error::Error, + }, + /// Generic Delta Table error #[error("Generic DeltaTable error: {0}")] Generic(String), /// Generic Delta Table error @@ -321,24 +350,6 @@ impl TryFrom for DeltaTableMetaData { } } -impl TryFrom for action::MetaData { - type Error = serde_json::error::Error; - - fn try_from(metadata: DeltaTableMetaData) -> Result { - let schema_string = serde_json::to_string(&metadata.schema)?; - Ok(Self { - id: metadata.id, - name: metadata.name, - description: metadata.description, - format: metadata.format, - schema_string, - partition_columns: metadata.partition_columns, - created_time: metadata.created_time, - configuration: metadata.configuration, - }) - } -} - /// Error related to Delta log application #[derive(thiserror::Error, Debug)] pub enum ApplyLogError { @@ -346,7 +357,7 @@ pub enum ApplyLogError { #[error("End of transaction log")] EndOfLog, /// Error returned when the JSON of the log record is invalid. - #[error("Invalid JSON in log record")] + #[error("Invalid JSON found when applying log record")] InvalidJson { /// JSON error details returned when reading the JSON log record. #[from] @@ -612,6 +623,7 @@ impl DeltaTable { async fn get_last_checkpoint(&self) -> Result { let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]); + debug!("loading checkpoint from {last_checkpoint_path}"); match self.storage.get(&last_checkpoint_path).await { Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?), Err(ObjectStoreError::NotFound { .. 
@@ -711,6 +723,8 @@ impl DeltaTable {
             }
         };
 
+        debug!("start with latest checkpoint version: {version}");
+
         // scan logs after checkpoint
         loop {
             match self
@@ -772,11 +786,19 @@ impl DeltaTable {
             Ok(result) => result.bytes().await,
         }?;
 
+        debug!("parsing commit with version {next_version}...");
         let reader = BufReader::new(Cursor::new(commit_log_bytes));
         let mut actions = Vec::new();
-        for line in reader.lines() {
-            let action: action::Action = serde_json::from_str(line?.as_str())?;
+        for re_line in reader.lines() {
+            let line = re_line?;
+            let lstr = line.as_str();
+            let action =
+                serde_json::from_str(lstr).map_err(|e| DeltaTableError::InvalidJsonLog {
+                    json_err: e,
+                    version: next_version,
+                    line,
+                })?;
             actions.push(action);
         }
 
         Ok(PeekCommit::New(next_version, actions))
@@ -788,6 +810,7 @@ impl DeltaTable {
     pub async fn update(&mut self) -> Result<(), DeltaTableError> {
         match self.get_last_checkpoint().await {
             Ok(last_check_point) => {
+                debug!("update with latest checkpoint {last_check_point:?}");
                 if Some(last_check_point) == self.last_check_point {
                     self.update_incremental(None).await
                 } else {
@@ -796,7 +819,10 @@ impl DeltaTable {
                     self.update_incremental(None).await
                 }
             }
-            Err(LoadCheckpointError::NotFound) => self.update_incremental(None).await,
+            Err(LoadCheckpointError::NotFound) => {
+                debug!("update without checkpoint");
+                self.update_incremental(None).await
+            }
             Err(source) => Err(DeltaTableError::LoadCheckpoint { source }),
         }
     }
@@ -813,9 +839,15 @@ impl DeltaTable {
         &mut self,
         max_version: Option<DeltaDataTypeVersion>,
     ) -> Result<(), DeltaTableError> {
+        debug!(
+            "incremental update with version({}) and max_version({max_version:?})",
+            self.version(),
+        );
+
         while let PeekCommit::New(new_version, actions) =
             self.peek_next_commit(self.version()).await?
         {
+            debug!("merging table state with version: {new_version}");
             let s = DeltaTableState::from_actions(actions, new_version)?;
             self.state
                 .merge(s, self.config.require_tombstones, self.config.require_files);
@@ -864,6 +896,7 @@ impl DeltaTable {
             }
         }
 
+        debug!("update incrementally from version {version}");
         // 2. apply all logs starting from checkpoint
         self.update_incremental(Some(version)).await?;
 
@@ -1002,10 +1035,10 @@ impl DeltaTable {
 
     /// Returns statistics for files, in order
     pub fn get_stats(&self) -> impl Iterator<Item = Result<Option<Stats>, DeltaTableError>> + '_ {
-        self.state
-            .files()
-            .iter()
-            .map(|add| add.get_stats().map_err(DeltaTableError::from))
+        self.state.files().iter().map(|add| {
+            add.get_stats()
+                .map_err(|e| DeltaTableError::InvalidStatsJson { json_err: e })
+        })
     }
 
     /// Returns partition values for files, in order
@@ -1378,7 +1411,10 @@ impl<'a> DeltaTransaction<'a> {
         }
 
         // Serialize all actions that are part of this log entry.
-        let log_entry = bytes::Bytes::from(log_entry_from_actions(&self.actions)?);
+        let log_entry = bytes::Bytes::from(
+            log_entry_from_actions(&self.actions)
+                .map_err(|e| DeltaTableError::SerializeLogJson { json_err: e })?,
+        );
 
         // Write delta log entry as temporary file to storage. For the actual commit,
         // the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
diff --git a/rust/src/operations/transaction.rs b/rust/src/operations/transaction.rs
index 16e45f0e47..974eaeeedc 100644
--- a/rust/src/operations/transaction.rs
+++ b/rust/src/operations/transaction.rs
@@ -16,11 +16,10 @@ enum TransactionError {
     VersionAlreadyExists(DeltaDataTypeVersion),
 
     /// Error returned when reading the delta log object failed.
- #[error("Error serializing commit: {}", .source)] - Serialize { - /// Storage error details when reading the delta log object failed. - #[from] - source: serde_json::Error, + #[error("Error serializing commit log to json: {json_err}")] + SerializeLogJson { + /// Commit log record JSON serialization error. + json_err: serde_json::error::Error, }, /// Error returned when reading the delta log object failed. @@ -38,7 +37,9 @@ impl From for DeltaTableError { TransactionError::VersionAlreadyExists(version) => { DeltaTableError::VersionAlreadyExists(version) } - TransactionError::Serialize { source } => DeltaTableError::InvalidJson { source }, + TransactionError::SerializeLogJson { json_err } => { + DeltaTableError::SerializeLogJson { json_err } + } TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source }, } } @@ -54,7 +55,8 @@ fn commit_uri_from_version(version: DeltaDataTypeVersion) -> Path { fn log_entry_from_actions(actions: &[Action]) -> Result { let mut jsons = Vec::::new(); for action in actions { - let json = serde_json::to_string(action)?; + let json = serde_json::to_string(action) + .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?; jsons.push(json); } Ok(jsons.join("\n")) diff --git a/rust/src/schema.rs b/rust/src/schema.rs index 2851f3122c..831d57b4f9 100644 --- a/rust/src/schema.rs +++ b/rust/src/schema.rs @@ -6,6 +6,8 @@ use serde_json::Value; use std::borrow::Cow; use std::collections::HashMap; +use crate::DeltaTableError; + /// Type alias for a string expected to match a GUID/UUID format pub type Guid = String; /// Type alias for i64/Delta long @@ -64,25 +66,25 @@ impl SchemaTypeStruct { } /// Returns an immutable reference of a specific `Field` instance selected by name. - pub fn get_field_with_name(&self, name: &str) -> Result<&SchemaField, crate::DeltaTableError> { + pub fn get_field_with_name(&self, name: &str) -> Result<&SchemaField, DeltaTableError> { Ok(&self.fields[self.index_of(name)?]) } /// Find the index of the column with the given name. - pub fn index_of(&self, name: &str) -> Result { + pub fn index_of(&self, name: &str) -> Result { for i in 0..self.fields.len() { if self.fields[i].get_name() == name { return Ok(i); } } let valid_fields: Vec = self.fields.iter().map(|f| f.name.clone()).collect(); - Err(crate::DeltaTableError::Generic(format!( + Err(DeltaTableError::Generic(format!( "Unable to get field named \"{name}\". 
Valid fields: {valid_fields:?}" ))) } /// Get all invariants in the schemas - pub fn get_invariants(&self) -> Result, crate::DeltaTableError> { + pub fn get_invariants(&self) -> Result, DeltaTableError> { let mut remaining_fields: Vec<(String, SchemaField)> = self .get_fields() .iter() @@ -135,7 +137,12 @@ impl SchemaTypeStruct { } // JSON format: {"expression": {"expression": ""} } if let Some(Value::String(invariant_json)) = field.metadata.get("delta.invariants") { - let json: Value = serde_json::from_str(invariant_json)?; + let json: Value = serde_json::from_str(invariant_json).map_err(|e| { + DeltaTableError::InvalidInvariantJson { + json_err: e, + line: invariant_json.to_string(), + } + })?; if let Value::Object(json) = json { if let Some(Value::Object(expr1)) = json.get("expression") { if let Some(Value::String(sql)) = expr1.get("expression") { diff --git a/rust/src/table_state_arrow.rs b/rust/src/table_state_arrow.rs index cc46693ca2..a1845b9503 100644 --- a/rust/src/table_state_arrow.rs +++ b/rust/src/table_state_arrow.rs @@ -283,7 +283,7 @@ impl DeltaTableState { .iter() .map(|f| { f.get_stats() - .map_err(|err| DeltaTableError::InvalidJson { source: err }) + .map_err(|err| DeltaTableError::InvalidStatsJson { json_err: err }) }) .collect::>()?;