diff --git a/Cargo.lock b/Cargo.lock index 054cfeb8..cf651e1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -661,7 +661,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "tracing", - "uuid 1.8.0", + "uuid", ] [[package]] @@ -1364,7 +1364,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "29c39203181991a7dd4343b8005bd804e7a9a37afb8ac070e43771e8c820bbde" dependencies = [ "chrono", - "chrono-tz-build", + "chrono-tz-build 0.0.3", + "phf", +] + +[[package]] +name = "chrono-tz" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" +dependencies = [ + "chrono", + "chrono-tz-build 0.2.1", "phf", ] @@ -1379,6 +1390,17 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "chrono-tz-build" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "cipher" version = "0.3.0" @@ -1451,12 +1473,13 @@ checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" [[package]] name = "clickhouse-rs" -version = "1.0.0-alpha.1" -source = "git+https://github.com/suharev7/clickhouse-rs?rev=ecf28f4677#ecf28f46774773f39c74ee5213ad1e3ea240739b" +version = "1.1.0-alpha.1" +source = "git+https://github.com/suharev7/clickhouse-rs?rev=abfe517101eefe36bfdb7da248690659aa284ad3#abfe517101eefe36bfdb7da248690659aa284ad3" dependencies = [ "byteorder", + "cfg-if", "chrono", - "chrono-tz", + "chrono-tz 0.8.6", "clickhouse-rs-cityhash-sys", "combine", "crossbeam", @@ -1475,13 +1498,13 @@ dependencies = [ "tokio", "tokio-native-tls", "url", - "uuid 0.8.2", + "uuid", ] [[package]] name = "clickhouse-rs-cityhash-sys" version = "0.1.2" -source = "git+https://github.com/suharev7/clickhouse-rs?rev=ecf28f4677#ecf28f46774773f39c74ee5213ad1e3ea240739b" 
+source = "git+https://github.com/suharev7/clickhouse-rs?rev=abfe517101eefe36bfdb7da248690659aa284ad3#abfe517101eefe36bfdb7da248690659aa284ad3" dependencies = [ "cc", ] @@ -4298,7 +4321,7 @@ dependencies = [ "serde_cbor", "serde_json", "thiserror", - "uuid 1.8.0", + "uuid", ] [[package]] @@ -5288,7 +5311,7 @@ dependencies = [ "rkyv_derive", "seahash", "tinyvec", - "uuid 1.8.0", + "uuid", ] [[package]] @@ -6050,7 +6073,7 @@ dependencies = [ "supabase-wrappers-macros", "thiserror", "tokio", - "uuid 1.8.0", + "uuid", ] [[package]] @@ -6238,7 +6261,7 @@ dependencies = [ "rust_decimal", "thiserror", "tracing", - "uuid 1.8.0", + "uuid", "winauth", ] @@ -6693,12 +6716,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" -[[package]] -name = "uuid" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc5cf98d8186244414c848017f0e2676b3fcb46807f6668a97dfe67359a3c4b7" - [[package]] name = "uuid" version = "1.8.0" @@ -7651,10 +7668,11 @@ dependencies = [ "aws-sdk-cognitoidentityprovider", "aws-sdk-s3", "chrono", - "chrono-tz", + "chrono-tz 0.6.3", "clickhouse-rs", "csv", "dirs", + "either", "futures", "gcp-bigquery-client", "hex", diff --git a/docs/catalog/clickhouse.md b/docs/catalog/clickhouse.md index 39699660..608d650b 100644 --- a/docs/catalog/clickhouse.md +++ b/docs/catalog/clickhouse.md @@ -33,6 +33,7 @@ The ClickHouse Wrapper allows you to read and write data from ClickHouse within | text | String | | date | Date | | timestamp | DateTime | +| * | Nullable | ## Preparation diff --git a/wrappers/Cargo.toml b/wrappers/Cargo.toml index c8f3f577..0073bd9a 100644 --- a/wrappers/Cargo.toml +++ b/wrappers/Cargo.toml @@ -26,7 +26,7 @@ bigquery_fdw = [ "yup-oauth2", "thiserror", ] -clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex", "thiserror"] +clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", 
"regex", "thiserror", "either"] stripe_fdw = [ "http", "reqwest", @@ -170,11 +170,12 @@ pgrx = { version = "=0.11.3" } supabase-wrappers = { path = "../supabase-wrappers", default-features = false } # for clickhouse_fdw -clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs", rev = "ecf28f4677", features = [ +clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs", rev = "abfe517101eefe36bfdb7da248690659aa284ad3", features = [ "tls", ], optional = true } chrono = { version = "0.4", optional = true } chrono-tz = { version = "0.6", optional = true } +either = { version = "1.12.0", optional = true } # for bigquery_fdw, firebase_fdw, airtable_fdw and etc. diff --git a/wrappers/src/fdw/clickhouse_fdw/README.md b/wrappers/src/fdw/clickhouse_fdw/README.md index 2ccff02c..92a555b1 100644 --- a/wrappers/src/fdw/clickhouse_fdw/README.md +++ b/wrappers/src/fdw/clickhouse_fdw/README.md @@ -11,6 +11,7 @@ This is a foreign data wrapper for [ClickHouse](https://clickhouse.com/). 
It is | Version | Date | Notes | | ------- | ---------- | ---------------------------------------------------- | +| 0.1.4 | 2024-09-10 | Added Nullable type support | | 0.1.3 | 2023-07-17 | Added sort and limit pushdown suppport | | 0.1.2 | 2023-07-13 | Added fdw stats collection | | 0.1.1 | 2023-05-19 | Added custom sql support | diff --git a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs index e069e251..d13b817a 100644 --- a/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs +++ b/wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs @@ -1,7 +1,6 @@ use crate::stats; #[allow(deprecated)] -use chrono::{Date, DateTime, Datelike, NaiveDate, NaiveDateTime, Utc}; -use chrono_tz::Tz; +use chrono::{DateTime, Datelike, NaiveDate, NaiveDateTime, Utc}; use clickhouse_rs::{types, types::Block, types::SqlType, ClientHandle, Pool}; use pgrx::to_timestamp; use regex::{Captures, Regex}; @@ -56,8 +55,7 @@ fn field_to_cell(row: &types::Row, i: usize) -> ClickHouseFdwRes Ok(Some(Cell::String(value))) } SqlType::Date => { - #[allow(deprecated)] - let value = row.get::, usize>(i)?; + let value = row.get::(i)?; let dt = pgrx::Date::new(value.year(), value.month() as u8, value.day() as u8)?; Ok(Some(Cell::Date(dt))) } @@ -66,6 +64,65 @@ fn field_to_cell(row: &types::Row, i: usize) -> ClickHouseFdwRes let ts = to_timestamp(value.timestamp() as f64); Ok(Some(Cell::Timestamp(ts.to_utc()))) } + SqlType::Nullable(v) => match v { + SqlType::UInt8 => { + let value = row.get::, usize>(i)?; + Ok(value.map(|t| Cell::Bool(t != 0))) + } + SqlType::Int16 => { + let value = row.get::, usize>(i)?; + Ok(value.map(Cell::I16)) + } + SqlType::UInt16 => { + let value = row.get::, usize>(i)?; + Ok(value.map(|t| Cell::I32(t as _))) + } + SqlType::Int32 => { + let value = row.get::, usize>(i)?; + Ok(value.map(Cell::I32)) + } + SqlType::UInt32 => { + let value = row.get::, usize>(i)?; + Ok(value.map(|t| Cell::I64(t as _))) + } + SqlType::Float32 => { + let 
value = row.get::, usize>(i)?; + Ok(value.map(Cell::F32)) + } + SqlType::Float64 => { + let value = row.get::, usize>(i)?; + Ok(value.map(Cell::F64)) + } + SqlType::UInt64 => { + let value = row.get::, usize>(i)?; + Ok(value.map(|t| Cell::I64(t as _))) + } + SqlType::Int64 => { + let value = row.get::, usize>(i)?; + Ok(value.map(Cell::I64)) + } + SqlType::String => { + let value = row.get::, usize>(i)?; + Ok(value.map(Cell::String)) + } + SqlType::Date => { + let value = row.get::, usize>(i)?; + Ok(value + .map(|t| pgrx::Date::new(t.year(), t.month() as u8, t.day() as u8)) + .transpose()? + .map(Cell::Date)) + } + SqlType::DateTime(_) => { + let value = row.get::>, usize>(i)?; + Ok(value.map(|t| { + let ts = to_timestamp(t.timestamp() as f64); + Cell::Timestamp(ts.to_utc()) + })) + } + _ => Err(ClickHouseFdwError::UnsupportedColumnType( + sql_type.to_string().into(), + )), + }, _ => Err(ClickHouseFdwError::UnsupportedColumnType( sql_type.to_string().into(), )), @@ -73,7 +130,7 @@ fn field_to_cell(row: &types::Row, i: usize) -> ClickHouseFdwRes } #[wrappers_fdw( - version = "0.1.3", + version = "0.1.4", author = "Supabase", website = "https://github.com/supabase/wrappers/tree/main/wrappers/src/fdw/clickhouse_fdw", error_type = "ClickHouseFdwError" @@ -283,9 +340,6 @@ impl ForeignDataWrapper for ClickHouseFdw { .unwrap(); let cell = field_to_cell(&src_row, i)?; let col_name = src_row.name(i).unwrap(); - if cell.as_ref().is_none() { - return Ok(None); - } row.push(col_name, cell); } self.row_idx += 1; @@ -310,33 +364,84 @@ impl ForeignDataWrapper for ClickHouseFdw { fn insert(&mut self, src: &Row) -> ClickHouseFdwResult<()> { if let Some(ref mut client) = self.client { + // use a dummy query to probe column types + let sql = format!("select * from {} where false", self.table); + let probe = self.rt.block_on(client.query(&sql).fetch_all())?; + + // add row to block let mut row = Vec::new(); for (col_name, cell) in src.iter() { let col_name = col_name.to_owned(); - if 
let Some(cell) = cell { - match cell { - Cell::Bool(v) => row.push((col_name, types::Value::from(*v))), - Cell::F64(v) => row.push((col_name, types::Value::from(*v))), - Cell::I64(v) => row.push((col_name, types::Value::from(*v))), - Cell::String(v) => row.push((col_name, types::Value::from(v.as_str()))), + let tgt_col = probe.get_column(col_name.as_ref())?; + let is_nullable = matches!(tgt_col.sql_type(), SqlType::Nullable(_)); + + let value = cell + .as_ref() + .map(|c| match c { + Cell::Bool(v) => { + let val = if is_nullable { + types::Value::from(Some(*v)) + } else { + types::Value::from(*v) + }; + Ok(val) + } + Cell::F64(v) => { + let val = if is_nullable { + types::Value::from(Some(*v)) + } else { + types::Value::from(*v) + }; + Ok(val) + } + Cell::I64(v) => { + let val = if is_nullable { + types::Value::from(Some(*v)) + } else { + types::Value::from(*v) + }; + Ok(val) + } + Cell::String(v) => { + let s = v.as_str(); + let val = if is_nullable { + types::Value::from(Some(s)) + } else { + types::Value::from(s) + }; + Ok(val) + } Cell::Date(_) => { - let s = cell.to_string().replace('\'', ""); + let s = c.to_string().replace('\'', ""); let tm = NaiveDate::parse_from_str(&s, "%Y-%m-%d")?; let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); let duration = tm - epoch; - let dt = types::Value::Date(duration.num_days() as u16, Tz::UTC); - row.push((col_name, dt)); + let dt = duration.num_days() as u16; + let val = if is_nullable { + types::Value::from(Some(dt)) + } else { + types::Value::Date(dt) + }; + Ok(val) } Cell::Timestamp(_) => { - let s = cell.to_string().replace('\'', ""); - let tm = NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S")?; - let tm: DateTime = DateTime::from_naive_utc_and_offset(tm, Utc); - row.push((col_name, types::Value::from(tm))); - } - _ => { - return Err(ClickHouseFdwError::UnsupportedColumnType(cell.to_string())) + let s = c.to_string().replace('\'', ""); + let naive_tm = NaiveDateTime::parse_from_str(&s, "%Y-%m-%d 
%H:%M:%S")?; + let tm: DateTime = + DateTime::from_naive_utc_and_offset(naive_tm, Utc); + let val = if is_nullable { + types::Value::Nullable(either::Either::Right(Box::new(tm.into()))) + } else { + types::Value::from(tm) + }; + Ok(val) } - } + _ => Err(ClickHouseFdwError::UnsupportedColumnType(c.to_string())), + }) + .transpose()?; + + if let Some(v) = value { + row.push((col_name, v)); } } let mut block = Block::new(); diff --git a/wrappers/src/fdw/clickhouse_fdw/tests.rs b/wrappers/src/fdw/clickhouse_fdw/tests.rs index 4a68868a..c0650bec 100644 --- a/wrappers/src/fdw/clickhouse_fdw/tests.rs +++ b/wrappers/src/fdw/clickhouse_fdw/tests.rs @@ -19,7 +19,9 @@ mod tests { rt.block_on(async { handle.execute("DROP TABLE IF EXISTS test_table").await?; handle - .execute("CREATE TABLE test_table (id INT, name TEXT) engine = Memory") + .execute( + "CREATE TABLE test_table (id Int64, name Nullable(TEXT)) engine = Memory", + ) .await }) .expect("test_table in ClickHouse"); @@ -133,6 +135,18 @@ mod tests { )]), ) .unwrap(); + c.update( + "INSERT INTO test_table (id, name) VALUES ($1, $2)", + None, + Some(vec![ + (PgOid::BuiltIn(PgBuiltInOids::INT4OID), 42.into_datum()), + ( + PgOid::BuiltIn(PgBuiltInOids::TEXTOID), + None::.into_datum(), + ), + ]), + ) + .unwrap(); assert_eq!( c.select("SELECT name FROM test_table ORDER BY name", None, None) .unwrap() @@ -230,7 +244,7 @@ mod tests { "test3" ); - let remote_value: String = rt + let remote_value: Option = rt .block_on(async { handle .query("SELECT name FROM test_table ORDER BY name LIMIT 1") @@ -242,7 +256,7 @@ mod tests { .get("name") }) .expect("value"); - assert_eq!(remote_value, "test"); + assert_eq!(remote_value, Some("test".to_string())); }); } }