Skip to content

Commit

Permalink
schema_registry/dt: Add tests for normalization
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Pope <ben@redpanda.com>
  • Loading branch information
BenPope committed Jul 26, 2024
1 parent f06cbaf commit 86feb13
Showing 1 changed file with 131 additions and 0 deletions.
131 changes: 131 additions & 0 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,87 @@ def get_compat_dataset(type: SchemaType) -> TestCompatDataset:
assert False, f"Unsupported schema {type=}"


class TestNormalizeDataset(NamedTuple):
type: SchemaType
schema_base: str
schema_canonical: str
schema_normalized: str


def get_normalize_dataset(type: SchemaType) -> TestNormalizeDataset:
if type == SchemaType.AVRO:
return TestNormalizeDataset(type=SchemaType.AVRO,
schema_base="""{
"name": "myrecord",
"type": "record",
"fields": [
{
"name": "nested",
"type": {
"type": "array",
"items": {
"fields": [
{
"type": "string",
"name": "f1"
}
],
"name": "nested_item",
"type": "record"
}
}
}
]
}""",
schema_canonical=re.sub(
r"[\n\t\s]*", "", """{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "nested",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "nested_item",
"fields": [
{
"name": "f1",
"type": "string"
}
]
}
}
}
]
}"""),
schema_normalized=re.sub(
r"[\n\t\s]*", "", """{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "nested",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "nested_item",
"fields": [
{
"name": "f1",
"type": "string"
}
]
}
}
}
]
}"""))
assert False, f"Unsupported schema {type=}"


class SchemaRegistryEndpoints(RedpandaTest):
"""
Test schema registry against a redpanda cluster.
Expand Down Expand Up @@ -474,11 +555,14 @@ def _post_subjects_subject(self,
subject,
data,
deleted=False,
normalize=False,
headers=HTTP_POST_HEADERS,
**kwargs):
params = {}
if (deleted):
params['deleted'] = 'true'
if (normalize):
params['normalize'] = 'true'
return self._request("POST",
f"subjects/{subject}",
headers=headers,
Expand All @@ -489,12 +573,17 @@ def _post_subjects_subject(self,
def _post_subjects_subject_versions(self,
subject,
data,
normalize=False,
headers=HTTP_POST_HEADERS,
**kwargs):
params = {}
if (normalize):
params['normalize'] = 'true'
return self._request("POST",
f"subjects/{subject}/versions",
headers=headers,
data=data,
params=params,
**kwargs)

def _get_subjects_subject_versions_version(self,
Expand Down Expand Up @@ -1106,6 +1195,48 @@ def test_config(self):
assert result_raw.json(
)["message"] == f"Subject 'foo-key' not found.", f"{json.dumps(result_raw.json(), indent=1)}"

@cluster(num_nodes=3)
@parametrize(dataset_type=SchemaType.AVRO)
def test_normalize(self, dataset_type: SchemaType):
dataset = get_normalize_dataset(dataset_type)
self.logger.debug(f"testing with {dataset=}")

topics = create_topic_names(2)[0]
canonical_topic = topics[0]
normalize_topic = topics[1]

base_schema = json.dumps({
"schema": dataset.schema_base,
"schemaType": str(dataset.type)
})

self.logger.debug(
f"Register a schema against a subject - not normalized")
result_raw = self._post_subjects_subject_versions(
subject=f"{canonical_topic}-key",
data=base_schema,
normalize=False)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok
v1_id = result_raw.json()["id"]

self.logger.debug(f"Checking that the returned schema is canonical")
result_raw = self._get_schemas_ids_id(id=v1_id)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()['schema'] == dataset.schema_canonical

self.logger.debug(f"Register a schema against a subject - normalized")
result_raw = self._post_subjects_subject_versions(
subject=f"{normalize_topic}-key", data=base_schema, normalize=True)
self.logger.debug(result_raw)
assert result_raw.status_code == requests.codes.ok
v1_id = result_raw.json()["id"]

self.logger.debug(f"Checking that the returned schema is normalized")
result_raw = self._get_schemas_ids_id(id=v1_id)
assert result_raw.status_code == requests.codes.ok
assert result_raw.json()['schema'] == dataset.schema_normalized

@cluster(num_nodes=3)
@parametrize(dataset_type=SchemaType.AVRO)
@parametrize(dataset_type=SchemaType.JSON)
Expand Down

0 comments on commit 86feb13

Please sign in to comment.