diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index db98f2e57264..3bed538b680a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -53,8 +53,7 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False): """ self.logger = logging.getLogger(f"airbyte.{self.name}") - evaluated_manifest = {} - resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config, evaluated_manifest, "") + resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config) self._source_config = resolved_source_config self._debug = debug self._factory = DeclarativeComponentFactory() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/undefined_reference_exception.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/custom_exceptions.py similarity index 55% rename from airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/undefined_reference_exception.py rename to airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/custom_exceptions.py index f9721edbf7ee..68368490bd8e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/undefined_reference_exception.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/custom_exceptions.py @@ -3,6 +3,15 @@ # +class CircularReferenceException(Exception): + """ + Raised when a circular reference is detected in a manifest. + """ + + def __init__(self, reference): + super().__init__(f"Circular reference found: {reference}") + + class UndefinedReferenceException(Exception): """ Raised when refering to an undefined reference. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py index 04af303c87f1..7cb54edcf075 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py @@ -2,10 +2,10 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -from copy import deepcopy -from typing import Any, Mapping, Tuple, Union +import re +from typing import Any, Mapping, Set, Tuple, Union -from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException +from airbyte_cdk.sources.declarative.parsers.custom_exceptions import CircularReferenceException, UndefinedReferenceException class ManifestReferenceResolver: @@ -96,91 +96,102 @@ class ManifestReferenceResolver: ref_tag = "$ref" - def preprocess_manifest(self, manifest: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]): - + def preprocess_manifest(self, manifest): """ :param manifest: incoming manifest that could have references to previously defined components - :param evaluated_mapping: mapping produced by dereferencing the content of input_mapping - :param path: curent path in configuration traversal :return: """ - d = {} - if self.ref_tag in manifest: - partial_ref_string = manifest[self.ref_tag] - d = deepcopy(self._preprocess(partial_ref_string, evaluated_mapping, path)) - - for key, value in manifest.items(): - if key == self.ref_tag: - continue - full_path = self._resolve_value(key, path) - if full_path in evaluated_mapping: - raise Exception(f"Databag already contains key={key} with path {full_path}") - processed_value = self._preprocess(value, evaluated_mapping, full_path) - evaluated_mapping[full_path] = processed_value - d[key] = processed_value - - return d - - def _get_ref_key(self, s: str) -> str: - ref_start = s.find("*ref(") - if ref_start == -1: - return None - return s[ref_start + 5 : s.find(")")] - - def _resolve_value(self, value: str, path): - if path: - return *path, value - else: - return (value,) - - def _preprocess(self, value, evaluated_config: Mapping[str, Any], path): - if isinstance(value, str): - ref_key = self._get_ref_key(value) - if ref_key is None: - return value + return self._evaluate_node(manifest, manifest) + + def _evaluate_node(self, node: Any, manifest: Mapping[str, Any], visited: Set = None): + if isinstance(node, dict): + evaluated_dict = {k: self._evaluate_node(v, manifest) for k, v in node.items() if not self._is_ref_key(k)} + if self.ref_tag in node: + # The node includes a $ref key, so we splat the referenced value(s) into the evaluated dict + evaluated_ref = self._evaluate_node(node[self.ref_tag], manifest) + if not isinstance(evaluated_ref, dict): + return evaluated_ref + else: + # The values defined on the component take precedence over the reference values + return evaluated_ref | evaluated_dict else: - """ - references are ambiguous because one could define a key containing with `.` - in this example, we want to refer to the limit key in the dict object: - dict: - limit: 50 - limit_ref: "*ref(dict.limit)" - - whereas here we want to access the `nested.path` value. - nested: - path: "first one" - nested.path: "uh oh" - value: "ref(nested.path) - - to resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward - until we find a key with the given path, or until there is nothing to traverse. - """ - key = (ref_key,) - while key[-1]: - if key in evaluated_config: - return evaluated_config[key] - else: - split = key[-1].split(".") - key = *key[:-1], split[0], ".".join(split[1:]) - raise UndefinedReferenceException(path, ref_key) - elif isinstance(value, dict): - return self.preprocess_manifest(value, evaluated_config, path) - elif type(value) == list: - evaluated_list = [ - # pass in elem's path instead of the list's path - self._preprocess(v, evaluated_config, self._get_path_for_list_item(path, index)) - for index, v in enumerate(value) - ] - # Add the list's element to the evaluated config so they can be referenced - for index, elem in enumerate(evaluated_list): - evaluated_config[self._get_path_for_list_item(path, index)] = elem - return evaluated_list + return evaluated_dict + elif isinstance(node, list): + return [self._evaluate_node(v, manifest) for v in node] + elif isinstance(node, str) and node.startswith("*ref("): + if visited is None: + visited = set() + if node in visited: + raise CircularReferenceException(node) + visited.add(node) + ret = self._evaluate_node(self._lookup_reference_value(node, manifest), manifest, visited) + visited.remove(node) + return ret else: - return value + return node + + def _is_ref_key(self, key): + return key == self.ref_tag + + def _lookup_reference_value(self, reference: str, manifest: Mapping[str, Any]) -> Any: + path = re.match("\\*ref\\(([^)]+)\\)", reference).groups()[0] + if not path: + raise UndefinedReferenceException(path, reference) + try: + return self._read_reference_value(path, manifest) + except (KeyError, IndexError): + raise UndefinedReferenceException(path, reference) + + @staticmethod + def _read_reference_value(ref: str, manifest_node: Mapping[str, Any]) -> Any: + """ + Read the value at the referenced location of the manifest. - def _get_path_for_list_item(self, path, index): - # An elem's path is {path_to_list}[{index}] - if len(path) > 1: - return path[:-1], f"{path[-1]}[{index}]" - else: - return (f"{path[-1]}[{index}]",) + References are ambiguous because one could define a key containing `.` + In this example, we want to refer to the `limit` key in the `dict` object: + dict: + limit: 50 + limit_ref: "*ref(dict.limit)" + + Whereas here we want to access the `nested.path` value. + nested: + path: "first one" + nested.path: "uh oh" + value: "ref(nested.path) + + To resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward + until we find a key with the given path, or until there is nothing to traverse. + + Consider the path foo.bar.baz. To resolve the ambiguity, we first try 'foo.bar.baz' in its entirety as a top-level key. If this + fails, we try 'foo' as the top-level key, and if this succeeds, pass 'bar.baz' on as the key to be tried at the next level. + """ + while ref: + try: + return manifest_node[ref] + except (KeyError, TypeError): + head, ref = _parse_path(ref) + manifest_node = manifest_node[head] + return manifest_node + + +def _parse_path(ref: str) -> Tuple[Union[str, int], str]: + """ + Return the next path component, together with the rest of the path. + + A path component may be a string key, or an int index. + + >>> _parse_path("foo.bar") + "foo", "bar" + >>> _parse_path("foo[7][8].bar") + "foo", "[7][8].bar" + >>> _parse_path("[7][8].bar") + 7, "[8].bar" + >>> _parse_path("[8].bar") + 8, "bar" + """ + if match := re.match(r"^\[([0-9]+)\]\.?(.*)", ref): + idx, rest = match.groups() + result = int(idx), rest + else: + result = re.match(r"([^[.]*)\.?(.*)", ref).groups() + return result diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py index 59f7412ef1a1..e988870a5336 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py @@ -3,65 +3,33 @@ # import pytest -from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver -from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException +from airbyte_cdk.sources.declarative.parsers.custom_exceptions import CircularReferenceException, UndefinedReferenceException +from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver, _parse_path resolver = ManifestReferenceResolver() -def test_get_ref(): - s = "*ref(limit)" - ref_key = resolver._get_ref_key(s) - assert ref_key == "limit" - - -def test_get_ref_no_ref(): - s = "limit: 50" - - ref_key = resolver._get_ref_key(s) - assert ref_key is None - - def test_refer(): - content = { - "limit": 50, - "limit_ref": "*ref(limit)" - } - config = resolver.preprocess_manifest(content, {}, "") + content = {"limit": 50, "limit_ref": "*ref(limit)"} + config = resolver.preprocess_manifest(content) assert config["limit_ref"] == 50 def test_refer_to_inner(): - content = { - "dict": { - "limit": 50 - }, - "limit_ref": "*ref(dict.limit)" - } - config = resolver.preprocess_manifest(content, {}, "") + content = {"dict": {"limit": 50}, "limit_ref": "*ref(dict.limit)"} + config = resolver.preprocess_manifest(content) assert config["limit_ref"] == 50 def test_refer_to_non_existant_struct(): - content = { - "dict": { - "limit": 50 - }, - "limit_ref": "*ref(not_dict)" - } + content = {"dict": {"limit": 50}, "limit_ref": "*ref(not_dict)"} with pytest.raises(UndefinedReferenceException): - resolver.preprocess_manifest(content, {}, "") + resolver.preprocess_manifest(content) def test_refer_in_dict(): - content = { - "limit": 50, - "offset_request_parameters": { - "offset": "{{ next_page_token['offset'] }}", - "limit": "*ref(limit)" - } - } - config = resolver.preprocess_manifest(content, {}, "") + content = {"limit": 50, "offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"}} + config = resolver.preprocess_manifest(content) assert config["offset_request_parameters"]["offset"] == "{{ next_page_token['offset'] }}" assert config["offset_request_parameters"]["limit"] == 50 @@ -69,16 +37,13 @@ def test_refer_in_dict(): def test_refer_to_dict(): content = { "limit": 50, - "offset_request_parameters": { - "offset": "{{ next_page_token['offset'] }}", - "limit": "*ref(limit)" - }, + "offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"}, "offset_pagination_request_parameters": { "class": "InterpolatedRequestParameterProvider", - "request_parameters": "*ref(offset_request_parameters)" - } + "request_parameters": "*ref(offset_request_parameters)", + }, } - config = resolver.preprocess_manifest(content, {}, "") + config = resolver.preprocess_manifest(content) assert config["limit"] == 50 assert config["offset_request_parameters"]["limit"] == 50 assert len(config["offset_pagination_request_parameters"]) == 2 @@ -90,16 +55,10 @@ def test_refer_and_overwrite(): content = { "limit": 50, "custom_limit": 25, - "offset_request_parameters": { - "offset": "{{ next_page_token['offset'] }}", - "limit": "*ref(limit)" - }, - "custom_request_parameters": { - "$ref": "*ref(offset_request_parameters)", - "limit": "*ref(custom_limit)" - } + "offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"}, + "custom_request_parameters": {"$ref": "*ref(offset_request_parameters)", "limit": "*ref(custom_limit)"}, } - config = resolver.preprocess_manifest(content, {}, "") + config = resolver.preprocess_manifest(content) assert config["offset_request_parameters"]["limit"] == 50 assert config["custom_request_parameters"]["limit"] == 25 @@ -110,22 +69,13 @@ def test_refer_and_overwrite(): def test_collision(): content = { "example": { - "nested":{ - "path": "first one", - "more_nested": { - "value": "found it!" - } - }, + "nested": {"path": "first one", "more_nested": {"value": "found it!"}}, "nested.path": "uh oh", }, - "reference_to_nested_path": { - "$ref": "*ref(example.nested.path)" - }, - "reference_to_nested_nested_value": { - "$ref": "*ref(example.nested.more_nested.value)" - } + "reference_to_nested_path": {"$ref": "*ref(example.nested.path)"}, + "reference_to_nested_nested_value": {"$ref": "*ref(example.nested.more_nested.value)"}, } - config = resolver.preprocess_manifest(content, {}, "") + config = resolver.preprocess_manifest(content) assert config["example"]["nested"]["path"] == "first one" assert config["example"]["nested.path"] == "uh oh" assert config["reference_to_nested_path"] == "uh oh" @@ -133,11 +83,55 @@ def test_collision(): assert config["reference_to_nested_nested_value"] == "found it!" -def test_list(): +def test_internal_collision(): content = { - "list": ["A", "B"], - "elem_ref": "*ref(list[0])" + "example": { + "nested": {"path": {"internal": "uh oh"}, "path.internal": "found it!"}, + }, + "reference": {"$ref": "*ref(example.nested.path.internal)"}, } - config = resolver.preprocess_manifest(content, {}, "") + config = resolver.preprocess_manifest(content) + assert config["example"]["nested"]["path"]["internal"] == "uh oh" + assert config["example"]["nested"]["path.internal"] == "found it!" + assert config["reference"] == "found it!" + + +def test_parse_path(): + assert _parse_path("foo.bar") == ("foo", "bar") + assert _parse_path("foo[7][8].bar") == ("foo", "[7][8].bar") + assert _parse_path("[7][8].bar") == (7, "[8].bar") + assert _parse_path("[8].bar") == (8, "bar") + + +def test_list(): + content = {"list": ["A", "B"], "elem_ref": "*ref(list[0])"} + config = resolver.preprocess_manifest(content) elem_ref = config["elem_ref"] assert elem_ref == "A" + + +def test_nested_list(): + content = {"list": [["A"], ["B"]], "elem_ref": "*ref(list[1][0])"} + config = resolver.preprocess_manifest(content) + elem_ref = config["elem_ref"] + assert elem_ref == "B" + + +def test_list_of_dicts(): + content = {"list": [{"A": "a"}, {"B": "b"}], "elem_ref": "*ref(list[1].B)"} + config = resolver.preprocess_manifest(content) + elem_ref = config["elem_ref"] + assert elem_ref == "b" + + +def test_multiple_levels_of_indexing(): + content = {"list": [{"A": ["a1", "a2"]}, {"B": ["b1", "b2"]}], "elem_ref": "*ref(list[1].B[0])"} + config = resolver.preprocess_manifest(content) + elem_ref = config["elem_ref"] + assert elem_ref == "b1" + + +def test_circular_reference(): + content = {"elem_ref1": "*ref(elem_ref2)", "elem_ref2": "*ref(elem_ref1)"} + with pytest.raises(CircularReferenceException): + resolver.preprocess_manifest(content) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index 6265b0e638f0..8ab6091495f5 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -34,7 +34,7 @@ ) from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever -from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader +from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader, InlineSchemaLoader from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer @@ -45,10 +45,10 @@ factory = DeclarativeComponentFactory() -resolver = ManifestReferenceResolver() - input_config = {"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]} +resolver = ManifestReferenceResolver() + def test_factory(): content = """ @@ -64,7 +64,7 @@ def test_factory(): request_body_json: body_offset: "{{ next_page_token['offset'] }}" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["request_options"], input_config, False) @@ -89,7 +89,7 @@ def test_interpolate_config(): body_field: "yoyoyo" interpolated_body_field: "{{ config['apikey'] }}" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["authenticator"], input_config, False) @@ -111,7 +111,7 @@ def test_list_based_stream_slicer_with_values_refd(): slice_values: "*ref(repositories)" cursor_field: repository """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["stream_slicer"], input_config, False) @@ -129,7 +129,7 @@ def test_list_based_stream_slicer_with_values_defined_in_config(): inject_into: header field_name: repository """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["stream_slicer"], input_config, False) @@ -181,7 +181,7 @@ def test_create_substream_slicer(): parent_key: someid stream_slice_field: word_id """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) stream_slicer = factory.create_component(config["stream_slicer"], input_config)() parent_stream_configs = stream_slicer.parent_stream_configs @@ -216,7 +216,7 @@ def test_create_cartesian_stream_slicer(): - "*ref(stream_slicer_A)" - "*ref(stream_slicer_B)" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["stream_slicer"], input_config, False) @@ -249,7 +249,7 @@ def test_datetime_stream_slicer(): field_name: created[gte] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["stream_slicer"], input_config, False) @@ -363,7 +363,7 @@ def test_full_config(): description: Test API Key order: 0 """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["list_stream"], input_config, False) @@ -430,7 +430,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel $ref: "*ref(extractor)" field_pointer: ["{record_selector}"] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["selector"], input_config, False) @@ -477,7 +477,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel ], ) def test_options_propagation(test_name, content, expected_field_pointer_value): - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) selector = factory.create_component(config["selector"], input_config, True)() assert selector.extractor.field_pointer[0].eval(input_config) == expected_field_pointer_value @@ -547,7 +547,7 @@ def test_create_requester(test_name, error_handler): header: header_value {error_handler} """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["requester"], input_config, False) @@ -577,7 +577,7 @@ def test_create_composite_error_handler(): - http_codes: [ 403 ] action: RETRY """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["error_handler"], input_config, False) @@ -626,7 +626,7 @@ def test_config_with_defaults(): streams: - "*ref(lists_stream)" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["lists_stream"], input_config, False) @@ -663,7 +663,7 @@ def test_create_default_paginator(): page_size: 50 cursor_value: "{{ response._metadata.next }}" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["paginator"], input_config, False) @@ -702,7 +702,7 @@ def test_no_transformations(self): $options: {self.base_options} """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["the_stream"], input_config, False) @@ -722,7 +722,7 @@ def test_remove_fields(self): - ["path", "to", "field1"] - ["path2"] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["the_stream"], input_config, False) @@ -743,7 +743,7 @@ def test_add_fields(self): - path: ["field1"] value: "static_value" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["the_stream"], input_config, False) @@ -761,6 +761,31 @@ def test_add_fields(self): ] assert expected == component.transformations + def test_forward_references(self): + content = f""" + the_stream: + type: DeclarativeStream + $options: + {self.base_options} + schema_loader: + type: InlineSchemaLoader + schema: "*ref(schemas.the_stream_schema)" + schemas: + the_stream_schema: + type: object + properties: + title: + type: string + """ + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) + + factory.create_component(config["the_stream"], input_config, False) + + component = factory.create_component(config["the_stream"], input_config)() + assert isinstance(component.schema_loader, InlineSchemaLoader) + expected = {"type": "object", "properties": {"title": {"type": "string"}}} + assert expected == component.schema_loader.schema + def test_validation_wrong_input_type(): content = """ @@ -775,7 +800,7 @@ def test_validation_wrong_input_type(): $ref: "*ref(extractor)" field_pointer: 408 """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) with pytest.raises(ValidationError): factory.create_component(config["selector"], input_config, False) @@ -798,7 +823,7 @@ def test_validation_type_missing_required_fields(): field_name: created[gte] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) with pytest.raises(ValidationError): factory.create_component(config["stream_slicer"], input_config, False) @@ -818,7 +843,7 @@ def test_validation_wrong_interface_type(): type: "MinMaxDatetime" datetime: "{{ response._metadata.next }}" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) with pytest.raises(ValidationError): factory.create_component(config["paginator"], input_config, False) @@ -834,7 +859,7 @@ def test_validation_create_composite_error_handler(): - response_filters: - http_codes: [ 403 ] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) with pytest.raises(ValidationError): factory.create_component(config["error_handler"], input_config, False) @@ -859,7 +884,7 @@ def test_validation_wrong_object_type(): type: "MinMaxDatetime" datetime: "{{ response._metadata.next }}" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["paginator"], input_config, False) @@ -873,7 +898,7 @@ def test_validate_types_nested_in_list(): - type: DpathExtractor field_pointer: ["result"] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["error_handler"], input_config, False) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py index ba6af18094f4..9586ecb58c9b 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py @@ -7,7 +7,7 @@ import tempfile import pytest -from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException +from airbyte_cdk.sources.declarative.parsers.custom_exceptions import UndefinedReferenceException from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from yaml.parser import ParserError