Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add provisional workaround to support CDC #1039 #1042

Merged
merged 2 commits into from
Dec 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,15 @@ def test_read_partitioned_table_protocol():
assert protocol.min_writer_version == 2


def test_read_table_with_cdc():
    """Read the `simple_table_with_cdc` fixture and check the materialized rows.

    The table contents must be exactly one row: id 0 with name "Mino".
    """
    expected_rows = {"id": [0], "name": ["Mino"]}
    dt = DeltaTable("../rust/tests/data/simple_table_with_cdc")
    actual_rows = dt.to_pyarrow_table().to_pydict()
    assert actual_rows == expected_rows


def test_history_partitioned_table_metadata():
table_path = "../rust/tests/data/delta-0.8.0-partitioned"
dt = DeltaTable(table_path)
Expand Down
19 changes: 19 additions & 0 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,23 @@ pub struct StatsParsed {
pub null_count: HashMap<String, DeltaDataTypeLong>,
}

/// Delta `cdc` log action that describes a parquet change data (CDC) file.
///
/// Field names are serialized and deserialized in camelCase
/// (for example `partitionValues` and `dataChange`).
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct AddCDCFile {
/// A relative path, from the root of the table, or an
/// absolute path to a CDC file
pub path: String,
/// The size of this file in bytes
pub size: DeltaDataTypeLong,
/// A map from partition column to value for this file
pub partition_values: HashMap<String, Option<String>>,
/// Should always be `false`, because CDC files do not change the underlying data of the table
pub data_change: bool,
/// Optional map containing metadata about this file
pub tags: Option<HashMap<String, Option<String>>>,
}

/// Delta log action that describes a parquet data file that is part of the table.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -395,6 +412,8 @@ pub enum Action {
/// Changes the current metadata of the table. Must be present in the first version of a table.
/// Subsequent `metaData` actions completely overwrite previous metadata.
metaData(MetaData),
/// Adds a CDC file to the table state.
cdc(AddCDCFile),
/// Adds a file to the table state.
add(Add),
/// Removes a file from the table state.
Expand Down
14 changes: 12 additions & 2 deletions rust/src/action/parquet_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use serde_json::json;
use std::collections::HashMap;

use crate::action::{
Action, ActionError, Add, ColumnCountStat, ColumnValueStat, MetaData, Protocol, Remove, Stats,
Txn,
Action, ActionError, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, MetaData, Protocol,
Remove, Stats, Txn,
};

fn populate_hashmap_with_option_from_parquet_map(
Expand Down Expand Up @@ -35,6 +35,15 @@ fn gen_action_type_error(action: &str, field: &str, expected_type: &str) -> Acti
))
}

impl AddCDCFile {
    /// Builds an `AddCDCFile` for a `cdc` entry read from a parquet record.
    ///
    /// Placeholder implementation: the record is not inspected, and every
    /// field of the returned value is left at its `Default`. This never
    /// returns an error.
    fn from_parquet_record(_record: &parquet::record::Row) -> Result<Self, ActionError> {
        Ok(Self::default())
    }
}

impl Add {
fn from_parquet_record(record: &parquet::record::Row) -> Result<Self, ActionError> {
let mut re = Self {
Expand Down Expand Up @@ -586,6 +595,7 @@ impl Action {
"remove" => Action::remove(Remove::from_parquet_record(col_data)?),
"txn" => Action::txn(Txn::from_parquet_record(col_data)?),
"protocol" => Action::protocol(Protocol::from_parquet_record(col_data)?),
"cdc" => Action::cdc(AddCDCFile::from_parquet_record(col_data)?),
name => {
return Err(ActionError::InvalidField(format!(
"Unexpected action from checkpoint: {}",
Expand Down
2 changes: 2 additions & 0 deletions rust/src/table_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ impl DeltaTableState {
require_files: bool,
) -> Result<(), ApplyLogError> {
match action {
// TODO: optionally load CDC into TableState
action::Action::cdc(_v) => {}
Fazzani marked this conversation as resolved.
Show resolved Hide resolved
action::Action::add(v) => {
if require_files {
self.files.push(v.path_decoded()?);
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"tableSizeBytes":0,"numFiles":0,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"abd7ff9d-5fac-48fb-8d13-06c3ce1811ac","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"description":"CDC test Table","delta.enableChangeDataFeed":"true"},"createdTime":1671962429466},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"ffd3d108-41fa-4573-ada8-6e51e2fdd157","allFiles":[]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1671962429524,"userId":"722754018832045","userName":"xxxx@gmail.com","operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{\"description\":\"CDC test Table\",\"delta.enableChangeDataFeed\":\"true\"}"},"notebook":{"notebookId":"3736423866542999"},"clusterId":"1018-130235-jmbkv29y","isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/11.3.x-scala2.12","txnId":"ffd3d108-41fa-4573-ada8-6e51e2fdd157"}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":4}}
{"metaData":{"id":"abd7ff9d-5fac-48fb-8d13-06c3ce1811ac","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"description":"CDC test Table","delta.enableChangeDataFeed":"true"},"createdTime":1671962429466}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"tableSizeBytes":815,"numFiles":1,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"abd7ff9d-5fac-48fb-8d13-06c3ce1811ac","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"description":"CDC test Table","delta.enableChangeDataFeed":"true"},"createdTime":1671962429466},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[815,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"5725ad07-a6b1-4c92-b2f1-cd012b18e597","allFiles":[{"
path":"part-00000-996384f7-3fc5-4a5f-9921-6e56269ec2c9-c000.snappy.parquet","partitionValues":{},"size":815,"modificationTime":1671962472000,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0,\"name\":\"Mario\"},\"maxValues\":{\"id\":0,\"name\":\"Mario\"},\"nullCount\":{\"id\":0,\"name\":0}}","tags":{"INSERTION_TIME":"1671962472000000","MIN_INSERTION_TIME":"1671962472000000","MAX_INSERTION_TIME":"1671962472000000","OPTIMIZE_TARGET_SIZE":"268435456"}}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1671962472835,"userId":"722754018832045","userName":"xxxx@gmail.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"3736423866542999"},"clusterId":"1018-130235-jmbkv29y","readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"815"},"engineInfo":"Databricks-Runtime/11.3.x-scala2.12","txnId":"5725ad07-a6b1-4c92-b2f1-cd012b18e597"}}
{"add":{"path":"part-00000-996384f7-3fc5-4a5f-9921-6e56269ec2c9-c000.snappy.parquet","partitionValues":{},"size":815,"modificationTime":1671962472000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0,\"name\":\"Mario\"},\"maxValues\":{\"id\":0,\"name\":\"Mario\"},\"nullCount\":{\"id\":0,\"name\":0}}","tags":{"INSERTION_TIME":"1671962472000000","MIN_INSERTION_TIME":"1671962472000000","MAX_INSERTION_TIME":"1671962472000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"tableSizeBytes":1026,"numFiles":1,"numMetadata":1,"numProtocol":1,"protocol":{"minReaderVersion":1,"minWriterVersion":4},"metadata":{"id":"abd7ff9d-5fac-48fb-8d13-06c3ce1811ac","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"description":"CDC test Table","delta.enableChangeDataFeed":"true"},"createdTime":1671962429466},"histogramOpt":{"sortedBinBoundaries":[0,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,4194304,8388608,12582912,16777216,20971520,25165824,29360128,33554432,37748736,41943040,50331648,58720256,67108864,75497472,83886080,92274688,100663296,109051904,117440512,125829120,130023424,134217728,138412032,142606336,146800640,150994944,167772160,184549376,201326592,218103808,234881024,251658240,268435456,285212672,301989888,318767104,335544320,352321536,369098752,385875968,402653184,419430400,436207616,452984832,469762048,486539264,503316480,520093696,536870912,553648128,570425344,587202560,603979776,671088640,738197504,805306368,872415232,939524096,1006632960,1073741824,1140850688,1207959552,1275068416,1342177280,1409286144,1476395008,1610612736,1744830464,1879048192,2013265920,2147483648,2415919104,2684354560,2952790016,3221225472,3489660928,3758096384,4026531840,4294967296,8589934592,17179869184,34359738368,68719476736,137438953472,274877906944],"fileCounts":[1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"totalBytes":[1026,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]},"txnId":"a7ce7173-dcd4-4623-a412-d8e2bc76873f","allFiles":[
{"path":"part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet","partitionValues":{},"size":1026,"modificationTime":1671962475000,"dataChange":false,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0,\"name\":\"Mino\"},\"maxValues\":{\"id\":0,\"name\":\"Mino\"},\"nullCount\":{\"id\":0,\"name\":0}}","tags":{"MAX_INSERTION_TIME":"1671962472000000","INSERTION_TIME":"1671962472000000","MIN_INSERTION_TIME":"1671962472000000","OPTIMIZE_TARGET_SIZE":"268435456"}}]}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1671962475765,"userId":"722754018832045","userName":"xxxx@gmail.com","operation":"UPDATE","operationParameters":{"predicate":"(id#1188678 = 0)"},"notebook":{"notebookId":"3736423866542999"},"clusterId":"1018-130235-jmbkv29y","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"1","numCopiedRows":"0","numAddedChangeFiles":"1","executionTimeMs":"1628","scanTimeMs":"928","numAddedFiles":"1","numUpdatedRows":"1","rewriteTimeMs":"690"},"engineInfo":"Databricks-Runtime/11.3.x-scala2.12","txnId":"a7ce7173-dcd4-4623-a412-d8e2bc76873f"}}
{"remove":{"path":"part-00000-996384f7-3fc5-4a5f-9921-6e56269ec2c9-c000.snappy.parquet","deletionTimestamp":1671962475760,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":815,"tags":{"INSERTION_TIME":"1671962472000000","MIN_INSERTION_TIME":"1671962472000000","MAX_INSERTION_TIME":"1671962472000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet","partitionValues":{},"size":1026,"modificationTime":1671962475000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0,\"name\":\"Mino\"},\"maxValues\":{\"id\":0,\"name\":\"Mino\"},\"nullCount\":{\"id\":0,\"name\":0}}","tags":{"MAX_INSERTION_TIME":"1671962472000000","INSERTION_TIME":"1671962472000000","MIN_INSERTION_TIME":"1671962472000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"cdc":{"path":"_change_data/cdc-00000-a846ce80-2eec-484d-bef7-0e63557786ca.c000.snappy.parquet","partitionValues":{},"size":1155,"dataChange":false}}
Binary file not shown.
Binary file not shown.
14 changes: 14 additions & 0 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,3 +582,17 @@ async fn read_empty_folder() {
deltalake::DeltaTableError::NotATable(_),
));
}

/// Opens the `simple_table_with_cdc` fixture and checks that it loads at
/// version 2 with a single active data file.
#[tokio::test]
async fn read_delta_table_with_cdc() {
    let expected_files = vec![Path::from(
        "part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet",
    )];

    let table = deltalake::open_table("./tests/data/simple_table_with_cdc")
        .await
        .unwrap();

    assert_eq!(table.version(), 2);
    assert_eq!(table.get_files(), expected_files);
}