Skip to content

Commit

Permalink
Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
anchitj committed Mar 2, 2023
1 parent dd73f37 commit f50d5f6
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 147 deletions.
95 changes: 51 additions & 44 deletions src/confluent_kafka/schema_registry/json_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import json
import struct
from typing import overload

from jsonschema import validate, ValidationError, RefResolver

Expand All @@ -44,6 +43,35 @@ def __exit__(self, *args):
return False


def _id_of(schema):
"""
Returns the schema id if present otherwise None.
:param schema: Schema to return id of.
:return: Id of schema if present otherwise None.
"""
return schema.get('$id', "None")


def _resolve_named_schema(schema, schema_registry_client, named_schemas=None):
"""
Resolves named schemas referenced by the provided schema recursively.
:param schema: Schema to resolve named schemas for.
:param schema_registry_client: SchemaRegistryClient to use for retrieval.
:param named_schemas: Dict of named schemas resolved recursively.
:return: named_schemas dict.
"""
if named_schemas is None:
named_schemas = {}
for ref in schema.references:
referenced_schema = schema_registry_client.get_version(ref.subject, ref.version)
_resolve_named_schema(referenced_schema.schema, schema_registry_client, named_schemas)
referenced_schema_dict = json.loads(referenced_schema.schema.schema_str)
named_schemas[_id_of(referenced_schema_dict)] = referenced_schema_dict
schema_dict = json.loads(schema.schema_str)
named_schemas[_id_of(schema_dict)] = schema_dict
return named_schemas


class JSONSerializer(Serializer):
"""
Serializer that outputs JSON encoded data with Confluent Schema Registry framing.
Expand Down Expand Up @@ -123,7 +151,7 @@ class JSONSerializer(Serializer):
callable with JSONSerializer.
Args:
schema_str (str): `JSON Schema definition. <https://json-schema.org/understanding-json-schema/reference/generic.html>`_
schema_str (str, Schema): `JSON Schema definition. <https://json-schema.org/understanding-json-schema/reference/generic.html>`_
schema_registry_client (SchemaRegistryClient): Schema Registry
client instance.
Expand All @@ -142,22 +170,14 @@ class JSONSerializer(Serializer):
'use.latest.version': False,
'subject.name.strategy': topic_subject_name_strategy}

@overload
def __init__(self, schema: str, schema_registry_client, to_dict=None, conf=None):
...

@overload
def __init__(self, schema: Schema, schema_registry_client, to_dict=None, conf=None):
...

def __init__(self, schema, schema_registry_client, to_dict=None, conf=None):
if isinstance(schema, str):
self._schema = Schema(schema, schema_type="JSON")
def __init__(self, schema_str, schema_registry_client, to_dict=None, conf=None):
if isinstance(schema_str, str):
self._schema = Schema(schema_str, schema_type="JSON")
else:
if not isinstance(schema, Schema):
if not isinstance(schema_str, Schema):
raise ValueError('You must pass either str or Schema')
else:
self._schema = schema
self._schema = schema_str

self._registry = schema_registry_client
self._schema_id = None
Expand Down Expand Up @@ -254,11 +274,13 @@ def __call__(self, obj, ctx):
value = obj

try:
if self._schema.named_schemas is not None and len(self._schema.named_schemas) > 0:
named_schemas = _resolve_named_schema(self._schema, self._registry)
# If there are any references
if len(named_schemas) > 1:
validate(instance=value, schema=self._parsed_schema,
resolver=RefResolver(self._parsed_schema["$id"],
resolver=RefResolver(_id_of(self._parsed_schema),
self._parsed_schema,
store=self._schema.named_schemas))
store=named_schemas))
else:
validate(instance=value, schema=self._parsed_schema)
except ValidationError as ve:
Expand All @@ -280,30 +302,22 @@ class JSONDeserializer(Deserializer):
framing.
Args:
schema (str): `JSON schema definition <https://json-schema.org/understanding-json-schema/reference/generic.html>`_ use for validating records.
schema_str (str, Schema): `JSON schema definition <https://json-schema.org/understanding-json-schema/reference/generic.html>`_ use for validating records.
from_dict (callable, optional): Callable(dict, SerializationContext) -> object.
Converts a dict to a Python object instance.
""" # noqa: E501

__slots__ = ['_parsed_schema', '_from_dict', '_registry', '_schema']

@overload
def __init__(self, schema: str, from_dict=None, schema_registry_client=None):
...

@overload
def __init__(self, schema: Schema, from_dict=None, schema_registry_client=None):
...

def __init__(self, schema, from_dict=None, schema_registry_client=None):
if isinstance(schema, str):
self._schema = Schema(schema, schema_type="JSON")
def __init__(self, schema_str, from_dict=None, schema_registry_client=None):
if isinstance(schema_str, str):
self._schema = Schema(schema_str, schema_type="JSON")
else:
if not isinstance(schema, Schema):
if not isinstance(schema_str, Schema):
raise ValueError('You must pass either str or Schema')
else:
self._schema = schema
self._schema = schema_str
self._parsed_schema = json.loads(self._schema.schema_str)
self._registry = schema_registry_client

Expand Down Expand Up @@ -347,24 +361,17 @@ def __call__(self, data, ctx):
"was not produced with a Confluent "
"Schema Registry serializer".format(magic))

named_schemas = None
if self._registry is not None:
registered_schema: Schema = self._registry.get_schema(schema_id)
named_schemas = {}
for ref in registered_schema.references:
ref_reg_schema = self._registry.get_version(ref.subject, ref.version)
ref_dict = json.loads(ref_reg_schema.schema.schema_str)
named_schemas[ref_dict["$id"]] = ref_dict

# JSON documents are self-describing; no need to query schema
obj_dict = json.loads(payload.read())

try:
if named_schemas is not None and len(named_schemas) > 0:
if self._registry is not None:
registered_schema = self._registry.get_schema(schema_id)
validate(instance=obj_dict,
schema=self._parsed_schema, resolver=RefResolver(self._parsed_schema["$id"],
schema=self._parsed_schema, resolver=RefResolver(_id_of(self._parsed_schema),
self._parsed_schema,
named_schemas))
store=_resolve_named_schema(
registered_schema, self._registry)))
else:
validate(instance=obj_dict, schema=self._parsed_schema)
except ValidationError as ve:
Expand Down
7 changes: 2 additions & 5 deletions src/confluent_kafka/schema_registry/schema_registry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,19 +689,16 @@ class Schema(object):
references ([SchemaReference]): SchemaReferences used in this schema.
schema_type (str): The schema type: AVRO, PROTOBUF or JSON.
named_schemas (dict): Named schemas
"""

__slots__ = ['schema_str', 'references', 'schema_type', 'named_schemas', '_hash']
__slots__ = ['schema_str', 'references', 'schema_type', '_hash']

def __init__(self, schema_str, schema_type, references=[], named_schemas={}):
def __init__(self, schema_str, schema_type, references=[]):
super(Schema, self).__init__()

self.schema_str = schema_str
self.schema_type = schema_type
self.references = references
self.named_schemas = named_schemas
self._hash = hash(schema_str)

def __eq__(self, other):
Expand Down
22 changes: 22 additions & 0 deletions tests/integration/schema_registry/data/customer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://example.com/customer.schema.json",
"title": "Customer",
"description": "Customer data",
"type": "object",
"properties": {
"name": {
"description": "Customer name",
"type": "string"
},
"id": {
"description": "Customer id",
"type": "integer"
},
"email": {
"description": "Customer email",
"type": "string"
}
},
"required": [ "name", "id"]
}
24 changes: 24 additions & 0 deletions tests/integration/schema_registry/data/order.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://example.com/referencedproduct.schema.json",
"title": "Order",
"description": "Order",
"type": "object",
"properties": {
"order_details": {
"description": "Order Details",
"$ref": "http://example.com/order_details.schema.json"
},
"order_date": {
"description": "Order Date",
"type": "string",
"format": "date-time"
},
"product": {
"description": "Product",
"$ref": "http://example.com/product.schema.json"
}
},
"required": [
"order_details", "product"]
}
22 changes: 22 additions & 0 deletions tests/integration/schema_registry/data/order_details.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "http://example.com/order_details.schema.json",
"title": "Order Details",
"description": "Order Details",
"type": "object",
"properties": {
"id": {
"description": "Order Id",
"type": "integer"
},
"customer": {
"description": "Customer",
"$ref": "http://example.com/customer.schema.json"
},
"payment_id": {
"description": "Payment Id",
"type": "string"
}
},
"required": [ "id", "customer"]
}
37 changes: 0 additions & 37 deletions tests/integration/schema_registry/data/referencedProduct.json

This file was deleted.

Loading

0 comments on commit f50d5f6

Please sign in to comment.