Skip to content

Commit

Permalink
[low-code] Propagate options to InterpolatedRequestInputProvider (#18050
Browse files Browse the repository at this point in the history
)

* properly propagate options

* cleanup

* turn into dataclass

* rename

* no need for deepcopy

* fix test

* bump

* cleaner
  • Loading branch information
girarda authored Oct 21, 2022
1 parent b347829 commit 76acfb8
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 30 deletions.
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.2.3
- Propagate options to InterpolatedRequestInputProvider

## 0.2.2
- Report config validation errors as failed connection status during `check`.
- Report config validation errors as `config_error` failure type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,35 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config, StreamSlice, StreamState


@dataclass
class InterpolatedRequestInputProvider:
"""
Helper class that generically performs string interpolation on the provided dictionary or string input
"""

def __init__(
self, *, config: Config, request_inputs: Optional[Union[str, Mapping[str, str]]] = None, **options: Optional[Mapping[str, Any]]
):
"""
:param config: The user-provided configuration as specified by the source's spec
:param request_inputs: The dictionary to interpolate
:param options: Additional runtime parameters to be used for string interpolation
"""
options: InitVar[Mapping[str, Any]]
request_inputs: Optional[Union[str, Mapping[str, str]]] = field(default=None)
config: Config = field(default_factory=dict)
_interpolator: Union[InterpolatedString, InterpolatedMapping] = field(init=False, repr=False, default=None)
_request_inputs: Union[str, Mapping[str, str]] = field(init=False, repr=False, default=None)

self._config = config
def __post_init__(self, options: Mapping[str, Any]):

if request_inputs is None:
request_inputs = {}
if isinstance(request_inputs, str):
self._interpolator = InterpolatedString(request_inputs, default="", options=options)
self._request_inputs = self.request_inputs or {}
if isinstance(self.request_inputs, str):
self._interpolator = InterpolatedString(self.request_inputs, default="", options=options)
else:
self._interpolator = InterpolatedMapping(request_inputs, options=options)
self._interpolator = InterpolatedMapping(self._request_inputs, options=options)

def request_inputs(
def eval_request_inputs(
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Mapping[str, Any] = None
) -> Mapping[str, Any]:
"""
Expand All @@ -44,7 +42,7 @@ def request_inputs(
:return: The request inputs to set on an outgoing HTTP request
"""
kwargs = {"stream_state": stream_state, "stream_slice": stream_slice, "next_page_token": next_page_token}
interpolated_value = self._interpolator.eval(self._config, **kwargs)
interpolated_value = self._interpolator.eval(self.config, **kwargs)

if isinstance(interpolated_value, dict):
non_null_tokens = {k: v for k, v in interpolated_value.items() if v}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,18 @@ def __post_init__(self, options: Mapping[str, Any]):
if self.request_body_json and self.request_body_data:
raise ValueError("RequestOptionsProvider should only contain either 'request_body_data' or 'request_body_json' not both")

self._parameter_interpolator = InterpolatedRequestInputProvider(config=self.config, request_inputs=self.request_parameters)
self._headers_interpolator = InterpolatedRequestInputProvider(config=self.config, request_inputs=self.request_headers)
self._body_data_interpolator = InterpolatedRequestInputProvider(config=self.config, request_inputs=self.request_body_data)
self._body_json_interpolator = InterpolatedRequestInputProvider(config=self.config, request_inputs=self.request_body_json)
self._parameter_interpolator = InterpolatedRequestInputProvider(
config=self.config, request_inputs=self.request_parameters, options=options
)
self._headers_interpolator = InterpolatedRequestInputProvider(
config=self.config, request_inputs=self.request_headers, options=options
)
self._body_data_interpolator = InterpolatedRequestInputProvider(
config=self.config, request_inputs=self.request_body_data, options=options
)
self._body_json_interpolator = InterpolatedRequestInputProvider(
config=self.config, request_inputs=self.request_body_json, options=options
)

def get_request_params(
self,
Expand All @@ -58,7 +66,7 @@ def get_request_params(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> MutableMapping[str, Any]:
interpolated_value = self._parameter_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
interpolated_value = self._parameter_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)
if isinstance(interpolated_value, dict):
return interpolated_value
return {}
Expand All @@ -70,7 +78,7 @@ def get_request_headers(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
return self._headers_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
return self._headers_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)

def get_request_body_data(
self,
Expand All @@ -79,7 +87,7 @@ def get_request_body_data(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Union[Mapping, str]]:
return self._body_data_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
return self._body_data_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)

def get_request_body_json(
self,
Expand All @@ -88,4 +96,4 @@ def get_request_body_json(
stream_slice: Optional[StreamSlice] = None,
next_page_token: Optional[Mapping[str, Any]] = None,
) -> Optional[Mapping]:
return self._body_json_interpolator.request_inputs(stream_state, stream_slice, next_page_token)
return self._body_json_interpolator.eval_request_inputs(stream_state, stream_slice, next_page_token)
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.2.2",
version="0.2.3",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@
("test_static_map_data", {"a_static_request_param": "a_static_value"}, {"a_static_request_param": "a_static_value"}),
("test_map_depends_on_stream_slice", {"read_from_slice": "{{ stream_slice['slice_key'] }}"}, {"read_from_slice": "slice_value"}),
("test_map_depends_on_config", {"read_from_config": "{{ config['config_key'] }}"}, {"read_from_config": "value_of_config"}),
(
"test_map_depends_on_options",
{"read_from_options": "{{ options['read_from_options'] }}"},
{"read_from_options": "value_of_options"},
),
("test_defaults_to_empty_dictionary", None, {}),
],
)
def test_initialize_interpolated_mapping_request_input_provider(test_name, input_request_data, expected_request_data):
config = {"config_key": "value_of_config"}
stream_slice = {"slice_key": "slice_value"}

provider = InterpolatedRequestInputProvider(config=config, request_inputs=input_request_data)
actual_request_data = provider.request_inputs(stream_state={}, stream_slice=stream_slice)
options = {"read_from_options": "value_of_options"}
provider = InterpolatedRequestInputProvider(request_inputs=input_request_data, config=config, options=options)
actual_request_data = provider.eval_request_inputs(stream_state={}, stream_slice=stream_slice)

assert isinstance(provider._interpolator, InterpolatedMapping)
assert actual_request_data == expected_request_data
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ def test_factory():
request_options_provider = factory.create_component(config["request_options"], input_config)()

assert type(request_options_provider) == InterpolatedRequestOptionsProvider
assert request_options_provider._parameter_interpolator._config == input_config
assert request_options_provider._parameter_interpolator.config == input_config
assert request_options_provider._parameter_interpolator._interpolator.mapping["offset"] == "{{ next_page_token['offset'] }}"
assert request_options_provider._body_json_interpolator._config == input_config
assert request_options_provider._body_json_interpolator.config == input_config
assert request_options_provider._body_json_interpolator._interpolator.mapping["body_offset"] == "{{ next_page_token['offset'] }}"


Expand Down

0 comments on commit 76acfb8

Please sign in to comment.