Skip to content

Commit

Permalink
Nest JSON schemas instead of encoding into a string
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Sep 8, 2024
1 parent c324c3e commit 427e73e
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 12 deletions.
12 changes: 7 additions & 5 deletions src/adapter/http/src/data/metadata_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ pub struct Output {

pub refs: Option<Vec<Ref>>,

pub schema: Option<String>,
pub schema: Option<query_types::Schema>,
pub schema_format: Option<query_types::SchemaFormat>,

#[serde_as(as = "Option<odf::serde::yaml::SeedDef>")]
Expand Down Expand Up @@ -187,10 +187,12 @@ pub async fn dataset_metadata_handler(
.map(|schema| schema.schema_as_arrow())
.transpose()
.int_err()?
.map(|schema| query_types::serialize_schema(&schema, params.schema_format))
.transpose()
.int_err()?
.map(|schema| (schema, params.schema_format))
.map(|schema| {
(
query_types::Schema::new(schema, params.schema_format),
params.schema_format,
)
})
.unzip();

let seed = seed_visitor.and_then(vis::SearchSingleTypedBlockVisitor::into_event);
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/http/src/data/query_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn dataset_query_handler_post_v2(

let (schema, schema_format) = if body.include.contains(&Include::Schema) {
(
Some(serialize_schema(df.schema().as_arrow(), body.schema_format).api_err()?),
Some(Schema::new(df.schema().inner().clone(), body.schema_format)),
Some(body.schema_format),
)
} else {
Expand Down
51 changes: 50 additions & 1 deletion src/adapter/http/src/data/query_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ pub struct Outputs {

/// Schema of the resulting data
#[serde(skip_serializing_if = "Option::is_none")]
pub schema: Option<String>,
pub schema: Option<Schema>,

/// What representation is used for the schema
#[serde(skip_serializing_if = "Option::is_none")]
Expand Down Expand Up @@ -458,6 +458,55 @@ pub enum SchemaFormat {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// Wrapper that pairs an Arrow schema with the wire representation requested by
/// the client, so the schema can be serialized as a nested JSON object (or a
/// string, for the text Parquet format) directly inside a response body.
#[derive(Debug, Clone)]
pub struct Schema {
    // Arrow schema of the dataset / query result being described
    schema: datafusion::arrow::datatypes::SchemaRef,
    // Representation to emit when this value is serialized
    format: SchemaFormat,
}

impl Schema {
    /// Creates a schema wrapper that will serialize `schema` in the given `format`
    pub fn new(schema: datafusion::arrow::datatypes::SchemaRef, format: SchemaFormat) -> Self {
        Self { schema, format }
    }
}

impl serde::Serialize for Schema {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
use kamu_data_utils::schema::{convert, format};

match self.format {
SchemaFormat::ArrowJson => self.schema.serialize(serializer),
SchemaFormat::ParquetJson => {
let mut buf = Vec::new();

format::write_schema_parquet_json(
&mut buf,
convert::arrow_schema_to_parquet_schema(&self.schema).as_ref(),
)
.unwrap();

// TODO: PERF: Avoid re-serialization
let json: serde_json::Value = serde_json::from_slice(&buf).unwrap();
json.serialize(serializer)
}
SchemaFormat::Parquet => {
let mut buf = Vec::new();

format::write_schema_parquet(
&mut buf,
convert::arrow_schema_to_parquet_schema(&self.schema).as_ref(),
)
.unwrap();

serializer.collect_str(&std::str::from_utf8(&buf).unwrap())
}
}
}
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// TODO: Remove after V2 transition
pub(crate) fn serialize_schema(
schema: &datafusion::arrow::datatypes::Schema,
format: SchemaFormat,
Expand Down
185 changes: 180 additions & 5 deletions src/adapter/http/tests/tests/test_data_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ async fn test_data_query_handler_v2() {
.unwrap();

let response = res.json::<serde_json::Value>().await.unwrap();
let ignore_schema = response["output"]["schema"].as_str().unwrap();
let ignore_schema = &response["output"]["schema"];

pretty_assertions::assert_eq!(
response,
Expand Down Expand Up @@ -431,7 +431,7 @@ async fn test_data_query_handler_v2() {
.unwrap();

let response = res.json::<serde_json::Value>().await.unwrap();
let ignore_schema = response["output"]["schema"].as_str().unwrap();
let ignore_schema = &response["output"]["schema"];

pretty_assertions::assert_eq!(
response,
Expand Down Expand Up @@ -463,13 +463,13 @@ async fn test_data_query_handler_v2() {
"subQueries": [],
"commitment": {
"inputHash": "f162001ff67ca8970bcb4f4f8b25e79b3c6db3fcd2ac0501d131e446591fd0475a2af",
"outputHash": "f16205df27bb7e790bb1fc48132b6239a3829ec3a177bd7e253c76cf31b54e195f11c",
"outputHash": "f1620fa841fae69710c888fdf82d8fd63948469c0fd1e2a37c16e2067127e2eec1ea8",
"subQueriesHash": "f1620ca4510738395af1429224dd785675309c344b2b549632e20275c69b15ed1d210",
},
"proof": {
"type": "Ed25519Signature2020",
"verificationMethod": "did:key:z6Mko2nqhQ9wYSTS5Giab2j1aHzGnxHimqwmFeEVY8aNsVnN",
"proofValue": "ueVlIWSbdPK3G7zjC_W0hhDXoqjY1vcvi9HWYPDt6cqVtIq3J2IhCpjF4AXyXbIh_9tKPH90S6qOquZNb1UxTAQ",
"proofValue": "u-k8Rd9dB5ERqbTU9ymUvpySTQEh8HMPAqcEBrtZviNOBFoe-FXZtJUGcvwud39dxC659bkVz4iYHhDYUexmiCQ",
}
})
);
Expand Down Expand Up @@ -942,7 +942,7 @@ async fn test_data_query_handler_schema_formats() {

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
async fn test_metadata_query_handler() {
async fn test_metadata_handler_aspects() {
let harness = Harness::new().await;

let client = async move {
Expand Down Expand Up @@ -1042,3 +1042,178 @@ async fn test_metadata_query_handler() {
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[test_group::group(engine, datafusion)]
#[test_log::test(tokio::test)]
async fn test_metadata_handler_schema_formats() {
    // Verifies that the /metadata endpoint returns the dataset schema in each
    // supported representation: ArrowJson and ParquetJson as nested JSON
    // objects (not JSON-encoded strings), and Parquet as a single DDL-style
    // string. Golden values below must stay byte-exact.
    let harness = Harness::new().await;

    let client = async move {
        let cl = reqwest::Client::new();

        // Case 1: ArrowJson - schema nests as the Arrow JSON object
        let query_url = format!("{}/metadata", harness.dataset_url);
        let res = cl
            .get(&query_url)
            .query(&[("include", "schema"), ("schemaFormat", "ArrowJson")])
            .send()
            .await
            .unwrap()
            .error_for_status()
            .unwrap();

        pretty_assertions::assert_eq!(
            res.json::<serde_json::Value>().await.unwrap(),
            json!({
                "output": {
                    "schemaFormat": "ArrowJson",
                    "schema": {
                        "fields": [
                            {
                                "data_type": "Int64",
                                "dict_id": 0,
                                "dict_is_ordered": false,
                                "metadata": {},
                                "name": "offset",
                                "nullable": true
                            },
                            {
                                "data_type": "Int32",
                                "dict_id": 0,
                                "dict_is_ordered": false,
                                "metadata": {},
                                "name": "op",
                                "nullable": false
                            },
                            {
                                "data_type": {
                                    "Timestamp": [
                                        "Millisecond",
                                        "UTC"
                                    ]
                                },
                                "dict_id": 0,
                                "dict_is_ordered": false,
                                "metadata": {},
                                "name": "system_time",
                                "nullable": false
                            },
                            {
                                "data_type": {
                                    "Timestamp": [
                                        "Millisecond",
                                        "UTC"
                                    ]
                                },
                                "dict_id": 0,
                                "dict_is_ordered": false,
                                "metadata": {},
                                "name": "event_time",
                                "nullable": true
                            },
                            {
                                "data_type": "Utf8",
                                "dict_id": 0,
                                "dict_is_ordered": false,
                                "metadata": {},
                                "name": "city",
                                "nullable": false
                            },
                            {
                                "data_type": "UInt64",
                                "dict_id": 0,
                                "dict_is_ordered": false,
                                "metadata": {},
                                "name": "population",
                                "nullable": false
                            }
                        ],
                        "metadata": {}
                    },
                }
            })
        );

        // Case 2: ParquetJson - schema nests as the Parquet JSON object
        let query_url = format!("{}/metadata", harness.dataset_url);
        let res = cl
            .get(&query_url)
            .query(&[("include", "schema"), ("schemaFormat", "ParquetJson")])
            .send()
            .await
            .unwrap()
            .error_for_status()
            .unwrap();

        pretty_assertions::assert_eq!(
            res.json::<serde_json::Value>().await.unwrap(),
            json!({
                "output": {
                    "schemaFormat": "ParquetJson",
                    "schema": {
                        "name": "arrow_schema",
                        "type": "struct",
                        "fields": [
                            {
                                "name": "offset",
                                "repetition": "OPTIONAL",
                                "type": "INT64"
                            },
                            {
                                "name": "op",
                                "repetition": "REQUIRED",
                                "type": "INT32"
                            },
                            {
                                "logicalType": "TIMESTAMP(MILLIS,true)",
                                "name": "system_time",
                                "repetition": "REQUIRED",
                                "type": "INT64"
                            },
                            {
                                "logicalType": "TIMESTAMP(MILLIS,true)",
                                "name": "event_time",
                                "repetition": "OPTIONAL",
                                "type": "INT64"
                            },
                            {
                                "logicalType": "STRING",
                                "name": "city",
                                "repetition": "REQUIRED",
                                "type": "BYTE_ARRAY"
                            },
                            {
                                "logicalType": "INTEGER(64,false)",
                                "name": "population",
                                "repetition": "REQUIRED",
                                "type": "INT64"
                            }
                        ],
                    },
                }
            })
        );

        // Case 3: Parquet - textual schema returned as one JSON string
        let query_url = format!("{}/metadata", harness.dataset_url);
        let res = cl
            .get(&query_url)
            .query(&[("include", "schema"), ("schemaFormat", "Parquet")])
            .send()
            .await
            .unwrap()
            .error_for_status()
            .unwrap();

        pretty_assertions::assert_eq!(
            res.json::<serde_json::Value>().await.unwrap(),
            json!({
                "output": {
                    "schemaFormat": "Parquet",
                    "schema": "message arrow_schema {\n  OPTIONAL INT64 offset;\n  REQUIRED INT32 op;\n  REQUIRED INT64 system_time (TIMESTAMP(MILLIS,true));\n  OPTIONAL INT64 event_time (TIMESTAMP(MILLIS,true));\n  REQUIRED BYTE_ARRAY city (STRING);\n  REQUIRED INT64 population (INTEGER(64,false));\n}\n",
                }
            })
        );
    };

    await_client_server_flow!(harness.server_harness.api_server_run(), client);
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

0 comments on commit 427e73e

Please sign in to comment.