🐛 CDK: fix bug with limit parameter for incremental stream #5833
Conversation
Force-pushed from 08dfd98 to b6578ae (compare)
record_counter = 0
stream_name = configured_stream.stream.name
logger.info(f"Syncing stream: {stream_name} ")
for record in record_iterator:
    if record.type == MessageType.RECORD:
        if internal_config.limit and record_counter >= internal_config.limit:
I think we still need this because we might have limit > size(slice)
Got your idea, but I don't think it's good to put this condition back; check out my proposal (updated PR).
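To illustrate the point with a toy sketch (hypothetical names and numbers, not CDK code): if the counter resets per slice, a limit larger than any single slice never triggers, so the check has to run against a counter carried across slices, roughly:

# Toy illustration: limit=10, five slices of 4 records each.
# A per-slice counter never reaches 10; the check needs a counter
# that survives across slices.
from typing import Iterator, List

def read_with_limit(slices: List[List[dict]], limit: int) -> Iterator[dict]:
    total_records_counter = 0  # carried across all slices
    for slice_records in slices:
        for record in slice_records:
            yield record
            total_records_counter += 1
            if limit and total_records_counter >= limit:
                return  # stops the whole read, not just the current slice

records = list(read_with_limit([[{"n": i} for i in range(4)]] * 5, limit=10))
assert len(records) == 10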
@@ -184,17 +182,23 @@ def _read_incremental(
stream_state = stream_instance.get_updated_state(stream_state, record_data)
if checkpoint_interval and record_counter % checkpoint_interval == 0:
    yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
if internal_config.limit and record_counter >= internal_config.limit:
So we don't need it here because of the comment above.
We still need it; check out the updated PR.
see comments
yield self._as_airbyte_record(configured_stream.stream.name, record)
if internal_config.limit and count + 1 >= internal_config.limit:
how about this one?
Updated
records = stream_instance.read_records(
    sync_mode=SyncMode.incremental,
    stream_slice=slice,
    stream_state=stream_state,
    cursor_field=configured_stream.cursor_field or None,
)
- for record_data in records:
-     record_counter += 1
+ for record_counter, record_data in enumerate(records):
you know you can do start=1, right?
No, but now yes :)
records = stream_instance.read_records(
    sync_mode=SyncMode.incremental,
    stream_slice=slice,
    stream_state=stream_state,
    cursor_field=configured_stream.cursor_field or None,
)
- for record_data in records:
-     record_counter += 1
+ for record_counter, record_data in enumerate(records, 1):
Suggested change:
- for record_counter, record_data in enumerate(records, 1):
+ for record_counter, record_data in enumerate(records, start=1):
updated
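For reference, this is plain Python behavior, nothing CDK-specific: enumerate accepts a start value, which removes the manual counter increment entirely:

# enumerate's start argument sets the first counter value.
records = ["a", "b", "c"]
for record_counter, record_data in enumerate(records, start=1):
    print(record_counter, record_data)  # prints: 1 a, 2 b, 3 c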
yield self._as_airbyte_record(stream_name, record_data)
stream_state = stream_instance.get_updated_state(stream_state, record_data)
if checkpoint_interval and record_counter % checkpoint_interval == 0:
    yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
if internal_config.limit:
It feels like a code smell to duplicate this logic. Can't we put it in the calling method?
Updated
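A minimal sketch of how that shared check might be extracted, consistent with the self._limit_reached(internal_config, total_records_counter) call sites in the later diffs; the InternalConfig stand-in below is simplified, not the CDK's real model:

from dataclasses import dataclass
from typing import Optional

@dataclass
class InternalConfig:  # simplified stand-in for the CDK's internal config model
    limit: Optional[int] = None

def _limit_reached(internal_config: InternalConfig, records_counter: int) -> bool:
    """True when a record limit is configured and the counter has reached it."""
    return bool(internal_config.limit and records_counter >= internal_config.limit)

assert _limit_reached(InternalConfig(limit=2), 2)
assert not _limit_reached(InternalConfig(), 100)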
yield self._as_airbyte_record(stream_name, record_data)
stream_state = stream_instance.get_updated_state(stream_state, record_data)
if checkpoint_interval and record_counter % checkpoint_interval == 0:
    yield self._checkpoint_state(stream_name, stream_state, connector_state, logger)
total_records_counter += 1
if self._limit_reached(internal_config, total_records_counter):
    break
So we're still going to read all slices, right?
updated
for slice in slices:
    records = stream_instance.read_records(
        stream_slice=slice, sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field
    )
    for record in records:
        yield self._as_airbyte_record(configured_stream.stream.name, record)
        total_records_counter += 1
        if self._limit_reached(internal_config, total_records_counter):
            break
same here
updated
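A toy sketch of the concern and one possible fix (hypothetical names, not CDK code): break alone only exits the inner record loop, so every remaining slice would still be fetched; repeating the limit check at the slice level stops the outer loop before the next slice is read:

def read_all_slices(slices, limit):
    total_records_counter = 0
    for slice_records in slices:
        if limit and total_records_counter >= limit:
            break  # stop before fetching the next slice
        for record in slice_records:
            yield record
            total_records_counter += 1
            if limit and total_records_counter >= limit:
                break  # exits only this inner loop

assert list(read_all_slices([[1, 2], [3, 4], [5, 6]], limit=3)) == [1, 2, 3]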
see comments
…-fix-limit-for-incremental
What
Fix #5832
How
Describe the solution
Recommended reading order
x.java
y.python
Pre-merge Checklist
Expand the relevant checklist and delete the others.

New Connector
Community member or Airbyter
- Secrets in the connector's spec are annotated with airbyte_secret
- Integration tests pass: ./gradlew :airbyte-integrations:connectors:<name>:integrationTest
- Documentation updated:
  - Connector's README.md
  - docs/SUMMARY.md
  - docs/integrations/<source or destination>/<name>.md, including changelog. See changelog example
  - docs/integrations/README.md
  - airbyte-integrations/builds.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- /test connector=connectors/<name> command is passing
- New connector version released by running the /publish command described here

Updating a connector
Community member or Airbyter
- Secrets in the connector's spec are annotated with airbyte_secret
- Integration tests pass: ./gradlew :airbyte-integrations:connectors:<name>:integrationTest
- Documentation updated:
  - Connector's README.md
  - docs/integrations/<source or destination>/<name>.md, including changelog. See changelog example
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
- /test connector=connectors/<name> command is passing
- New connector version released by running the /publish command described here

Connector Generator
- The generator test modules (all connectors with -scaffold in their name) have been updated with the latest scaffold by running ./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates, then checking in your changes