return record name for AVRO union #785
Conversation
I don't think this is something you want on by default. It changes the format of the data that the user gets back and therefore could break things for people who aren't expecting it. It should probably be an option that a user can choose to turn on. |
@scottbelden Yes, this will break backward compatibility, but the rewrite of AVRO encoding and decoding in the "FirstClassFastAvro" branch may be the right moment to break it. I'm not sure how to properly add this option to the AvroConsumer class; it would arguably be better as standard behaviour, always returning the record name for an AVRO union, just like NodeJS avsc and LinkedIn goavro do.

Currently in avsc and goavro, `union {Foo, Bar} msg` is deserialized into:
Currently in confluent-kafka-python (fastavro without the `return_record_name` option):
Expected with `return_record_name=True`:
Unfortunately, the `return_record_name` option makes fastavro change a normal AVRO record into a tuple too, which is very ugly. |
We can't enable this by default, but we could expose the option. We are actually deprecating the Avro Producer/Consumer entirely in favor of generic serialization. Let me push some of those changes and we can adjust accordingly. |
This is about consuming AVRO unions, and there is a related topic: #656 (producing AVRO unions). Am I understanding correctly that, as of right now, there is no way to properly handle AVRO unions ("properly" as described in the AVRO spec: https://avro.apache.org/docs/current/spec.html#json_encoding)? |
@slominskir The JSON encoding that you referenced does exist in fastavro:

```python
from io import StringIO

from fastavro import json_writer, json_reader

schema = [
    {
        "type": "record",
        "name": "A",
        "fields": [{"name": "foo", "type": ["string", "null"]}],
    },
    {
        "type": "record",
        "name": "B",
        "fields": [{"name": "bar", "type": ["string", "null"]}],
    },
    {
        "type": "record",
        "name": "AOrB",
        "fields": [{"name": "entity", "type": ["A", "B"]}],
    },
]

records = [
    {"entity": {"foo": "FOO"}},  # an AOrB record holding an internal A record
    {"entity": {"bar": "BAR"}},  # an AOrB record holding an internal B record
]

sio = StringIO()
json_writer(sio, schema, records)
print(sio.getvalue())
```

That will print out the following:
However, it should be noted that the form above is the serialized form. If one were to read that serialized JSON back into memory, the form would look like the original `records` again. There is currently no way to have the in-memory representation keep the union branch names the way the serialized form does. |
This sounds very bad for interoperability between the Java and Python APIs, as the Java API does use the JSON AVRO format with union type information. If a Java client exports topic messages to a file, the contents are formatted such that the Python client cannot import them back in (the Java-backed Confluent kafka-avro-console-consumer can, though). In your example you printed out the "correct" format, so it seems it is possible to print it at least? |
@slominskir I don't think I'm doing a very good job of explaining myself. The form shown earlier, with the union branch names, is the serialized JSON Avro format. Suppose that serialized output has been saved to a file `aorb.json`; it can be read back with fastavro's json_reader:

```python
from fastavro import json_reader

schema = [
    {
        "type": "record",
        "name": "A",
        "fields": [{"name": "foo", "type": ["string", "null"]}],
    },
    {
        "type": "record",
        "name": "B",
        "fields": [{"name": "bar", "type": ["string", "null"]}],
    },
    {
        "type": "record",
        "name": "AOrB",
        "fields": [{"name": "entity", "type": ["A", "B"]}],
    },
]

with open("aorb.json") as fp:
    for record in json_reader(fp, schema):
        print(record)
```

If you run that, what gets printed out is the plain in-memory form, without the union branch names. On disk, in the serialized form, both Java and Python agree on the format. So on the python side, once you have read in the json file, the union branch name is no longer available when you want to access the record. |
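In other words, once the tag is dropped, client code is left inferring the branch from the dict's keys. A hypothetical sketch of that workaround (field names are illustrative):

```python
# Without return_record_name, the only way to tell an A from a B after
# reading is to inspect which fields are present -- fragile if the two
# record types ever share field names.
record = {"entity": {"foo": "FOO"}}  # the kind of dict json_reader hands back

branch = "A" if "foo" in record["entity"] else "B"
print(branch)
```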
@scottbelden One point of confusion is that I'm using the confluent-kafka-python API, which hides a lot of the details that happen under the hood (sometimes a good thing). It sounds like the fastavro library does everything, but by the time the DeserializingConsumer from the confluent-kafka-python library returns a message with poll(), it's already too late: I get a dict object back that does not include the union type info. You've given examples using fastavro, but do you have read/write examples using the confluent-kafka-python API? I assume that's what this ticket is about? |
By the way, one "hack" to get this to work is to edit the file under site-packages directly, changing confluent-kafka-python/src/confluent_kafka/schema_registry/avro.py (lines 321 to 323 in 8069f26)
to be:
On my system the full file path is /usr/local/lib/python3.7/site-packages/confluent_kafka/schema_registry/avro.py. This is obviously not a permanent solution. I assume that, buried in the 42 files changed in this pull request, a flag is exposed so that True is added to that function call? |
Correct, my examples all just use fastavro directly because I am not familiar with the python kafka API. Unfortunately I probably can't help there.
Correct, I think the original PR was just this single commit: f958879. But then a refactor commit got added to it that made it massive. My original comment from above still stands, though: this would need to be an optional argument somewhere in this library, because it changes the data structure of what is returned to the user. Instead of getting the plain value back, users would get the record-name-tagged form. |
Replacement pull: #1028 |
Replaced stale pull #785; add config bool return_record_name
See https://fastavro.readthedocs.io/en/latest/reader.html#fastavro._read_py.schemaless_reader