diff --git a/crates/core/src/kernel/arrow/mod.rs b/crates/core/src/kernel/arrow/mod.rs index 7dbac2854d..d27bc6463b 100644 --- a/crates/core/src/kernel/arrow/mod.rs +++ b/crates/core/src/kernel/arrow/mod.rs @@ -451,7 +451,9 @@ pub(crate) fn delta_log_schema_for_table( ], protocol[ minReaderVersion:Int32, - minWriterVersion:Int32 + minWriterVersion:Int32, + writerFeatures[element]{Utf8}, + readerFeatures[element]{Utf8} ], txn[ appId:Utf8, diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index ae6d1debd8..4a2df18ee8 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -18,9 +18,7 @@ use tracing::{debug, error}; use super::{time_utils, ProtocolError}; use crate::kernel::arrow::delta_log_schema_for_table; -use crate::kernel::{ - Action, Add as AddAction, DataType, PrimitiveType, Protocol, Remove, StructField, Txn, -}; +use crate::kernel::{Action, Add as AddAction, DataType, PrimitiveType, Remove, StructField, Txn}; use crate::logstore::LogStore; use crate::table::state::DeltaTableState; use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder}; @@ -276,44 +274,39 @@ fn parquet_bytes_from_state( } let files = state.file_actions().unwrap(); // protocol - let jsons = std::iter::once(Action::Protocol(Protocol { - min_reader_version: state.protocol().min_reader_version, - min_writer_version: state.protocol().min_writer_version, - writer_features: None, - reader_features: None, - })) - // metaData - .chain(std::iter::once(Action::Metadata(current_metadata.clone()))) - // txns - .chain( - state - .app_transaction_version() - .iter() - .map(|(app_id, version)| { - Action::Txn(Txn { - app_id: app_id.clone(), - version: *version, - last_updated: None, - }) - }), - ) - // removes - .chain(tombstones.iter().map(|r| { - let mut r = (*r).clone(); - - // As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly. - // https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314 - if r.extended_file_metadata.is_none() { - r.extended_file_metadata = Some(false); - } + let jsons = std::iter::once(Action::Protocol(state.protocol().clone())) + // metaData + .chain(std::iter::once(Action::Metadata(current_metadata.clone()))) + // txns + .chain( + state + .app_transaction_version() + .iter() + .map(|(app_id, version)| { + Action::Txn(Txn { + app_id: app_id.clone(), + version: *version, + last_updated: None, + }) + }), + ) + // removes + .chain(tombstones.iter().map(|r| { + let mut r = (*r).clone(); + + // As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly. + // https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314 + if r.extended_file_metadata.is_none() { + r.extended_file_metadata = Some(false); + } - Action::Remove(r) - })) - .map(|a| serde_json::to_value(a).map_err(ProtocolError::from)) - // adds - .chain(files.iter().map(|f| { - checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions) - })); + Action::Remove(r) + })) + .map(|a| serde_json::to_value(a).map_err(ProtocolError::from)) + // adds + .chain(files.iter().map(|f| { + checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions) + })); // Create the arrow schema that represents the Checkpoint parquet file. let arrow_schema = delta_log_schema_for_table( diff --git a/python/tests/test_checkpoint.py b/python/tests/test_checkpoint.py index fa1ae6a8ae..1cf4b45dd6 100644 --- a/python/tests/test_checkpoint.py +++ b/python/tests/test_checkpoint.py @@ -83,3 +83,25 @@ def test_cleanup_metadata_no_checkpoint(tmp_path: pathlib.Path, sample_data: pa. assert first_log_path.exists() assert second_log_path.exists() assert third_log_path.exists() + + +def test_features_maintained_after_checkpoint(tmp_path: pathlib.Path): + from datetime import datetime + + data = pa.table( + { + "timestamp": pa.array([datetime(2022, 1, 1)]), + } + ) + write_deltalake(tmp_path, data) + + dt = DeltaTable(tmp_path) + current_protocol = dt.protocol() + + dt.create_checkpoint() + + dt = DeltaTable(tmp_path) + protocol_after_checkpoint = dt.protocol() + + assert protocol_after_checkpoint.reader_features == ["timestampNtz"] + assert current_protocol == protocol_after_checkpoint