-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Emit multiple error trace messages and continue syncs by default #34636
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
…e non-zero error code on exception
@@ -282,17 +303,17 @@ def message_repository(self) -> Union[None, MessageRepository]: | |||
return _default_message_repository | |||
|
|||
@property | |||
def continue_sync_on_stream_failure(self) -> bool: | |||
def stop_sync_on_stream_failure(self) -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're 👍 w/ this interface name I can remove the warning
@@ -343,7 +343,7 @@ def test_concurrent_source_yields_the_same_messages_as_abstract_source_when_an_e | |||
source, concurrent_source = _init_sources([stream_slice_to_partition], state, logger) | |||
config = {} | |||
catalog = _create_configured_catalog(source._streams) | |||
messages_from_abstract_source = _read_from_source(source, logger, config, catalog, state, RuntimeError) | |||
messages_from_abstract_source = _read_from_source(source, logger, config, catalog, state, AirbyteTracedException) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is one area of deviation between the concurrent and regular abstract source. The behavior in abstract_source.py
catches exceptions and emits trace errors. And at the end if at least one exception was found, it raises a final AirbyteTracedException
that will make sure the process ends w/ a non-zero error code.
As far as I can tell, the concurrent just emits the same exception the source implementation emits. I am curious, but does that mean that a failure on one-stream in concurrent will derail all the concurrently running steams/slices?
I'm not entirely sure what parity should look like because the last error message for the regular abstract source is a summary of prior errors instead of a specific one. How do we feel about leaving this functionality outside of concurrent and filing a ticket to revisit this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
your understanding is correct. Can we write a follow up issue for concurrent? I think it would be good to implement this as well
error_message = self._generate_failed_streams_error_message(stream_name_to_exception) | ||
logger.info(error_message) | ||
# We still raise at least one exception when a stream raises an exception because the platform | ||
# currently relies on a non-zero exit code to determine if a sync attempt has failed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😠
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅ with a request for a follow up for the concurrent cdk
@@ -343,7 +343,7 @@ def test_concurrent_source_yields_the_same_messages_as_abstract_source_when_an_e | |||
source, concurrent_source = _init_sources([stream_slice_to_partition], state, logger) | |||
config = {} | |||
catalog = _create_configured_catalog(source._streams) | |||
messages_from_abstract_source = _read_from_source(source, logger, config, catalog, state, RuntimeError) | |||
messages_from_abstract_source = _read_from_source(source, logger, config, catalog, state, AirbyteTracedException) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
your understanding is correct. Can we write a follow up issue for concurrent? I think it would be good to implement this as well
Closes #34183
What
Adds new behavior to the abstract source inherited by most connectors so that if one stream raises an exception, subsequent streams will continue to get synced. This is the new default behavior, but connectors can opt out via the
stop_sync_on_stream_failure
property on `AbstractSource.This change is also necessary to fix Sentry error reporting logic that wouldn't work when the final error reported was a summary of previously seen stream exceptions.
How
The implementation is pretty straightforward in that every exception raised by a stream will yield one error trace message. We also collect these exceptions and include them in a final exception that is thrown at the end of the sync if any exceptions were found. This is important because the connector must end with a non-zero status code or the platform will not recognize the sync as failed.
I also got rid of all the original raise exception flow from the legacy behavior so that there is consistency in the pattern we emit error traces messages (whether it be 1 or many) regardless of the value of the
stop_sync_on_stream_failure
.This has been tested using various pre-release versions of
source-klaviyo
on Cloud and verifying that syncs continue onto other streams and that the first stream error that is received gets propagated to Sentry so we retain the grouping behavior.Sentry Dashboard w/ errors caught: https://airbytehq.sentry.io/issues/?query=is%3Aunresolved+klaviyo+connector_name%3AKlaviyo&referrer=issue-list&statsPeriod=30d
Recommended reading order
abstract_source.py
traced_exception.py
🚨 User Impact 🚨
Are there any breaking changes? What is the end result perceived by the user?
This does change the potential amount of messages emitted by a connector, especially if an existing connector and sync has many failing streams. However, beyond that, this change is additive. I will also include a follow up PR to fix
source-klaviyo
to remove the temporarycontinue_sync_on_stream_failure()
method used for the original short term fix.