Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve debuggability of json ser/de errors #1119

Merged
merged 1 commit into from
Feb 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod parquet_read;
#[cfg(feature = "parquet2")]
pub mod parquet2_read;

use crate::{schema::*, DeltaTableMetaData};
use crate::{schema::*, DeltaTableError, DeltaTableMetaData};
use percent_encoding::percent_decode;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
Expand Down Expand Up @@ -332,6 +332,25 @@ impl MetaData {
}
}

impl TryFrom<DeltaTableMetaData> for MetaData {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drive by refactor, makes more sense to keep all MetaData impl in the same place.

type Error = DeltaTableError;

fn try_from(metadata: DeltaTableMetaData) -> Result<Self, Self::Error> {
let schema_string = serde_json::to_string(&metadata.schema)
.map_err(|e| DeltaTableError::SerializeSchemaJson { json_err: e })?;
Ok(Self {
id: metadata.id,
name: metadata.name,
description: metadata.description,
format: metadata.format,
schema_string,
partition_columns: metadata.partition_columns,
created_time: metadata.created_time,
configuration: metadata.configuration,
})
}
}

/// Represents a tombstone (deleted file) in the Delta log.
/// This is a top-level action in Delta log entries.
#[derive(Serialize, Deserialize, Clone, Eq, Debug, Default)]
Expand Down
100 changes: 68 additions & 32 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,28 @@ pub enum DeltaTableError {
},

/// Error returned when the log record has an invalid JSON.
#[error("Invalid JSON in log record: {}", .source)]
InvalidJson {
/// JSON error details returned when the log record has an invalid JSON.
#[from]
source: serde_json::error::Error,
#[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
InvalidJsonLog {
/// JSON error details returned when parsing the record JSON.
json_err: serde_json::error::Error,
/// invalid log entry content.
line: String,
/// corresponding table version for the log file.
version: DeltaDataTypeVersion,
},
/// Error returned when the log contains invalid stats JSON.
#[error("Invalid JSON in file stats: {}", .json_err)]
InvalidStatsJson {
/// JSON error details returned when parsing the stats JSON.
json_err: serde_json::error::Error,
},
/// Error returned when the log contains invalid stats JSON.
#[error("Invalid JSON in invariant expression, line=`{line}`, err=`{json_err}`")]
InvalidInvariantJson {
/// JSON error details returned when parsing the invariant expression JSON.
json_err: serde_json::error::Error,
/// Invariant expression.
line: String,
},
/// Error returned when the DeltaTable has an invalid version.
#[error("Invalid table version: {0}")]
Expand Down Expand Up @@ -211,6 +228,18 @@ pub enum DeltaTableError {
#[error("Cannot infer storage location from: {0}")]
InvalidTableLocation(String),
/// Generic Delta Table error
#[error("Log JSON serialization error: {json_err}")]
SerializeLogJson {
/// JSON serialization error
json_err: serde_json::error::Error,
},
/// Generic Delta Table error
#[error("Schema JSON serialization error: {json_err}")]
SerializeSchemaJson {
/// JSON serialization error
json_err: serde_json::error::Error,
},
/// Generic Delta Table error
#[error("Generic DeltaTable error: {0}")]
Generic(String),
/// Generic Delta Table error
Expand Down Expand Up @@ -321,32 +350,14 @@ impl TryFrom<action::MetaData> for DeltaTableMetaData {
}
}

impl TryFrom<DeltaTableMetaData> for action::MetaData {
type Error = serde_json::error::Error;

fn try_from(metadata: DeltaTableMetaData) -> Result<Self, Self::Error> {
let schema_string = serde_json::to_string(&metadata.schema)?;
Ok(Self {
id: metadata.id,
name: metadata.name,
description: metadata.description,
format: metadata.format,
schema_string,
partition_columns: metadata.partition_columns,
created_time: metadata.created_time,
configuration: metadata.configuration,
})
}
}

/// Error related to Delta log application
#[derive(thiserror::Error, Debug)]
pub enum ApplyLogError {
/// Error returned when the end of transaction log is reached.
#[error("End of transaction log")]
EndOfLog,
/// Error returned when the JSON of the log record is invalid.
#[error("Invalid JSON in log record")]
#[error("Invalid JSON found when applying log record")]
InvalidJson {
/// JSON error details returned when reading the JSON log record.
#[from]
Expand Down Expand Up @@ -612,6 +623,7 @@ impl DeltaTable {

async fn get_last_checkpoint(&self) -> Result<CheckPoint, LoadCheckpointError> {
let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]);
debug!("loading checkpoint from {last_checkpoint_path}");
match self.storage.get(&last_checkpoint_path).await {
Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?),
Err(ObjectStoreError::NotFound { .. }) => {
Expand Down Expand Up @@ -711,6 +723,8 @@ impl DeltaTable {
}
};

debug!("start with latest checkpoint version: {version}");

// scan logs after checkpoint
loop {
match self
Expand Down Expand Up @@ -772,11 +786,19 @@ impl DeltaTable {
Ok(result) => result.bytes().await,
}?;

debug!("parsing commit with version {next_version}...");
let reader = BufReader::new(Cursor::new(commit_log_bytes));

let mut actions = Vec::new();
for line in reader.lines() {
let action: action::Action = serde_json::from_str(line?.as_str())?;
for re_line in reader.lines() {
let line = re_line?;
let lstr = line.as_str();
let action =
serde_json::from_str(lstr).map_err(|e| DeltaTableError::InvalidJsonLog {
json_err: e,
version: next_version,
line,
})?;
actions.push(action);
}
Ok(PeekCommit::New(next_version, actions))
Expand All @@ -788,6 +810,7 @@ impl DeltaTable {
pub async fn update(&mut self) -> Result<(), DeltaTableError> {
match self.get_last_checkpoint().await {
Ok(last_check_point) => {
debug!("update with latest checkpoint {last_check_point:?}");
if Some(last_check_point) == self.last_check_point {
self.update_incremental(None).await
} else {
Expand All @@ -796,7 +819,10 @@ impl DeltaTable {
self.update_incremental(None).await
}
}
Err(LoadCheckpointError::NotFound) => self.update_incremental(None).await,
Err(LoadCheckpointError::NotFound) => {
debug!("update without checkpoint");
self.update_incremental(None).await
}
Err(source) => Err(DeltaTableError::LoadCheckpoint { source }),
}
}
Expand All @@ -813,9 +839,15 @@ impl DeltaTable {
&mut self,
max_version: Option<DeltaDataTypeVersion>,
) -> Result<(), DeltaTableError> {
debug!(
"incremental update with version({}) and max_version({max_version:?})",
self.version(),
);

while let PeekCommit::New(new_version, actions) =
self.peek_next_commit(self.version()).await?
{
debug!("merging table state with version: {new_version}");
let s = DeltaTableState::from_actions(actions, new_version)?;
self.state
.merge(s, self.config.require_tombstones, self.config.require_files);
Expand Down Expand Up @@ -864,6 +896,7 @@ impl DeltaTable {
}
}

debug!("update incrementally from version {version}");
// 2. apply all logs starting from checkpoint
self.update_incremental(Some(version)).await?;

Expand Down Expand Up @@ -1002,10 +1035,10 @@ impl DeltaTable {

/// Returns statistics for files, in order
pub fn get_stats(&self) -> impl Iterator<Item = Result<Option<Stats>, DeltaTableError>> + '_ {
self.state
.files()
.iter()
.map(|add| add.get_stats().map_err(DeltaTableError::from))
self.state.files().iter().map(|add| {
add.get_stats()
.map_err(|e| DeltaTableError::InvalidStatsJson { json_err: e })
})
}

/// Returns partition values for files, in order
Expand Down Expand Up @@ -1378,7 +1411,10 @@ impl<'a> DeltaTransaction<'a> {
}

// Serialize all actions that are part of this log entry.
let log_entry = bytes::Bytes::from(log_entry_from_actions(&self.actions)?);
let log_entry = bytes::Bytes::from(
log_entry_from_actions(&self.actions)
.map_err(|e| DeltaTableError::SerializeLogJson { json_err: e })?,
);

// Write delta log entry as temporary file to storage. For the actual commit,
// the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
Expand Down
16 changes: 9 additions & 7 deletions rust/src/operations/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ enum TransactionError {
VersionAlreadyExists(DeltaDataTypeVersion),

/// Error returned when reading the delta log object failed.
#[error("Error serializing commit: {}", .source)]
Serialize {
/// Storage error details when reading the delta log object failed.
#[from]
source: serde_json::Error,
#[error("Error serializing commit log to json: {json_err}")]
SerializeLogJson {
/// Commit log record JSON serialization error.
json_err: serde_json::error::Error,
},

/// Error returned when reading the delta log object failed.
Expand All @@ -38,7 +37,9 @@ impl From<TransactionError> for DeltaTableError {
TransactionError::VersionAlreadyExists(version) => {
DeltaTableError::VersionAlreadyExists(version)
}
TransactionError::Serialize { source } => DeltaTableError::InvalidJson { source },
TransactionError::SerializeLogJson { json_err } => {
DeltaTableError::SerializeLogJson { json_err }
}
TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
}
}
Expand All @@ -54,7 +55,8 @@ fn commit_uri_from_version(version: DeltaDataTypeVersion) -> Path {
fn log_entry_from_actions(actions: &[Action]) -> Result<String, TransactionError> {
let mut jsons = Vec::<String>::new();
for action in actions {
let json = serde_json::to_string(action)?;
let json = serde_json::to_string(action)
.map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
jsons.push(json);
}
Ok(jsons.join("\n"))
Expand Down
17 changes: 12 additions & 5 deletions rust/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use serde_json::Value;
use std::borrow::Cow;
use std::collections::HashMap;

use crate::DeltaTableError;

/// Type alias for a string expected to match a GUID/UUID format
pub type Guid = String;
/// Type alias for i64/Delta long
Expand Down Expand Up @@ -64,25 +66,25 @@ impl SchemaTypeStruct {
}

/// Returns an immutable reference of a specific `Field` instance selected by name.
pub fn get_field_with_name(&self, name: &str) -> Result<&SchemaField, crate::DeltaTableError> {
pub fn get_field_with_name(&self, name: &str) -> Result<&SchemaField, DeltaTableError> {
Ok(&self.fields[self.index_of(name)?])
}

/// Find the index of the column with the given name.
pub fn index_of(&self, name: &str) -> Result<usize, crate::DeltaTableError> {
pub fn index_of(&self, name: &str) -> Result<usize, DeltaTableError> {
for i in 0..self.fields.len() {
if self.fields[i].get_name() == name {
return Ok(i);
}
}
let valid_fields: Vec<String> = self.fields.iter().map(|f| f.name.clone()).collect();
Err(crate::DeltaTableError::Generic(format!(
Err(DeltaTableError::Generic(format!(
"Unable to get field named \"{name}\". Valid fields: {valid_fields:?}"
)))
}

/// Get all invariants in the schemas
pub fn get_invariants(&self) -> Result<Vec<Invariant>, crate::DeltaTableError> {
pub fn get_invariants(&self) -> Result<Vec<Invariant>, DeltaTableError> {
let mut remaining_fields: Vec<(String, SchemaField)> = self
.get_fields()
.iter()
Expand Down Expand Up @@ -135,7 +137,12 @@ impl SchemaTypeStruct {
}
// JSON format: {"expression": {"expression": "<SQL STRING>"} }
if let Some(Value::String(invariant_json)) = field.metadata.get("delta.invariants") {
let json: Value = serde_json::from_str(invariant_json)?;
let json: Value = serde_json::from_str(invariant_json).map_err(|e| {
DeltaTableError::InvalidInvariantJson {
json_err: e,
line: invariant_json.to_string(),
}
})?;
if let Value::Object(json) = json {
if let Some(Value::Object(expr1)) = json.get("expression") {
if let Some(Value::String(sql)) = expr1.get("expression") {
Expand Down
2 changes: 1 addition & 1 deletion rust/src/table_state_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl DeltaTableState {
.iter()
.map(|f| {
f.get_stats()
.map_err(|err| DeltaTableError::InvalidJson { source: err })
.map_err(|err| DeltaTableError::InvalidStatsJson { json_err: err })
})
.collect::<Result<_, DeltaTableError>>()?;

Expand Down