diff --git a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md index 096a3bdae91a..09b96fa91c39 100644 --- a/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.19 +Assert a non-empty overlap between the fields present in the record and the declared json schema. + ## 0.1.18 Fix checking date-time format againt nullable field. diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index 2979e5506ac3..ae83f649e79e 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -9,7 +9,7 @@ COPY setup.py ./ COPY pytest.ini ./ RUN pip install . -LABEL io.airbyte.version=0.1.18 +LABEL io.airbyte.version=0.1.19 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"] diff --git a/airbyte-integrations/bases/source-acceptance-test/setup.py b/airbyte-integrations/bases/source-acceptance-test/setup.py index e5895caa66d8..d99b1ce7d526 100644 --- a/airbyte-integrations/bases/source-acceptance-test/setup.py +++ b/airbyte-integrations/bases/source-acceptance-test/setup.py @@ -19,6 +19,7 @@ "pprintpp~=0.4", "dpath~=2.0.1", "jsonschema~=3.2.0", + "jsonref==0.2", ] setuptools.setup( diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py index 26484008f429..32671e75c403 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py @@ -6,17 +6,17 @@ from collections import Counter, 
defaultdict from functools import reduce from logging import Logger -from typing import Any, Dict, List, Mapping, MutableMapping +from typing import Any, Dict, List, Mapping, MutableMapping, Set import dpath.util import pytest -from airbyte_cdk.models import AirbyteMessage, ConnectorSpecification, Status, Type +from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Status, Type from docker.errors import ContainerError from jsonschema import validate from source_acceptance_test.base import BaseTest from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig from source_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, serialize, verify_records_schema -from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper +from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper, get_expected_schema_structure, get_object_structure @pytest.mark.default_timeout(10) @@ -126,10 +126,37 @@ def primary_keys_for_records(streams, records): @pytest.mark.default_timeout(5 * 60) class TestBasicRead(BaseTest): @staticmethod - def _validate_schema(records, configured_catalog): + def _validate_records_structure(records: List[AirbyteRecordMessage], configured_catalog: ConfiguredAirbyteCatalog): + """ + Check object structure similar to one expected by schema. Sometimes + just running schema validation is not enough because schema could have + additionalProperties parameter set to true and no required fields + therefore any arbitrary object would pass schema validation. + This method is here to catch those cases by extracting all the paths + from the object and comparing them to paths expected from jsonschema. If + there are no common paths then raise an alert. + + :param records: List of airbyte record messages gathered from connector instances.
+ :param configured_catalog: SAT testcase parameters parsed from yaml file + """ + schemas: Dict[str, Set] = {} + for stream in configured_catalog.streams: + schemas[stream.stream.name] = set(get_expected_schema_structure(stream.stream.json_schema)) + + for record in records: + schema_pathes = schemas.get(record.stream) + if not schema_pathes: + continue + record_fields = set(get_object_structure(record.data)) + common_fields = set.intersection(record_fields, schema_pathes) + assert common_fields, f" Record from {record.stream} stream should have some fields mentioned by json schema, {schema_pathes}" + + @staticmethod + def _validate_schema(records: List[AirbyteRecordMessage], configured_catalog: ConfiguredAirbyteCatalog): """ Check if data type and structure in records matches the one in json_schema of the stream in catalog """ + TestBasicRead._validate_records_structure(records, configured_catalog) bar = "-" * 80 streams_errors = verify_records_schema(records, configured_catalog) for stream_name, errors in streams_errors.items(): diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/json_schema_helper.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/json_schema_helper.py index ddfe7b66093e..06d9c2a559ed 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/json_schema_helper.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/json_schema_helper.py @@ -8,6 +8,7 @@ import dpath.util import pendulum +from jsonref import JsonRef class CatalogField: @@ -129,3 +130,61 @@ def get_top_level_item(variant_path: List[str]): assert any( [all(["const" in var["properties"][prop] for var in variants]) for prop in common_props] ), f"Any of {common_props} properties in {'.'.join(variant_path)} has no const keyword. 
See specification reference at https://docs.airbyte.io/connector-development/connector-specification-reference" + + +def get_object_structure(obj: dict) -> List[str]: + """ + Traverse through object structure and compose a list of property keys including nested ones. + This list reflects object's structure with list of all obj property key + paths. In case the object is nested inside an array we assume that it has the same + structure as the first element. + :param obj: data object to get its structure + :returns list of object property keys paths + """ + paths = [] + + def _traverse_obj_and_get_path(obj, path=""): + if path: + paths.append(path) + if isinstance(obj, dict): + return {k: _traverse_obj_and_get_path(v, path + "/" + k) for k, v in obj.items()} + elif isinstance(obj, list) and len(obj) > 0: + return [_traverse_obj_and_get_path(obj[0], path + "/[]")] + + _traverse_obj_and_get_path(obj) + + return paths + + +def get_expected_schema_structure(schema: dict) -> List[str]: + """ + Traverse through json schema and compose a list of property keys that the object is expected to have. + :param schema: jsonschema to get expected paths + :returns list of object property keys paths + """ + paths = [] + # Resolve all references to simplify schema processing.
+ schema = JsonRef.replace_refs(schema) + + def _scan_schema(subschema, path=""): + if "oneOf" in subschema or "anyOf" in subschema: + return [_scan_schema({"type": "object", **s}, path) for s in subschema.get("oneOf") or subschema.get("anyOf")] + schema_type = subschema.get("type", ["null"]) + if not isinstance(schema_type, list): + schema_type = [schema_type] + if "object" in schema_type: + props = subschema.get("properties") + if not props: + # Handle objects with arbitrary properties: + # {"type": "object", "additionalProperties": {"type": "string"}} + if path: + paths.append(path) + return + return {k: _scan_schema(v, path + "/" + k) for k, v in props.items()} + elif "array" in schema_type: + items = subschema.get("items", {}) + return [_scan_schema(items, path + "/[]")] + paths.append(path) + + _scan_schema(schema) + return paths diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_core.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_core.py index 10f85b34c60a..dbe10af719b7 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_core.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_core.py @@ -2,8 +2,12 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
# +from unittest.mock import MagicMock + import pytest -from airbyte_cdk.models import AirbyteStream +from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, Type +from source_acceptance_test.config import BasicReadTestConfig +from source_acceptance_test.tests.test_core import TestBasicRead as _TestBasicRead from source_acceptance_test.tests.test_core import TestDiscovery as _TestDiscovery @@ -28,3 +32,42 @@ def test_discovery(schema, cursors, should_fail): t.test_defined_cursors_exist_in_schema(None, discovered_catalog) else: t.test_defined_cursors_exist_in_schema(None, discovered_catalog) + + +@pytest.mark.parametrize( + "schema, record, should_fail", + [ + ({"type": "object"}, {"aa": 23}, False), + ({"type": "object"}, {}, False), + ({"type": "object", "properties": {"created": {"type": "string"}}}, {"aa": 23}, True), + ({"type": "object", "properties": {"created": {"type": "string"}}}, {"created": "23"}, False), + ({"type": "object", "properties": {"created": {"type": "string"}}}, {"root": {"created": "23"}}, True), + # Recharge shop stream case + ( + {"type": "object", "properties": {"shop": {"type": ["null", "object"]}, "store": {"type": ["null", "object"]}}}, + {"shop": {"a": "23"}, "store": {"b": "23"}}, + False, + ), + ], +) +def test_read(schema, record, should_fail): + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream( + stream=AirbyteStream.parse_obj({"name": "test_stream", "json_schema": schema}), + sync_mode="full_refresh", + destination_sync_mode="overwrite", + ) + ] + ) + input_config = BasicReadTestConfig() + docker_runner_mock = MagicMock() + docker_runner_mock.call_read.return_value = [ + AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=111)) + ] + t = _TestBasicRead() + if should_fail: + with pytest.raises(AssertionError, match="stream should have some fields mentioned by json schema"): 
+ t.test_read(None, catalog, input_config, [], docker_runner_mock, MagicMock()) + else: + t.test_read(None, catalog, input_config, [], docker_runner_mock, MagicMock()) diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_json_schema_helper.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_json_schema_helper.py index fda4946da5bd..3a9a433704d5 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_json_schema_helper.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_json_schema_helper.py @@ -18,7 +18,7 @@ ) from pydantic import BaseModel from source_acceptance_test.tests.test_incremental import records_with_state -from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper +from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper, get_expected_schema_structure, get_object_structure @pytest.fixture(name="simple_state") @@ -167,3 +167,53 @@ class Root(BaseModel): assert variant_paths == [["properties", "f", "anyOf"], ["definitions", "C", "properties", "e", "anyOf"]] # TODO: implement validation for pydantic generated objects as well # js_helper.validate_variant_paths(variant_paths) + + +@pytest.mark.parametrize( + "object, pathes", + [ + ({}, []), + ({"a": 12}, ["/a"]), + ({"a": {"b": 12}}, ["/a", "/a/b"]), + ({"a": {"b": 12}, "c": 45}, ["/a", "/a/b", "/c"]), + ( + {"a": [{"b": 12}]}, + ["/a", "/a/[]", "/a/[]/b"], + ), + ({"a": [{"b": 12}, {"b": 15}]}, ["/a", "/a/[]", "/a/[]/b"]), + ({"a": [[[{"b": 12}, {"b": 15}]]]}, ["/a", "/a/[]", "/a/[]/[]", "/a/[]/[]/[]", "/a/[]/[]/[]/b"]), + ], +) +def test_get_object_strucutre(object, pathes): + assert get_object_structure(object) == pathes + + +@pytest.mark.parametrize( + "schema, pathes", + [ + ({"type": "object", "properties": {"a": {"type": "string"}}}, ["/a"]), + ({"type": "object", "properties": {"a": {"type": "string"}, "b": {"type": "number"}}}, ["/a", "/b"]), + ( + { + "type": "object", + 
"properties": {"a": {"type": "string"}, "b": {"$ref": "#definitions/b_type"}}, + "definitions": {"b_type": {"type": "number"}}, + }, + ["/a", "/b"], + ), + ({"type": "object", "oneOf": [{"properties": {"a": {"type": "string"}}}, {"properties": {"b": {"type": "string"}}}]}, ["/a", "/b"]), + # Some of pydantic generatec schemas have anyOf keyword + ({"type": "object", "anyOf": [{"properties": {"a": {"type": "string"}}}, {"properties": {"b": {"type": "string"}}}]}, ["/a", "/b"]), + ( + {"type": "array", "items": {"oneOf": [{"properties": {"a": {"type": "string"}}}, {"properties": {"b": {"type": "string"}}}]}}, + ["/[]/a", "/[]/b"], + ), + # There could be an object with any properties with specific type + ({"type": "object", "properties": {"a": {"type": "object", "additionalProperties": {"type": "string"}}}}, ["/a"]), + # Array with no item type specified + ({"type": "array"}, ["/[]"]), + ({"type": "array", "items": {"type": "object", "additionalProperties": {"type": "string"}}}, ["/[]"]), + ], +) +def test_get_expected_schema_structure(schema, pathes): + assert get_expected_schema_structure(schema) == pathes