Skip to content

Commit

Permalink
[Low-Code CDK] Handle forward references in manifest
Browse files Browse the repository at this point in the history
  • Loading branch information
clnoll committed Dec 27, 2022
1 parent eb176f4 commit e849cef
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ def __init__(self, source_config: ConnectionDefinition, debug: bool = False):
"""
self.logger = logging.getLogger(f"airbyte.{self.name}")

evaluated_manifest = {}
resolved_source_config = ManifestReferenceResolver().preprocess_manifest(source_config, evaluated_manifest, "")
resolved_source_config = ManifestReferenceResolver(source_config).preprocess_manifest()
self._source_config = resolved_source_config
self._debug = debug
self._factory = DeclarativeComponentFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
#


class CircularReferenceException(Exception):
"""
Raised when a circular reference is detected in a manifest.
"""

def __init__(self, reference):
super().__init__(f"Circular reference found: {reference}")


class UndefinedReferenceException(Exception):
"""
Raised when refering to an undefined reference.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from copy import deepcopy
import re
from typing import Any, Mapping, Tuple, Union

from airbyte_cdk.sources.declarative.parsers.undefined_reference_exception import UndefinedReferenceException
from airbyte_cdk.sources.declarative.parsers.custom_exceptions import CircularReferenceException, UndefinedReferenceException


class ManifestReferenceResolver:
Expand Down Expand Up @@ -96,91 +96,102 @@ class ManifestReferenceResolver:

ref_tag = "$ref"

def preprocess_manifest(self, manifest: Mapping[str, Any], evaluated_mapping: Mapping[str, Any], path: Union[str, Tuple[str]]):
def __init__(self, manifest: Mapping[str, Any]):
self.manifest = manifest
self.visited = set()

def preprocess_manifest(self):
"""
:param manifest: incoming manifest that could have references to previously defined components
:param evaluated_mapping: mapping produced by dereferencing the content of input_mapping
:param path: curent path in configuration traversal
:return:
"""
d = {}
if self.ref_tag in manifest:
partial_ref_string = manifest[self.ref_tag]
d = deepcopy(self._preprocess(partial_ref_string, evaluated_mapping, path))

for key, value in manifest.items():
if key == self.ref_tag:
continue
full_path = self._resolve_value(key, path)
if full_path in evaluated_mapping:
raise Exception(f"Databag already contains key={key} with path {full_path}")
processed_value = self._preprocess(value, evaluated_mapping, full_path)
evaluated_mapping[full_path] = processed_value
d[key] = processed_value

return d

def _get_ref_key(self, s: str) -> str:
ref_start = s.find("*ref(")
if ref_start == -1:
return None
return s[ref_start + 5 : s.find(")")]

def _resolve_value(self, value: str, path):
if path:
return *path, value
else:
return (value,)

def _preprocess(self, value, evaluated_config: Mapping[str, Any], path):
if isinstance(value, str):
ref_key = self._get_ref_key(value)
if ref_key is None:
return value
return self._evaluate_node(self.manifest)

def _evaluate_node(self, node: Any):
if isinstance(node, dict):
evaluated_dict = {k: self._evaluate_node(v) for k, v in node.items() if not self._is_ref_key(k)}
if self.ref_tag in node:
# The node includes a $ref key, so we splat the referenced value(s) into the evaluated dict
evaluated_ref = self._evaluate_node(node[self.ref_tag])
if not isinstance(evaluated_ref, dict):
return evaluated_ref
else:
return evaluated_ref | evaluated_dict
else:
"""
references are ambiguous because one could define a key containing with `.`
in this example, we want to refer to the limit key in the dict object:
dict:
limit: 50
limit_ref: "*ref(dict.limit)"
whereas here we want to access the `nested.path` value.
nested:
path: "first one"
nested.path: "uh oh"
value: "ref(nested.path)
to resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward
until we find a key with the given path, or until there is nothing to traverse.
"""
key = (ref_key,)
while key[-1]:
if key in evaluated_config:
return evaluated_config[key]
else:
split = key[-1].split(".")
key = *key[:-1], split[0], ".".join(split[1:])
raise UndefinedReferenceException(path, ref_key)
elif isinstance(value, dict):
return self.preprocess_manifest(value, evaluated_config, path)
elif type(value) == list:
evaluated_list = [
# pass in elem's path instead of the list's path
self._preprocess(v, evaluated_config, self._get_path_for_list_item(path, index))
for index, v in enumerate(value)
]
# Add the list's element to the evaluated config so they can be referenced
for index, elem in enumerate(evaluated_list):
evaluated_config[self._get_path_for_list_item(path, index)] = elem
return evaluated_list
return evaluated_dict
elif isinstance(node, list):
return [self._evaluate_node(v) for v in node]
elif isinstance(node, str) and node.startswith("*ref("):
if node in self.visited:
raise CircularReferenceException(node)
self.visited.add(node)
ret = self._evaluate_node(self._lookup_reference_value(node))
self.visited.remove(node)
return ret
else:
return value
return node

def _get_path_for_list_item(self, path, index):
# An elem's path is {path_to_list}[{index}]
if len(path) > 1:
return path[:-1], f"{path[-1]}[{index}]"
else:
return (f"{path[-1]}[{index}]",)
def _is_ref_key(self, key):
return key == self.ref_tag

def _lookup_reference_value(self, reference: str) -> Any:
path = re.match("\\*ref\\(([^)]+)\\)", reference).groups()[0]
if not path:
raise UndefinedReferenceException(path, reference)
try:
return self._read_reference_value(path, self.manifest)
except (KeyError, IndexError):
raise UndefinedReferenceException(path, reference)

def _read_reference_value(self, ref: str, manifest_node: Mapping[str, Any]) -> Any:
"""
Read the value at the referenced location of the manifest.
References are ambiguous because one could define a key containing `.`
In this example, we want to refer to the `limit` key in the `dict` object:
dict:
limit: 50
limit_ref: "*ref(dict.limit)"
Whereas here we want to access the `nested.path` value.
nested:
path: "first one"
nested.path: "uh oh"
value: "ref(nested.path)
To resolve the ambiguity, we try looking for the reference key at the top level, and then traverse the structs downward
until we find a key with the given path, or until there is nothing to traverse.
Consider the path foo.bar.baz. To resolve the ambiguity, we first try 'foo.bar.baz' in its entirety as a top-level key. If this
fails, we try 'foo' as the top-level key, and if this succeeds, pass 'bar.baz' on as the key to be tried at the next level.
"""
while ref:
try:
return manifest_node[ref]
except (KeyError, TypeError):
head, ref = _parse_path(ref)
manifest_node = manifest_node[head]
return manifest_node


def _parse_path(ref: str) -> Tuple[Union[str, int], str]:
"""
Return the next path component, together with the rest of the path.
A path component may be a string key, or an int index.
>>> _parse_path("foo.bar")
"foo", "bar"
>>> _parse_path("foo[7][8].bar")
"foo", "[7][8].bar"
>>> _parse_path("[7][8].bar")
7, "[8].bar"
>>> _parse_path("[8].bar")
8, "bar"
"""
if match := re.match(r"^\[([0-9]+)\]\.?(.*)", ref):
idx, rest = match.groups()
result = int(idx), rest
else:
result = re.match(r"([^[.]*)\.?(.*)", ref).groups()
return result
Loading

0 comments on commit e849cef

Please sign in to comment.