Skip to content

Commit

Permalink
fix: align transitive compatibility checks
Browse files Browse the repository at this point in the history
Make so that the transitive compatibility checks that are done in the
schema creation endpoint are also done in the schema validation
endpoint.

In the creation endpoint (`/subjects/<subject-key>/versions`), if the
compatibility mode is transient then the new schema is checked against
all schemas.

In the validation endpoint (`/compatibility/subjects/<subject-key>/versions/<schema-version>`):
 - Before this fix, only the latest schema is checked against (even in
   case of transitive mode)
 - After this fix, in case of transitive mode then all schema are
   checked against.
   Note that in this case the version provided in the POST request
   (`<schema-version>`) is ignored.

[EC-289]
  • Loading branch information
davide-armand committed Sep 23, 2024
1 parent 4879bef commit d4b7cc0
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 15 deletions.
4 changes: 4 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke
http://localhost:8081/compatibility/subjects/test-key/versions/latest
{"is_compatible":true}

NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the
compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done
when trying to register the new schema through the `subjects/<subject-key>/versions` endpoint.

Get current global backwards compatibility setting value::

$ curl -X GET http://localhost:8081/config
Expand Down
2 changes: 1 addition & 1 deletion karapace/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ def check_schema_compatibility(
)

if is_incompatible(result):
break
return result

return result

Expand Down
24 changes: 12 additions & 12 deletions karapace/schema_registry_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from http import HTTPStatus
from karapace.auth import HTTPAuthorizer, Operation, User
from karapace.compatibility import CompatibilityModes
from karapace.compatibility.schema_compatibility import SchemaCompatibility
from karapace.compatibility.jsonschema.checks import is_incompatible
from karapace.compatibility.schema_compatibility import SchemaCompatibility
from karapace.config import Config
from karapace.errors import (
IncompatibleSchema,
Expand Down Expand Up @@ -396,14 +396,15 @@ async def compatibility_check(
status=HTTPStatus.INTERNAL_SERVER_ERROR,
)

new_schema = self.get_new_schema(request.json, content_type)
old_schema = self.get_old_schema(subject, Versioner.V(version), content_type)

result = SchemaCompatibility.check_compatibility(
old_schema,
new_schema,
compatibility_mode
)
new_schema = self._get_new_schema(request.json, content_type)
if compatibility_mode.is_transitive():
# Ignore the schema version provided in the rest api call (`version`)
# Instead check against all previous versions (including `version` if existing)
result = self.schema_registry.check_schema_compatibility(new_schema, subject)
else:
# Check against the schema version provided in the rest api call (`version`)
old_schema = self._get_old_schema(subject, Versioner.V(version), content_type)
result = SchemaCompatibility.check_compatibility(old_schema, new_schema, compatibility_mode)

if is_incompatible(result):
self.r({"is_compatible": False}, content_type)
Expand Down Expand Up @@ -1318,7 +1319,7 @@ def no_master_error(self, content_type: str) -> None:
status=HTTPStatus.INTERNAL_SERVER_ERROR,
)

def get_new_schema(self, body: JsonObject, content_type: str) -> ValidatedTypedSchema:
def _get_new_schema(self, body: JsonObject, content_type: str) -> ValidatedTypedSchema:
schema_type = self._validate_schema_type(content_type=content_type, data=body)
references = self._validate_references(content_type, schema_type, body)
try:
Expand All @@ -1339,8 +1340,7 @@ def get_new_schema(self, body: JsonObject, content_type: str) -> ValidatedTypedS
status=HTTPStatus.UNPROCESSABLE_ENTITY,
)


def get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ValidatedTypedSchema:
def _get_old_schema(self, subject: Subject, version: Version, content_type: str) -> ValidatedTypedSchema:
try:
old = self.schema_registry.subject_version_get(subject=subject, version=version)
except InvalidVersion:
Expand Down
41 changes: 39 additions & 2 deletions tests/unit/test_schema_registry_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,43 @@ async def test_forward_when_not_ready() -> None:
)


async def test_compatibility_check_in_transitive_mode() -> None:
# Given
config = set_config_defaults(DEFAULTS)
config["compatibility"] = "FULL_TRANSITIVE"
controller = KarapaceSchemaRegistryController(config=config)

result = Mock(spec=SchemaCompatibilityResult)
result.compatibility = SchemaCompatibilityType.compatible

controller.schema_registry = Mock(spec=KarapaceSchemaRegistry)
controller.schema_registry.get_compatibility_mode = Mock(return_value=CompatibilityModes.FULL_TRANSITIVE)
controller.schema_registry.check_schema_compatibility = Mock(return_value=result)

new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}')
controller._get_new_schema = Mock(return_value=new_schema)

request_mock = Mock(HTTPRequest)
request_mock.json = '{"schema": "{}", "schemaType": "JSON", "references": [], "metadata": {}, "ruleSet": {}}'

subject = Subject("subject1")

# When
with pytest.raises(HTTPResponse) as exc_info:
await controller.compatibility_check(
"application/json",
subject=subject,
version="1",
request=request_mock,
)

# Then run same checks as in schema registration
controller.schema_registry.check_schema_compatibility.assert_called_once_with(new_schema, subject)

assert exc_info.type is HTTPResponse
assert str(exc_info.value) == "HTTPResponse 200"


async def test_compatibility_check_in_not_transitive_mode() -> None:
# Given
config = set_config_defaults(DEFAULTS)
Expand All @@ -86,10 +123,10 @@ async def test_compatibility_check_in_not_transitive_mode() -> None:
SchemaCompatibility.check_compatibility = Mock(return_value=result)

new_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}')
controller.get_new_schema = Mock(return_value=new_schema)
controller._get_new_schema = Mock(return_value=new_schema)

old_schema = ValidatedTypedSchema.parse(SchemaType.JSONSCHEMA, '{"type": "array"}')
controller.get_old_schema = Mock(return_value=old_schema)
controller._get_old_schema = Mock(return_value=old_schema)

request_mock = Mock(HTTPRequest)
request_mock.json = '{"schema": "{}", "schemaType": "JSON", "references": [], "metadata": {}, "ruleSet": {}}'
Expand Down
4 changes: 4 additions & 0 deletions website/source/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ Test the compatibility of a schema with the latest schema under subject "test-ke
$KARAPACE_REGISTRY_URI/compatibility/subjects/test-key/versions/latest
{"is_compatible":true}

NOTE: if the subject's compatibility mode is transitive (BACKWARD_TRANSITIVE, FORWARD_TRANSITIVE or FULL_TRANSITIVE) then the
compatibility is checked not only against the latest schema, but also against all previous schemas, as it would be done
when trying to register the new schema through the `subjects/<subject-key>/versions` endpoint.

Get current global backwards compatibility setting value::

$ curl -X GET $KARAPACE_REGISTRY_URI/config
Expand Down

0 comments on commit d4b7cc0

Please sign in to comment.