
schema parser causes conflict with fastavro #1156

Closed
3 of 7 tasks
gt6989b opened this issue Jul 6, 2021 · 1 comment · Fixed by #1208
Comments

gt6989b commented Jul 6, 2021

Description

The code in the reproduction section below uses the Avro schema encoder to encode a message. Encoding succeeds only after transforming the schema returned by the registry to remove MappingProxyType instances: the transformation leaves everything unchanged except converting each MappingProxyType into a plain dict.

  1. Is there a problem with how I am calling the standard library that causes a mapping proxy to be used, which in turn causes fastavro to throw? Can this be fixed on the user side, or is it really a bug in the Confluent Kafka library?
  2. In addition, registryClient.get_latest_schema() is documented as returning the schema ID as a str, but it actually returns an int. If I understand correctly, this value is the intended input for the schema_id parameter of serializer.encode_record_with_schema_id() (and it works correctly when I pass it), and that parameter is documented as int. Is the str in the docs a typo? In other words, either get_latest_schema() should return an int, or encode_record_with_schema_id() should take a str, or I am doing something incorrectly :) Which is it?
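For context on question 1: the error quoted in the linked fix commits is `TypeError: unhashable type: 'mappingproxy'`, and that failure mode can be demonstrated with the standard library alone, with no Kafka or fastavro installed. A minimal sketch:

```python
from types import MappingProxyType

# Schema fragments returned via avro-python3's schema objects arrive as
# read-only mappingproxy views rather than plain dicts.
proxy = MappingProxyType({"type": "string"})

# Like dict, mappingproxy is unhashable, so any code path that tries to
# hash it (or put it in a set) raises TypeError.
try:
    hash(proxy)
except TypeError as exc:
    print(exc)  # unhashable type: 'mappingproxy'
```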

Thank you very much.

Environment

I am running Python 3.9 with Confluent Kafka 1.7.0, avro-python3 1.10.0 and fastavro 1.4.1. Currently running on Windows, but also intend to deliver and support on Linux.

confluent_kafka.version() == ('1.7.0', 17235968)
confluent_kafka.libversion() == ('1.7.0', 17236223)

How to reproduce

from confluent_kafka      import Producer
from confluent_kafka.avro import CachedSchemaRegistryClient, MessageSerializer

from fastavro.schema import parse_schema
from fastavro.validation import validate

from types  import MappingProxyType
from typing import Any

import sys

def transformMap(item: Any) -> Any:
    if type(item) in {dict, MappingProxyType}:
        return {k:transformMap(v) for k,v in item.items()}
    elif type(item) is list:
        return [transformMap(v) for v in item]
    else:
        return item

def main(argv = None):
    msgType = 'InstrumentIdMsg'
    idFigi  = 'BBG123456789'
    head = {'sDateTime': 1, 'msgType': msgType,      'srcSeq': 1,
            'rDateTime': 1, 'src':     'Brownstone', 'reqID':  None,
            'sequence':  1}
    msgMap = {'head': head, 'product':  'Port', 'idIsin': None, 'idFigi': idFigi,
              'idBB': None, 'benchmark': None,  'idCusip': None,'idCins': None}

    registryClient = CachedSchemaRegistryClient(url = 'http://local.KafkaRegistry.com:8081')
    schemaId, schema, version = registryClient.get_latest_schema(msgType)

    serializer = MessageSerializer(registry_client = registryClient)

    schemaMap = schema.to_json()

    # NOTE:
    # schemaMap cannot be used since it uses mappingproxy
    # which causes validate() and parse_schema() to throw
    schemaDict = transformMap(schemaMap)

    isValid = validate(datum = msgMap, schema = schemaDict, raise_errors = True)
    parsed_schema = parse_schema(schema = schemaDict)

    msg = serializer.encode_record_with_schema_id(schema_id = schemaId,
                                                  record    = msgMap)

    producer = Producer({'bootstrap.servers': 'kafkaServer:9092'})
    producer.produce(key = idFigi,
                     topic = 'TOPIC_NAME',
                     value = msg)
    return 0


if __name__ == '__main__':
    sys.exit(main())
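The MappingProxyType workaround can be exercised without a registry or broker. A stdlib-only sketch (the schema content below is a hypothetical stand-in for the registry's actual schema, and transform_map mirrors the transformMap helper above):

```python
from types import MappingProxyType
from typing import Any

def transform_map(item: Any) -> Any:
    """Recursively replace MappingProxyType views with plain dicts."""
    if type(item) in {dict, MappingProxyType}:
        return {k: transform_map(v) for k, v in item.items()}
    elif type(item) is list:
        return [transform_map(v) for v in item]
    else:
        return item

# Hypothetical schema-like structure, nested the way to_json() output is.
schema_like = MappingProxyType({
    "type": "record",
    "name": "InstrumentIdMsg",
    "fields": [MappingProxyType({"name": "idFigi", "type": "string"})],
})

plain = transform_map(schema_like)
print(type(plain).__name__, type(plain["fields"][0]).__name__)  # dict dict
```

After the transformation, every level of the structure is a plain dict, which is what validate() and parse_schema() expect.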

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue

gt6989b commented Jul 6, 2021

Same question on Stack Overflow (no answers as of 2021-07-06): https://stackoverflow.com/q/68262245/399573

ffissore added a commit to ffissore/confluent-kafka-python that referenced this issue Sep 30, 2021
…om raising exception `TypeError: unhashable type: 'mappingproxy'`

Fixes confluentinc#1156
edenhill pushed a commit that referenced this issue Jan 24, 2022
…om raising exception `TypeError: unhashable type: 'mappingproxy'`

Fixes #1156