Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Replaced stale pull #785; add config bool return_record_name
Browse files Browse the repository at this point in the history
slominskir committed Feb 4, 2021
1 parent 8069f26 commit 4ef0b43
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions src/confluent_kafka/schema_registry/avro.py
Original file line number Diff line number Diff line change
@@ -137,7 +137,8 @@ class AvroSerializer(Serializer):

# default configuration
_default_conf = {'auto.register.schemas': True,
'subject.name.strategy': topic_subject_name_strategy}
'subject.name.strategy': topic_subject_name_strategy,
'return.record.name': False}

def __init__(self, schema_str, schema_registry_client,
to_dict=None, conf=None):
@@ -258,15 +259,19 @@ class AvroDeserializer(Deserializer):
from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
Converts dict to an instance of some object.
return_record_name (bool): If True, when reading a union of records, the result will
be a tuple where the first value is the name of the record and the second value is
the record itself. Defaults to False.
See Also:
`Apache Avro Schema Declaration <https://avro.apache.org/docs/current/spec.html#schemas>`_
`Apache Avro Schema Resolution <https://avro.apache.org/docs/1.8.2/spec.html#Schema+Resolution>`_
"""
__slots__ = ['_reader_schema', '_registry', '_from_dict', '_writer_schemas']
__slots__ = ['_reader_schema', '_registry', '_from_dict', '_writer_schemas', '_return_record_name']

def __init__(self, schema_str, schema_registry_client, from_dict=None):
def __init__(self, schema_str, schema_registry_client, from_dict=None, return_record_name=False):
self._registry = schema_registry_client
self._writer_schemas = {}

@@ -277,6 +282,10 @@ def __init__(self, schema_str, schema_registry_client, from_dict=None):
" from_dict(SerializationContext, dict) -> object")
self._from_dict = from_dict

self._return_record_name = return_record_name
if not isinstance(self._return_record_name, bool):
raise ValueError("return.record.name must be a boolean value")

def __call__(self, value, ctx):
"""
Decodes a Confluent Schema Registry formatted Avro bytes to an object.
@@ -320,7 +329,8 @@ def __call__(self, value, ctx):

obj_dict = schemaless_reader(payload,
writer_schema,
self._reader_schema)
self._reader_schema,
self._return_record_name)

if self._from_dict is not None:
return self._from_dict(obj_dict, ctx)

0 comments on commit 4ef0b43

Please sign in to comment.