From 07c1cd7f2c3d10be63ebdd05cbdf4775b8b0f412 Mon Sep 17 00:00:00 2001 From: Catherine Noll <noll.catherine@gmail.com> Date: Tue, 20 Dec 2022 11:48:34 -0500 Subject: [PATCH 1/4] [Low-Code CDK] Handle forward references in manifest --- .../manifest_declarative_source.py | 3 +- ...ence_exception.py => custom_exceptions.py} | 9 + .../parsers/manifest_reference_resolver.py | 175 ++++++++++-------- .../test_manifest_reference_resolver.py | 146 +++++++-------- .../sources/declarative/test_factory.py | 48 ++--- .../test_yaml_declarative_source.py | 2 +- 6 files changed, 198 insertions(+), 185 deletions(-) rename airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/{undefined_reference_exception.py => custom_exceptions.py} (55%) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index db98f2e57264..4589b09e0f3d 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -53,8 +53,7 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False): """ self.logger = logging.getLogger(f"airbyte.{self.name}") - evaluated_manifest = {} - resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config, evaluated_manifest, "") + resolved_source_config = ManifestReferenceResolver(source_config).preprocess_manifest() self._source_config = resolved_source_config self._debug = debug self._factory = DeclarativeComponentFactory() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/undefined_reference_exception.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/custom_exceptions.py similarity index 55% rename from airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/undefined_reference_exception.py rename to 
airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/custom_exceptions.py index f9721edbf7ee..68368490bd8e 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/undefined_reference_exception.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/custom_exceptions.py @@ -3,6 +3,15 @@ # +class CircularReferenceException(Exception): + """ + Raised when a circular reference is detected in a manifest. + """ + + def __init__(self, reference): + super().__init__(f"Circular reference found: {reference}") + + class UndefinedReferenceException(Exception): """ Raised when refering to an undefined reference. diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py index 04af303c87f1..f4adab1a7319 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py @@ -2,10 +2,10 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
# -from copy import deepcopy +import re from typing import Any, Mapping, Tuple, Union -from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException +from airbyte_cdk.sources.declarative.parsers.custom_exceptions import CircularReferenceException, UndefinedReferenceException class ManifestReferenceResolver: @@ -96,91 +96,102 @@ class ManifestReferenceResolver: ref_tag = "$ref" - def preprocess_manifest(self, manifest: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]): + def __init__(self, manifest: Mapping[str, Any]): + self.manifest = manifest + self.visited = set() + def preprocess_manifest(self): """ :param manifest: incoming manifest that could have references to previously defined components - :param evaluated_mapping: mapping produced by dereferencing the content of input_mapping - :param path: curent path in configuration traversal :return: """ - d = {} - if self.ref_tag in manifest: - partial_ref_string = manifest[self.ref_tag] - d = deepcopy(self._preprocess(partial_ref_string, evaluated_mapping, path)) - - for key, value in manifest.items(): - if key == self.ref_tag: - continue - full_path = self._resolve_value(key, path) - if full_path in evaluated_mapping: - raise Exception(f"Databag already contains key={key} with path {full_path}") - processed_value = self._preprocess(value, evaluated_mapping, full_path) - evaluated_mapping[full_path] = processed_value - d[key] = processed_value - - return d - - def _get_ref_key(self, s: str) -> str: - ref_start = s.find("*ref(") - if ref_start == -1: - return None - return s[ref_start + 5 : s.find(")")] - - def _resolve_value(self, value: str, path): - if path: - return *path, value - else: - return (value,) - - def _preprocess(self, value, evaluated_config: Mapping[str, Any], path): - if isinstance(value, str): - ref_key = self._get_ref_key(value) - if ref_key is None: - return value + return self._evaluate_node(self.manifest) + + 
def _evaluate_node(self, node: Any): + if isinstance(node, dict): + evaluated_dict = {k: self._evaluate_node(v) for k, v in node.items() if not self._is_ref_key(k)} + if self.ref_tag in node: + # The node includes a $ref key, so we splat the referenced value(s) into the evaluated dict + evaluated_ref = self._evaluate_node(node[self.ref_tag]) + if not isinstance(evaluated_ref, dict): + return evaluated_ref + else: + return evaluated_ref | evaluated_dict else: - """ - references are ambiguous because one could define a key containing with `.` - in this example, we want to refer to the limit key in the dict object: - dict: - limit: 50 - limit_ref: "*ref(dict.limit)" - - whereas here we want to access the `nested.path` value. - nested: - path: "first one" - nested.path: "uh oh" - value: "ref(nested.path) - - to resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward - until we find a key with the given path, or until there is nothing to traverse. 
- """ - key = (ref_key,) - while key[-1]: - if key in evaluated_config: - return evaluated_config[key] - else: - split = key[-1].split(".") - key = *key[:-1], split[0], ".".join(split[1:]) - raise UndefinedReferenceException(path, ref_key) - elif isinstance(value, dict): - return self.preprocess_manifest(value, evaluated_config, path) - elif type(value) == list: - evaluated_list = [ - # pass in elem's path instead of the list's path - self._preprocess(v, evaluated_config, self._get_path_for_list_item(path, index)) - for index, v in enumerate(value) - ] - # Add the list's element to the evaluated config so they can be referenced - for index, elem in enumerate(evaluated_list): - evaluated_config[self._get_path_for_list_item(path, index)] = elem - return evaluated_list + return evaluated_dict + elif isinstance(node, list): + return [self._evaluate_node(v) for v in node] + elif isinstance(node, str) and node.startswith("*ref("): + if node in self.visited: + raise CircularReferenceException(node) + self.visited.add(node) + ret = self._evaluate_node(self._lookup_reference_value(node)) + self.visited.remove(node) + return ret else: - return value + return node - def _get_path_for_list_item(self, path, index): - # An elem's path is {path_to_list}[{index}] - if len(path) > 1: - return path[:-1], f"{path[-1]}[{index}]" - else: - return (f"{path[-1]}[{index}]",) + def _is_ref_key(self, key): + return key == self.ref_tag + + def _lookup_reference_value(self, reference: str) -> Any: + path = re.match("\\*ref\\(([^)]+)\\)", reference).groups()[0] + if not path: + raise UndefinedReferenceException(path, reference) + try: + return self._read_reference_value(path, self.manifest) + except (KeyError, IndexError): + raise UndefinedReferenceException(path, reference) + + def _read_reference_value(self, ref: str, manifest_node: Mapping[str, Any]) -> Any: + """ + Read the value at the referenced location of the manifest. 
+ + References are ambiguous because one could define a key containing `.` + In this example, we want to refer to the `limit` key in the `dict` object: + dict: + limit: 50 + limit_ref: "*ref(dict.limit)" + + Whereas here we want to access the `nested.path` value. + nested: + path: "first one" + nested.path: "uh oh" + value: "*ref(nested.path)" + + To resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward + until we find a key with the given path, or until there is nothing to traverse. + + Consider the path foo.bar.baz. To resolve the ambiguity, we first try 'foo.bar.baz' in its entirety as a top-level key. If this + fails, we try 'foo' as the top-level key, and if this succeeds, pass 'bar.baz' on as the key to be tried at the next level. + """ + while ref: + try: + return manifest_node[ref] + except (KeyError, TypeError): + head, ref = _parse_path(ref) + manifest_node = manifest_node[head] + return manifest_node + + +def _parse_path(ref: str) -> Tuple[Union[str, int], str]: + """ + Return the next path component, together with the rest of the path. + + A path component may be a string key, or an int index. 
+ + >>> _parse_path("foo.bar") + "foo", "bar" + >>> _parse_path("foo[7][8].bar") + "foo", "[7][8].bar" + >>> _parse_path("[7][8].bar") + 7, "[8].bar" + >>> _parse_path("[8].bar") + 8, "bar" + """ + if match := re.match(r"^\[([0-9]+)\]\.?(.*)", ref): + idx, rest = match.groups() + result = int(idx), rest + else: + result = re.match(r"([^[.]*)\.?(.*)", ref).groups() + return result diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py index 59f7412ef1a1..f03cc0ca8c3c 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py @@ -3,65 +3,33 @@ # import pytest -from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver -from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException +from airbyte_cdk.sources.declarative.parsers.custom_exceptions import CircularReferenceException, UndefinedReferenceException +from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver, _parse_path -resolver = ManifestReferenceResolver() - - -def test_get_ref(): - s = "*ref(limit)" - ref_key = resolver._get_ref_key(s) - assert ref_key == "limit" - - -def test_get_ref_no_ref(): - s = "limit: 50" - - ref_key = resolver._get_ref_key(s) - assert ref_key is None +resolver = ManifestReferenceResolver def test_refer(): - content = { - "limit": 50, - "limit_ref": "*ref(limit)" - } - config = resolver.preprocess_manifest(content, {}, "") + content = {"limit": 50, "limit_ref": "*ref(limit)"} + config = resolver(content).preprocess_manifest() assert config["limit_ref"] == 50 def test_refer_to_inner(): - content = { - "dict": { - "limit": 50 - }, - "limit_ref": 
"*ref(dict.limit)" - } - config = resolver.preprocess_manifest(content, {}, "") + content = {"dict": {"limit": 50}, "limit_ref": "*ref(dict.limit)"} + config = resolver(content).preprocess_manifest() assert config["limit_ref"] == 50 def test_refer_to_non_existant_struct(): - content = { - "dict": { - "limit": 50 - }, - "limit_ref": "*ref(not_dict)" - } + content = {"dict": {"limit": 50}, "limit_ref": "*ref(not_dict)"} with pytest.raises(UndefinedReferenceException): - resolver.preprocess_manifest(content, {}, "") + resolver(content).preprocess_manifest() def test_refer_in_dict(): - content = { - "limit": 50, - "offset_request_parameters": { - "offset": "{{ next_page_token['offset'] }}", - "limit": "*ref(limit)" - } - } - config = resolver.preprocess_manifest(content, {}, "") + content = {"limit": 50, "offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"}} + config = resolver(content).preprocess_manifest() assert config["offset_request_parameters"]["offset"] == "{{ next_page_token['offset'] }}" assert config["offset_request_parameters"]["limit"] == 50 @@ -69,16 +37,13 @@ def test_refer_in_dict(): def test_refer_to_dict(): content = { "limit": 50, - "offset_request_parameters": { - "offset": "{{ next_page_token['offset'] }}", - "limit": "*ref(limit)" - }, + "offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"}, "offset_pagination_request_parameters": { "class": "InterpolatedRequestParameterProvider", - "request_parameters": "*ref(offset_request_parameters)" - } + "request_parameters": "*ref(offset_request_parameters)", + }, } - config = resolver.preprocess_manifest(content, {}, "") + config = resolver(content).preprocess_manifest() assert config["limit"] == 50 assert config["offset_request_parameters"]["limit"] == 50 assert len(config["offset_pagination_request_parameters"]) == 2 @@ -90,16 +55,10 @@ def test_refer_and_overwrite(): content = { "limit": 50, "custom_limit": 25, - 
"offset_request_parameters": { - "offset": "{{ next_page_token['offset'] }}", - "limit": "*ref(limit)" - }, - "custom_request_parameters": { - "$ref": "*ref(offset_request_parameters)", - "limit": "*ref(custom_limit)" - } + "offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"}, + "custom_request_parameters": {"$ref": "*ref(offset_request_parameters)", "limit": "*ref(custom_limit)"}, } - config = resolver.preprocess_manifest(content, {}, "") + config = resolver(content).preprocess_manifest() assert config["offset_request_parameters"]["limit"] == 50 assert config["custom_request_parameters"]["limit"] == 25 @@ -110,22 +69,13 @@ def test_refer_and_overwrite(): def test_collision(): content = { "example": { - "nested":{ - "path": "first one", - "more_nested": { - "value": "found it!" - } - }, + "nested": {"path": "first one", "more_nested": {"value": "found it!"}}, "nested.path": "uh oh", }, - "reference_to_nested_path": { - "$ref": "*ref(example.nested.path)" - }, - "reference_to_nested_nested_value": { - "$ref": "*ref(example.nested.more_nested.value)" - } + "reference_to_nested_path": {"$ref": "*ref(example.nested.path)"}, + "reference_to_nested_nested_value": {"$ref": "*ref(example.nested.more_nested.value)"}, } - config = resolver.preprocess_manifest(content, {}, "") + config = resolver(content).preprocess_manifest() assert config["example"]["nested"]["path"] == "first one" assert config["example"]["nested.path"] == "uh oh" assert config["reference_to_nested_path"] == "uh oh" @@ -133,11 +83,55 @@ def test_collision(): assert config["reference_to_nested_nested_value"] == "found it!" 
-def test_list(): +def test_internal_collision(): content = { - "list": ["A", "B"], - "elem_ref": "*ref(list[0])" + "example": { + "nested": {"path": {"internal": "uh oh"}, "path.internal": "found it!"}, + }, + "reference": {"$ref": "*ref(example.nested.path.internal)"}, } - config = resolver.preprocess_manifest(content, {}, "") + config = resolver(content).preprocess_manifest() + assert config["example"]["nested"]["path"]["internal"] == "uh oh" + assert config["example"]["nested"]["path.internal"] == "found it!" + assert config["reference"] == "found it!" + + +def test_parse_path(): + assert _parse_path("foo.bar") == ("foo", "bar") + assert _parse_path("foo[7][8].bar") == ("foo", "[7][8].bar") + assert _parse_path("[7][8].bar") == (7, "[8].bar") + assert _parse_path("[8].bar") == (8, "bar") + + +def test_list(): + content = {"list": ["A", "B"], "elem_ref": "*ref(list[0])"} + config = resolver(content).preprocess_manifest() elem_ref = config["elem_ref"] assert elem_ref == "A" + + +def test_nested_list(): + content = {"list": [["A"], ["B"]], "elem_ref": "*ref(list[1][0])"} + config = resolver(content).preprocess_manifest() + elem_ref = config["elem_ref"] + assert elem_ref == "B" + + +def test_list_of_dicts(): + content = {"list": [{"A": "a"}, {"B": "b"}], "elem_ref": "*ref(list[1].B)"} + config = resolver(content).preprocess_manifest() + elem_ref = config["elem_ref"] + assert elem_ref == "b" + + +def test_multiple_levels_of_indexing(): + content = {"list": [{"A": ["a1", "a2"]}, {"B": ["b1", "b2"]}], "elem_ref": "*ref(list[1].B[0])"} + config = resolver(content).preprocess_manifest() + elem_ref = config["elem_ref"] + assert elem_ref == "b1" + + +def test_circular_reference(): + content = {"elem_ref1": "*ref(elem_ref2)", "elem_ref2": "*ref(elem_ref1)"} + with pytest.raises(CircularReferenceException): + resolver(content).preprocess_manifest() diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py 
b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index 6265b0e638f0..ac436c94ac0f 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -45,7 +45,7 @@ factory = DeclarativeComponentFactory() -resolver = ManifestReferenceResolver() +resolver = ManifestReferenceResolver input_config = {"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]} @@ -64,7 +64,7 @@ def test_factory(): request_body_json: body_offset: "{{ next_page_token['offset'] }}" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["request_options"], input_config, False) @@ -89,7 +89,7 @@ def test_interpolate_config(): body_field: "yoyoyo" interpolated_body_field: "{{ config['apikey'] }}" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["authenticator"], input_config, False) @@ -111,7 +111,7 @@ def test_list_based_stream_slicer_with_values_refd(): slice_values: "*ref(repositories)" cursor_field: repository """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["stream_slicer"], input_config, False) @@ -129,7 +129,7 @@ def test_list_based_stream_slicer_with_values_defined_in_config(): inject_into: header field_name: repository """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["stream_slicer"], input_config, False) @@ -181,7 +181,7 @@ def test_create_substream_slicer(): 
parent_key: someid stream_slice_field: word_id """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() stream_slicer = factory.create_component(config["stream_slicer"], input_config)() parent_stream_configs = stream_slicer.parent_stream_configs @@ -216,7 +216,7 @@ def test_create_cartesian_stream_slicer(): - "*ref(stream_slicer_A)" - "*ref(stream_slicer_B)" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["stream_slicer"], input_config, False) @@ -249,7 +249,7 @@ def test_datetime_stream_slicer(): field_name: created[gte] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["stream_slicer"], input_config, False) @@ -363,7 +363,7 @@ def test_full_config(): description: Test API Key order: 0 """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["list_stream"], input_config, False) @@ -430,7 +430,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel $ref: "*ref(extractor)" field_pointer: ["{record_selector}"] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["selector"], input_config, False) @@ -477,7 +477,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel ], ) def test_options_propagation(test_name, content, expected_field_pointer_value): - config = 
resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() selector = factory.create_component(config["selector"], input_config, True)() assert selector.extractor.field_pointer[0].eval(input_config) == expected_field_pointer_value @@ -547,7 +547,7 @@ def test_create_requester(test_name, error_handler): header: header_value {error_handler} """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["requester"], input_config, False) @@ -577,7 +577,7 @@ def test_create_composite_error_handler(): - http_codes: [ 403 ] action: RETRY """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["error_handler"], input_config, False) @@ -626,7 +626,7 @@ def test_config_with_defaults(): streams: - "*ref(lists_stream)" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["lists_stream"], input_config, False) @@ -663,7 +663,7 @@ def test_create_default_paginator(): page_size: 50 cursor_value: "{{ response._metadata.next }}" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["paginator"], input_config, False) @@ -702,7 +702,7 @@ def test_no_transformations(self): $options: {self.base_options} """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() 
factory.create_component(config["the_stream"], input_config, False) @@ -722,7 +722,7 @@ def test_remove_fields(self): - ["path", "to", "field1"] - ["path2"] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["the_stream"], input_config, False) @@ -743,7 +743,7 @@ def test_add_fields(self): - path: ["field1"] value: "static_value" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["the_stream"], input_config, False) @@ -775,7 +775,7 @@ def test_validation_wrong_input_type(): $ref: "*ref(extractor)" field_pointer: 408 """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() with pytest.raises(ValidationError): factory.create_component(config["selector"], input_config, False) @@ -798,7 +798,7 @@ def test_validation_type_missing_required_fields(): field_name: created[gte] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() with pytest.raises(ValidationError): factory.create_component(config["stream_slicer"], input_config, False) @@ -818,7 +818,7 @@ def test_validation_wrong_interface_type(): type: "MinMaxDatetime" datetime: "{{ response._metadata.next }}" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() with pytest.raises(ValidationError): factory.create_component(config["paginator"], input_config, False) @@ -834,7 +834,7 @@ def test_validation_create_composite_error_handler(): - response_filters: - http_codes: [ 403 
] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() with pytest.raises(ValidationError): factory.create_component(config["error_handler"], input_config, False) @@ -859,7 +859,7 @@ def test_validation_wrong_object_type(): type: "MinMaxDatetime" datetime: "{{ response._metadata.next }}" """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["paginator"], input_config, False) @@ -873,7 +873,7 @@ def test_validate_types_nested_in_list(): - type: DpathExtractor field_pointer: ["result"] """ - config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content), {}, "") + config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["error_handler"], input_config, False) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py index ba6af18094f4..9586ecb58c9b 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_yaml_declarative_source.py @@ -7,7 +7,7 @@ import tempfile import pytest -from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException +from airbyte_cdk.sources.declarative.parsers.custom_exceptions import UndefinedReferenceException from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from yaml.parser import ParserError From a410c43ab93ba350554fce029e4a7d91bc99a65d Mon Sep 17 00:00:00 2001 From: Catherine Noll <noll.catherine@gmail.com> Date: Fri, 30 Dec 2022 23:39:52 +0000 Subject: [PATCH 2/4] Updates per code review comments --- 
.../parsers/manifest_reference_resolver.py | 4 +- .../test_manifest_reference_resolver.py | 28 ++++--- .../sources/declarative/test_factory.py | 75 ++++++++++++------- 3 files changed, 65 insertions(+), 42 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py index f4adab1a7319..2b61b38fe063 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py @@ -116,6 +116,7 @@ def _evaluate_node(self, node: Any): if not isinstance(evaluated_ref, dict): return evaluated_ref else: + # The values defined on the component take precedence over the reference values return evaluated_ref | evaluated_dict else: return evaluated_dict @@ -143,7 +144,8 @@ def _lookup_reference_value(self, reference: str) -> Any: except (KeyError, IndexError): raise UndefinedReferenceException(path, reference) - def _read_reference_value(self, ref: str, manifest_node: Mapping[str, Any]) -> Any: + @staticmethod + def _read_reference_value(ref: str, manifest_node: Mapping[str, Any]) -> Any: """ Read the value at the referenced location of the manifest. 
diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py index f03cc0ca8c3c..d41581c66d7f 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py @@ -6,30 +6,28 @@ from airbyte_cdk.sources.declarative.parsers.custom_exceptions import CircularReferenceException, UndefinedReferenceException from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver, _parse_path -resolver = ManifestReferenceResolver - def test_refer(): content = {"limit": 50, "limit_ref": "*ref(limit)"} - config = resolver(content).preprocess_manifest() + config = ManifestReferenceResolver(content).preprocess_manifest() assert config["limit_ref"] == 50 def test_refer_to_inner(): content = {"dict": {"limit": 50}, "limit_ref": "*ref(dict.limit)"} - config = resolver(content).preprocess_manifest() + config = ManifestReferenceResolver(content).preprocess_manifest() assert config["limit_ref"] == 50 def test_refer_to_non_existant_struct(): content = {"dict": {"limit": 50}, "limit_ref": "*ref(not_dict)"} with pytest.raises(UndefinedReferenceException): - resolver(content).preprocess_manifest() + ManifestReferenceResolver(content).preprocess_manifest() def test_refer_in_dict(): content = {"limit": 50, "offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"}} - config = resolver(content).preprocess_manifest() + config = ManifestReferenceResolver(content).preprocess_manifest() assert config["offset_request_parameters"]["offset"] == "{{ next_page_token['offset'] }}" assert config["offset_request_parameters"]["limit"] == 50 @@ -43,7 +41,7 @@ def test_refer_to_dict(): "request_parameters": "*ref(offset_request_parameters)", }, } - 
config = resolver(content).preprocess_manifest() + config = ManifestReferenceResolver(content).preprocess_manifest() assert config["limit"] == 50 assert config["offset_request_parameters"]["limit"] == 50 assert len(config["offset_pagination_request_parameters"]) == 2 @@ -58,7 +56,7 @@ def test_refer_and_overwrite(): "offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"}, "custom_request_parameters": {"$ref": "*ref(offset_request_parameters)", "limit": "*ref(custom_limit)"}, } - config = resolver(content).preprocess_manifest() + config = ManifestReferenceResolver(content).preprocess_manifest() assert config["offset_request_parameters"]["limit"] == 50 assert config["custom_request_parameters"]["limit"] == 25 @@ -75,7 +73,7 @@ def test_collision(): "reference_to_nested_path": {"$ref": "*ref(example.nested.path)"}, "reference_to_nested_nested_value": {"$ref": "*ref(example.nested.more_nested.value)"}, } - config = resolver(content).preprocess_manifest() + config = ManifestReferenceResolver(content).preprocess_manifest() assert config["example"]["nested"]["path"] == "first one" assert config["example"]["nested.path"] == "uh oh" assert config["reference_to_nested_path"] == "uh oh" @@ -90,7 +88,7 @@ def test_internal_collision(): }, "reference": {"$ref": "*ref(example.nested.path.internal)"}, } - config = resolver(content).preprocess_manifest() + config = ManifestReferenceResolver(content).preprocess_manifest() assert config["example"]["nested"]["path"]["internal"] == "uh oh" assert config["example"]["nested"]["path.internal"] == "found it!" assert config["reference"] == "found it!" 
@@ -105,28 +103,28 @@ def test_parse_path(): def test_list(): content = {"list": ["A", "B"], "elem_ref": "*ref(list[0])"} - config = resolver(content).preprocess_manifest() + config = ManifestReferenceResolver(content).preprocess_manifest() elem_ref = config["elem_ref"] assert elem_ref == "A" def test_nested_list(): content = {"list": [["A"], ["B"]], "elem_ref": "*ref(list[1][0])"} - config = resolver(content).preprocess_manifest() + config = ManifestReferenceResolver(content).preprocess_manifest() elem_ref = config["elem_ref"] assert elem_ref == "B" def test_list_of_dicts(): content = {"list": [{"A": "a"}, {"B": "b"}], "elem_ref": "*ref(list[1].B)"} - config = resolver(content).preprocess_manifest() + config = ManifestReferenceResolver(content).preprocess_manifest() elem_ref = config["elem_ref"] assert elem_ref == "b" def test_multiple_levels_of_indexing(): content = {"list": [{"A": ["a1", "a2"]}, {"B": ["b1", "b2"]}], "elem_ref": "*ref(list[1].B[0])"} - config = resolver(content).preprocess_manifest() + config = ManifestReferenceResolver(content).preprocess_manifest() elem_ref = config["elem_ref"] assert elem_ref == "b1" @@ -134,4 +132,4 @@ def test_multiple_levels_of_indexing(): def test_circular_reference(): content = {"elem_ref1": "*ref(elem_ref2)", "elem_ref2": "*ref(elem_ref1)"} with pytest.raises(CircularReferenceException): - resolver(content).preprocess_manifest() + ManifestReferenceResolver(content).preprocess_manifest() diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index ac436c94ac0f..a898d1f6a53c 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -34,7 +34,7 @@ ) from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever -from 
airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader +from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader, InlineSchemaLoader from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader from airbyte_cdk.sources.declarative.stream_slicers.datetime_stream_slicer import DatetimeStreamSlicer from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer @@ -45,8 +45,6 @@ factory = DeclarativeComponentFactory() -resolver = ManifestReferenceResolver - input_config = {"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]} @@ -64,7 +62,7 @@ def test_factory(): request_body_json: body_offset: "{{ next_page_token['offset'] }}" """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["request_options"], input_config, False) @@ -89,7 +87,7 @@ def test_interpolate_config(): body_field: "yoyoyo" interpolated_body_field: "{{ config['apikey'] }}" """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["authenticator"], input_config, False) @@ -111,7 +109,7 @@ def test_list_based_stream_slicer_with_values_refd(): slice_values: "*ref(repositories)" cursor_field: repository """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["stream_slicer"], input_config, False) @@ -129,7 +127,7 @@ def test_list_based_stream_slicer_with_values_defined_in_config(): inject_into: header field_name: repository """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = 
ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["stream_slicer"], input_config, False) @@ -181,7 +179,7 @@ def test_create_substream_slicer(): parent_key: someid stream_slice_field: word_id """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() stream_slicer = factory.create_component(config["stream_slicer"], input_config)() parent_stream_configs = stream_slicer.parent_stream_configs @@ -216,7 +214,7 @@ def test_create_cartesian_stream_slicer(): - "*ref(stream_slicer_A)" - "*ref(stream_slicer_B)" """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["stream_slicer"], input_config, False) @@ -249,7 +247,7 @@ def test_datetime_stream_slicer(): field_name: created[gte] """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["stream_slicer"], input_config, False) @@ -363,7 +361,7 @@ def test_full_config(): description: Test API Key order: 0 """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["list_stream"], input_config, False) @@ -430,7 +428,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel $ref: "*ref(extractor)" field_pointer: ["{record_selector}"] """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() 
factory.create_component(config["selector"], input_config, False) @@ -477,7 +475,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel ], ) def test_options_propagation(test_name, content, expected_field_pointer_value): - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() selector = factory.create_component(config["selector"], input_config, True)() assert selector.extractor.field_pointer[0].eval(input_config) == expected_field_pointer_value @@ -547,7 +545,7 @@ def test_create_requester(test_name, error_handler): header: header_value {error_handler} """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["requester"], input_config, False) @@ -577,7 +575,7 @@ def test_create_composite_error_handler(): - http_codes: [ 403 ] action: RETRY """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["error_handler"], input_config, False) @@ -626,7 +624,7 @@ def test_config_with_defaults(): streams: - "*ref(lists_stream)" """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["lists_stream"], input_config, False) @@ -663,7 +661,7 @@ def test_create_default_paginator(): page_size: 50 cursor_value: "{{ response._metadata.next }}" """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["paginator"], 
input_config, False) @@ -702,7 +700,7 @@ def test_no_transformations(self): $options: {self.base_options} """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["the_stream"], input_config, False) @@ -722,7 +720,7 @@ def test_remove_fields(self): - ["path", "to", "field1"] - ["path2"] """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["the_stream"], input_config, False) @@ -743,7 +741,7 @@ def test_add_fields(self): - path: ["field1"] value: "static_value" """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["the_stream"], input_config, False) @@ -761,6 +759,31 @@ def test_add_fields(self): ] assert expected == component.transformations + def test_forward_references(self): + content = f""" + the_stream: + type: DeclarativeStream + $options: + {self.base_options} + schema_loader: + type: InlineSchemaLoader + schema: "*ref(schemas.the_stream_schema)" + schemas: + the_stream_schema: + type: object + properties: + title: + type: string + """ + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + + factory.create_component(config["the_stream"], input_config, False) + + component = factory.create_component(config["the_stream"], input_config)() + assert isinstance(component.schema_loader, InlineSchemaLoader) + expected = {"type": "object", "properties": {"title": {"type": "string"}}} + assert expected == component.schema_loader.schema + def test_validation_wrong_input_type(): content = """ @@ -775,7 +798,7 @@ def test_validation_wrong_input_type(): 
$ref: "*ref(extractor)" field_pointer: 408 """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() with pytest.raises(ValidationError): factory.create_component(config["selector"], input_config, False) @@ -798,7 +821,7 @@ def test_validation_type_missing_required_fields(): field_name: created[gte] """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() with pytest.raises(ValidationError): factory.create_component(config["stream_slicer"], input_config, False) @@ -818,7 +841,7 @@ def test_validation_wrong_interface_type(): type: "MinMaxDatetime" datetime: "{{ response._metadata.next }}" """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() with pytest.raises(ValidationError): factory.create_component(config["paginator"], input_config, False) @@ -834,7 +857,7 @@ def test_validation_create_composite_error_handler(): - response_filters: - http_codes: [ 403 ] """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() with pytest.raises(ValidationError): factory.create_component(config["error_handler"], input_config, False) @@ -859,7 +882,7 @@ def test_validation_wrong_object_type(): type: "MinMaxDatetime" datetime: "{{ response._metadata.next }}" """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["paginator"], input_config, False) @@ -873,7 +896,7 @@ def test_validate_types_nested_in_list(): - type: 
DpathExtractor field_pointer: ["result"] """ - config = resolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() factory.create_component(config["error_handler"], input_config, False) From e18ddf137bd0550cdcf170b4911393012fa8e378 Mon Sep 17 00:00:00 2001 From: Catherine Noll <noll.catherine@gmail.com> Date: Tue, 10 Jan 2023 11:49:49 +0000 Subject: [PATCH 3/4] Pass manifest into `preprocess_manifest` instead of storing on `ManifestReferenceResolver` --- .../manifest_declarative_source.py | 2 +- .../parsers/manifest_reference_resolver.py | 21 ++++---- .../test_manifest_reference_resolver.py | 29 ++++++----- .../sources/declarative/test_factory.py | 50 ++++++++++--------- 4 files changed, 53 insertions(+), 49 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 4589b09e0f3d..3bed538b680a 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -53,7 +53,7 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False): """ self.logger = logging.getLogger(f"airbyte.{self.name}") - resolved_source_config = ManifestReferenceResolver(source_config).preprocess_manifest() + resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config) self._source_config = resolved_source_config self._debug = debug self._factory = DeclarativeComponentFactory() diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py index 2b61b38fe063..f5c5edd9f3aa 100644 --- 
a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py @@ -96,23 +96,22 @@ class ManifestReferenceResolver: ref_tag = "$ref" - def __init__(self, manifest: Mapping[str, Any]): - self.manifest = manifest + def __init__(self): self.visited = set() - def preprocess_manifest(self): + def preprocess_manifest(self, manifest): """ :param manifest: incoming manifest that could have references to previously defined components :return: """ - return self._evaluate_node(self.manifest) + return self._evaluate_node(manifest, manifest) - def _evaluate_node(self, node: Any): + def _evaluate_node(self, node: Any, manifest: Mapping[str, Any]): if isinstance(node, dict): - evaluated_dict = {k: self._evaluate_node(v) for k, v in node.items() if not self._is_ref_key(k)} + evaluated_dict = {k: self._evaluate_node(v, manifest) for k, v in node.items() if not self._is_ref_key(k)} if self.ref_tag in node: # The node includes a $ref key, so we splat the referenced value(s) into the evaluated dict - evaluated_ref = self._evaluate_node(node[self.ref_tag]) + evaluated_ref = self._evaluate_node(node[self.ref_tag], manifest) if not isinstance(evaluated_ref, dict): return evaluated_ref else: @@ -121,12 +120,12 @@ def _evaluate_node(self, node: Any): else: return evaluated_dict elif isinstance(node, list): - return [self._evaluate_node(v) for v in node] + return [self._evaluate_node(v, manifest) for v in node] elif isinstance(node, str) and node.startswith("*ref("): if node in self.visited: raise CircularReferenceException(node) self.visited.add(node) - ret = self._evaluate_node(self._lookup_reference_value(node)) + ret = self._evaluate_node(self._lookup_reference_value(node, manifest), manifest) self.visited.remove(node) return ret else: @@ -135,12 +134,12 @@ def _evaluate_node(self, node: Any): def _is_ref_key(self, key): return key == self.ref_tag - def 
_lookup_reference_value(self, reference: str) -> Any: + def _lookup_reference_value(self, reference: str, manifest: Mapping[str, Any]) -> Any: path = re.match("\\*ref\\(([^)]+)\\)", reference).groups()[0] if not path: raise UndefinedReferenceException(path, reference) try: - return self._read_reference_value(path, self.manifest) + return self._read_reference_value(path, manifest) except (KeyError, IndexError): raise UndefinedReferenceException(path, reference) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py index d41581c66d7f..281ee9b941d7 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py @@ -7,27 +7,30 @@ from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver, _parse_path +resolver = ManifestReferenceResolver() + + def test_refer(): content = {"limit": 50, "limit_ref": "*ref(limit)"} - config = ManifestReferenceResolver(content).preprocess_manifest() + config = resolver.preprocess_manifest(content) assert config["limit_ref"] == 50 def test_refer_to_inner(): content = {"dict": {"limit": 50}, "limit_ref": "*ref(dict.limit)"} - config = ManifestReferenceResolver(content).preprocess_manifest() + config = resolver.preprocess_manifest(content) assert config["limit_ref"] == 50 def test_refer_to_non_existant_struct(): content = {"dict": {"limit": 50}, "limit_ref": "*ref(not_dict)"} with pytest.raises(UndefinedReferenceException): - ManifestReferenceResolver(content).preprocess_manifest() + resolver.preprocess_manifest(content) def test_refer_in_dict(): content = {"limit": 50, "offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"}} - config = 
ManifestReferenceResolver(content).preprocess_manifest() + config = resolver.preprocess_manifest(content) assert config["offset_request_parameters"]["offset"] == "{{ next_page_token['offset'] }}" assert config["offset_request_parameters"]["limit"] == 50 @@ -41,7 +44,7 @@ def test_refer_to_dict(): "request_parameters": "*ref(offset_request_parameters)", }, } - config = ManifestReferenceResolver(content).preprocess_manifest() + config = resolver.preprocess_manifest(content) assert config["limit"] == 50 assert config["offset_request_parameters"]["limit"] == 50 assert len(config["offset_pagination_request_parameters"]) == 2 @@ -56,7 +59,7 @@ def test_refer_and_overwrite(): "offset_request_parameters": {"offset": "{{ next_page_token['offset'] }}", "limit": "*ref(limit)"}, "custom_request_parameters": {"$ref": "*ref(offset_request_parameters)", "limit": "*ref(custom_limit)"}, } - config = ManifestReferenceResolver(content).preprocess_manifest() + config = resolver.preprocess_manifest(content) assert config["offset_request_parameters"]["limit"] == 50 assert config["custom_request_parameters"]["limit"] == 25 @@ -73,7 +76,7 @@ def test_collision(): "reference_to_nested_path": {"$ref": "*ref(example.nested.path)"}, "reference_to_nested_nested_value": {"$ref": "*ref(example.nested.more_nested.value)"}, } - config = ManifestReferenceResolver(content).preprocess_manifest() + config = resolver.preprocess_manifest(content) assert config["example"]["nested"]["path"] == "first one" assert config["example"]["nested.path"] == "uh oh" assert config["reference_to_nested_path"] == "uh oh" @@ -88,7 +91,7 @@ def test_internal_collision(): }, "reference": {"$ref": "*ref(example.nested.path.internal)"}, } - config = ManifestReferenceResolver(content).preprocess_manifest() + config = resolver.preprocess_manifest(content) assert config["example"]["nested"]["path"]["internal"] == "uh oh" assert config["example"]["nested"]["path.internal"] == "found it!" 
assert config["reference"] == "found it!" @@ -103,28 +106,28 @@ def test_parse_path(): def test_list(): content = {"list": ["A", "B"], "elem_ref": "*ref(list[0])"} - config = ManifestReferenceResolver(content).preprocess_manifest() + config = resolver.preprocess_manifest(content) elem_ref = config["elem_ref"] assert elem_ref == "A" def test_nested_list(): content = {"list": [["A"], ["B"]], "elem_ref": "*ref(list[1][0])"} - config = ManifestReferenceResolver(content).preprocess_manifest() + config = resolver.preprocess_manifest(content) elem_ref = config["elem_ref"] assert elem_ref == "B" def test_list_of_dicts(): content = {"list": [{"A": "a"}, {"B": "b"}], "elem_ref": "*ref(list[1].B)"} - config = ManifestReferenceResolver(content).preprocess_manifest() + config = resolver.preprocess_manifest(content) elem_ref = config["elem_ref"] assert elem_ref == "b" def test_multiple_levels_of_indexing(): content = {"list": [{"A": ["a1", "a2"]}, {"B": ["b1", "b2"]}], "elem_ref": "*ref(list[1].B[0])"} - config = ManifestReferenceResolver(content).preprocess_manifest() + config = resolver.preprocess_manifest(content) elem_ref = config["elem_ref"] assert elem_ref == "b1" @@ -132,4 +135,4 @@ def test_multiple_levels_of_indexing(): def test_circular_reference(): content = {"elem_ref1": "*ref(elem_ref2)", "elem_ref2": "*ref(elem_ref1)"} with pytest.raises(CircularReferenceException): - ManifestReferenceResolver(content).preprocess_manifest() + resolver.preprocess_manifest(content) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index a898d1f6a53c..8ab6091495f5 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -47,6 +47,8 @@ input_config = {"apikey": "verysecrettoken", "repos": ["airbyte", "airbyte-cloud"]} +resolver = ManifestReferenceResolver() + def test_factory(): content = """ @@ 
-62,7 +64,7 @@ def test_factory(): request_body_json: body_offset: "{{ next_page_token['offset'] }}" """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["request_options"], input_config, False) @@ -87,7 +89,7 @@ def test_interpolate_config(): body_field: "yoyoyo" interpolated_body_field: "{{ config['apikey'] }}" """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["authenticator"], input_config, False) @@ -109,7 +111,7 @@ def test_list_based_stream_slicer_with_values_refd(): slice_values: "*ref(repositories)" cursor_field: repository """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["stream_slicer"], input_config, False) @@ -127,7 +129,7 @@ def test_list_based_stream_slicer_with_values_defined_in_config(): inject_into: header field_name: repository """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["stream_slicer"], input_config, False) @@ -179,7 +181,7 @@ def test_create_substream_slicer(): parent_key: someid stream_slice_field: word_id """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) stream_slicer = factory.create_component(config["stream_slicer"], input_config)() parent_stream_configs = stream_slicer.parent_stream_configs @@ -214,7 +216,7 @@ def test_create_cartesian_stream_slicer(): - 
"*ref(stream_slicer_A)" - "*ref(stream_slicer_B)" """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["stream_slicer"], input_config, False) @@ -247,7 +249,7 @@ def test_datetime_stream_slicer(): field_name: created[gte] """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["stream_slicer"], input_config, False) @@ -361,7 +363,7 @@ def test_full_config(): description: Test API Key order: 0 """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["list_stream"], input_config, False) @@ -428,7 +430,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel $ref: "*ref(extractor)" field_pointer: ["{record_selector}"] """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["selector"], input_config, False) @@ -475,7 +477,7 @@ def test_create_record_selector(test_name, record_selector, expected_runtime_sel ], ) def test_options_propagation(test_name, content, expected_field_pointer_value): - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) selector = factory.create_component(config["selector"], input_config, True)() assert selector.extractor.field_pointer[0].eval(input_config) == expected_field_pointer_value @@ -545,7 +547,7 @@ def test_create_requester(test_name, error_handler): header: 
header_value {error_handler} """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["requester"], input_config, False) @@ -575,7 +577,7 @@ def test_create_composite_error_handler(): - http_codes: [ 403 ] action: RETRY """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["error_handler"], input_config, False) @@ -624,7 +626,7 @@ def test_config_with_defaults(): streams: - "*ref(lists_stream)" """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["lists_stream"], input_config, False) @@ -661,7 +663,7 @@ def test_create_default_paginator(): page_size: 50 cursor_value: "{{ response._metadata.next }}" """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["paginator"], input_config, False) @@ -700,7 +702,7 @@ def test_no_transformations(self): $options: {self.base_options} """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["the_stream"], input_config, False) @@ -720,7 +722,7 @@ def test_remove_fields(self): - ["path", "to", "field1"] - ["path2"] """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["the_stream"], input_config, 
False) @@ -741,7 +743,7 @@ def test_add_fields(self): - path: ["field1"] value: "static_value" """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["the_stream"], input_config, False) @@ -775,7 +777,7 @@ def test_forward_references(self): title: type: string """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["the_stream"], input_config, False) @@ -798,7 +800,7 @@ def test_validation_wrong_input_type(): $ref: "*ref(extractor)" field_pointer: 408 """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) with pytest.raises(ValidationError): factory.create_component(config["selector"], input_config, False) @@ -821,7 +823,7 @@ def test_validation_type_missing_required_fields(): field_name: created[gte] """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) with pytest.raises(ValidationError): factory.create_component(config["stream_slicer"], input_config, False) @@ -841,7 +843,7 @@ def test_validation_wrong_interface_type(): type: "MinMaxDatetime" datetime: "{{ response._metadata.next }}" """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) with pytest.raises(ValidationError): factory.create_component(config["paginator"], input_config, False) @@ -857,7 +859,7 @@ def test_validation_create_composite_error_handler(): - response_filters: - http_codes: [ 403 ] """ - config = 
ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) with pytest.raises(ValidationError): factory.create_component(config["error_handler"], input_config, False) @@ -882,7 +884,7 @@ def test_validation_wrong_object_type(): type: "MinMaxDatetime" datetime: "{{ response._metadata.next }}" """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["paginator"], input_config, False) @@ -896,7 +898,7 @@ def test_validate_types_nested_in_list(): - type: DpathExtractor field_pointer: ["result"] """ - config = ManifestReferenceResolver(YamlDeclarativeSource._parse(content)).preprocess_manifest() + config = resolver.preprocess_manifest(YamlDeclarativeSource._parse(content)) factory.create_component(config["error_handler"], input_config, False) From a624c506dce471b0e0e3ca1d1bc64bb321b89465 Mon Sep 17 00:00:00 2001 From: Catherine Noll <noll.catherine@gmail.com> Date: Tue, 10 Jan 2023 14:50:24 +0000 Subject: [PATCH 4/4] Create `visited` set during preprocessing instead of defining on `ManifestReferenceResolver` --- .../parsers/manifest_reference_resolver.py | 17 ++++++++--------- .../parsers/test_manifest_reference_resolver.py | 1 - 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py index f5c5edd9f3aa..7cb54edcf075 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/manifest_reference_resolver.py @@ -3,7 +3,7 @@ # import re -from typing import Any, Mapping, Tuple, Union +from typing import Any, Mapping, Set, 
Tuple, Union from airbyte_cdk.sources.declarative.parsers.custom_exceptions import CircularReferenceException, UndefinedReferenceException @@ -96,9 +96,6 @@ class ManifestReferenceResolver: ref_tag = "$ref" - def __init__(self): - self.visited = set() - def preprocess_manifest(self, manifest): """ :param manifest: incoming manifest that could have references to previously defined components @@ -106,7 +103,7 @@ def preprocess_manifest(self, manifest): """ return self._evaluate_node(manifest, manifest) - def _evaluate_node(self, node: Any, manifest: Mapping[str, Any]): + def _evaluate_node(self, node: Any, manifest: Mapping[str, Any], visited: Set = None): if isinstance(node, dict): evaluated_dict = {k: self._evaluate_node(v, manifest) for k, v in node.items() if not self._is_ref_key(k)} if self.ref_tag in node: @@ -122,11 +119,13 @@ def _evaluate_node(self, node: Any, manifest: Mapping[str, Any]): elif isinstance(node, list): return [self._evaluate_node(v, manifest) for v in node] elif isinstance(node, str) and node.startswith("*ref("): - if node in self.visited: + if visited is None: + visited = set() + if node in visited: raise CircularReferenceException(node) - self.visited.add(node) - ret = self._evaluate_node(self._lookup_reference_value(node, manifest), manifest) - self.visited.remove(node) + visited.add(node) + ret = self._evaluate_node(self._lookup_reference_value(node, manifest), manifest, visited) + visited.remove(node) return ret else: return node diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py index 281ee9b941d7..e988870a5336 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/parsers/test_manifest_reference_resolver.py @@ -6,7 +6,6 @@ from 
airbyte_cdk.sources.declarative.parsers.custom_exceptions import CircularReferenceException, UndefinedReferenceException from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ManifestReferenceResolver, _parse_path - resolver = ManifestReferenceResolver()