-
Notifications
You must be signed in to change notification settings - Fork 442
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: typed commit info #1207
feat: typed commit info #1207
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,8 @@ mod parquet_read; | |
#[cfg(feature = "parquet2")] | ||
pub mod parquet2_read; | ||
|
||
use crate::{schema::*, DeltaTableError, DeltaTableMetaData}; | ||
use crate::delta_config::IsolationLevel; | ||
use crate::{schema::*, DeltaResult, DeltaTableError, DeltaTableMetaData}; | ||
use percent_encoding::percent_decode; | ||
use serde::{Deserialize, Serialize}; | ||
use serde_json::{Map, Value}; | ||
|
@@ -44,6 +45,13 @@ pub enum ActionError { | |
#[from] | ||
source: parquet::errors::ParquetError, | ||
}, | ||
/// Faild to serialize operation | ||
#[error("Failed to serialize operation: {source}")] | ||
SerializeOperation { | ||
#[from] | ||
/// The source error | ||
source: serde_json::Error, | ||
}, | ||
} | ||
|
||
fn decode_path(raw_path: &str) -> Result<String, ActionError> { | ||
|
@@ -435,7 +443,46 @@ pub struct Protocol { | |
pub min_writer_version: DeltaDataTypeInt, | ||
} | ||
|
||
type CommitInfo = Map<String, Value>; | ||
/// The commitInfo is a fairly flexible action within the delta specification, where arbitrary data can be stored. | ||
/// However the reference implementation as well as delta-rs store useful information that may for instance | ||
/// allow us to be more permissive in commit conflict resolution. | ||
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] | ||
#[serde(rename_all = "camelCase")] | ||
pub struct CommitInfo { | ||
/// Version number the commit corresponds to | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub version: Option<DeltaDataTypeVersion>, | ||
/// Timestamp in millis when the commit was created | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub timestamp: Option<DeltaDataTypeTimestamp>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As an aside, it would be cool if we swapped out There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Absolutely, we may also be able to implement custom serde, so it is parsed as a datetime. |
||
/// Id of the user invoking the commit | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub user_id: Option<String>, | ||
/// Name of the user invoking the commit | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub user_name: Option<String>, | ||
/// The operation performed during the | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub operation: Option<String>, | ||
/// Parameters used for table operation | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub operation_parameters: Option<HashMap<String, serde_json::Value>>, | ||
/// Version of the table when the operation was started | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub read_version: Option<i64>, | ||
/// The isolation level of the commit | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub isolation_level: Option<IsolationLevel>, | ||
/// TODO | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub is_blind_append: Option<bool>, | ||
/// Delta engine which created the commit. | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub engine_info: Option<String>, | ||
/// Additional provenance information for the commit | ||
#[serde(flatten, default)] | ||
pub info: Map<String, serde_json::Value>, | ||
} | ||
|
||
/// Represents an action in the Delta log. The Delta log is an aggregate of all actions performed | ||
/// on the table, so the full list of actions is required to properly read a table. | ||
|
@@ -459,6 +506,16 @@ pub enum Action { | |
commitInfo(CommitInfo), | ||
} | ||
|
||
impl Action { | ||
/// Create a commit info from a map | ||
pub fn commit_info(info: Map<String, serde_json::Value>) -> Self { | ||
Self::commitInfo(CommitInfo { | ||
info, | ||
..Default::default() | ||
}) | ||
} | ||
} | ||
|
||
/// Operation performed when creating a new log entry with one or more actions. | ||
/// This is a key element of the `CommitInfo` action. | ||
#[allow(clippy::large_enum_variant)] | ||
|
@@ -517,45 +574,71 @@ pub enum DeltaOperation { | |
} | ||
|
||
impl DeltaOperation { | ||
/// Retrieve basic commit information to be added to Delta commits | ||
pub fn get_commit_info(&self) -> Map<String, Value> { | ||
let mut commit_info = Map::<String, Value>::new(); | ||
let operation = match &self { | ||
DeltaOperation::Create { .. } => "delta-rs.Create", | ||
DeltaOperation::Write { .. } => "delta-rs.Write", | ||
DeltaOperation::StreamingUpdate { .. } => "delta-rs.StreamingUpdate", | ||
DeltaOperation::Optimize { .. } => "delta-rs.Optimize", | ||
DeltaOperation::FileSystemCheck { .. } => "delta-rs.FileSystemCheck", | ||
}; | ||
commit_info.insert( | ||
"operation".to_string(), | ||
serde_json::Value::String(operation.into()), | ||
); | ||
/// A human readable name for the operation | ||
pub fn name(&self) -> &str { | ||
// operation names taken from https://learn.microsoft.com/en-us/azure/databricks/delta/history#--operation-metrics-keys | ||
match &self { | ||
DeltaOperation::Create { mode, .. } if matches!(mode, SaveMode::Overwrite) => { | ||
"CREATE OR REPLACE TABLE" | ||
} | ||
DeltaOperation::Create { .. } => "CREATE TABLE", | ||
DeltaOperation::Write { .. } => "WRITE", | ||
DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE", | ||
DeltaOperation::Optimize { .. } => "OPTIMIZE", | ||
DeltaOperation::FileSystemCheck { .. } => "FSCK", | ||
} | ||
} | ||
|
||
if let Ok(serde_json::Value::Object(map)) = serde_json::to_value(self) { | ||
let all_operation_fields = map.values().next().unwrap().as_object().unwrap(); | ||
let converted_operation_fields: Map<String, Value> = all_operation_fields | ||
.iter() | ||
/// Paraemters configured for operation. | ||
roeap marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub fn operation_parameters(&self) -> DeltaResult<impl Iterator<Item = (String, Value)>> { | ||
// TODO remove unwrap | ||
let serialized = serde_json::to_value(self) | ||
.map_err(|err| ActionError::SerializeOperation { source: err })?; | ||
if let serde_json::Value::Object(map) = serialized { | ||
let all_operation_fields = map.values().next().unwrap().as_object().unwrap().clone(); | ||
roeap marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Ok(all_operation_fields | ||
.into_iter() | ||
.filter(|item| !item.1.is_null()) | ||
.map(|(k, v)| { | ||
( | ||
k.clone(), | ||
k, | ||
serde_json::Value::String(if v.is_string() { | ||
String::from(v.as_str().unwrap()) | ||
} else { | ||
v.to_string() | ||
}), | ||
) | ||
}) | ||
.collect(); | ||
})) | ||
} else { | ||
Err(ActionError::Generic( | ||
"operation parameetrs serialized into unexpected shape".into(), | ||
roeap marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) | ||
.into()) | ||
} | ||
} | ||
|
||
commit_info.insert( | ||
"operationParameters".to_string(), | ||
serde_json::Value::Object(converted_operation_fields), | ||
); | ||
}; | ||
/// Denotes if the operation changes the data contained in the table | ||
pub fn changes_data(&self) -> bool { | ||
!matches!(self, Self::Optimize { .. }) | ||
roeap marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
/// Retrieve basic commit information to be added to Delta commits | ||
pub fn get_commit_info(&self) -> CommitInfo { | ||
// TODO infer additional info from operation parameters ... | ||
CommitInfo { | ||
operation: Some(self.name().into()), | ||
operation_parameters: self.operation_parameters().ok().map(|iter| iter.collect()), | ||
..Default::default() | ||
} | ||
} | ||
|
||
commit_info | ||
/// Get predicate expression applien when the operation reads data from the table. | ||
roeap marked this conversation as resolved.
Show resolved
Hide resolved
|
||
pub fn read_predicate(&self) -> Option<String> { | ||
match self { | ||
// TODO add more operations | ||
Self::Write { predicate, .. } => predicate.clone(), | ||
_ => None, | ||
} | ||
} | ||
} | ||
|
||
|
@@ -654,4 +737,65 @@ mod tests { | |
1 | ||
); | ||
} | ||
|
||
#[test] | ||
fn test_read_commit_info() { | ||
let raw = r#" | ||
{ | ||
"timestamp": 1670892998177, | ||
"operation": "WRITE", | ||
"operationParameters": { | ||
"mode": "Append", | ||
"partitionBy": "[\"c1\",\"c2\"]" | ||
}, | ||
"isolationLevel": "Serializable", | ||
"isBlindAppend": true, | ||
"operationMetrics": { | ||
"numFiles": "3", | ||
"numOutputRows": "3", | ||
"numOutputBytes": "1356" | ||
}, | ||
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0", | ||
"txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c" | ||
}"#; | ||
|
||
let info = serde_json::from_str::<CommitInfo>(raw); | ||
assert!(info.is_ok()); | ||
|
||
println!("{:?}", info); | ||
roeap marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// assert that commit info has no required filelds | ||
let raw = "{}"; | ||
let info = serde_json::from_str::<CommitInfo>(raw); | ||
assert!(info.is_ok()); | ||
|
||
// arbitrary field data may be added to commit | ||
let raw = r#" | ||
{ | ||
"timestamp": 1670892998177, | ||
"operation": "WRITE", | ||
"operationParameters": { | ||
"mode": "Append", | ||
"partitionBy": "[\"c1\",\"c2\"]" | ||
}, | ||
"isolationLevel": "Serializable", | ||
"isBlindAppend": true, | ||
"operationMetrics": { | ||
"numFiles": "3", | ||
"numOutputRows": "3", | ||
"numOutputBytes": "1356" | ||
}, | ||
"engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0", | ||
"txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c", | ||
"additionalField": "more data", | ||
"additionalStruct": { | ||
"key": "value", | ||
"otherKey": 123 | ||
} | ||
}"#; | ||
|
||
let info = serde_json::from_str::<CommitInfo>(raw).expect("should parse"); | ||
assert!(info.info.contains_key("additionalField")); | ||
assert!(info.info.contains_key("additionalStruct")); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this redundant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is, and nobody actually writes it to the commit info in the log. Initially I was following the spark implementation for the optimistic commits quite closely, where this field is being used. Not sure if we still do though. I'll remove it for now, and bring it back in case its needed in the follow up PR...