Skip to content

Commit

Permalink
Compensate for invalid log files created by Delta Live Tables
Browse files Browse the repository at this point in the history
It would appear that in some cases Delta Live Tables will create a Delta table
which does not adhere to the Delta Table protocol.

The metaData action as a **required** `schemaString` property which simply
doesn't exist. Since it appears that this only exists at version zero of the
transaction log, and the _actual_ schema exists in the following versions of the
table (e.g. 1), this change introduces a default deserializer on the MetaData
action which provides a simple empty schema.

This is an alternative implementation to #1305 which is a bit more invasive and
makes our schema_string struct member `Option<String>` which I do not believe is
worth it for this unfortunate compatibility issue

Closes #1305, #1302, #1357

Sponsored-by: Databricks Inc
  • Loading branch information
rtyler committed Sep 20, 2023
1 parent d13056e commit f6cb3ca
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 0 deletions.
7 changes: 7 additions & 0 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,12 @@ impl Default for Format {
}
}

/// Return a default empty schema to be used for edge-cases when a schema is missing
fn default_schema() -> String {
warn!("A `metaData` action was missing a `schemaString` and has been given an empty schema");
r#"{"type":"struct", "fields": []}"#.into()
}

/// Action that describes the metadata of the table.
/// This is a top-level action in Delta log entries.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
Expand All @@ -520,6 +526,7 @@ pub struct MetaData {
/// Specification of the encoding for the files stored in the table
pub format: Format,
/// Schema of the table
#[serde(default = "default_schema")]
pub schema_string: String,
/// An array containing the names of columns by which the data should be partitioned
pub partition_columns: Vec<String>,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1681798578373,"operation":"DLT REFRESH","operationParameters":{"pipelineId":"5c7b47e9-12d6-4986-a601-6716734281ce","updateId":"adf780f9-789f-4857-8d41-e4d9938b61d5"},"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{},"engineInfo":"Databricks-Runtime/dlt:11.3-delta-pipelines-801d604-ff1aff6-f0f113d-custom-local","txnId":"a2ad05c8-4559-41bc-b7b1-a94773bd2286"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"ac0a0120-970e-4d8c-ae92-b5244b055d6e","format":{"provider":"parquet","options":{}},"partitionColumns":[],"configuration":{},"createdTime":1681798577757}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1681798612850,"operation":"DLT SETUP","operationParameters":{"pipelineId":"5c7b47e9-12d6-4986-a601-6716734281ce","updateId":"adf780f9-789f-4857-8d41-e4d9938b61d5"},"readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{},"engineInfo":"Databricks-Runtime/dlt:11.3-delta-pipelines-801d604-ff1aff6-f0f113d-custom-local","txnId":"c1e3c149-d6b1-4a51-b7a6-89c328d14833"}}
{"protocol":{"minReaderVersion":2,"minWriterVersion":5}}
{"metaData":{"id":"ac0a0120-970e-4d8c-ae92-b5244b055d6e","name":"SnowflakeTest_Snowflake_DTL_SHERPA_USER_TABLE","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"sherpa_user_id\",\"type\":\"decimal(38,0)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"enabled\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_login\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"first_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"full_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"email\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"job_title\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"hire_date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"skypoint_delta_index\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"pipelines.pipelineId":"5c7b47e9-12d6-4986-a601-6716734281ce","pipelines.autoOptimize.managed":"false","pipelines.metastore.tableName":"automation_retailsandbox.SnowflakeTest_Snowflake_DTL_SHERPA_USER_TABLE"},"createdTime":1681798577757}}
10 changes: 10 additions & 0 deletions rust/tests/read_delta_log_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,13 @@ async fn test_read_liquid_table() -> DeltaResult<()> {
let _table = deltalake::open_table(&path).await?;
Ok(())
}

// test for: https://github.com/delta-io/delta-rs/issues/1302
#[tokio::test]
async fn read_delta_table_from_dlt() {
let table = deltalake::open_table("./tests/data/delta-live-table")
.await
.unwrap();
assert_eq!(table.version(), 1);
assert!(table.schema().is_some());
}

0 comments on commit f6cb3ca

Please sign in to comment.