diff --git a/Cargo.lock b/Cargo.lock index 82386342d1618..1a588a9611334 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,6 +94,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "const-random", "getrandom 0.2.15", "once_cell", "serde", @@ -338,6 +339,175 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "arrow" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e833808ff2d94ed40d9379848a950d995043c7fb3e81a30b383f4c6033821cc" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad08897b81588f60ba983e3ca39bda2b179bdd84dced378e7df81a5313802ef8" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "num", +] + +[[package]] +name = "arrow-array" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8548ca7c070d8db9ce7aa43f37393e4bfcf3f2d3681df278490772fd1673d08d" +dependencies = [ + "ahash 0.8.11", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown 0.16.0", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e003216336f70446457e280807a73899dd822feaf02087d31febca1363e2fccc" +dependencies = [ + "bytes 1.10.1", + "half", + "num", +] + +[[package]] +name = "arrow-cast" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "919418a0681298d3a77d1a315f625916cb5678ad0d74b9c60108eb15fd083023" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "atoi", + "base64 0.22.1", + "chrono", + "half", + "lexical-core", + "num", + "ryu", +] + +[[package]] +name = "arrow-data" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5c64fff1d142f833d78897a772f2e5b55b36cb3e6320376f0961ab0db7bd6d0" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + +[[package]] +name = "arrow-ipc" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d3594dcddccc7f20fd069bc8e9828ce37220372680ff638c5e00dea427d88f5" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "flatbuffers", +] + +[[package]] +name = "arrow-ord" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c8f82583eb4f8d84d4ee55fd1cb306720cddead7596edce95b50ee418edf66f" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", +] + +[[package]] +name = "arrow-row" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d07ba24522229d9085031df6b94605e0f4b26e099fb7cdeec37abd941a73753" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", +] + +[[package]] +name = "arrow-schema" +version = "56.2.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3aa9e59c611ebc291c28582077ef25c97f1975383f1479b12f3b9ffee2ffabe" + +[[package]] +name = "arrow-select" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c41dbbd1e97bfcaee4fcb30e29105fb2c75e4d82ae4de70b792a5d3f66b2e7a" +dependencies = [ + "ahash 0.8.11", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53f5183c150fbc619eede22b861ea7c0eebed8eaac0333eaa7f6da5205fd504d" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "memchr", + "num", + "regex", + "regex-syntax", +] + [[package]] name = "ascii" version = "0.9.3" @@ -1875,6 +2045,29 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3eeab4423108c5d7c744f4d234de88d18d636100093ae04caf4825134b9c3a32" +[[package]] +name = "borsh" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad8646f98db542e39fc66e68a20b2144f6a732636df7c2354e74645faaa433ce" +dependencies = [ + "borsh-derive", + "cfg_aliases", +] + +[[package]] +name = "borsh-derive" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd1d3c0c2f5833f22386f252fe8ed005c7f59fdcddeef025c01b4c3b9fd9ac3" +dependencies = [ + "once_cell", + "proc-macro-crate 3.2.0", + "proc-macro2 1.0.101", + "quote 1.0.40", + "syn 2.0.106", +] + [[package]] name = "brotli" version = "8.0.0" @@ -2527,6 +2720,26 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28c122c3980598d243d63d9a704629a2d748d101f278052ff068be5a4423ab6f" +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.15", + "once_cell", + "tiny-keccak", +] + [[package]] name = "convert_case" version = "0.4.0" @@ -3848,6 +4061,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flatbuffers" +version = "25.9.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09b6620799e7340ebd9968d2e0708eb82cf1971e9a16821e2091b6d6e475eed5" +dependencies = [ + "bitflags 2.9.0", + "rustc_version 0.4.1", +] + [[package]] name = "flate2" version = "1.1.2" @@ -4418,6 +4641,7 @@ checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" dependencies = [ "cfg-if", "crunchy", + "num-traits", ] [[package]] @@ -4471,6 +4695,12 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashbrown" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" + [[package]] name = "hashlink" version = "0.10.0" @@ -5885,6 +6115,63 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "lexical-core" +version = "1.0.6" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d8d125a277f807e55a77304455eb7b1cb52f2b18c143b60e766c120bd64a594" +dependencies = [ + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", +] + +[[package]] +name = "lexical-parse-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a9f232fbd6f550bc0137dcb5f99ab674071ac2d690ac69704593cb4abbea56" +dependencies = [ + "lexical-parse-integer", + "lexical-util", +] + +[[package]] +name = "lexical-parse-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a7a039f8fb9c19c996cd7b2fcce303c1b2874fe1aca544edc85c4a5f8489b34" +dependencies = [ + "lexical-util", +] + +[[package]] +name = "lexical-util" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2604dd126bb14f13fb5d1bd6a66155079cb9fa655b37f875b3a742c705dbed17" + +[[package]] +name = "lexical-write-float" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50c438c87c013188d415fbabbb1dceb44249ab81664efbd31b14ae55dabb6361" +dependencies = [ + "lexical-util", + "lexical-write-integer", +] + +[[package]] +name = "lexical-write-integer" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "409851a618475d2d5796377cad353802345cba92c867d9fbcde9cf4eac4e14df" +dependencies = [ + "lexical-util", +] + [[package]] name = "libc" version = "0.2.175" @@ -9156,7 +9443,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06676aec5ccb8fc1da723cc8c0f9a46549f21ebb8753d3915c6c41db1e7f1dc4" dependencies = [ "arrayvec", + "borsh", + "bytes 1.10.1", "num-traits", + "rand 0.8.5", + "rkyv", + "serde", + "serde_json", ] [[package]] @@ -10847,6 +11140,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.7.6" @@ -12054,6 +12356,8 @@ dependencies = [ "approx", "arc-swap", "arr_macro", + "arrow", + "arrow-schema", "assert_cmd", "async-compression", "async-graphql", @@ -12194,6 +12498,7 @@ dependencies = [ "roaring", "rstest", "rumqttc", + "rust_decimal", "seahash", "semver 1.0.26", "serde", diff --git a/Cargo.toml b/Cargo.toml index 3b5d6c81c6640..2f667655897da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -179,6 +179,7 @@ rand = { version = "0.9.2", default-features = false, features = ["small_rng", " rand_distr = { version = "0.5.1", default-features = false } regex = { version = "1.11.2", default-features = false, features = ["std", "perf"] } reqwest = { version = "0.11.26", features = ["json"] } +rust_decimal = { version = "1.33", default-features = false, features = ["std"] } semver = { version = "1.0.26", default-features = false, features = ["serde", "std"] } serde = { version = "1.0.219", default-features = false, features = ["alloc", "derive", "rc"] } serde_json = { version = "1.0.143", default-features = false, features = ["raw_value", "std"] } @@ -337,6 +338,8 @@ greptimedb-ingester = { git = "https://github.com/GreptimeTeam/greptimedb-ingest arc-swap = { version = "1.7", default-features = false, optional = true } async-compression = { version = "0.4.27", default-features = false, features = ["tokio", 
"gzip", "zstd"], optional = true } apache-avro = { version = "0.16.0", default-features = false, optional = true } +arrow = { version = "56.2.0", default-features = false, features = ["ipc"], optional = true } +arrow-schema = { version = "56.2.0", default-features = false, optional = true } axum = { version = "0.6.20", default-features = false } base64 = { workspace = true, optional = true } bloomy = { version = "1.2.0", default-features = false, optional = true } @@ -402,6 +405,7 @@ redis = { version = "0.32.4", default-features = false, features = ["connection- regex.workspace = true roaring = { version = "0.11.2", default-features = false, features = ["std"], optional = true } rumqttc = { version = "0.24.0", default-features = false, features = ["use-rustls"], optional = true } +rust_decimal = { workspace = true, optional = true } seahash = { version = "4.1.0", default-features = false } smallvec = { version = "1", default-features = false, features = ["union", "serde"] } snap = { version = "1.1.1", default-features = false } @@ -841,7 +845,7 @@ sinks-azure_blob = ["dep:azure_core", "dep:azure_identity", "dep:azure_storage", sinks-azure_monitor_logs = [] sinks-blackhole = [] sinks-chronicle = [] -sinks-clickhouse = [] +sinks-clickhouse = ["dep:arrow", "dep:arrow-schema", "dep:rust_decimal"] sinks-console = [] sinks-databend = ["dep:databend-client"] sinks-datadog_events = [] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index c6050f1474c11..918b2cd1c3a32 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -26,6 +26,7 @@ arbitrary,https://github.com/rust-fuzz/arbitrary,MIT OR Apache-2.0,"The Rust-Fuz arc-swap,https://github.com/vorner/arc-swap,MIT OR Apache-2.0,Michal 'vorner' Vaner arr_macro,https://github.com/JoshMcguigan/arr_macro,MIT OR Apache-2.0,Josh Mcguigan arrayvec,https://github.com/bluss/arrayvec,MIT OR Apache-2.0,bluss +arrow,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow ascii,https://github.com/tomprogrammer/rust-ascii,Apache-2.0 OR MIT,"Thomas Bahn , Torbjørn Birch Moltu , Simon Sapin " async-broadcast,https://github.com/smol-rs/async-broadcast,MIT OR Apache-2.0,"Stjepan Glavina , Yoshua Wuyts , Zeeshan Ali Khan " async-channel,https://github.com/smol-rs/async-channel,Apache-2.0 OR MIT,Stjepan Glavina @@ -106,6 +107,8 @@ blocking,https://github.com/smol-rs/blocking,Apache-2.0 OR MIT,Stjepan Glavina < bloomy,https://docs.rs/bloomy/,MIT,"Aleksandr Bezobchuk , Alexis Sellier " bollard,https://github.com/fussybeaver/bollard,Apache-2.0,Bollard contributors borrow-or-share,https://github.com/yescallop/borrow-or-share,MIT-0,Scallop Ye +borsh,https://github.com/near/borsh-rs,MIT OR Apache-2.0,Near Inc +borsh-derive,https://github.com/near/borsh-rs,Apache-2.0,Near Inc brotli,https://github.com/dropbox/rust-brotli,BSD-3-Clause AND MIT,"Daniel Reiter Horn , The Brotli Authors" brotli-decompressor,https://github.com/dropbox/rust-brotli-decompressor,BSD-3-Clause OR MIT,"Daniel Reiter Horn , The Brotli Authors" bson,https://github.com/mongodb/bson-rust,MIT,"Y. T. 
Chung , Kevin Yeh , Saghm Rossi , Patrick Freed , Isabel Atkinson , Abraham Egnor " @@ -147,6 +150,8 @@ community-id,https://github.com/traceflight/rs-community-id,MIT OR Apache-2.0,Ju compact_str,https://github.com/ParkMyCar/compact_str,MIT,Parker Timmerman concurrent-queue,https://github.com/smol-rs/concurrent-queue,Apache-2.0 OR MIT,"Stjepan Glavina , Taiki Endo , John Nunley " const-oid,https://github.com/RustCrypto/formats/tree/master/const-oid,Apache-2.0 OR MIT,RustCrypto Developers +const-random,https://github.com/tkaitchuck/constrandom,MIT OR Apache-2.0,Tom Kaitchuck +const-random-macro,https://github.com/tkaitchuck/constrandom,MIT OR Apache-2.0,Tom Kaitchuck convert_case,https://github.com/rutrum/convert-case,MIT,David Purdum convert_case,https://github.com/rutrum/convert-case,MIT,rutrum cookie,https://github.com/SergioBenitez/cookie-rs,MIT OR Apache-2.0,"Sergio Benitez , Alex Crichton " @@ -236,6 +241,7 @@ fastrand,https://github.com/smol-rs/fastrand,Apache-2.0 OR MIT,Stjepan Glavina < ff,https://github.com/zkcrypto/ff,MIT OR Apache-2.0,"Sean Bowe , Jack Grigg " fiat-crypto,https://github.com/mit-plv/fiat-crypto,MIT OR Apache-2.0 OR BSD-1-Clause,Fiat Crypto library authors finl_unicode,https://github.com/dahosek/finl_unicode,MIT OR Apache-2.0,The finl_unicode Authors +flatbuffers,https://github.com/google/flatbuffers,Apache-2.0,"Robert Winslow , FlatBuffers Maintainers" flate2,https://github.com/rust-lang/flate2-rs,MIT OR Apache-2.0,"Alex Crichton , Josh Triplett " float_eq,https://github.com/jtempest/float_eq-rs,MIT OR Apache-2.0,jtempest fluent-uri,https://github.com/yescallop/fluent-uri-rs,MIT,Scallop Ye @@ -362,6 +368,12 @@ kube,https://github.com/kube-rs/kube,Apache-2.0,"clux , Nata lalrpop-util,https://github.com/lalrpop/lalrpop,Apache-2.0 OR MIT,Niko Matsakis lapin,https://github.com/amqp-rs/lapin,MIT,"Geoffroy Couprie , Marc-Antoine Perennou " lazy_static,https://github.com/rust-lang-nursery/lazy-static.rs,MIT OR Apache-2.0,Marvin Löbel +lexical-core,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh +lexical-parse-float,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh +lexical-parse-integer,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh +lexical-util,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh +lexical-write-float,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh +lexical-write-integer,https://github.com/Alexhuszagh/rust-lexical,MIT OR Apache-2.0,Alex Huszagh libc,https://github.com/rust-lang/libc,MIT OR Apache-2.0,The Rust Project Developers libflate,https://github.com/sile/libflate,MIT,Takeru Ohta libm,https://github.com/rust-lang/libm,MIT OR Apache-2.0,Jorge Aparicio @@ -658,6 +670,7 @@ thread_local,https://github.com/Amanieu/thread_local-rs,MIT OR Apache-2.0,Amanie tikv-jemalloc-sys,https://github.com/tikv/jemallocator,MIT OR Apache-2.0,"Alex Crichton , Gonzalo Brito Gadeschi , The TiKV Project Developers" tikv-jemallocator,https://github.com/tikv/jemallocator,MIT OR Apache-2.0,"Alex Crichton , Gonzalo Brito Gadeschi , Simon Sapin , Steven Fackler , The TiKV Project Developers" time,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt , Time contributors" +tiny-keccak,https://github.com/debris/tiny-keccak,CC0-1.0,debris tinystr,https://github.com/unicode-org/icu4x,Unicode-3.0,The ICU4X Project Developers tinyvec,https://github.com/Lokathor/tinyvec,Zlib OR Apache-2.0 OR MIT,Lokathor 
tinyvec_macros,https://github.com/Soveu/tinyvec_macros,MIT OR Apache-2.0 OR Zlib,Soveu diff --git a/changelog.d/24074_clickhouse_arrow_format.enhancement.md b/changelog.d/24074_clickhouse_arrow_format.enhancement.md new file mode 100644 index 0000000000000..9b2ffff1f4bfe --- /dev/null +++ b/changelog.d/24074_clickhouse_arrow_format.enhancement.md @@ -0,0 +1,3 @@ +The `clickhouse` sink now supports the `arrow_stream` format option, enabling high-performance binary data transfer using Apache Arrow IPC via ClickHouse's ArrowStream format endpoint. This provides significantly better performance and smaller payload sizes compared to JSON-based formats. + +authors: benjamin-awd diff --git a/src/sinks/clickhouse/arrow_schema.rs b/src/sinks/clickhouse/arrow_schema.rs new file mode 100644 index 0000000000000..4a9242abcd8cf --- /dev/null +++ b/src/sinks/clickhouse/arrow_schema.rs @@ -0,0 +1,513 @@ +//! Schema fetching and Arrow type mapping for ClickHouse tables. + +use std::sync::Arc; + +use ::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use http::{Request, StatusCode}; +use hyper::Body; +use serde::Deserialize; + +use crate::http::{Auth, HttpClient}; + +const DECIMAL32_PRECISION: u8 = 9; +const DECIMAL64_PRECISION: u8 = 18; +const DECIMAL128_PRECISION: u8 = 38; +const DECIMAL256_PRECISION: u8 = 76; + +#[derive(Debug, Deserialize)] +struct ColumnInfo { + name: String, + #[serde(rename = "type")] + column_type: String, +} + +/// Fetches the schema for a ClickHouse table and converts it to an Arrow schema. +pub async fn fetch_table_schema( + client: &HttpClient, + endpoint: &str, + database: &str, + table: &str, + auth: Option<&Auth>, +) -> crate::Result<Arc<Schema>> { + // Query to get table schema + let query = format!( + "SELECT name, type FROM system.columns WHERE database = '{}' AND table = '{}' FORMAT JSONEachRow", + database, table + ); + + let encoded_query = + percent_encoding::utf8_percent_encode(&query, percent_encoding::NON_ALPHANUMERIC) + .to_string(); + let uri = format!("{}?query={}", endpoint, encoded_query); + let mut request = Request::get(&uri).body(Body::empty()).unwrap(); + + if let Some(auth) = auth { + auth.apply(&mut request); + } + + let response = client.send(request).await?; + + match response.status() { + StatusCode::OK => { + let body_bytes = hyper::body::to_bytes(response.into_body()).await?; + let body_str = String::from_utf8(body_bytes.into()) + .map_err(|e| format!("Failed to parse response as UTF-8: {}", e))?; + + parse_schema_from_response(&body_str) + } + status => Err(format!("Failed to fetch schema from ClickHouse: HTTP {}", status).into()), + } +} + +/// Parses the JSON response from ClickHouse and builds an Arrow schema. +fn parse_schema_from_response(response: &str) -> crate::Result<Arc<Schema>> { + let mut fields = Vec::new(); + + for line in response.lines() { + if line.trim().is_empty() { + continue; + } + + let column: ColumnInfo = serde_json::from_str(line) + .map_err(|e| format!("Failed to parse column info: {}", e))?; + + let arrow_type = clickhouse_type_to_arrow(&column.column_type); + fields.push(Field::new(&column.name, arrow_type, true)); + } + + if fields.is_empty() { + return Err("No columns found in table schema".into()); + } + + Ok(Arc::new(Schema::new(fields))) +} + +/// Unwraps ClickHouse type modifiers like Nullable() and LowCardinality().
+/// For example: "Nullable(LowCardinality(String))" -> "String" +fn unwrap_type_modifiers(ch_type: &str) -> &str { + let mut base = ch_type; + for prefix in ["Nullable(", "LowCardinality("] { + if let Some(inner) = base.strip_prefix(prefix) { + base = inner.strip_suffix(')').unwrap_or(inner); + } + } + base +} + +fn clickhouse_type_to_arrow(ch_type: &str) -> DataType { + let base_type = unwrap_type_modifiers(ch_type); + + match base_type { + // String types + "String" => DataType::Utf8, + _ if base_type.starts_with("FixedString") => DataType::Utf8, + + // Integer types + "Int8" => DataType::Int8, + "Int16" => DataType::Int16, + "Int32" => DataType::Int32, + "Int64" => DataType::Int64, + "UInt8" => DataType::UInt8, + "UInt16" => DataType::UInt16, + "UInt32" => DataType::UInt32, + "UInt64" => DataType::UInt64, + + // Floating point types + "Float32" => DataType::Float32, + "Float64" => DataType::Float64, + + // Boolean + "Bool" => DataType::Boolean, + + // Date and time types (timezones not currently handled, defaults to UTC) + "Date" | "Date32" => DataType::Date32, + "DateTime" => DataType::Timestamp(TimeUnit::Second, None), + _ if base_type.starts_with("DateTime64") => parse_datetime64_precision(base_type), + + // Decimal types + _ if base_type.starts_with("Decimal") => parse_decimal_type(base_type), + + // Complex types (fallback to Utf8 for now) + _ if base_type.starts_with("Array") => DataType::Utf8, + _ if base_type.starts_with("Map") => DataType::Utf8, + _ if base_type.starts_with("Tuple") => DataType::Utf8, + + // Unknown types + _ => { + warn!("Unknown ClickHouse type '{}', defaulting to Utf8", ch_type); + DataType::Utf8 + } + } +} + +/// Extracts an identifier from the start of a string. +/// Returns (identifier, remaining_string). +fn extract_identifier(input: &str) -> (&str, &str) { + for (i, c) in input.char_indices() { + if c.is_alphabetic() || c == '_' || (i > 0 && c.is_numeric()) { + continue; + } + return (&input[..i], &input[i..]); + } + (input, "") +} + +/// Parses comma-separated arguments from a parenthesized string. +/// Input: "(arg1, arg2, arg3)" -> Output: Ok(vec!["arg1".to_string(), "arg2".to_string(), "arg3".to_string()]) +/// Returns an error if parentheses are malformed. +fn parse_args(input: &str) -> Result<Vec<String>, String> { + let trimmed = input.trim(); + if !trimmed.starts_with('(') || !trimmed.ends_with(')') { + return Err(format!( + "Expected parentheses around arguments in '{}'", + input + )); + } + + let inner = trimmed[1..trimmed.len() - 1].trim(); + if inner.is_empty() { + return Ok(vec![]); + } + + // Split by comma, handling nested parentheses and quotes + let mut args = Vec::new(); + let mut current_arg = String::new(); + let mut depth = 0; + let mut in_quotes = false; + + for c in inner.chars() { + match c { + '\'' if !in_quotes => in_quotes = true, + '\'' if in_quotes => in_quotes = false, + '(' if !in_quotes => depth += 1, + ')' if !in_quotes => depth -= 1, + ',' if depth == 0 && !in_quotes => { + args.push(current_arg.trim().to_string()); + current_arg = String::new(); + continue; + } + _ => {} + } + current_arg.push(c); + } + + if !current_arg.trim().is_empty() { + args.push(current_arg.trim().to_string()); + } + + Ok(args) +} + +/// Parses ClickHouse Decimal types and returns the appropriate Arrow decimal type.
+/// ClickHouse formats: +/// - Decimal(P, S) -> generic decimal with precision P and scale S +/// - Decimal32(S) -> precision up to 9, scale S +/// - Decimal64(S) -> precision up to 18, scale S +/// - Decimal128(S) -> precision up to 38, scale S +/// - Decimal256(S) -> precision up to 76, scale S +fn parse_decimal_type(ch_type: &str) -> DataType { + let (type_name, args_str) = extract_identifier(ch_type); + + let result = parse_args(args_str).ok().and_then(|args| match type_name { + "Decimal" if args.len() == 2 => args[0].parse::<u8>().ok().zip(args[1].parse::<i8>().ok()), + "Decimal32" | "Decimal64" | "Decimal128" | "Decimal256" if args.len() == 1 => { + args[0].parse::<i8>().ok().map(|scale| { + let precision = match type_name { + "Decimal32" => DECIMAL32_PRECISION, + "Decimal64" => DECIMAL64_PRECISION, + "Decimal128" => DECIMAL128_PRECISION, + "Decimal256" => DECIMAL256_PRECISION, + _ => unreachable!(), + }; + (precision, scale) + }) + } + _ => None, + }); + + result + .map(|(precision, scale)| { + if precision <= DECIMAL128_PRECISION { + DataType::Decimal128(precision, scale) + } else { + DataType::Decimal256(precision, scale) + } + }) + .unwrap_or_else(|| { + warn!("Could not parse Decimal type '{}'", ch_type); + DataType::Null + }) +} + +/// Parses DateTime64 precision and returns the appropriate Arrow timestamp type. +/// DateTime64(0) -> Second +/// DateTime64(3) -> Millisecond +/// DateTime64(6) -> Microsecond +/// DateTime64(9) -> Nanosecond +fn parse_datetime64_precision(ch_type: &str) -> DataType { + let (_type_name, args_str) = extract_identifier(ch_type); + + let args = match parse_args(args_str) { + Ok(args) => args, + Err(_) => { + warn!("Could not parse DateTime64 precision from '{}'", ch_type); + return DataType::Null; + } + }; + + // DateTime64(precision) or DateTime64(precision, 'timezone') + if args.is_empty() { + warn!("Could not parse DateTime64 precision from '{}'", ch_type); + return DataType::Null; + } + + // Parse the precision (first argument) + match args[0].parse::<u8>() { + Ok(0) => DataType::Timestamp(TimeUnit::Second, None), + Ok(1..=3) => DataType::Timestamp(TimeUnit::Millisecond, None), + Ok(4..=6) => DataType::Timestamp(TimeUnit::Microsecond, None), + Ok(7..=9) => DataType::Timestamp(TimeUnit::Nanosecond, None), + _ => { + warn!("Unsupported DateTime64 precision in '{}'", ch_type); + DataType::Null + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_clickhouse_type_mapping() { + assert_eq!(clickhouse_type_to_arrow("String"), DataType::Utf8); + assert_eq!(clickhouse_type_to_arrow("Int64"), DataType::Int64); + assert_eq!(clickhouse_type_to_arrow("Float64"), DataType::Float64); + assert_eq!(clickhouse_type_to_arrow("Bool"), DataType::Boolean); + assert_eq!( + clickhouse_type_to_arrow("DateTime"), + DataType::Timestamp(TimeUnit::Second, None) + ); + } + + #[test] + fn test_datetime64_precision_mapping() { + assert_eq!( + clickhouse_type_to_arrow("DateTime64(0)"), + DataType::Timestamp(TimeUnit::Second, None) + ); + assert_eq!( + clickhouse_type_to_arrow("DateTime64(3)"), + DataType::Timestamp(TimeUnit::Millisecond, None) + ); + assert_eq!( + clickhouse_type_to_arrow("DateTime64(6)"), + DataType::Timestamp(TimeUnit::Microsecond, None) + ); + assert_eq!( + clickhouse_type_to_arrow("DateTime64(9)"), + DataType::Timestamp(TimeUnit::Nanosecond, None) + ); + // Test with timezones + assert_eq!( + clickhouse_type_to_arrow("DateTime64(9, 'UTC')"), + DataType::Timestamp(TimeUnit::Nanosecond, None) + ); + assert_eq!(
clickhouse_type_to_arrow("DateTime64(6, 'UTC')"), + DataType::Timestamp(TimeUnit::Microsecond, None) + ); + assert_eq!( + clickhouse_type_to_arrow("DateTime64(9, 'America/New_York')"), + DataType::Timestamp(TimeUnit::Nanosecond, None) + ); + // Test edge cases for precision ranges + assert_eq!( + clickhouse_type_to_arrow("DateTime64(1)"), + DataType::Timestamp(TimeUnit::Millisecond, None) + ); + assert_eq!( + clickhouse_type_to_arrow("DateTime64(4)"), + DataType::Timestamp(TimeUnit::Microsecond, None) + ); + assert_eq!( + clickhouse_type_to_arrow("DateTime64(7)"), + DataType::Timestamp(TimeUnit::Nanosecond, None) + ); + // Test with spaces + assert_eq!( + clickhouse_type_to_arrow("DateTime64( 3 )"), + DataType::Timestamp(TimeUnit::Millisecond, None) + ); + } + + #[test] + fn test_nullable_type_mapping() { + assert_eq!(clickhouse_type_to_arrow("Nullable(String)"), DataType::Utf8); + assert_eq!(clickhouse_type_to_arrow("Nullable(Int64)"), DataType::Int64); + } + + #[test] + fn test_lowcardinality_type_mapping() { + assert_eq!( + clickhouse_type_to_arrow("LowCardinality(String)"), + DataType::Utf8 + ); + assert_eq!( + clickhouse_type_to_arrow("LowCardinality(FixedString(10))"), + DataType::Utf8 + ); + // Nullable + LowCardinality + assert_eq!( + clickhouse_type_to_arrow("Nullable(LowCardinality(String))"), + DataType::Utf8 + ); + } + + #[test] + fn test_decimal_type_mapping() { + // Generic Decimal(P, S) + assert_eq!( + clickhouse_type_to_arrow("Decimal(10, 2)"), + DataType::Decimal128(10, 2) + ); + assert_eq!( + clickhouse_type_to_arrow("Decimal(38, 6)"), + DataType::Decimal128(38, 6) + ); + assert_eq!( + clickhouse_type_to_arrow("Decimal(50, 10)"), + DataType::Decimal256(50, 10) + ); + + // Generic Decimal without spaces and with spaces + assert_eq!( + clickhouse_type_to_arrow("Decimal(10,2)"), + DataType::Decimal128(10, 2) + ); + assert_eq!( + clickhouse_type_to_arrow("Decimal( 18 , 6 )"), + DataType::Decimal128(18, 6) + ); + + // Decimal32(S) - precision up to 9 + assert_eq!( + clickhouse_type_to_arrow("Decimal32(2)"), + DataType::Decimal128(9, 2) + ); + assert_eq!( + clickhouse_type_to_arrow("Decimal32(4)"), + DataType::Decimal128(9, 4) + ); + + // Decimal64(S) - precision up to 18 + assert_eq!( + clickhouse_type_to_arrow("Decimal64(4)"), + DataType::Decimal128(18, 4) + ); + assert_eq!( + clickhouse_type_to_arrow("Decimal64(8)"), + DataType::Decimal128(18, 8) + ); + + // Decimal128(S) - precision up to 38 + assert_eq!( + clickhouse_type_to_arrow("Decimal128(10)"), + DataType::Decimal128(38, 10) + ); + + // Decimal256(S) - precision up to 76 + assert_eq!( + clickhouse_type_to_arrow("Decimal256(20)"), + DataType::Decimal256(76, 20) + ); + + // With Nullable wrapper + assert_eq!( + clickhouse_type_to_arrow("Nullable(Decimal(18, 6))"), + DataType::Decimal128(18, 6) + ); + } + + #[test] + fn test_parse_schema() { + let response = r#"{"name":"id","type":"Int64"} +{"name":"message","type":"String"} +{"name":"timestamp","type":"DateTime"} +"#; + + let schema = parse_schema_from_response(response).unwrap(); + assert_eq!(schema.fields().len(), 3); + assert_eq!(schema.field(0).name(), "id"); + assert_eq!(schema.field(0).data_type(), &DataType::Int64); + assert_eq!(schema.field(1).name(), "message"); + assert_eq!(schema.field(1).data_type(), &DataType::Utf8); + assert_eq!(schema.field(2).name(), "timestamp"); + assert_eq!( + schema.field(2).data_type(), + &DataType::Timestamp(TimeUnit::Second, None) + ); + } + + #[test] + fn test_extract_identifier() { + assert_eq!(extract_identifier("Decimal(10, 
2)"), ("Decimal", "(10, 2)")); + assert_eq!(extract_identifier("DateTime64(3)"), ("DateTime64", "(3)")); + assert_eq!(extract_identifier("Int32"), ("Int32", "")); + assert_eq!( + extract_identifier("LowCardinality(String)"), + ("LowCardinality", "(String)") + ); + assert_eq!(extract_identifier("Decimal128(10)"), ("Decimal128", "(10)")); + } + + #[test] + fn test_parse_args() { + // Simple cases + assert_eq!( + parse_args("(10, 2)").unwrap(), + vec!["10".to_string(), "2".to_string()] + ); + assert_eq!(parse_args("(3)").unwrap(), vec!["3".to_string()]); + assert_eq!(parse_args("()").unwrap(), Vec::::new()); + + // With spaces + assert_eq!( + parse_args("( 10 , 2 )").unwrap(), + vec!["10".to_string(), "2".to_string()] + ); + + // With nested parentheses + assert_eq!( + parse_args("(Nullable(String))").unwrap(), + vec!["Nullable(String)".to_string()] + ); + assert_eq!( + parse_args("(Array(Int32), String)").unwrap(), + vec!["Array(Int32)".to_string(), "String".to_string()] + ); + + // With quotes + assert_eq!( + parse_args("(3, 'UTC')").unwrap(), + vec!["3".to_string(), "'UTC'".to_string()] + ); + assert_eq!( + parse_args("(9, 'America/New_York')").unwrap(), + vec!["9".to_string(), "'America/New_York'".to_string()] + ); + + // Complex nested case + assert_eq!( + parse_args("(Tuple(Int32, String), Array(Float64))").unwrap(), + vec![ + "Tuple(Int32, String)".to_string(), + "Array(Float64)".to_string() + ] + ); + + // Error cases + assert!(parse_args("10, 2").is_err()); // Missing parentheses + assert!(parse_args("(10, 2").is_err()); // Missing closing paren + } +} diff --git a/src/sinks/clickhouse/config.rs b/src/sinks/clickhouse/config.rs index c33bfdbac5eda..cf24d7f859ba1 100644 --- a/src/sinks/clickhouse/config.rs +++ b/src/sinks/clickhouse/config.rs @@ -11,6 +11,8 @@ use super::{ service::{ClickhouseRetryLogic, ClickhouseServiceRequestBuilder}, sink::{ClickhouseSink, PartitionKey}, }; + +use super::arrow_schema; use crate::{ http::{Auth, HttpClient, MaybeAuth}, sinks::{ @@ -39,6 +41,9 @@ pub enum Format { /// JSONAsString. JsonAsString, + + /// ArrowStream. + ArrowStream, } impl fmt::Display for Format { @@ -47,6 +52,7 @@ impl fmt::Display for Format { Format::JsonEachRow => write!(f, "JSONEachRow"), Format::JsonAsObject => write!(f, "JSONAsObject"), Format::JsonAsString => write!(f, "JSONAsString"), + Format::ArrowStream => write!(f, "ArrowStream"), } } } @@ -215,6 +221,55 @@ impl SinkConfig for ClickhouseConfig { .expect("'default' should be a valid template") }); + let arrow_schema = if self.format == Format::ArrowStream { + if !self.table.is_dynamic() && !database.is_dynamic() { + let table_str = self.table.get_ref(); + let database_str = database.get_ref(); + + debug!( + "Fetching schema for table {}.{} at startup", + database_str, table_str + ); + match arrow_schema::fetch_table_schema( + &client, + &endpoint.to_string(), + database_str, + table_str, + auth.as_ref(), + ) + .await + { + Ok(schema) => { + debug!( + "Successfully fetched Arrow schema with {} fields: {:?}", + schema.fields().len(), + schema + .fields() + .iter() + .map(|f| format!("{}:{:?}", f.name(), f.data_type())) + .collect::>() + ); + Some(schema) + } + Err(e) => { + return Err(format!( + "Failed to fetch schema for {}.{}: {}. Schema inference is not supported for ArrowStream format.", + database_str, + table_str, + e + ).into()); + } + } + } else { + return Err( + "ArrowStream format requires a static table and database (no templates). Schema inference is not supported." 
+ .into(), + ); + } + } else { + None + }; + let request_builder = ClickhouseRequestBuilder { compression: self.compression, encoding: ( @@ -224,6 +279,8 @@ impl SinkConfig for ClickhouseConfig { JsonSerializerConfig::default().build().into(), ), ), + format: self.format, + arrow_schema, }; let sink = ClickhouseSink::new( diff --git a/src/sinks/clickhouse/integration_tests.rs b/src/sinks/clickhouse/integration_tests.rs index fe7a10226ac60..b8e5ea1b9d744 100644 --- a/src/sinks/clickhouse/integration_tests.rs +++ b/src/sinks/clickhouse/integration_tests.rs @@ -468,3 +468,197 @@ struct Stats { elapsed: f64, rows_read: usize, } + +#[tokio::test] +async fn insert_events_arrow_format() { + trace_init(); + + let table = random_table_name(); + let host = clickhouse_address(); + + let mut batch = BatchConfig::default(); + batch.max_events = Some(5); + + let config = ClickhouseConfig { + endpoint: host.parse().unwrap(), + table: table.clone().try_into().unwrap(), + compression: Compression::None, + format: crate::sinks::clickhouse::config::Format::ArrowStream, + batch, + request: TowerRequestConfig { + retry_attempts: 1, + ..Default::default() + }, + ..Default::default() + }; + + let client = ClickhouseClient::new(host.clone()); + + client + .create_table( + &table, + "host String, timestamp DateTime64(3), message String, count Int64", + ) + .await; + + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + + let mut events: Vec<Event> = Vec::new(); + for i in 0..5 { + let mut event = LogEvent::from(format!("log message {}", i)); + event.insert("host", format!("host{}.example.com", i)); + event.insert("count", i as i64); + events.push(event.into()); + } + + run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await; + + let output = client.select_all(&table).await; + assert_eq!(5, output.rows); + + // Verify fields exist and are correctly typed + for row in output.data.iter() { + assert!(row.get("host").and_then(|v| v.as_str()).is_some()); + assert!(row.get("message").and_then(|v| v.as_str()).is_some()); + assert!( + row.get("count") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::<i64>().ok()) + .is_some() + ); + } +} + +#[tokio::test] +async fn insert_events_arrow_format_with_compression() { + trace_init(); + + let table = random_table_name(); + let host = clickhouse_address(); + + let mut batch = BatchConfig::default(); + batch.max_events = Some(5); + + let config = ClickhouseConfig { + endpoint: host.parse().unwrap(), + table: table.clone().try_into().unwrap(), + compression: Compression::gzip_default(), + format: crate::sinks::clickhouse::config::Format::ArrowStream, + batch, + request: TowerRequestConfig { + retry_attempts: 1, + ..Default::default() + }, + ..Default::default() + }; + + let client = ClickhouseClient::new(host); + + client + .create_table( + &table, + "host String, timestamp DateTime64(3), message String, count Int64", + ) + .await; + + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + + let mut events: Vec<Event> = Vec::new(); + for i in 0..5 { + let mut event = LogEvent::from(format!("log message {}", i)); + event.insert("host", format!("host{}.example.com", i)); + event.insert("count", i as i64); + events.push(event.into()); + } + + run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await; + + let output = client.select_all(&table).await; + assert_eq!(5, output.rows); + + // Verify fields exist and are correctly typed + for row in output.data.iter() { + assert!(row.get("host").and_then(|v|
v.as_str()).is_some()); + assert!(row.get("message").and_then(|v| v.as_str()).is_some()); + assert!( + row.get("count") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::<i64>().ok()) + .is_some() + ); + } +} + +#[tokio::test] +async fn insert_events_arrow_with_schema_fetching() { + trace_init(); + + let table = random_table_name(); + let host = clickhouse_address(); + + let mut batch = BatchConfig::default(); + batch.max_events = Some(3); + + let client = ClickhouseClient::new(host.clone()); + + // Create table with specific typed columns including various data types + // Include standard Vector log fields: host, timestamp, message + client + .create_table( + &table, + "host String, timestamp DateTime64(3), message String, id Int64, name String, score Float64, active Bool", + ) + .await; + + let config = ClickhouseConfig { + endpoint: host.parse().unwrap(), + table: table.clone().try_into().unwrap(), + compression: Compression::None, + format: crate::sinks::clickhouse::config::Format::ArrowStream, + batch, + request: TowerRequestConfig { + retry_attempts: 1, + ..Default::default() + }, + ..Default::default() + }; + + // Building the sink should fetch the schema from ClickHouse + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + + // Create events with various types that should match the schema + let mut events: Vec<Event> = Vec::new(); + for i in 0..3 { + let mut event = LogEvent::from(format!("Test message {}", i)); + event.insert("host", format!("host{}.example.com", i)); + event.insert("id", i as i64); + event.insert("name", format!("user_{}", i)); + event.insert("score", 95.5 + i as f64); + event.insert("active", i % 2 == 0); + events.push(event.into()); + } + + run_and_assert_sink_compliance(sink, stream::iter(events), &SINK_TAGS).await; + + let output = client.select_all(&table).await; + assert_eq!(3, output.rows); + + // Verify all fields exist and have the correct types + for row in output.data.iter() { + // Check standard Vector fields exist + assert!(row.get("host").and_then(|v| v.as_str()).is_some()); + assert!(row.get("message").and_then(|v| v.as_str()).is_some()); + assert!(row.get("timestamp").is_some()); + + // Check custom fields have correct types + assert!( + row.get("id") + .and_then(|v| v.as_str()) + .and_then(|s| s.parse::<i64>().ok()) + .is_some() + ); + assert!(row.get("name").and_then(|v| v.as_str()).is_some()); + assert!(row.get("score").and_then(|v| v.as_f64()).is_some()); + assert!(row.get("active").and_then(|v| v.as_bool()).is_some()); + } +} diff --git a/src/sinks/clickhouse/mod.rs b/src/sinks/clickhouse/mod.rs index 3a578041bf533..5dfe65487a57e 100644 --- a/src/sinks/clickhouse/mod.rs +++ b/src/sinks/clickhouse/mod.rs @@ -9,6 +9,7 @@ //! //! This sink only supports logs for now but could support metrics and traces as well in the future. +mod arrow_schema; pub mod config; #[cfg(all(test, feature = "clickhouse-integration-tests"))] mod integration_tests; diff --git a/src/sinks/clickhouse/request_builder.rs b/src/sinks/clickhouse/request_builder.rs index 7f8edc0d2d02e..d83f8e84dc64c 100644 --- a/src/sinks/clickhouse/request_builder.rs +++ b/src/sinks/clickhouse/request_builder.rs @@ -1,14 +1,46 @@ //! `RequestBuilder` implementation for the `Clickhouse` sink.
+use std::io::Write; +use std::sync::Arc; + use bytes::Bytes; +use snafu::{ResultExt, Snafu}; use vector_lib::codecs::encoding::Framer; +use vector_lib::request_metadata::GroupedCountByteSize; -use super::sink::PartitionKey; +use super::{config::Format, sink::PartitionKey}; +use crate::codecs::Encoder; +use crate::sinks::util::Compressor; +use crate::sinks::util::arrow; +use crate::sinks::util::encoding::Encoder as EncoderTrait; use crate::sinks::{prelude::*, util::http::HttpRequest}; +#[derive(Debug, Snafu)] +pub enum RequestBuilderError { + #[snafu(display("Failed to encode events to Arrow: {}", source))] + ArrowEncoding { source: arrow::ArrowEncodingError }, + + #[snafu(display("Failed to compress payload: {}", source))] + Compression { source: std::io::Error }, + + #[snafu(display("Failed to encode events: {}", source))] + Encoding { source: std::io::Error }, + + #[snafu(display("IO error: {}", source))] + Io { source: std::io::Error }, +} + +impl From<std::io::Error> for RequestBuilderError { + fn from(source: std::io::Error) -> Self { + RequestBuilderError::Io { source } + } +} + pub(super) struct ClickhouseRequestBuilder { pub(super) compression: Compression, pub(super) encoding: (Transformer, Encoder<Framer>), + pub(super) format: Format, + pub(super) arrow_schema: Option<Arc<::arrow::datatypes::Schema>>, } impl RequestBuilder<(PartitionKey, Vec<Event>)> for ClickhouseRequestBuilder { @@ -17,7 +49,7 @@ impl RequestBuilder<(PartitionKey, Vec<Event>)> for ClickhouseRequestBuilder { type Encoder = (Transformer, Encoder<Framer>); type Payload = Bytes; type Request = HttpRequest; - type Error = std::io::Error; + type Error = RequestBuilderError; fn compression(&self) -> Compression { self.compression @@ -38,6 +70,34 @@ ((key, finalizers), builder, events) } + fn encode_events( + &self, + events: Self::Events, + ) -> Result<EncodeResult<Self::Payload>, Self::Error> { + if self.format == Format::ArrowStream { + return self.build_arrow_request_payload(events); + } + + // Standard JSON encoding path for other formats + let mut compressor = Compressor::from(self.compression()); + let is_compressed = compressor.is_compressed(); + let (_, json_size) = { + self.encoder() + .encode_input(events, &mut compressor) + .map_err(|source| RequestBuilderError::Encoding { source })?
+ }; + + let payload = compressor.into_inner().freeze(); + let result = if is_compressed { + let compressed_byte_size = payload.len(); + EncodeResult::compressed(payload, compressed_byte_size, json_size) + } else { + EncodeResult::uncompressed(payload, json_size) + }; + + Ok(result) + } + fn build_request( &self, metadata: Self::Metadata, @@ -57,3 +117,44 @@ impl RequestBuilder<(PartitionKey, Vec<Event>)> for ClickhouseRequestBuilder { ) } } + +impl ClickhouseRequestBuilder { + fn build_arrow_request_payload( + &self, + events: Vec<Event>, + ) -> Result<EncodeResult<Bytes>, RequestBuilderError> { + // Encode events to Arrow IPC format using provided schema + let arrow_bytes = arrow::encode_events_to_arrow_stream(&events, self.arrow_schema.clone()) + .context(ArrowEncodingSnafu)?; + + let uncompressed_byte_size = arrow_bytes.len(); + + // Apply compression if enabled + let mut compressor = Compressor::from(self.compression()); + let is_compressed = compressor.is_compressed(); + + compressor + .write_all(&arrow_bytes) + .context(CompressionSnafu)?; + + let payload = compressor.into_inner().freeze(); + + // For Arrow format, use the actual Arrow byte size for metrics + // Distribute the Arrow payload size across all events proportionally + let mut arrow_size = GroupedCountByteSize::new_untagged(); + if !events.is_empty() { + let bytes_per_event = uncompressed_byte_size / events.len(); + for event in &events { + arrow_size.add_event(event, bytes_per_event.into()); + } + } + + let result = if is_compressed { + EncodeResult::compressed(payload, uncompressed_byte_size, arrow_size) + } else { + EncodeResult::uncompressed(payload, arrow_size) + }; + + Ok(result) + } +} diff --git a/src/sinks/clickhouse/service.rs b/src/sinks/clickhouse/service.rs index e9974b32a8dd9..53b09270fbdfd 100644 --- a/src/sinks/clickhouse/service.rs +++ b/src/sinks/clickhouse/service.rs @@ -92,10 +92,18 @@ impl HttpServiceRequestBuilder<PartitionKey> for ClickhouseServiceRequestBuilder let auth: Option<Auth> = self.auth.clone(); + // Extract format before taking payload to avoid borrow checker issues + let format = metadata.format; let payload = request.take_payload(); + // Set content type based on format + let content_type = match format { + Format::ArrowStream => "application/vnd.apache.arrow.stream", + _ => "application/x-ndjson", + }; + let mut builder = Request::post(&uri) - .header(CONTENT_TYPE, "application/x-ndjson") + .header(CONTENT_TYPE, content_type) .header(CONTENT_LENGTH, payload.len()); if let Some(ce) = self.compression.content_encoding() { builder = builder.header(CONTENT_ENCODING, ce); @@ -200,8 +208,8 @@ fn set_uri_query( #[cfg(test)] mod tests { + use super::super::config::AsyncInsertSettingsConfig; use super::*; - use crate::sinks::clickhouse::config::*; #[test] fn encode_valid() { diff --git a/src/sinks/util/encoding/arrow/encoder.rs b/src/sinks/util/encoding/arrow/encoder.rs new file mode 100644 index 0000000000000..d47710964536b --- /dev/null +++ b/src/sinks/util/encoding/arrow/encoder.rs @@ -0,0 +1,1455 @@ +use std::sync::Arc; + +use arrow::{ + array::{ + ArrayRef, BinaryBuilder, BooleanBuilder, Decimal128Builder, Decimal256Builder, + Float64Builder, Int64Builder, StringBuilder, TimestampMicrosecondBuilder, + TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder, + UInt8Builder, UInt16Builder, UInt32Builder, UInt64Builder, + }, + datatypes::{DataType, Schema, TimeUnit, i256}, + ipc::writer::StreamWriter, + record_batch::RecordBatch, +}; +use bytes::{BufMut, Bytes, BytesMut}; +use chrono::{DateTime, Utc}; +use
rust_decimal::Decimal; +use snafu::Snafu; + +use crate::event::{Event, Value}; + +#[derive(Debug, Snafu)] +pub enum ArrowEncodingError { + #[snafu(display("Failed to create Arrow record batch: {}", source))] + RecordBatchCreation { source: arrow::error::ArrowError }, + + #[snafu(display("Failed to write Arrow IPC data: {}", source))] + IpcWrite { source: arrow::error::ArrowError }, + + #[snafu(display("No events provided for encoding"))] + NoEvents, + + #[snafu(display("Schema must be provided before runtime"))] + NoSchemaProvided, + + #[snafu(display( + "Unsupported Arrow data type for field '{}': {:?}", + field_name, + data_type + ))] + UnsupportedType { + field_name: String, + data_type: DataType, + }, +} + +/// Encodes a batch of events into Arrow IPC format +pub fn encode_events_to_arrow_stream( + events: &[Event], + schema: Option<Arc<Schema>>, +) -> Result<Bytes, ArrowEncodingError> { + if events.is_empty() { + return Err(ArrowEncodingError::NoEvents); + } + + let schema_ref = if let Some(provided_schema) = schema { + provided_schema + } else { + return Err(ArrowEncodingError::NoSchemaProvided); + }; + + let record_batch = build_record_batch(Arc::<Schema>::clone(&schema_ref), events)?; + + debug!( + "Built RecordBatch with {} rows and {} columns", + record_batch.num_rows(), + record_batch.num_columns() + ); + + // Encode to Arrow IPC format + let mut buffer = BytesMut::new().writer(); + { + let mut writer = StreamWriter::try_new(&mut buffer, &schema_ref) + .map_err(|source| ArrowEncodingError::IpcWrite { source })?; + + writer + .write(&record_batch) + .map_err(|source| ArrowEncodingError::IpcWrite { source })?; + + writer + .finish() + .map_err(|source| ArrowEncodingError::IpcWrite { source })?; + } + + let encoded_bytes = buffer.into_inner().freeze(); + debug!( + "Encoded to {} bytes of Arrow IPC stream data", + encoded_bytes.len() + ); + + Ok(encoded_bytes) +} + +/// Builds an Arrow RecordBatch from events +fn build_record_batch( + schema: Arc<Schema>, + events: &[Event], +) -> Result<RecordBatch, ArrowEncodingError> { + let num_events = events.len(); + let num_fields = schema.fields().len(); + let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_fields); + + for field in schema.fields() { + let field_name = field.name(); + let array: ArrayRef = match field.data_type() { + DataType::Timestamp(time_unit, _) => { + build_timestamp_array(events, field_name, *time_unit, num_events)? + } + DataType::Utf8 => build_string_array(events, field_name, num_events)?, + DataType::Int64 => build_int64_array(events, field_name, num_events)?, + DataType::UInt8 => build_uint8_array(events, field_name, num_events)?, + DataType::UInt16 => build_uint16_array(events, field_name, num_events)?, + DataType::UInt32 => build_uint32_array(events, field_name, num_events)?, + DataType::UInt64 => build_uint64_array(events, field_name, num_events)?, + DataType::Float64 => build_float64_array(events, field_name, num_events)?, + DataType::Boolean => build_boolean_array(events, field_name, num_events)?, + DataType::Binary => build_binary_array(events, field_name, num_events)?, + DataType::Decimal128(precision, scale) => { + build_decimal128_array(events, field_name, *precision, *scale, num_events)? + } + DataType::Decimal256(precision, scale) => { + build_decimal256_array(events, field_name, *precision, *scale, num_events)?
+ } + other_type => { + return Err(ArrowEncodingError::UnsupportedType { + field_name: field_name.to_string(), + data_type: other_type.clone(), + }); + } + }; + + columns.push(array); + } + + RecordBatch::try_new(schema, columns) + .map_err(|source| ArrowEncodingError::RecordBatchCreation { source }) +} + +fn extract_timestamp(value: &Value) -> Option<DateTime<Utc>> { + match value { + Value::Timestamp(ts) => Some(*ts), + Value::Bytes(bytes) => std::str::from_utf8(bytes) + .ok() + .and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&Utc)), + _ => None, + } +} + +fn build_timestamp_array( + events: &[Event], + field_name: &str, + time_unit: TimeUnit, + capacity: usize, +) -> Result<ArrayRef, ArrowEncodingError> { + macro_rules! build_array { + ($builder:ty, $converter:expr) => {{ + let mut builder = <$builder>::with_capacity(capacity); + for event in events { + if let Event::Log(log) = event { + let value_to_append = log.get(field_name).and_then(|value| { + // First, try to extract it as a native or string timestamp + if let Some(ts) = extract_timestamp(value) { + $converter(&ts) + } + // Else, fall back to a raw integer + else if let Value::Integer(i) = value { + Some(*i) + } + // Else, it's an unsupported type (e.g., Bool, Float) + else { + None + } + }); + builder.append_option(value_to_append); + } + } + Ok(Arc::new(builder.finish())) + }}; + } + + match time_unit { + TimeUnit::Second => { + build_array!(TimestampSecondBuilder, |ts: &DateTime<Utc>| Some( + ts.timestamp() + )) + } + TimeUnit::Millisecond => { + build_array!(TimestampMillisecondBuilder, |ts: &DateTime<Utc>| Some( + ts.timestamp_millis() + )) + } + TimeUnit::Microsecond => { + build_array!(TimestampMicrosecondBuilder, |ts: &DateTime<Utc>| Some( + ts.timestamp_micros() + )) + } + TimeUnit::Nanosecond => { + build_array!(TimestampNanosecondBuilder, |ts: &DateTime<Utc>| ts + .timestamp_nanos_opt()) + } + } +} + +fn build_string_array( + events: &[Event], + field_name: &str, + capacity: usize, +) -> Result<ArrayRef, ArrowEncodingError> { + let mut builder = StringBuilder::with_capacity(capacity, capacity * 32); + + for event in events { + if let Event::Log(log) = event { + if let Some(value) = log.get(field_name) { + match value { + Value::Bytes(bytes) => { + // Attempt direct UTF-8 conversion first, fallback to lossy + match std::str::from_utf8(bytes) { + Ok(s) => builder.append_value(s), + Err(_) => builder.append_value(&String::from_utf8_lossy(bytes)), + } + } + Value::Object(obj) => match serde_json::to_string(&obj) { + Ok(s) => builder.append_value(s), + Err(_) => builder.append_null(), + }, + Value::Array(arr) => match serde_json::to_string(&arr) { + Ok(s) => builder.append_value(s), + Err(_) => builder.append_null(), + }, + _ => { + builder.append_value(&value.to_string_lossy()); + } + } + } else { + builder.append_null(); + } + } + } + + Ok(Arc::new(builder.finish())) +} + +fn build_int64_array( + events: &[Event], + field_name: &str, + capacity: usize, +) -> Result<ArrayRef, ArrowEncodingError> { + let mut builder = Int64Builder::with_capacity(capacity); + + for event in events { + if let Event::Log(log) = event { + match log.get(field_name) { + Some(Value::Integer(i)) => builder.append_value(*i), + _ => builder.append_null(), + } + } + } + + Ok(Arc::new(builder.finish())) +} + +fn build_uint8_array( + events: &[Event], + field_name: &str, + capacity: usize, +) -> Result<ArrayRef, ArrowEncodingError> { + let mut builder = UInt8Builder::with_capacity(capacity); + + for event in events { + if let Event::Log(log) = event { + match log.get(field_name) { + Some(Value::Integer(i)) if *i >= 0 && *i <= u8::MAX as i64 => {
builder.append_value(*i as u8) + } + _ => builder.append_null(), + } + } + } + + Ok(Arc::new(builder.finish())) +} + +fn build_uint16_array( + events: &[Event], + field_name: &str, + capacity: usize, +) -> Result<ArrayRef, ArrowEncodingError> { + let mut builder = UInt16Builder::with_capacity(capacity); + + for event in events { + if let Event::Log(log) = event { + match log.get(field_name) { + Some(Value::Integer(i)) if *i >= 0 && *i <= u16::MAX as i64 => { + builder.append_value(*i as u16) + } + _ => builder.append_null(), + } + } + } + + Ok(Arc::new(builder.finish())) +} + +fn build_uint32_array( + events: &[Event], + field_name: &str, + capacity: usize, +) -> Result<ArrayRef, ArrowEncodingError> { + let mut builder = UInt32Builder::with_capacity(capacity); + + for event in events { + if let Event::Log(log) = event { + match log.get(field_name) { + Some(Value::Integer(i)) if *i >= 0 && *i <= u32::MAX as i64 => { + builder.append_value(*i as u32) + } + _ => builder.append_null(), + } + } + } + + Ok(Arc::new(builder.finish())) +} + +fn build_uint64_array( + events: &[Event], + field_name: &str, + capacity: usize, +) -> Result<ArrayRef, ArrowEncodingError> { + let mut builder = UInt64Builder::with_capacity(capacity); + + for event in events { + if let Event::Log(log) = event { + match log.get(field_name) { + Some(Value::Integer(i)) if *i >= 0 => builder.append_value(*i as u64), + _ => builder.append_null(), + } + } + } + + Ok(Arc::new(builder.finish())) +} + +fn build_float64_array( + events: &[Event], + field_name: &str, + capacity: usize, +) -> Result<ArrayRef, ArrowEncodingError> { + let mut builder = Float64Builder::with_capacity(capacity); + + for event in events { + if let Event::Log(log) = event { + match log.get(field_name) { + Some(Value::Float(f)) => builder.append_value(f.into_inner()), + Some(Value::Integer(i)) => builder.append_value(*i as f64), + _ => builder.append_null(), + } + } + } + + Ok(Arc::new(builder.finish())) +} + +fn build_boolean_array( + events: &[Event], + field_name: &str, + capacity: usize, +) -> Result<ArrayRef, ArrowEncodingError> { + let mut builder = BooleanBuilder::with_capacity(capacity); + + for event in events { + if let Event::Log(log) = event { + match log.get(field_name) { + Some(Value::Boolean(b)) => builder.append_value(*b), + _ => builder.append_null(), + } + } + } + + Ok(Arc::new(builder.finish())) +} + +fn build_binary_array( + events: &[Event], + field_name: &str, + capacity: usize, +) -> Result<ArrayRef, ArrowEncodingError> { + let mut builder = BinaryBuilder::with_capacity(capacity, capacity * 16); + + for event in events { + if let Event::Log(log) = event { + match log.get(field_name) { + Some(Value::Bytes(bytes)) => builder.append_value(bytes), + _ => builder.append_null(), + } + } + } + + Ok(Arc::new(builder.finish())) +} + +fn build_decimal128_array( + events: &[Event], + field_name: &str, + precision: u8, + scale: i8, + capacity: usize, +) -> Result<ArrayRef, ArrowEncodingError> { + let mut builder = Decimal128Builder::with_capacity(capacity) + .with_precision_and_scale(precision, scale) + .map_err(|_| ArrowEncodingError::UnsupportedType { + field_name: field_name.to_string(), + data_type: DataType::Decimal128(precision, scale), + })?; + + let target_scale = scale.unsigned_abs() as u32; + + for event in events { + if let Event::Log(log) = event { + match log.get(field_name) { + Some(Value::Float(f)) => { + if let Ok(mut decimal) = Decimal::try_from(f.into_inner()) { + decimal.rescale(target_scale); + let mantissa = decimal.mantissa(); + builder.append_value(mantissa); + } else { + builder.append_null(); + } + } + Some(Value::Integer(i)) => { + let mut decimal = Decimal::from(*i); + decimal.rescale(target_scale); + let mantissa =
+fn build_decimal128_array(
+    events: &[Event],
+    field_name: &str,
+    precision: u8,
+    scale: i8,
+    capacity: usize,
+) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = Decimal128Builder::with_capacity(capacity)
+        .with_precision_and_scale(precision, scale)
+        .map_err(|_| ArrowEncodingError::UnsupportedType {
+            field_name: field_name.to_string(),
+            data_type: DataType::Decimal128(precision, scale),
+        })?;
+
+    let target_scale = scale.unsigned_abs() as u32;
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Float(f)) => {
+                    if let Ok(mut decimal) = Decimal::try_from(f.into_inner()) {
+                        decimal.rescale(target_scale);
+                        let mantissa = decimal.mantissa();
+                        builder.append_value(mantissa);
+                    } else {
+                        builder.append_null();
+                    }
+                }
+                Some(Value::Integer(i)) => {
+                    let mut decimal = Decimal::from(*i);
+                    decimal.rescale(target_scale);
+                    let mantissa = decimal.mantissa();
+                    builder.append_value(mantissa);
+                }
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
+
+fn build_decimal256_array(
+    events: &[Event],
+    field_name: &str,
+    precision: u8,
+    scale: i8,
+    capacity: usize,
+) -> Result<ArrayRef, ArrowEncodingError> {
+    let mut builder = Decimal256Builder::with_capacity(capacity)
+        .with_precision_and_scale(precision, scale)
+        .map_err(|_| ArrowEncodingError::UnsupportedType {
+            field_name: field_name.to_string(),
+            data_type: DataType::Decimal256(precision, scale),
+        })?;
+
+    let target_scale = scale.unsigned_abs() as u32;
+
+    for event in events {
+        if let Event::Log(log) = event {
+            match log.get(field_name) {
+                Some(Value::Float(f)) => {
+                    if let Ok(mut decimal) = Decimal::try_from(f.into_inner()) {
+                        decimal.rescale(target_scale);
+                        let mantissa = decimal.mantissa();
+                        // rust_decimal does not support i256 natively, so we upcast here.
+                        builder.append_value(i256::from_i128(mantissa));
+                    } else {
+                        builder.append_null();
+                    }
+                }
+                Some(Value::Integer(i)) => {
+                    let mut decimal = Decimal::from(*i);
+                    decimal.rescale(target_scale);
+                    let mantissa = decimal.mantissa();
+                    builder.append_value(i256::from_i128(mantissa));
+                }
+                _ => builder.append_null(),
+            }
+        }
+    }
+
+    Ok(Arc::new(builder.finish()))
+}
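+
+// The tests below round-trip the encoded bytes through Arrow's IPC
+// `StreamReader`, so they validate the stream format itself, not just the
+// individual array builders.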
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::event::LogEvent;
+    use arrow::{
+        array::{
+            Array, BinaryArray, BooleanArray, Float64Array, Int64Array, StringArray,
+            TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+            TimestampSecondArray,
+        },
+        datatypes::Field,
+        ipc::reader::StreamReader,
+    };
+    use chrono::Utc;
+    use std::io::Cursor;
+
+    #[test]
+    fn test_encode_simple_events() {
+        let mut log1 = LogEvent::default();
+        log1.insert("message", "hello");
+        log1.insert("count", 42);
+
+        let mut log2 = LogEvent::default();
+        log2.insert("message", "world");
+        log2.insert("count", 100);
+
+        let events = vec![Event::Log(log1), Event::Log(log2)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("message", DataType::Utf8, true),
+            Field::new("count", DataType::Int64, true),
+        ]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        assert!(!bytes.is_empty());
+
+        // Validate the Arrow stream.
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 2);
+        assert_eq!(batch.schema().as_ref(), schema.as_ref());
+
+        let message_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(message_array.value(0), "hello");
+        assert_eq!(message_array.value(1), "world");
+
+        let count_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(count_array.value(0), 42);
+        assert_eq!(count_array.value(1), 100);
+    }
+
+    #[test]
+    fn test_encode_all_types() {
+        let mut log = LogEvent::default();
+        log.insert("string_field", "test");
+        log.insert("int_field", 42);
+        log.insert("float_field", 3.14);
+        log.insert("bool_field", true);
+        log.insert("bytes_field", bytes::Bytes::from("binary"));
+        log.insert("timestamp_field", Utc::now());
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("string_field", DataType::Utf8, true),
+            Field::new("int_field", DataType::Int64, true),
+            Field::new("float_field", DataType::Float64, true),
+            Field::new("bool_field", DataType::Boolean, true),
+            Field::new("bytes_field", DataType::Binary, true),
+            Field::new(
+                "timestamp_field",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+        ]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 6);
+
+        // Verify each column has data.
+        assert_eq!(
+            batch
+                .column(0)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .unwrap()
+                .value(0),
+            "test"
+        );
+        assert_eq!(
+            batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap()
+                .value(0),
+            42
+        );
+        assert!(
+            (batch
+                .column(2)
+                .as_any()
+                .downcast_ref::<Float64Array>()
+                .unwrap()
+                .value(0)
+                - 3.14)
+                .abs()
+                < 0.001
+        );
+        assert!(
+            batch
+                .column(3)
+                .as_any()
+                .downcast_ref::<BooleanArray>()
+                .unwrap()
+                .value(0)
+        );
+        assert_eq!(
+            batch
+                .column(4)
+                .as_any()
+                .downcast_ref::<BinaryArray>()
+                .unwrap()
+                .value(0),
+            b"binary"
+        );
+        assert!(
+            !batch
+                .column(5)
+                .as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap()
+                .is_null(0)
+        );
+    }
+
+    #[test]
+    fn test_encode_null_values() {
+        let mut log1 = LogEvent::default();
+        log1.insert("field_a", 1);
+        // field_b is missing.
+
+        let mut log2 = LogEvent::default();
+        log2.insert("field_b", 2);
+        // field_a is missing.
+
+        let events = vec![Event::Log(log1), Event::Log(log2)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("field_a", DataType::Int64, true),
+            Field::new("field_b", DataType::Int64, true),
+        ]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 2);
+
+        let field_a = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(field_a.value(0), 1);
+        assert!(field_a.is_null(1));
+
+        let field_b = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert!(field_b.is_null(0));
+        assert_eq!(field_b.value(1), 2);
+    }
+
+    #[test]
+    fn test_encode_type_mismatches() {
+        let mut log1 = LogEvent::default();
+        log1.insert("field", 42); // Integer
+
+        let mut log2 = LogEvent::default();
+        log2.insert("field", 3.14); // Float - type mismatch!
+
+        let events = vec![Event::Log(log1), Event::Log(log2)];
+
+        // Schema expects Int64.
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "field",
+            DataType::Int64,
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 2);
+
+        let field_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(field_array.value(0), 42);
+        assert!(field_array.is_null(1)); // Type mismatch becomes null.
+    }
+
+    #[test]
+    fn test_encode_complex_json_values() {
+        use serde_json::json;
+
+        let mut log = LogEvent::default();
+        log.insert(
+            "object_field",
+            json!({"key": "value", "nested": {"count": 42}}),
+        );
+        log.insert("array_field", json!([1, 2, 3]));
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("object_field", DataType::Utf8, true),
+            Field::new("array_field", DataType::Utf8, true),
+        ]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+
+        let object_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let object_str = object_array.value(0);
+        assert!(object_str.contains("key"));
+        assert!(object_str.contains("value"));
+
+        let array_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        let array_str = array_array.value(0);
+        assert_eq!(array_str, "[1,2,3]");
+    }
+
+    #[test]
+    fn test_encode_unsupported_type() {
+        let mut log = LogEvent::default();
+        log.insert("field", "value");
+
+        let events = vec![Event::Log(log)];
+
+        // Use an unsupported type.
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "field",
+            DataType::Duration(TimeUnit::Millisecond),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(schema));
+        assert!(result.is_err());
+        assert!(matches!(
+            result.unwrap_err(),
+            ArrowEncodingError::UnsupportedType { .. }
+        ));
+    }
+
+    #[test]
+    fn test_encode_without_schema_fails() {
+        let mut log1 = LogEvent::default();
+        log1.insert("message", "hello");
+
+        let events = vec![Event::Log(log1)];
+
+        let result = encode_events_to_arrow_stream(&events, None);
+        assert!(result.is_err());
+        assert!(matches!(
+            result.unwrap_err(),
+            ArrowEncodingError::NoSchemaProvided
+        ));
+    }
+
+    #[test]
+    fn test_encode_empty_events() {
+        let events: Vec<Event> = vec![];
+        let result = encode_events_to_arrow_stream(&events, None);
+        assert!(result.is_err());
+        assert!(matches!(result.unwrap_err(), ArrowEncodingError::NoEvents));
+    }
+
+    #[test]
+    fn test_encode_timestamp_precisions() {
+        let now = Utc::now();
+        let mut log = LogEvent::default();
+        log.insert("ts_second", now);
+        log.insert("ts_milli", now);
+        log.insert("ts_micro", now);
+        log.insert("ts_nano", now);
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new(
+                "ts_second",
+                DataType::Timestamp(TimeUnit::Second, None),
+                true,
+            ),
+            Field::new(
+                "ts_milli",
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+            Field::new(
+                "ts_micro",
+                DataType::Timestamp(TimeUnit::Microsecond, None),
+                true,
+            ),
+            Field::new(
+                "ts_nano",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                true,
+            ),
+        ]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 4);
+
+        let ts_second = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampSecondArray>()
+            .unwrap();
+        assert!(!ts_second.is_null(0));
+        assert_eq!(ts_second.value(0), now.timestamp());
+
+        let ts_milli = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<TimestampMillisecondArray>()
+            .unwrap();
+        assert!(!ts_milli.is_null(0));
+        assert_eq!(ts_milli.value(0), now.timestamp_millis());
+
+        let ts_micro = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<TimestampMicrosecondArray>()
+            .unwrap();
+        assert!(!ts_micro.is_null(0));
+        assert_eq!(ts_micro.value(0), now.timestamp_micros());
+
+        let ts_nano = batch
+            .column(3)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+        assert!(!ts_nano.is_null(0));
+        assert_eq!(ts_nano.value(0), now.timestamp_nanos_opt().unwrap());
+    }
+
+    #[test]
+    fn test_encode_integer_as_timestamp() {
+        // Test that integer timestamps are used directly in the target unit.
+        let mut log = LogEvent::default();
+
+        log.insert("ts_nano", 1760971112896200940_i64);
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ts_nano",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 1);
+
+        let ts_nano = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+
+        // The integer should be used directly as nanoseconds.
+        assert!(!ts_nano.is_null(0));
+        assert_eq!(ts_nano.value(0), 1760971112896200940_i64);
+    }
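+
+    // Note that integer values are not converted: they are taken verbatim in
+    // the column's configured time unit, while native timestamps go through
+    // chrono. The next two tests mix both representations in one column.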
+    #[test]
+    fn test_encode_mixed_timestamp_types() {
+        // Test mixing Timestamp and Integer values in the same field.
+        let mut log1 = LogEvent::default();
+        log1.insert("ts", Utc::now());
+
+        let mut log2 = LogEvent::default();
+        log2.insert("ts", 1760971112896200940_i64);
+
+        let events = vec![Event::Log(log1), Event::Log(log2)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 1);
+
+        let ts_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+
+        // Both rows should have non-null values.
+        assert!(!ts_array.is_null(0));
+        assert!(!ts_array.is_null(1));
+
+        // The second row should carry the integer value unchanged.
+        assert_eq!(ts_array.value(1), 1760971112896200940_i64);
+    }
+
+    #[test]
+    fn test_encode_string_timestamps() {
+        // Test that RFC 3339/ISO 8601 string timestamps are automatically parsed.
+        let mut log1 = LogEvent::default();
+        log1.insert("timestamp", "2025-10-22T10:18:44.256Z");
+
+        let mut log2 = LogEvent::default();
+        log2.insert("timestamp", "2025-10-22T15:30:00.123456789Z");
+
+        let mut log3 = LogEvent::default();
+        log3.insert("timestamp", "2025-01-15T00:00:00Z");
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "timestamp",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(batch.num_columns(), 1);
+
+        let ts_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+
+        // All timestamps should be properly parsed (not null).
+        assert!(!ts_array.is_null(0));
+        assert!(!ts_array.is_null(1));
+        assert!(!ts_array.is_null(2));
+
+        // Verify the parsed values are correct.
+        let expected1 = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
+            .unwrap()
+            .timestamp_nanos_opt()
+            .unwrap();
+        assert_eq!(ts_array.value(0), expected1);
+
+        let expected2 = chrono::DateTime::parse_from_rfc3339("2025-10-22T15:30:00.123456789Z")
+            .unwrap()
+            .timestamp_nanos_opt()
+            .unwrap();
+        assert_eq!(ts_array.value(1), expected2);
+
+        let expected3 = chrono::DateTime::parse_from_rfc3339("2025-01-15T00:00:00Z")
+            .unwrap()
+            .timestamp_nanos_opt()
+            .unwrap();
+        assert_eq!(ts_array.value(2), expected3);
+    }
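+
+    // String values reach timestamp columns through `extract_timestamp`'s
+    // RFC 3339 parsing fallback, which is what lets a single column accept
+    // strings, native timestamps, and raw integers side by side.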
+    #[test]
+    fn test_encode_mixed_timestamp_string_and_native() {
+        // Test mixing string timestamps with native Timestamp values.
+        let mut log1 = LogEvent::default();
+        log1.insert("ts", "2025-10-22T10:18:44.256Z"); // String
+
+        let mut log2 = LogEvent::default();
+        log2.insert("ts", Utc::now()); // Native Timestamp
+
+        let mut log3 = LogEvent::default();
+        log3.insert("ts", 1729594724256000000_i64); // Integer (nanoseconds)
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        let ts_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+
+        // All three should be non-null.
+        assert!(!ts_array.is_null(0));
+        assert!(!ts_array.is_null(1));
+        assert!(!ts_array.is_null(2));
+
+        // The first one should match the parsed string.
+        let expected = chrono::DateTime::parse_from_rfc3339("2025-10-22T10:18:44.256Z")
+            .unwrap()
+            .timestamp_nanos_opt()
+            .unwrap();
+        assert_eq!(ts_array.value(0), expected);
+
+        // The third one should match the integer.
+        assert_eq!(ts_array.value(2), 1729594724256000000_i64);
+    }
+
+    #[test]
+    fn test_encode_invalid_string_timestamp() {
+        // Test that invalid timestamp strings become null.
+        let mut log1 = LogEvent::default();
+        log1.insert("timestamp", "not-a-timestamp");
+
+        let mut log2 = LogEvent::default();
+        log2.insert("timestamp", "2025-10-22T10:18:44.256Z"); // Valid
+
+        let mut log3 = LogEvent::default();
+        log3.insert("timestamp", "2025-99-99T99:99:99Z"); // Invalid
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "timestamp",
+            DataType::Timestamp(TimeUnit::Nanosecond, None),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        let ts_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<TimestampNanosecondArray>()
+            .unwrap();
+
+        // Invalid timestamps should be null.
+        assert!(ts_array.is_null(0));
+        assert!(!ts_array.is_null(1)); // Valid one
+        assert!(ts_array.is_null(2));
+    }
+
+    #[test]
+    fn test_encode_decimal128_from_float() {
+        use arrow::array::Decimal128Array;
+
+        let mut log = LogEvent::default();
+        // Store the price as a float: 123.45.
+        log.insert("price", 123.45_f64);
+
+        let events = vec![Event::Log(log)];
+
+        // Decimal(10, 2) - 10 total digits, 2 after the decimal point.
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "price",
+            DataType::Decimal128(10, 2),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 1);
+
+        let decimal_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+
+        assert!(!decimal_array.is_null(0));
+        // 123.45 with scale 2 = 12345.
+        assert_eq!(decimal_array.value(0), 12345_i128);
+    }
+
+    #[test]
+    fn test_encode_decimal128_from_integer() {
+        use arrow::array::Decimal128Array;
+
+        let mut log = LogEvent::default();
+        // Store the quantity as an integer: 1000.
+        log.insert("quantity", 1000_i64);
+
+        let events = vec![Event::Log(log)];
+
+        // Decimal(10, 3) - will represent 1000 as 1000.000.
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "quantity",
+            DataType::Decimal128(10, 3),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+
+        let decimal_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+
+        assert!(!decimal_array.is_null(0));
+        // 1000 with scale 3 = 1000 * 10^3 = 1000000.
+        assert_eq!(decimal_array.value(0), 1000000_i128);
+    }
+
+    #[test]
+    fn test_encode_decimal256() {
+        use arrow::array::Decimal256Array;
+
+        let mut log = LogEvent::default();
+        // A value that needs high precision.
+        log.insert("big_value", 123456789.123456_f64);
+
+        let events = vec![Event::Log(log)];
+
+        // Decimal256(50, 6) - a high-precision decimal.
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "big_value",
+            DataType::Decimal256(50, 6),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+
+        let decimal_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal256Array>()
+            .unwrap();
+
+        assert!(!decimal_array.is_null(0));
+        // The value should be non-null and encoded.
+        let value = decimal_array.value(0);
+        assert!(value.to_i128().is_some());
+    }
+
+    #[test]
+    fn test_encode_decimal_null_values() {
+        use arrow::array::Decimal128Array;
+
+        let mut log1 = LogEvent::default();
+        log1.insert("price", 99.99_f64);
+
+        let log2 = LogEvent::default();
+        // No price field - should be null.
+
+        let mut log3 = LogEvent::default();
+        log3.insert("price", 50.00_f64);
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "price",
+            DataType::Decimal128(10, 2),
+            true,
+        )]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        let decimal_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+
+        // First row: 99.99.
+        assert!(!decimal_array.is_null(0));
+        assert_eq!(decimal_array.value(0), 9999_i128);
+
+        // Second row: null.
+        assert!(decimal_array.is_null(1));
+
+        // Third row: 50.00.
+        assert!(!decimal_array.is_null(2));
+        assert_eq!(decimal_array.value(2), 5000_i128);
+    }
+
+    #[test]
+    fn test_encode_mixed_types_with_decimal() {
+        use arrow::array::Decimal128Array;
+
+        let mut log = LogEvent::default();
+        log.insert("id", 123_i64);
+        log.insert("name", "Product A");
+        log.insert("price", 19.99_f64);
+        log.insert("in_stock", true);
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, true),
+            Field::new("name", DataType::Utf8, true),
+            Field::new("price", DataType::Decimal128(10, 2), true),
+            Field::new("in_stock", DataType::Boolean, true),
+        ]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 4);
+
+        // Verify the decimal column.
+        let decimal_array = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<Decimal128Array>()
+            .unwrap();
+
+        assert!(!decimal_array.is_null(0));
+        assert_eq!(decimal_array.value(0), 1999_i128);
+    }
+
+    #[test]
+    fn test_encode_unsigned_integer_types() {
+        use arrow::array::{UInt8Array, UInt16Array, UInt32Array, UInt64Array};
+
+        let mut log = LogEvent::default();
+        log.insert("uint8_field", 255_i64);
+        log.insert("uint16_field", 65535_i64);
+        log.insert("uint32_field", 4294967295_i64);
+        log.insert("uint64_field", 9223372036854775807_i64);
+
+        let events = vec![Event::Log(log)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("uint8_field", DataType::UInt8, true),
+            Field::new("uint16_field", DataType::UInt16, true),
+            Field::new("uint32_field", DataType::UInt32, true),
+            Field::new("uint64_field", DataType::UInt64, true),
+        ]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 1);
+        assert_eq!(batch.num_columns(), 4);
+
+        // Verify uint8.
+        let uint8_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt8Array>()
+            .unwrap();
+        assert_eq!(uint8_array.value(0), 255_u8);
+
+        // Verify uint16.
+        let uint16_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<UInt16Array>()
+            .unwrap();
+        assert_eq!(uint16_array.value(0), 65535_u16);
+
+        // Verify uint32.
+        let uint32_array = batch
+            .column(2)
+            .as_any()
+            .downcast_ref::<UInt32Array>()
+            .unwrap();
+        assert_eq!(uint32_array.value(0), 4294967295_u32);
+
+        // Verify uint64.
+        let uint64_array = batch
+            .column(3)
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .unwrap();
+        assert_eq!(uint64_array.value(0), 9223372036854775807_u64);
+    }
+
+    #[test]
+    fn test_encode_unsigned_integers_with_null_and_overflow() {
+        use arrow::array::{UInt8Array, UInt32Array};
+
+        let mut log1 = LogEvent::default();
+        log1.insert("uint8_field", 100_i64);
+        log1.insert("uint32_field", 1000_i64);
+
+        let mut log2 = LogEvent::default();
+        log2.insert("uint8_field", 300_i64); // Overflow - should be null.
+        log2.insert("uint32_field", -1_i64); // Negative - should be null.
+
+        let log3 = LogEvent::default();
+        // Missing fields - should be null.
+
+        let events = vec![Event::Log(log1), Event::Log(log2), Event::Log(log3)];
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("uint8_field", DataType::UInt8, true),
+            Field::new("uint32_field", DataType::UInt32, true),
+        ]));
+
+        let result = encode_events_to_arrow_stream(&events, Some(Arc::clone(&schema)));
+        assert!(result.is_ok());
+
+        let bytes = result.unwrap();
+        let cursor = Cursor::new(bytes);
+        let mut reader = StreamReader::try_new(cursor, None).unwrap();
+        let batch = reader.next().unwrap().unwrap();
+
+        assert_eq!(batch.num_rows(), 3);
+
+        // Check the uint8 column.
+        let uint8_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<UInt8Array>()
+            .unwrap();
+        assert_eq!(uint8_array.value(0), 100_u8); // Valid
+        assert!(uint8_array.is_null(1)); // Overflow
+        assert!(uint8_array.is_null(2)); // Missing
+
+        // Check the uint32 column.
+        let uint32_array = batch
+            .column(1)
+            .as_any()
+            .downcast_ref::<UInt32Array>()
+            .unwrap();
+        assert_eq!(uint32_array.value(0), 1000_u32); // Valid
+        assert!(uint32_array.is_null(1)); // Negative
+        assert!(uint32_array.is_null(2)); // Missing
+    }
+}
diff --git a/src/sinks/util/encoding/arrow/mod.rs b/src/sinks/util/encoding/arrow/mod.rs
new file mode 100644
index 0000000000000..2fe568cf27840
--- /dev/null
+++ b/src/sinks/util/encoding/arrow/mod.rs
@@ -0,0 +1,3 @@
+mod encoder;
+
+pub use encoder::{ArrowEncodingError, encode_events_to_arrow_stream};
diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding/mod.rs
similarity index 99%
rename from src/sinks/util/encoding.rs
rename to src/sinks/util/encoding/mod.rs
index bb5a938ec017f..6b07117fc929e 100644
--- a/src/sinks/util/encoding.rs
+++ b/src/sinks/util/encoding/mod.rs
@@ -10,6 +10,9 @@ use vector_lib::{
 
 use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError};
 
+#[cfg(feature = "sinks-clickhouse")]
+pub mod arrow;
+
 pub trait Encoder {
     /// Encodes the input into the provided writer.
     ///
diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs
index 5d766a9b30780..dad9826fdda20 100644
--- a/src/sinks/util/mod.rs
+++ b/src/sinks/util/mod.rs
@@ -45,6 +45,8 @@ pub use buffer::{
 pub use builder::SinkBuilderExt;
 use chrono::{FixedOffset, Offset, Utc};
 pub use compressor::Compressor;
+#[cfg(feature = "sinks-clickhouse")]
+pub use encoding::arrow;
 pub use normalizer::Normalizer;
 pub use request_builder::{IncrementalRequestBuilder, RequestBuilder};
 pub use service::{
diff --git a/website/cue/reference/components/sinks/generated/clickhouse.cue b/website/cue/reference/components/sinks/generated/clickhouse.cue
index ea6373bd3191f..64703f0f91bfd 100644
--- a/website/cue/reference/components/sinks/generated/clickhouse.cue
+++ b/website/cue/reference/components/sinks/generated/clickhouse.cue
@@ -326,6 +326,7 @@ generated: components: sinks: clickhouse: configuration: {
 				type: string: {
 					default: "json_each_row"
 					enum: {
+						arrow_stream:   "ArrowStream."
 						json_as_object: "JSONAsObject."
 						json_as_string: "JSONAsString."
 						json_each_row:  "JSONEachRow."