-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[low-code connectors] Add request options and state to stream slicers #14552
Changes from all commits
51fc8eb
efc40b5
e33e004
fc41d54
e2ef317
b8faa8d
6f96b26
d864b01
d209dd6
04fca0d
0301e9a
0367f0f
994d125
f3a0af4
2140c28
77a0325
ecbfcf3
5bda8f8
1521d40
9f170ba
d8ca4f3
0f68ae1
67995c7
233b805
9803e59
a23dcdc
950d8fd
9de8ccf
1ddb4ad
f507f91
b510b17
d03c809
4444c4e
6471b15
b8b3c3a
71fa551
e0da35d
f7bc427
0fca96c
04fc03e
dda7b9c
a35c9bc
9862b4b
ad1cce6
15c3f4e
eca2f45
5a756bb
9b289cb
fff16ed
918e31a
82fb6e8
98e17eb
9ab1008
2a6c9d2
8a72ac0
73279d7
181fb74
104ec1b
cf17cae
08a3663
764eca0
34f3a77
5cde974
573dd90
efdca4a
45df5d9
16e0287
7432ff7
86f7c6c
9ae77f3
64fbdff
b44aff2
0c09220
21d550d
07eecea
31517aa
124b68b
725b8f9
996024f
5b8d5a8
8120f00
1a6a943
9bf3808
db47529
9e5fddc
7aae31d
6656af3
1a49c95
d30aa5b
a525afe
64750ec
f3fc604
2d2ef71
8b422c7
73b47b1
f97cfee
18e562a
2d1312e
a0ab86b
d4c7db1
32e34dc
187858f
bbc8cf5
4647033
a201c3f
198c1d2
0dbe4a9
43a3d69
6cec644
09c43f7
d996baa
47a56d8
22521bf
6b77952
598c36a
72dc105
2d291b6
403521d
7673b52
7bdc243
ee7de21
58d154a
a00c6a8
129cea0
a3f708b
b9ab1f2
fb93593
2188b7d
cea2206
f30c33d
1b266fb
e4a26e0
e80b543
03990a1
19ddda8
ed11332
91443ed
da663a1
bda51cf
d94da00
2f54949
3987c72
3dc3422
fae07e3
c5abb0a
d33c6e6
923da6a
4794669
cce9b59
f63a4b0
864872d
0319dec
f6e4054
831e32d
6851be6
f08ac16
8be4a09
b16758c
ffefeb6
7b51d06
2ca69f1
b8326c0
52656e2
cfc6da9
880dc19
00d6bfb
a547040
2023614
69e99af
63188b8
c0808c8
227e537
e0e7dd2
68f5dfe
870ff1b
c6d0e73
6ed0c9f
a624f2f
2ba2a6f
4124210
256cce2
2d08a8c
19d4808
87951bc
3bc44ba
27add18
ea5010d
16f21eb
c7071ee
bb770ec
319da0e
964d9c6
139a388
ef6a3bf
a901299
4b0aeb2
d34563f
8be926b
cda82eb
7fc6ab2
a447ef6
ad5d88a
91b106d
b41f713
245ebc5
77b8cf5
d919e06
4489ba4
1fd3c91
71d5c0c
cf465ab
56aad5e
a594861
134cf0b
5b4a66a
25ab261
acf4c67
fcd7d85
6195d1c
16a4130
76031cb
1e8d64c
2a17a9b
eb9a918
ce14cd9
fca5c26
e2b5903
8fc3f5b
72bd263
adfd416
46d9ae1
2d57df3
dcb6daf
c778730
31f4c79
45a0bfb
4883766
0440860
020f287
bd38299
6b4b405
e06a808
fb6f49c
edaa436
c516c8f
9419b15
b4a2ffb
909e381
d8f5c1d
f488f3b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,14 +20,13 @@ class RequestOptionsProvider(ABC): | |
""" | ||
|
||
@abstractmethod | ||
def request_params( | ||
self, stream_state: StreamState, stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None | ||
) -> MutableMapping[str, Any]: | ||
def request_params(self, **kwargs) -> MutableMapping[str, Any]: | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. the stream slicer and paginator don't need these extra params |
||
""" | ||
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. | ||
|
||
E.g: you might want to define query parameters for paging if next_page_token is not None. | ||
""" | ||
pass | ||
|
||
@abstractmethod | ||
def request_headers( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,8 +13,6 @@ | |
from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator | ||
from airbyte_cdk.sources.declarative.requesters.requester import Requester | ||
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever | ||
from airbyte_cdk.sources.declarative.states.dict_state import DictState | ||
from airbyte_cdk.sources.declarative.states.state import State | ||
from airbyte_cdk.sources.declarative.stream_slicers.single_slice import SingleSlice | ||
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer | ||
from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState | ||
|
@@ -42,7 +40,6 @@ def __init__( | |
record_selector: HttpSelector, | ||
paginator: Optional[Paginator] = None, | ||
stream_slicer: Optional[StreamSlicer] = SingleSlice(), | ||
state: Optional[State] = None, | ||
): | ||
""" | ||
:param name: The stream's name | ||
|
@@ -59,8 +56,7 @@ def __init__( | |
self._requester = requester | ||
self._record_selector = record_selector | ||
super().__init__(self._requester.get_authenticator()) | ||
self._iterator = stream_slicer | ||
self._state: State = (state or DictState()).deep_copy() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. the stream slicer owns the state now |
||
self._stream_slicer = stream_slicer | ||
self._last_response = None | ||
self._last_records = None | ||
|
||
|
@@ -300,12 +296,14 @@ def read_records( | |
stream_state: Optional[StreamState] = None, | ||
) -> Iterable[Mapping[str, Any]]: | ||
# Warning: use self.state instead of the stream_state passed as argument! | ||
stream_slice = stream_slice or {} # None-check | ||
records_generator = HttpStream.read_records(self, sync_mode, cursor_field, stream_slice, self.state) | ||
for r in records_generator: | ||
self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_response=self._last_response, last_record=r) | ||
self._stream_slicer.update_cursor(stream_slice, last_record=r) | ||
yield r | ||
else: | ||
self._state.update_state(stream_slice=stream_slice, stream_state=self.state, last_reponse=self._last_response) | ||
last_record = self._last_records[-1] if self._last_records else None | ||
self._stream_slicer.update_cursor(stream_slice, last_record=last_record) | ||
yield from [] | ||
|
||
def stream_slices( | ||
|
@@ -320,13 +318,13 @@ def stream_slices( | |
:return: | ||
""" | ||
# Warning: use self.state instead of the stream_state passed as argument! | ||
return self._iterator.stream_slices(sync_mode, self.state) | ||
return self._stream_slicer.stream_slices(sync_mode, self.state) | ||
|
||
@property | ||
def state(self) -> StreamState: | ||
return self._state.get_stream_state() | ||
def state(self) -> MutableMapping[str, Any]: | ||
return self._stream_slicer.get_stream_state() | ||
|
||
@state.setter | ||
def state(self, value: StreamState): | ||
"""State setter, accept state serialized by state getter.""" | ||
self._state.set_state(value) | ||
self._stream_slicer.update_cursor(value) |
This file was deleted.
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,7 @@ | |
|
||
import itertools | ||
from collections import ChainMap | ||
from typing import Any, Iterable, List, Mapping | ||
from typing import Any, Iterable, List, Mapping, Optional | ||
|
||
from airbyte_cdk.models import SyncMode | ||
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer | ||
|
@@ -28,8 +28,34 @@ class CartesianProductStreamSlicer(StreamSlicer): | |
""" | ||
|
||
def __init__(self, stream_slicers: List[StreamSlicer]): | ||
""" | ||
:param stream_slicers: Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer. | ||
""" | ||
self._stream_slicers = stream_slicers | ||
|
||
def update_cursor(self, stream_slice: Mapping[str, Any], last_record: Optional[Mapping[str, Any]] = None): | ||
for slicer in self._stream_slicers: | ||
slicer.update_cursor(stream_slice, last_record) | ||
|
||
def request_params(self) -> Mapping[str, Any]: | ||
return dict(ChainMap(*[s.request_params() for s in self._stream_slicers])) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. TIL about ChainMap |
||
|
||
def request_headers(self) -> Mapping[str, Any]: | ||
return dict(ChainMap(*[s.request_headers() for s in self._stream_slicers])) | ||
|
||
def request_body_data(self) -> Mapping[str, Any]: | ||
return dict(ChainMap(*[s.request_body_data() for s in self._stream_slicers])) | ||
|
||
def request_body_json(self) -> Optional[Mapping]: | ||
return dict(ChainMap(*[s.request_body_json() for s in self._stream_slicers])) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should there be an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated the type hints to make it clear the methods should returns empty |
||
|
||
def request_kwargs(self) -> Mapping[str, Any]: | ||
# Never update kwargs | ||
return {} | ||
|
||
def get_stream_state(self) -> Mapping[str, Any]: | ||
return dict(ChainMap(*[slicer.get_stream_state() for slicer in self._stream_slicers])) | ||
|
||
def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: | ||
sub_slices = (s.stream_slices(sync_mode, stream_state) for s in self._stream_slicers) | ||
return (ChainMap(*a) for a in itertools.product(*sub_slices)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the stream slicer now owns the state