From 0385521b7edbab1499b2807cab87400246e3eca5 Mon Sep 17 00:00:00 2001 From: Heni FAZZANI Date: Sun, 25 Dec 2022 11:59:38 +0100 Subject: [PATCH 1/2] Add provisional workaround to support CDC #1039 --- python/tests/test_table_read.py | 8 ++++++++ rust/src/action/mod.rs | 18 ++++++++++++++++++ rust/src/action/parquet_read/mod.rs | 12 +++++++++++- rust/src/table_state.rs | 2 ++ ...484d-bef7-0e63557786ca.c000.snappy.parquet | Bin 0 -> 1155 bytes .../_delta_log/00000000000000000000.crc | 1 + .../_delta_log/00000000000000000000.json | 3 +++ .../_delta_log/00000000000000000001.crc | 1 + .../_delta_log/00000000000000000001.json | 2 ++ .../_delta_log/00000000000000000002.crc | 1 + .../_delta_log/00000000000000000002.json | 4 ++++ ...4a4c-8abe-3323499043e9.c000.snappy.parquet | Bin 0 -> 1026 bytes ...4a5f-9921-6e56269ec2c9-c000.snappy.parquet | Bin 0 -> 815 bytes 13 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 rust/tests/data/simple_table_with_cdc/_change_data/cdc-00000-a846ce80-2eec-484d-bef7-0e63557786ca.c000.snappy.parquet create mode 100644 rust/tests/data/simple_table_with_cdc/_delta_log/00000000000000000000.crc create mode 100644 rust/tests/data/simple_table_with_cdc/_delta_log/00000000000000000000.json create mode 100644 rust/tests/data/simple_table_with_cdc/_delta_log/00000000000000000001.crc create mode 100644 rust/tests/data/simple_table_with_cdc/_delta_log/00000000000000000001.json create mode 100644 rust/tests/data/simple_table_with_cdc/_delta_log/00000000000000000002.crc create mode 100644 rust/tests/data/simple_table_with_cdc/_delta_log/00000000000000000002.json create mode 100644 rust/tests/data/simple_table_with_cdc/part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet create mode 100644 rust/tests/data/simple_table_with_cdc/part-00000-996384f7-3fc5-4a5f-9921-6e56269ec2c9-c000.snappy.parquet diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 3543a8bc52..6d57b0bf7b 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -237,6 +237,14 @@ def test_read_partitioned_table_protocol(): assert protocol.min_writer_version == 2 +def test_read_table_with_cdc(): + table_path = "../rust/tests/data/simple_table_with_cdc" + dt = DeltaTable(table_path) + df = dt.to_pandas() + assert df.loc[0][0].item() == 0 + assert len(dt.history()) == 3 + + def test_history_partitioned_table_metadata(): table_path = "../rust/tests/data/delta-0.8.0-partitioned" dt = DeltaTable(table_path) diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index 6e72f35f27..433bbcb4c7 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -149,6 +149,22 @@ pub struct StatsParsed { pub null_count: HashMap, } +/// Delta AddCDCFile action that describes a parquet CDC data file. +#[derive(Serialize, Deserialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct AddCDCFile { + /// A relative path, from the root of the table, to a CDC file + pub path: String, + /// The size of this file in bytes + pub size: DeltaDataTypeLong, + /// A map from partition column to value for this file + pub partition_values: HashMap>, + /// Should always be set to false because they do not change the underlying data of the table + pub data_change: bool, + /// Map containing metadata about this file + pub tags: Option>>, +} + /// Delta log action that describes a parquet data file that is part of the table. #[derive(Serialize, Deserialize, Clone, Debug, Default)] #[serde(rename_all = "camelCase")] @@ -395,6 +411,8 @@ pub enum Action { /// Changes the current metadata of the table. Must be present in the first version of a table. /// Subsequent `metaData` actions completely overwrite previous metadata. metaData(MetaData), + /// Adds CDC a file to the table state. + cdc(AddCDCFile), /// Adds a file to the table state. add(Add), /// Removes a file from the table state. diff --git a/rust/src/action/parquet_read/mod.rs b/rust/src/action/parquet_read/mod.rs index 3d0ff935fe..6882ddd04b 100644 --- a/rust/src/action/parquet_read/mod.rs +++ b/rust/src/action/parquet_read/mod.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use crate::action::{ Action, ActionError, Add, ColumnCountStat, ColumnValueStat, MetaData, Protocol, Remove, Stats, - Txn, + Txn, AddCDCFile }; fn populate_hashmap_with_option_from_parquet_map( @@ -35,6 +35,15 @@ fn gen_action_type_error(action: &str, field: &str, expected_type: &str) -> Acti )) } +impl AddCDCFile { + fn from_parquet_record(_record: &parquet::record::Row) -> Result { + let re = Self { + ..Default::default() + }; + Ok(re) + } +} + impl Add { fn from_parquet_record(record: &parquet::record::Row) -> Result { let mut re = Self { @@ -586,6 +595,7 @@ impl Action { "remove" => Action::remove(Remove::from_parquet_record(col_data)?), "txn" => Action::txn(Txn::from_parquet_record(col_data)?), "protocol" => Action::protocol(Protocol::from_parquet_record(col_data)?), + "cds" => Action::cdc(AddCDCFile::from_parquet_record(col_data)?), name => { return Err(ActionError::InvalidField(format!( "Unexpected action from checkpoint: {}", diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 4d8945ddbe..db9e8e356b 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -283,6 +283,8 @@ impl DeltaTableState { require_files: bool, ) -> Result<(), ApplyLogError> { match action { + action::Action::cdc(_v) => { + } action::Action::add(v) => { if require_files { self.files.push(v.path_decoded()?); diff --git a/rust/tests/data/simple_table_with_cdc/_change_data/cdc-00000-a846ce80-2eec-484d-bef7-0e63557786ca.c000.snappy.parquet b/rust/tests/data/simple_table_with_cdc/_change_data/cdc-00000-a846ce80-2eec-484d-bef7-0e63557786ca.c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..5ba49ec42b33d4f41419f47b25160c89a7d6ec7b GIT binary patch literal 1155 zcma)+zi-n(6vr=)8!`l1MBiacmWV7hYNfu?X{h^o;MY{sLL1dPNGq2)&zf55!$ zrf+TW9)lDH=Ojh4+XON=kX_c}b7WZ)xCmsL))sTSycz5c#tT$dHMNneYbaPrr({*+ zgG6QO&>&aOrM2jU(LW{g;d!GOQS)IFB+(cd&@}p9hFN+nfkAR34Kiwlq*G{u*hwh5 zK{s=wMipg*FR@E_LKMK=qrfL2aF7I(%G4lY!Es zBy14+1J9XD)1T5Ty1b#NeXnb<0Yf{D&{+ff%-8!aSFSt^tlQ7m?h{3ufE?as*6!mc zYfDvy5Ji~)UUSd4`>rxoHp+%cl!Xbx-*F6V<<5`?hSz^%^mxGVxns(&#GWp?rPp;o z@P<~P=k<=wowl#nU+(F`S6s5&7>+U94S1J(7;uMEwnNFqw98A3jQi8btzIWtx{%_*==Yx+a_71 zl-{L(L;ng7;$Ps&lXy}*iO{1Uc=F(zByCsrQV5fIFQ50FH{Y48-+Nx6K#P>qUq9a+ zPZ^Y8n4@`sncD;arZgM?a|SKZ({INg#*u6F6q!cknHLNZE70ro&+kt^X2z6q#RLQW z2GO}gI9lchDCBMFKkpc^btR}ojU4((*K6OFE@DMD+H3(=kkcef*F zM-BwdVRDvm;|0__ zpjlSpn-zNlxj76ZzxpV4?!4G|0Nj{>eAySy!IRF$a)SeKJ^{RzVdM=1ex+vBEF1Xz z1QG4~79Q08NXAw;d}9q{EL;%_{#`pX)mfM=Gm68J6PpWW&y&6znXRouQ=K1v@?1=3 zm>0ydFGEZOBi|QY9|5eC3+9=q**ZKrS~xFGG|tO$6))&tzIyz^Tr_bxl~+f=Z%ufS z+WAn#as%C0j&5ewEDpj_=4Gj3RjQV~s7&gHp0gV*+O~bSG}j$@zFU&bCE+YPwQjxV pG?#3#+;tljyJFYeUZdKqR-I%70M_yE{|AhP^7Q}! literal 0 HcmV?d00001 diff --git a/rust/tests/data/simple_table_with_cdc/part-00000-996384f7-3fc5-4a5f-9921-6e56269ec2c9-c000.snappy.parquet b/rust/tests/data/simple_table_with_cdc/part-00000-996384f7-3fc5-4a5f-9921-6e56269ec2c9-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..4970368b358ca3597dec6359cc5c6e4360894ae0 GIT binary patch literal 815 zcmah|&59F25bo(s)CANEu#=hW%tTp2 zUO+r}P`vmQK7%JOK81Hro;+CdSFy$_SwU$C)+jhxxLP1eTPAd> zl0!fsO{WfVvKwT^4{FI$#3!#3_qALIQi0$eX4v--sllZZiiO-mk9B}ktm#_QRADTw zzl&5YgAq(Iy+aR8bzF>7oD}Wn!xbL%1X(~0B!3iAr(YDpkqH88B~#uf=XyjP(;1hu zmj>7hO{IU4?*j(D?+WQ4`d*>fROv^h4k}+TEsEz{ECndzxk#S+3wa!cl3lu=x;LLq z?jojdfdV-f?%6|UG8{037~2B8k#XWjA**(Ims^M(Y$1}s^p-DVDm;-2 z_E}jQ>Df{mBO^)Um75w(W9G}iON`O;wUI3wQoaY_66S}goXZ#j;VKBkGynip%BJy; z)EKQdo920O&T(E2UHowVzv`D8#IA3tC>C)~q;dj#%XV>GifS$5M!AJXo3}gMYGv}K zvG1NFEz7cQHTI_~KkypT?hAM5cBj3WYxgZNoO*+{)wa6cY|ybg9oL@OJt=z!S`(gF N_R2oNBRPVv@)w^D#D4$) literal 0 HcmV?d00001 From 87db4648739ce2ef891e74a3999c07c99a69fc6b Mon Sep 17 00:00:00 2001 From: Heni FAZZANI Date: Sun, 25 Dec 2022 16:24:53 +0100 Subject: [PATCH 2/2] Fix pipelines --- python/tests/test_table_read.py | 7 ++++--- rust/src/action/mod.rs | 3 ++- rust/src/action/parquet_read/mod.rs | 6 +++--- rust/src/table_state.rs | 4 ++-- rust/tests/read_delta_test.rs | 14 ++++++++++++++ 5 files changed, 25 insertions(+), 9 deletions(-) diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 6d57b0bf7b..2c8b085a6b 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -240,9 +240,10 @@ def test_read_partitioned_table_protocol(): def test_read_table_with_cdc(): table_path = "../rust/tests/data/simple_table_with_cdc" dt = DeltaTable(table_path) - df = dt.to_pandas() - assert df.loc[0][0].item() == 0 - assert len(dt.history()) == 3 + assert dt.to_pyarrow_table().to_pydict() == { + "id": [0], + "name": ["Mino"], + } def test_history_partitioned_table_metadata(): diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index 433bbcb4c7..3e7b62fffb 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -153,7 +153,8 @@ pub struct StatsParsed { #[derive(Serialize, Deserialize, Clone, Debug, Default)] #[serde(rename_all = "camelCase")] pub struct AddCDCFile { - /// A relative path, from the root of the table, to a CDC file + /// A relative path, from the root of the table, or an + /// absolute path to a CDC file pub path: String, /// The size of this file in bytes pub size: DeltaDataTypeLong, diff --git a/rust/src/action/parquet_read/mod.rs b/rust/src/action/parquet_read/mod.rs index 6882ddd04b..e12ddca2ef 100644 --- a/rust/src/action/parquet_read/mod.rs +++ b/rust/src/action/parquet_read/mod.rs @@ -6,8 +6,8 @@ use serde_json::json; use std::collections::HashMap; use crate::action::{ - Action, ActionError, Add, ColumnCountStat, ColumnValueStat, MetaData, Protocol, Remove, Stats, - Txn, AddCDCFile + Action, ActionError, Add, AddCDCFile, ColumnCountStat, ColumnValueStat, MetaData, Protocol, + Remove, Stats, Txn, }; fn populate_hashmap_with_option_from_parquet_map( @@ -595,7 +595,7 @@ impl Action { "remove" => Action::remove(Remove::from_parquet_record(col_data)?), "txn" => Action::txn(Txn::from_parquet_record(col_data)?), "protocol" => Action::protocol(Protocol::from_parquet_record(col_data)?), - "cds" => Action::cdc(AddCDCFile::from_parquet_record(col_data)?), + "cdc" => Action::cdc(AddCDCFile::from_parquet_record(col_data)?), name => { return Err(ActionError::InvalidField(format!( "Unexpected action from checkpoint: {}", diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index db9e8e356b..0d39926655 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -283,8 +283,8 @@ impl DeltaTableState { require_files: bool, ) -> Result<(), ApplyLogError> { match action { - action::Action::cdc(_v) => { - } + // TODO: optionally load CDC into TableState + action::Action::cdc(_v) => {} action::Action::add(v) => { if require_files { self.files.push(v.path_decoded()?); diff --git a/rust/tests/read_delta_test.rs b/rust/tests/read_delta_test.rs index 3415fb0157..6958e71b77 100644 --- a/rust/tests/read_delta_test.rs +++ b/rust/tests/read_delta_test.rs @@ -582,3 +582,17 @@ async fn read_empty_folder() { deltalake::DeltaTableError::NotATable(_), )); } + +#[tokio::test] +async fn read_delta_table_with_cdc() { + let table = deltalake::open_table("./tests/data/simple_table_with_cdc") + .await + .unwrap(); + assert_eq!(table.version(), 2); + assert_eq!( + table.get_files(), + vec![Path::from( + "part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet" + ),] + ); +}