Python source CDK does not support incremental sync with a cursor field configured by the user #7390
Labels: area/connectors (Connector related issues), CDK (Connector Development Kit), community, frozen (Not being actively worked on), Icebox, team/connectors-python, type/bug (Something isn't working)
Environment
Current Behavior
Incremental sync with a cursor field configured by the end user does not work with the current implementation of the Python CDK.
There seem to be some flaws in the cursor field implementation in the AbstractSource and Stream classes:
The method Stream::supports_incremental() relies on a source-defined cursor_field (self.cursor_field), which is unknown when the cursor is configured by the user (it is not set at configuration time). In that case the method always returns False, because self.cursor_field is left empty.
airbyte/airbyte-integrations/bases/base-python/base_python/cdk/streams/core.py
Lines 73 to 81 in 928aa62
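A minimal sketch of why the check fails, using a simplified, hypothetical model of the Stream class rather than the actual CDK code: cursor_field defaults to an empty list, _wrapped_cursor_field() normalizes it to a list, and supports_incremental (modeled here as a property) is True only when that list is non-empty. All class names below are illustrative.

```python
from typing import List, Union


class SketchStream:
    """Hypothetical, simplified model of the Stream cursor logic described above."""

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        # No source-defined cursor by default.
        return []

    def _wrapped_cursor_field(self) -> List[str]:
        # Normalize a single field name into a one-element path.
        return [self.cursor_field] if isinstance(self.cursor_field, str) else self.cursor_field

    @property
    def supports_incremental(self) -> bool:
        # Only a cursor declared in the stream's own code makes this True.
        return len(self._wrapped_cursor_field()) > 0


class SourceDefinedCursorStream(SketchStream):
    @property
    def cursor_field(self) -> str:
        return "updated_at"


class UserCursorStream(SketchStream):
    # The user picks the cursor in the UI, so the stream declares none.
    pass


print(SourceDefinedCursorStream().supports_incremental)  # True
print(UserCursorStream().supports_incremental)  # False
```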
If we override supports_incremental to return True, another issue occurs in the Stream::get_updated_state() call. When a user selects a cursor field in the UI, the configured cursor field is never passed to this method and is therefore not accessible in it.
When the initial state is empty, there is no way to know which cursor field was chosen, so the correct new state cannot be generated.
airbyte/airbyte-integrations/bases/base-python/base_python/cdk/abstract_source.py
Lines 135 to 144 in 928aa62
This works only for source-defined cursors, because there the cursor field is available through the self.cursor_field property defined in the stream's source code.
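To make the empty-state problem concrete, here is a minimal, hypothetical model of a user-cursor stream whose state method has the same two parameters as the CDK's get_updated_state() (current_stream_state, latest_record). It raises on the first sync only to make the missing information visible; the real method would not raise, it would simply have no field to write. On later syncs the sketch can only guess the field from the existing state key.

```python
from typing import Any, Mapping, MutableMapping


class UserCursorStream:
    """Hypothetical stream whose cursor is chosen by the user in the UI.

    It declares no cursor of its own, and the chosen field is not passed in,
    so get_updated_state() has no reliable way to learn it.
    """

    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any],
    ) -> Mapping[str, Any]:
        if not current_stream_state:
            # First sync: no state to inspect and no cursor_field argument.
            raise LookupError("cursor field unknown on first sync")
        # Later syncs can only guess the field from the single existing state key.
        (key,) = current_stream_state.keys()
        return {key: max(current_stream_state[key], latest_record[key])}
```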
Expected Behavior
Incremental sync with a user-configured cursor field should work out of the box, without any need to override Python CDK core methods.
Logs
n/a
Steps to Reproduce
Are you willing to submit a PR?
No
Here are some suggestions to correct this behavior:
--> if False, we return True,
--> if not, we return the result of _wrapped_cursor_field()
airbyte/airbyte-integrations/bases/base-python/base_python/cdk/streams/core.py
Lines 74 to 78 in 928aa62
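The first suggestion can be sketched as follows. It assumes the flag being checked is a boolean that is False when the user may pick the cursor; the name source_defined_cursor and the class names are illustrative assumptions. Instead of returning the list from _wrapped_cursor_field() directly, the sketch returns whether it is non-empty, which keeps the property a bool with the same truthiness.

```python
from typing import List, Union


class ProposedStream:
    """Hypothetical model of the suggested supports_incremental logic."""

    # Assumed flag: False when the user may pick the cursor in the UI.
    # The name source_defined_cursor is illustrative.
    source_defined_cursor: bool = True

    @property
    def cursor_field(self) -> Union[str, List[str]]:
        # No source-defined cursor by default.
        return []

    def _wrapped_cursor_field(self) -> List[str]:
        return [self.cursor_field] if isinstance(self.cursor_field, str) else self.cursor_field

    @property
    def supports_incremental(self) -> bool:
        if not self.source_defined_cursor:
            # User-configured cursor: incremental sync is possible even though
            # the stream declares no cursor field of its own.
            return True
        # Source-defined cursor: fall back to the existing check.
        return len(self._wrapped_cursor_field()) > 0


class UserCursorStream(ProposedStream):
    source_defined_cursor = False


print(ProposedStream().supports_incremental)  # False
print(UserCursorStream().supports_incremental)  # True
```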
Add a cursor_field parameter to the signature of Stream.get_updated_state(), as is already done for read_records() and stream_slices().
airbyte/airbyte-integrations/bases/base-python/base_python/cdk/streams/core.py
Line 150 in 928aa62
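With the extra parameter, even an empty initial state is enough to build the new state. A minimal sketch, assuming cursor_field is a list of path segments as in read_records() and that cursor values compare correctly with max() (ISO-8601 date strings do); ProposedStateStream is a hypothetical name and only top-level fields are handled.

```python
from typing import Any, List, Mapping, MutableMapping, Optional


class ProposedStateStream:
    """Hypothetical stream showing the suggested extra cursor_field parameter."""

    def get_updated_state(
        self,
        current_stream_state: MutableMapping[str, Any],
        latest_record: Mapping[str, Any],
        cursor_field: Optional[List[str]] = None,
    ) -> Mapping[str, Any]:
        # cursor_field is the path the user selected, e.g. ["updated_at"].
        # This sketch only handles a top-level (single-element) path.
        if not cursor_field:
            raise ValueError("cursor_field is required for a user-configured cursor")
        key = cursor_field[0]
        latest = latest_record[key]
        previous = current_stream_state.get(key)
        # On the first sync there is no previous value; otherwise keep the larger one.
        return {key: latest if previous is None else max(previous, latest)}
```

An existing subclass that overrides get_updated_state() with only current_stream_state and latest_record would raise a TypeError once the caller starts passing cursor_field, which is why this change breaks backward compatibility.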
Change the call in AbstractSource::_read_incremental() to pass configured_stream.cursor_field.
airbyte/airbyte-integrations/bases/base-python/base_python/cdk/abstract_source.py
Line 144 in 928aa62
NB: The last two suggestions are not backward compatible.
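The last suggestion can be sketched end to end with hypothetical stand-ins (ConfiguredStreamSketch, InMemoryStream, read_incremental) for the configured catalog entry, the stream, and AbstractSource::_read_incremental(). It assumes the stream accepts the extra cursor_field argument proposed for get_updated_state(), and it leaves out the message emission and state checkpointing that the real read loop performs.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple


@dataclass
class ConfiguredStreamSketch:
    """Hypothetical stand-in for a configured catalog entry."""

    name: str
    cursor_field: List[str] = field(default_factory=list)  # chosen by the user


class InMemoryStream:
    """Hypothetical stream over a fixed list of records (top-level cursor only)."""

    def __init__(self, records: List[Dict[str, Any]]):
        self._records = records

    def read_records(
        self,
        cursor_field: Optional[List[str]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        key = cursor_field[0]
        since = (stream_state or {}).get(key)
        # Emit only records newer than the saved cursor value.
        for record in self._records:
            if since is None or record[key] > since:
                yield record

    def get_updated_state(
        self,
        current_stream_state: Mapping[str, Any],
        latest_record: Mapping[str, Any],
        cursor_field: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        key = cursor_field[0]
        previous = current_stream_state.get(key)
        latest = latest_record[key]
        return {key: latest if previous is None else max(previous, latest)}


def read_incremental(
    stream: InMemoryStream,
    configured_stream: ConfiguredStreamSketch,
    stream_state: Dict[str, Any],
) -> Tuple[List[Mapping[str, Any]], Dict[str, Any]]:
    """Sketch of the suggested _read_incremental() change."""
    cursor_field = configured_stream.cursor_field
    records = []
    for record in stream.read_records(cursor_field=cursor_field, stream_state=stream_state):
        records.append(record)
        # Suggested change: forward configured_stream.cursor_field here too.
        stream_state = stream.get_updated_state(stream_state, record, cursor_field=cursor_field)
    return records, stream_state


stream = InMemoryStream([{"id": 1, "updated_at": "2021-01-01"}, {"id": 2, "updated_at": "2021-03-01"}])
configured = ConfiguredStreamSketch(name="users", cursor_field=["updated_at"])
first_records, state = read_incremental(stream, configured, {})
second_records, state = read_incremental(stream, configured, state)
```

The first call returns both records and the state {"updated_at": "2021-03-01"}; the second call, started from that state, returns no records, so the user-chosen field is carried between syncs without the stream declaring it.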