Skip to content

Commit

Permalink
sr: search backwards for transitive compat checks
Browse files Browse the repository at this point in the history
This matches the reference implementation and provides a more useful
output because the users likely prefer comparing the new schema against
the most recent incompatible schema.
  • Loading branch information
pgellert committed Aug 19, 2024
1 parent 1e31632 commit dcf281c
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
12 changes: 8 additions & 4 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -752,14 +752,18 @@ ss::future<compatibility_result> sharded_store::do_is_compatible(
throw as_exception(invalid_schema_type(new_schema.type()));
}

// if transitive, search all, otherwise seach forwards from version
// search backwards
// if transitive, search all, seach until version
if (
compat == compatibility_level::backward_transitive
|| compat == compatibility_level::forward_transitive
|| compat == compatibility_level::full_transitive) {
ver_it = versions.begin();
}

auto it = std::reverse_iterator(versions.end());
auto it_end = std::reverse_iterator(ver_it);

auto new_valid = co_await make_valid_schema(std::move(new_schema));

compatibility_result result{.is_compat = true};
Expand All @@ -773,13 +777,13 @@ ss::future<compatibility_result> sharded_store::do_is_compatible(
};
};

for (; result.is_compat && ver_it != versions.end(); ++ver_it) {
if (ver_it->deleted) {
for (; result.is_compat && it != it_end; ++it) {
if (it->deleted) {
continue;
}

auto old_schema = co_await get_subject_schema(
sub, ver_it->version, include_deleted::no);
sub, it->version, include_deleted::no);
auto old_valid = co_await make_valid_schema(
std::move(old_schema.schema));

Expand Down
49 changes: 49 additions & 0 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,55 @@ def test_post_compatibility_subject_version(self,
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["id"] == v1_id

@cluster(num_nodes=3)
def test_post_compatibility_subject_version_transitive_order(self):
"""
Verify the compatibility message shows the latest failing schema
"""

topic = create_topic_names(1)[0]

schema_1_data = json.dumps({"schema": schema1_def})
schema_2_data = json.dumps({"schema": schema2_def})
schema_3_data = json.dumps({"schema": schema3_def})

self.logger.debug("Posting schema 1 as a subject key")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_1_data)
self.logger.debug(f"{result_raw=}")
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Set subject config - BACKWARD_TRANSITIVE")
result_raw = self._set_config_subject(
subject=f"{topic}-key",
data=json.dumps({"compatibility": "BACKWARD_TRANSITIVE"}))
self.logger.debug(f"{result_raw=}")
assert result_raw.status_code == requests.codes.ok

self.logger.debug("Posting schema 2 (compatible with schema 1)")
result_raw = self._post_subjects_subject_versions(
subject=f"{topic}-key", data=schema_2_data)
self.logger.debug(result_raw, result_raw.json())
assert result_raw.status_code == requests.codes.ok

self.logger.debug(
"Check compatibility schema 3 (incompatible with both schema 1 and 2) with verbose=True"
)
result_raw = self._post_compatibility_subject_version(
subject=f"{topic}-key",
version=1,
data=schema_3_data,
verbose=True)
self.logger.debug(result_raw, result_raw.json())
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()["is_compatible"] == False

messages = result_raw.json().get("messages", [])
assert not any(schema1_def in m for m in messages), \
f"Expected schema 3 to be compared against schema 2 only (not schema 1)"
assert any(schema2_def in m for m in messages), \
f"Expected schema 3 to be compared against schema 2 only (not schema 1)"

@cluster(num_nodes=3)
@parametrize(schemas=("avro", "avro_incompat", "AVRO"))
@parametrize(schemas=("proto3", "proto3_incompat", "PROTOBUF"))
Expand Down

0 comments on commit dcf281c

Please sign in to comment.