diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index 5d625931b..334634dd9 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -10,7 +10,7 @@ from confluent_kafka import Message from dataclasses import dataclass from karapace.config import DEFAULTS -from karapace.errors import CorruptKafkaRecordException, ShutdownException +from karapace.errors import CorruptKafkaRecordException, InvalidReferences, ShutdownException from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.consumer import KafkaConsumer from karapace.key_format import KeyFormatter @@ -26,8 +26,15 @@ from karapace.schema_type import SchemaType from karapace.typing import SchemaId, Version from tests.base_testcase import BaseTestCase -from tests.utils import schema_protobuf_invalid_because_corrupted, schema_protobuf_with_invalid_ref -from typing import Callable, List, Tuple +from tests.utils import ( + schema_avro_corrupted_and_invalid_json, + schema_avro_corrupted_but_valid_json, + schema_avro_referencing_corrupted_and_invalid_json, + schema_avro_referencing_corrupted_but_valid_json, + schema_protobuf_invalid_because_corrupted, + schema_protobuf_with_invalid_ref, +) +from typing import Callable, List, Tuple, Union from unittest.mock import Mock import confluent_kafka @@ -37,6 +44,8 @@ import random import time +LOG = logging.getLogger(__name__) + def test_offset_watcher() -> None: watcher = OffsetWatcher() @@ -326,13 +335,18 @@ def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture) -> None: @dataclass -class KafkaMessageHandlingErrorTestCase(BaseTestCase): - key: bytes +class SingleMessageHandlingErrorTestCase(BaseTestCase): + key: Union[bytes, None] value: bytes - schema_type: SchemaType - message_type: MessageType - expected_error: ShutdownException - expected_log_message: str + schema_type: Union[SchemaType, None] + message_type: Union[MessageType, None] + expected_exception: Union[ShutdownException, None] + expected_warn_message: Union[str, None] + + +@dataclass +class MultipleMessagesHandlingErrorTestCase(BaseTestCase): + inner_test_cases: List[SingleMessageHandlingErrorTestCase] @pytest.fixture(name="schema_reader_with_consumer_messages_factory") @@ -381,72 +395,72 @@ def factory(key: bytes, value: bytes, offset: int = 1) -> Message: @pytest.mark.parametrize( "test_case", [ - KafkaMessageHandlingErrorTestCase( + SingleMessageHandlingErrorTestCase( test_name="Message key is not valid JSON", key=b'{subject1::::"test""version":1"magic":1}', value=b'{"value": "value does not matter at this stage, just correct JSON"}', schema_type=None, message_type=MessageType.schema, - expected_error=CorruptKafkaRecordException, - expected_log_message='Invalid JSON in msg.key(): {subject1::::"test""version":1"magic":1} at offset 1', + expected_exception=CorruptKafkaRecordException, + expected_warn_message='Invalid JSON in msg.key(): {subject1::::"test""version":1"magic":1} at offset 1', ), - KafkaMessageHandlingErrorTestCase( + SingleMessageHandlingErrorTestCase( test_name="Message key is empty, i.e. `null/None`", key=None, value=b'{"value": "value does not matter at this stage, just correct JSON"}', schema_type=None, message_type=MessageType.schema, - expected_error=CorruptKafkaRecordException, - expected_log_message="Empty msg.key() at offset 1", + expected_exception=CorruptKafkaRecordException, + expected_warn_message="Empty msg.key() at offset 1", ), - KafkaMessageHandlingErrorTestCase( + SingleMessageHandlingErrorTestCase( test_name="Keytype is missing from message key", key=b'{"subject":"test","version":1,"magic":1}', value=b'{"value": "value does not matter at this stage, just correct JSON"}', schema_type=None, message_type=MessageType.schema, - expected_error=CorruptKafkaRecordException, - expected_log_message=( + expected_exception=CorruptKafkaRecordException, + expected_warn_message=( "The message {'subject': 'test', 'version': 1, 'magic': 1}-" "{'value': 'value does not matter at this stage, just correct JSON'} " "has been discarded because doesn't contain the `keytype` key in the key" ), ), - KafkaMessageHandlingErrorTestCase( + SingleMessageHandlingErrorTestCase( test_name="Keytype is invalid on message key", key=b'{"keytype":"NOT_A_VALID_KEY_TYPE","subject":"test","version":1,"magic":1}', value=b'{"value": "value does not matter at this stage, just correct JSON"}', schema_type=None, message_type=None, - expected_error=CorruptKafkaRecordException, - expected_log_message=( + expected_exception=CorruptKafkaRecordException, + expected_warn_message=( "The message {'keytype': 'NOT_A_VALID_KEY_TYPE', 'subject': 'test', 'version': 1, 'magic': 1}-" "{'value': 'value does not matter at this stage, just correct JSON'} " "has been discarded because the NOT_A_VALID_KEY_TYPE is not managed" ), ), - KafkaMessageHandlingErrorTestCase( + SingleMessageHandlingErrorTestCase( test_name="Config message value is not valid JSON", key=b'{"keytype":"CONFIG","subject":null,"magic":0}', value=(b'no-valid-jason"compatibilityLevel": "BACKWARD""'), schema_type=None, message_type=MessageType.config, - expected_error=CorruptKafkaRecordException, - expected_log_message="Invalid JSON in msg.value() at offset 1", + expected_exception=CorruptKafkaRecordException, + expected_warn_message="Invalid JSON in msg.value() at offset 1", ), - KafkaMessageHandlingErrorTestCase( + SingleMessageHandlingErrorTestCase( test_name="Config message value is not valid config setting", key=b'{"keytype":"CONFIG","subject":null,"magic":0}', value=b'{"not_the_key_name":"INVALID_CONFIG"}', schema_type=None, message_type=MessageType.config, - expected_error=CorruptKafkaRecordException, - expected_log_message=( + expected_exception=CorruptKafkaRecordException, + expected_warn_message=( "The message {'keytype': 'CONFIG', 'subject': None, 'magic': 0}-" "{'not_the_key_name': 'INVALID_CONFIG'} has been discarded because the CONFIG is not managed" ), ), - KafkaMessageHandlingErrorTestCase( + SingleMessageHandlingErrorTestCase( test_name="Version in schema message value is not valid", key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', value=( @@ -456,15 +470,15 @@ def factory(key: bytes, value: bytes, offset: int = 1) -> Message: ), schema_type=SchemaType.AVRO, message_type=MessageType.schema, - expected_error=CorruptKafkaRecordException, - expected_log_message=( + expected_exception=CorruptKafkaRecordException, + expected_warn_message=( "The message {'keytype': 'SCHEMA', 'subject': 'test', 'version': 1, 'magic': 1}-" "{'subject': 'test', 'version': 'invalid-version', 'id': 1, 'deleted': False, 'schema': " '\'{"name": "test", "type": "record", "fields": [{"name": "test_field", "type": ["string", "int"]}]}\'} ' "has been discarded because the SCHEMA is not managed" ), ), - KafkaMessageHandlingErrorTestCase( + SingleMessageHandlingErrorTestCase( test_name="Message value is not valid JSON", key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', value=( @@ -474,22 +488,22 @@ def factory(key: bytes, value: bytes, offset: int = 1) -> Message: ), schema_type=SchemaType.AVRO, message_type=MessageType.schema, - expected_error=CorruptKafkaRecordException, - expected_log_message="Invalid JSON in msg.value() at offset 1", + expected_exception=CorruptKafkaRecordException, + expected_warn_message="Invalid JSON in msg.value() at offset 1", ), - KafkaMessageHandlingErrorTestCase( + SingleMessageHandlingErrorTestCase( test_name="Delete subject message value is missing `subject` field", key=b'{"keytype":"DELETE_SUBJECT","subject":"test","version":1,"magic":1}', value=b'{"not-subject-key":"test","version":1}', schema_type=None, message_type=MessageType.delete_subject, - expected_error=CorruptKafkaRecordException, - expected_log_message=( + expected_exception=CorruptKafkaRecordException, + expected_warn_message=( "The message {'keytype': 'DELETE_SUBJECT', 'subject': 'test', 'version': 1, 'magic': 1}-" "{'not-subject-key': 'test', 'version': 1} has been discarded because the DELETE_SUBJECT is not managed" ), ), - KafkaMessageHandlingErrorTestCase( + SingleMessageHandlingErrorTestCase( test_name="Protobuf schema is invalid", key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', value=( @@ -499,14 +513,14 @@ def factory(key: bytes, value: bytes, offset: int = 1) -> Message: ), schema_type=SchemaType.PROTOBUF, message_type=MessageType.schema, - expected_error=CorruptKafkaRecordException, - expected_log_message="Schema is not valid ProtoBuf definition", + expected_exception=CorruptKafkaRecordException, + expected_warn_message="Schema is not valid ProtoBuf definition", ), ], ) def test_message_error_handling( caplog: LogCaptureFixture, - test_case: KafkaMessageHandlingErrorTestCase, + test_case: SingleMessageHandlingErrorTestCase, schema_reader_with_consumer_messages_factory: Callable[[Tuple[List[Message]]], KafkaSchemaReader], message_factory: Callable[[bytes, bytes, int], Message], ) -> None: @@ -515,7 +529,7 @@ def test_message_error_handling( schema_reader = schema_reader_with_consumer_messages_factory(consumer_messages) with caplog.at_level(logging.WARNING, logger="karapace.schema_reader"): - with pytest.raises(test_case.expected_error): + with pytest.raises(test_case.expected_exception): schema_reader.handle_messages() assert schema_reader.offset == 1 @@ -523,63 +537,183 @@ def test_message_error_handling( for log in caplog.records: assert log.name == "karapace.schema_reader" assert log.levelname == "WARNING" - assert log.message == test_case.expected_log_message + assert log.message == test_case.expected_warn_message -def test_message_error_handling_with_invalid_reference_schema_protobuf( +@pytest.mark.parametrize( + "test_case", + [ + MultipleMessagesHandlingErrorTestCase( + test_name="Invalid PROTOBUF schema because referencing a corrupted schema", + inner_test_cases=[ + # Given an invalid PROTOBUF schema (corrupted) + # When handling the corrupted schema + # Then the schema is recognised as invalid because corrupted + SingleMessageHandlingErrorTestCase( + test_name="PROTOBUF corrupted", + key=b'{"keytype":"SCHEMA","subject":"testref","version":1,"magic":1}', + value=( + b'{"schemaType": "PROTOBUF", "subject": "testref", "version": 1, "id": 1, "deleted": false' + + b', "schema": ' + + json.dumps(schema_protobuf_invalid_because_corrupted).encode() + + b"}" + ), + schema_type=SchemaType.PROTOBUF, + message_type=MessageType.schema, + expected_exception=CorruptKafkaRecordException, + expected_warn_message="Schema is not valid ProtoBuf definition", + ), + # And given a PROTOBUF schema referencing that corrupted schema (valid otherwise) + # When handling the schema + # Then the schema is recognised as invalid because of the corrupted referenced schema + SingleMessageHandlingErrorTestCase( + test_name="PROTOBUF referencing corrupted", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false' + + b', "schema": ' + + json.dumps(schema_protobuf_with_invalid_ref).encode() + + b', "references": [{"name": "testref.proto", "subject": "testref", "version": 1}]' + + b"}" + ), + schema_type=SchemaType.PROTOBUF, + message_type=MessageType.schema, + expected_exception=CorruptKafkaRecordException, + expected_warn_message="Invalid Protobuf references", + ), + ], + ), + MultipleMessagesHandlingErrorTestCase( + test_name="Invalid AVRO schema because referencing a corrupted but syntactically valid JSON", + inner_test_cases=[ + # Given an invalid AVRO schema (corrupted but syntactically valid JSON) + # When handling the corrupted schema + # Then the schema is recognised as invalid because corrupted + SingleMessageHandlingErrorTestCase( + test_name="AVRO corrupted but valid JSON", + key=b'{"keytype":"SCHEMA","subject":"testref","version":1,"magic":1}', + value=( + b'{"schemaType": "AVRO", "subject": "testref", "version": 1, "id": 1, "deleted": false' + + b', "schema": ' + + schema_avro_corrupted_but_valid_json + + b"}" + ), + schema_type=SchemaType.AVRO, + message_type=MessageType.schema, + # expected_exception=CorruptKafkaRecordException, + expected_exception=None, + # expected_warn_message='TODO', + expected_warn_message=None, + ), + # And given a AVRO schema referencing that corrupted schema (valid otherwise) + # When handling the schema + # Then the schema is recognised as invalid because of the corrupted referenced schema + SingleMessageHandlingErrorTestCase( + test_name="AVRO referencing corrupted but valid JSON", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'{"schemaType": "AVRO", "subject": "test", "version": 1, "id": 1, "deleted": false' + + b', "schema": ' + + schema_avro_referencing_corrupted_but_valid_json + + b', "references": [{"name": "testref.avsc", "subject": "testref", "version": 1}]' + + b"}" + ), + schema_type=SchemaType.AVRO, + message_type=MessageType.schema, + expected_exception=CorruptKafkaRecordException, + # expected_exception=None, + # expected_warn_message='TODO', + expected_warn_message=None, + ), + ], + ), + MultipleMessagesHandlingErrorTestCase( + test_name="Invalid AVRO schema because referencing a corrupted and syntactically invalid JSON", + inner_test_cases=[ + # Given an invalid AVRO schema (corrupted and syntactically invalid JSON) + # When handling the corrupted schema + # Then the schema is recognised as invalid because corrupted + SingleMessageHandlingErrorTestCase( + test_name="AVRO corrupted and invalid JSON", + key=b'{"keytype":"SCHEMA","subject":"testref","version":1,"magic":1}', + value=( + b'{"schemaType": "AVRO", "subject": "testref", "version": 1, "id": 1, "deleted": false' + + b', "schema": ' + + schema_avro_corrupted_and_invalid_json + + b"}" + ), + schema_type=SchemaType.AVRO, + message_type=MessageType.schema, + expected_exception=CorruptKafkaRecordException, + # expected_exception=None, + expected_warn_message="Schema is not valid JSON", + # expected_warn_message=None, + ), + # And given a AVRO schema referencing that corrupted schema (valid otherwise) + # When handling the schema + # Then the schema is recognised as invalid because of the corrupted referenced schema + SingleMessageHandlingErrorTestCase( + test_name="AVRO referencing corrupted and invalid JSON", + key=b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}', + value=( + b'{"schemaType": "AVRO", "subject": "test", "version": 1, "id": 1, "deleted": false' + + b', "schema": ' + + schema_avro_referencing_corrupted_and_invalid_json + + b', "references": [{"name": "testref.avsc", "subject": "testref", "version": 1}]' + + b"}" + ), + schema_type=SchemaType.AVRO, + message_type=MessageType.schema, + # expected_exception=CorruptKafkaRecordException, + expected_exception=InvalidReferences, + # expected_exception=None, + expected_warn_message=( + "The message {'keytype': 'SCHEMA', 'subject': 'testref', 'version': 1, 'magic': 1}-" + "{'schemaType': 'AVRO', 'subject': 'testref', 'version': 1, 'id': 1, 'deleted': Fa" + 'lse, \'schema\': \'\\n {\\n "namespace": "example.avro",\\n "typ----corrupti' + 'on here--ecord",\\n "name": "CorruptedSchemaAndInvalidJson",\\n "fields": [\\n ' + ' {"name": "name", "type": "string"}\\n ]\\n }\\n\'} has been discarded be' + "cause the SCHEMA is not managed" + ) + # expected_warn_message=None, + ), + ], + ), + ], +) +def test_message_error_handling_with_invalid_reference_schema( caplog: LogCaptureFixture, + test_case: MultipleMessagesHandlingErrorTestCase, schema_reader_with_consumer_messages_factory: Callable[[Tuple[List[Message]]], KafkaSchemaReader], message_factory: Callable[[bytes, bytes, int], Message], ) -> None: - # Given an invalid schema (corrupted) - key_ref = b'{"keytype":"SCHEMA","subject":"testref","version":1,"magic":1}' - value_ref = ( - b'{"schemaType": "PROTOBUF", "subject": "testref", "version": 1, "id": 1, "deleted": false' - + b', "schema": ' - + json.dumps(schema_protobuf_invalid_because_corrupted).encode() - + b"}" - ) - message_ref = message_factory(key=key_ref, value=value_ref) - - # And given a schema referencing that corrupted schema (valid otherwise) - key_using_ref = b'{"keytype":"SCHEMA","subject":"test","version":1,"magic":1}' - value_using_ref = ( - b'{"schemaType": "PROTOBUF", "subject": "test", "version": 1, "id": 1, "deleted": false' - + b', "schema": ' - + json.dumps(schema_protobuf_with_invalid_ref).encode() - + b', "references": [{"name": "testref.proto", "subject": "testref", "version": 1}]' - + b"}" - ) - message_using_ref = message_factory(key=key_using_ref, value=value_using_ref) + messages = [] + for t in test_case.inner_test_cases: + message = message_factory(key=t.key, value=t.value) + messages.append(message) with caplog.at_level(logging.WARN, logger="karapace.schema_reader"): - # When handling the corrupted schema - schema_reader = schema_reader_with_consumer_messages_factory(([message_ref],)) + schema_reader = schema_reader_with_consumer_messages_factory(([],)) - # Then the schema is recognised as invalid - with pytest.raises(CorruptKafkaRecordException): - schema_reader.handle_messages() + for t, m in zip(test_case.inner_test_cases, messages): + schema_reader.consumer.consume.side_effect = ([m],) - assert schema_reader.offset == 1 - assert not schema_reader.ready - - # When handling the schema - schema_reader.consumer.consume.side_effect = ([message_using_ref],) - - # Then the schema is recognised as invalid because of the corrupted referenced schema - with pytest.raises(CorruptKafkaRecordException): - schema_reader.handle_messages() + LOG.info("Handling message: %s", t.value) + LOG.info("Expected exception: %s", t.expected_exception) + if t.expected_exception: + with pytest.raises(t.expected_exception): + schema_reader.handle_messages() + else: + schema_reader.handle_messages() assert schema_reader.offset == 1 assert not schema_reader.ready - warn_records = [r for r in caplog.records if r.levelname == "WARNING"] - - assert len(warn_records) == 2 + occurred_warnings = [r for r in caplog.records if r.levelname == "WARNING"] + expected_warnings = [t.expected_warn_message for t in test_case.inner_test_cases if t.expected_warn_message] - # Check that different warnings are logged for each schema - assert warn_records[0].name == "karapace.schema_reader" - assert warn_records[0].message == "Schema is not valid ProtoBuf definition" + assert len(occurred_warnings) == len(expected_warnings) - assert warn_records[1].name == "karapace.schema_reader" - assert warn_records[1].message == "Invalid Protobuf references" + for ow, ew in zip(occurred_warnings, expected_warnings): + assert ow.name == "karapace.schema_reader" + assert ow.message == ew diff --git a/tests/utils.py b/tests/utils.py index 3757e0739..c6e372637 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -65,6 +65,73 @@ } ) +schema_avro_corrupted_but_valid_json = """ + { + "namespace": "example.avro", + "typTheCorruptionIsHere": "record", + "name": "CorruptedSchemaButValidJson", + "fields": [ + {"name": "name", "type": "string"} + ] + } +""" + +schema_avro_corrupted_but_valid_json = ( + b'"' + schema_avro_corrupted_but_valid_json.translate(str.maketrans({'"': '\\"', "\n": "\\n"})).encode() + b'"' +) + +schema_avro_referencing_corrupted_but_valid_json = """ + { + "fields": [ + {"name": "name", "type": "string"}, + {"name": "corruptedSchema", "type": "CorruptedSchemaButValidJson"} + ], + "name": "InvalidSchemaRef", + "namespace": "com.netapp", + "type": "record" + } +""" + +schema_avro_referencing_corrupted_but_valid_json = ( + b'"' + + schema_avro_referencing_corrupted_but_valid_json.translate(str.maketrans({'"': '\\"', "\n": "\\n"})).encode() + + b'"' +) + + +schema_avro_corrupted_and_invalid_json = """ + { + "namespace": "example.avro", + "typ----corruption here--ecord", + "name": "CorruptedSchemaAndInvalidJson", + "fields": [ + {"name": "name", "type": "string"} + ] + } +""" + +schema_avro_corrupted_and_invalid_json = ( + b'"' + schema_avro_corrupted_and_invalid_json.translate(str.maketrans({'"': '\\"', "\n": "\\n"})).encode() + b'"' +) + +schema_avro_referencing_corrupted_and_invalid_json = """ + { + "fields": [ + {"name": "name", "type": "string"}, + {"name": "corruptedSchema", "type": "CorruptedSchemaAndInvalidJson"} + ], + "name": "InvalidSchemaRef", + "namespace": "com.netapp", + "type": "record" + } +""" + +schema_avro_referencing_corrupted_and_invalid_json = ( + b'"' + + schema_avro_referencing_corrupted_and_invalid_json.translate(str.maketrans({'"': '\\"', "\n": "\\n"})).encode() + + b'"' +) + test_objects_jsonschema = [{"foo": 100}, {"foo": 200}]