Skip to content

Commit

Permalink
Low-Code CDK: SubstreamSlicer.parent_key - dpath support added (#21900)
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
  • Loading branch information
grubberr authored Jan 31, 2023
1 parent 4061939 commit 66b8f5a
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ def create_parent_stream_config(self, model: ParentStreamConfigModel, config: Co
request_option=request_option,
stream=declarative_stream,
stream_slice_field=model.stream_slice_field,
config=config,
options=model.options,
)

Expand Down Expand Up @@ -726,7 +727,7 @@ def create_substream_slicer(self, model: SubstreamSlicerModel, config: Config, *
]
)

return SubstreamSlicer(parent_stream_configs=parent_stream_configs, options=model.options)
return SubstreamSlicer(parent_stream_configs=parent_stream_configs, options=model.options, config=config)

@staticmethod
def create_wait_time_from_header(model: WaitTimeFromHeaderModel, config: Config, **kwargs) -> WaitTimeFromHeaderBackoffStrategy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
#

from dataclasses import InitVar, dataclass
from typing import Any, Iterable, List, Mapping, Optional
from typing import Any, Iterable, List, Mapping, Optional, Union

import dpath.util
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.requesters.request_option import RequestOption, RequestOptionType
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.streams.core import Stream
from dataclasses_jsonschema import JsonSchemaMixin

Expand All @@ -25,11 +27,16 @@ class ParentStreamConfig(JsonSchemaMixin):
"""

stream: Stream
parent_key: str
stream_slice_field: str
parent_key: Union[InterpolatedString, str]
stream_slice_field: Union[InterpolatedString, str]
config: Config
options: InitVar[Mapping[str, Any]]
request_option: Optional[RequestOption] = None

def __post_init__(self, options: Mapping[str, Any]):
self.parent_key = InterpolatedString.create(self.parent_key, options=options)
self.stream_slice_field = InterpolatedString.create(self.stream_slice_field, options=options)


@dataclass
class SubstreamSlicer(StreamSlicer, JsonSchemaMixin):
Expand All @@ -42,6 +49,7 @@ class SubstreamSlicer(StreamSlicer, JsonSchemaMixin):
"""

parent_stream_configs: List[ParentStreamConfig]
config: Config
options: InitVar[Mapping[str, Any]]

def __post_init__(self, options: Mapping[str, Any]):
Expand All @@ -54,9 +62,10 @@ def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record]
# This method is called after the records are processed.
cursor = {}
for parent_stream_config in self.parent_stream_configs:
slice_value = stream_slice.get(parent_stream_config.stream_slice_field)
stream_slice_field = parent_stream_config.stream_slice_field.eval(self.config)
slice_value = stream_slice.get(stream_slice_field)
if slice_value:
cursor.update({parent_stream_config.stream_slice_field: slice_value})
cursor.update({stream_slice_field: slice_value})
self._cursor = cursor

def get_request_params(
Expand Down Expand Up @@ -100,7 +109,7 @@ def _get_request_option(self, option_type: RequestOptionType, stream_slice: Stre
if stream_slice:
for parent_config in self.parent_stream_configs:
if parent_config.request_option and parent_config.request_option.inject_into == option_type:
key = parent_config.stream_slice_field
key = parent_config.stream_slice_field.eval(self.config)
value = stream_slice.get(key)
if value:
params.update({key: value})
Expand Down Expand Up @@ -129,8 +138,8 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Itera
else:
for parent_stream_config in self.parent_stream_configs:
parent_stream = parent_stream_config.stream
parent_field = parent_stream_config.parent_key
stream_state_field = parent_stream_config.stream_slice_field
parent_field = parent_stream_config.parent_key.eval(self.config)
stream_state_field = parent_stream_config.stream_slice_field.eval(self.config)
for parent_stream_slice in parent_stream.stream_slices(sync_mode=sync_mode, cursor_field=None, stream_state=stream_state):
empty_parent_slice = True
parent_slice = parent_stream_slice
Expand All @@ -144,9 +153,14 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: StreamState) -> Itera
parent_record = parent_record.record.data
else:
continue
empty_parent_slice = False
stream_state_value = parent_record.get(parent_field)
yield {stream_state_field: stream_state_value, "parent_slice": parent_slice}

try:
stream_state_value = dpath.util.get(parent_record, parent_field)
except KeyError:
pass
else:
empty_parent_slice = False
yield {stream_state_field: stream_state_value, "parent_slice": parent_slice}
# If the parent slice contains no records,
if empty_parent_slice:
yield from []
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,13 @@ def test_create_substream_slicer():
assert isinstance(parent_stream_configs[0].stream, DeclarativeStream)
assert isinstance(parent_stream_configs[1].stream, DeclarativeStream)

assert stream_slicer.parent_stream_configs[0].parent_key == "id"
assert stream_slicer.parent_stream_configs[0].stream_slice_field == "repository_id"
assert stream_slicer.parent_stream_configs[0].parent_key.eval({}) == "id"
assert stream_slicer.parent_stream_configs[0].stream_slice_field.eval({}) == "repository_id"
assert stream_slicer.parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
assert stream_slicer.parent_stream_configs[0].request_option.field_name == "repository_id"

assert stream_slicer.parent_stream_configs[1].parent_key == "someid"
assert stream_slicer.parent_stream_configs[1].stream_slice_field == "word_id"
assert stream_slicer.parent_stream_configs[1].parent_key.eval({}) == "someid"
assert stream_slicer.parent_stream_configs[1].stream_slice_field.eval({}) == "word_id"
assert stream_slicer.parent_stream_configs[1].request_option is None


Expand Down Expand Up @@ -865,8 +865,8 @@ def test_custom_components_do_not_contain_extra_fields():
assert isinstance(custom_substream_slicer, TestingCustomSubstreamSlicer)

assert len(custom_substream_slicer.parent_stream_configs) == 1
assert custom_substream_slicer.parent_stream_configs[0].parent_key == "id"
assert custom_substream_slicer.parent_stream_configs[0].stream_slice_field == "repository_id"
assert custom_substream_slicer.parent_stream_configs[0].parent_key.eval({}) == "id"
assert custom_substream_slicer.parent_stream_configs[0].stream_slice_field.eval({}) == "repository_id"
assert custom_substream_slicer.parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
assert custom_substream_slicer.parent_stream_configs[0].request_option.field_name == "repository_id"

Expand Down Expand Up @@ -911,8 +911,8 @@ def test_parse_custom_component_fields_if_subcomponent():
assert custom_substream_slicer.custom_field == "here"

assert len(custom_substream_slicer.parent_stream_configs) == 1
assert custom_substream_slicer.parent_stream_configs[0].parent_key == "id"
assert custom_substream_slicer.parent_stream_configs[0].stream_slice_field == "repository_id"
assert custom_substream_slicer.parent_stream_configs[0].parent_key.eval({}) == "id"
assert custom_substream_slicer.parent_stream_configs[0].stream_slice_field.eval({}) == "repository_id"
assert custom_substream_slicer.parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
assert custom_substream_slicer.parent_stream_configs[0].request_option.field_name == "repository_id"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def read_records(
"test_single_parent_slices_no_records",
[
ParentStreamConfig(
stream=MockStream([{}], [], "first_stream"), parent_key="id", stream_slice_field="first_stream_id", options={}
stream=MockStream([{}], [], "first_stream"), parent_key="id", stream_slice_field="first_stream_id", options={}, config={},
)
],
[],
Expand All @@ -76,6 +76,7 @@ def read_records(
parent_key="id",
stream_slice_field="first_stream_id",
options={},
config={},
)
],
[{"first_stream_id": 1, "parent_slice": {}}, {"first_stream_id": 2, "parent_slice": {}}],
Expand All @@ -88,6 +89,7 @@ def read_records(
parent_key="id",
stream_slice_field="first_stream_id",
options={},
config={},
)
],
[
Expand All @@ -104,12 +106,14 @@ def read_records(
parent_key="id",
stream_slice_field="first_stream_id",
options={},
config={},
),
ParentStreamConfig(
stream=MockStream(second_parent_stream_slice, more_records, "second_stream"),
parent_key="id",
stream_slice_field="second_stream_id",
options={},
config={},
),
],
[
Expand All @@ -120,16 +124,42 @@ def read_records(
{"parent_slice": {"slice": "second_parent"}, "second_stream_id": 20},
],
),
(
"test_missed_parent_key",
[
ParentStreamConfig(
stream=MockStream([{}], [{"id": 0}, {"id": 1}, {"_id": 2}, {"id": 3}], "first_stream"),
parent_key="id",
stream_slice_field="first_stream_id",
options={},
config={},
)
],
[{"first_stream_id": 0, "parent_slice": {}}, {"first_stream_id": 1, "parent_slice": {}}, {"first_stream_id": 3, "parent_slice": {}}],
),
(
"test_dpath_extraction",
[
ParentStreamConfig(
stream=MockStream([{}], [{"a": {"b": 0}}, {"a": {"b": 1}}, {"a": {"c": 2}}, {"a": {"b": 3}}], "first_stream"),
parent_key="a/b",
stream_slice_field="first_stream_id",
options={},
config={},
)
],
[{"first_stream_id": 0, "parent_slice": {}}, {"first_stream_id": 1, "parent_slice": {}}, {"first_stream_id": 3, "parent_slice": {}}],
),
],
)
def test_substream_slicer(test_name, parent_stream_configs, expected_slices):
if expected_slices is None:
try:
SubstreamSlicer(parent_stream_configs=parent_stream_configs, options={})
SubstreamSlicer(parent_stream_configs=parent_stream_configs, options={}, config={})
assert False
except ValueError:
return
slicer = SubstreamSlicer(parent_stream_configs=parent_stream_configs, options={})
slicer = SubstreamSlicer(parent_stream_configs=parent_stream_configs, options={}, config={})
slices = [s for s in slicer.stream_slices(SyncMode.incremental, stream_state=None)]
assert slices == expected_slices

Expand All @@ -154,16 +184,18 @@ def test_update_cursor(test_name, stream_slice, expected_state):
parent_key="id",
stream_slice_field="first_stream_id",
options={},
config={},
),
ParentStreamConfig(
stream=MockStream(second_parent_stream_slice, more_records, "second_stream"),
parent_key="id",
stream_slice_field="second_stream_id",
options={},
config={},
),
]

slicer = SubstreamSlicer(parent_stream_configs=parent_stream_name_to_config, options={})
slicer = SubstreamSlicer(parent_stream_configs=parent_stream_name_to_config, options={}, config={})
slicer.update_cursor(stream_slice, None)
updated_state = slicer.get_stream_state()
assert expected_state == updated_state
Expand Down Expand Up @@ -244,17 +276,20 @@ def test_request_option(
parent_key="id",
stream_slice_field="first_stream_id",
options={},
config={},
request_option=parent_stream_request_options[0],
),
ParentStreamConfig(
stream=MockStream(second_parent_stream_slice, more_records, "second_stream"),
parent_key="id",
stream_slice_field="second_stream_id",
options={},
config={},
request_option=parent_stream_request_options[1],
),
],
options={},
config={},
)
stream_slice = {"first_stream_id": "1234", "second_stream_id": "4567"}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,13 @@ def test_create_substream_slicer():
assert len(parent_stream_configs) == 2
assert isinstance(parent_stream_configs[0].stream, DeclarativeStream)
assert isinstance(parent_stream_configs[1].stream, DeclarativeStream)
assert stream_slicer.parent_stream_configs[0].parent_key == "id"
assert stream_slicer.parent_stream_configs[0].stream_slice_field == "repository_id"
assert stream_slicer.parent_stream_configs[0].parent_key.eval({}) == "id"
assert stream_slicer.parent_stream_configs[0].stream_slice_field.eval({}) == "repository_id"
assert stream_slicer.parent_stream_configs[0].request_option.inject_into == RequestOptionType.request_parameter
assert stream_slicer.parent_stream_configs[0].request_option.field_name == "repository_id"

assert stream_slicer.parent_stream_configs[1].parent_key == "someid"
assert stream_slicer.parent_stream_configs[1].stream_slice_field == "word_id"
assert stream_slicer.parent_stream_configs[1].parent_key.eval({}) == "someid"
assert stream_slicer.parent_stream_configs[1].stream_slice_field.eval({}) == "word_id"
assert stream_slicer.parent_stream_configs[1].request_option is None


Expand Down

0 comments on commit 66b8f5a

Please sign in to comment.