diff --git a/Cargo.toml b/Cargo.toml
index 3c13c1a24..4f3827c19 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -37,7 +37,7 @@
 snafu = { version = "0.8.5", features = ["futures"] }
 tracing = { version = "0.1" }
 
 [patch.crates-io]
-datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
+datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
 
 [workspace.lints.clippy]
 all = { level = "deny", priority = -1 }
diff --git a/crates/control_plane/Cargo.toml b/crates/control_plane/Cargo.toml
index fea567b63..065eccda2 100644
--- a/crates/control_plane/Cargo.toml
+++ b/crates/control_plane/Cargo.toml
@@ -13,17 +13,21 @@
 bytes = { version = "1.8.0" }
 chrono = { workspace = true }
-datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "a5bbbd1266447c26f3bfdb5314b4a326796bc784" }
-datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
+datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
+datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
+datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
+datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
+
 flatbuffers = { version = "24.3.25" }
 futures = { workspace = true }
-iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
-iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
+iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
+iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
+datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
+
 icelake = { git = "https://github.com/Embucket/icelake.git", rev = "b4cbcaf" }
+datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "86f5294d30b7bf53fbcb2e25dee409ad818b34cc" }
+
 object_store = { workspace = true }
 once_cell = "1.19.0"
 quick-xml = { version = "0.36.2" }
diff --git a/crates/control_plane/src/service.rs b/crates/control_plane/src/service.rs
index ba4f91bc4..d86dbd079 100644
--- a/crates/control_plane/src/service.rs
+++ b/crates/control_plane/src/service.rs
@@ -68,12 +68,6 @@
     async fn query_table(&self, session_id: &str, query: &str) -> ControlPlaneResult<String>;
 
-    async fn query_dbt(
-        &self,
-        session_id: &str,
-        query: &str,
-    ) -> ControlPlaneResult<(String, Vec<ColumnInfo>)>;
-
     async fn upload_data_to_table(
         &self,
         session_id: &str,
@@ -355,69 +349,6 @@ impl ControlService for ControlServiceImpl {
         Ok(String::from_utf8(buf).context(crate::error::Utf8Snafu)?)
     }
 
-    #[tracing::instrument(level = "debug", skip(self))]
-    async fn query_dbt(
-        &self,
-        session_id: &str,
-        query: &str,
-    ) -> ControlPlaneResult<(String, Vec<ColumnInfo>)> {
-        let (records, columns) = self.query(session_id, query).await?;
-
-        // THIS CODE RELATED TO ARROW FORMAT
-        //////////////////////////////////////
-        // fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
-        //     let mut buf = Vec::new();
-        //     let mut writer = StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
-        //     writer.write(rb).unwrap();
-        //     writer.finish().unwrap();
-        //     drop(writer);
-        //
-        //     let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
-        //     reader.next().unwrap().unwrap()
-        // }
-        //
-        // println!("agahaha {:?}", roundtrip_ipc_stream(&records[0]));
-
-        // let mut buffer = Vec::new();
-        // let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
-        // let mut stream_writer =
-        //     StreamWriter::try_new_with_options(&mut buffer, &records[0].schema_ref(), options)
-        //         .unwrap();
-        // stream_writer.write(&records[0]).unwrap();
-        // stream_writer.finish().unwrap();
-        // drop(stream_writer);
-
-        // // Try to add flatbuffer verification
-        // println!("{:?}", buffer.len());
-        // let base64 = general_purpose::STANDARD.encode(buffer);
-        // Ok((base64, columns))
-        // let encoded = general_purpose::STANDARD.decode(res.clone()).unwrap();
-        //
-        // let mut verifier = Verifier::new(&VerifierOptions::default(), &encoded);
-        // let mut builder = FlatBufferBuilder::new();
-        // let res = general_purpose::STANDARD.encode(buf);
-        //////////////////////////////////////
-
-        // We use json format since there is a bug between arrow and nanoarrow
-        let buf = Vec::new();
-        let write_builder = WriterBuilder::new().with_explicit_nulls(true);
-        let mut writer = write_builder.build::<_, JsonArray>(buf);
-
-        let record_refs: Vec<&RecordBatch> = records.iter().collect();
-        writer
-            .write_batches(&record_refs)
-            .context(crate::error::ArrowSnafu)?;
-        writer.finish().context(crate::error::ArrowSnafu)?;
-
-        // Get the underlying buffer back,
-        let buf = writer.into_inner();
-        Ok((
-            String::from_utf8(buf).context(crate::error::Utf8Snafu)?,
-            columns,
-        ))
-    }
-
     #[tracing::instrument(level = "debug", skip(self))]
     async fn upload_data_to_table(
         &self,
diff --git a/crates/nexus/Cargo.toml b/crates/nexus/Cargo.toml
index a5390a46a..176549fea 100644
--- a/crates/nexus/Cargo.toml
+++ b/crates/nexus/Cargo.toml
@@ -54,6 +54,7 @@
 utoipa-axum = { workspace = true }
 utoipa-swagger-ui = { workspace = true }
 uuid = { workspace = true, features = ["v4", "v5"] }
 validator = { version = "0.18.1", features = ["derive"] }
+base64 = { version = "0.22.1" }
 
 [dev-dependencies]
 async-trait = { workspace = true }
diff --git a/crates/nexus/src/http/dbt/error.rs b/crates/nexus/src/http/dbt/error.rs
index 9e4b881ab..222433378 100644
--- a/crates/nexus/src/http/dbt/error.rs
+++ b/crates/nexus/src/http/dbt/error.rs
@@ -37,6 +37,12 @@
     #[snafu(display("Failed to parse row JSON"))]
     RowParse { source: serde_json::Error },
+
+    #[snafu(display("UTF8 error: {source}"))]
+    Utf8 { source: std::string::FromUtf8Error },
+
+    #[snafu(display("Arrow error: {source}"))]
+    Arrow { source: arrow::error::ArrowError },
 }
 
 pub type DbtResult<T> = std::result::Result<T, DbtError>;
@@ -48,9 +54,10 @@ impl IntoResponse for DbtError {
             | Self::LoginRequestParse { .. }
             | Self::QueryBodyParse { .. }
             | Self::InvalidWarehouseIdFormat { .. } => http::StatusCode::BAD_REQUEST,
-            Self::ControlService { .. } | Self::RowParse { .. } => {
-                http::StatusCode::INTERNAL_SERVER_ERROR
-            }
+            Self::ControlService { .. }
+            | Self::RowParse { .. }
+            | Self::Utf8 { .. }
+            | Self::Arrow { .. } => http::StatusCode::INTERNAL_SERVER_ERROR,
             Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => {
                 http::StatusCode::UNAUTHORIZED
             }
@@ -69,6 +76,12 @@
             Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => {
                 "session error".to_string()
             }
+            Self::Utf8 { source } => {
+                format!("Error encoding UTF8 string: {source}")
+            }
+            Self::Arrow { source } => {
+                format!("Error encoding in Arrow format: {source}")
+            }
             Self::NotImplemented => "feature not implemented".to_string(),
         };
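The new `Utf8` and `Arrow` variants follow the file's existing snafu pattern: the derive generates `Utf8Snafu`/`ArrowSnafu` context selectors, and callers attach them with `ResultExt::context`, as `handlers.rs` below does. A trimmed, standalone sketch of that mechanism (not part of the diff):

```rust
use snafu::{ResultExt, Snafu};

// Trimmed to the two variants added in this diff.
#[derive(Debug, Snafu)]
pub enum DbtError {
    #[snafu(display("UTF8 error: {source}"))]
    Utf8 { source: std::string::FromUtf8Error },

    #[snafu(display("Arrow error: {source}"))]
    Arrow { source: arrow::error::ArrowError },
}

// FromUtf8Error -> DbtError::Utf8 via the generated Utf8Snafu selector;
// arrow calls map to DbtError::Arrow with `.context(ArrowSnafu)` the same way.
fn bytes_to_string(buf: Vec<u8>) -> Result<String, DbtError> {
    String::from_utf8(buf).context(Utf8Snafu)
}

fn main() {
    // Invalid UTF-8 demonstrates the generated Display impl.
    let err = bytes_to_string(vec![0xff]).unwrap_err();
    println!("{err}"); // UTF8 error: invalid utf-8 sequence of 1 bytes from index 0
}
```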
diff --git a/crates/nexus/src/http/dbt/handlers.rs b/crates/nexus/src/http/dbt/handlers.rs
index 634ef6966..d8221f762 100644
--- a/crates/nexus/src/http/dbt/handlers.rs
+++ b/crates/nexus/src/http/dbt/handlers.rs
@@ -5,16 +5,36 @@ use crate::http::dbt::schemas::{
 };
 use crate::http::session::DFSessionId;
 use crate::state::AppState;
+use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
+use arrow::ipc::MetadataVersion;
+use arrow::json::writer::JsonArray;
+use arrow::json::WriterBuilder;
+use arrow::record_batch::RecordBatch;
 use axum::body::Bytes;
 use axum::extract::{Query, State};
 use axum::http::HeaderMap;
 use axum::Json;
+use base64;
+use base64::engine::general_purpose::STANDARD as engine_base64;
+use base64::prelude::*;
 use flate2::read::GzDecoder;
 use regex::Regex;
 use snafu::ResultExt;
 use std::io::Read;
+use tracing::debug;
 use uuid::Uuid;
 
+// TODO: move out as a configurable parameter
+const SERIALIZATION_FORMAT: &str = "json"; // or "arrow"
+
+// https://arrow.apache.org/docs/format/Columnar.html#buffer-alignment-and-padding
+// Buffer Alignment and Padding: Implementations are recommended to allocate memory
+// on aligned addresses (multiple of 8- or 64-bytes) and pad (overallocate) to a
+// length that is a multiple of 8 or 64 bytes. When serializing Arrow data for interprocess
+// communication, these alignment and padding requirements are enforced.
+// For more info see issue #115
+const ARROW_IPC_ALIGNMENT: usize = 8;
+
 #[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
 pub async fn login(
     State(state): State<AppState>,
@@ -32,8 +52,6 @@
     let _body_json: LoginRequestBody =
         serde_json::from_str(&s).context(dbt_error::LoginRequestParseSnafu)?;
 
-    //println!("Received login request: {:?}", query);
-    //println!("Body data parameters: {:?}", body_json);
     let token = Uuid::new_v4().to_string();
 
     let warehouses = state
@@ -56,6 +74,37 @@
     }))
 }
 
+fn records_to_arrow_string(recs: &Vec<RecordBatch>) -> Result<String, DbtError> {
+    let mut buf = Vec::new();
+    let options = IpcWriteOptions::try_new(ARROW_IPC_ALIGNMENT, false, MetadataVersion::V5)
+        .context(dbt_error::ArrowSnafu)?;
+    if !recs.is_empty() {
+        let mut writer =
+            StreamWriter::try_new_with_options(&mut buf, recs[0].schema_ref(), options)
+                .context(dbt_error::ArrowSnafu)?;
+        for rec in recs {
+            writer.write(rec).context(dbt_error::ArrowSnafu)?;
+        }
+        writer.finish().context(dbt_error::ArrowSnafu)?;
+        drop(writer);
+    };
+    Ok(engine_base64.encode(buf))
+}
+
+fn records_to_json_string(recs: &[RecordBatch]) -> Result<String, DbtError> {
+    let buf = Vec::new();
+    let write_builder = WriterBuilder::new().with_explicit_nulls(true);
+    let mut writer = write_builder.build::<_, JsonArray>(buf);
+    let record_refs: Vec<&RecordBatch> = recs.iter().collect();
+    writer
+        .write_batches(&record_refs)
+        .context(dbt_error::ArrowSnafu)?;
+    writer.finish().context(dbt_error::ArrowSnafu)?;
+
+    // Get the underlying buffer back,
+    String::from_utf8(writer.into_inner()).context(dbt_error::Utf8Snafu)
+}
+
 #[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
 pub async fn query(
     DFSessionId(session_id): DFSessionId,
@@ -86,30 +135,46 @@
     // let _ = log_query(&body_json.sql_text).await;
 
-    let (result, columns) = state
+    let (records, columns) = state
         .control_svc
-        .query_dbt(&session_id, &body_json.sql_text)
+        .query(&session_id, &body_json.sql_text)
         .await
         .context(dbt_error::ControlServiceSnafu)?;
 
-    // query_result_format now is JSON since arrow IPC has flatbuffers bug
-    // https://github.com/apache/arrow-rs/pull/6426
-    Ok(Json(JsonResponse {
+    debug!(
+        "serialized json: {}",
+        records_to_json_string(&records)?.as_str()
+    );
+
+    let json_resp = Json(JsonResponse {
         data: Option::from(ResponseData {
             row_type: columns.into_iter().map(Into::into).collect(),
-            // row_set_base_64: Option::from(result.clone()),
-            row_set_base_64: None,
-            #[allow(clippy::unwrap_used)]
-            row_set: ResponseData::rows_to_vec(result.as_str())?,
+            query_result_format: Option::from(String::from(SERIALIZATION_FORMAT)),
+            row_set: if SERIALIZATION_FORMAT == "json" {
+                Option::from(ResponseData::rows_to_vec(
+                    records_to_json_string(&records)?.as_str(),
+                )?)
+            } else {
+                None
+            },
+            row_set_base_64: if SERIALIZATION_FORMAT == "arrow" {
+                Option::from(records_to_arrow_string(&records)?)
+            } else {
+                None
+            },
             total: Some(1),
-            query_result_format: Option::from("json".to_string()),
             error_code: None,
             sql_state: Option::from("ok".to_string()),
         }),
         success: true,
         message: Option::from("successfully executed".to_string()),
         code: Some(format!("{:06}", 200)),
-    }))
+    });
+    debug!(
+        "query {:?}, response: {:?}, records: {:?}",
+        body_json.sql_text, json_resp, records
+    );
+    Ok(json_resp)
 }
 
 pub async fn abort() -> DbtResult<Json<serde_json::Value>> {
@@ -127,15 +192,3 @@ pub fn extract_token(headers: &HeaderMap) -> Option<String> {
             })
     })
 }
-
-/*async fn log_query(query: &str) -> Result<(), std::io::Error> {
-    let mut file = OpenOptions::new()
-        .create(true)
-        .append(true)
-        .open("queries.log")
-        .await
-        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
-    file.write_all(query.as_bytes()).await?;
-    file.write_all(b"\n").await?;
-    Ok(())
-}*/
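The two helpers above replace the serialization that previously lived in `query_dbt`: `records_to_arrow_string` base64-encodes an Arrow IPC stream written with 8-byte alignment (see the `ARROW_IPC_ALIGNMENT` comment), while `records_to_json_string` keeps the JSON fallback. Below is a standalone round-trip sketch — not part of the diff, with an invented one-column schema — using the same `IpcWriteOptions` as the handler and decoding the stream the way a client consuming `rowsetBase64` would:

```rust
use std::io::Cursor;
use std::sync::Arc;

use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
use arrow::ipc::MetadataVersion;
use base64::engine::general_purpose::STANDARD as engine_base64;
use base64::prelude::*;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Invented one-column batch standing in for a real query result.
    let schema = Arc::new(Schema::new(vec![Field::new("count", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(vec![0]))])?;

    // Same options as records_to_arrow_string: 8-byte alignment,
    // no legacy format, IPC metadata V5.
    let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
    let mut buf = Vec::new();
    let mut writer = StreamWriter::try_new_with_options(&mut buf, &schema, options)?;
    writer.write(&batch)?;
    writer.finish()?;
    drop(writer); // release the borrow on buf

    // What the handler puts into rowsetBase64 ...
    let encoded = engine_base64.encode(&buf);

    // ... and what a client does with it: decode and read the IPC stream back.
    let decoded = engine_base64.decode(encoded)?;
    for batch in StreamReader::try_new(Cursor::new(decoded), None)? {
        println!("read back {} rows", batch?.num_rows());
    }
    Ok(())
}
```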
diff --git a/crates/nexus/src/http/dbt/schemas.rs b/crates/nexus/src/http/dbt/schemas.rs
index 216350df2..82709432e 100644
--- a/crates/nexus/src/http/dbt/schemas.rs
+++ b/crates/nexus/src/http/dbt/schemas.rs
@@ -85,7 +85,7 @@ pub struct ResponseData {
     #[serde(rename = "rowsetBase64")]
     pub row_set_base_64: Option<String>,
     #[serde(rename = "rowset")]
-    pub row_set: Vec<Vec<serde_json::Value>>,
+    pub row_set: Option<Vec<Vec<serde_json::Value>>>,
     pub total: Option<i64>,
     #[serde(rename = "queryResultFormat")]
     pub query_result_format: Option<String>,
diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml
index af7acc64b..bcacda407 100644
--- a/crates/runtime/Cargo.toml
+++ b/crates/runtime/Cargo.toml
@@ -12,15 +12,18 @@
 async-trait = { workspace = true }
 chrono = { workspace = true }
-datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
-datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "a5bbbd1266447c26f3bfdb5314b4a326796bc784" }
-datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
-futures = { workspace = true }
-iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
+datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
+datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
+datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
+datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
+
+datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "86f5294d30b7bf53fbcb2e25dee409ad818b34cc" }
 
-iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
+iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
+iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
+datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
+
+futures = { workspace = true }
 object_store = { workspace = true }
 paste = "1"
diff --git a/crates/runtime/src/datafusion/execution.rs b/crates/runtime/src/datafusion/execution.rs
index 575f5afc3..4b01af790 100644
--- a/crates/runtime/src/datafusion/execution.rs
+++ b/crates/runtime/src/datafusion/execution.rs
@@ -5,7 +5,7 @@
 use super::error::{self as ih_error, IcehutSQLError, IcehutSQLResult};
 use crate::datafusion::functions::register_udfs;
 use crate::datafusion::planner::ExtendedSqlToRel;
 use crate::datafusion::session::SessionParams;
-use arrow::array::{RecordBatch, UInt64Array};
+use arrow::array::{Int64Array, RecordBatch};
 use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
 use datafusion::common::tree_node::{TransformedResult, TreeNode};
 use datafusion::datasource::default_table_source::provider_as_source;
@@ -1251,11 +1251,11 @@ impl SqlExecutor {
 pub fn created_entity_response() -> Result<Vec<RecordBatch>, arrow::error::ArrowError> {
     let schema = Arc::new(ArrowSchema::new(vec![Field::new(
         "count",
-        DataType::UInt64,
+        DataType::Int64,
         false,
     )]));
     Ok(vec![RecordBatch::try_new(
         schema,
-        vec![Arc::new(UInt64Array::from(vec![0]))],
+        vec![Arc::new(Int64Array::from(vec![0]))],
     )?])
 }
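For the default "json" format, the same records flow through arrow's JSON writer instead. A standalone sketch (the nullable one-column `count` batch is invented, loosely mirroring `created_entity_response` above, now `Int64`) of what `WriterBuilder::with_explicit_nulls(true)` emits — the `JsonArray` text that `ResponseData::rows_to_vec` then splits into `rowset` rows:

```rust
use std::sync::Arc;

use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::json::writer::JsonArray;
use arrow::json::WriterBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Nullable variant of the "count" column, to show null handling.
    let schema = Arc::new(Schema::new(vec![Field::new("count", DataType::Int64, true)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![Some(0), None]))],
    )?;

    // Same builder settings as records_to_json_string.
    let mut writer = WriterBuilder::new()
        .with_explicit_nulls(true)
        .build::<_, JsonArray>(Vec::new());
    writer.write_batches(&[&batch])?;
    writer.finish()?;

    // Prints: [{"count":0},{"count":null}]
    println!("{}", String::from_utf8(writer.into_inner())?);
    Ok(())
}
```

Without explicit nulls, arrow's JSON writer omits null fields from each object entirely, which would desync positional row decoding — hence the builder flag.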