Skip to content

Commit

Permalink
Resolving issue 679.
Browse files Browse the repository at this point in the history
The previous implementation of the schema resolver wasn't resolving recursively the entities of a schema that was containing references. With that implementation the schema are recursively resolved and the references are sent to the parser.
  • Loading branch information
eliax1996 committed Jul 21, 2023
1 parent 779676f commit f523d2f
Show file tree
Hide file tree
Showing 23 changed files with 534 additions and 47 deletions.
13 changes: 10 additions & 3 deletions karapace/schema_references.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
from __future__ import annotations

from karapace.dataclasses import default_dataclass
from karapace.typing import JsonData, ResolvedVersion, SchemaId, Subject
from typing import List, Mapping, NewType, TypeVar
from karapace.typing import JsonData, JsonObject, ResolvedVersion, SchemaId, Subject
from typing import cast, List, Mapping, NewType, TypeVar

Referents = NewType("Referents", List[SchemaId])


T = TypeVar("T")


Expand Down Expand Up @@ -64,6 +63,14 @@ def to_dict(self) -> JsonData:
"version": self.version,
}

@staticmethod
def from_dict(data: JsonObject) -> Reference:
return Reference(
name=str(data["name"]),
subject=Subject(str(data["subject"])),
version=ResolvedVersion(cast(int, data["version"])),
)


def reference_from_mapping(
data: Mapping[str, object],
Expand Down
66 changes: 49 additions & 17 deletions karapace/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from google.protobuf.message import DecodeError
from jsonschema import ValidationError
from karapace.client import Client
from karapace.dependency import Dependency
from karapace.errors import InvalidReferences
from karapace.protobuf.exception import ProtobufTypeException
from karapace.protobuf.io import ProtobufDatumReader, ProtobufDatumWriter
from karapace.schema_models import InvalidSchema, ParsedTypedSchema, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import Reference, reference_from_mapping
from karapace.typing import SchemaId, Subject
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping
from karapace.typing import ResolvedVersion, SchemaId, Subject
from karapace.utils import json_decode, json_encode
from typing import Any, Callable, Dict, List, MutableMapping, Optional, Tuple
from urllib.parse import quote
Expand Down Expand Up @@ -101,16 +102,40 @@ async def post_new_schema(
raise SchemaRetrievalError(result.json())
return SchemaId(result.json()["id"])

async def get_latest_schema(self, subject: str) -> Tuple[SchemaId, ParsedTypedSchema]:
result = await self.client.get(f"subjects/{quote(subject)}/versions/latest")
async def get_schema(
self,
subject: Subject,
version: Optional[ResolvedVersion] = None,
) -> Tuple[SchemaId, ValidatedTypedSchema, ResolvedVersion]:
version_str = str(version) if version is not None else "latest"
result = await self.client.get(f"subjects/{quote(subject)}/versions/{version_str}")
if not result.ok:
raise SchemaRetrievalError(result.json())
json_result = result.json()
if "id" not in json_result or "schema" not in json_result:
if "id" not in json_result or "schema" not in json_result or "version" not in json_result:
raise SchemaRetrievalError(f"Invalid result format: {json_result}")

if "references" in json_result:
references = [Reference.from_dict(data) for data in json_result["references"]]
dependencies = {}
for reference in references:
dependencies[reference.name] = await self.get_schema(reference.subject, reference.version)
else:
references = None
dependencies = None

try:
schema_type = SchemaType(json_result.get("schemaType", "AVRO"))
return SchemaId(json_result["id"]), ParsedTypedSchema.parse(schema_type, json_result["schema"])
return (
SchemaId(json_result["id"]),
ValidatedTypedSchema.parse(
schema_type,
json_result["schema"],
references=references,
dependencies=dependencies,
),
ResolvedVersion(json_result["version"]),
)
except InvalidSchema as e:
raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e

Expand Down Expand Up @@ -138,15 +163,22 @@ async def get_schema_for_id(self, schema_id: SchemaId) -> Tuple[TypedSchema, Lis
raise InvalidReferences from exc
parsed_references.append(reference)
if parsed_references:
return (
ParsedTypedSchema.parse(
schema_type,
json_result["schema"],
references=parsed_references,
),
subjects,
)
return ParsedTypedSchema.parse(schema_type, json_result["schema"]), subjects
dependencies = {}

for reference in parsed_references:
if isinstance(reference, LatestVersionReference):
_, schema, version = await self.get_schema(reference.subject)
else:
_, schema, version = await self.get_schema(reference.subject, reference.version)

dependencies[reference.name] = Dependency(reference.name, reference.subject, version, schema)

return (
ParsedTypedSchema.parse(
schema_type, json_result["schema"], references=parsed_references, dependencies=dependencies
),
subjects,
)
except InvalidSchema as e:
raise SchemaRetrievalError(f"Failed to parse schema string from response: {json_result}") from e

Expand Down Expand Up @@ -204,9 +236,9 @@ def get_subject_name(

return Subject(f"{self.subject_name_strategy(topic_name, namespace)}-{subject_type}")

async def get_schema_for_subject(self, subject: str) -> TypedSchema:
async def get_schema_for_subject(self, subject: Subject) -> TypedSchema:
assert self.registry_client, "must not call this method after the object is closed."
schema_id, schema = await self.registry_client.get_latest_schema(subject)
schema_id, schema, _ = await self.registry_client.get_schema(subject)
async with self.state_lock:
schema_ser = str(schema)
self.schemas_to_ids[schema_ser] = schema_id
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
syntax = "proto3";

import "c_ab748295b306e9585e736a0a80bfd249.proto";

message Example {
Reference example = 1;
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
syntax = "proto3";

message Reference {
string name = 1;
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions tests/integration/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async def test_remote_client(registry_async_client: Client) -> None:
assert sc_id >= 0
stored_schema, _ = await reg_cli.get_schema_for_id(sc_id)
assert stored_schema == schema_avro, f"stored schema {stored_schema.to_dict()} is not {schema_avro.to_dict()}"
stored_id, stored_schema = await reg_cli.get_latest_schema(subject)
stored_id, stored_schema, _ = await reg_cli.get_schema(subject)
assert stored_id == sc_id
assert stored_schema == schema_avro

Expand All @@ -31,6 +31,6 @@ async def test_remote_client_tls(registry_async_client_tls: Client) -> None:
assert sc_id >= 0
stored_schema, _ = await reg_cli.get_schema_for_id(sc_id)
assert stored_schema == schema_avro, f"stored schema {stored_schema.to_dict()} is not {schema_avro.to_dict()}"
stored_id, stored_schema = await reg_cli.get_latest_schema(subject)
stored_id, stored_schema = await reg_cli.get_schema(subject)
assert stored_id == sc_id
assert stored_schema == schema_avro
4 changes: 2 additions & 2 deletions tests/integration/test_client_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def test_remote_client_protobuf(registry_async_client):
assert sc_id >= 0
stored_schema, _ = await reg_cli.get_schema_for_id(sc_id)
assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}"
stored_id, stored_schema = await reg_cli.get_latest_schema(subject)
stored_id, stored_schema, _ = await reg_cli.get_schema(subject)
assert stored_id == sc_id
assert stored_schema == schema_protobuf

Expand All @@ -33,6 +33,6 @@ async def test_remote_client_protobuf2(registry_async_client):
assert sc_id >= 0
stored_schema, _ = await reg_cli.get_schema_for_id(sc_id)
assert stored_schema == schema_protobuf, f"stored schema {stored_schema} is not {schema_protobuf}"
stored_id, stored_schema = await reg_cli.get_latest_schema(subject)
stored_id, stored_schema, _ = await reg_cli.get_schema(subject)
assert stored_id == sc_id
assert stored_schema == schema_protobuf_after
131 changes: 131 additions & 0 deletions tests/integration/test_rest_consumer_protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@
Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""

from karapace.client import Client
from karapace.kafka_rest_apis import KafkaRestAdminClient
from karapace.protobuf.kotlin_wrapper import trim_margin
from tests.integration.test_rest import NEW_TOPIC_TIMEOUT
from tests.utils import (
new_consumer,
new_topic,
repeat_until_successful_request,
REST_HEADERS,
schema_data,
schema_data_second,
wait_for_topics,
)

import pytest
Expand Down Expand Up @@ -74,3 +80,128 @@ async def test_publish_consume_protobuf_second(rest_async_client, admin_client,
data_values = [x["value"] for x in data]
for expected, actual in zip(publish_payload, data_values):
assert expected == actual, f"Expecting {actual} to be {expected}"


async def test_publish_protobuf_with_references(
rest_async_client: Client,
admin_client: KafkaRestAdminClient,
registry_async_client: Client,
):
topic_name = new_topic(admin_client)
subject_reference = "reference"
subject_topic = f"{topic_name}-value"

await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)

reference_schema = trim_margin(
"""
|syntax = "proto3";
|message Reference {
| string name = 1;
|}
|"""
)

topic_schema = trim_margin(
"""
|syntax = "proto3";
|import "Reference.proto";
|message Example {
| Reference example = 1;
|}
|"""
)

res = await registry_async_client.post(
f"subjects/{subject_reference}/versions", json={"schemaType": "PROTOBUF", "schema": reference_schema}
)
assert "id" in res.json()

res = await registry_async_client.post(
f"subjects/{subject_topic}/versions",
json={
"schemaType": "PROTOBUF",
"schema": topic_schema,
"references": [
{
"name": "Reference.proto",
"subject": subject_reference,
"version": 1,
}
],
},
)
topic_schema_id = res.json()["id"]

example_message = {"value_schema_id": topic_schema_id, "records": [{"value": {"example": {"name": "myname"}}}]}

res = await rest_async_client.post(
f"/topics/{topic_name}",
json=example_message,
headers=REST_HEADERS["avro"],
)
assert res.status_code == 200


# async def test_publish_consume_protobuf_with_references(
# rest_async_client: Client,
# admin_client: KafkaRestAdminClient,
# registry_async_client: Client,
# ):
# topic_name = new_topic(admin_client, num_partitions=1)
# subject_reference = "reference"
# subject_topic = f"{topic_name}-value"
# await wait_for_topics(rest_async_client, topic_names=[topic_name], timeout=NEW_TOPIC_TIMEOUT, sleep=1)
# reference_schema = trim_margin(
# """
# |syntax = "proto3";
# |message Reference {
# | string name = 1;
# |}
# |"""
# )
#
# topic_schema = trim_margin(
# """
# |syntax = "proto3";
# |import "Reference.proto";
# |message Example {
# | Reference example = 1;
# |}
# |"""
# )
#
# res = await registry_async_client.post(
# f"subjects/{subject_reference}/versions", json={"schemaType": "PROTOBUF", "schema": reference_schema}
# )
# assert "id" in res.json()
#
# res = await registry_async_client.post(
# f"subjects/{subject_topic}/versions",
# json={
# "schemaType": "PROTOBUF",
# "schema": topic_schema,
# "references": [
# {
# "name": "Reference.proto",
# "subject": subject_reference,
# "version": 1,
# }
# ],
# },
# )
# topic_schema_id = res.json()["id"]
#
# example_message = {"value_schema_id": topic_schema_id, "records": [{"value": {"example": {"name": "myname"}}}]}
# res = await rest_async_client.post(
# f"/topics/{topic_name}",
# json=example_message,
# headers=REST_HEADERS["avro"],
# )
# assert res.status_code == 200
#
# first_partition_res = await rest_async_client.get(f"/topics/{topic_name}/partitions/0")
#

# todo: add test with dependency of level 2, a dependency that require another dependency
# def recursive_schema_definition_test():
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
syntax = "proto3";

import "c_ef4d87254985d5212360c04fed3846a5.proto";

option java_package = "com.serge.protobuf";
option java_outer_classname = "TestMessage";

message Message {
int32 index = 1;
Query qry = 2;
}
Loading

0 comments on commit f523d2f

Please sign in to comment.