Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[low-code] Propagate options to InterpolatedRequestInputProvider #18050

Merged
merged 10 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.3
- Propagate options to InterpolatedRequestInputProvider

## 0.2.2
- Report config validation errors as failed connection status during `check`.
- Report config validation errors as `config_error` failure type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,35 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState


@dataclass
class InterpolatedRequestInputProvider:
"""
Helper class that generically performs string interpolation on the provided dictionary or string input
"""

def __init__(
self, *, config: Config, request_inputs: Optional[Union[str, Mapping[str, str]]] = None, **options: Optional[Mapping[str, Any]]
):
"""
:param config: The user-provided configuration as specified by the source's spec
:param request_inputs: The dictionary to interpolate
:param options: Additional runtime parameters to be used for string interpolation
"""
options: InitVar[Mapping[str, Any]]
request_inputs: Optional[Union[str, Mapping[str, str]]] = field(default=None)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class isn't exposed in the DSL, but I still abstained from modifying the name of the parameters. I chose to rename the method instead

config: Config = field(default_factory=dict)
_interpolator: Union[InterpolatedString, InterpolatedMapping] = field(init=False, repr=False, default=None)
_request_inputs: Union[str, Mapping[str, str]] = field(init=False, repr=False, default=None)

self._config = config
def __post_init__(self, options: Mapping[str, Any]):

if request_inputs is None:
request_inputs = {}
if isinstance(request_inputs, str):
self._interpolator = InterpolatedString(request_inputs, default="", options=options)
self._request_inputs = self.request_inputs or {}
if isinstance(self.request_inputs, str):
self._interpolator = InterpolatedString(self.request_inputs, default="", options=options)
else:
self._interpolator = InterpolatedMapping(request_inputs, options=options)
self._interpolator = InterpolatedMapping(self._request_inputs, options=options)

def request_inputs(
def eval_request_inputs(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
"""
Expand All @@ -44,7 +42,7 @@ def request_inputs(
:return: The request inputs to set on an outgoing HTTP request
"""
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
interpolated_value = self._interpolator.eval(self._config, **kwargs)
interpolated_value = self._interpolator.eval(self.config, **kwargs)

if isinstance(interpolated_value, dict):
non_null_tokens = {k: v for k, v in interpolated_value.items() if v}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,18 @@ def __post_init__(self, options: Mapping[str, Any]):
if self.request_body_json and self.request_body_data:
raise ValueError("RequestOptionsProvider should only contain either 'request_body_data' or 'request_body_json' not both")

self._parameter_interpolator = InterpolatedRequestInputProvider(config=self.config, request_inputs=self.request_parameters)
self._headers_interpolator = InterpolatedRequestInputProvider(config=self.config, request_inputs=self.request_headers)
self._body_data_interpolator = InterpolatedRequestInputProvider(config=self.config, request_inputs=self.request_body_data)
self._body_json_interpolator = InterpolatedRequestInputProvider(config=self.config, request_inputs=self.request_body_json)
self._parameter_interpolator = InterpolatedRequestInputProvider(
config=self.config, request_inputs=self.request_parameters, options=options
)
self._headers_interpolator = InterpolatedRequestInputProvider(
config=self.config, request_inputs=self.request_headers, options=options
)
self._body_data_interpolator = InterpolatedRequestInputProvider(
config=self.config, request_inputs=self.request_body_data, options=options
)
self._body_json_interpolator = InterpolatedRequestInputProvider(
config=self.config, request_inputs=self.request_body_json, options=options
)

def get_request_params(
self,
Expand All @@ -58,7 +66,7 @@ def get_request_params(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
interpolated_value = self._parameter_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
interpolated_value = self._parameter_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)
if isinstance(interpolated_value, dict):
return interpolated_value
return {}
Expand All @@ -70,7 +78,7 @@ def get_request_headers(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._headers_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
return self._headers_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)

def get_request_body_data(
self,
Expand All @@ -79,7 +87,7 @@ def get_request_body_data(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping, str]]:
return self._body_data_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
return self._body_data_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)

def get_request_body_json(
self,
Expand All @@ -88,4 +96,4 @@ def get_request_body_json(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
return self._body_json_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
return self._body_json_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.2.2",
version="0.2.3",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@
("test_static_map_data", {"a_static_request_param": "a_static_value"}, {"a_static_request_param": "a_static_value"}),
("test_map_depends_on_stream_slice", {"read_from_slice": "{{ stream_slice['slice_key'] }}"}, {"read_from_slice": "slice_value"}),
("test_map_depends_on_config", {"read_from_config": "{{ config['config_key'] }}"}, {"read_from_config": "value_of_config"}),
(
"test_map_depends_on_options",
{"read_from_options": "{{ options['read_from_options'] }}"},
{"read_from_options": "value_of_options"},
),
("test_defaults_to_empty_dictionary", None, {}),
],
)
def test_initialize_interpolated_mapping_request_input_provider(test_name, input_request_data, expected_request_data):
config = {"config_key": "value_of_config"}
stream_slice = {"slice_key": "slice_value"}

provider = InterpolatedRequestInputProvider(config=config, request_inputs=input_request_data)
actual_request_data = provider.request_inputs(stream_state={}, stream_slice=stream_slice)
options = {"read_from_options": "value_of_options"}
provider = InterpolatedRequestInputProvider(request_inputs=input_request_data, config=config, options=options)
actual_request_data = provider.eval_request_inputs(stream_state={}, stream_slice=stream_slice)

assert isinstance(provider._interpolator, InterpolatedMapping)
assert actual_request_data == expected_request_data
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ def test_factory():
request_options_provider = factory.create_component(config["request_options"], input_config)()

assert type(request_options_provider) == InterpolatedRequestOptionsProvider
assert request_options_provider._parameter_interpolator._config == input_config
assert request_options_provider._parameter_interpolator.config == input_config
assert request_options_provider._parameter_interpolator._interpolator.mapping["offset"] == "{{ next_page_token['offset'] }}"
assert request_options_provider._body_json_interpolator._config == input_config
assert request_options_provider._body_json_interpolator.config == input_config
assert request_options_provider._body_json_interpolator._interpolator.mapping["body_offset"] == "{{ next_page_token['offset'] }}"


Expand Down