
feat: support avro reference type #17052

Merged — 3 commits merged on Jun 3, 2024
1 change: 1 addition & 0 deletions Makefile.toml
@@ -1306,6 +1306,7 @@ SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}"
SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}"
SLT_DB = "dev"
PATH = "${PWD}/e2e_test/commands:${PATH}"
RUST_BACKTRACE = "0" # slt backtrace is useless

[tasks.slt-clean]
category = "RiseDev - Test - SQLLogicTest"
34 changes: 18 additions & 16 deletions e2e_test/source_inline/kafka/avro/name_strategy.slt
@@ -136,10 +136,11 @@ FROM
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)

# query II
# SELECT
@@ -164,13 +165,14 @@ FROM
ORDER BY
"ID";
----
update id1 1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z 99999999.99
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47
delete id4 4 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49
NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
update id1 1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z 99999999.99 NULL NULL
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id4 4 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)


query II
@@ -181,11 +183,11 @@ FROM
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49

update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)



3 changes: 2 additions & 1 deletion e2e_test/source_inline/kafka/avro/upsert_avro_json
@@ -1,8 +1,9 @@
"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"], "default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null},{"name":"DEC_VAL","type":[{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"null"],"default":"\u00ff"}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"}
"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"],"default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null},{"name":"DEC_VAL","type":[{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"null"],"default":"\u00ff"},{"name":"REFERRED","type":["null",{"type":"record","name":"REFERRED_TYPE","fields":[{"name":"a","type":"string"}]}],"default":null},{"name":"REF","type":["null","REFERRED_TYPE"],"default":null}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"}
"id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}}
Contributor commented:

Can you explain the test case? The schema changes, but the test data is left unchanged except for the newly added line.

Member (author) replied:

The added line tests the reference type. The previous lines just use the defaults (null).

Member (author) replied:

Basically, we are abusing this test case to test complex types. This is a little strange, but it may be the quickest way to test before we have a better-designed testing framework.

"id2"^{"op_type": {"string": "delete"}, "ID": {"string": "id2"}, "CLASS_ID": {"string": "2"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}}
"id3"^{"op_type": {"string": "delete"}, "ID": {"string": "id3"}, "CLASS_ID": {"string": "3"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u007f\u00ff\u00ff\u00ff"}}
"id4"^{"op_type": {"string": "delete"}, "ID": {"string": "id4"}, "CLASS_ID": {"string": "4"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0000\u0080\u0000\u0000\u0001"}}
"id5"^{"op_type": {"string": "delete"}, "ID": {"string": "id5"}, "CLASS_ID": {"string": "5"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0000\u0080\u0000\u0000\u0001"}}
"id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "-1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0080\u0000\u0000\u0001"}}
"id4"^
"id100"^{"REF": {"CPLM.REFERRED_TYPE":{"a": "abcdefg"}}}
36 changes: 21 additions & 15 deletions src/connector/src/parser/avro/parser.rs
@@ -22,7 +22,7 @@ use risingwave_common::{bail, try_match_expand};
use risingwave_pb::plan_common::ColumnDesc;

use super::schema_resolver::ConfluentSchemaCache;
use super::util::avro_schema_to_column_descs;
use super::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
use crate::error::ConnectorResult;
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
use crate::parser::unified::AccessImpl;
@@ -35,7 +35,7 @@ use crate::schema::schema_registry::{
// Default avro access builder
#[derive(Debug)]
pub struct AvroAccessBuilder {
schema: Arc<Schema>,
schema: Arc<ResolvedAvroSchema>,
/// Refer to [`AvroParserConfig::writer_schema_cache`].
pub writer_schema_cache: Option<Arc<ConfluentSchemaCache>>,
value: Option<Value>,
@@ -46,7 +46,7 @@ impl AccessBuilder for AvroAccessBuilder {
self.value = self.parse_avro_value(&payload).await?;
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_ref().unwrap(),
AvroParseOptions::create(&self.schema),
AvroParseOptions::create(&self.schema.resolved_schema),
)))
}
}
@@ -69,6 +69,8 @@ impl AvroAccessBuilder {
})
}

/// Note: we should use the unresolved schema to parse bytes into an Avro value.
/// Otherwise the schema is invalid and parsing will fail (Avro error: "Two named schema defined for same fullname").
async fn parse_avro_value(&self, payload: &[u8]) -> ConnectorResult<Option<Value>> {
// parse payload to avro value
// if use confluent schema, get writer schema from confluent schema registry
@@ -78,10 +80,10 @@
Ok(Some(from_avro_datum(
writer_schema.as_ref(),
&mut raw_payload,
Some(self.schema.as_ref()),
Some(&self.schema.original_schema),
)?))
} else {
let mut reader = Reader::with_schema(self.schema.as_ref(), payload)?;
let mut reader = Reader::with_schema(&self.schema.original_schema, payload)?;
match reader.next() {
Some(Ok(v)) => Ok(Some(v)),
Some(Err(e)) => Err(e)?,
@@ -93,8 +95,8 @@

#[derive(Debug, Clone)]
pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema: Arc<ResolvedAvroSchema>,
pub key_schema: Option<Arc<ResolvedAvroSchema>>,
/// Writer schema is the schema used to write the data. When parsing Avro data, the exact same schema
/// must be used to decode the message, and then convert it with the reader schema.
pub writer_schema_cache: Option<Arc<ConfluentSchemaCache>>,
@@ -143,9 +145,13 @@ impl AvroParserConfig {
tracing::debug!("infer key subject {subject_key:?}, value subject {subject_value}");

Ok(Self {
schema: resolver.get_by_subject(&subject_value).await?,
schema: Arc::new(ResolvedAvroSchema::create(
resolver.get_by_subject(&subject_value).await?,
)?),
key_schema: if let Some(subject_key) = subject_key {
Some(resolver.get_by_subject(&subject_key).await?)
Some(Arc::new(ResolvedAvroSchema::create(
resolver.get_by_subject(&subject_key).await?,
)?))
} else {
None
},
@@ -161,7 +167,7 @@ impl AvroParserConfig {
let schema = Schema::parse_reader(&mut schema_content.as_slice())
.context("failed to parse avro schema")?;
Ok(Self {
schema: Arc::new(schema),
schema: Arc::new(ResolvedAvroSchema::create(Arc::new(schema))?),
key_schema: None,
writer_schema_cache: None,
map_handling,
@@ -170,7 +176,7 @@
}

pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
avro_schema_to_column_descs(self.schema.as_ref(), self.map_handling)
avro_schema_to_column_descs(&self.schema.resolved_schema, self.map_handling)
}
}

@@ -289,7 +295,7 @@ mod test {
.await
.unwrap();
let builder = try_match_expand!(&parser.payload_builder, AccessBuilderImpl::Avro).unwrap();
let schema = builder.schema.clone();
let schema = builder.schema.original_schema.clone();
let record = build_avro_data(&schema);
assert_eq!(record.fields.len(), 11);
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Snappy);
@@ -482,7 +488,7 @@ mod test {
.await
.unwrap();
let builder = try_match_expand!(&parser.payload_builder, AccessBuilderImpl::Avro).unwrap();
let schema = &builder.schema;
let schema = &builder.schema.original_schema;
let mut null_record = Record::new(schema).unwrap();
null_record.put("id", Value::Int(5));
null_record.put("age", Value::Union(0, Box::new(Value::Null)));
@@ -554,8 +560,8 @@ mod test {
let conf = new_avro_conf_from_local("simple-schema.avsc")
.await
.unwrap();
let mut writer = Writer::new(&conf.schema, Vec::new());
let record = build_avro_data(&conf.schema);
let mut writer = Writer::new(conf.schema.original_schema.as_ref(), Vec::new());
let record = build_avro_data(conf.schema.original_schema.as_ref());
writer.append(record).unwrap();
let encoded = writer.into_inner().unwrap();
println!("path = {:?}", e2e_file_path("avro_simple_schema_bin.1"));
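Editor's note: the core invariant in this file is that payload bytes are decoded with the original (unresolved) schema, while column mapping uses the resolved one. A hedged sketch of the decode round trip using the same apache_avro calls as the diff (to_avro_datum/from_avro_datum with an optional reader schema; the schema and field names are illustrative):

use std::io::Cursor;

use apache_avro::types::Record;
use apache_avro::{from_avro_datum, to_avro_datum, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Schema::parse_str(
        r#"{"type": "record", "name": "T", "fields": [{"name": "a", "type": "string"}]}"#,
    )?;
    let mut record = Record::new(&schema).unwrap();
    record.put("a", "hello");
    // Encode a single datum with the writer schema...
    let payload = to_avro_datum(&schema, record)?;
    // ...and decode it back, passing the original schema as the reader schema,
    // just as `parse_avro_value` passes `self.schema.original_schema`.
    let value = from_avro_datum(&schema, &mut Cursor::new(payload), Some(&schema))?;
    println!("{value:?}");
    Ok(())
}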
33 changes: 31 additions & 2 deletions src/connector/src/parser/avro/util.rs
@@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;
use std::sync::{Arc, LazyLock};

use apache_avro::schema::{DecimalSchema, RecordSchema, Schema};
use apache_avro::schema::{DecimalSchema, RecordSchema, ResolvedSchema, Schema};
use apache_avro::types::{Value, ValueKind};
use apache_avro::AvroResult;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::log::LogSuppresser;
@@ -26,6 +27,33 @@ use crate::error::ConnectorResult;
use crate::parser::unified::bail_uncategorized;
use crate::parser::{AccessError, MapHandling};

/// Avro schema with `Ref` inlined. The newtype is used to indicate whether the schema is resolved.
///
/// TODO: Actually, most places should use the resolved schema, but they currently happen to work (some edge cases are not hit yet).
///
/// TODO: refactor the avro lib to use the resolution feature there.
#[derive(Debug)]
pub struct ResolvedAvroSchema {
/// Should be used for parsing bytes into Avro value
pub original_schema: Arc<Schema>,
/// Should be used for type mapping from Avro value to RisingWave datum
pub resolved_schema: Schema,
}

impl ResolvedAvroSchema {
pub fn create(schema: Arc<Schema>) -> AvroResult<Self> {
let resolver = ResolvedSchema::try_from(schema.as_ref())?;
// TODO: `to_resolved` may cause a stack overflow if there's a loop in the schema
let resolved_schema = resolver.to_resolved(schema.as_ref())?;
Ok(Self {
original_schema: schema,
resolved_schema,
})
}
}

/// This function expects a resolved schema (no `Ref`).
/// FIXME: require passing a resolved schema here.
pub fn avro_schema_to_column_descs(
schema: &Schema,
map_handling: Option<MapHandling>,
@@ -92,6 +120,7 @@ fn avro_field_to_column_desc(
}
}

/// This function expects a resolved schema (no `Ref`).
fn avro_type_mapping(
schema: &Schema,
map_handling: Option<MapHandling>,
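Editor's note: to see the resolution step in isolation — `ResolvedSchema::try_from` collects all named types, and `to_resolved` produces a copy with every `Ref` inlined, so `avro_schema_to_column_descs` never encounters one. A sketch under the assumption that `to_resolved` is available, as in the apache_avro build this PR compiles against (the diff above calls it directly):

use apache_avro::schema::ResolvedSchema;
use apache_avro::Schema;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Schema::parse_str(
        r#"{
            "type": "record", "name": "Outer",
            "fields": [
                {"name": "first", "type": {"type": "record", "name": "Inner",
                                           "fields": [{"name": "x", "type": "long"}]}},
                {"name": "second", "type": "Inner"}
            ]
        }"#,
    )?;
    // Before resolution, "second" is a `Schema::Ref` pointing at "Inner".
    let resolver = ResolvedSchema::try_from(&schema)?;
    // After resolution, the `Ref` is replaced by the full "Inner" record schema.
    let resolved = resolver.to_resolved(&schema)?;
    println!("{}", resolved.canonical_form());
    Ok(())
}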
15 changes: 8 additions & 7 deletions src/connector/src/parser/debezium/avro_parser.rs
@@ -23,7 +23,7 @@ use risingwave_pb::plan_common::ColumnDesc;

use crate::error::ConnectorResult;
use crate::parser::avro::schema_resolver::ConfluentSchemaCache;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::avro::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
use crate::parser::unified::avro::{
avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions,
};
@@ -40,7 +40,7 @@ const PAYLOAD: &str = "payload";

#[derive(Debug)]
pub struct DebeziumAvroAccessBuilder {
schema: Schema,
schema: ResolvedAvroSchema,
schema_resolver: Arc<ConfluentSchemaCache>,
key_schema: Option<Arc<Schema>>,
value: Option<Value>,
@@ -59,9 +59,10 @@ impl AccessBuilder for DebeziumAvroAccessBuilder {
};
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_mut().unwrap(),
// Assumption: the key will not contain references, so the unresolved schema works here.
AvroParseOptions::create(match self.encoding_type {
EncodingType::Key => self.key_schema.as_mut().unwrap(),
EncodingType::Value => &self.schema,
EncodingType::Value => &self.schema.resolved_schema,
}),
)))
}
@@ -78,11 +79,8 @@ impl DebeziumAvroAccessBuilder {
..
} = config;

let resolver = apache_avro::schema::ResolvedSchema::try_from(&*outer_schema)?;
// todo: to_resolved may cause stackoverflow if there's a loop in the schema
let schema = resolver.to_resolved(&outer_schema)?;
Ok(Self {
schema,
schema: ResolvedAvroSchema::create(outer_schema)?,
schema_resolver,
key_schema: None,
value: None,
@@ -133,6 +131,9 @@ impl DebeziumAvroParserConfig {
pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
avro_schema_to_column_descs(
avro_schema_skip_union(avro_extract_field_schema(
// FIXME: use the resolved schema here.
// Currently it works because "after" refers to a subtree in "before",
// but in theory, inside "before" there could also be a reference.
&self.outer_schema,
Some("before"),
)?)?,
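Editor's note: for context on the FIXME above — a Debezium envelope defines the row type once under "before" and references it by name under "after", roughly like the simplified, hypothetical schema below (field and type names are illustrative, not the exact schema Debezium emits). This is why extracting "before" currently suffices, and why a `Ref` could in principle appear inside it too:

use apache_avro::Schema;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let envelope = Schema::parse_str(
        r#"{
            "type": "record", "name": "Envelope", "namespace": "db.table",
            "fields": [
                {"name": "before", "type": ["null", {
                    "type": "record", "name": "Value",
                    "fields": [{"name": "id", "type": "int"}]
                }], "default": null},
                {"name": "after", "type": ["null", "Value"], "default": null},
                {"name": "op", "type": "string"}
            ]
        }"#,
    )?;
    println!("{}", envelope.canonical_form());
    Ok(())
}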
5 changes: 4 additions & 1 deletion src/connector/src/parser/unified/avro.rs
@@ -34,7 +34,10 @@ use crate::parser::avro::util::avro_to_jsonb;
#[derive(Clone)]
/// Options for parsing an `AvroValue` into Datum, with an optional avro schema.
pub struct AvroParseOptions<'a> {
/// Currently, this schema is only used for decimal
/// Currently, this schema is only used for decimal.
///
/// FIXME: In theory, we should use the resolved schema here.
/// E.g., it's possible that a field is a reference to a decimal or to a record containing a decimal field.
pub schema: Option<&'a Schema>,
/// Strict Mode
/// If strict mode is disabled, an int64 can be parsed from an `AvroInt` (int32) value.
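Editor's note: an example of the edge case the FIXME describes — a named fixed type carrying the decimal logical type, referenced by name from a second field. With the unresolved schema, the second field's schema is a `Ref`, hiding the decimal's precision and scale from the parser. A hypothetical schema for illustration, not currently exercised by the tests:

use apache_avro::Schema;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Schema::parse_str(
        r#"{
            "type": "record", "name": "Row",
            "fields": [
                {"name": "price", "type": {"type": "fixed", "name": "Money", "size": 4,
                                           "logicalType": "decimal", "precision": 10, "scale": 2}},
                {"name": "discount", "type": "Money"}
            ]
        }"#,
    )?;
    // "discount" is a `Ref` to "Money" until the schema is resolved.
    println!("{}", schema.canonical_form());
    Ok(())
}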