Unblock waiting threads sooner #975
_log.info("Thread {} will wait for notification from the event thread for {} ms.",
    Thread.currentThread().getName(), duration.toMillis());
this.wait(duration.toMillis());
try {
  while (!_shutdown) {
Isn't _shutdown always true at this point, since it is set by the same thread in the caller method?
// attempting to acquire the Coordinator object. We never halt the event thread (coordinator thread)
// explicitly via this CV.
_log.info("Thread {} will wait for notification from the event thread for {} ms.",
    Thread.currentThread().getName(), duration.toMillis());
this.wait(duration.toMillis());
while (!_notified) {
This is worrisome: if the event thread does not become free for 60 mins, we will keep waiting, which will trigger a timeout in the deployment framework.
True, but what alternative do we have? Hopefully this doesn’t happen often. Is partition assignment the heaviest operation? If it is, it should now take < 10 secs on average?
In this case, I would just have a one-time wait, something like:
if (!_notified) {
  this.wait(duration.toMillis());
}
Because all we want is to let the eventThread enter handleEvent; we added all this logic for that purpose only, right?
Are you suggesting changing the while to an if condition? The while is there to handle spurious wakeups, i.e. wakeups that do not come via notifyAll.
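For reference, the two concerns in this thread (spurious wakeups and an unbounded total wait) can both be addressed with a guarded wait that re-checks the flag in a loop and tracks a deadline. The following is only a minimal sketch, not the code in this PR: the class GuardedWaitSketch, the field done, and the methods markDone and awaitDone are hypothetical names, and the real Coordinator has more state.

```java
import java.time.Duration;

// Hypothetical stand-in for the Coordinator; all names here are illustrative only.
class GuardedWaitSketch {
  private boolean done = false; // guarded by this object's monitor

  /** Called by the notifying thread once its work is finished. */
  synchronized void markDone() {
    done = true;
    notifyAll();
  }

  /**
   * Waits until markDone() has been called or the timeout elapses.
   * Returns true if the flag was set, false on timeout or interrupt.
   */
  synchronized boolean awaitDone(Duration timeout) {
    long deadline = System.nanoTime() + timeout.toNanos();
    // The loop re-checks the flag after every wakeup, so a spurious wakeup
    // neither ends the wait early nor restarts the full timeout.
    while (!done) {
      long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
      if (remainingMs <= 0) {
        return false; // also avoids wait(0), which would block indefinitely
      }
      try {
        wait(remainingMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    GuardedWaitSketch sketch = new GuardedWaitSketch();
    new Thread(sketch::markDone, "event-thread").start();
    System.out.println("notified: " + sketch.awaitDone(Duration.ofSeconds(5)));
  }
}
```

With this shape, the total time spent waiting is capped at the given duration regardless of how many times the thread wakes up, which may ease the deployment-timeout concern while keeping the while loop.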
@@ -219,6 +219,7 @@ public class Coordinator implements ZkAdapter.ZkAdapterListener, MetricsAware {
private final Map<String, SerdeAdmin> _serdeAdmins = new HashMap<>();
private final Map<String, Authorizer> _authorizers = new HashMap<>();
private volatile boolean _shutdown = false;
private volatile boolean _notified = false;
Should we rename this to coordinatorStopped or some such? Notified sounds too generic.
yes
// attempting to acquire the Coordinator object. We never halt the event thread (coordinator thread)
// explicitly via this CV.
_log.info("Thread {} will wait for notification from the event thread for {} ms.",
    Thread.currentThread().getName(), duration.toMillis());
this.wait(duration.toMillis());
while (!_notified) {
True, but what alternative do we have? Hopefully this doesn’t happen often. Is partition assignment the heaviest operation? If it is, it should now take < 10 secs on average?
lgtm
@@ -219,6 +219,9 @@ public class Coordinator implements ZkAdapter.ZkAdapterListener, MetricsAware {
private final Map<String, SerdeAdmin> _serdeAdmins = new HashMap<>();
private final Map<String, Authorizer> _authorizers = new HashMap<>();
private volatile boolean _shutdown = false;
// TODO we have _shutdown, eventThread and now _handleEventCompleted, for some distinct usage,
// we should revisit and refactor to have less variation
private volatile boolean _handleEventCompleted = false;
nit: instead of _handleEventCompleted, can we name this variable something like _coordinatorEventThreadExiting?
I went back and forth on this. Ideally, we just need to wait for handleEvent to get the lock on the coordinator object; once we are out of it, the zk expiry or main thread gets another chance to acquire the lock. The coordinator event thread exits only after we get out of the run method, which may not be the case.
Nothing will be exactly certain here, so I tried to come up with the name that is closest to the logic.
Currently, during shutdown, we always wait 60 seconds before unblocking the other threads (zk callback thread or main threads) waiting on the coordinator object. This wait was added as part of #964; our expectation was that the intrinsic CV would help us notify blocked threads sooner, but notifyAll was not called in the right place, so other threads were always blocked for the full 60 seconds.
This PR uses an explicit state variable for the intrinsic CV: it checks the new state variable _handleEventCompleted and waits only if it is false. In the eventThread run method, we update the state variable and call notifyAll() outside the while loop, once shutdown is true. This lets a blocked thread unblock, get a chance to acquire the lock on the coordinator object, and perform a clean shutdown.
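The pattern described in this PR can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the actual Coordinator code: the class CoordinatorShutdownSketch and the methods start, runEventLoop, and stop are hypothetical, the event handling is replaced by a short sleep, and the interrupt call is an assumption about how the loop is woken up.

```java
import java.time.Duration;

// Hypothetical, heavily simplified stand-in for the Coordinator described above.
class CoordinatorShutdownSketch {
  private volatile boolean shutdown = false;
  private boolean handleEventCompleted = false; // guarded by this object's monitor

  private final Thread eventThread = new Thread(this::runEventLoop, "event-thread");

  void start() {
    eventThread.start();
  }

  private void runEventLoop() {
    while (!shutdown) {
      try {
        Thread.sleep(10); // placeholder for taking an event off a queue and handling it
      } catch (InterruptedException e) {
        break; // assumption: an interrupt is used to wake the loop during shutdown
      }
    }
    // Outside the loop: publish the state change, then wake every waiter.
    synchronized (this) {
      handleEventCompleted = true;
      notifyAll();
    }
  }

  /** Requests shutdown and waits only while the event loop has not finished. */
  synchronized boolean stop(Duration timeout) {
    shutdown = true;
    eventThread.interrupt();
    long deadline = System.nanoTime() + timeout.toNanos();
    while (!handleEventCompleted) {
      long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
      if (remainingMs <= 0) {
        break; // timed out; also avoids wait(0), which would block indefinitely
      }
      try {
        wait(remainingMs); // releases the monitor so the event thread can acquire it
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }
    return handleEventCompleted;
  }

  public static void main(String[] args) {
    CoordinatorShutdownSketch coordinator = new CoordinatorShutdownSketch();
    coordinator.start();
    System.out.println("clean stop: " + coordinator.stop(Duration.ofSeconds(60)));
  }
}
```

Because the flag is set before notifyAll() and checked before wait(), a stopping thread that arrives after the event loop has already finished does not wait at all, and one that arrives earlier is released as soon as the loop exits instead of after the full timeout.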