Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: align transitive compatibility checks #953

Merged
merged 3 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke
http://localhost:8081/compatibility/subjects/test-key/versions/latest
{"is_compatible":true}

NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the
compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done
when trying to register the new schema through the `subjects/<subject-key>/versions` endpoint.

Get current global backwards compatibility setting value::

$ curl -X GET http://localhost:8081/config
Expand Down
133 changes: 0 additions & 133 deletions src/karapace/compatibility/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,7 @@
Copyright (c) 2019 Aiven Ltd
See LICENSE for details
"""
from avro.compatibility import (
merge,
ReaderWriterCompatibilityChecker as AvroChecker,
SchemaCompatibilityResult,
SchemaCompatibilityType,
SchemaIncompatibilityType,
)
from avro.schema import Schema as AvroSchema
from enum import Enum, unique
from jsonschema import Draft7Validator
from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema
from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema
from karapace.schema_reader import SchemaType
from karapace.utils import assert_never

import logging

Expand Down Expand Up @@ -54,121 +39,3 @@ def is_transitive(self) -> bool:
"FULL_TRANSITIVE",
}
return self.value in TRANSITIVE_MODES


def check_avro_compatibility(reader_schema: AvroSchema, writer_schema: AvroSchema) -> SchemaCompatibilityResult:
return AvroChecker().get_compatibility(reader=reader_schema, writer=writer_schema)


def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator) -> SchemaCompatibilityResult:
return jsonschema_compatibility(reader, writer)


def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult:
return check_protobuf_schema_compatibility(reader, writer)


def check_compatibility(
old_schema: ParsedTypedSchema,
new_schema: ValidatedTypedSchema,
compatibility_mode: CompatibilityModes,
) -> SchemaCompatibilityResult:
"""Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`."""
if compatibility_mode is CompatibilityModes.NONE:
LOG.info("Compatibility level set to NONE, no schema compatibility checks performed")
return SchemaCompatibilityResult(SchemaCompatibilityType.compatible)

if old_schema.schema_type is not new_schema.schema_type:
return incompatible_schema(
incompat_type=SchemaIncompatibilityType.type_mismatch,
message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}",
location=[],
)

if old_schema.schema_type is SchemaType.AVRO:
assert isinstance(old_schema.schema, AvroSchema)
assert isinstance(new_schema.schema, AvroSchema)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_avro_compatibility(
reader_schema=new_schema.schema,
writer_schema=old_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = check_avro_compatibility(
reader_schema=old_schema.schema,
writer_schema=new_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = check_avro_compatibility(
reader_schema=new_schema.schema,
writer_schema=old_schema.schema,
)
result = merge(
result,
check_avro_compatibility(
reader_schema=old_schema.schema,
writer_schema=new_schema.schema,
),
)

elif old_schema.schema_type is SchemaType.JSONSCHEMA:
assert isinstance(old_schema.schema, Draft7Validator)
assert isinstance(new_schema.schema, Draft7Validator)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_jsonschema_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = check_jsonschema_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = check_jsonschema_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
result = merge(
result,
check_jsonschema_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
),
)

elif old_schema.schema_type is SchemaType.PROTOBUF:
assert isinstance(old_schema.schema, ProtobufSchema)
assert isinstance(new_schema.schema, ProtobufSchema)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = check_protobuf_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = check_protobuf_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = check_protobuf_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
result = merge(
result,
check_protobuf_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
),
)

else:
assert_never(f"Unknown schema_type {old_schema.schema_type}")

return result
138 changes: 138 additions & 0 deletions src/karapace/compatibility/schema_compatibility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""
Copyright (c) 2024 Aiven Ltd
See LICENSE for details
"""
from avro.compatibility import (
merge,
ReaderWriterCompatibilityChecker as AvroChecker,
SchemaCompatibilityResult,
SchemaCompatibilityType,
SchemaIncompatibilityType,
)
from avro.schema import Schema as AvroSchema
from jsonschema import Draft7Validator
from karapace.compatibility import CompatibilityModes
from karapace.compatibility.jsonschema.checks import compatibility as jsonschema_compatibility, incompatible_schema
from karapace.compatibility.protobuf.checks import check_protobuf_schema_compatibility
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import ParsedTypedSchema, ValidatedTypedSchema
from karapace.schema_type import SchemaType
from karapace.utils import assert_never

import logging

LOG = logging.getLogger(__name__)


class SchemaCompatibility:
@staticmethod
def check_compatibility(
old_schema: ParsedTypedSchema,
new_schema: ValidatedTypedSchema,
compatibility_mode: CompatibilityModes,
) -> SchemaCompatibilityResult:
"""Check that `old_schema` and `new_schema` are compatible under `compatibility_mode`."""

if compatibility_mode is CompatibilityModes.NONE:
LOG.info("Compatibility level set to NONE, no schema compatibility checks performed")
return SchemaCompatibilityResult(SchemaCompatibilityType.compatible)

if old_schema.schema_type is not new_schema.schema_type:
return incompatible_schema(
incompat_type=SchemaIncompatibilityType.type_mismatch,
message=f"Comparing different schema types: {old_schema.schema_type} with {new_schema.schema_type}",
location=[],
)

if old_schema.schema_type is SchemaType.AVRO:
assert isinstance(old_schema.schema, AvroSchema)
assert isinstance(new_schema.schema, AvroSchema)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = SchemaCompatibility.check_avro_compatibility(
reader_schema=new_schema.schema,
writer_schema=old_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = SchemaCompatibility.check_avro_compatibility(
reader_schema=old_schema.schema,
writer_schema=new_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = SchemaCompatibility.check_avro_compatibility(
reader_schema=new_schema.schema,
writer_schema=old_schema.schema,
)
result = merge(
result,
SchemaCompatibility.check_avro_compatibility(
reader_schema=old_schema.schema,
writer_schema=new_schema.schema,
),
)
elif old_schema.schema_type is SchemaType.JSONSCHEMA:
assert isinstance(old_schema.schema, Draft7Validator)
assert isinstance(new_schema.schema, Draft7Validator)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = SchemaCompatibility.check_jsonschema_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = SchemaCompatibility.check_jsonschema_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = SchemaCompatibility.check_jsonschema_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
result = merge(
result,
SchemaCompatibility.check_jsonschema_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
),
)
elif old_schema.schema_type is SchemaType.PROTOBUF:
assert isinstance(old_schema.schema, ProtobufSchema)
assert isinstance(new_schema.schema, ProtobufSchema)
if compatibility_mode in {CompatibilityModes.BACKWARD, CompatibilityModes.BACKWARD_TRANSITIVE}:
result = SchemaCompatibility.check_protobuf_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
elif compatibility_mode in {CompatibilityModes.FORWARD, CompatibilityModes.FORWARD_TRANSITIVE}:
result = SchemaCompatibility.check_protobuf_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
)

elif compatibility_mode in {CompatibilityModes.FULL, CompatibilityModes.FULL_TRANSITIVE}:
result = SchemaCompatibility.check_protobuf_compatibility(
reader=new_schema.schema,
writer=old_schema.schema,
)
result = merge(
result,
SchemaCompatibility.check_protobuf_compatibility(
reader=old_schema.schema,
writer=new_schema.schema,
),
)
else:
assert_never(f"Unknown schema_type {old_schema.schema_type}")

return result

@staticmethod
def check_avro_compatibility(reader_schema: AvroSchema, writer_schema: AvroSchema) -> SchemaCompatibilityResult:
return AvroChecker().get_compatibility(reader=reader_schema, writer=writer_schema)

@staticmethod
def check_jsonschema_compatibility(reader: Draft7Validator, writer: Draft7Validator) -> SchemaCompatibilityResult:
return jsonschema_compatibility(reader, writer)

@staticmethod
def check_protobuf_compatibility(reader: ProtobufSchema, writer: ProtobufSchema) -> SchemaCompatibilityResult:
return check_protobuf_schema_compatibility(reader, writer)
Loading
Loading