Skip to content

Commit

Permalink
fix(avro): parse decimals with shorter bytes correctly (#16202)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangjinwu authored Apr 9, 2024
1 parent 98e1c64 commit 6e759c2
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions e2e_test/source/basic/nosim_kafka.slt
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,16 @@ sleep 8s

query II
SELECT
op_type, "ID", "CLASS_ID", "ITEM_ID", "ATTR_ID", "ATTR_VALUE", "ORG_ID", "UNIT_INFO", "UPD_TIME"
op_type, "ID", "CLASS_ID", "ITEM_ID", "ATTR_ID", "ATTR_VALUE", "ORG_ID", "UNIT_INFO", "UPD_TIME", "DEC_VAL"
FROM
upsert_avro_json_default_key
ORDER BY
"ID";
----
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z
update id1 -1 6768 6970 value9 7172 info9 2021-05-18T07:59:58.714Z -21474836.47
delete id2 2 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 99999999.99
delete id3 3 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.47
delete id5 5 7778 7980 value10 8182 info10 2021-05-19T15:22:45.539Z 21474836.49

# query II
# SELECT
Expand Down
45 changes: 41 additions & 4 deletions scripts/source/schema_registry_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ def delivery_report(err, msg):
return


def load_avro_json(encoded, schema):
"""Unlike `json.loads`, this decodes a json according to given avro schema.
https://avro.apache.org/docs/1.11.1/specification/#json-encoding
Specially, it handles `union` variants, and differentiates `bytes` from `string`.
"""
from fastavro import json_reader
from io import StringIO

with StringIO(encoded) as buf:
reader = json_reader(buf, schema)
return next(reader)


if __name__ == '__main__':
if len(sys.argv) < 5:
print("datagen.py <brokerlist> <schema-registry-url> <file> <name-strategy> <json/avro>")
Expand Down Expand Up @@ -59,6 +73,8 @@ def delivery_report(err, msg):
producer = Producer(kafka_conf)
key_serializer = None
value_serializer = None
key_schema = None
value_schema = None
with open(file) as file:
for (i, line) in enumerate(file):
if i == 0:
Expand All @@ -71,6 +87,8 @@ def delivery_report(err, msg):
value_serializer = AvroSerializer(schema_registry_client=schema_registry_client,
schema_str=parts[1],
conf=avro_ser_conf)
key_schema = json.loads(parts[0])
value_schema = json.loads(parts[1])
else:
key_serializer = JSONSerializer(schema_registry_client=schema_registry_client,
schema_str=parts[0])
Expand All @@ -80,28 +98,47 @@ def delivery_report(err, msg):
if type == 'avro':
value_serializer = AvroSerializer(schema_registry_client=schema_registry_client,
schema_str=parts[0])
value_schema = json.loads(parts[0])
else:
value_serializer = JSONSerializer(schema_registry_client=schema_registry_client,
schema_str=parts[0])
else:
parts = line.split("^")
key_idx = None
value_idx = None
if len(parts) > 1:
key_idx = 0
if len(parts[1].strip()) > 0:
value_idx = 1
else:
value_idx = 0
if type == 'avro':
if key_idx is not None:
key_json = load_avro_json(parts[key_idx], key_schema)
if value_idx is not None:
value_json = load_avro_json(parts[value_idx], value_schema)
else:
if key_idx is not None:
key_json = json.loads(parts[key_idx])
if value_idx is not None:
value_json = json.loads(parts[value_idx])
if len(parts) > 1:
if len(parts[1].strip()) > 0:
producer.produce(topic=topic, partition=0,
key=key_serializer(json.loads(parts[0]),
key=key_serializer(key_json,
SerializationContext(topic, MessageField.KEY)),
value=value_serializer(
json.loads(parts[1]), SerializationContext(topic, MessageField.VALUE)),
value_json, SerializationContext(topic, MessageField.VALUE)),
on_delivery=delivery_report)
else:
producer.produce(topic=topic, partition=0,
key=key_serializer(json.loads(parts[0]),
key=key_serializer(key_json,
SerializationContext(topic, MessageField.KEY)),
on_delivery=delivery_report)
else:
producer.produce(topic=topic, partition=0,
value=value_serializer(
json.loads(parts[0]), SerializationContext(topic, MessageField.VALUE)),
value_json, SerializationContext(topic, MessageField.VALUE)),
on_delivery=delivery_report)

producer.flush()
10 changes: 5 additions & 5 deletions scripts/source/test_data/debezium_compact_avro_json.1
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{"type":"record","name":"Key","namespace":"postgres.public.orders","fields":[{"name":"order_id","type":{"type":"int","connect.default":0},"default":0}],"connect.name":"postgres.public.orders.Key"}^{"type":"record","name":"Envelope","namespace":"postgres.public.orders","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"order_id","type":{"type":"int","connect.default":0},"default":0},{"name":"order_date","type":["null","long"],"default":null},{"name":"customer_name","type":["null","string"],"default":null},{"name":"price","type":["null",{"type":"record","name":"VariableScaleDecimal","namespace":"io.debezium.data","fields":[{"name":"scale","type":"int"},{"name":"value","type":"bytes"}],"connect.doc":"Variable scaled decimal","connect.version":1,"connect.name":"io.debezium.data.VariableScaleDecimal"}],"default":null},{"name":"product_id","type":["null","int"],"default":null},{"name":"order_status","type":["null",{"type":"int","connect.type":"int16"}],"default":null}],"connect.name":"postgres.public.orders.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.postgresql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"schema","type":"string"},{"name":"table","type":"string"},{"name":"txId","type":["null","long"],"default":null},{"name":"lsn","type":["null","long"],"default":null},{"name":"xmin","type":["null","long"],"default":null}],"connect.name":"io.debezium.connector.postgresql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"postgres.public.orders.Envelope"}
{"order_id": 1}^{"before":null,"after":{"order_id":1,"order_date":1558430840000,"customer_name":"Bob","price":null,"product_id":1,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"true","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424096,"transaction":null}
{"order_id": 2}^{"before":null,"after":{"order_id":2,"order_date":1558430840001,"customer_name":"Alice","price":null,"product_id":2,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"true","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424102,"transaction":null}
{"order_id": 3}^{"before":null,"after":{"order_id":3,"order_date":1558430840002,"customer_name":"Alice","price":null,"product_id":2,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"last","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424103,"transaction":null}
{"order_id": 1}^{"before":null,"after":{"order_id":1,"order_date":1558430840000,"customer_name":"Bob","price":null,"product_id":3,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1686029203809,"snapshot":"false","db":"mydb","sequence":"[null,\"23911952\"]","schema":"public","table":"orders","txId":491,"lsn":23911952,"xmin":null},"op":"u","ts_ms":1686029204058,"transaction":null}
{"order_id": 1}^
{"order_id": 1}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 1, "order_date": {"long": 1558430840000}, "customer_name": {"string": "Bob"}, "price": null, "product_id": {"int": 1}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "true"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424096}, "transaction": null}
{"order_id": 2}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 2, "order_date": {"long": 1558430840001}, "customer_name": {"string": "Alice"}, "price": null, "product_id": {"int": 2}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "true"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424102}, "transaction": null}
{"order_id": 3}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 3, "order_date": {"long": 1558430840002}, "customer_name": {"string": "Alice"}, "price": null, "product_id": {"int": 2}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "last"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424103}, "transaction": null}
{"order_id": 1}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 1, "order_date": {"long": 1558430840000}, "customer_name": {"string": "Bob"}, "price": null, "product_id": {"int": 3}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1686029203809, "snapshot": {"string": "false"}, "db": "mydb", "sequence": {"string": "[null,\"23911952\"]"}, "schema": "public", "table": "orders", "txId": {"long": 491}, "lsn": {"long": 23911952}, "xmin": null}, "op": "u", "ts_ms": {"long": 1686029204058}, "transaction": null}
{"order_id": 1}^
12 changes: 6 additions & 6 deletions scripts/source/test_data/debezium_non_compact_avro_json.1
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{"type":"record","name":"Key","namespace":"postgres.public.orders","fields":[{"name":"order_id","type":{"type":"int","connect.default":0},"default":0}],"connect.name":"postgres.public.orders.Key"}^{"type":"record","name":"Envelope","namespace":"postgres.public.orders","fields":[{"name":"before","type":["null",{"type":"record","name":"Value","fields":[{"name":"order_id","type":{"type":"int","connect.default":0},"default":0},{"name":"order_date","type":["null","long"],"default":null},{"name":"customer_name","type":["null","string"],"default":null},{"name":"price","type":["null",{"type":"record","name":"VariableScaleDecimal","namespace":"io.debezium.data","fields":[{"name":"scale","type":"int"},{"name":"value","type":"bytes"}],"connect.doc":"Variable scaled decimal","connect.version":1,"connect.name":"io.debezium.data.VariableScaleDecimal"}],"default":null},{"name":"product_id","type":["null","int"],"default":null},{"name":"order_status","type":["null",{"type":"int","connect.type":"int16"}],"default":null}],"connect.name":"postgres.public.orders.Value"}],"default":null},{"name":"after","type":["null","Value"],"default":null},{"name":"source","type":{"type":"record","name":"Source","namespace":"io.debezium.connector.postgresql","fields":[{"name":"version","type":"string"},{"name":"connector","type":"string"},{"name":"name","type":"string"},{"name":"ts_ms","type":"long"},{"name":"snapshot","type":[{"type":"string","connect.version":1,"connect.parameters":{"allowed":"true,last,false,incremental"},"connect.default":"false","connect.name":"io.debezium.data.Enum"},"null"],"default":"false"},{"name":"db","type":"string"},{"name":"sequence","type":["null","string"],"default":null},{"name":"schema","type":"string"},{"name":"table","type":"string"},{"name":"txId","type":["null","long"],"default":null},{"name":"lsn","type":["null","long"],"default":null},{"name":"xmin","type":["null","long"],"default":null}],"connect.name":"io.debezium.connector.postgresql.Source"}},{"name":"op","type":"string"},{"name":"ts_ms","type":["null","long"],"default":null},{"name":"transaction","type":["null",{"type":"record","name":"ConnectDefault","namespace":"io.confluent.connect.avro","fields":[{"name":"id","type":"string"},{"name":"total_order","type":"long"},{"name":"data_collection_order","type":"long"}]}],"default":null}],"connect.name":"postgres.public.orders.Envelope"}
{"order_id": 1}^{"before":null,"after":{"order_id":1,"order_date":1558430840000,"customer_name":"Bob","product_id":1,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"true","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424096,"transaction":null}
{"order_id": 2}^{"before":null,"after":{"order_id":2,"order_date":1558430840001,"customer_name":"Alice","product_id":2,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"true","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424102,"transaction":null}
{"order_id": 3}^{"before":null,"after":{"order_id":3,"order_date":1558430840002,"customer_name":"Alice","product_id":2,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1685975423896,"snapshot":"last","db":"mydb","sequence":"[null,\"23911624\"]","schema":"public","table":"orders","txId":490,"lsn":23911624,"xmin":null},"op":"r","ts_ms":1685975424103,"transaction":null}
{"order_id": 1}^{"before":null,"after":{"order_id":1,"order_date":1558430840000,"customer_name":"Bob","product_id":3,"order_status":1},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1686029203809,"snapshot":"false","db":"mydb","sequence":"[null,\"23911952\"]","schema":"public","table":"orders","txId":491,"lsn":23911952,"xmin":null},"op":"u","ts_ms":1686029204058,"transaction":null}
{"order_id": 1}^{"before":{"order_id":1,"order_date":null,"customer_name":null,"price":null,"product_id":null,"order_status":null},"after":null,"source":{"version":"1.9.7.Final","connector":"postgresql","name":"postgres","ts_ms":1686029268569,"snapshot":"false","db":"mydb","sequence":"[\"23912408\",\"23912488\"]","schema":"public","table":"orders","txId":492,"lsn":23912488,"xmin":null},"op":"d","ts_ms":1686029268858,"transaction":null}
{"order_id": 1}^
{"order_id": 1}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 1, "order_date": {"long": 1558430840000}, "customer_name": {"string": "Bob"}, "price": null, "product_id": {"int": 1}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "true"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424096}, "transaction": null}
{"order_id": 2}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 2, "order_date": {"long": 1558430840001}, "customer_name": {"string": "Alice"}, "price": null, "product_id": {"int": 2}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "true"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424102}, "transaction": null}
{"order_id": 3}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 3, "order_date": {"long": 1558430840002}, "customer_name": {"string": "Alice"}, "price": null, "product_id": {"int": 2}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1685975423896, "snapshot": {"string": "last"}, "db": "mydb", "sequence": {"string": "[null,\"23911624\"]"}, "schema": "public", "table": "orders", "txId": {"long": 490}, "lsn": {"long": 23911624}, "xmin": null}, "op": "r", "ts_ms": {"long": 1685975424103}, "transaction": null}
{"order_id": 1}^{"before": null, "after": {"postgres.public.orders.Value": {"order_id": 1, "order_date": {"long": 1558430840000}, "customer_name": {"string": "Bob"}, "price": null, "product_id": {"int": 3}, "order_status": {"int": 1}}}, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1686029203809, "snapshot": {"string": "false"}, "db": "mydb", "sequence": {"string": "[null,\"23911952\"]"}, "schema": "public", "table": "orders", "txId": {"long": 491}, "lsn": {"long": 23911952}, "xmin": null}, "op": "u", "ts_ms": {"long": 1686029204058}, "transaction": null}
{"order_id": 1}^{"before": {"postgres.public.orders.Value": {"order_id": 1, "order_date": null, "customer_name": null, "price": null, "product_id": null, "order_status": null}}, "after": null, "source": {"version": "1.9.7.Final", "connector": "postgresql", "name": "postgres", "ts_ms": 1686029268569, "snapshot": {"string": "false"}, "db": "mydb", "sequence": {"string": "[\"23912408\",\"23912488\"]"}, "schema": "public", "table": "orders", "txId": {"long": 492}, "lsn": {"long": 23912488}, "xmin": null}, "op": "d", "ts_ms": {"long": 1686029268858}, "transaction": null}
{"order_id": 1}^
Loading

0 comments on commit 6e759c2

Please sign in to comment.