A Python client used to interact with Confluent's schema registry. Supports Python 2.6 and 2.7 and 3.5. This also works within a virtual env.
The API is heavily based off of the existing Java API of Confluent schema registry.
Run python setup.py install
from the source root.
This library will be available via pip
in the future.
from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer, Util
# Note that some methods may throw exceptions if
# the registry cannot be reached, decoding/encoding fails,
# or IO fails
# some helper methods in util to get a schema
avro_schema = Util.parse_schema_from_file('/path/to/schema.avsc')
avro_schema = Util.parse_schema_from_string(open('/path/to/schema.avsc').read())
# Initialize the client
client = CachedSchemaRegistryClient(url='http://registry.host')
# Schema operations
# register a schema for a subject
schema_id = client.register('my_subject', avro_schema)
# fetch a schema by ID
avro_schema = client.get_by_id(schema_id)
# get the latest schema info for a subject
schema_id,avro_schema,schema_version = client.get_latest_schema('my_subject')
# get the version of a schema
schema_version = client.get_version('my_subject', avro_schema)
# Compatibility tests
is_compatible = client.test_compatibility('my_subject', another_schema)
# One of NONE, FULL, FORWARD, BACKWARD
new_level = client.update_compatibility('NONE','my_subject')
current_level = client.get_compatibility('my_subject')
# Message operations
# encode a record to put onto kafka
serializer = MessageSerializer(client)
record = get_obj_to_put_into_kafka()
# use the schema id directly
encoded = serializer.encode_record_with_schema_id(schema_id, record)
# use an existing schema and topic
# this will register the schema to the right subject based
# on the topic name and then serialize
encoded = serializer.encode_record_with_schema('my_topic', avro_schema, record)
# encode a record with the latest schema for the topic
# this is not efficient as it queries for the latest
# schema each time
encoded = serializer.encode_record_for_topic('my_kafka_topic', record)
# decode a message from kafka
message = get_message_from_kafka()
decoded_object = serializer.decode_message(message)
pip install unittest2
unit2 discover -s test
Tests use unittest2 due to unittest being different between 2.6 and 2.7.
The project is licensed under the Apache 2 license.