Skip to content

Commit

Permalink
fix(rust): features not maintained in protocol after checkpoint (#2293)
Browse files Browse the repository at this point in the history
# Description
The CheckPoint schema simply didn't contain the features as fields in
the Protocol struct, and we weren't propagating them as well within the
checkpoint function.

- closes #2288
  • Loading branch information
ion-elgreco authored Mar 18, 2024
1 parent 657625d commit dadf264
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 41 deletions.
4 changes: 3 additions & 1 deletion crates/core/src/kernel/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,9 @@ pub(crate) fn delta_log_schema_for_table(
],
protocol[
minReaderVersion:Int32,
minWriterVersion:Int32
minWriterVersion:Int32,
writerFeatures[element]{Utf8},
readerFeatures[element]{Utf8}
],
txn[
appId:Utf8,
Expand Down
73 changes: 33 additions & 40 deletions crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ use tracing::{debug, error};

use super::{time_utils, ProtocolError};
use crate::kernel::arrow::delta_log_schema_for_table;
use crate::kernel::{
Action, Add as AddAction, DataType, PrimitiveType, Protocol, Remove, StructField, Txn,
};
use crate::kernel::{Action, Add as AddAction, DataType, PrimitiveType, Remove, StructField, Txn};
use crate::logstore::LogStore;
use crate::table::state::DeltaTableState;
use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder};
Expand Down Expand Up @@ -276,44 +274,39 @@ fn parquet_bytes_from_state(
}
let files = state.file_actions().unwrap();
// protocol
let jsons = std::iter::once(Action::Protocol(Protocol {
min_reader_version: state.protocol().min_reader_version,
min_writer_version: state.protocol().min_writer_version,
writer_features: None,
reader_features: None,
}))
// metaData
.chain(std::iter::once(Action::Metadata(current_metadata.clone())))
// txns
.chain(
state
.app_transaction_version()
.iter()
.map(|(app_id, version)| {
Action::Txn(Txn {
app_id: app_id.clone(),
version: *version,
last_updated: None,
})
}),
)
// removes
.chain(tombstones.iter().map(|r| {
let mut r = (*r).clone();

// As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly.
// https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314
if r.extended_file_metadata.is_none() {
r.extended_file_metadata = Some(false);
}
let jsons = std::iter::once(Action::Protocol(state.protocol().clone()))
// metaData
.chain(std::iter::once(Action::Metadata(current_metadata.clone())))
// txns
.chain(
state
.app_transaction_version()
.iter()
.map(|(app_id, version)| {
Action::Txn(Txn {
app_id: app_id.clone(),
version: *version,
last_updated: None,
})
}),
)
// removes
.chain(tombstones.iter().map(|r| {
let mut r = (*r).clone();

// As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly.
// https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314
if r.extended_file_metadata.is_none() {
r.extended_file_metadata = Some(false);
}

Action::Remove(r)
}))
.map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
// adds
.chain(files.iter().map(|f| {
checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
}));
Action::Remove(r)
}))
.map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
// adds
.chain(files.iter().map(|f| {
checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
}));

// Create the arrow schema that represents the Checkpoint parquet file.
let arrow_schema = delta_log_schema_for_table(
Expand Down
22 changes: 22 additions & 0 deletions python/tests/test_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,25 @@ def test_cleanup_metadata_no_checkpoint(tmp_path: pathlib.Path, sample_data: pa.
assert first_log_path.exists()
assert second_log_path.exists()
assert third_log_path.exists()


def test_features_maintained_after_checkpoint(tmp_path: pathlib.Path):
from datetime import datetime

data = pa.table(
{
"timestamp": pa.array([datetime(2022, 1, 1)]),
}
)
write_deltalake(tmp_path, data)

dt = DeltaTable(tmp_path)
current_protocol = dt.protocol()

dt.create_checkpoint()

dt = DeltaTable(tmp_path)
protocol_after_checkpoint = dt.protocol()

assert protocol_after_checkpoint.reader_features == ["timestampNtz"]
assert current_protocol == protocol_after_checkpoint

0 comments on commit dadf264

Please sign in to comment.