Skip to content

Commit

Permalink
JSON referenced schema support (#1514)
Browse files Browse the repository at this point in the history
* JSON referenced schema support

* Changes

* PR Feedback

* PR Feedback

* PR Feedback

* PR Feedback

* Fix wrong documentation

* PR Feedback

* Update unit tests

* PR Feedback

* Use ref.name as the id

* Remove _id_of function
  • Loading branch information
anchitj authored and emasab committed Jun 15, 2023
1 parent 718c7ae commit 7d25825
Show file tree
Hide file tree
Showing 7 changed files with 441 additions and 12 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ v2.1.0 is a feature release with the following features, fixes and enhancements:
- Added support for password protected private key in CachedSchemaRegistryClient.
- Add reference support in Schema Registry client. (@RickTalken, #1304)
- Migrated travis jobs to Semaphore CI (#1503)

- Add support for passing schema references in JSONSerializer and JSONDeserializer. (#1514)

confluent-kafka-python is based on librdkafka v2.1.0, see the
[librdkafka release notes](https://github.com/edenhill/librdkafka/releases/tag/v2.1.0)
Expand Down
79 changes: 68 additions & 11 deletions src/confluent_kafka/schema_registry/json_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import json
import struct

from jsonschema import validate, ValidationError
from jsonschema import validate, ValidationError, RefResolver

from confluent_kafka.schema_registry import (_MAGIC_BYTE,
Schema,
Expand All @@ -43,6 +43,25 @@ def __exit__(self, *args):
return False


def _resolve_named_schema(schema, schema_registry_client, named_schemas=None):
"""
Resolves named schemas referenced by the provided schema recursively.
:param schema: Schema to resolve named schemas for.
:param schema_registry_client: SchemaRegistryClient to use for retrieval.
:param named_schemas: Dict of named schemas resolved recursively.
:return: named_schemas dict.
"""
if named_schemas is None:
named_schemas = {}
if schema.references is not None:
for ref in schema.references:
referenced_schema = schema_registry_client.get_version(ref.subject, ref.version)
_resolve_named_schema(referenced_schema.schema, schema_registry_client, named_schemas)
referenced_schema_dict = json.loads(referenced_schema.schema.schema_str)
named_schemas[ref.name] = referenced_schema_dict
return named_schemas


class JSONSerializer(Serializer):
"""
Serializer that outputs JSON encoded data with Confluent Schema Registry framing.
Expand Down Expand Up @@ -122,7 +141,7 @@ class JSONSerializer(Serializer):
callable with JSONSerializer.
Args:
schema_str (str): `JSON Schema definition. <https://json-schema.org/understanding-json-schema/reference/generic.html>`_
schema_str (str, Schema): `JSON Schema definition. <https://json-schema.org/understanding-json-schema/reference/generic.html>`_ Accepts schema as either a string or a `Schema`(Schema) instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance.
schema_registry_client (SchemaRegistryClient): Schema Registry
client instance.
Expand All @@ -134,14 +153,23 @@ class JSONSerializer(Serializer):
""" # noqa: E501
__slots__ = ['_hash', '_auto_register', '_normalize_schemas', '_use_latest_version',
'_known_subjects', '_parsed_schema', '_registry', '_schema', '_schema_id',
'_schema_name', '_subject_name_func', '_to_dict']
'_schema_name', '_subject_name_func', '_to_dict', '_are_references_provided']

_default_conf = {'auto.register.schemas': True,
'normalize.schemas': False,
'use.latest.version': False,
'subject.name.strategy': topic_subject_name_strategy}

def __init__(self, schema_str, schema_registry_client, to_dict=None, conf=None):
self._are_references_provided = False
if isinstance(schema_str, str):
self._schema = Schema(schema_str, schema_type="JSON")
elif isinstance(schema_str, Schema):
self._schema = schema_str
self._are_references_provided = bool(schema_str.references)
else:
raise TypeError('You must pass either str or Schema')

self._registry = schema_registry_client
self._schema_id = None
self._known_subjects = set()
Expand Down Expand Up @@ -178,14 +206,13 @@ def __init__(self, schema_str, schema_registry_client, to_dict=None, conf=None):
raise ValueError("Unrecognized properties: {}"
.format(", ".join(conf_copy.keys())))

schema_dict = json.loads(schema_str)
schema_dict = json.loads(self._schema.schema_str)
schema_name = schema_dict.get('title', None)
if schema_name is None:
raise ValueError("Missing required JSON schema annotation title")

self._schema_name = schema_name
self._parsed_schema = schema_dict
self._schema = Schema(schema_str, schema_type="JSON")

def __call__(self, obj, ctx):
"""
Expand Down Expand Up @@ -238,7 +265,14 @@ def __call__(self, obj, ctx):
value = obj

try:
validate(instance=value, schema=self._parsed_schema)
if self._are_references_provided:
named_schemas = _resolve_named_schema(self._schema, self._registry)
validate(instance=value, schema=self._parsed_schema,
resolver=RefResolver(self._parsed_schema.get('$id'),
self._parsed_schema,
store=named_schemas))
else:
validate(instance=value, schema=self._parsed_schema)
except ValidationError as ve:
raise SerializationError(ve.message)

Expand All @@ -258,16 +292,32 @@ class JSONDeserializer(Deserializer):
framing.
Args:
schema_str (str): `JSON schema definition <https://json-schema.org/understanding-json-schema/reference/generic.html>`_ use for validating records.
schema_str (str, Schema): `JSON schema definition <https://json-schema.org/understanding-json-schema/reference/generic.html>`_ Accepts schema as either a string or a `Schema`(Schema) instance. Note that string definitions cannot reference other schemas. For referencing other schemas, use a Schema instance.
from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
Converts a dict to a Python object instance.
schema_registry_client (SchemaRegistryClient, optional): Schema Registry client instance. Needed if ``schema_str`` is a schema referencing other schemas.
""" # noqa: E501

__slots__ = ['_parsed_schema', '_from_dict']
__slots__ = ['_parsed_schema', '_from_dict', '_registry', '_are_references_provided', '_schema']

def __init__(self, schema_str, from_dict=None, schema_registry_client=None):
self._are_references_provided = False
if isinstance(schema_str, str):
schema = Schema(schema_str, schema_type="JSON")
elif isinstance(schema_str, Schema):
schema = schema_str
self._are_references_provided = bool(schema_str.references)
if self._are_references_provided and schema_registry_client is None:
raise ValueError(
"""schema_registry_client must be provided if "schema_str" is a Schema instance with references""")
else:
raise TypeError('You must pass either str or Schema')

def __init__(self, schema_str, from_dict=None):
self._parsed_schema = json.loads(schema_str)
self._parsed_schema = json.loads(schema.schema_str)
self._schema = schema
self._registry = schema_registry_client

if from_dict is not None and not callable(from_dict):
raise ValueError("from_dict must be callable with the signature"
Expand Down Expand Up @@ -313,7 +363,14 @@ def __call__(self, data, ctx):
obj_dict = json.loads(payload.read())

try:
validate(instance=obj_dict, schema=self._parsed_schema)
if self._are_references_provided:
named_schemas = _resolve_named_schema(self._schema, self._registry)
validate(instance=obj_dict,
schema=self._parsed_schema, resolver=RefResolver(self._parsed_schema.get('$id'),
self._parsed_schema,
store=named_schemas))
else:
validate(instance=obj_dict, schema=self._parsed_schema)
except ValidationError as ve:
raise SerializationError(ve.message)

Expand Down
22 changes: 22 additions & 0 deletions tests/integration/schema_registry/data/customer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://example.com/customer.schema.json",
"title": "Customer",
"description": "Customer data",
"type": "object",
"properties": {
"name": {
"description": "Customer name",
"type": "string"
},
"id": {
"description": "Customer id",
"type": "integer"
},
"email": {
"description": "Customer email",
"type": "string"
}
},
"required": [ "name", "id"]
}
24 changes: 24 additions & 0 deletions tests/integration/schema_registry/data/order.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://example.com/referencedproduct.schema.json",
"title": "Order",
"description": "Order",
"type": "object",
"properties": {
"order_details": {
"description": "Order Details",
"$ref": "http://example.com/order_details.schema.json"
},
"order_date": {
"description": "Order Date",
"type": "string",
"format": "date-time"
},
"product": {
"description": "Product",
"$ref": "http://example.com/product.schema.json"
}
},
"required": [
"order_details", "product"]
}
22 changes: 22 additions & 0 deletions tests/integration/schema_registry/data/order_details.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://example.com/order_details.schema.json",
"title": "Order Details",
"description": "Order Details",
"type": "object",
"properties": {
"id": {
"description": "Order Id",
"type": "integer"
},
"customer": {
"description": "Customer",
"$ref": "http://example.com/customer.schema.json"
},
"payment_id": {
"description": "Payment Id",
"type": "string"
}
},
"required": [ "id", "customer"]
}
Loading

0 comments on commit 7d25825

Please sign in to comment.