diff --git a/src/confluent_kafka/avro/serializer/message_serializer.py b/src/confluent_kafka/avro/serializer/message_serializer.py index 9023cb63d..1eeb04bf3 100644 --- a/src/confluent_kafka/avro/serializer/message_serializer.py +++ b/src/confluent_kafka/avro/serializer/message_serializer.py @@ -20,6 +20,7 @@ # derived from https://github.com/verisign/python-confluent-schemaregistry.git # import io +import json import logging import struct import sys @@ -79,7 +80,7 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema= # Encoder support def _get_encoder_func(self, writer_schema): if HAS_FAST: - schema = writer_schema.to_json() + schema = json.loads(str(writer_schema)) parsed_schema = parse_schema(schema) return lambda record, fp: schemaless_writer(fp, parsed_schema, record) writer = avro.io.DatumWriter(writer_schema) @@ -175,8 +176,8 @@ def _get_decoder_func(self, schema_id, payload, is_key=False): if HAS_FAST: # try to use fast avro try: - fast_avro_writer_schema = parse_schema(writer_schema_obj.to_json()) - fast_avro_reader_schema = parse_schema(reader_schema_obj.to_json()) + fast_avro_writer_schema = parse_schema(json.loads(str(writer_schema_obj))) + fast_avro_reader_schema = parse_schema(json.loads(str(reader_schema_obj))) schemaless_reader(payload, fast_avro_writer_schema) # If we reach this point, this means we have fastavro and it can