Skip to content

Commit fc752a7

Browse files
Add partial support of arrow format for dbt/snowflake endpoint (#222)
* Int64 instead of Uint64 in count batch result * switch on json format
1 parent 6b539b3 commit fc752a7

File tree

9 files changed

+122
-117
lines changed

9 files changed

+122
-117
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ snafu = { version = "0.8.5", features = ["futures"] }
3737
tracing = { version = "0.1" }
3838

3939
[patch.crates-io]
40-
datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
40+
datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
4141

4242
[workspace.lints.clippy]
4343
all = { level = "deny", priority = -1 }

crates/control_plane/Cargo.toml

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,21 @@ bytes = { version = "1.8.0" }
1313

1414
chrono = { workspace = true }
1515

16-
datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
17-
datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
18-
datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
19-
datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "a5bbbd1266447c26f3bfdb5314b4a326796bc784" }
20-
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
16+
datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
17+
datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
18+
datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
19+
datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
20+
2121
flatbuffers = { version = "24.3.25" }
2222
futures = { workspace = true }
23-
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
24-
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
23+
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
24+
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
25+
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
26+
2527
icelake = { git = "https://github.com/Embucket/icelake.git", rev = "b4cbcaf" }
2628

29+
datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "86f5294d30b7bf53fbcb2e25dee409ad818b34cc" }
30+
2731
object_store = { workspace = true }
2832
once_cell = "1.19.0"
2933
quick-xml = { version = "0.36.2" }

crates/control_plane/src/service.rs

Lines changed: 0 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,6 @@ pub trait ControlService: Send + Sync {
6868

6969
async fn query_table(&self, session_id: &str, query: &str) -> ControlPlaneResult<String>;
7070

71-
async fn query_dbt(
72-
&self,
73-
session_id: &str,
74-
query: &str,
75-
) -> ControlPlaneResult<(String, Vec<ColumnInfo>)>;
76-
7771
async fn upload_data_to_table(
7872
&self,
7973
session_id: &str,
@@ -355,69 +349,6 @@ impl ControlService for ControlServiceImpl {
355349
Ok(String::from_utf8(buf).context(crate::error::Utf8Snafu)?)
356350
}
357351

358-
#[tracing::instrument(level = "debug", skip(self))]
359-
async fn query_dbt(
360-
&self,
361-
session_id: &str,
362-
query: &str,
363-
) -> ControlPlaneResult<(String, Vec<ColumnInfo>)> {
364-
let (records, columns) = self.query(session_id, query).await?;
365-
366-
// THIS CODE RELATED TO ARROW FORMAT
367-
//////////////////////////////////////
368-
369-
// fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
370-
// let mut buf = Vec::new();
371-
// let mut writer = StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
372-
// writer.write(rb).unwrap();
373-
// writer.finish().unwrap();
374-
// drop(writer);
375-
//
376-
// let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
377-
// reader.next().unwrap().unwrap()
378-
// }
379-
//
380-
// println!("agahaha {:?}", roundtrip_ipc_stream(&records[0]));
381-
382-
// let mut buffer = Vec::new();
383-
// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
384-
// let mut stream_writer =
385-
// StreamWriter::try_new_with_options(&mut buffer, &records[0].schema_ref(), options)
386-
// .unwrap();
387-
// stream_writer.write(&records[0]).unwrap();
388-
// stream_writer.finish().unwrap();
389-
// drop(stream_writer);
390-
391-
// // Try to add flatbuffer verification
392-
// println!("{:?}", buffer.len());
393-
// let base64 = general_purpose::STANDARD.encode(buffer);
394-
// Ok((base64, columns))
395-
// let encoded = general_purpose::STANDARD.decode(res.clone()).unwrap();
396-
//
397-
// let mut verifier = Verifier::new(&VerifierOptions::default(), &encoded);
398-
// let mut builder = FlatBufferBuilder::new();
399-
// let res = general_purpose::STANDARD.encode(buf);
400-
//////////////////////////////////////
401-
402-
// We use json format since there is a bug between arrow and nanoarrow
403-
let buf = Vec::new();
404-
let write_builder = WriterBuilder::new().with_explicit_nulls(true);
405-
let mut writer = write_builder.build::<_, JsonArray>(buf);
406-
407-
let record_refs: Vec<&RecordBatch> = records.iter().collect();
408-
writer
409-
.write_batches(&record_refs)
410-
.context(crate::error::ArrowSnafu)?;
411-
writer.finish().context(crate::error::ArrowSnafu)?;
412-
413-
// Get the underlying buffer back,
414-
let buf = writer.into_inner();
415-
Ok((
416-
String::from_utf8(buf).context(crate::error::Utf8Snafu)?,
417-
columns,
418-
))
419-
}
420-
421352
#[tracing::instrument(level = "debug", skip(self))]
422353
async fn upload_data_to_table(
423354
&self,

crates/nexus/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ utoipa-axum = { workspace = true }
5454
utoipa-swagger-ui = { workspace = true }
5555
uuid = { workspace = true, features = ["v4", "v5"] }
5656
validator = { version = "0.18.1", features = ["derive"] }
57+
base64 = { version = "0.22.1" }
5758

5859
[dev-dependencies]
5960
async-trait = { workspace = true }

crates/nexus/src/http/dbt/error.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ pub enum DbtError {
3737

3838
#[snafu(display("Failed to parse row JSON"))]
3939
RowParse { source: serde_json::Error },
40+
41+
#[snafu(display("UTF8 error: {source}"))]
42+
Utf8 { source: std::string::FromUtf8Error },
43+
44+
#[snafu(display("Arrow error: {source}"))]
45+
Arrow { source: arrow::error::ArrowError },
4046
}
4147

4248
pub type DbtResult<T> = std::result::Result<T, DbtError>;
@@ -48,9 +54,10 @@ impl IntoResponse for DbtError {
4854
| Self::LoginRequestParse { .. }
4955
| Self::QueryBodyParse { .. }
5056
| Self::InvalidWarehouseIdFormat { .. } => http::StatusCode::BAD_REQUEST,
51-
Self::ControlService { .. } | Self::RowParse { .. } => {
52-
http::StatusCode::INTERNAL_SERVER_ERROR
53-
}
57+
Self::ControlService { .. }
58+
| Self::RowParse { .. }
59+
| Self::Utf8 { .. }
60+
| Self::Arrow { .. } => http::StatusCode::INTERNAL_SERVER_ERROR,
5461
Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => {
5562
http::StatusCode::UNAUTHORIZED
5663
}
@@ -69,6 +76,12 @@ impl IntoResponse for DbtError {
6976
Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => {
7077
"session error".to_string()
7178
}
79+
Self::Utf8 { source } => {
80+
format!("Error encoding UTF8 string: {source}")
81+
}
82+
Self::Arrow { source } => {
83+
format!("Error encoding in Arrow format: {source}")
84+
}
7285
Self::NotImplemented => "feature not implemented".to_string(),
7386
};
7487

crates/nexus/src/http/dbt/handlers.rs

Lines changed: 78 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,36 @@ use crate::http::dbt::schemas::{
55
};
66
use crate::http::session::DFSessionId;
77
use crate::state::AppState;
8+
use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
9+
use arrow::ipc::MetadataVersion;
10+
use arrow::json::writer::JsonArray;
11+
use arrow::json::WriterBuilder;
12+
use arrow::record_batch::RecordBatch;
813
use axum::body::Bytes;
914
use axum::extract::{Query, State};
1015
use axum::http::HeaderMap;
1116
use axum::Json;
17+
use base64;
18+
use base64::engine::general_purpose::STANDARD as engine_base64;
19+
use base64::prelude::*;
1220
use flate2::read::GzDecoder;
1321
use regex::Regex;
1422
use snafu::ResultExt;
1523
use std::io::Read;
24+
use tracing::debug;
1625
use uuid::Uuid;
1726

27+
// TODO: move out as a configurable parameter
28+
const SERIALIZATION_FORMAT: &str = "json"; // or "arrow"
29+
30+
// https://arrow.apache.org/docs/format/Columnar.html#buffer-alignment-and-padding
31+
// Buffer Alignment and Padding: Implementations are recommended to allocate memory
32+
// on aligned addresses (multiple of 8- or 64-bytes) and pad (overallocate) to a
33+
// length that is a multiple of 8 or 64 bytes. When serializing Arrow data for interprocess
34+
// communication, these alignment and padding requirements are enforced.
35+
// For more info see issue #115
36+
const ARROW_IPC_ALIGNMENT: usize = 8;
37+
1838
#[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
1939
pub async fn login(
2040
State(state): State<AppState>,
@@ -32,8 +52,6 @@ pub async fn login(
3252
let _body_json: LoginRequestBody =
3353
serde_json::from_str(&s).context(dbt_error::LoginRequestParseSnafu)?;
3454

35-
//println!("Received login request: {:?}", query);
36-
//println!("Body data parameters: {:?}", body_json);
3755
let token = Uuid::new_v4().to_string();
3856

3957
let warehouses = state
@@ -56,6 +74,37 @@ pub async fn login(
5674
}))
5775
}
5876

77+
fn records_to_arrow_string(recs: &Vec<RecordBatch>) -> Result<String, DbtError> {
78+
let mut buf = Vec::new();
79+
let options = IpcWriteOptions::try_new(ARROW_IPC_ALIGNMENT, false, MetadataVersion::V5)
80+
.context(dbt_error::ArrowSnafu)?;
81+
if !recs.is_empty() {
82+
let mut writer =
83+
StreamWriter::try_new_with_options(&mut buf, recs[0].schema_ref(), options)
84+
.context(dbt_error::ArrowSnafu)?;
85+
for rec in recs {
86+
writer.write(rec).context(dbt_error::ArrowSnafu)?;
87+
}
88+
writer.finish().context(dbt_error::ArrowSnafu)?;
89+
drop(writer);
90+
};
91+
Ok(engine_base64.encode(buf))
92+
}
93+
94+
fn records_to_json_string(recs: &[RecordBatch]) -> Result<String, DbtError> {
95+
let buf = Vec::new();
96+
let write_builder = WriterBuilder::new().with_explicit_nulls(true);
97+
let mut writer = write_builder.build::<_, JsonArray>(buf);
98+
let record_refs: Vec<&RecordBatch> = recs.iter().collect();
99+
writer
100+
.write_batches(&record_refs)
101+
.context(dbt_error::ArrowSnafu)?;
102+
writer.finish().context(dbt_error::ArrowSnafu)?;
103+
104+
// Get the underlying buffer back,
105+
String::from_utf8(writer.into_inner()).context(dbt_error::Utf8Snafu)
106+
}
107+
59108
#[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
60109
pub async fn query(
61110
DFSessionId(session_id): DFSessionId,
@@ -86,30 +135,46 @@ pub async fn query(
86135

87136
// let _ = log_query(&body_json.sql_text).await;
88137

89-
let (result, columns) = state
138+
let (records, columns) = state
90139
.control_svc
91-
.query_dbt(&session_id, &body_json.sql_text)
140+
.query(&session_id, &body_json.sql_text)
92141
.await
93142
.context(dbt_error::ControlServiceSnafu)?;
94143

95-
// query_result_format now is JSON since arrow IPC has flatbuffers bug
96-
// https://github.com/apache/arrow-rs/pull/6426
97-
Ok(Json(JsonResponse {
144+
debug!(
145+
"serialized json: {}",
146+
records_to_json_string(&records)?.as_str()
147+
);
148+
149+
let json_resp = Json(JsonResponse {
98150
data: Option::from(ResponseData {
99151
row_type: columns.into_iter().map(Into::into).collect(),
100-
// row_set_base_64: Option::from(result.clone()),
101-
row_set_base_64: None,
102-
#[allow(clippy::unwrap_used)]
103-
row_set: ResponseData::rows_to_vec(result.as_str())?,
152+
query_result_format: Option::from(String::from(SERIALIZATION_FORMAT)),
153+
row_set: if SERIALIZATION_FORMAT == "json" {
154+
Option::from(ResponseData::rows_to_vec(
155+
records_to_json_string(&records)?.as_str(),
156+
)?)
157+
} else {
158+
None
159+
},
160+
row_set_base_64: if SERIALIZATION_FORMAT == "arrow" {
161+
Option::from(records_to_arrow_string(&records)?)
162+
} else {
163+
None
164+
},
104165
total: Some(1),
105-
query_result_format: Option::from("json".to_string()),
106166
error_code: None,
107167
sql_state: Option::from("ok".to_string()),
108168
}),
109169
success: true,
110170
message: Option::from("successfully executed".to_string()),
111171
code: Some(format!("{:06}", 200)),
112-
}))
172+
});
173+
debug!(
174+
"query {:?}, response: {:?}, records: {:?}",
175+
body_json.sql_text, json_resp, records
176+
);
177+
Ok(json_resp)
113178
}
114179

115180
pub async fn abort() -> DbtResult<Json<serde_json::value::Value>> {
@@ -127,15 +192,3 @@ pub fn extract_token(headers: &HeaderMap) -> Option<String> {
127192
})
128193
})
129194
}
130-
131-
/*async fn log_query(query: &str) -> Result<(), std::io::Error> {
132-
let mut file = OpenOptions::new()
133-
.create(true)
134-
.append(true)
135-
.open("queries.log")
136-
.await
137-
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
138-
file.write_all(query.as_bytes()).await?;
139-
file.write_all(b"\n").await?;
140-
Ok(())
141-
}*/

crates/nexus/src/http/dbt/schemas.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub struct ResponseData {
8585
#[serde(rename = "rowsetBase64")]
8686
pub row_set_base_64: Option<String>,
8787
#[serde(rename = "rowset")]
88-
pub row_set: Vec<Vec<serde_json::Value>>,
88+
pub row_set: Option<Vec<Vec<serde_json::Value>>>,
8989
pub total: Option<u32>,
9090
#[serde(rename = "queryResultFormat")]
9191
pub query_result_format: Option<String>,

crates/runtime/Cargo.toml

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,18 @@ async-trait = { workspace = true }
1212

1313
chrono = { workspace = true }
1414

15-
datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
16-
datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
17-
datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
18-
datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "a5bbbd1266447c26f3bfdb5314b4a326796bc784" }
19-
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
20-
futures = { workspace = true }
21-
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
15+
datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
16+
datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
17+
datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
18+
datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
19+
20+
datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "86f5294d30b7bf53fbcb2e25dee409ad818b34cc" }
2221

23-
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
22+
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
23+
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
24+
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
25+
26+
futures = { workspace = true }
2427
object_store = { workspace = true }
2528

2629
paste = "1"

crates/runtime/src/datafusion/execution.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use super::error::{self as ih_error, IcehutSQLError, IcehutSQLResult};
55
use crate::datafusion::functions::register_udfs;
66
use crate::datafusion::planner::ExtendedSqlToRel;
77
use crate::datafusion::session::SessionParams;
8-
use arrow::array::{RecordBatch, UInt64Array};
8+
use arrow::array::{Int64Array, RecordBatch};
99
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
1010
use datafusion::common::tree_node::{TransformedResult, TreeNode};
1111
use datafusion::datasource::default_table_source::provider_as_source;
@@ -1251,11 +1251,11 @@ impl SqlExecutor {
12511251
pub fn created_entity_response() -> Result<Vec<RecordBatch>, arrow::error::ArrowError> {
12521252
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
12531253
"count",
1254-
DataType::UInt64,
1254+
DataType::Int64,
12551255
false,
12561256
)]));
12571257
Ok(vec![RecordBatch::try_new(
12581258
schema,
1259-
vec![Arc::new(UInt64Array::from(vec![0]))],
1259+
vec![Arc::new(Int64Array::from(vec![0]))],
12601260
)?])
12611261
}

0 commit comments

Comments
 (0)