Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Low-Code CDK] Handle forward references in manifest #20893

Merged
merged 4 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False):
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")

evaluated_manifest = {}
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config, evaluated_manifest, "")
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config)
self._source_config = resolved_source_config
self._debug = debug
self._factory = DeclarativeComponentFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
#


class CircularReferenceException(Exception):
"""
Raised when a circular reference is detected in a manifest.
"""

def __init__(self, reference):
super().__init__(f"Circular reference found: {reference}")


class UndefinedReferenceException(Exception):
"""
Raised when refering to an undefined reference.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from copy import deepcopy
from typing import Any, Mapping, Tuple, Union
import re
from typing import Any, Mapping, Set, Tuple, Union

from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException
from airbyte_cdk.sources.declarative.parsers.custom_exceptions import CircularReferenceException, UndefinedReferenceException


class ManifestReferenceResolver:
Expand Down Expand Up @@ -96,91 +96,102 @@ class ManifestReferenceResolver:

ref_tag = "$ref"

def preprocess_manifest(self, manifest: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]):

def preprocess_manifest(self, manifest):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you add type hints and outputs to the method signature

"""
:param manifest: incoming manifest that could have references to previously defined components
:param evaluated_mapping: mapping produced by dereferencing the content of input_mapping
:param path: curent path in configuration traversal
:return:
"""
d = {}
if self.ref_tag in manifest:
partial_ref_string = manifest[self.ref_tag]
d = deepcopy(self._preprocess(partial_ref_string, evaluated_mapping, path))

for key, value in manifest.items():
if key == self.ref_tag:
continue
full_path = self._resolve_value(key, path)
if full_path in evaluated_mapping:
raise Exception(f"Databag already contains key={key} with path {full_path}")
processed_value = self._preprocess(value, evaluated_mapping, full_path)
evaluated_mapping[full_path] = processed_value
d[key] = processed_value

return d

def _get_ref_key(self, s: str) -> str:
ref_start = s.find("*ref(")
if ref_start == -1:
return None
return s[ref_start + 5 : s.find(")")]

def _resolve_value(self, value: str, path):
if path:
return *path, value
else:
return (value,)

def _preprocess(self, value, evaluated_config: Mapping[str, Any], path):
if isinstance(value, str):
ref_key = self._get_ref_key(value)
if ref_key is None:
return value
return self._evaluate_node(manifest, manifest)

def _evaluate_node(self, node: Any, manifest: Mapping[str, Any], visited: Set = None):
if isinstance(node, dict):
evaluated_dict = {k: self._evaluate_node(v, manifest) for k, v in node.items() if not self._is_ref_key(k)}
if self.ref_tag in node:
# The node includes a $ref key, so we splat the referenced value(s) into the evaluated dict
evaluated_ref = self._evaluate_node(node[self.ref_tag], manifest)
if not isinstance(evaluated_ref, dict):
return evaluated_ref
else:
# The values defined on the component take precedence over the reference values
return evaluated_ref | evaluated_dict
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm understanding this right, we get the references values, and then update with the values defined on this component. So the values defined on this component take precedence. If so, can we add a small comment saying that values defined on this node take precedence over the referenced values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep that's right. Added a comment.

else:
"""
references are ambiguous because one could define a key containing with `.`
in this example, we want to refer to the limit key in the dict object:
dict:
limit: 50
limit_ref: "*ref(dict.limit)"

whereas here we want to access the `nested.path` value.
nested:
path: "first one"
nested.path: "uh oh"
value: "ref(nested.path)

to resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward
until we find a key with the given path, or until there is nothing to traverse.
"""
key = (ref_key,)
while key[-1]:
if key in evaluated_config:
return evaluated_config[key]
else:
split = key[-1].split(".")
key = *key[:-1], split[0], ".".join(split[1:])
raise UndefinedReferenceException(path, ref_key)
elif isinstance(value, dict):
return self.preprocess_manifest(value, evaluated_config, path)
elif type(value) == list:
evaluated_list = [
# pass in elem's path instead of the list's path
self._preprocess(v, evaluated_config, self._get_path_for_list_item(path, index))
for index, v in enumerate(value)
]
# Add the list's element to the evaluated config so they can be referenced
for index, elem in enumerate(evaluated_list):
evaluated_config[self._get_path_for_list_item(path, index)] = elem
return evaluated_list
return evaluated_dict
elif isinstance(node, list):
return [self._evaluate_node(v, manifest) for v in node]
elif isinstance(node, str) and node.startswith("*ref("):
if visited is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its a bit more clear to instantiate the empty set and pass it as a parameter which when we first invoke the first _evaluate_node() rather than rely on this conditional block to set it up. Also given how its used, in the flow it feels more like a required parameter instead of defaulting to None:

return self._evaluate_node(manifest, manifest, set())

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brianjlai sorry I overlooked these before merging! Good suggestions, I'll make these changes in a separate PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(#21268)

visited = set()
if node in visited:
raise CircularReferenceException(node)
visited.add(node)
ret = self._evaluate_node(self._lookup_reference_value(node, manifest), manifest, visited)
visited.remove(node)
return ret
else:
return value
return node

def _is_ref_key(self, key):
return key == self.ref_tag

def _lookup_reference_value(self, reference: str, manifest: Mapping[str, Any]) -> Any:
path = re.match("\\*ref\\(([^)]+)\\)", reference).groups()[0]
if not path:
raise UndefinedReferenceException(path, reference)
try:
return self._read_reference_value(path, manifest)
except (KeyError, IndexError):
raise UndefinedReferenceException(path, reference)

@staticmethod
def _read_reference_value(ref: str, manifest_node: Mapping[str, Any]) -> Any:
"""
Read the value at the referenced location of the manifest.

def _get_path_for_list_item(self, path, index):
# An elem's path is {path_to_list}[{index}]
if len(path) > 1:
return path[:-1], f"{path[-1]}[{index}]"
else:
return (f"{path[-1]}[{index}]",)
References are ambiguous because one could define a key containing `.`
In this example, we want to refer to the `limit` key in the `dict` object:
dict:
limit: 50
limit_ref: "*ref(dict.limit)"

Whereas here we want to access the `nested.path` value.
nested:
path: "first one"
nested.path: "uh oh"
value: "ref(nested.path)

To resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward
until we find a key with the given path, or until there is nothing to traverse.

Consider the path foo.bar.baz. To resolve the ambiguity, we first try 'foo.bar.baz' in its entirety as a top-level key. If this
fails, we try 'foo' as the top-level key, and if this succeeds, pass 'bar.baz' on as the key to be tried at the next level.
"""
while ref:
try:
return manifest_node[ref]
except (KeyError, TypeError):
head, ref = _parse_path(ref)
manifest_node = manifest_node[head]
return manifest_node


def _parse_path(ref: str) -> Tuple[Union[str, int], str]:
"""
Return the next path component, together with the rest of the path.

A path component may be a string key, or an int index.

>>> _parse_path("foo.bar")
"foo", "bar"
>>> _parse_path("foo[7][8].bar")
"foo", "[7][8].bar"
>>> _parse_path("[7][8].bar")
7, "[8].bar"
>>> _parse_path("[8].bar")
8, "bar"
"""
if match := re.match(r"^\[([0-9]+)\]\.?(.*)", ref):
idx, rest = match.groups()
result = int(idx), rest
else:
result = re.match(r"([^[.]*)\.?(.*)", ref).groups()
return result
Loading