kafka-python #1744 Attempt 2 at race conditions with IFR #1766
Conversation
Some comments inline. I took a stab at resolving some of these issues in #1768 - what do you think?
@@ -617,7 +617,7 @@ def _poll(self, timeout):
            conn = key.data
            processed.add(conn)

-            if not conn.in_flight_requests:
+            if not conn.has_in_flight_requests():
Do these types of reads require additional locking? Perhaps naive, but my understanding is that a truthiness check on a dict is atomic (it compiles down to a single CALL_FUNCTION opcode):
>>> import dis
>>> n = {'a': 1, 'b': 2}
>>> def foo():
... bool(n)
...
>>> dis.dis(foo)
2 0 LOAD_GLOBAL 0 (bool)
2 LOAD_GLOBAL 1 (n)
4 CALL_FUNCTION 1
6 POP_TOP
8 LOAD_CONST 0 (None)
10 RETURN_VALUE
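Worth noting, though, that atomicity of the read alone doesn't make a check-then-act sequence safe: another thread can mutate the dict between the check and the action taken on its result. A minimal sketch of that window (the names `in_flight`, `racy_pop`, and `safe_pop` are illustrative, not kafka-python's actual code):

```python
import threading

# Even though evaluating the dict's truthiness is a single atomic bytecode
# under the GIL, a second thread may empty the dict *between* the check and
# the act, so check-then-act sequences still need a lock.
in_flight = {'corr-1': 'request'}
lock = threading.Lock()

def racy_pop():
    if in_flight:                # atomic check...
        in_flight.popitem()      # ...but may raise KeyError if another
                                 # thread drained the dict in between

def safe_pop():
    with lock:                   # check and act under one lock
        if in_flight:
            in_flight.popitem()
```

So the single-opcode read is fine on its own, but any code that branches on it and then touches the dict again needs the surrounding lock.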
@@ -273,7 +273,7 @@ def __init__(self, host, port, afi, **configs):
        # per-connection locks to the upstream client, we will use this lock to
        # make sure that access to the protocol buffer is synchronized
        # when sends happen on multiple threads
-        self._lock = threading.Lock()
+        self._lock = threading.RLock()
I think we can avoid the RLock if we call the lock's acquire() and release() more strategically, rather than relying exclusively on the context manager -- particularly when we want to release the lock before processing an exception.
        if selector is not None:
            selector.close()
            selector = None
        with self._lock:
I am fine skipping the locking here for now -- we only use this method in check_version(), which is only called during initialization, and I think we can just make sure that always holds the client lock.
    def connect(self):
        with self._lock:
I think this makes sense long term, but for now I think we can rely on connect being synchronized via the KafkaClient lock (in _maybe_connect).
        if self.state is ConnectionStates.DISCONNECTED:
            if error is not None:
                log.warning('%s: Duplicate close() with error: %s', self, error)
            self._fail_ifrs(error)
What is your thinking on this one? If we take care to always drain here and never add more ifrs while the state is disconnected, can't we assume that ifrs will also always be empty here?
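The invariant being suggested here can be sketched directly: if sends are rejected while the connection is down, and close() always drains, then the IFR dict is provably empty whenever the state is disconnected. A minimal illustration under those assumptions (the class and attribute names are hypothetical, not kafka-python's actual code):

```python
import threading

# Sketch of the invariant: reject new IFRs while disconnected, drain on
# every close() -- then the IFR dict must be empty in the disconnected state.
class FakeConn:
    def __init__(self):
        self._lock = threading.Lock()
        self._ifrs = {}
        self.disconnected = False

    def send(self, corr_id, payload):
        with self._lock:
            if self.disconnected:
                raise RuntimeError('not connected')   # never add IFRs when down
            self._ifrs[corr_id] = payload

    def close(self):
        with self._lock:
            self.disconnected = True
            self._ifrs.clear()                        # drain on every close
            assert not self._ifrs                     # invariant holds here
```

Under that discipline the `_fail_ifrs(error)` call in the duplicate-close branch would indeed always be a no-op.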
            return
        log.info('%s: Closing connection. %s', self, error or '')
        self.state = ConnectionStates.DISCONNECTING
        self.config['state_change_callback'](self)
I think there may be some deadlock issues with this callback because it requires the client lock:
1. thread A acquires conn._lock
2. thread B acquires client._lock
3. thread A calls conn_state_change and blocks waiting for client._lock
4. thread B calls conn.close() or conn.send() etc. and blocks waiting for conn._lock
=> deadlock
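The classic fix for this ABBA pattern is to never invoke a callback that takes the other lock while still holding your own. A hypothetical sketch (the lock and function names are illustrative, not kafka-python's actual API):

```python
import threading

# Two locks acquired in opposite orders by two threads is the deadlock
# described above. Releasing conn_lock before invoking the callback
# removes the cycle entirely.
client_lock = threading.Lock()
conn_lock = threading.Lock()

def close_deadlock_prone(on_state_change):
    with conn_lock:              # holds conn_lock...
        on_state_change()        # ...while the callback takes client_lock

def close_safe(on_state_change):
    with conn_lock:
        pass                     # mutate connection state only
    on_state_change()            # callback runs with no conn lock held
```

Alternatively, a strict global lock ordering (always client lock before conn lock) also breaks the cycle, but it is harder to enforce across callbacks.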
        self._protocol = KafkaProtocol(
            client_id=self.config['client_id'],
            api_version=self.config['api_version'])
        self._fail_ifrs(error)
I think we want to release the conn lock before processing callbacks here
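One common shape for that: snapshot the in-flight requests under the lock, then run their error callbacks after releasing it, so a callback that re-enters the connection cannot deadlock. A sketch under assumed names (`ConnSketch`, `_ifrs`, and the callback signature are illustrative, not kafka-python's actual code):

```python
import threading

# "Release the lock before processing callbacks": copy the pending
# callbacks while locked, clear the shared dict, then invoke the
# callbacks with no lock held.
class ConnSketch:
    def __init__(self):
        self._lock = threading.Lock()
        self._ifrs = {}

    def send(self, corr_id, on_error):
        with self._lock:
            self._ifrs[corr_id] = on_error

    def fail_ifrs(self, error):
        with self._lock:
            pending = list(self._ifrs.values())   # snapshot under lock
            self._ifrs.clear()
        for cb in pending:                        # callbacks run unlocked
            cb(error)
```

With this pattern a plain Lock suffices even if a callback calls back into send() or fail_ifrs().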
I merged #1768, which uses a non-reentrant lock. I'm planning to merge #1775 as well, and I think that should cover all of the concurrency issues I found on review. If you're able to test these changes in your setup, I would love that. We're planning to push out a patch release quickly. Thanks again for all your excellent work on this issue. Fantastic bug report, debugging, and PRs. We really appreciate it, and I hope 1.4.6 will not fail you!
Attempt 2 (after #1757) at fixing #1744 with more aggressive (and reentrant) locking.
I expect this to be less performant, especially on Python 2.x, but it's safer.
@dpkp I tried to avoid using RLock but eventually gave up: working around the callback problems (particularly during authorization) was quickly making the change too large and complex.
This change is