From 6629ce30367042cbe4826862f772f1491ca6a8e8 Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com> Date: Mon, 5 Feb 2024 13:05:41 -0500 Subject: [PATCH] Improve error messages for concurrent CDK (#34754) --- .../sources/concurrent_source/thread_pool_manager.py | 6 +++++- .../airbyte_cdk/sources/streams/concurrent/cursor.py | 12 +++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py b/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py index 04508b3ff1f1..560989af0a6c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py @@ -64,7 +64,11 @@ def _prune_futures(self, futures: List[Future[Any]]) -> None: if optional_exception: # Exception handling should be done in the main thread. Hence, we only store the exception and expect the main # thread to call raise_if_exception - self._most_recently_seen_exception = RuntimeError(f"Failed reading with error: {optional_exception}") + # We do not expect this error to happen. The futures created during concurrent syncs should catch the exception and + # push it to the queue. If this exception occurs, please review the futures and how they handle exceptions. + self._most_recently_seen_exception = RuntimeError( + f"Failed processing a future: {optional_exception}. Please contact the Airbyte team." + ) futures.pop(index) def shutdown(self) -> None: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py index e63358b715d5..82d11318f5ea 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -153,9 +153,19 @@ def _add_slice_to_state(self, partition: Partition) -> None: ) elif self._most_recent_record: if self._has_closed_at_least_one_slice: + # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save + # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing + # boundaries. There are cases where partitions could not have boundaries: + # * The cursor should be per-partition + # * The stream state is actually the parent stream state + # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for + # state management. For the specific user that was affected with this issue, we need to: + # * Fix state tracking (which is currently broken) + # * Make the new version available + # * (Probably) ask the user to reset the stream to avoid data loss raise ValueError( "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is " - "expected." + "expected. Please contact the Airbyte team." ) self.state["slices"].append(