Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(source): support JSON schema addtionalProperties (map) #17110

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions e2e_test/source/basic/nosim_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,22 @@ CREATE TABLE kafka_json_schema_plain with (
kafka.scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON (schema.registry = '${RISEDEV_SCHEMA_REGISTRY_URL}');


query
describe kafka_json_schema_plain;
----
dimensions (empty) false NULL
Copy link
Member Author

@xxchan xxchan Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a bug: when using external schema, struct's fields are not shown in describe <table>

#17128

map jsonb false NULL
notMap (empty) false NULL
price double precision false NULL
productId bigint false NULL
productName character varying false NULL
tags character varying[] false NULL
_row_id serial true NULL
primary key _row_id NULL NULL
distribution key _row_id NULL NULL
table description kafka_json_schema_plain NULL NULL

statement ok
CREATE TABLE kafka_json_schema_upsert (PRIMARY KEY(rw_key))
INCLUDE KEY AS rw_key
Expand Down Expand Up @@ -83,10 +99,10 @@ select count(*) from debezium_compact;

query TFITT
select
"dimensions", "price", "productId", "productName", "tags"
*
from kafka_json_schema_plain
----
(9.5,7,12) 12.5 1 An ice sculpture {cold,ice}
(9.5,7,12) {"foo": "bar"} (b) 12.5 1 An ice sculpture {cold,ice}

query TFITT
select
Expand Down
4 changes: 2 additions & 2 deletions scripts/source/test_data/kafka_json_schema.1
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"$schema":"https://json-schema.org/draft/2020-12/schema","$id":"https://example.com/product.schema.json","title":"Product","description":"A product from Acme's catalog","type":"object","properties":{"productId":{"description":"The unique identifier for a product","type":"integer"},"productName":{"description":"Name of the product","type":"string"},"price":{"description":"The price of the product","type":"number","exclusiveMinimum":0},"tags":{"description":"Tags for the product","type":"array","items":{"type":"string"},"minItems":1,"uniqueItems":true},"dimensions":{"type":"object","properties":{"length":{"type":"number"},"width":{"type":"number"},"height":{"type":"number"}},"required":["length","width","height"]}},"required":["productId","productName","price"]}
{"productId":1,"productName":"An ice sculpture","price":12.5,"tags":["cold","ice"],"dimensions":{"length":7,"width":12,"height":9.5}}
{"$schema":"https://json-schema.org/draft/2020-12/schema","$id":"https://example.com/product.schema.json","title":"Product","description":"A product from Acme's catalog","type":"object","properties":{"productId":{"description":"The unique identifier for a product","type":"integer"},"productName":{"description":"Name of the product","type":"string"},"price":{"description":"The price of the product","type":"number","exclusiveMinimum":0},"tags":{"description":"Tags for the product","type":"array","items":{"type":"string"},"minItems":1,"uniqueItems":true},"dimensions":{"type":"object","properties":{"length":{"type":"number"},"width":{"type":"number"},"height":{"type":"number"}},"required":["length","width","height"]},"map":{"type":"object","additionalProperties":{"type":"string"}},"notMap":{"type":"object","additionalProperties":{"type":"string"},"properties":{"a":{"type":"string"}}}},"required":["productId","productName","price"]}
{"productId":1,"productName":"An ice sculpture","price":12.5,"tags":["cold","ice"],"dimensions":{"length":7,"width":12,"height":9.5},"map":{"foo":"bar"},"notMap":{"a":"b","ignored":"c"}}
21 changes: 16 additions & 5 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::collections::HashMap;
use anyhow::Context as _;
use apache_avro::Schema;
use jst::{convert_avro, Context};
use risingwave_connector_codec::decoder::avro::MapHandling;
use risingwave_pb::plan_common::ColumnDesc;

use super::util::{bytes_from_url, get_kafka_topic};
Expand Down Expand Up @@ -80,7 +81,7 @@ impl JsonAccessBuilder {
}
}

pub async fn schema_to_columns(
pub async fn fetch_json_schema_and_map_to_columns(
schema_location: &str,
schema_registry_auth: Option<SchemaRegistryAuth>,
props: &HashMap<String, String>,
Expand All @@ -98,11 +99,21 @@ pub async fn schema_to_columns(
let bytes = bytes_from_url(url, None).await?;
serde_json::from_slice(&bytes)?
};
let context = Context::default();
let avro_schema = convert_avro(&json_schema, context).to_string();
json_schema_to_columns(&json_schema)
}

/// FIXME: when the JSON schema is invalid, it will panic.
///
/// ## Notes on type conversion
/// Map will be used when an object doesn't have `properties` but has `additionalProperties`.
/// When an object has `properties` and `additionalProperties`, the latter will be ignored.
/// <https://github.com/mozilla/jsonschema-transpiler/blob/fb715c7147ebd52427e0aea09b2bba2d539850b1/src/jsonschema.rs#L228-L280>
///
/// TODO: examine other stuff like `oneOf`, `patternProperties`, etc.
fn json_schema_to_columns(json_schema: &serde_json::Value) -> ConnectorResult<Vec<ColumnDesc>> {
let avro_schema = convert_avro(json_schema, Context::default()).to_string();
let schema = Schema::parse_str(&avro_schema).context("failed to parse avro schema")?;
// TODO: do we need to support map type here?
avro_schema_to_column_descs(&schema, None).map_err(Into::into)
avro_schema_to_column_descs(&schema, Some(MapHandling::Jsonb)).map_err(Into::into)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None -> Some(MapHandling::Jsonb) is the only logic change in this PR

}

#[cfg(test)]
Expand Down
24 changes: 14 additions & 10 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use risingwave_connector::parser::additional_columns::{
build_additional_column_catalog, get_supported_additional_columns,
};
use risingwave_connector::parser::{
schema_to_columns, AvroParserConfig, DebeziumAvroParserConfig, ProtobufParserConfig,
SpecificParserConfig, TimestamptzHandling, DEBEZIUM_IGNORE_KEY,
fetch_json_schema_and_map_to_columns, AvroParserConfig, DebeziumAvroParserConfig,
ProtobufParserConfig, SpecificParserConfig, TimestamptzHandling, DEBEZIUM_IGNORE_KEY,
};
use risingwave_connector::schema::schema_registry::{
name_strategy_from_str, SchemaRegistryAuth, SCHEMA_REGISTRY_PASSWORD, SCHEMA_REGISTRY_USERNAME,
Expand Down Expand Up @@ -102,14 +102,18 @@ async fn extract_json_table_schema(
auth
});
Ok(Some(
schema_to_columns(&schema_location.0, schema_registry_auth, with_properties)
.await?
.into_iter()
.map(|col| ColumnCatalog {
column_desc: col.into(),
is_hidden: false,
})
.collect_vec(),
fetch_json_schema_and_map_to_columns(
&schema_location.0,
schema_registry_auth,
with_properties,
)
.await?
.into_iter()
.map(|col| ColumnCatalog {
column_desc: col.into(),
is_hidden: false,
})
.collect_vec(),
))
}
}
Expand Down
Loading