Skip to content

Commit

Permalink
fix parsing and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Jun 3, 2024
1 parent 7ff6f08 commit e74218d
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 32 deletions.
1 change: 1 addition & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,7 @@ SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}"
SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}"
SLT_DB = "dev"
PATH = "${PWD}/e2e_test/commands:${PATH}"
RUST_BACKTRACE = "0" # slt backtrace is useless

[tasks.slt-clean]
category = "RiseDev - Test - SQLLogicTest"
Expand Down
34 changes: 18 additions & 16 deletions e2e_test/source_inline/kafka/avro/name_strategy.slt
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,11 @@ FROM
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)

# query II
# SELECT
Expand All @@ -164,13 +165,14 @@ FROM
ORDER BY
"ID";
----
update id1 1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z 99999999.99
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47
delete id4 4 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49
NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
update id1 1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z 99999999.99 NULL NULL
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id4 4 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)


query II
Expand All @@ -181,11 +183,11 @@ FROM
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49

update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47 NULL NULL
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99 NULL NULL
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47 NULL NULL
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49 NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL NULL -0.01 NULL (abcdefg)



Expand Down
3 changes: 2 additions & 1 deletion e2e_test/source_inline/kafka/avro/upsert_avro_json
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"], "default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null},{"name":"DEC_VAL","type":[{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"null"],"default":"\u00ff"}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"}
"string"^{"type":"record","name":"OBJ_ATTRIBUTE_VALUE","namespace":"CPLM","fields":[{"name":"op_type","type":["null","string"],"default":null},{"name":"ID","type":["null","string"],"default":null},{"name":"CLASS_ID","type":["null","string"],"default":null},{"name":"ITEM_ID","type":["null","string"],"default":null},{"name":"ATTR_ID","type":["null","string"],"default":null},{"name":"ATTR_VALUE","type":["null","string"],"default":null},{"name":"ORG_ID","type":["null","string"],"default":null},{"name":"UNIT_INFO","type":["null","string"],"default":null},{"name":"UPD_TIME","type":["null","string"],"default":null},{"name":"DEC_VAL","type":[{"type":"bytes","logicalType":"decimal","precision":10,"scale":2},"null"],"default":"\u00ff"},{"name":"REFERRED","type":["null",{"type":"record","name":"REFERRED_TYPE","fields":[{"name":"a","type":"string"}]}],"default":null},{"name":"REF","type":["null","REFERRED_TYPE"],"default":null}],"connect.name":"CPLM.OBJ_ATTRIBUTE_VALUE"}
"id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}}
"id2"^{"op_type": {"string": "delete"}, "ID": {"string": "id2"}, "CLASS_ID": {"string": "2"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0002\u0054\u000b\u00e3\u00ff"}}
"id3"^{"op_type": {"string": "delete"}, "ID": {"string": "id3"}, "CLASS_ID": {"string": "3"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u007f\u00ff\u00ff\u00ff"}}
"id4"^{"op_type": {"string": "delete"}, "ID": {"string": "id4"}, "CLASS_ID": {"string": "4"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0000\u0080\u0000\u0000\u0001"}}
"id5"^{"op_type": {"string": "delete"}, "ID": {"string": "id5"}, "CLASS_ID": {"string": "5"}, "ITEM_ID": {"string": "7778"}, "ATTR_ID": {"string": "7980"}, "ATTR_VALUE": {"string": "value10"}, "ORG_ID": {"string": "8182"}, "UNIT_INFO": {"string": "info10"}, "UPD_TIME": {"string": "2021-05-19T15:22:45.539Z"}, "DEC_VAL": {"bytes": "\u0000\u0080\u0000\u0000\u0001"}}
"id1"^{"op_type": {"string": "update"}, "ID": {"string": "id1"}, "CLASS_ID": {"string": "-1"}, "ITEM_ID": {"string": "6768"}, "ATTR_ID": {"string": "6970"}, "ATTR_VALUE": {"string": "value9"}, "ORG_ID": {"string": "7172"}, "UNIT_INFO": {"string": "info9"}, "UPD_TIME": {"string": "2021-05-18T07:59:58.714Z"}, "DEC_VAL": {"bytes": "\u0080\u0000\u0000\u0001"}}
"id4"^
"id100"^{"REF": {"CPLM.REFERRED_TYPE":{"a": "abcdefg"}}}
16 changes: 9 additions & 7 deletions src/connector/src/parser/avro/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl AccessBuilder for AvroAccessBuilder {
self.value = self.parse_avro_value(&payload).await?;
Ok(AccessImpl::Avro(AvroAccess::new(
self.value.as_ref().unwrap(),
AvroParseOptions::create(&self.schema.0),
AvroParseOptions::create(&self.schema.resolved_schema),
)))
}
}
Expand All @@ -69,6 +69,8 @@ impl AvroAccessBuilder {
})
}

/// Note: we should use the unresolved (original) schema to parse bytes into an Avro value.
/// Otherwise (i.e., with the resolved schema) the schema is invalid and parsing will fail. (Avro error: Two named schema defined for same fullname)
async fn parse_avro_value(&self, payload: &[u8]) -> ConnectorResult<Option<Value>> {
// parse payload to avro value
// if use confluent schema, get writer schema from confluent schema registry
Expand All @@ -78,10 +80,10 @@ impl AvroAccessBuilder {
Ok(Some(from_avro_datum(
writer_schema.as_ref(),
&mut raw_payload,
Some(&self.schema.0),
Some(&self.schema.original_schema),
)?))
} else {
let mut reader = Reader::with_schema(&self.schema.0, payload)?;
let mut reader = Reader::with_schema(&self.schema.original_schema, payload)?;
match reader.next() {
Some(Ok(v)) => Ok(Some(v)),
Some(Err(e)) => Err(e)?,
Expand Down Expand Up @@ -144,11 +146,11 @@ impl AvroParserConfig {

Ok(Self {
schema: Arc::new(ResolvedAvroSchema::create(
resolver.get_by_subject(&subject_value).await?.as_ref(),
resolver.get_by_subject(&subject_value).await?,
)?),
key_schema: if let Some(subject_key) = subject_key {
Some(Arc::new(ResolvedAvroSchema::create(
resolver.get_by_subject(&subject_key).await?.as_ref(),
resolver.get_by_subject(&subject_key).await?,
)?))
} else {
None
Expand All @@ -165,7 +167,7 @@ impl AvroParserConfig {
let schema = Schema::parse_reader(&mut schema_content.as_slice())
.context("failed to parse avro schema")?;
Ok(Self {
schema: Arc::new(ResolvedAvroSchema::create(&schema)?),
schema: Arc::new(ResolvedAvroSchema::create(Arc::new(schema))?),
key_schema: None,
writer_schema_cache: None,
map_handling,
Expand All @@ -174,7 +176,7 @@ impl AvroParserConfig {
}

pub fn map_to_columns(&self) -> ConnectorResult<Vec<ColumnDesc>> {
avro_schema_to_column_descs(&self.schema.0, self.map_handling)
avro_schema_to_column_descs(&self.schema.resolved_schema, self.map_handling)
}
}

Expand Down
22 changes: 16 additions & 6 deletions src/connector/src/parser/avro/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::LazyLock;
use std::sync::{Arc, LazyLock};

use apache_avro::schema::{DecimalSchema, RecordSchema, ResolvedSchema, Schema};
use apache_avro::types::{Value, ValueKind};
Expand All @@ -33,17 +33,26 @@ use crate::parser::{AccessError, MapHandling};
///
/// TODO: refactor avro lib to use the feature there.
#[derive(Debug)]
pub struct ResolvedAvroSchema(pub Schema);
pub struct ResolvedAvroSchema {
/// Should be used for parsing bytes into Avro value
pub original_schema: Arc<Schema>,
/// Should be used for type mapping from Avro value to RisingWave datum
pub resolved_schema: Schema,
}

impl ResolvedAvroSchema {
pub fn create(schema: &Schema) -> AvroResult<Self> {
let resolver = ResolvedSchema::try_from(schema)?;
pub fn create(schema: Arc<Schema>) -> AvroResult<Self> {
let resolver = ResolvedSchema::try_from(schema.as_ref())?;
// TODO: `to_resolved` may cause a stack overflow if there's a loop (cyclic reference) in the schema
let schema = resolver.to_resolved(schema)?;
Ok(Self(schema.clone()))
let resolved_schema = resolver.to_resolved(schema.as_ref())?;
Ok(Self {
original_schema: schema,
resolved_schema,
})
}
}

/// This function expects resolved schema (no `Ref`).
/// FIXME: require passing resolved schema here.
pub fn avro_schema_to_column_descs(
schema: &Schema,
Expand Down Expand Up @@ -111,6 +120,7 @@ fn avro_field_to_column_desc(
}
}

/// This function expects resolved schema (no `Ref`).
fn avro_type_mapping(
schema: &Schema,
map_handling: Option<MapHandling>,
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/debezium/avro_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl AccessBuilder for DebeziumAvroAccessBuilder {
// Assumption: the key will not contain references, so the unresolved schema can work here.
AvroParseOptions::create(match self.encoding_type {
EncodingType::Key => self.key_schema.as_mut().unwrap(),
EncodingType::Value => &self.schema.0,
EncodingType::Value => &self.schema.resolved_schema,
}),
)))
}
Expand All @@ -80,7 +80,7 @@ impl DebeziumAvroAccessBuilder {
} = config;

Ok(Self {
schema: ResolvedAvroSchema::create(&outer_schema)?,
schema: ResolvedAvroSchema::create(outer_schema)?,
schema_resolver,
key_schema: None,
value: None,
Expand Down

0 comments on commit e74218d

Please sign in to comment.