-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix concurrent deadlock #34454
Fix concurrent deadlock #34454
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
...-cdk/python/airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py
Outdated
Show resolved
Hide resolved
airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py
Outdated
Show resolved
Hide resolved
concurrent_stream_processor: ConcurrentReadProcessor, | ||
) -> Iterable[AirbyteMessage]: | ||
while airbyte_message_or_record_or_exception := queue.get(): | ||
try: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tests were very minimal on this class and adding this did not break anything. Given the urgency, I'll ask for a review right now but I'll go back and add some tests to make sure we document the behavior of this class
|
||
@staticmethod | ||
def _get_priority(value) -> int: | ||
# The order of the `isinstance` is a bit funky but it is order in terms of which QueueItem we expect to see the most |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this raise an exception on unexpected type to ensure we don't forget to update the method if we ever add a new one?
Alternative would be to set a default priority.
edit: I like this test with the caveat that the current implement actually returns an Optional[int]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added the test as you were commenting.
For the link, it seems like it is pointing to a blank like. Can you be more explicit about which method return Optional[int]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_get_priority
does not return anything if the value is not of the expected type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's raising an exception though right? So the return type is indeed just int
and not Optional[int]
, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see where the method is raising an exception. As far as I can tell, the check relies on the unit test to verify all types of QueueItem
s are handled. is that what you mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about this line?
airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/concurrent_source.py
Outdated
Show resolved
Hide resolved
airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py
Outdated
Show resolved
Hide resolved
airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @maxi297! LGTM. Just one logging request. Also would you mind putting your diagram in the PR description?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉 🙏
What
Addresses the second issue from https://github.com/airbytehq/oncall/issues/3602
How
Adding priority on the queue so that records are processed faster and hence free more memorywe did not add a priority queue because of the complexity it adds. Indeed, theConcurrentReadProcessor
assume a certain sequence of event which is not true if the order changeHere is the diagram of the possible cases in terms of what populates the queue and what populates the list of futures:
Recommended reading order
x.java
y.python
🚨 User Impact 🚨
We should not see the case where the source hangs and dies on
_queue.Empty
. My personal connection on https://cloud.airbyte.com/workspaces/8ba9a84d-ef49-41fb-9aaa-9f75fb0f351e/connections/bf6ede5c-e346-4489-a965-63a381eb3c30 is working now.TODO
Re-release stripe with this new CDK version