Skip to content

Commit

Permalink
fix(rust): unable to read delta table when table contains both null a…
Browse files Browse the repository at this point in the history
…nd non-null add stats (#2476)

# Description
To fix the issue when a delta table contains add action with
stats_parsed: null.

As shown in the test case, `001.json` contains an Add action with stats,
while `002.json` contains an Add action with `stats_parsed: null`,
before this fix, it will complain:

```
Arrow { source: InvalidArgumentError("all columns in a record batch must have the same length") }
```

The issue is that the array for `num_records` has two values, while for
other stats such as null_count, the None value is filtered out by
`flat_map`, so there is only one value in the array.


# Related Issue(s)
closes #2477 

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
yjshen authored May 6, 2024
1 parent d7165cf commit e370d34
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 3 deletions.
26 changes: 26 additions & 0 deletions crates/core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,32 @@ mod tests {
assert_eq!(expected, actions);
}

#[tokio::test]
async fn test_table_not_always_with_stats() {
let path = "../test/tests/data/delta-stats-optional";
let mut table = crate::open_table(path).await.unwrap();
table.load().await.unwrap();
let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
let actions = sort_batch_by(&actions, "path").unwrap();
// get column-0 path, and column-4 num_records, and column_5 null_count.integer
let expected_path: ArrayRef = Arc::new(array::StringArray::from(vec![
"part-00000-28925d3a-bdf2-411e-bca9-b067444cbcb0-c000.snappy.parquet",
"part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
]));
let expected_num_records: ArrayRef =
Arc::new(array::Int64Array::from(vec![None, Some(1)]));
let expected_null_count: ArrayRef =
Arc::new(array::Int64Array::from(vec![None, Some(0)]));

let path_column = actions.column(0);
let num_records_column = actions.column(4);
let null_count_column = actions.column(5);

assert_eq!(&expected_path, path_column);
assert_eq!(&expected_num_records, num_records_column);
assert_eq!(&expected_null_count, null_count_column);
}

#[tokio::test]
async fn test_only_struct_stats() {
// test table with no json stats
Expand Down
9 changes: 6 additions & 3 deletions crates/core/src/table/state_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,11 +447,12 @@ impl DeltaTableState {
.map(|(path, datatype)| -> Result<ColStats, DeltaTableError> {
let null_count = stats
.iter()
.flat_map(|maybe_stat| {
.map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_count_stat(&stat.null_count, &path))
})
.map(|null_count| null_count.flatten())
.collect::<Vec<Option<i64>>>();
let null_count = Some(value_vec_to_array(null_count, |values| {
Ok(Arc::new(arrow::array::Int64Array::from(values)))
Expand All @@ -463,11 +464,12 @@ impl DeltaTableState {
let min_values = if matches!(datatype, DeltaDataType::Primitive(_)) {
let min_values = stats
.iter()
.flat_map(|maybe_stat| {
.map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_value_stat(&stat.min_values, &path))
})
.map(|min_value| min_value.flatten())
.collect::<Vec<Option<&serde_json::Value>>>();

Some(value_vec_to_array(min_values, |values| {
Expand All @@ -480,11 +482,12 @@ impl DeltaTableState {
let max_values = if matches!(datatype, DeltaDataType::Primitive(_)) {
let max_values = stats
.iter()
.flat_map(|maybe_stat| {
.map(|maybe_stat| {
maybe_stat
.as_ref()
.map(|stat| resolve_column_value_stat(&stat.max_values, &path))
})
.map(|max_value| max_value.flatten())
.collect::<Vec<Option<&serde_json::Value>>>();
Some(value_vec_to_array(max_values, |values| {
json_value_to_array_general(&arrow_type, values.into_iter())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1666652369577,"userId":"6114986638742036","userName":"dummy_username","operation":"CREATE OR REPLACE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"delta.checkpoint.writeStatsAsJson\":\"false\",\"delta.checkpoint.writeStatsAsStruct\":\"true\"}"},"notebook":{"notebookId":"1829280694121074"},"clusterId":"1007-161845-fa2h8e50","isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"a8510a45-92dc-4e9f-9f7a-42bbcc9b752d"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"8d3d2b8a-f091-4d7d-8a37-432a9beaf17b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"integer\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsJson":"false","delta.checkpoint.writeStatsAsStruct":"true"},"createdTime":1666652369483}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1666652373383,"userId":"6114986638742036","userName":"dummy_username","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"1829280694121074"},"clusterId":"1007-161845-fa2h8e50","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"5489"},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"35e88c76-9cfb-4e0e-bce8-2317f3c49c75"}}
{"metaData":{"id":"8d3d2b8a-f091-4d7d-8a37-432a9beaf17b","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"integer\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"null\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"boolean\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal\",\"type\":\"decimal(8,5)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"string\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"binary\",\"type\":\"binary\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"timestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"map\",\"type\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"string\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"array\",\"type\":{\"type\":\"array\",\"elementType\":\"string\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"nested_struct\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"nested_struct_element\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"struct_of_array_of_map\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"struct_element\",\"type\":{\"type\":\"array\",\"elementType\":{\"type\":\"map\",\"keyType\":\"string\",\"valueType\":\"string\",\"valueContainsNull\":true},\"containsNull\":true},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsJson":"false","delta.checkpoint.writeStatsAsStruct":"true"},"createdTime":1666652369483}}
{"add":{"path":"part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet","partitionValues":{},"size":5489,"modificationTime":1666652373000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"integer\":0,\"double\":1.234,\"decimal\":-5.67800,\"string\":\"string\",\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:32.846Z\",\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"maxValues\":{\"integer\":0,\"double\":1.234,\"decimal\":-5.67800,\"string\":\"string\",\"date\":\"2022-10-24\",\"timestamp\":\"2022-10-24T22:59:32.846Z\",\"struct\":{\"struct_element\":\"struct_value\"},\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":\"nested_struct_value\"}}},\"nullCount\":{\"integer\":0,\"null\":1,\"boolean\":0,\"double\":0,\"decimal\":0,\"string\":0,\"binary\":0,\"date\":0,\"timestamp\":0,\"struct\":{\"struct_element\":0},\"map\":0,\"array\":0,\"nested_struct\":{\"struct_element\":{\"nested_struct_element\":0}},\"struct_of_array_of_map\":{\"struct_element\":0}}}","tags":{"INSERTION_TIME":"1666652373000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1666652374424,"userId":"6114986638742036","userName":"dummy_username","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"1829280694121074"},"clusterId":"1007-161845-fa2h8e50","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"5489"},"engineInfo":"Databricks-Runtime/10.4.x-scala2.12","txnId":"efe25f5f-e03a-458d-8fbe-34ed2111b3c1"}}
{"add":{"path":"part-00000-28925d3a-bdf2-411e-bca9-b067444cbcb0-c000.snappy.parquet","partitionValues":{},"size":5489,"modificationTime":1666652374000,"dataChange":true,"stats_parsed":null,"tags":{"INSERTION_TIME":"1666652374000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
Binary file not shown.
Binary file not shown.

0 comments on commit e370d34

Please sign in to comment.