From 5131708ead4ce08807036a31fd8bd3f2f608f2ae Mon Sep 17 00:00:00 2001 From: xianwill Date: Thu, 5 Aug 2021 13:01:29 -0400 Subject: [PATCH 1/5] Fix stats calculation --- Cargo.lock | 128 +++++++++++----------- Cargo.toml | 10 +- src/deltalake_ext.rs | 77 +++++++------ src/lib.rs | 2 +- src/transforms.rs | 8 +- tests/checkpoint_bug_tests.rs | 178 +++++++++++++++++++++++++++++++ tests/json/web_requests-100.json | 100 +++++++++++++++++ 7 files changed, 388 insertions(+), 115 deletions(-) create mode 100644 tests/checkpoint_bug_tests.rs create mode 100644 tests/json/web_requests-100.json diff --git a/Cargo.lock b/Cargo.lock index 1dc46f6..1ccc950 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ dependencies = [ [[package]] name = "alloc-no-stdlib" -version = "2.0.3" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35ef4730490ad1c4eae5c4325b2a95f521d023e5c885853ff7aca0a6a1631db3" +checksum = "5192ec435945d87bc2f70992b4d818154b5feede43c09fb7592146374eac90a6" [[package]] name = "alloc-stdlib" @@ -56,7 +56,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "6.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow-rs?rev=6bf1988852f87da21a163203eec4c83a7b692901#6bf1988852f87da21a163203eec4c83a7b692901" +source = "git+https://github.com/apache/arrow-rs?rev=fa5acd971c973161f17e69d5c6b50d6e77c7da03#fa5acd971c973161f17e69d5c6b50d6e77c7da03" dependencies = [ "bitflags", "chrono", @@ -77,9 +77,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.51" +version = "0.1.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" +checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722" dependencies = [ "proc-macro2", "quote", @@ -138,9 +138,9 @@ dependencies = [ [[package]] name = "brotli" -version = "3.3.2" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71cb90ade945043d3d53597b2fc359bb063db8ade2bcffe7997351d0756e9d50" +checksum = "7f29919120f08613aadcd4383764e00526fc9f18b6c0895814faeed0dd78613e" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -149,9 +149,9 @@ dependencies = [ [[package]] name = "brotli-decompressor" -version = "2.3.2" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59ad2d4653bf5ca36ae797b1f4bb4dbddb60ce49ca4aed8a2ce4829f60425b80" +checksum = "1052e1c3b8d4d80eb84a8b94f0a1498797b5fb96314c001156a1c761940ef4ec" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -374,7 +374,7 @@ dependencies = [ [[package]] name = "deltalake" version = "0.4.1" -source = "git+https://github.com/delta-io/delta-rs.git?branch=writer-map-support#56235b1eae1de7ea75023622dc6438ced9890ec5" +source = "git+https://github.com/xianwill/delta-rs.git?branch=checkpoint_and_map_fixes#f8f077a21fe23dc2261d5f47fbb7a1087d60f0ac" dependencies = [ "anyhow", "arrow", @@ -572,9 +572,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.16" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adc00f486adfc9ce99f77d717836f0c5aa84965eb0b4f051f4e83f7cab53f8b" +checksum = "0e7e43a803dae2fa37c1f6a8fe121e1f7bf9548b4dfc0522a42f34145dadfc27" dependencies = [ "futures-channel", "futures-core", @@ -587,9 +587,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.16" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74ed2411805f6e4e3d9bc904c95d5d423b89b3b25dc0250aa74729de20629ff9" +checksum = "e682a68b29a882df0545c143dc3646daefe80ba479bcdede94d5a703de2871e2" dependencies = [ "futures-core", "futures-sink", @@ -597,15 +597,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.16" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af51b1b4a7fdff033703db39de8802c673eb91855f2e0d47dcf3bf2c0ef01f99" +checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1" [[package]] name = "futures-executor" -version = "0.3.16" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d0d535a57b87e1ae31437b892713aee90cd2d7b0ee48727cd11fc72ef54761c" +checksum = "badaa6a909fac9e7236d0620a2f57f7664640c56575b71a7552fbd68deafab79" dependencies = [ "futures-core", "futures-task", @@ -614,15 +614,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.16" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b0e06c393068f3a6ef246c75cdca793d6a46347e75286933e5e75fd2fd11582" +checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1" [[package]] name = "futures-macro" -version = "0.3.16" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c54913bae956fb8df7f4dc6fc90362aa72e69148e3f39041fbe8742d21e0ac57" +checksum = "a4c40298486cdf52cc00cd6d6987892ba502c7656a16a4192a9992b1ccedd121" dependencies = [ "autocfg", "proc-macro-hack", @@ -633,21 +633,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.16" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0f30aaa67363d119812743aa5f33c201a7a66329f97d1a887022971feea4b53" +checksum = "a57bead0ceff0d6dde8f465ecd96c9338121bb7717d3e7b108059531870c4282" [[package]] name = "futures-task" -version = "0.3.16" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbe54a98670017f3be909561f6ad13e810d9a51f3f061b902062ca3da80799f2" +checksum = "8a16bef9fc1a4dddb5bee51c989e3fbba26569cbb0e31f5b303c184e3dd33dae" [[package]] name = "futures-util" -version = "0.3.16" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67eb846bfd58e44a8481a00049e82c43e0ccb5d61f8dc071057cb19249dd4d78" +checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967" dependencies = [ "autocfg", "futures-channel", @@ -753,9 +753,9 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.3" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5" +checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9" dependencies = [ "bytes", "http", @@ -872,9 +872,9 @@ dependencies = [ [[package]] name = "jobserver" -version = "0.1.23" +version = "0.1.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5ca711fd837261e14ec9e674f092cbb931d3fa1482b017ae59328ddc6f3212b" +checksum = "972f5ae5d1cb9c6ae417789196c803205313edde988685da5e3aae0827b9e7fd" dependencies = [ "libc", ] @@ -1240,9 +1240,9 @@ dependencies = [ [[package]] name = "num_enum" -version = "0.5.3" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee2c8fd66061a707503d515639b8af10fd3807a5b5ee6959f7ff1bd303634bd5" +checksum = "e5adf0198d427ee515335639f275e806ca01acf9f07d7cf14bb36a10532a6169" dependencies = [ "derivative", "num_enum_derive", @@ -1250,9 +1250,9 @@ dependencies = [ [[package]] name = "num_enum_derive" -version = "0.5.3" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "474fd1d096da3ad17084694eebed40ba09c4a36c5255cd772bd8b98859cc562e" +checksum = "b1def5a3f69d4707d8a040b12785b98029a39e8c610ae685c7f6265669767482" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1374,7 +1374,7 @@ dependencies = [ "cfg-if 1.0.0", "instant", "libc", - "redox_syscall 0.2.10", + "redox_syscall 0.2.9", "smallvec", "winapi", ] @@ -1382,7 +1382,7 @@ dependencies = [ [[package]] name = "parquet" version = "6.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow-rs?rev=6bf1988852f87da21a163203eec4c83a7b692901#6bf1988852f87da21a163203eec4c83a7b692901" +source = "git+https://github.com/apache/arrow-rs?rev=fa5acd971c973161f17e69d5c6b50d6e77c7da03#fa5acd971c973161f17e69d5c6b50d6e77c7da03" dependencies = [ "arrow", "base64", @@ -1582,9 +1582,9 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] name = "redox_syscall" -version = "0.2.10" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" +checksum = "5ab49abadf3f9e1c4bc499e8845e152ad87d2ad2d30371841171169e9d75feee" dependencies = [ "bitflags", ] @@ -1596,7 +1596,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64" dependencies = [ "getrandom", - "redox_syscall 0.2.10", + "redox_syscall 0.2.9", ] [[package]] @@ -1812,18 +1812,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.127" +version = "1.0.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f03b9878abf6d14e6779d3f24f07b2cfa90352cfec4acc5aab8f1ac7f146fae8" +checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.127" +version = "1.0.126" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a024926d3432516606328597e0f224a51355a493b49fdd67e9209187cbe55ecc" +checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" dependencies = [ "proc-macro2", "quote", @@ -1832,9 +1832,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.66" +version = "1.0.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127" +checksum = "799e97dc9fdae36a5c8b8f2cae9ce2ee9fdce2058c57a93e6099d919fd982f79" dependencies = [ "indexmap", "itoa", @@ -1912,9 +1912,9 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.4" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c307a32c1c5c437f38c7fd45d753050587732ba8628319fbdf12a7e289ccc590" +checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527" [[package]] name = "slug" @@ -1939,9 +1939,9 @@ checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" [[package]] name = "socket2" -version = "0.4.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "765f090f0e423d2b55843402a07915add955e7d60657db13707a159727326cad" +checksum = "9e3dfc207c526015c632472a77be09cf1b6e46866581aecae5cc38fb4235dea2" dependencies = [ "libc", "winapi", @@ -2067,7 +2067,7 @@ dependencies = [ "cfg-if 1.0.0", "libc", "rand", - "redox_syscall 0.2.10", + "redox_syscall 0.2.9", "remove_dir_all", "winapi", ] @@ -2421,9 +2421,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" [[package]] name = "wasm-bindgen" -version = "0.2.75" +version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b608ecc8f4198fe8680e2ed18eccab5f0cd4caaf3d83516fa5fb2e927fda2586" +checksum = "d54ee1d4ed486f78874278e63e4069fc1ab9f6a18ca492076ffb90c5eb2997fd" dependencies = [ "cfg-if 1.0.0", "wasm-bindgen-macro", @@ -2431,9 +2431,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.75" +version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "580aa3a91a63d23aac5b6b267e2d13cb4f363e31dce6c352fca4752ae12e479f" +checksum = "3b33f6a0694ccfea53d94db8b2ed1c3a8a4c86dd936b13b9f0a15ec4a451b900" dependencies = [ "bumpalo", "lazy_static", @@ -2446,9 +2446,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.75" +version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "171ebf0ed9e1458810dfcb31f2e766ad6b3a89dbda42d8901f2b268277e5f09c" +checksum = "088169ca61430fe1e58b8096c24975251700e7b1f6fd91cc9d59b04fb9b18bd4" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2456,9 +2456,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.75" +version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c2657dd393f03aa2a659c25c6ae18a13a4048cebd220e147933ea837efc589f" +checksum = "be2241542ff3d9f241f5e2cb6dd09b37efe786df8851c54957683a49f0987a97" dependencies = [ "proc-macro2", "quote", @@ -2469,9 +2469,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.75" +version = "0.2.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e0c4a743a309662d45f4ede961d7afa4ba4131a59a639f29b0069c3798bbcc2" +checksum = "d7cff876b8f18eed75a66cf49b65e7f967cb354a7aa16003fb55dbfd25b44b4f" [[package]] name = "winapi" @@ -2506,9 +2506,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "xml-rs" -version = "0.8.4" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" +checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" [[package]] name = "zeroize" diff --git a/Cargo.toml b/Cargo.toml index 792cd17..842a113 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,12 +31,14 @@ tokio-stream = { version = "0", features = ["fs"] } tokio-util = "0.6.3" uuid = { version = "0.8", features = ["serde", "v4"] } -arrow = { git = "https://github.com/apache/arrow-rs", rev = "6bf1988852f87da21a163203eec4c83a7b692901" } -parquet = { git = "https://github.com/apache/arrow-rs", rev = "6bf1988852f87da21a163203eec4c83a7b692901" } -deltalake = { git = "https://github.com/delta-io/delta-rs.git", branch = "writer-map-support", features = ["s3"] } +arrow = { git = "https://github.com/apache/arrow-rs", rev = "fa5acd971c973161f17e69d5c6b50d6e77c7da03" } +parquet = { git = "https://github.com/apache/arrow-rs", rev = "fa5acd971c973161f17e69d5c6b50d6e77c7da03" } + +deltalake = { git = "https://github.com/xianwill/delta-rs.git", branch = "checkpoint_and_map_fixes", features = ["s3"] } [dev-dependencies] utime = "0.3" serial_test = "*" tempfile = "3" -parquet = { git = "https://github.com/apache/arrow-rs", rev = "6bf1988852f87da21a163203eec4c83a7b692901", features = ["cli"] } +parquet = { git = "https://github.com/apache/arrow-rs", rev = "fa5acd971c973161f17e69d5c6b50d6e77c7da03", features = ["cli"] } + diff --git a/src/deltalake_ext.rs b/src/deltalake_ext.rs index 0b56dba..48c0bc6 100644 --- a/src/deltalake_ext.rs +++ b/src/deltalake_ext.rs @@ -10,11 +10,11 @@ use arrow::{ json::reader::Decoder, record_batch::*, }; -use chrono::prelude::*; use deltalake::{ action::{Add, ColumnCountStat, ColumnValueStat, Stats}, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableError, DeltaTransactionError, Schema, StorageBackend, StorageError, UriError, + writer::time_utils::timestamp_to_delta_stats_string }; use log::debug; use parquet::{ @@ -34,8 +34,8 @@ use std::convert::TryFrom; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use uuid::Uuid; +use parquet::basic::TimestampType; -const NANOSECONDS: i64 = 1_000_000_000; const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__"; type MinAndMaxValues = ( @@ -139,7 +139,7 @@ impl DeltaArrowWriter { self.arrow_writer.write(&record_batch)?; self.buffered_record_batch_count += 1; - apply_null_counts(&record_batch.into(), &mut self.null_counts); + apply_null_counts(partition_columns, &record_batch.into(), &mut self.null_counts, 0); Ok(()) } } @@ -416,6 +416,7 @@ pub fn record_batch_from_json( } fn min_max_values_from_file_metadata( + partition_values: &HashMap>, file_metadata: &FileMetaData, ) -> Result { let type_ptr = parquet::schema::types::from_thrift(file_metadata.schema.as_slice()); @@ -442,14 +443,21 @@ fn min_max_values_from_file_metadata( continue; } + let column_path = column_descr.path(); + let column_path_parts = column_path.parts(); + + // Do not include partition columns in statistics + if partition_values.contains_key(&column_path_parts[0]) { + continue; + } + let statistics: Vec<&Statistics> = row_group_metadata .iter() - .filter_map(|g| g.column(i).statistics()) + .filter_map(|g| + g.column(i).statistics() + ) .collect(); - let column_path = column_descr.path(); - let column_path_parts = column_path.parts(); - let _ = apply_min_max_for_column( statistics.as_slice(), column_descr.clone(), @@ -462,7 +470,7 @@ fn min_max_values_from_file_metadata( Ok((min_values, max_values)) } -fn apply_null_counts(array: &StructArray, null_counts: &mut HashMap) { +fn apply_null_counts(partition_columns: &[String], array: &StructArray, null_counts: &mut HashMap, nest_level: i32) { let fields = match array.data_type() { DataType::Struct(fields) => fields, _ => unreachable!(), @@ -475,6 +483,11 @@ fn apply_null_counts(array: &StructArray, null_counts: &mut HashMap { @@ -484,7 +497,7 @@ fn apply_null_counts(array: &StructArray, null_counts: &mut HashMap { - apply_null_counts(as_struct_array(column), map); + apply_null_counts(partition_columns, as_struct_array(column), map, nest_level + 1); } _ => unreachable!(), } @@ -643,26 +656,26 @@ fn min_and_max_from_parquet_statistics( Ok((min, max)) } - DataType::Int64 if is_timestamp(column_descr.logical_type()) => { - let min_array = as_primitive_array::(&min_array); - let min = arrow::compute::min(min_array); - let min = min.map(|i| timestamp_ns_to_json_value(i)).flatten(); - - let max_array = as_primitive_array::(&max_array); - let max = arrow::compute::max(max_array); - let max = max.map(|i| timestamp_ns_to_json_value(i)).flatten(); - Ok((min, max)) - } DataType::Int64 => { let min_array = as_primitive_array::(&min_array); let min = arrow::compute::min(min_array); - let min = min.map(|i| Value::Number(Number::from(i))); - let max_array = as_primitive_array::(&max_array); let max = arrow::compute::max(max_array); - let max = max.map(|i| Value::Number(Number::from(i))); - Ok((min, max)) + match column_descr.logical_type().as_ref() { + Some(LogicalType::TIMESTAMP(TimestampType { unit, .. })) => { + let min = min.map(|n| Value::String(timestamp_to_delta_stats_string(n, &unit))); + let max = max.map(|n| Value::String(timestamp_to_delta_stats_string(n, &unit))); + + Ok((min, max)) + } + _ => { + let min = min.map(|i| Value::Number(Number::from(i))); + let max = max.map(|i| Value::Number(Number::from(i))); + + Ok((min, max)) + } + } } DataType::Float32 => { let min_array = as_primitive_array::(&min_array); @@ -706,14 +719,6 @@ fn is_utf8(opt: Option) -> bool { } } -#[inline] -fn is_timestamp(opt: Option) -> bool { - match opt.as_ref() { - Some(LogicalType::TIMESTAMP(_)) => true, - _ => false, - } -} - fn min_max_strings_from_stats( stats_with_min_max: &Vec<&Statistics>, ) -> (Option, Option) { @@ -736,14 +741,6 @@ fn min_max_strings_from_stats( return (min_value, max_value); } -#[inline] -pub fn timestamp_ns_to_json_value(ns: i64) -> Option { - match Utc.timestamp_opt(ns / NANOSECONDS, (ns % NANOSECONDS) as u32) { - chrono::offset::LocalResult::Single(dt) => Some(Value::String(format!("{:?}", dt))), - _ => None, - } -} - fn arrow_array_from_bytes( data_type: DataType, capacity: usize, @@ -771,7 +768,7 @@ fn create_add( size: i64, file_metadata: &FileMetaData, ) -> Result { - let (min_values, max_values) = min_max_values_from_file_metadata(file_metadata)?; + let (min_values, max_values) = min_max_values_from_file_metadata(partition_values, file_metadata)?; let stats = Stats { num_records: file_metadata.num_rows, diff --git a/src/lib.rs b/src/lib.rs index 83ed51f..e3b29d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ use tokio_util::sync::CancellationToken; pub mod deltalake_ext; pub mod instrumentation; -mod transforms; +pub mod transforms; use crate::transforms::*; use crate::{ diff --git a/src/transforms.rs b/src/transforms.rs index 5f3bb55..94730f8 100644 --- a/src/transforms.rs +++ b/src/transforms.rs @@ -373,15 +373,11 @@ fn timestamp_value_from_kafka( match kafka_timestamp { rdkafka::Timestamp::NotAvailable => serde_json::to_value(None as Option), // Convert to milliseconds to microseconds for delta format - rdkafka::Timestamp::CreateTime(ms) => serde_json::to_value(millis_to_nanos(ms)), - rdkafka::Timestamp::LogAppendTime(ms) => serde_json::to_value(millis_to_nanos(ms)), + rdkafka::Timestamp::CreateTime(ms) => serde_json::to_value(ms * 1000), + rdkafka::Timestamp::LogAppendTime(ms) => serde_json::to_value(ms * 1000), } } -fn millis_to_nanos(ms: i64) -> i64 { - ms * 1_000_000 -} - #[cfg(test)] mod tests { use rdkafka::message::OwnedMessage; diff --git a/tests/checkpoint_bug_tests.rs b/tests/checkpoint_bug_tests.rs new file mode 100644 index 0000000..0528be7 --- /dev/null +++ b/tests/checkpoint_bug_tests.rs @@ -0,0 +1,178 @@ +extern crate kafka_delta_ingest; + +#[allow(dead_code)] +mod helpers; + +use deltalake::action::Action; +use deltalake::delta_arrow::*; +use deltalake::checkpoints::CheckPointWriter; +use dipstick::{Input, Prefixed, Statsd}; +use kafka_delta_ingest::{ + deltalake_ext::*, + instrumentation::StatsHandler, + KafkaJsonToDelta, + Options, + transforms::Transformer +}; +use log::debug; +use parquet::{ + file::reader::{FileReader, SerializedFileReader}, + record::RowAccessor, +}; +use rdkafka::{producer::FutureProducer, producer::FutureRecord, util::Timeout, ClientConfig}; +use serde_json::{json, Value}; +use std::env; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::sync::Once; +use std::{collections::HashMap, fs, path::PathBuf}; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; + +// const TEST_TABLE_URI: &str = "tests/temp/checkpoint_bug"; +const TEST_TABLE_URI: &str = "tests/temp/timestamp_bug"; + +#[tokio::test] +async fn checkpoint_debugging_test() { + let test_json_uri = "tests/json/web_requests-100.json"; + + cleanup_delta_files(TEST_TABLE_URI); + + let mut transforms = HashMap::new(); + transforms.insert( + "date".to_string(), + "substr(meta.producer.timestamp, `0`, `10`)".to_string(), + ); + transforms.insert("meta.kafka.topic".to_string(), "kafka.topic".to_string()); + transforms.insert( + "meta.kafka.offset".to_string(), + "kafka.offset".to_string(), + ); + transforms.insert( + "meta.kafka.partition".to_string(), + "kafka.partition".to_string(), + ); + transforms.insert( + "meta.kafka.timestamp".to_string(), + "kafka.timestamp".to_string(), + ); + transforms.insert( + "meta.kafka.timestamp_type".to_string(), + "kafka.timestamp_type".to_string(), + ); + let transformer = Transformer::from_transforms(&transforms).unwrap(); + + let s = std::fs::read_to_string(test_json_uri).unwrap(); + let mut values: Vec = s.lines().map(|l| { + serde_json::from_str(l).unwrap() + }).collect(); + + for (i, v) in values.iter_mut().enumerate() { + let message = rdkafka::message::OwnedMessage::new( + Some(v.to_string().into_bytes()), + None, + "test".to_string(), + rdkafka::Timestamp::CreateTime(chrono::Utc::now().timestamp_millis()), + 0, + i as i64, + None + ); + transformer.transform(v, &message).unwrap(); + } + + let chunks = values.chunks(10); + + let mut writer = DeltaWriter::for_table_path(TEST_TABLE_URI).await.unwrap(); + + for chunk in chunks { + writer.write(chunk.into()).await.unwrap(); + let adds = writer.write_parquet_files().await.unwrap(); + let mut tx = writer.table.create_transaction(None); + tx.add_actions(adds.iter().map(|a| deltalake::action::Action::add(a.to_owned())).collect()); + let prepared = tx.prepare_commit(None).await.unwrap(); + let version = writer.table_version() + 1; + let res = writer.table.try_commit_transaction(&prepared, version).await; + } + + let checkpoint_writer = CheckPointWriter::new_for_table_uri(TEST_TABLE_URI).unwrap(); + checkpoint_writer.create_checkpoint_from_state(10, writer.table.get_state()).await.unwrap(); + + // ROUND 2 + + let chunks = values.chunks(10); + + let mut writer = DeltaWriter::for_table_path(TEST_TABLE_URI).await.unwrap(); + + for chunk in chunks { + writer.write(chunk.into()).await.unwrap(); + let adds = writer.write_parquet_files().await.unwrap(); + let mut tx = writer.table.create_transaction(None); + tx.add_actions(adds.iter().map(|a| deltalake::action::Action::add(a.to_owned())).collect()); + let prepared = tx.prepare_commit(None).await.unwrap(); + let version = writer.table_version() + 1; + let res = writer.table.try_commit_transaction(&prepared, version).await; + } + + let checkpoint_writer = CheckPointWriter::new_for_table_uri(TEST_TABLE_URI).unwrap(); + checkpoint_writer.create_checkpoint_from_state(20, writer.table.get_state()).await.unwrap(); +} + +#[test] +fn checkpoint_parquet_test() { + let checkpoint_file = format!("{}/_delta_log/00000000000000000010.checkpoint.parquet", TEST_TABLE_URI); + let p = SerializedFileReader::new(File::open(checkpoint_file).unwrap()).unwrap(); + let row_group = p.metadata().row_group(0); + let schema_descriptor = row_group.schema_descr(); + let root_schema = schema_descriptor.root_schema(); + + // println!("{:#?}", schema_descriptor); + // println!("{:#?}", root_schema); + + for r in p.get_row_iter(None).unwrap() { + println!("{:#?}", r.to_json_value()); + } +} + +fn cleanup_delta_files(table_location: &str) { + let table_path = PathBuf::from(table_location); + + let log_dir = table_path.join("_delta_log"); + + let paths = fs::read_dir(log_dir.as_path()).unwrap(); + + for p in paths { + match p { + Ok(d) => { + let path = d.path(); + + if let Some(extension) = path.extension() { + // Keep the staged log entry that contains the metadata with schemaString action, but delete all the rest + if (extension == "json" && path.file_stem().unwrap() != "00000000000000000000") || extension == "parquet" { + fs::remove_file(path).unwrap(); + } + } else if path.file_name().unwrap() == "_last_checkpoint" { + fs::remove_file(path).unwrap() + } + } + _ => {} + } + } + + let paths = fs::read_dir(table_path.as_path()).unwrap(); + + for p in paths { + match p { + Ok(d) => { + let path = d.path(); + if path.is_dir() && path.to_str().unwrap().contains("=") { + fs::remove_dir_all(path).unwrap(); + } else if let Some(extension) = path.extension() { + if extension == "parquet" { + fs::remove_file(path).unwrap(); + } + } + } + _ => {} + } + } +} \ No newline at end of file diff --git a/tests/json/web_requests-100.json b/tests/json/web_requests-100.json new file mode 100644 index 0000000..dd5b984 --- /dev/null +++ b/tests/json/web_requests-100.json @@ -0,0 +1,100 @@ +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.321710+00:00"}},"method":"DELETE","session_id":"7c28bcf9-be26-4d0b-931a-3374ab4bb458","status":204,"url":"http://www.youku.com","uuid":"831c6afa-375c-4988-b248-096f9ed101f8"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.321893+00:00"}},"method":"DELETE","session_id":"fe69d619-d708-45e6-b750-f437155736ff","status":401,"url":"http://www.google.co.uk","uuid":"e2fcb261-a668-4c87-a574-bc1f75ad3c0a"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.321946+00:00"}},"method":"DELETE","session_id":"bb5dd3c6-de4e-48c4-a14c-bed2fa48465e","status":400,"url":"http://www.imgur.com","uuid":"e4638297-c59a-4d66-b6df-90d6bb7c6971"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.321984+00:00"}},"method":"GET","session_id":"650c035c-7df3-435e-97f6-ead20f0e6bb7","status":200,"url":"http://www.stackoverflow.com","uuid":"28261ac8-d970-48fe-9244-0ade81c6651f"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322022+00:00"}},"method":"POST","session_id":"cb960654-43de-4127-a5f9-4fa5d2f2aa58","status":302,"url":"http://www.live.com","uuid":"f1c82dd5-7240-4d9a-8504-9b8bd6fb09cc"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322076+00:00"}},"method":"HEAD","session_id":"8b37aa00-048a-4682-a09b-fa1ff983a40b","status":302,"url":"http://www.popads.net","uuid":"4ba912c4-f466-4e3e-a3e6-30721d155bd5"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322114+00:00"}},"method":"GET","session_id":"9f545131-46b6-40dc-8f4e-99f8267651d2","status":400,"url":"http://www.google.co.uk","uuid":"08df56af-600d-4be3-a839-1252246b8170"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322151+00:00"}},"method":"DELETE","session_id":"bc0916f8-3dff-427f-8fdd-14ac4dab4e52","status":500,"url":"http://www.hao123.com","uuid":"2fbb8242-d87a-4532-92d5-1d9af720a959"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322183+00:00"}},"method":"DELETE","session_id":"665a4256-5ea8-4a05-822a-8fde4237554f","status":302,"url":"http://www.amazon.co.uk","uuid":"ce22b493-7ce5-4bdc-8600-5f1e27f3285d"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322215+00:00"}},"method":"HEAD","session_id":"2ff2d494-eb00-4466-8572-c290a5c3bef5","status":200,"url":"http://www.google.fr","uuid":"7a675957-ad0c-4d7e-8e3c-e1ce6ca80ed8"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322263+00:00"}},"method":"POST","session_id":"08ddf8ec-754b-4727-bac1-c07d742754ee","status":404,"url":"http://www.taobao.com","uuid":"b3d7b551-c6b1-43e9-a433-28f8869450c3"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322323+00:00"}},"method":"PUT","session_id":"29fecdf7-64a6-40fe-8d44-bb726470ccbb","status":202,"url":"http://www.amazon.com","uuid":"6a76b119-6026-43cd-ad08-2320a69b0b62"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322361+00:00"}},"method":"POST","session_id":"baa886c8-10e8-4817-ac93-d13c3a29db50","status":500,"url":"http://www.popads.net","uuid":"5805b7c1-fa1f-41e9-b88f-64718cb56d6b"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322395+00:00"}},"method":"DELETE","session_id":"4676dbc8-e0bd-4e0e-a68e-6d5c6bf2d66f","status":202,"url":"http://www.wordpress.com","uuid":"6044d6fb-9dc7-437c-adbe-74947cc4464d"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322434+00:00"}},"method":"POST","session_id":"bd3524e9-9b8a-49ff-8901-ff4442338705","status":204,"url":"http://www.dropbox.com","uuid":"b074a467-9b29-4715-a7e5-86bfeb90dab0"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322475+00:00"}},"method":"PUT","session_id":"1de80f6e-0cd2-4d76-927d-02b44a17395a","status":201,"url":"http://www.tmall.com","uuid":"b0bf594c-c8fa-4f4f-a446-00e38cc44765"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322530+00:00"}},"method":"DELETE","session_id":"aee04338-ba80-41a9-8caf-427a272a660c","status":200,"url":"http://www.imdb.com","uuid":"e7de57c7-bb8b-4b59-8b40-7f96b42b454f"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322566+00:00"}},"method":"POST","session_id":"7f895906-f2dd-4e7e-889e-8b6d630ae059","status":500,"url":"http://www.booking.com","uuid":"4ab1cf7e-d83c-4715-a497-76d73467312a"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322627+00:00"}},"method":"HEAD","session_id":"b0623039-13ca-43a7-bbbd-595ac884bf95","status":400,"url":"http://www.gmw.cn","uuid":"e3283d81-8e4c-4156-891c-22bfc8f68eda"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322685+00:00"}},"method":"DELETE","session_id":"c5f4ebb5-5ea4-4977-83d3-4a9a11661e96","status":200,"url":"http://www.weibo.com","uuid":"9c26c3aa-2f8b-470a-85eb-8f0b33f6cdab"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322742+00:00"}},"method":"GET","session_id":"6253f37d-4ac6-4e85-b298-4c6026de5e2c","status":500,"url":"http://www.hao123.com","uuid":"4ea690ca-2f5a-4137-a3bd-2df799337c1e"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322785+00:00"}},"method":"DELETE","session_id":"da4db5e9-d4f0-49e8-a663-46a71a34a555","status":200,"url":"http://www.googleweblight.com","uuid":"78b6843f-028d-4741-b7e5-f471f66a0b1a"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322834+00:00"}},"method":"POST","session_id":"3ac77d38-18f9-4cd5-b933-83e376b9cf1b","status":500,"url":"http://www.ok.ru","uuid":"b80c4b17-f9e0-49ba-b10f-ea5742279f9c"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322892+00:00"}},"method":"HEAD","session_id":"6c6b9f9a-7d4e-43cd-86aa-f895d58c4938","status":302,"url":"http://www.sohu.com","uuid":"9dcfe297-3873-4028-8676-74b5d58ce9ae"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322947+00:00"}},"method":"POST","session_id":"c053dad5-aac0-44f4-af53-0aa06ebeb1bd","status":401,"url":"http://www.yahoo.co.jp","uuid":"39b73ab6-1333-4b3a-84a9-21c2c238a44c"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.322983+00:00"}},"method":"HEAD","session_id":"72c61998-eece-4e88-8a1a-02062e16587d","status":500,"url":"http://www.chinadaily.com.cn","uuid":"d93026cd-d1c9-4a1f-b00b-a108d8f8c556"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323016+00:00"}},"method":"PUT","session_id":"3a193f23-e3b5-43c3-9074-94d8ef7a280d","status":201,"url":"http://www.googleusercontent.com","uuid":"312a36c0-8acf-430b-8c27-3f841589ca1f"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323048+00:00"}},"method":"GET","session_id":"5fe70b96-e027-429c-9b80-34ce74655a03","status":200,"url":"http://www.dropbox.com","uuid":"2e1821e7-fbfd-4649-aa31-2551ad609754"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323080+00:00"}},"method":"GET","session_id":"b0fee219-e8a4-41ff-8b89-fdc4bc2f4013","status":404,"url":"http://www.doubleclick.net","uuid":"5cf6964d-d059-4e85-91a5-1394b57abd41"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323112+00:00"}},"method":"PUT","session_id":"0d96442a-a50e-478c-abcd-219ce1946169","status":403,"url":"http://www.google.co.th","uuid":"9b6ab2ae-1924-4619-ab73-552e64baf767"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323143+00:00"}},"method":"POST","session_id":"1017c94b-f87f-4f05-98a0-0217ddd15cb6","status":202,"url":"http://www.reddit.com","uuid":"18de3f5b-ff46-44ae-9d10-f9ad6671806e"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323174+00:00"}},"method":"PUT","session_id":"f9897f93-b5d2-41de-babf-22963d3edd42","status":201,"url":"http://www.google.ca","uuid":"848ad5c6-12d9-40b7-a178-026c20ffa8ec"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323206+00:00"}},"method":"PUT","session_id":"ccf94a47-0d4d-4dbf-926a-c483da6c80af","status":404,"url":"http://www.twitch.tv","uuid":"6a49ab8d-f45b-454c-8d16-2ef9977fef5b"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323238+00:00"}},"method":"DELETE","session_id":"f00c20e0-6b55-4024-869e-3f0844d4263f","status":301,"url":"http://www.163.com","uuid":"7d7747ae-546a-43d7-85b8-66760e0ef953"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323307+00:00"}},"method":"HEAD","session_id":"e12f07ac-d88f-4d08-ac1b-216267b1cf9f","status":301,"url":"http://www.accuweather.com","uuid":"d121e7c7-f754-45b3-b143-5c707113fa5a"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323357+00:00"}},"method":"HEAD","session_id":"885db1f6-5969-4b5b-9aa6-418c30d8b581","status":403,"url":"http://www.office.com","uuid":"b0556cd6-555e-4fb7-818e-798c47401857"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323408+00:00"}},"method":"PUT","session_id":"46378501-a19d-4e81-b483-3d1b2bf8e1ca","status":202,"url":"http://www.accuweather.com","uuid":"b1ae4f6a-d4e4-4fbe-ac55-dbace7fc9bbd"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323441+00:00"}},"method":"PUT","session_id":"666c74b1-4f57-4736-a1d0-1339f6d53abd","status":200,"url":"http://www.google.com.br","uuid":"4ff8b47f-f3b4-42f3-bde6-d22dea4ff654"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323473+00:00"}},"method":"POST","session_id":"2b0bbe1c-774e-402d-b261-8274ba9c0fdc","status":301,"url":"http://www.vk.com","uuid":"a9458e54-5826-4451-a964-610507da0a4d"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323513+00:00"}},"method":"DELETE","session_id":"dbd04beb-be54-4246-aaf7-668110ecd01d","status":401,"url":"http://www.tianya.cn","uuid":"dc9a0455-0ef2-4c59-bcbc-b723ebc63f27"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323562+00:00"}},"method":"POST","session_id":"660f03b2-3ad9-4838-9779-3f9b5e659903","status":201,"url":"http://www.microsoftonline.com","uuid":"1aae51b7-ee5e-45a2-8d68-ae7f63d175bd"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323597+00:00"}},"method":"HEAD","session_id":"e74e0235-15dc-4554-a85a-8f193d4687a7","status":204,"url":"http://www.craigslist.org","uuid":"72121ec8-a96b-4142-b8b1-e6ecf49e1a94"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323629+00:00"}},"method":"DELETE","session_id":"92881316-13aa-4a02-b718-fa793d231752","status":404,"url":"http://www.github.com","uuid":"832cb698-e353-48db-81d9-a5d05508c07b"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323660+00:00"}},"method":"POST","session_id":"8f14fc3d-3540-4698-97c0-57104a7e3a5e","status":200,"url":"http://www.360.cn","uuid":"ccdbfd84-b470-4eac-82c7-b669ec236b52"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323692+00:00"}},"method":"DELETE","session_id":"88af4c39-10fb-459a-b9cc-85b830ed8401","status":204,"url":"http://www.chinadaily.com.cn","uuid":"31b26473-c017-4ff8-b543-0dd2d7e71228"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323724+00:00"}},"method":"GET","session_id":"3cd53604-60e5-496f-99dc-48a1bc8d3e26","status":500,"url":"http://www.wikipedia.org","uuid":"035a9601-b637-42d7-b4f4-6940cb2e8da1"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323755+00:00"}},"method":"GET","session_id":"5cae985b-1fee-471d-a454-f2788e93501d","status":200,"url":"http://www.tianya.cn","uuid":"bd86a812-b82e-4522-be81-21e5fc9248e7"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323786+00:00"}},"method":"PUT","session_id":"ee46f7b4-69db-4380-81a2-5c797f5e8e12","status":404,"url":"http://www.apple.com","uuid":"209e969a-c055-4f82-ad4f-18a8ca5d9cd1"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323817+00:00"}},"method":"DELETE","session_id":"7fd261cd-02b6-48e0-b7bc-b640b2e91e01","status":404,"url":"http://www.imgur.com","uuid":"164d570c-0a05-450e-bad3-3cf0bd6f416f"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323849+00:00"}},"method":"PUT","session_id":"f4cbef2e-5897-4ed0-8b0b-75cae77d7b51","status":200,"url":"http://www.imdb.com","uuid":"0308b440-1cf3-4a60-a91b-ce99f7780cf6"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323880+00:00"}},"method":"HEAD","session_id":"b6d402e6-9d43-436b-800d-4c75d4349cda","status":500,"url":"http://www.sina.com.cn","uuid":"3c17d48f-2792-453a-b21e-de3e1a659902"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323911+00:00"}},"method":"DELETE","session_id":"2351288b-d1d7-4700-bec8-62fac6a2a1f9","status":202,"url":"http://www.yahoo.com","uuid":"10a5ed82-e4b1-402e-90a3-a9348c4646b2"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323942+00:00"}},"method":"PUT","session_id":"4d0ca751-4ae6-4988-b6b4-cb4aebe34454","status":500,"url":"http://www.apple.com","uuid":"d5425b76-4548-406e-a7f6-361e715ae5c3"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.323973+00:00"}},"method":"PUT","session_id":"51f5ca49-94f0-4f61-acce-be119807e1b6","status":302,"url":"http://www.twitter.com","uuid":"9b59ead7-8688-4516-b04a-b4047c8b7240"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324020+00:00"}},"method":"DELETE","session_id":"f48af01c-4b55-4a2a-8afa-bead98084471","status":301,"url":"http://www.apple.com","uuid":"29ef6134-9f71-46a1-ae8a-2e56823e5891"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324052+00:00"}},"method":"POST","session_id":"3a32dc4b-1f38-4154-9027-a501f8f4b0c1","status":401,"url":"http://www.163.com","uuid":"7ab8ffab-6966-4e2f-8d94-9613243100cf"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324083+00:00"}},"method":"PUT","session_id":"0ece48ca-9a33-474e-b6ac-5d0d14d07be6","status":500,"url":"http://www.ebay.com","uuid":"b4e5ff51-a482-4e48-bece-828605f6d60a"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324115+00:00"}},"method":"GET","session_id":"e8cdc88c-3281-41b3-8006-43c741662a9f","status":401,"url":"http://www.cnn.com","uuid":"a09296a2-a6b8-4564-8118-3498614c5847"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324146+00:00"}},"method":"PUT","session_id":"73c8ce6d-7fa8-4e95-ae66-2412db44d086","status":200,"url":"http://www.twitter.com","uuid":"157f0e00-4f6c-4086-aba9-8e90c337666c"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324177+00:00"}},"method":"GET","session_id":"1b46fa61-2a96-4748-b89c-80ed00cef45e","status":500,"url":"http://www.hao123.com","uuid":"1d04b4cd-ed41-4681-b331-edb563af0003"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324208+00:00"}},"method":"DELETE","session_id":"e868f0a7-e7a8-4f84-9850-09d2bfba9b26","status":500,"url":"http://www.bbc.co.uk","uuid":"59a58e85-ec18-4181-a5a6-6d8610f06546"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324239+00:00"}},"method":"PUT","session_id":"5f63f91c-1c27-4ee6-b1fe-28f7d6b519fc","status":400,"url":"http://www.google.com.au","uuid":"41e2ba86-4399-4224-be64-9ac9ab0c3212"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324270+00:00"}},"method":"HEAD","session_id":"bd492547-e009-49b5-a465-1be939eb0883","status":204,"url":"http://www.google.co.uk","uuid":"2e12f580-4779-4047-ac45-7fab708f72d6"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324301+00:00"}},"method":"PUT","session_id":"9f545131-46b6-40dc-8f4e-99f8267651d2","status":202,"url":"http://www.answers.yahoo.com","uuid":"add407cb-cec9-4319-9fb9-e9e2cac19862"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324332+00:00"}},"method":"PUT","session_id":"df98f48f-4856-4846-a2eb-851a2e6aeb27","status":201,"url":"http://www.paypal.com","uuid":"0aaec3ba-5d8d-4f1a-b13a-6c80262c2e4e"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324363+00:00"}},"method":"HEAD","session_id":"1a1f2d41-6b84-4ab8-87a9-280f70a461ff","status":202,"url":"http://www.soso.com","uuid":"02f7d3bc-7bb5-4ecd-aeb8-350c8c2f608d"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324394+00:00"}},"method":"HEAD","session_id":"1699ad41-5a43-4677-9a2c-3bc06a927aed","status":500,"url":"http://www.amazon.co.jp","uuid":"e6e98111-d02c-4997-adf7-af364384537d"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324426+00:00"}},"method":"PUT","session_id":"77d7cd07-941d-4e9d-94be-7d8f71e791a9","status":302,"url":"http://www.google.com.au","uuid":"d973064b-3758-4d31-8040-f9f76ed7d7d4"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324489+00:00"}},"method":"DELETE","session_id":"5b4a942b-6ed0-41e6-b5a4-fb4a00080c41","status":202,"url":"http://www.tumblr.com","uuid":"ea65c7f8-c277-4ac2-ab71-e574b605d4c0"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324539+00:00"}},"method":"PUT","session_id":"2c1c55b0-32b6-4b78-a6b6-067f47d0650a","status":500,"url":"http://www.google.co.id","uuid":"5815abed-1520-4260-8985-8e5662b755e4"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324574+00:00"}},"method":"GET","session_id":"879d7dd5-69cd-4500-adb0-b9b3b0d11458","status":500,"url":"http://www.gmw.cn","uuid":"8810d13e-2fff-40e8-84d6-6d163fe625dd"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324607+00:00"}},"method":"POST","session_id":"17591a18-4de9-42fe-b355-ec2484c61f8b","status":200,"url":"http://www.amazon.com","uuid":"2f8c2ed3-1271-4529-a8cc-a05835b174a3"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324655+00:00"}},"method":"HEAD","session_id":"aa5739ac-9765-4ce1-868a-d02838aca514","status":301,"url":"http://www.google.co.in","uuid":"88294372-4a9d-476c-9380-1ed8c994a8d0"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324687+00:00"}},"method":"DELETE","session_id":"88d80063-3a7e-46e1-af1f-e3c47428c1e8","status":200,"url":"http://www.googleusercontent.com","uuid":"840176bb-d684-447c-a04a-e3d529e7b367"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324718+00:00"}},"method":"HEAD","session_id":"5b6da56c-3fdb-4d88-ba1d-09fe511f7374","status":202,"url":"http://www.google.co.uk","uuid":"d2b280e4-6ee1-4a98-aa9c-24ca2fa54bb5"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324749+00:00"}},"method":"DELETE","session_id":"0471b329-97bf-4e47-87e1-31bb74b53e4b","status":302,"url":"http://www.sina.com.cn","uuid":"632bbb36-242e-4bec-8c48-abd0e14e4d3e"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324780+00:00"}},"method":"HEAD","session_id":"0548bf61-2a4b-409b-bc8e-a1814a1bdd29","status":404,"url":"http://www.bing.com","uuid":"8052db8d-d97f-4762-b07b-e722422d10c9"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324811+00:00"}},"method":"POST","session_id":"751d1e29-b131-4bdc-8cfc-ac1665d557db","status":400,"url":"http://www.google.ca","uuid":"2fb860b0-6a47-417c-8c64-702c9ffb076d"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324842+00:00"}},"method":"GET","session_id":"4fa37411-49b0-480b-b59a-e6f32c985fc1","status":500,"url":"http://www.weibo.com","uuid":"c2d795b6-f6ff-471f-afb1-624226c4cd3f"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324873+00:00"}},"method":"POST","session_id":"d38ac236-e8e8-438c-9a12-194d78194c73","status":500,"url":"http://www.zhihu.com","uuid":"f2dd7bd5-7c36-4683-b418-db59b35468ed"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324914+00:00"}},"method":"GET","session_id":"8f14fc3d-3540-4698-97c0-57104a7e3a5e","status":301,"url":"http://www.soso.com","uuid":"e980e3e6-032a-4930-9d17-83634b22953c"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324964+00:00"}},"method":"DELETE","session_id":"118e7d05-9b25-403b-a42c-7be622611f8e","status":400,"url":"http://www.imdb.com","uuid":"cd9e893b-cebc-40b1-b2de-6e7861471353"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.324999+00:00"}},"method":"GET","session_id":"48b136d7-d9db-4ccc-8c40-4f67bf355110","status":401,"url":"http://www.coccoc.com","uuid":"376211f5-6551-4eb5-8eb2-9120db0a9dcf"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325032+00:00"}},"method":"POST","session_id":"f1484bc3-96f8-4306-8e30-28b200459de7","status":500,"url":"http://www.weibo.com","uuid":"746d4f3c-ed4b-4e82-b724-e42422a8d3c4"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325063+00:00"}},"method":"DELETE","session_id":"a7451f21-0c3d-45c3-afa0-5fd6b441105b","status":200,"url":"http://www.pinterest.com","uuid":"b4e1eadd-1753-410b-be78-f54dfcc3e4d9"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325095+00:00"}},"method":"GET","session_id":"7d824756-8e43-4b7e-9c52-85ec2061eab9","status":401,"url":"http://www.diply.com","uuid":"53eb2fc1-2ec5-47a8-9aeb-54d23b87ea9d"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325127+00:00"}},"method":"HEAD","session_id":"8f36b18d-ce21-4c07-b14d-8d42f984efb2","status":401,"url":"http://www.amazon.co.jp","uuid":"17a6eced-ef9a-4a1f-885b-79ed5f4f5038"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325158+00:00"}},"method":"PUT","session_id":"2966f6a9-4ca3-4720-abc7-ec2a7401e956","status":404,"url":"http://www.google.com.au","uuid":"6e04a523-0537-48a8-b7ee-a7d1c31b7e2b"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325190+00:00"}},"method":"PUT","session_id":"89af7ab7-8ca6-46aa-a14e-1de7f05a60e2","status":404,"url":"http://www.google.co.in","uuid":"08cb9807-99d8-4fcf-82c7-13b0279e4495"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325221+00:00"}},"method":"GET","session_id":"02cdd050-d642-495d-8983-5786e6ce0527","status":404,"url":"http://www.outbrain.com","uuid":"1de11838-63b8-4abc-82e6-6c4c3b07429f"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325269+00:00"}},"method":"PUT","session_id":"1de80f6e-0cd2-4d76-927d-02b44a17395a","status":400,"url":"http://www.bing.com","uuid":"a9b4b110-1ad2-42f4-a9ad-ba968b0ea449"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325301+00:00"}},"method":"POST","session_id":"a8a3d0e3-7b4e-4f17-b264-76cb792bdb96","status":201,"url":"http://www.fc2.com","uuid":"3148d7a4-7609-4b30-86d6-4ec696ce25c4"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325333+00:00"}},"method":"PUT","session_id":"8fe907f8-2d97-4b78-820c-a6076f5e981e","status":401,"url":"http://www.taobao.com","uuid":"31c42eec-91be-4ee5-b14e-6b7dbb31a441"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325364+00:00"}},"method":"DELETE","session_id":"83179396-7fd8-41ac-bde7-fb7a1b32fdcf","status":500,"url":"http://www.vk.com","uuid":"613c7c4b-aed5-4941-aac7-56f1a94d13e7"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325396+00:00"}},"method":"DELETE","session_id":"4ef205ef-637f-446b-b5fe-23603a6f0d0a","status":202,"url":"http://www.outbrain.com","uuid":"75ac824c-f7d7-4184-8ce0-0e2b3f04c4bc"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325446+00:00"}},"method":"HEAD","session_id":"e622a1f2-5c74-4331-b10e-9b25677ea2ea","status":500,"url":"http://www.weibo.com","uuid":"c3b886f5-ad72-4756-bbe6-ad73000e9427"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325500+00:00"}},"method":"GET","session_id":"4e96054b-69a4-4713-adc8-b3d731f79842","status":204,"url":"http://www.stackoverflow.com","uuid":"fd56cf79-b2d5-4c73-b27b-aea7e2ad7dbf"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325536+00:00"}},"method":"POST","session_id":"258937ce-9a84-44f6-9d1b-1a2db01fda06","status":500,"url":"http://www.weibo.com","uuid":"28be36ca-90d2-4df3-8b1b-c4fb49f9a702"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325569+00:00"}},"method":"GET","session_id":"51042039-f68b-428d-8b3d-39dfc45946d8","status":302,"url":"http://www.amazon.de","uuid":"4526f639-e7de-4cb6-8bee-52cd1b57aff8"} +{"meta":{"producer":{"timestamp":"2021-03-24T15:06:17.325600+00:00"}},"method":"POST","session_id":"fed401d4-66bd-4903-972c-785abe563e21","status":200,"url":"http://www.amazon.in","uuid":"f89ae841-410c-4e00-b0e6-c0108ad8e063"} From 21fe134f361eedc1fbcabbabeae66bfaa18c0c16 Mon Sep 17 00:00:00 2001 From: xianwill Date: Wed, 11 Aug 2021 19:47:51 -0400 Subject: [PATCH 2/5] mergefix --- Cargo.lock | 2 +- Cargo.toml | 2 +- tests/checkpoint_bug_tests.rs | 178 ---------------------------------- 3 files changed, 2 insertions(+), 180 deletions(-) delete mode 100644 tests/checkpoint_bug_tests.rs diff --git a/Cargo.lock b/Cargo.lock index 1ccc950..4456749 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -374,7 +374,7 @@ dependencies = [ [[package]] name = "deltalake" version = "0.4.1" -source = "git+https://github.com/xianwill/delta-rs.git?branch=checkpoint_and_map_fixes#f8f077a21fe23dc2261d5f47fbb7a1087d60f0ac" +source = "git+https://github.com/delta-io/delta-rs.git?branch=writer-map-support#7f31207e624e60ee779dbbb8890ee1abceaefe25" dependencies = [ "anyhow", "arrow", diff --git a/Cargo.toml b/Cargo.toml index 842a113..c9895c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,7 +34,7 @@ uuid = { version = "0.8", features = ["serde", "v4"] } arrow = { git = "https://github.com/apache/arrow-rs", rev = "fa5acd971c973161f17e69d5c6b50d6e77c7da03" } parquet = { git = "https://github.com/apache/arrow-rs", rev = "fa5acd971c973161f17e69d5c6b50d6e77c7da03" } -deltalake = { git = "https://github.com/xianwill/delta-rs.git", branch = "checkpoint_and_map_fixes", features = ["s3"] } +deltalake = { git = "https://github.com/delta-io/delta-rs.git", branch = "writer-map-support", features = ["s3"] } [dev-dependencies] utime = "0.3" diff --git a/tests/checkpoint_bug_tests.rs b/tests/checkpoint_bug_tests.rs deleted file mode 100644 index 0528be7..0000000 --- a/tests/checkpoint_bug_tests.rs +++ /dev/null @@ -1,178 +0,0 @@ -extern crate kafka_delta_ingest; - -#[allow(dead_code)] -mod helpers; - -use deltalake::action::Action; -use deltalake::delta_arrow::*; -use deltalake::checkpoints::CheckPointWriter; -use dipstick::{Input, Prefixed, Statsd}; -use kafka_delta_ingest::{ - deltalake_ext::*, - instrumentation::StatsHandler, - KafkaJsonToDelta, - Options, - transforms::Transformer -}; -use log::debug; -use parquet::{ - file::reader::{FileReader, SerializedFileReader}, - record::RowAccessor, -}; -use rdkafka::{producer::FutureProducer, producer::FutureRecord, util::Timeout, ClientConfig}; -use serde_json::{json, Value}; -use std::env; -use std::fs::File; -use std::io::{BufRead, BufReader}; -use std::sync::Once; -use std::{collections::HashMap, fs, path::PathBuf}; -use tokio_util::sync::CancellationToken; -use uuid::Uuid; - -// const TEST_TABLE_URI: &str = "tests/temp/checkpoint_bug"; -const TEST_TABLE_URI: &str = "tests/temp/timestamp_bug"; - -#[tokio::test] -async fn checkpoint_debugging_test() { - let test_json_uri = "tests/json/web_requests-100.json"; - - cleanup_delta_files(TEST_TABLE_URI); - - let mut transforms = HashMap::new(); - transforms.insert( - "date".to_string(), - "substr(meta.producer.timestamp, `0`, `10`)".to_string(), - ); - transforms.insert("meta.kafka.topic".to_string(), "kafka.topic".to_string()); - transforms.insert( - "meta.kafka.offset".to_string(), - "kafka.offset".to_string(), - ); - transforms.insert( - "meta.kafka.partition".to_string(), - "kafka.partition".to_string(), - ); - transforms.insert( - "meta.kafka.timestamp".to_string(), - "kafka.timestamp".to_string(), - ); - transforms.insert( - "meta.kafka.timestamp_type".to_string(), - "kafka.timestamp_type".to_string(), - ); - let transformer = Transformer::from_transforms(&transforms).unwrap(); - - let s = std::fs::read_to_string(test_json_uri).unwrap(); - let mut values: Vec = s.lines().map(|l| { - serde_json::from_str(l).unwrap() - }).collect(); - - for (i, v) in values.iter_mut().enumerate() { - let message = rdkafka::message::OwnedMessage::new( - Some(v.to_string().into_bytes()), - None, - "test".to_string(), - rdkafka::Timestamp::CreateTime(chrono::Utc::now().timestamp_millis()), - 0, - i as i64, - None - ); - transformer.transform(v, &message).unwrap(); - } - - let chunks = values.chunks(10); - - let mut writer = DeltaWriter::for_table_path(TEST_TABLE_URI).await.unwrap(); - - for chunk in chunks { - writer.write(chunk.into()).await.unwrap(); - let adds = writer.write_parquet_files().await.unwrap(); - let mut tx = writer.table.create_transaction(None); - tx.add_actions(adds.iter().map(|a| deltalake::action::Action::add(a.to_owned())).collect()); - let prepared = tx.prepare_commit(None).await.unwrap(); - let version = writer.table_version() + 1; - let res = writer.table.try_commit_transaction(&prepared, version).await; - } - - let checkpoint_writer = CheckPointWriter::new_for_table_uri(TEST_TABLE_URI).unwrap(); - checkpoint_writer.create_checkpoint_from_state(10, writer.table.get_state()).await.unwrap(); - - // ROUND 2 - - let chunks = values.chunks(10); - - let mut writer = DeltaWriter::for_table_path(TEST_TABLE_URI).await.unwrap(); - - for chunk in chunks { - writer.write(chunk.into()).await.unwrap(); - let adds = writer.write_parquet_files().await.unwrap(); - let mut tx = writer.table.create_transaction(None); - tx.add_actions(adds.iter().map(|a| deltalake::action::Action::add(a.to_owned())).collect()); - let prepared = tx.prepare_commit(None).await.unwrap(); - let version = writer.table_version() + 1; - let res = writer.table.try_commit_transaction(&prepared, version).await; - } - - let checkpoint_writer = CheckPointWriter::new_for_table_uri(TEST_TABLE_URI).unwrap(); - checkpoint_writer.create_checkpoint_from_state(20, writer.table.get_state()).await.unwrap(); -} - -#[test] -fn checkpoint_parquet_test() { - let checkpoint_file = format!("{}/_delta_log/00000000000000000010.checkpoint.parquet", TEST_TABLE_URI); - let p = SerializedFileReader::new(File::open(checkpoint_file).unwrap()).unwrap(); - let row_group = p.metadata().row_group(0); - let schema_descriptor = row_group.schema_descr(); - let root_schema = schema_descriptor.root_schema(); - - // println!("{:#?}", schema_descriptor); - // println!("{:#?}", root_schema); - - for r in p.get_row_iter(None).unwrap() { - println!("{:#?}", r.to_json_value()); - } -} - -fn cleanup_delta_files(table_location: &str) { - let table_path = PathBuf::from(table_location); - - let log_dir = table_path.join("_delta_log"); - - let paths = fs::read_dir(log_dir.as_path()).unwrap(); - - for p in paths { - match p { - Ok(d) => { - let path = d.path(); - - if let Some(extension) = path.extension() { - // Keep the staged log entry that contains the metadata with schemaString action, but delete all the rest - if (extension == "json" && path.file_stem().unwrap() != "00000000000000000000") || extension == "parquet" { - fs::remove_file(path).unwrap(); - } - } else if path.file_name().unwrap() == "_last_checkpoint" { - fs::remove_file(path).unwrap() - } - } - _ => {} - } - } - - let paths = fs::read_dir(table_path.as_path()).unwrap(); - - for p in paths { - match p { - Ok(d) => { - let path = d.path(); - if path.is_dir() && path.to_str().unwrap().contains("=") { - fs::remove_dir_all(path).unwrap(); - } else if let Some(extension) = path.extension() { - if extension == "parquet" { - fs::remove_file(path).unwrap(); - } - } - } - _ => {} - } - } -} \ No newline at end of file From 7f5b706617bfcfb4009197e8bcf0d2684b5201e7 Mon Sep 17 00:00:00 2001 From: mosyp Date: Thu, 12 Aug 2021 10:07:25 +0300 Subject: [PATCH 3/5] Cargo fmt and delta-rs bump --- Cargo.lock | 136 +++++++++++++++++++++---------------------- src/deltalake_ext.rs | 32 +++++++--- src/lib.rs | 10 ++-- 3 files changed, 97 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4456749..01c8936 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19,9 +19,9 @@ dependencies = [ [[package]] name = "alloc-no-stdlib" -version = "2.0.1" +version = "2.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5192ec435945d87bc2f70992b4d818154b5feede43c09fb7592146374eac90a6" +checksum = "35ef4730490ad1c4eae5c4325b2a95f521d023e5c885853ff7aca0a6a1631db3" [[package]] name = "alloc-stdlib" @@ -77,9 +77,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.50" +version = "0.1.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722" +checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" dependencies = [ "proc-macro2", "quote", @@ -123,9 +123,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" [[package]] name = "bitflags" -version = "1.2.1" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +checksum = "2da1976d75adbe5fbc88130ecd119529cf1cc6a93ae1546d8696ee66f0d21af1" [[package]] name = "block-buffer" @@ -138,9 +138,9 @@ dependencies = [ [[package]] name = "brotli" -version = "3.3.0" +version = "3.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f29919120f08613aadcd4383764e00526fc9f18b6c0895814faeed0dd78613e" +checksum = "71cb90ade945043d3d53597b2fc359bb063db8ade2bcffe7997351d0756e9d50" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -149,9 +149,9 @@ dependencies = [ [[package]] name = "brotli-decompressor" -version = "2.3.1" +version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1052e1c3b8d4d80eb84a8b94f0a1498797b5fb96314c001156a1c761940ef4ec" +checksum = "59ad2d4653bf5ca36ae797b1f4bb4dbddb60ce49ca4aed8a2ce4829f60425b80" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -374,7 +374,7 @@ dependencies = [ [[package]] name = "deltalake" version = "0.4.1" -source = "git+https://github.com/delta-io/delta-rs.git?branch=writer-map-support#7f31207e624e60ee779dbbb8890ee1abceaefe25" +source = "git+https://github.com/delta-io/delta-rs.git?branch=writer-map-support#d66239fd7189919ee9ce52cf7734a6ad6fb5eab5" dependencies = [ "anyhow", "arrow", @@ -572,9 +572,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7e43a803dae2fa37c1f6a8fe121e1f7bf9548b4dfc0522a42f34145dadfc27" +checksum = "1adc00f486adfc9ce99f77d717836f0c5aa84965eb0b4f051f4e83f7cab53f8b" dependencies = [ "futures-channel", "futures-core", @@ -587,9 +587,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e682a68b29a882df0545c143dc3646daefe80ba479bcdede94d5a703de2871e2" +checksum = "74ed2411805f6e4e3d9bc904c95d5d423b89b3b25dc0250aa74729de20629ff9" dependencies = [ "futures-core", "futures-sink", @@ -597,15 +597,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0402f765d8a89a26043b889b26ce3c4679d268fa6bb22cd7c6aad98340e179d1" +checksum = "af51b1b4a7fdff033703db39de8802c673eb91855f2e0d47dcf3bf2c0ef01f99" [[package]] name = "futures-executor" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "badaa6a909fac9e7236d0620a2f57f7664640c56575b71a7552fbd68deafab79" +checksum = "4d0d535a57b87e1ae31437b892713aee90cd2d7b0ee48727cd11fc72ef54761c" dependencies = [ "futures-core", "futures-task", @@ -614,15 +614,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acc499defb3b348f8d8f3f66415835a9131856ff7714bf10dadfc4ec4bdb29a1" +checksum = "0b0e06c393068f3a6ef246c75cdca793d6a46347e75286933e5e75fd2fd11582" [[package]] name = "futures-macro" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4c40298486cdf52cc00cd6d6987892ba502c7656a16a4192a9992b1ccedd121" +checksum = "c54913bae956fb8df7f4dc6fc90362aa72e69148e3f39041fbe8742d21e0ac57" dependencies = [ "autocfg", "proc-macro-hack", @@ -633,21 +633,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a57bead0ceff0d6dde8f465ecd96c9338121bb7717d3e7b108059531870c4282" +checksum = "c0f30aaa67363d119812743aa5f33c201a7a66329f97d1a887022971feea4b53" [[package]] name = "futures-task" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a16bef9fc1a4dddb5bee51c989e3fbba26569cbb0e31f5b303c184e3dd33dae" +checksum = "bbe54a98670017f3be909561f6ad13e810d9a51f3f061b902062ca3da80799f2" [[package]] name = "futures-util" -version = "0.3.15" +version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feb5c238d27e2bf94ffdfd27b2c29e3df4a68c4193bb6427384259e2bf191967" +checksum = "67eb846bfd58e44a8481a00049e82c43e0ccb5d61f8dc071057cb19249dd4d78" dependencies = [ "autocfg", "futures-channel", @@ -753,9 +753,9 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9" +checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5" dependencies = [ "bytes", "http", @@ -872,9 +872,9 @@ dependencies = [ [[package]] name = "jobserver" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "972f5ae5d1cb9c6ae417789196c803205313edde988685da5e3aae0827b9e7fd" +checksum = "f5ca711fd837261e14ec9e674f092cbb931d3fa1482b017ae59328ddc6f3212b" dependencies = [ "libc", ] @@ -937,9 +937,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.98" +version = "0.2.99" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "320cfe77175da3a483efed4bc0adc1968ca050b098ce4f2f1c13a56626128790" +checksum = "a7f823d141fe0a24df1e23b4af4e3c7ba9e5966ec514ea068c93024aa7deb765" [[package]] name = "libz-sys" @@ -1093,9 +1093,9 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.7" +version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8d96b2e1c8da3957d58100b09f102c6d9cfdfced01b7ec5a8974044bb09dbd4" +checksum = "48ba9f7719b5a0f42f338907614285fb5fd70e53858141f69898a1fb7203b24d" dependencies = [ "lazy_static", "libc", @@ -1240,9 +1240,9 @@ dependencies = [ [[package]] name = "num_enum" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5adf0198d427ee515335639f275e806ca01acf9f07d7cf14bb36a10532a6169" +checksum = "ee2c8fd66061a707503d515639b8af10fd3807a5b5ee6959f7ff1bd303634bd5" dependencies = [ "derivative", "num_enum_derive", @@ -1250,9 +1250,9 @@ dependencies = [ [[package]] name = "num_enum_derive" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1def5a3f69d4707d8a040b12785b98029a39e8c610ae685c7f6265669767482" +checksum = "474fd1d096da3ad17084694eebed40ba09c4a36c5255cd772bd8b98859cc562e" dependencies = [ "proc-macro-crate", "proc-macro2", @@ -1374,7 +1374,7 @@ dependencies = [ "cfg-if 1.0.0", "instant", "libc", - "redox_syscall 0.2.9", + "redox_syscall 0.2.10", "smallvec", "winapi", ] @@ -1582,9 +1582,9 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" [[package]] name = "redox_syscall" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ab49abadf3f9e1c4bc499e8845e152ad87d2ad2d30371841171169e9d75feee" +checksum = "8383f39639269cde97d255a32bdb68c047337295414940c68bdd30c2e13203ff" dependencies = [ "bitflags", ] @@ -1596,7 +1596,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64" dependencies = [ "getrandom", - "redox_syscall 0.2.9", + "redox_syscall 0.2.10", ] [[package]] @@ -1812,18 +1812,18 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.126" +version = "1.0.127" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" +checksum = "f03b9878abf6d14e6779d3f24f07b2cfa90352cfec4acc5aab8f1ac7f146fae8" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.126" +version = "1.0.127" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43" +checksum = "a024926d3432516606328597e0f224a51355a493b49fdd67e9209187cbe55ecc" dependencies = [ "proc-macro2", "quote", @@ -1832,9 +1832,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.64" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "799e97dc9fdae36a5c8b8f2cae9ce2ee9fdce2058c57a93e6099d919fd982f79" +checksum = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127" dependencies = [ "indexmap", "itoa", @@ -1912,9 +1912,9 @@ dependencies = [ [[package]] name = "slab" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f173ac3d1a7e3b28003f40de0b5ce7fe2710f9b9dc3fc38664cebee46b3b6527" +checksum = "c307a32c1c5c437f38c7fd45d753050587732ba8628319fbdf12a7e289ccc590" [[package]] name = "slug" @@ -1939,9 +1939,9 @@ checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" [[package]] name = "socket2" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e3dfc207c526015c632472a77be09cf1b6e46866581aecae5cc38fb4235dea2" +checksum = "765f090f0e423d2b55843402a07915add955e7d60657db13707a159727326cad" dependencies = [ "libc", "winapi", @@ -2067,7 +2067,7 @@ dependencies = [ "cfg-if 1.0.0", "libc", "rand", - "redox_syscall 0.2.9", + "redox_syscall 0.2.10", "remove_dir_all", "winapi", ] @@ -2421,9 +2421,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" [[package]] name = "wasm-bindgen" -version = "0.2.74" +version = "0.2.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54ee1d4ed486f78874278e63e4069fc1ab9f6a18ca492076ffb90c5eb2997fd" +checksum = "b608ecc8f4198fe8680e2ed18eccab5f0cd4caaf3d83516fa5fb2e927fda2586" dependencies = [ "cfg-if 1.0.0", "wasm-bindgen-macro", @@ -2431,9 +2431,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.74" +version = "0.2.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b33f6a0694ccfea53d94db8b2ed1c3a8a4c86dd936b13b9f0a15ec4a451b900" +checksum = "580aa3a91a63d23aac5b6b267e2d13cb4f363e31dce6c352fca4752ae12e479f" dependencies = [ "bumpalo", "lazy_static", @@ -2446,9 +2446,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.74" +version = "0.2.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "088169ca61430fe1e58b8096c24975251700e7b1f6fd91cc9d59b04fb9b18bd4" +checksum = "171ebf0ed9e1458810dfcb31f2e766ad6b3a89dbda42d8901f2b268277e5f09c" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -2456,9 +2456,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.74" +version = "0.2.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be2241542ff3d9f241f5e2cb6dd09b37efe786df8851c54957683a49f0987a97" +checksum = "6c2657dd393f03aa2a659c25c6ae18a13a4048cebd220e147933ea837efc589f" dependencies = [ "proc-macro2", "quote", @@ -2469,9 +2469,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.74" +version = "0.2.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7cff876b8f18eed75a66cf49b65e7f967cb354a7aa16003fb55dbfd25b44b4f" +checksum = "2e0c4a743a309662d45f4ede961d7afa4ba4131a59a639f29b0069c3798bbcc2" [[package]] name = "winapi" @@ -2506,9 +2506,9 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "xml-rs" -version = "0.8.3" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" +checksum = "d2d7d3948613f75c98fd9328cfdcc45acc4d360655289d0a7d4ec931392200a3" [[package]] name = "zeroize" diff --git a/src/deltalake_ext.rs b/src/deltalake_ext.rs index 48c0bc6..62a21bf 100644 --- a/src/deltalake_ext.rs +++ b/src/deltalake_ext.rs @@ -12,11 +12,12 @@ use arrow::{ }; use deltalake::{ action::{Add, ColumnCountStat, ColumnValueStat, Stats}, + writer::time_utils::timestamp_to_delta_stats_string, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableError, DeltaTransactionError, Schema, StorageBackend, StorageError, UriError, - writer::time_utils::timestamp_to_delta_stats_string }; use log::debug; +use parquet::basic::TimestampType; use parquet::{ arrow::ArrowWriter, basic::{Compression, LogicalType}, @@ -34,7 +35,6 @@ use std::convert::TryFrom; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use uuid::Uuid; -use parquet::basic::TimestampType; const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__"; @@ -139,7 +139,12 @@ impl DeltaArrowWriter { self.arrow_writer.write(&record_batch)?; self.buffered_record_batch_count += 1; - apply_null_counts(partition_columns, &record_batch.into(), &mut self.null_counts, 0); + apply_null_counts( + partition_columns, + &record_batch.into(), + &mut self.null_counts, + 0, + ); Ok(()) } } @@ -453,9 +458,7 @@ fn min_max_values_from_file_metadata( let statistics: Vec<&Statistics> = row_group_metadata .iter() - .filter_map(|g| - g.column(i).statistics() - ) + .filter_map(|g| g.column(i).statistics()) .collect(); let _ = apply_min_max_for_column( @@ -470,7 +473,12 @@ fn min_max_values_from_file_metadata( Ok((min_values, max_values)) } -fn apply_null_counts(partition_columns: &[String], array: &StructArray, null_counts: &mut HashMap, nest_level: i32) { +fn apply_null_counts( + partition_columns: &[String], + array: &StructArray, + null_counts: &mut HashMap, + nest_level: i32, +) { let fields = match array.data_type() { DataType::Struct(fields) => fields, _ => unreachable!(), @@ -497,7 +505,12 @@ fn apply_null_counts(partition_columns: &[String], array: &StructArray, null_cou match col_struct { ColumnCountStat::Column(map) => { - apply_null_counts(partition_columns, as_struct_array(column), map, nest_level + 1); + apply_null_counts( + partition_columns, + as_struct_array(column), + map, + nest_level + 1, + ); } _ => unreachable!(), } @@ -768,7 +781,8 @@ fn create_add( size: i64, file_metadata: &FileMetaData, ) -> Result { - let (min_values, max_values) = min_max_values_from_file_metadata(partition_values, file_metadata)?; + let (min_values, max_values) = + min_max_values_from_file_metadata(partition_values, file_metadata)?; let stats = Stats { num_records: file_metadata.num_rows, diff --git a/src/lib.rs b/src/lib.rs index e3b29d3..a527d79 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -592,14 +592,16 @@ impl KafkaJsonToDelta { return Ok(()); } Err(e) => match e { - DeltaTransactionError::VersionAlreadyExists { .. } - if attempt_number > DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS + 1 => - { + DeltaTableError::TransactionError { + source: DeltaTransactionError::VersionAlreadyExists { .. }, + } if attempt_number > DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS + 1 => { debug!("Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing.", DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS); self.log_delta_write_failed().await; return Err(e.into()); } - DeltaTransactionError::VersionAlreadyExists { .. } => { + DeltaTableError::TransactionError { + source: DeltaTransactionError::VersionAlreadyExists { .. }, + } => { attempt_number += 1; debug!("Transaction attempt failed. Incrementing attempt number to {} and retrying.", attempt_number); } From fe294a36791d576fc2e98d5b15de2f2f071c9d4d Mon Sep 17 00:00:00 2001 From: mosyp Date: Thu, 12 Aug 2021 12:10:55 +0300 Subject: [PATCH 4/5] Fix post merge tests --- Cargo.lock | 2 +- src/deltalake_ext.rs | 2 +- src/transforms.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 01c8936..2962f53 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -374,7 +374,7 @@ dependencies = [ [[package]] name = "deltalake" version = "0.4.1" -source = "git+https://github.com/delta-io/delta-rs.git?branch=writer-map-support#d66239fd7189919ee9ce52cf7734a6ad6fb5eab5" +source = "git+https://github.com/delta-io/delta-rs.git?branch=writer-map-support#97597d881ba5666d353fd4c82c052ef1860f945f" dependencies = [ "anyhow", "arrow", diff --git a/src/deltalake_ext.rs b/src/deltalake_ext.rs index 62a21bf..701235a 100644 --- a/src/deltalake_ext.rs +++ b/src/deltalake_ext.rs @@ -889,7 +889,7 @@ mod tests { assert_eq!(add.len(), 1); let stats = add[0].get_stats().unwrap().unwrap(); - let min_max_keys = vec!["meta", "some_int", "some_string", "some_bool", "date"]; + let min_max_keys = vec!["meta", "some_int", "some_string", "some_bool"]; let mut null_count_keys = vec!["some_list", "some_nested_list"]; null_count_keys.extend_from_slice(min_max_keys.as_slice()); diff --git a/src/transforms.rs b/src/transforms.rs index 94730f8..10d5e47 100644 --- a/src/transforms.rs +++ b/src/transforms.rs @@ -646,7 +646,7 @@ mod tests { assert_eq!(0i64, kafka_offset); assert_eq!(0i64, kafka_partition); assert_eq!("test", kafka_topic); - assert_eq!(1626823098519000000, kafka_timestamp); + assert_eq!(1626823098519000, kafka_timestamp); assert_eq!(0, kafka_timestamp_type); } } From a831800ef919b7ca5e391da238d01cddd8333d8d Mon Sep 17 00:00:00 2001 From: mosyp Date: Thu, 12 Aug 2021 13:11:30 +0300 Subject: [PATCH 5/5] `Fix post merge tests` --- tests/integration_tests.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 6959c22..e22de3a 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -157,9 +157,9 @@ async fn e2e_smoke_test() { assert_eq!(row_num, r.get_int(1).unwrap()); assert_eq!("2021-03-01T14:38:58Z", r.get_string(2).unwrap()); assert_eq!("2021-03-01", r.get_string(3).unwrap()); - let kafka_timestamp = r.get_long(4).unwrap(); - assert!(start_time.timestamp_nanos() < kafka_timestamp); - assert!(chrono::Utc::now().timestamp_nanos() > kafka_timestamp); + let kafka_timestamp = r.get_timestamp_micros(4).unwrap() as i64; + assert!(start_time.timestamp_millis() * 1000 < kafka_timestamp); + assert!(chrono::Utc::now().timestamp_millis() * 1000 > kafka_timestamp); assert_eq!(0, r.get_int(5).unwrap()); }