Skip to content

Commit

Permalink
Improve error messages for concurrent CDK (#34754)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxi297 authored and xiaohansong committed Feb 27, 2024
1 parent 3dd39c3 commit f2b3283
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ def _prune_futures(self, futures: List[Future[Any]]) -> None:
if optional_exception:
# Exception handling should be done in the main thread. Hence, we only store the exception and expect the main
# thread to call raise_if_exception
self._most_recently_seen_exception = RuntimeError(f"Failed reading with error: {optional_exception}")
# We do not expect this error to happen. The futures created during concurrent syncs should catch the exception and
# push it to the queue. If this exception occurs, please review the futures and how they handle exceptions.
self._most_recently_seen_exception = RuntimeError(
f"Failed processing a future: {optional_exception}. Please contact the Airbyte team."
)
futures.pop(index)

def shutdown(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,19 @@ def _add_slice_to_state(self, partition: Partition) -> None:
)
elif self._most_recent_record:
if self._has_closed_at_least_one_slice:
# If we track the state value using the records' cursor field, we can only do that if there is one partition. This is because we save
# the state every time we close a partition. We assume that if there are multiple slices, they need to be providing
# boundaries. There are cases where partitions might not have boundaries:
# * The cursor should be per-partition
# * The stream state is actually the parent stream state
# There might be other cases not listed above. Those are not supported today; hence, the stream should not use this cursor for
# state management. For the specific user that was affected by this issue, we need to:
# * Fix state tracking (which is currently broken)
# * Make the new version available
# * (Probably) ask the user to reset the stream to avoid data loss
raise ValueError(
"Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is "
"expected."
"expected. Please contact the Airbyte team."
)

self.state["slices"].append(
Expand Down

0 comments on commit f2b3283

Please sign in to comment.