-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🐛 CDK: fix bug with limit parameter for incremental stream #5833
Changes from 6 commits
b6578ae
a04fcd8
16d1233
bc9b01d
50c00b0
21097c3
18f4830
4aa578c
1713689
c7a3867
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -137,29 +137,40 @@ def _read_stream( | |
|
||
use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental | ||
if use_incremental: | ||
record_iterator = self._read_incremental(logger, stream_instance, configured_stream, connector_state) | ||
record_iterator = self._read_incremental(logger, stream_instance, configured_stream, connector_state, internal_config) | ||
else: | ||
record_iterator = self._read_full_refresh(stream_instance, configured_stream) | ||
record_iterator = self._read_full_refresh(stream_instance, configured_stream, internal_config) | ||
|
||
record_counter = 0 | ||
stream_name = configured_stream.stream.name | ||
logger.info(f"Syncing stream: {stream_name} ") | ||
for record in record_iterator: | ||
if record.type == MessageType.RECORD: | ||
if internal_config.limit and record_counter >= internal_config.limit: | ||
logger.info(f"Reached limit defined by internal config ({internal_config.limit}), stop reading") | ||
break | ||
record_counter += 1 | ||
yield record | ||
|
||
logger.info(f"Read {record_counter} records from {stream_name} stream") | ||
|
||
@staticmethod | ||
def _limit_reached(internal_config: InternalConfig, records_counter: int) -> bool: | ||
""" | ||
Check if record count reached limit set by internal config. | ||
:param internal_config - internal CDK configuration separated from user-defined config | ||
:records_counter - number of records already read | ||
:return True if limit reached, False otherwise | ||
""" | ||
if internal_config.limit: | ||
if records_counter >= internal_config.limit: | ||
return True | ||
return False | ||
|
||
def _read_incremental( | ||
self, | ||
logger: AirbyteLogger, | ||
stream_instance: Stream, | ||
configured_stream: ConfiguredAirbyteStream, | ||
connector_state: MutableMapping[str, Any], | ||
internal_config: InternalConfig, | ||
) -> Iterator[AirbyteMessage]: | ||
stream_name = configured_stream.stream.name | ||
stream_state = connector_state.get(stream_name, {}) | ||
|
@@ -170,31 +181,40 @@ def _read_incremental( | |
slices = stream_instance.stream_slices( | ||
cursor_field=configured_stream.cursor_field, sync_mode=SyncMode.incremental, stream_state=stream_state | ||
) | ||
total_records_counter = 0 | ||
for slice in slices: | ||
record_counter = 0 | ||
records = stream_instance.read_records( | ||
sync_mode=SyncMode.incremental, | ||
stream_slice=slice, | ||
stream_state=stream_state, | ||
cursor_field=configured_stream.cursor_field or None, | ||
) | ||
for record_data in records: | ||
record_counter += 1 | ||
for record_counter, record_data in enumerate(records, start=1): | ||
yield self._as_airbyte_record(stream_name, record_data) | ||
stream_state = stream_instance.get_updated_state(stream_state, record_data) | ||
if checkpoint_interval and record_counter % checkpoint_interval == 0: | ||
yield self._checkpoint_state(stream_name, stream_state, connector_state, logger) | ||
|
||
total_records_counter += 1 | ||
if self._limit_reached(internal_config, total_records_counter): | ||
break | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so we still going to read all slices, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
|
||
yield self._checkpoint_state(stream_name, stream_state, connector_state, logger) | ||
|
||
def _read_full_refresh(self, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream) -> Iterator[AirbyteMessage]: | ||
def _read_full_refresh( | ||
self, stream_instance: Stream, configured_stream: ConfiguredAirbyteStream, internal_config: InternalConfig | ||
) -> Iterator[AirbyteMessage]: | ||
slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field) | ||
total_records_counter = 0 | ||
for slice in slices: | ||
records = stream_instance.read_records( | ||
stream_slice=slice, sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field | ||
) | ||
for record in records: | ||
yield self._as_airbyte_record(configured_stream.stream.name, record) | ||
total_records_counter += 1 | ||
if self._limit_reached(internal_config, total_records_counter): | ||
break | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. updated |
||
|
||
def _checkpoint_state(self, stream_name, stream_state, connector_state, logger): | ||
logger.info(f"Setting state of {stream_name} stream to {stream_state}") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we still need this because we might have limit > size(slice)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got you idea, but don't think its good to put this condition back, check out my proposal (updated PR)