diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 8c963f344042..75eeccc8cdad 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.4.2 +Low-code: Fix off by one error with the stream slicers + ## 0.4.1 Low-code: Fix a few bugs with the stream slicers diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py index 975d7d4a2ee2..8379bd7fbc03 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/list_stream_slicer.py @@ -43,9 +43,12 @@ def __post_init__(self, options: Mapping[str, Any]): self._cursor = None def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None): + # This method is called after the records are processed. slice_value = stream_slice.get(self.cursor_field.eval(self.config)) if slice_value and slice_value in self.slice_values: self._cursor = slice_value + else: + raise ValueError(f"Unexpected stream slice: {slice_value}") def get_stream_state(self) -> StreamState: return {self.cursor_field.eval(self.config): self._cursor} if self._cursor else {} @@ -56,7 +59,8 @@ def get_request_params( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._get_request_option(RequestOptionType.request_parameter) + # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response + return self._get_request_option(RequestOptionType.request_parameter, stream_slice) def get_request_headers( self, @@ -64,7 +68,8 @@ def get_request_headers( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._get_request_option(RequestOptionType.header) + # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response + return self._get_request_option(RequestOptionType.header, stream_slice) def get_request_body_data( self, @@ -72,7 +77,8 @@ def get_request_body_data( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._get_request_option(RequestOptionType.body_data) + # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response + return self._get_request_option(RequestOptionType.body_data, stream_slice) def get_request_body_json( self, @@ -80,13 +86,18 @@ def get_request_body_json( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._get_request_option(RequestOptionType.body_json) + # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response + return self._get_request_option(RequestOptionType.body_json, stream_slice) def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]: return [{self.cursor_field.eval(self.config): slice_value} for slice_value in self.slice_values] - def _get_request_option(self, request_option_type: RequestOptionType): - if self.request_option and self.request_option.inject_into == request_option_type and self._cursor: - return {self.request_option.field_name: self._cursor} + def _get_request_option(self, request_option_type: RequestOptionType, stream_slice: StreamSlice): + if self.request_option and self.request_option.inject_into == request_option_type and stream_slice: + slice_value = stream_slice.get(self.cursor_field.eval(self.config)) + if slice_value: + return {self.request_option.field_name: slice_value} + else: + return {} else: return {} diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py index d933e7c8f02f..dbe46d924269 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/substream_slicer.py @@ -51,6 +51,7 @@ def __post_init__(self, options: Mapping[str, Any]): self._options = options def update_cursor(self, stream_slice: StreamSlice, last_record: Optional[Record] = None): + # This method is called after the records are processed. cursor = {} for parent_stream_config in self.parent_stream_configs: slice_value = stream_slice.get(parent_stream_config.stream_slice_field) @@ -64,7 +65,8 @@ def get_request_params( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._get_request_option(RequestOptionType.request_parameter) + # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response + return self._get_request_option(RequestOptionType.request_parameter, stream_slice) def get_request_headers( self, @@ -72,7 +74,8 @@ def get_request_headers( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._get_request_option(RequestOptionType.header) + # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response + return self._get_request_option(RequestOptionType.header, stream_slice) def get_request_body_data( self, @@ -80,7 +83,8 @@ def get_request_body_data( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Mapping[str, Any]: - return self._get_request_option(RequestOptionType.body_data) + # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response + return self._get_request_option(RequestOptionType.body_data, stream_slice) def get_request_body_json( self, @@ -88,15 +92,16 @@ def get_request_body_json( stream_slice: Optional[StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None, ) -> Optional[Mapping]: - return self._get_request_option(RequestOptionType.body_json) + # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response + return self._get_request_option(RequestOptionType.body_json, stream_slice) - def _get_request_option(self, option_type: RequestOptionType): + def _get_request_option(self, option_type: RequestOptionType, stream_slice: StreamSlice): params = {} - if self._cursor: + if stream_slice: for parent_config in self.parent_stream_configs: if parent_config.request_option and parent_config.request_option.inject_into == option_type: key = parent_config.stream_slice_field - value = self._cursor.get(key) + value = stream_slice.get(key) if value: params.update({key: value}) return params diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 25ef06d7ca07..adfc9b1c6d43 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.4.1", + version="0.4.2", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py index b1eb1f1d6ed0..eced8061f340 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_cartesian_product_stream_slicer.py @@ -95,9 +95,14 @@ def test_update_cursor(test_name, stream_slice, expected_state): ), ] slicer = CartesianProductStreamSlicer(stream_slicers=stream_slicers, options={}) - slicer.update_cursor(stream_slice, None) - updated_state = slicer.get_stream_state() - assert expected_state == updated_state + + if expected_state: + slicer.update_cursor(stream_slice, None) + updated_state = slicer.get_stream_state() + assert expected_state == updated_state + else: + with pytest.raises(ValueError): + slicer.update_cursor(stream_slice, None) @pytest.mark.parametrize( @@ -169,12 +174,12 @@ def test_request_option( ], options={}, ) - slicer.update_cursor({"owner_resource": "customer", "repository": "airbyte"}, None) + stream_slice = {"owner_resource": "customer", "repository": "airbyte"} - assert expected_req_params == slicer.get_request_params() - assert expected_headers == slicer.get_request_headers() - assert expected_body_json == slicer.get_request_body_json() - assert expected_body_data == slicer.get_request_body_data() + assert expected_req_params == slicer.get_request_params(stream_slice=stream_slice) + assert expected_headers == slicer.get_request_headers(stream_slice=stream_slice) + assert expected_body_json == slicer.get_request_body_json(stream_slice=stream_slice) + assert expected_body_data == slicer.get_request_body_data(stream_slice=stream_slice) def test_request_option_before_updating_cursor(): diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_stream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_stream_slicer.py index b02eb59af795..73a8e8c1465b 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_stream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_list_stream_slicer.py @@ -51,9 +51,13 @@ def test_list_stream_slicer(test_name, slice_values, cursor_field, expected_slic ) def test_update_cursor(test_name, stream_slice, last_record, expected_state): slicer = ListStreamSlicer(slice_values=slice_values, cursor_field=cursor_field, config={}, options={}) - slicer.update_cursor(stream_slice, last_record) - updated_state = slicer.get_stream_state() - assert expected_state == updated_state + if expected_state: + slicer.update_cursor(stream_slice, last_record) + updated_state = slicer.get_stream_state() + assert expected_state == updated_state + else: + with pytest.raises(ValueError): + slicer.update_cursor(stream_slice, last_record) @pytest.mark.parametrize( @@ -111,11 +115,10 @@ def test_request_option(test_name, request_option, expected_req_params, expected slicer = ListStreamSlicer(slice_values=slice_values, cursor_field=cursor_field, config={}, request_option=request_option, options={}) stream_slice = {cursor_field: "customer"} - slicer.update_cursor(stream_slice) - assert expected_req_params == slicer.get_request_params(stream_slice) - assert expected_headers == slicer.get_request_headers() - assert expected_body_json == slicer.get_request_body_json() - assert expected_body_data == slicer.get_request_body_data() + assert expected_req_params == slicer.get_request_params(stream_slice=stream_slice) + assert expected_headers == slicer.get_request_headers(stream_slice=stream_slice) + assert expected_body_json == slicer.get_request_body_json(stream_slice=stream_slice) + assert expected_body_data == slicer.get_request_body_data(stream_slice=stream_slice) def test_request_option_before_updating_cursor(): diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_substream_slicer.py b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_substream_slicer.py index 3f980ce94a83..5de520b40d5e 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_substream_slicer.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/stream_slicers/test_substream_slicer.py @@ -256,9 +256,9 @@ def test_request_option( ], options={}, ) - slicer.update_cursor({"first_stream_id": "1234", "second_stream_id": "4567"}, None) + stream_slice = {"first_stream_id": "1234", "second_stream_id": "4567"} - assert expected_req_params == slicer.get_request_params() - assert expected_headers == slicer.get_request_headers() - assert expected_body_json == slicer.get_request_body_json() - assert expected_body_data == slicer.get_request_body_data() + assert expected_req_params == slicer.get_request_params(stream_slice=stream_slice) + assert expected_headers == slicer.get_request_headers(stream_slice=stream_slice) + assert expected_body_json == slicer.get_request_body_json(stream_slice=stream_slice) + assert expected_body_data == slicer.get_request_body_data(stream_slice=stream_slice)