Skip to content

Commit

Permalink
feat: support avro reference type (#17052)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Jun 3, 2024
1 parent 8f63559 commit 95ce5ec
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 42 deletions.
1 change: 1 addition & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,7 @@ SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}"
SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}"
SLT_DB = "dev"
PATH = "${PWD}/e2e_test/commands:${PATH}"
RUST_BACKTRACE = "0" # slt backtrace is useless

[tasks.slt-clean]
category = "RiseDev - Test - SQLLogicTest"
Expand Down
34 changes: 18 additions & 16 deletions e2e_test/source_inline/kafka/avro/name_strategy.slt
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,11 @@ FROM
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)

# query II
# SELECT
Expand All @@ -164,13 +165,14 @@ FROM
ORDER BY
"ID";
----
update id1 1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z 99999999.99
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47
delete id4 4 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49
NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
update id1 1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z 99999999.99 NULL NULL
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id4 4 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)


query II
Expand All @@ -181,11 +183,11 @@ FROM
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49

update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)



Expand Down
3 changes: 2 additions & 1 deletion e2e_test/source_inline/kafka/avro/upsert_avro_json
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"], "default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null},{"name":"DEC_VAL","type":[{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"null"],"default":"\u00ff"}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"}
"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"],"default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null},{"name":"DEC_VAL","type":[{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"null"],"default":"\u00ff"},{"name":"REFERRED","type":["null",{"type":"record","name":"REFERRED_TYPE","fields":[{"name":"a","type":"string"}]}],"default":null},{"name":"REF","type":["null","REFERRED_TYPE"],"default":null}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"}
"id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}}
"id2"^{"op_type": {"string": "delete"}, "ID": {"string": "id2"}, "CLASS_ID": {"string": "2"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}}
"id3"^{"op_type": {"string": "delete"}, "ID": {"string": "id3"}, "CLASS_ID": {"string": "3"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u007f\u00ff\u00ff\u00ff"}}
"id4"^{"op_type": {"string": "delete"}, "ID": {"string": "id4"}, "CLASS_ID": {"string": "4"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0000\u0080\u0000\u0000\u0001"}}
"id5"^{"op_type": {"string": "delete"}, "ID": {"string": "id5"}, "CLASS_ID": {"string": "5"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0000\u0080\u0000\u0000\u0001"}}
"id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "-1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0080\u0000\u0000\u0001"}}
"id4"^
"id100"^{"REF": {"CPLM.REFERRED_TYPE":{"a": "abcdefg"}}}
36 changes: 21 additions & 15 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::{bail, try_match_expand};
use risingwave_pb::plan_common::ColumnDesc;

use super::schema_resolver::ConfluentSchemaCache;
use super::util::avro_schema_to_column_descs;
use super::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
use crate::error::ConnectorResult;
use crate::parser::unified::avro::{AvroAccess, AvroParseOptions};
use crate::parser::unified::AccessImpl;
Expand All @@ -35,7 +35,7 @@ use crate::schema::schema_registry::{
// Default avro access builder
#[derive(Debug)]
pub struct AvroAccessBuilder {
schema: Arc<Schema>,
schema: Arc<ResolvedAvroSchema>,
/// Refer to [`AvroParserConfig::writer_schema_cache`].
pub writer_schema_cache: Option<Arc<ConfluentSchemaCache>>,
value: Option<Value>,
Expand All @@ -46,7 +46,7 @@ impl AccessBuilder for AvroAccessBuilder {
self.value = self.parse_avro_value(&payload).await?;
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_ref().unwrap(),
AvroParseOptions::create(&self.schema),
AvroParseOptions::create(&self.schema.resolved_schema),
)))
}
}
Expand All @@ -69,6 +69,8 @@ impl AvroAccessBuilder {
})
}

/// Note: we should use the unresolved schema to parse bytes into an Avro value.
/// Otherwise it's an invalid schema and parsing will fail. (Avro error: Two named schema defined for same fullname)
async fn parse_avro_value(&self, payload: &[u8]) -> ConnectorResult<Option<Value>> {
// parse payload to avro value
// if use confluent schema, get writer schema from confluent schema registry
Expand All @@ -78,10 +80,10 @@ impl AvroAccessBuilder {
Ok(Some(from_avro_datum(
writer_schema.as_ref(),
&mut raw_payload,
Some(self.schema.as_ref()),
Some(&self.schema.original_schema),
)?))
} else {
let mut reader = Reader::with_schema(self.schema.as_ref(), payload)?;
let mut reader = Reader::with_schema(&self.schema.original_schema, payload)?;
match reader.next() {
Some(Ok(v)) => Ok(Some(v)),
Some(Err(e)) => Err(e)?,
Expand All @@ -93,8 +95,8 @@ impl AvroAccessBuilder {

#[derive(Debug, Clone)]
pub struct AvroParserConfig {
pub schema: Arc<Schema>,
pub key_schema: Option<Arc<Schema>>,
pub schema: Arc<ResolvedAvroSchema>,
pub key_schema: Option<Arc<ResolvedAvroSchema>>,
/// Writer schema is the schema used to write the data. When parsing Avro data, the exact same schema
/// must be used to decode the message, which is then converted with the reader schema.
pub writer_schema_cache: Option<Arc<ConfluentSchemaCache>>,
Expand Down Expand Up @@ -143,9 +145,13 @@ impl AvroParserConfig {
tracing::debug!("infer key subject {subject_key:?}, value subject {subject_value}");

Ok(Self {
schema: resolver.get_by_subject(&subject_value).await?,
schema: Arc::new(ResolvedAvroSchema::create(
resolver.get_by_subject(&subject_value).await?,
)?),
key_schema: if let Some(subject_key) = subject_key {
Some(resolver.get_by_subject(&subject_key).await?)
Some(Arc::new(ResolvedAvroSchema::create(
resolver.get_by_subject(&subject_key).await?,
)?))
} else {
None
},
Expand All @@ -161,7 +167,7 @@ impl AvroParserConfig {
let schema = Schema::parse_reader(&mut schema_content.as_slice())
.context("failed to parse avro schema")?;
Ok(Self {
schema: Arc::new(schema),
schema: Arc::new(ResolvedAvroSchema::create(Arc::new(schema))?),
key_schema: None,
writer_schema_cache: None,
map_handling,
Expand All @@ -170,7 +176,7 @@ impl AvroParserConfig {
}

pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
avro_schema_to_column_descs(self.schema.as_ref(), self.map_handling)
avro_schema_to_column_descs(&self.schema.resolved_schema, self.map_handling)
}
}

Expand Down Expand Up @@ -289,7 +295,7 @@ mod test {
.await
.unwrap();
let builder = try_match_expand!(&parser.payload_builder, AccessBuilderImpl::Avro).unwrap();
let schema = builder.schema.clone();
let schema = builder.schema.original_schema.clone();
let record = build_avro_data(&schema);
assert_eq!(record.fields.len(), 11);
let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Snappy);
Expand Down Expand Up @@ -482,7 +488,7 @@ mod test {
.await
.unwrap();
let builder = try_match_expand!(&parser.payload_builder, AccessBuilderImpl::Avro).unwrap();
let schema = &builder.schema;
let schema = &builder.schema.original_schema;
let mut null_record = Record::new(schema).unwrap();
null_record.put("id", Value::Int(5));
null_record.put("age", Value::Union(0, Box::new(Value::Null)));
Expand Down Expand Up @@ -554,8 +560,8 @@ mod test {
let conf = new_avro_conf_from_local("simple-schema.avsc")
.await
.unwrap();
let mut writer = Writer::new(&conf.schema, Vec::new());
let record = build_avro_data(&conf.schema);
let mut writer = Writer::new(conf.schema.original_schema.as_ref(), Vec::new());
let record = build_avro_data(conf.schema.original_schema.as_ref());
writer.append(record).unwrap();
let encoded = writer.into_inner().unwrap();
println!("path = {:?}", e2e_file_path("avro_simple_schema_bin.1"));
Expand Down
33 changes: 31 additions & 2 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;
use std::sync::{Arc, LazyLock};

use apache_avro::schema::{DecimalSchema, RecordSchema, Schema};
use apache_avro::schema::{DecimalSchema, RecordSchema, ResolvedSchema, Schema};
use apache_avro::types::{Value, ValueKind};
use apache_avro::AvroResult;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::log::LogSuppresser;
Expand All @@ -26,6 +27,33 @@ use crate::error::ConnectorResult;
use crate::parser::unified::bail_uncategorized;
use crate::parser::{AccessError, MapHandling};

/// Avro schema with `Ref` schemas inlined. The newtype is used to indicate whether the schema is resolved.
///
/// TODO: Actually most of the places should use the resolved schema, but currently they just happen to work (some edge cases are not met yet).
///
/// TODO: refactor the avro lib to use the resolution feature there.
#[derive(Debug)]
pub struct ResolvedAvroSchema {
    /// The schema as originally written (may contain `Ref` nodes). Should be used for
    /// parsing bytes into an Avro value — parsing with the inlined schema fails with
    /// "Two named schema defined for same fullname".
    pub original_schema: Arc<Schema>,
    /// The schema with every `Ref` replaced by its definition. Should be used for type
    /// mapping from Avro value to RisingWave datum.
    pub resolved_schema: Schema,
}

impl ResolvedAvroSchema {
    /// Builds a [`ResolvedAvroSchema`] by inlining every `Ref` in `schema`,
    /// keeping the original schema alongside the resolved one.
    pub fn create(schema: Arc<Schema>) -> AvroResult<Self> {
        let raw = schema.as_ref();
        // todo: to_resolved may cause stackoverflow if there's a loop in the schema
        let resolved_schema = ResolvedSchema::try_from(raw)?.to_resolved(raw)?;
        Ok(Self {
            original_schema: schema,
            resolved_schema,
        })
    }
}

/// This function expects resolved schema (no `Ref`).
/// FIXME: require passing resolved schema here.
pub fn avro_schema_to_column_descs(
schema: &Schema,
map_handling: Option<MapHandling>,
Expand Down Expand Up @@ -92,6 +120,7 @@ fn avro_field_to_column_desc(
}
}

/// This function expects resolved schema (no `Ref`).
fn avro_type_mapping(
schema: &Schema,
map_handling: Option<MapHandling>,
Expand Down
15 changes: 8 additions & 7 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use risingwave_pb::plan_common::ColumnDesc;

use crate::error::ConnectorResult;
use crate::parser::avro::schema_resolver::ConfluentSchemaCache;
use crate::parser::avro::util::avro_schema_to_column_descs;
use crate::parser::avro::util::{avro_schema_to_column_descs, ResolvedAvroSchema};
use crate::parser::unified::avro::{
avro_extract_field_schema, avro_schema_skip_union, AvroAccess, AvroParseOptions,
};
Expand All @@ -40,7 +40,7 @@ const PAYLOAD: &str = "payload";

#[derive(Debug)]
pub struct DebeziumAvroAccessBuilder {
schema: Schema,
schema: ResolvedAvroSchema,
schema_resolver: Arc<ConfluentSchemaCache>,
key_schema: Option<Arc<Schema>>,
value: Option<Value>,
Expand All @@ -59,9 +59,10 @@ impl AccessBuilder for DebeziumAvroAccessBuilder {
};
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_mut().unwrap(),
// Assumption: the key schema will not contain a reference, so the unresolved schema works here.
AvroParseOptions::create(match self.encoding_type {
EncodingType::Key => self.key_schema.as_mut().unwrap(),
EncodingType::Value => &self.schema,
EncodingType::Value => &self.schema.resolved_schema,
}),
)))
}
Expand All @@ -78,11 +79,8 @@ impl DebeziumAvroAccessBuilder {
..
} = config;

let resolver = apache_avro::schema::ResolvedSchema::try_from(&*outer_schema)?;
// todo: to_resolved may cause stackoverflow if there's a loop in the schema
let schema = resolver.to_resolved(&outer_schema)?;
Ok(Self {
schema,
schema: ResolvedAvroSchema::create(outer_schema)?,
schema_resolver,
key_schema: None,
value: None,
Expand Down Expand Up @@ -133,6 +131,9 @@ impl DebeziumAvroParserConfig {
pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
avro_schema_to_column_descs(
avro_schema_skip_union(avro_extract_field_schema(
// FIXME: use resolved schema here.
// Currently it works because "after" refers to a subtree in "before",
// but in theory, inside "before" there could also be a reference.
&self.outer_schema,
Some("before"),
)?)?,
Expand Down
5 changes: 4 additions & 1 deletion src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ use crate::parser::avro::util::avro_to_jsonb;
#[derive(Clone)]
/// Options for parsing an `AvroValue` into Datum, with an optional avro schema.
pub struct AvroParseOptions<'a> {
/// Currently, this schema is only used for decimal
/// Currently, this schema is only used for decimal.
///
/// FIXME: In theory we should use resolved schema.
/// e.g., it's possible that a field is a reference to a decimal or a record containing a decimal field.
pub schema: Option<&'a Schema>,
/// Strict Mode
/// If strict mode is disabled, an int64 can be parsed from an `AvroInt` (int32) value.
Expand Down

0 comments on commit 95ce5ec

Please sign in to comment.