2 changes: 1 addition & 1 deletion Cargo.toml
@@ -37,7 +37,7 @@ snafu = { version = "0.8.5", features = ["futures"] }
tracing = { version = "0.1" }

[patch.crates-io]
datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }

[workspace.lints.clippy]
all = { level = "deny", priority = -1 }
18 changes: 11 additions & 7 deletions crates/control_plane/Cargo.toml
@@ -13,17 +13,21 @@ bytes = { version = "1.8.0" }

chrono = { workspace = true }

datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "a5bbbd1266447c26f3bfdb5314b4a326796bc784" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }

flatbuffers = { version = "24.3.25" }
futures = { workspace = true }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }

icelake = { git = "https://github.com/Embucket/icelake.git", rev = "b4cbcaf" }

datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "86f5294d30b7bf53fbcb2e25dee409ad818b34cc" }

object_store = { workspace = true }
once_cell = "1.19.0"
quick-xml = { version = "0.36.2" }
69 changes: 0 additions & 69 deletions crates/control_plane/src/service.rs
@@ -68,12 +68,6 @@ pub trait ControlService: Send + Sync {

async fn query_table(&self, session_id: &str, query: &str) -> ControlPlaneResult<String>;

async fn query_dbt(
&self,
session_id: &str,
query: &str,
) -> ControlPlaneResult<(String, Vec<ColumnInfo>)>;

async fn upload_data_to_table(
&self,
session_id: &str,
@@ -355,69 +349,6 @@ impl ControlService for ControlServiceImpl {
Ok(String::from_utf8(buf).context(crate::error::Utf8Snafu)?)
}

#[tracing::instrument(level = "debug", skip(self))]
async fn query_dbt(
&self,
session_id: &str,
query: &str,
) -> ControlPlaneResult<(String, Vec<ColumnInfo>)> {
let (records, columns) = self.query(session_id, query).await?;

// THIS CODE RELATED TO ARROW FORMAT
//////////////////////////////////////

// fn roundtrip_ipc_stream(rb: &RecordBatch) -> RecordBatch {
// let mut buf = Vec::new();
// let mut writer = StreamWriter::try_new(&mut buf, rb.schema_ref()).unwrap();
// writer.write(rb).unwrap();
// writer.finish().unwrap();
// drop(writer);
//
// let mut reader = StreamReader::try_new(std::io::Cursor::new(buf), None).unwrap();
// reader.next().unwrap().unwrap()
// }
//
// println!("agahaha {:?}", roundtrip_ipc_stream(&records[0]));

// let mut buffer = Vec::new();
// let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
// let mut stream_writer =
// StreamWriter::try_new_with_options(&mut buffer, &records[0].schema_ref(), options)
// .unwrap();
// stream_writer.write(&records[0]).unwrap();
// stream_writer.finish().unwrap();
// drop(stream_writer);

// // Try to add flatbuffer verification
// println!("{:?}", buffer.len());
// let base64 = general_purpose::STANDARD.encode(buffer);
// Ok((base64, columns))
// let encoded = general_purpose::STANDARD.decode(res.clone()).unwrap();
//
// let mut verifier = Verifier::new(&VerifierOptions::default(), &encoded);
// let mut builder = FlatBufferBuilder::new();
// let res = general_purpose::STANDARD.encode(buf);
//////////////////////////////////////

// We use json format since there is a bug between arrow and nanoarrow
let buf = Vec::new();
let write_builder = WriterBuilder::new().with_explicit_nulls(true);
let mut writer = write_builder.build::<_, JsonArray>(buf);

let record_refs: Vec<&RecordBatch> = records.iter().collect();
writer
.write_batches(&record_refs)
.context(crate::error::ArrowSnafu)?;
writer.finish().context(crate::error::ArrowSnafu)?;

// Get the underlying buffer back,
let buf = writer.into_inner();
Ok((
String::from_utf8(buf).context(crate::error::Utf8Snafu)?,
columns,
))
}

#[tracing::instrument(level = "debug", skip(self))]
async fn upload_data_to_table(
&self,
1 change: 1 addition & 0 deletions crates/nexus/Cargo.toml
@@ -54,6 +54,7 @@ utoipa-axum = { workspace = true }
utoipa-swagger-ui = { workspace = true }
uuid = { workspace = true, features = ["v4", "v5"] }
validator = { version = "0.18.1", features = ["derive"] }
base64 = { version = "0.22.1" }

[dev-dependencies]
async-trait = { workspace = true }
19 changes: 16 additions & 3 deletions crates/nexus/src/http/dbt/error.rs
@@ -37,6 +37,12 @@ pub enum DbtError {

#[snafu(display("Failed to parse row JSON"))]
RowParse { source: serde_json::Error },

#[snafu(display("UTF8 error: {source}"))]
Utf8 { source: std::string::FromUtf8Error },

#[snafu(display("Arrow error: {source}"))]
Arrow { source: arrow::error::ArrowError },
}

pub type DbtResult<T> = std::result::Result<T, DbtError>;
@@ -48,9 +54,10 @@ impl IntoResponse for DbtError {
| Self::LoginRequestParse { .. }
| Self::QueryBodyParse { .. }
| Self::InvalidWarehouseIdFormat { .. } => http::StatusCode::BAD_REQUEST,
Self::ControlService { .. } | Self::RowParse { .. } => {
http::StatusCode::INTERNAL_SERVER_ERROR
}
Self::ControlService { .. }
| Self::RowParse { .. }
| Self::Utf8 { .. }
| Self::Arrow { .. } => http::StatusCode::INTERNAL_SERVER_ERROR,
Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => {
http::StatusCode::UNAUTHORIZED
}
@@ -69,6 +76,12 @@
Self::MissingAuthToken | Self::MissingDbtSession | Self::InvalidAuthData => {
"session error".to_string()
}
Self::Utf8 { source } => {
format!("Error encoding UTF8 string: {source}")
}
Self::Arrow { source } => {
format!("Error encoding in Arrow format: {source}")
}
Self::NotImplemented => "feature not implemented".to_string(),
};

103 changes: 78 additions & 25 deletions crates/nexus/src/http/dbt/handlers.rs
@@ -5,16 +5,36 @@ use crate::http::dbt::schemas::{
};
use crate::http::session::DFSessionId;
use crate::state::AppState;
use arrow::ipc::writer::{IpcWriteOptions, StreamWriter};
use arrow::ipc::MetadataVersion;
use arrow::json::writer::JsonArray;
use arrow::json::WriterBuilder;
use arrow::record_batch::RecordBatch;
use axum::body::Bytes;
use axum::extract::{Query, State};
use axum::http::HeaderMap;
use axum::Json;
use base64;
use base64::engine::general_purpose::STANDARD as engine_base64;
use base64::prelude::*;
use flate2::read::GzDecoder;
use regex::Regex;
use snafu::ResultExt;
use std::io::Read;
use tracing::debug;
use uuid::Uuid;

// TODO: move out as a configurable parameter
const SERIALIZATION_FORMAT: &str = "json"; // or "arrow"

// https://arrow.apache.org/docs/format/Columnar.html#buffer-alignment-and-padding
// Buffer Alignment and Padding: Implementations are recommended to allocate memory
// on aligned addresses (multiple of 8- or 64-bytes) and pad (overallocate) to a
// length that is a multiple of 8 or 64 bytes. When serializing Arrow data for interprocess
// communication, these alignment and padding requirements are enforced.
// For more info see issue #115
const ARROW_IPC_ALIGNMENT: usize = 8;
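One possible shape for the TODO above — resolving the format at startup instead of hard-coding it — is sketched below; the environment variable name is an assumption, not part of this change:

// Sketch only: read the serialization format from the environment,
// falling back to "json". The variable name is hypothetical.
fn serialization_format() -> String {
    std::env::var("EMBUCKET_SERIALIZATION_FORMAT").unwrap_or_else(|_| "json".to_string())
}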

#[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
pub async fn login(
State(state): State<AppState>,
@@ -32,8 +52,6 @@ pub async fn login(
let _body_json: LoginRequestBody =
serde_json::from_str(&s).context(dbt_error::LoginRequestParseSnafu)?;

//println!("Received login request: {:?}", query);
//println!("Body data parameters: {:?}", body_json);
let token = Uuid::new_v4().to_string();

let warehouses = state
@@ -56,6 +74,37 @@
}))
}

fn records_to_arrow_string(recs: &Vec<RecordBatch>) -> Result<String, DbtError> {
let mut buf = Vec::new();
let options = IpcWriteOptions::try_new(ARROW_IPC_ALIGNMENT, false, MetadataVersion::V5)
.context(dbt_error::ArrowSnafu)?;
if !recs.is_empty() {
let mut writer =
StreamWriter::try_new_with_options(&mut buf, recs[0].schema_ref(), options)
.context(dbt_error::ArrowSnafu)?;
for rec in recs {
writer.write(rec).context(dbt_error::ArrowSnafu)?;
}
writer.finish().context(dbt_error::ArrowSnafu)?;
drop(writer);
};
Ok(engine_base64.encode(buf))
}
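For local verification it can help to decode the base64 payload back into batches and confirm the stream round-trips under the 8-byte alignment; a minimal sketch using the same arrow and base64 APIs (test-only, not part of this change):

// Sketch only: decode records_to_arrow_string output and read the IPC stream back.
fn decode_arrow_string(encoded: &str) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    use arrow::ipc::reader::StreamReader;
    let bytes = engine_base64.decode(encoded)?;
    let reader = StreamReader::try_new(std::io::Cursor::new(bytes), None)?;
    Ok(reader.collect::<Result<Vec<_>, _>>()?)
}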

fn records_to_json_string(recs: &[RecordBatch]) -> Result<String, DbtError> {
let buf = Vec::new();
let write_builder = WriterBuilder::new().with_explicit_nulls(true);
let mut writer = write_builder.build::<_, JsonArray>(buf);
let record_refs: Vec<&RecordBatch> = recs.iter().collect();
writer
.write_batches(&record_refs)
.context(dbt_error::ArrowSnafu)?;
writer.finish().context(dbt_error::ArrowSnafu)?;

// Get the underlying buffer back.
String::from_utf8(writer.into_inner()).context(dbt_error::Utf8Snafu)
}
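As a usage note, with_explicit_nulls(true) keeps null fields in the output (so every row carries the same set of keys) instead of omitting them. An illustrative check, with hypothetical column names and values:

// Sketch only: a batch with a null in column "b" serializes to
// [{"a":1,"b":null}] rather than [{"a":1}].
fn demo_explicit_nulls() -> Result<String, Box<dyn std::error::Error>> {
    use arrow::array::{ArrayRef, Int64Array};
    use std::sync::Arc;
    let a: ArrayRef = Arc::new(Int64Array::from(vec![Some(1_i64)]));
    let b: ArrayRef = Arc::new(Int64Array::from(vec![None::<i64>]));
    let batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b)])?;
    Ok(records_to_json_string(&[batch])?)
}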

#[tracing::instrument(level = "debug", skip(state), err, ret(level = tracing::Level::TRACE))]
pub async fn query(
DFSessionId(session_id): DFSessionId,
@@ -86,30 +135,46 @@ pub async fn query(

// let _ = log_query(&body_json.sql_text).await;

let (result, columns) = state
let (records, columns) = state
.control_svc
.query_dbt(&session_id, &body_json.sql_text)
.query(&session_id, &body_json.sql_text)
.await
.context(dbt_error::ControlServiceSnafu)?;

// query_result_format now is JSON since arrow IPC has flatbuffers bug
// https://github.com/apache/arrow-rs/pull/6426
Ok(Json(JsonResponse {
debug!(
"serialized json: {}",
records_to_json_string(&records)?.as_str()
);

let json_resp = Json(JsonResponse {
data: Option::from(ResponseData {
row_type: columns.into_iter().map(Into::into).collect(),
// row_set_base_64: Option::from(result.clone()),
row_set_base_64: None,
#[allow(clippy::unwrap_used)]
row_set: ResponseData::rows_to_vec(result.as_str())?,
query_result_format: Option::from(String::from(SERIALIZATION_FORMAT)),
row_set: if SERIALIZATION_FORMAT == "json" {
Option::from(ResponseData::rows_to_vec(
records_to_json_string(&records)?.as_str(),
)?)
} else {
None
},
row_set_base_64: if SERIALIZATION_FORMAT == "arrow" {
Option::from(records_to_arrow_string(&records)?)
} else {
None
},
total: Some(1),
query_result_format: Option::from("json".to_string()),
error_code: None,
sql_state: Option::from("ok".to_string()),
}),
success: true,
message: Option::from("successfully executed".to_string()),
code: Some(format!("{:06}", 200)),
}))
});
debug!(
"query {:?}, response: {:?}, records: {:?}",
body_json.sql_text, json_resp, records
);
Ok(json_resp)
}

pub async fn abort() -> DbtResult<Json<serde_json::value::Value>> {
@@ -127,15 +192,3 @@ pub fn extract_token(headers: &HeaderMap) -> Option<String> {
})
})
}

/*async fn log_query(query: &str) -> Result<(), std::io::Error> {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open("queries.log")
.await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
file.write_all(query.as_bytes()).await?;
file.write_all(b"\n").await?;
Ok(())
}*/
2 changes: 1 addition & 1 deletion crates/nexus/src/http/dbt/schemas.rs
@@ -85,7 +85,7 @@ pub struct ResponseData {
#[serde(rename = "rowsetBase64")]
pub row_set_base_64: Option<String>,
#[serde(rename = "rowset")]
pub row_set: Vec<Vec<serde_json::Value>>,
pub row_set: Option<Vec<Vec<serde_json::Value>>>,
pub total: Option<u32>,
#[serde(rename = "queryResultFormat")]
pub query_result_format: Option<String>,
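One serde detail introduced by the Option here: a None row_set still serializes as "rowset": null. If omitting the key entirely were ever preferred, the field would need an explicit skip annotation — a hypothetical sketch, not what this change does:

// Sketch only: skip the "rowset" key when the value is None (hypothetical variant).
#[derive(serde::Serialize)]
struct ResponseDataSketch {
    #[serde(rename = "rowset", skip_serializing_if = "Option::is_none")]
    row_set: Option<Vec<Vec<serde_json::Value>>>,
}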
19 changes: 11 additions & 8 deletions crates/runtime/Cargo.toml
@@ -12,15 +12,18 @@ async-trait = { workspace = true }

chrono = { workspace = true }

datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "2950e0be59f81fbd6593f7457f1f03e63a292820" }
datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "a5bbbd1266447c26f3bfdb5314b4a326796bc784" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
futures = { workspace = true }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
datafusion = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
datafusion-common = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
datafusion-expr = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }
datafusion-physical-plan = { git = "https://github.com/Embucket/datafusion.git", rev = "314a72634dc4f15242345fd17dc32df6c3ae04e8" }

datafusion-functions-json = { git = "https://github.com/Embucket/datafusion-functions-json.git", rev = "86f5294d30b7bf53fbcb2e25dee409ad818b34cc" }

iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "f2fef1d973f59309919d60aaaa32e83e04029d6d" }
iceberg-rust = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
iceberg-rest-catalog = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }
datafusion_iceberg = { git = "https://github.com/Embucket/iceberg-rust.git", rev = "17d76a0ecc4ebaa2bfc5178e6589f76dc5d71c67" }

futures = { workspace = true }
object_store = { workspace = true }

paste = "1"
6 changes: 3 additions & 3 deletions crates/runtime/src/datafusion/execution.rs
@@ -5,7 +5,7 @@ use super::error::{self as ih_error, IcehutSQLError, IcehutSQLResult};
use crate::datafusion::functions::register_udfs;
use crate::datafusion::planner::ExtendedSqlToRel;
use crate::datafusion::session::SessionParams;
use arrow::array::{RecordBatch, UInt64Array};
use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datafusion::common::tree_node::{TransformedResult, TreeNode};
use datafusion::datasource::default_table_source::provider_as_source;
@@ -1251,11 +1251,11 @@ impl SqlExecutor {
pub fn created_entity_response() -> Result<Vec<RecordBatch>, arrow::error::ArrowError> {
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
"count",
DataType::UInt64,
DataType::Int64,
false,
)]));
Ok(vec![RecordBatch::try_new(
schema,
vec![Arc::new(UInt64Array::from(vec![0]))],
vec![Arc::new(Int64Array::from(vec![0]))],
)?])
}