From e1041fd2c3f02ad6896b57c9a21095ac1d66078a Mon Sep 17 00:00:00 2001
From: Yaroslav Litvinov
Date: Mon, 10 Feb 2025 01:38:31 +0200
Subject: [PATCH 1/9] arrow-ipc, Int64 instead of UInt64

---
 Cargo.toml                                 |   2 +-
 crates/control_plane/Cargo.toml            |  18 +-
 crates/control_plane/src/service.rs        |  69 -------
 crates/nexus/Cargo.toml                    |   1 +
 crates/nexus/src/http/dbt/error.rs         |  19 +-
 crates/nexus/src/http/dbt/handlers.rs      | 198 +++++++++++++++++++--
 crates/nexus/src/http/dbt/schemas.rs       |   2 +-
 crates/runtime/Cargo.toml                  |  14 +-
 crates/runtime/src/datafusion/execution.rs |   7 +-
 9 files changed, 226 insertions(+), 104 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 3c13c1a24..46ee352a8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,7 +37,7 @@ snafu = { version = "0.8.5", features = ["futures"] }
 tracing = { version = "0.1" }
 
 [patch.crates-io]
-datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
+datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" }
 
 [workspace.lints.clippy]
 all = { level = "deny", priority = -1 }
diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml
index fea567b63..114f5139b 100644
--- a/crates/control_plane/Cargo.toml
+++ b/crates/control_plane/Cargo.toml
@@ -13,17 +13,21 @@ bytes = { version = "1.8.0" }
 chrono = { workspace = true }
-datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "a5bbbd1266447c26f3bfdb5314b4a326796bc784" }
-datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
+datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" }
+datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" }
+datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" }
+datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" }
+
+datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" }
 flatbuffers = { version = "24.3.25" }
 futures = { workspace = true }
-iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
-iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
+
+iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" }
+iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" }
 icelake = { git = "https://github.com/Embucket/icelake.git", rev = "b4cbcaf" }
+
+datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "1cbcd2bffa2c5b56add996859d02053fd4bc21fc" }
+
 object_store = { workspace = true }
 once_cell = "1.19.0"
 quick-xml = { version = "0.36.2" }
diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs
index ba4f91bc4..d86dbd079 100644
--- a/crates/control_plane/src/service.rs
+++ b/crates/control_plane/src/service.rs
@@ -68,12 +68,6 @@ pub trait ControlService: Send + Sync {
     async fn query_table(&self, session_id: &str, query: &str) -> ControlPlaneResult<String>;
 
-    async fn query_dbt(
-        &self,
-        session_id: &str,
-        query: &str,
-    ) -> ControlPlaneResult<(String, Vec<ColumnInfo>)>;
-
     async fn upload_data_to_table(
         &self,
         session_id: &str,
@@ -355,69 +349,6 @@ impl ControlService for ControlServiceImpl {
         Ok(String::from_utf8(buf).context(crate::error::Utf8Snafu)?)
     }
 
-    #[tracing::instrument(level = "debug", skip(self))]
-    async fn query_dbt(
-        &self,
-        session_id: &str,
-        query: &str,
-    ) -> ControlPlaneResult<(String, Vec<ColumnInfo>)> {
-        let (records, columns) = self.query(session_id, query).await?;
-
-        // THIS CODE RELATED TO ARROW FORMAT
-        //////////////////////////////////////
-
-        // fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
-        //     let mut buf = Vec::new();
-        //     let mut writer = StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
-        //     writer.write(rb).unwrap();
-        //     writer.finish().unwrap();
-        //     drop(writer);
-        //
-        //     let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
-        //     reader.next().unwrap().unwrap()
-        // }
-        //
-        // println!("agahaha {:?}", roundtrip_ipc_stream(&records[0]));
-
-        // let mut buffer = Vec::new();
-        // let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
-        // let mut stream_writer =
-        //     StreamWriter::try_new_with_options(&mut buffer, &records[0].schema_ref(), options)
-        //         .unwrap();
-        // stream_writer.write(&records[0]).unwrap();
-        // stream_writer.finish().unwrap();
-        // drop(stream_writer);
-
-        // // Try to add flatbuffer verification
-        // println!("{:?}", buffer.len());
-        // let base64 = general_purpose::STANDARD.encode(buffer);
-        // Ok((base64, columns))
-        // let encoded = general_purpose::STANDARD.decode(res.clone()).unwrap();
-        //
-        // let mut verifier = Verifier::new(&VerifierOptions::default(), &encoded);
-        // let mut builder = FlatBufferBuilder::new();
-        // let res = general_purpose::STANDARD.encode(buf);
-        //////////////////////////////////////
-
-        // We use json format since there is a bug between arrow and nanoarrow
-        let buf = Vec::new();
-        let write_builder = WriterBuilder::new().with_explicit_nulls(true);
-        let mut writer = write_builder.build::<_, JsonArray>(buf);
-
-        let record_refs: Vec<&RecordBatch> = records.iter().collect();
-        writer
-            .write_batches(&record_refs)
-            .context(crate::error::ArrowSnafu)?;
-        writer.finish().context(crate::error::ArrowSnafu)?;
-
-        // Get the underlying buffer back,
-        let buf = writer.into_inner();
-        Ok((
-            String::from_utf8(buf).context(crate::error::Utf8Snafu)?,
-            columns,
-        ))
-    }
-
     #[tracing::instrument(level = "debug", skip(self))]
     async fn upload_data_to_table(
         &self,
diff --git a/crates/nexus/Cargo.toml b/crates/nexus/Cargo.toml
index a5390a46a..176549fea 100644
--- a/crates/nexus/Cargo.toml
+++ b/crates/nexus/Cargo.toml
@@ -54,6 +54,7 @@ utoipa-axum = { workspace = true }
 utoipa-swagger-ui = { workspace = true }
 uuid = { workspace = true, features = ["v4", "v5"] }
 validator = { version = "0.18.1", features = ["derive"] }
+base64 = { version = "0.22.1" }
 
 [dev-dependencies]
 async-trait = { workspace = true }
diff --git a/crates/nexus/src/http/dbt/error.rs b/crates/nexus/src/http/dbt/error.rs
index 9e4b881ab..222433378 100644
--- a/crates/nexus/src/http/dbt/error.rs
+++ b/crates/nexus/src/http/dbt/error.rs
@@ -37,6 +37,12 @@ pub enum DbtError {
 
     #[snafu(display("Failed to parse row JSON"))]
     RowParse { source: serde_json::Error },
+
+    #[snafu(display("UTF8 error: {source}"))]
+    Utf8 { source: std::string::FromUtf8Error },
+
+    #[snafu(display("Arrow error: {source}"))]
+    Arrow { source: arrow::error::ArrowError },
 }
 
 pub type DbtResult<T> = std::result::Result<T, DbtError>;
@@ -48,9 +54,10 @@ impl IntoResponse for DbtError {
             | Self::LoginRequestParse { .. }
             | Self::QueryBodyParse { .. }
             | Self::InvalidWarehouseIdFormat { .. } => http::StatusCode::BAD_REQUEST,
-            Self::ControlService { .. } | Self::RowParse { .. } => {
-                http::StatusCode::INTERNAL_SERVER_ERROR
-            }
+            Self::ControlService { .. }
+            | Self::RowParse { .. }
+            | Self::Utf8 { .. }
+            | Self::Arrow { .. } => http::StatusCode::INTERNAL_SERVER_ERROR,
             Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => {
                 http::StatusCode::UNAUTHORIZED
             }
@@ -69,6 +76,12 @@ impl IntoResponse for DbtError {
             Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => {
                 "session error".to_string()
             }
+            Self::Utf8 { source } => {
+                format!("Error encoding UTF8 string: {source}")
+            }
+            Self::Arrow { source } => {
+                format!("Error encoding in Arrow format: {source}")
+            }
             Self::NotImplemented => "feature not implemented".to_string(),
         };
diff --git a/crates/nexus/src/http/dbt/handlers.rs b/crates/nexus/src/http/dbt/handlers.rs
index 634ef6966..c14911c3e 100644
--- a/crates/nexus/src/http/dbt/handlers.rs
+++ b/crates/nexus/src/http/dbt/handlers.rs
@@ -5,16 +5,28 @@ use crate::http::dbt::schemas::{
 };
 use crate::http::session::DFSessionId;
 use crate::state::AppState;
+use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
+use arrow::ipc::MetadataVersion;
+use arrow::json::writer::JsonArray;
+use arrow::json::WriterBuilder;
+use arrow::record_batch::RecordBatch;
 use axum::body::Bytes;
 use axum::extract::{Query, State};
 use axum::http::HeaderMap;
 use axum::Json;
+use base64;
+use base64::engine::general_purpose::STANDARD as engine_base64;
+use base64::prelude::*;
 use flate2::read::GzDecoder;
 use regex::Regex;
 use snafu::ResultExt;
 use std::io::Read;
+use tracing::debug;
 use uuid::Uuid;
 
+const SERIALIZATION_FORMAT: &str = "arrow"; // or "json"
+const ARROW_IPC_ALIGNMENT: usize = 8;
+
 #[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
 pub async fn login(
     State(state): State<AppState>,
@@ -32,8 +44,6 @@ pub async fn login(
     let _body_json: LoginRequestBody =
         serde_json::from_str(&s).context(dbt_error::LoginRequestParseSnafu)?;
 
-    //println!("Received login request: {:?}", query);
-    //println!("Body data parameters: {:?}", body_json);
     let token = Uuid::new_v4().to_string();
 
     let warehouses = state
@@ -56,6 +66,37 @@ pub async fn login(
     }))
 }
 
+fn records_to_arrow_string(recs: &Vec<RecordBatch>) -> Result<String, DbtError> {
+    let mut buf = Vec::new();
+    let options = IpcWriteOptions::try_new(ARROW_IPC_ALIGNMENT, false, MetadataVersion::V5)
+        .context(dbt_error::ArrowSnafu)?;
+    if !recs.is_empty() {
+        let mut writer =
+            StreamWriter::try_new_with_options(&mut buf, recs[0].schema_ref(), options)
+                .context(dbt_error::ArrowSnafu)?;
+        for rec in recs {
+            writer.write(rec).context(dbt_error::ArrowSnafu)?;
+        }
+        writer.finish().context(dbt_error::ArrowSnafu)?;
+        drop(writer);
+    };
+    Ok(engine_base64.encode(buf))
+}
+
+fn records_to_json_string(recs: &[RecordBatch]) -> Result<String, DbtError> {
+    let buf = Vec::new();
+    let write_builder = WriterBuilder::new().with_explicit_nulls(true);
+    let mut writer = write_builder.build::<_, JsonArray>(buf);
+    let record_refs: Vec<&RecordBatch> = recs.iter().collect();
+    writer
+        .write_batches(&record_refs)
+        .context(dbt_error::ArrowSnafu)?;
+    writer.finish().context(dbt_error::ArrowSnafu)?;
+
+    // Get the underlying buffer back.
+    String::from_utf8(writer.into_inner()).context(dbt_error::Utf8Snafu)
+}
+
 #[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
 pub async fn query(
     DFSessionId(session_id): DFSessionId,
@@ -86,30 +127,44 @@ pub async fn query(
 
     // let _ = log_query(&body_json.sql_text).await;
 
-    let (result, columns) = state
+    let (records, columns) = state
         .control_svc
-        .query_dbt(&session_id, &body_json.sql_text)
+        .query(&session_id, &body_json.sql_text)
+        .await
         .context(dbt_error::ControlServiceSnafu)?;
 
-    // query_result_format now is JSON since arrow IPC has flatbuffers bug
-    // https://github.com/apache/arrow-rs/pull/6426
-    Ok(Json(JsonResponse {
+    debug!("serialized json: {}", records_to_json_string(&records)?.as_str());
+
+    let json_resp = Json(JsonResponse {
         data: Option::from(ResponseData {
             row_type: columns.into_iter().map(Into::into).collect(),
-            // row_set_base_64: Option::from(result.clone()),
-            row_set_base_64: None,
-            #[allow(clippy::unwrap_used)]
-            row_set: ResponseData::rows_to_vec(result.as_str())?,
+            query_result_format: Option::from(String::from(SERIALIZATION_FORMAT)),
+            row_set: if SERIALIZATION_FORMAT == "json" {
+                Option::from(ResponseData::rows_to_vec(
+                    records_to_json_string(&records)?.as_str(),
+                )?)
+            } else {
+                None
+            },
+            row_set_base_64: if SERIALIZATION_FORMAT == "arrow" {
+                Option::from(records_to_arrow_string(&records)?)
+            } else {
+                None
+            },
             total: Some(1),
-            query_result_format: Option::from("json".to_string()),
             error_code: None,
             sql_state: Option::from("ok".to_string()),
         }),
         success: true,
         message: Option::from("successfully executed".to_string()),
         code: Some(format!("{:06}", 200)),
-    }))
+    });
+    debug!(
+        "query {:?}, response: {:?}, records: {:?}",
+        body_json.sql_text, json_resp, records
+    );
+    Ok(json_resp)
 }
 
 pub async fn abort() -> DbtResult<Json<serde_json::Value>> {
@@ -139,3 +194,120 @@ pub fn extract_token(headers: &HeaderMap) -> Option<String> {
     file.write_all(b"\n").await?;
     Ok(())
 }*/
+
+// mod tests {
+//     use std::io::Cursor;
+//     use arrow::record_batch::RecordBatch;
+//     use arrow::ipc::writer::{ FileWriter, StreamWriter, IpcWriteOptions };
+//     use arrow::ipc::reader::{ FileReader, StreamReader };
+//     use arrow::array::{ArrayRef, StringArray, UInt32Array};
+//     use arrow::datatypes::{ Schema, Field, DataType };
+//     use tracing::span::Record;
+//     use std::sync::Arc;
+//     use std::any::Any;
+
+//     // fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
+//     //     let mut buf = Vec::new();
+//     //     let mut writer = StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
+//     //     writer.write(rb).unwrap();
+//     //     writer.finish().unwrap();
+//     //     drop(writer);
+
+//     //     let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
+//     //     reader.next().unwrap().unwrap()
+//     // }
+
+//     // if records.len() > 0 {
+//     //     println!("agahaha {:?}", roundtrip_ipc_stream(&records[0]));
+//     // }
+
+//     // Try to add flatbuffer verification
+//     ///////////////////
+//     // let base64 = general_purpose::STANDARD.encode(buffer);
+//     // Ok((base64, columns))
+//     // let encoded = general_purpose::STANDARD.decode(res.clone()).unwrap();
+
+//     // let mut verifier = Verifier::new(&VerifierOptions::default(), &encoded);
+//     // let mut builder = FlatBufferBuilder::new();
+//     // let result = general_purpose::STANDARD.encode(buf);
+//     ///////////////////
+
+//     #[test]
+//     fn test_slice_uint32() {
+//         /// Read/write a record batch to a File and Stream and ensure it is the same at the output
+//         fn ensure_roundtrip(array: ArrayRef) {
+//             let num_rows = array.len();
+//             let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
+//             // take off the first element
+//             let sliced_batch = orig_batch.slice(1, num_rows - 1);
+
+//             let schema = orig_batch.schema();
+//             let stream_data = {
+//                 let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
+//                 writer.write(&sliced_batch).unwrap();
+//                 writer.into_inner().unwrap()
+//             };
+//             let read_batch = {
+//                 let projection = None;
+//                 let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
+//                 reader
+//                     .next()
+//                     .expect("expect no errors reading batch")
+//                     .expect("expect batch")
+//             };
+//             assert_eq!(sliced_batch, read_batch);
+
+//             let file_data = {
+//                 let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
+//                 writer.write(&sliced_batch).unwrap();
+//                 writer.into_inner().unwrap().into_inner().unwrap()
+//             };
+//             let read_batch = {
+//                 let projection = None;
+//                 let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
+//                 reader
+//                     .next()
+//                     .expect("expect no errors reading batch")
+//                     .expect("expect batch")
+//             };
+//             assert_eq!(sliced_batch, read_batch);
+
+//             // TODO test file writer/reader
+//         }
+
+//         ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8).map(|i| {
+//             if i % 2 == 0 {
+//                 Some(i)
+//             } else {
+//                 None
+//             }
+//         }))));
+//     }
+
+//     #[test]
+//     fn test_flatbuffer_issue_8150() {
+//         fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
+//             let mut buf = Vec::new();
+//             let mut writer = StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
+//             writer.write(rb).unwrap();
+//             writer.finish().unwrap();
+//             drop(writer);
+
+//             let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
+//             reader.next().unwrap().unwrap()
+//         }
+
+//         let schema = Schema::new(vec![
+//             Field::new("hours", DataType::UInt32, true),
+//             Field::new("minutes", DataType::UInt32, true),
+//         ]);
+//         let batch = RecordBatch::try_new(
+//             Arc::new(schema),
+//             vec![
+//                 Arc::new(UInt32Array::from(vec![4, 7, 12, 16])),
+//                 Arc::new(UInt32Array::from(vec![20, 40, 00, 20])),
+//             ]
+//         ).unwrap();
+//         roundtrip_ipc_stream(&batch);
+//     }
+// }
diff --git a/crates/nexus/src/http/dbt/schemas.rs b/crates/nexus/src/http/dbt/schemas.rs
index 216350df2..82709432e 100644
--- a/crates/nexus/src/http/dbt/schemas.rs
+++ b/crates/nexus/src/http/dbt/schemas.rs
@@ -85,7 +85,7 @@ pub struct ResponseData {
     #[serde(rename = "rowsetBase64")]
     pub row_set_base_64: Option<String>,
     #[serde(rename = "rowset")]
-    pub row_set: Vec<Vec<Value>>,
+    pub row_set: Option<Vec<Vec<Value>>>,
     pub total: Option,
     #[serde(rename = "queryResultFormat")]
     pub query_result_format: Option<String>,
diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml
index af7acc64b..a5eda0b5a 100644
--- a/crates/runtime/Cargo.toml
+++ b/crates/runtime/Cargo.toml
@@ -12,15 +12,15 @@ async-trait = { workspace = true }
 chrono = { workspace = true }
-datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "a5bbbd1266447c26f3bfdb5314b4a326796bc784" }
-datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
+datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" }
+datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" }
+datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" }
+datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "1cbcd2bffa2c5b56add996859d02053fd4bc21fc" }
+datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" }
 futures = { workspace = true }
-iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
+iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" }
 
-iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
+iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" }
 object_store = { workspace = true }
 paste = "1"
diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs
index 575f5afc3..72a8ec31a 100644
--- a/crates/runtime/src/datafusion/execution.rs
+++ b/crates/runtime/src/datafusion/execution.rs
@@ -5,7 +5,7 @@ use super::error::{self as ih_error, IcehutSQLError, IcehutSQLResult};
 use crate::datafusion::functions::register_udfs;
 use crate::datafusion::planner::ExtendedSqlToRel;
 use crate::datafusion::session::SessionParams;
-use arrow::array::{RecordBatch, UInt64Array};
+use arrow::array::{RecordBatch, Int64Array};
 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
 use datafusion::common::tree_node::{TransformedResult, TreeNode};
 use datafusion::datasource::default_table_source::provider_as_source;
@@ -769,6 +769,7 @@ impl SqlExecutor {
         warehouse_name: &str,
     ) -> IcehutSQLResult<Vec<RecordBatch>> {
         let plan = self.get_custom_logical_plan(query, warehouse_name).await?;
+        println!("plan: {:?}", plan);
         self.ctx
             .execute_logical_plan(plan)
             .await
@@ -1251,11 +1252,11 @@ impl SqlExecutor {
 pub fn created_entity_response() -> Result<Vec<RecordBatch>, arrow::error::ArrowError> {
     let schema = Arc::new(ArrowSchema::new(vec![Field::new(
         "count",
-        DataType::UInt64,
+        DataType::Int64,
         false,
     )]));
     Ok(vec![RecordBatch::try_new(
         schema,
-        vec![Arc::new(UInt64Array::from(vec![0]))],
+        vec![Arc::new(Int64Array::from(vec![0]))],
    )?])
 }
"https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } +datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } + datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "1cbcd2bffa2c5b56add996859d02053fd4bc21fc" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } -futures = { workspace = true } -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } + +futures = { workspace = true } object_store = { workspace = true } paste = "1" From aa4e6209ae25749961806b50089232b7d85514fc Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Mon, 10 Feb 2025 18:25:50 +0200 Subject: [PATCH 3/9] update datafusion in 2 more repos --- crates/control_plane/Cargo.toml | 6 +++--- crates/runtime/Cargo.toml | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml index 114f5139b..d445f2911 100644 --- a/crates/control_plane/Cargo.toml +++ b/crates/control_plane/Cargo.toml @@ -18,12 +18,12 @@ datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } flatbuffers = { version = "24.3.25" } futures = { workspace = true } - -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } + icelake = { git = "https://github.com/Embucket/icelake.git", rev = "b4cbcaf" } datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "1cbcd2bffa2c5b56add996859d02053fd4bc21fc" } diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index ba90c7f2d..525dbf886 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -39,6 +39,7 @@ tokio = { workspace = true } tracing = { workspace = true } url = { version = "2.5.4" } uuid = { workspace = true } +datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "567a3f1c806eed61fe9fbcf12a71f3f8ab7faa95" } [lints] workspace = true From 
c97ac48551f036ee583a69db21843c9e71034d02 Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Mon, 10 Feb 2025 19:29:19 +0200 Subject: [PATCH 4/9] fmt, clippy --- crates/nexus/src/http/dbt/handlers.rs | 6 ++++-- crates/runtime/src/datafusion/execution.rs | 3 +-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/crates/nexus/src/http/dbt/handlers.rs b/crates/nexus/src/http/dbt/handlers.rs index c14911c3e..05704b976 100644 --- a/crates/nexus/src/http/dbt/handlers.rs +++ b/crates/nexus/src/http/dbt/handlers.rs @@ -130,11 +130,13 @@ pub async fn query( let (records, columns) = state .control_svc .query(&session_id, &body_json.sql_text) - .await .context(dbt_error::ControlServiceSnafu)?; - debug!("serialized json: {}", records_to_json_string(&records)?.as_str()); + debug!( + "serialized json: {}", + records_to_json_string(&records)?.as_str() + ); let json_resp = Json(JsonResponse { data: Option::from(ResponseData { diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs index 72a8ec31a..4b01af790 100644 --- a/crates/runtime/src/datafusion/execution.rs +++ b/crates/runtime/src/datafusion/execution.rs @@ -5,7 +5,7 @@ use super::error::{self as ih_error, IcehutSQLError, IcehutSQLResult}; use crate::datafusion::functions::register_udfs; use crate::datafusion::planner::ExtendedSqlToRel; use crate::datafusion::session::SessionParams; -use arrow::array::{RecordBatch, Int64Array}; +use arrow::array::{Int64Array, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use datafusion::common::tree_node::{TransformedResult, TreeNode}; use datafusion::datasource::default_table_source::provider_as_source; @@ -769,7 +769,6 @@ impl SqlExecutor { warehouse_name: &str, ) -> IcehutSQLResult> { let plan = self.get_custom_logical_plan(query, warehouse_name).await?; - println!("plan: {:?}", plan); self.ctx .execute_logical_plan(plan) .await From afaa30a55c45591ea63e2bdd30f4e6e0a66f3115 Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Tue, 11 Feb 2025 18:50:53 +0200 Subject: [PATCH 5/9] comments --- crates/nexus/src/http/dbt/handlers.rs | 137 ++------------------------ 1 file changed, 8 insertions(+), 129 deletions(-) diff --git a/crates/nexus/src/http/dbt/handlers.rs b/crates/nexus/src/http/dbt/handlers.rs index 05704b976..d7b83474d 100644 --- a/crates/nexus/src/http/dbt/handlers.rs +++ b/crates/nexus/src/http/dbt/handlers.rs @@ -24,7 +24,15 @@ use std::io::Read; use tracing::debug; use uuid::Uuid; +// TODO: move out as a configurable parameter const SERIALIZATION_FORMAT: &str = "arrow"; // or "json" + +// https://arrow.apache.org/docs/format/Columnar.html#buffer-alignment-and-padding +// Buffer Alignment and Padding: Implementations are recommended to allocate memory +// on aligned addresses (multiple of 8- or 64-bytes) and pad (overallocate) to a +// length that is a multiple of 8 or 64 bytes. When serializing Arrow data for interprocess +// communication, these alignment and padding requirements are enforced. 
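The `debug!` call reformatted in PATCH 4 logs the output of `records_to_json_string` from PATCH 1. As a standalone illustration (not part of the series) of what that `JsonArray` writer emits, with `with_explicit_nulls(true)` forcing null fields to appear rather than be dropped:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::json::writer::JsonArray;
use arrow::json::WriterBuilder;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Two rows, the second entirely null, to show the explicit-nulls behavior.
    let batch = RecordBatch::try_from_iter(vec![
        ("id", Arc::new(Int64Array::from(vec![Some(1), None])) as ArrayRef),
        ("name", Arc::new(StringArray::from(vec![Some("a"), None])) as ArrayRef),
    ])?;

    let mut writer = WriterBuilder::new()
        .with_explicit_nulls(true)
        .build::<_, JsonArray>(Vec::new());
    writer.write_batches(&[&batch])?;
    writer.finish()?;

    // Prints: [{"id":1,"name":"a"},{"id":null,"name":null}]
    println!("{}", String::from_utf8(writer.into_inner())?);
    Ok(())
}
```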
From afaa30a55c45591ea63e2bdd30f4e6e0a66f3115 Mon Sep 17 00:00:00 2001
From: Yaroslav Litvinov
Date: Tue, 11 Feb 2025 18:50:53 +0200
Subject: [PATCH 5/9] comments

---
 crates/nexus/src/http/dbt/handlers.rs | 137 ++------------------------
 1 file changed, 8 insertions(+), 129 deletions(-)

diff --git a/crates/nexus/src/http/dbt/handlers.rs b/crates/nexus/src/http/dbt/handlers.rs
index 05704b976..d7b83474d 100644
--- a/crates/nexus/src/http/dbt/handlers.rs
+++ b/crates/nexus/src/http/dbt/handlers.rs
@@ -24,7 +24,15 @@ use std::io::Read;
 use tracing::debug;
 use uuid::Uuid;
 
+// TODO: move out as a configurable parameter
 const SERIALIZATION_FORMAT: &str = "arrow"; // or "json"
+
+// https://arrow.apache.org/docs/format/Columnar.html#buffer-alignment-and-padding
+// Buffer Alignment and Padding: Implementations are recommended to allocate memory 
+// on aligned addresses (multiple of 8- or 64-bytes) and pad (overallocate) to a 
+// length that is a multiple of 8 or 64 bytes. When serializing Arrow data for interprocess 
+// communication, these alignment and padding requirements are enforced. 
+// For more info see issue #115
 const ARROW_IPC_ALIGNMENT: usize = 8;
 
 #[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
@@ -184,132 +192,3 @@ pub fn extract_token(headers: &HeaderMap) -> Option<String> {
         })
     })
 }
-
-/*async fn log_query(query: &str) -> Result<(), std::io::Error> {
-    let mut file = OpenOptions::new()
-        .create(true)
-        .append(true)
-        .open("queries.log")
-        .await
-        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
-    file.write_all(query.as_bytes()).await?;
-    file.write_all(b"\n").await?;
-    Ok(())
-}*/
-
-// mod tests {
-//     use std::io::Cursor;
-//     use arrow::record_batch::RecordBatch;
-//     use arrow::ipc::writer::{ FileWriter, StreamWriter, IpcWriteOptions };
-//     use arrow::ipc::reader::{ FileReader, StreamReader };
-//     use arrow::array::{ArrayRef, StringArray, UInt32Array};
-//     use arrow::datatypes::{ Schema, Field, DataType };
-//     use tracing::span::Record;
-//     use std::sync::Arc;
-//     use std::any::Any;
-
-//     // fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
-//     //     let mut buf = Vec::new();
-//     //     let mut writer = StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
-//     //     writer.write(rb).unwrap();
-//     //     writer.finish().unwrap();
-//     //     drop(writer);
-
-//     //     let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
-//     //     reader.next().unwrap().unwrap()
-//     // }
-
-//     // if records.len() > 0 {
-//     //     println!("agahaha {:?}", roundtrip_ipc_stream(&records[0]));
-//     // }
-
-//     // Try to add flatbuffer verification
-//     ///////////////////
-//     // let base64 = general_purpose::STANDARD.encode(buffer);
-//     // Ok((base64, columns))
-//     // let encoded = general_purpose::STANDARD.decode(res.clone()).unwrap();
-
-//     // let mut verifier = Verifier::new(&VerifierOptions::default(), &encoded);
-//     // let mut builder = FlatBufferBuilder::new();
-//     // let result = general_purpose::STANDARD.encode(buf);
-//     ///////////////////
-
-//     #[test]
-//     fn test_slice_uint32() {
-//         /// Read/write a record batch to a File and Stream and ensure it is the same at the output
-//         fn ensure_roundtrip(array: ArrayRef) {
-//             let num_rows = array.len();
-//             let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
-//             // take off the first element
-//             let sliced_batch = orig_batch.slice(1, num_rows - 1);
-
-//             let schema = orig_batch.schema();
-//             let stream_data = {
-//                 let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
-//                 writer.write(&sliced_batch).unwrap();
-//                 writer.into_inner().unwrap()
-//             };
-//             let read_batch = {
-//                 let projection = None;
-//                 let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
-//                 reader
-//                     .next()
-//                     .expect("expect no errors reading batch")
-//                     .expect("expect batch")
-//             };
-//             assert_eq!(sliced_batch, read_batch);
-
-//             let file_data = {
-//                 let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
-//                 writer.write(&sliced_batch).unwrap();
-//                 writer.into_inner().unwrap().into_inner().unwrap()
-//             };
-//             let read_batch = {
-//                 let projection = None;
-//                 let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
-//                 reader
-//                     .next()
-//                     .expect("expect no errors reading batch")
-//                     .expect("expect batch")
-//             };
-//             assert_eq!(sliced_batch, read_batch);
-
-//             // TODO test file writer/reader
-//         }
-
-//         ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8).map(|i| {
-//             if i % 2 == 0 {
-//                 Some(i)
-//             } else {
-//                 None
-//             }
-//         }))));
-//     }
-
-//     #[test]
-//     fn test_flatbuffer_issue_8150() {
-//         fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
-//             let mut buf = Vec::new();
-//             let mut writer = StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
-//             writer.write(rb).unwrap();
-//             writer.finish().unwrap();
-//             drop(writer);
-
-//             let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
-//             reader.next().unwrap().unwrap()
-//         }
-
-//         let schema = Schema::new(vec![
-//             Field::new("hours", DataType::UInt32, true),
-//             Field::new("minutes", DataType::UInt32, true),
-//         ]);
-//         let batch = RecordBatch::try_new(
-//             Arc::new(schema),
-//             vec![
-//                 Arc::new(UInt32Array::from(vec![4, 7, 12, 16])),
-//                 Arc::new(UInt32Array::from(vec![20, 40, 00, 20])),
-//             ]
-//         ).unwrap();
-//         roundtrip_ipc_stream(&batch);
-//     }
-// }

From 3a2fa6a7eeac62b67257c14acb22e770b6272390 Mon Sep 17 00:00:00 2001
From: Yaroslav Litvinov
Date: Tue, 11 Feb 2025 20:45:10 +0200
Subject: [PATCH 6/9] fix rebase

---
 crates/runtime/Cargo.toml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml
index 525dbf886..ba90c7f2d 100644
--- a/crates/runtime/Cargo.toml
+++ b/crates/runtime/Cargo.toml
@@ -39,7 +39,6 @@ tokio = { workspace = true }
 tracing = { workspace = true }
 url = { version = "2.5.4" }
 uuid = { workspace = true }
-datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "567a3f1c806eed61fe9fbcf12a71f3f8ab7faa95" }
 
 [lints]
 workspace = true

From d230cabb344802b83a9600e6a0871d4d0b98e936 Mon Sep 17 00:00:00 2001
From: Yaroslav Litvinov
Date: Wed, 12 Feb 2025 00:08:06 +0200
Subject: [PATCH 7/9] enable json format

---
 crates/nexus/src/http/dbt/handlers.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/nexus/src/http/dbt/handlers.rs b/crates/nexus/src/http/dbt/handlers.rs
index d7b83474d..591196a1a 100644
--- a/crates/nexus/src/http/dbt/handlers.rs
+++ b/crates/nexus/src/http/dbt/handlers.rs
@@ -25,7 +25,7 @@ use tracing::debug;
 use uuid::Uuid;
 
 // TODO: move out as a configurable parameter
-const SERIALIZATION_FORMAT: &str = "arrow"; // or "json"
+const SERIALIZATION_FORMAT: &str = "json"; // or "arrow"
 
 // https://arrow.apache.org/docs/format/Columnar.html#buffer-alignment-and-padding
 // Buffer Alignment and Padding: Implementations are recommended to allocate memory
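PATCH 7 flips the default back to "json"; the comments removed in PATCH 5 and the note deleted in PATCH 1 point at the flatbuffers verification bug on the arrow path (apache/arrow-rs#6426). Below is a runnable distillation of the commented-out `test_flatbuffer_issue_8150` round-trip, under the same `arrow` crate assumptions, which could guard a future flip back to "arrow":

```rust
use std::sync::Arc;

use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;

// Write a batch to an IPC stream and read it back, asserting equality.
#[test]
fn roundtrip_uint32_ipc_stream() {
    let schema = Schema::new(vec![
        Field::new("hours", DataType::UInt32, true),
        Field::new("minutes", DataType::UInt32, true),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(schema),
        vec![
            Arc::new(UInt32Array::from(vec![4, 7, 12, 16])),
            Arc::new(UInt32Array::from(vec![20, 40, 0, 20])),
        ],
    )
    .unwrap();

    let mut buf = Vec::new();
    let mut writer = StreamWriter::try_new(&mut buf, batch.schema_ref()).unwrap();
    writer.write(&batch).unwrap();
    writer.finish().unwrap();
    drop(writer);

    let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
    assert_eq!(batch, reader.next().unwrap().unwrap());
}
```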
+// Buffer Alignment and Padding: Implementations are recommended to allocate memory +// on aligned addresses (multiple of 8- or 64-bytes) and pad (overallocate) to a +// length that is a multiple of 8 or 64 bytes. When serializing Arrow data for interprocess +// communication, these alignment and padding requirements are enforced. // For more info see issue #115 const ARROW_IPC_ALIGNMENT: usize = 8; From c8938bf891c2766a2c8a1c86a0a68b7a902f76e5 Mon Sep 17 00:00:00 2001 From: Yaroslav Litvinov Date: Wed, 12 Feb 2025 02:31:09 +0200 Subject: [PATCH 9/9] update hash commits --- Cargo.toml | 2 +- crates/control_plane/Cargo.toml | 16 ++++++++-------- crates/runtime/Cargo.toml | 16 ++++++++-------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 46ee352a8..4f3827c19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ snafu = { version = "0.8.5", features = ["futures"] } tracing = { version = "0.1" } [patch.crates-io] -datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } +datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } [workspace.lints.clippy] all = { level = "deny", priority = -1 } diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml index d445f2911..065eccda2 100644 --- a/crates/control_plane/Cargo.toml +++ b/crates/control_plane/Cargo.toml @@ -13,20 +13,20 @@ bytes = { version = "1.8.0" } chrono = { workspace = true } -datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } -datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } -datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } -datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } +datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } +datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } +datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } +datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } flatbuffers = { version = "24.3.25" } futures = { workspace = true } -iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } +iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" } icelake = { git = "https://github.com/Embucket/icelake.git", rev = "b4cbcaf" } -datafusion-functions-json = { git = 
"https://github.com/Embucket/datafusion-functions-json.git", rev = "1cbcd2bffa2c5b56add996859d02053fd4bc21fc" } +datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "86f5294d30b7bf53fbcb2e25dee409ad818b34cc" } object_store = { workspace = true } once_cell = "1.19.0" diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index ba90c7f2d..bcacda407 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -12,16 +12,16 @@ async-trait = { workspace = true } chrono = { workspace = true } -datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } -datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } -datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } -datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "8c1bb903792f9c90fa0b1501180bba16cac361d8" } +datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } +datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } +datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } +datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" } -datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "1cbcd2bffa2c5b56add996859d02053fd4bc21fc" } +datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "86f5294d30b7bf53fbcb2e25dee409ad818b34cc" } -iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } -iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } -datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "26b014d1fa6c0ffa33e4a7bfcc5db130e3cc6bc5" } +iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" } +iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" } +datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" } futures = { workspace = true } object_store = { workspace = true }