Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit multiple error trace messages and continue syncs by default #34636

Merged
merged 15 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
Status,
StreamDescriptor,
SyncMode,
)
from airbyte_cdk.models import Type as MessageType
Expand All @@ -27,6 +28,7 @@
from airbyte_cdk.sources.utils.record_helper import stream_data_to_airbyte_message
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger, SliceLogger
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.event_timing import create_timer
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
Expand Down Expand Up @@ -133,27 +135,44 @@ def read(
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.COMPLETE)
except AirbyteTracedException as e:
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
if self.continue_sync_on_stream_failure:
stream_name_to_exception[stream_instance.name] = e
else:
raise e
yield e.as_sanitized_airbyte_message(stream_descriptor=StreamDescriptor(name=configured_stream.stream.name))
stream_name_to_exception[stream_instance.name] = e
if self.stop_sync_on_stream_failure:
logger.info(
f"Stopping sync on error from stream {configured_stream.stream.name} because {self.name} does not support continuing syncs on error."
)
break
except Exception as e:
yield from self._emit_queued_messages()
logger.exception(f"Encountered an exception while reading stream {configured_stream.stream.name}")
logger.info(f"Marking stream {configured_stream.stream.name} as STOPPED")
yield stream_status_as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
display_message = stream_instance.get_error_display_message(e)
if display_message:
raise AirbyteTracedException.from_exception(e, message=display_message) from e
raise e
traced_exception = AirbyteTracedException.from_exception(e, message=display_message)
else:
traced_exception = AirbyteTracedException.from_exception(e)
yield traced_exception.as_sanitized_airbyte_message(
stream_descriptor=StreamDescriptor(name=configured_stream.stream.name)
)
stream_name_to_exception[stream_instance.name] = traced_exception
if self.stop_sync_on_stream_failure:
logger.info(f"{self.name} does not support continuing syncs on error from stream {configured_stream.stream.name}")
break
finally:
timer.finish_event()
logger.info(f"Finished syncing {configured_stream.stream.name}")
logger.info(timer.report())

if self.continue_sync_on_stream_failure and len(stream_name_to_exception) > 0:
raise AirbyteTracedException(message=self._generate_failed_streams_error_message(stream_name_to_exception))
if len(stream_name_to_exception) > 0:
error_message = self._generate_failed_streams_error_message(stream_name_to_exception)
logger.info(error_message)
# We still raise at least one exception when a stream raises an exception because the platform
# currently relies on a non-zero exit code to determine if a sync attempt has failed
Copy link
Contributor Author

@brianjlai brianjlai Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😠

raise AirbyteTracedException(message=error_message)
logger.info(f"Finished syncing {self.name}")

@property
Expand Down Expand Up @@ -282,17 +301,17 @@ def message_repository(self) -> Union[None, MessageRepository]:
return _default_message_repository

@property
def continue_sync_on_stream_failure(self) -> bool:
def stop_sync_on_stream_failure(self) -> bool:
Copy link
Contributor Author

@brianjlai brianjlai Feb 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're 👍 w/ this interface name I can remove the warning

"""
WARNING: This function is in-development which means it is subject to change. Use at your own risk.

By default, a source should raise an exception and stop the sync when it encounters an error while syncing a stream. This
method can be overridden on a per-source basis so that a source will continue syncing streams other streams even if an
exception is raised for a stream.
By default, when a source encounters an exception while syncing a stream, it will emit an error trace message and then
continue syncing the next stream. This can be overwridden on a per-source basis so that the source will stop the sync
on the first error seen and emit a single error trace message for that stream.
"""
return False

@staticmethod
def _generate_failed_streams_error_message(stream_failures: Mapping[str, AirbyteTracedException]) -> str:
failures = ", ".join([f"{stream}: {exception.__repr__()}" for stream, exception in stream_failures.items()])
failures = ", ".join([f"{stream}: {filter_secrets(exception.__repr__())}" for stream, exception in stream_failures.items()])
return f"During the sync, the following streams did not sync successfully: {failures}"
17 changes: 16 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/utils/traced_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
AirbyteTraceMessage,
FailureType,
Status,
StreamDescriptor,
TraceType,
)
from airbyte_cdk.models import Type as MessageType
Expand Down Expand Up @@ -43,7 +44,7 @@ def __init__(
self._exception = exception
super().__init__(internal_message)

def as_airbyte_message(self) -> AirbyteMessage:
def as_airbyte_message(self, stream_descriptor: StreamDescriptor = None) -> AirbyteMessage:
"""
Builds an AirbyteTraceMessage from the exception
"""
Expand All @@ -60,6 +61,7 @@ def as_airbyte_message(self) -> AirbyteMessage:
internal_message=self.internal_message,
failure_type=self.failure_type,
stack_trace=stack_trace_str,
stream_descriptor=stream_descriptor,
),
)

Expand Down Expand Up @@ -88,3 +90,16 @@ def from_exception(cls, exc: BaseException, *args, **kwargs) -> "AirbyteTracedEx
:param exc: the exception that caused the error
"""
return cls(internal_message=str(exc), exception=exc, *args, **kwargs) # type: ignore # ignoring because of args and kwargs

def as_sanitized_airbyte_message(self, stream_descriptor: StreamDescriptor = None) -> AirbyteMessage:
"""
Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
"""
error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
if error_message.trace.error.message:
error_message.trace.error.message = filter_secrets(error_message.trace.error.message)
if error_message.trace.error.internal_message:
error_message.trace.error.internal_message = filter_secrets(error_message.trace.error.internal_message)
if error_message.trace.error.stack_trace:
error_message.trace.error.stack_trace = filter_secrets(error_message.trace.error.stack_trace)
return error_message
Loading
Loading