HBASE-24813 ReplicationSource should clear buffer usage on ReplicationSourceManager upon termination (rebased after HBASE-25117) #2546
...server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
...src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
Just a couple of comments.
} catch (InterruptedException e) {
  LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch: {}",
    this.source.getPeerId(), this.getName(), e);
  Thread.currentThread().interrupt();
Shouldn't this be just INFO? Also, I think it might be better to handle those InterruptedExceptions inside ReplicationSource.terminate().
Left as WARN because it aborts the flow without effectively updating the buffer usage, which is the fundamental issue we are trying to solve here.
for (ReplicationSourceShipper worker : workers) {
  worker.stopWorker();
  if (worker.entryReader != null) {
    worker.entryReader.setReaderRunning(false);
If a worker is doing some async work when it is asked to stop, stopping it can take time. In that case I think we should keep the implementation as it was before: ask all workers to stop at once and then wait. Otherwise, if the number of workers gets large due to backlog and someone changes the wait time config to tens of seconds, the removePeer command/procedure has to wait a long time (number of workers * (sleep time + time for clearWALEntryBatch)) to terminate the replication source.
Sorry, I'm not following your concern here. I don't see how an extra loop in the same method context, which just sets one flag in the shipper and another in the reader, can help with the contention scenario described. The terminate execution would be stuck in the second for loop anyway.
Sure, let me try to explain again.
I was referring to restoring this loop:
for (ReplicationSourceShipper worker : workers) {
  worker.stopWorker();
  if (worker.entryReader != null) {
    worker.entryReader.setReaderRunning(false);
  }
}
Your current flow stops the workers sequentially:
- Stop a worker.
- Wait for the worker thread to complete.
- Stop another worker.
- Wait for it to finish.
- Continue for the others.
So in the worst case, you would have to wait for (number of workers) * min(time taken by a worker to finish, timeout).
By restoring the old loop, you parallelize the stopping of the workers:
- Ask all worker threads to finish their work by setting their state.
- Then, in the second loop, wait for each worker to finish. While you are waiting for one worker, the others are also completing their work in parallel.
- So by the time you are done with one worker, it is possible that all the other workers are also done.
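The difference between the two flows described above can be sketched outside HBase as follows. This is only an illustration under stated assumptions: DemoWorker, TwoPhaseStop, and all of their methods are hypothetical names invented for the sketch, not the real ReplicationSourceShipper API, and the simulated shutdown delay stands in for whatever in-flight work a real worker would need to finish.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a shipper thread; not the real ReplicationSourceShipper. */
class DemoWorker extends Thread {
  private volatile boolean running = true;
  private final long shutdownMillis;

  DemoWorker(long shutdownMillis) {
    this.shutdownMillis = shutdownMillis;
    setDaemon(true);
  }

  /** Only flips a flag; it does not wait for the thread to exit. */
  void stopWorker() {
    running = false;
  }

  @Override
  public void run() {
    try {
      while (running) {
        Thread.sleep(5);
      }
      // Simulated in-flight work that still has to finish after the stop request.
      Thread.sleep(shutdownMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

class TwoPhaseStop {
  /** Starts the given number of demo workers and returns them. */
  static List<DemoWorker> startWorkers(int count, long shutdownMillis) {
    List<DemoWorker> workers = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      DemoWorker worker = new DemoWorker(shutdownMillis);
      worker.start();
      workers.add(worker);
    }
    return workers;
  }

  /** Joins with a timeout; restores the interrupt flag if the wait is interrupted. */
  private static void joinQuietly(Thread thread, long timeoutMillis) {
    try {
      thread.join(timeoutMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Signals every worker first, then waits for each; returns elapsed milliseconds. */
  static long stopAllThenJoin(List<DemoWorker> workers, long timeoutMillis) {
    long start = System.nanoTime();
    for (DemoWorker worker : workers) {
      worker.stopWorker();
    }
    for (DemoWorker worker : workers) {
      joinQuietly(worker, timeoutMillis);
    }
    return (System.nanoTime() - start) / 1_000_000L;
  }

  /** Stops and waits for one worker at a time; returns elapsed milliseconds. */
  static long stopAndJoinEach(List<DemoWorker> workers, long timeoutMillis) {
    long start = System.nanoTime();
    for (DemoWorker worker : workers) {
      worker.stopWorker();
      joinQuietly(worker, timeoutMillis);
    }
    return (System.nanoTime() - start) / 1_000_000L;
  }
}
```

With several workers that each need some time to wind down, stopAndJoinEach pays that time once per worker, while stopAllThenJoin lets the wind-down periods overlap, so its total wait tends toward that of the slowest single worker.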
Got you, thanks for explaining in more detail. Will address it in the next commit.
...src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
  + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
  + "thread to stop.", this.source.getPeerId());
Thread.currentThread().interrupt();
Why do we need an additional interrupt here when ReplicationSource.terminate() has already interrupted the worker thread prior to the clearWALEntryBatch method call?
We only interrupt if either the shipper or the reader thread is still alive. We can't guarantee that the caller will always have stopped these threads, hence the extra check here.
- This method should only be called upon replication source termination.
So what will this interrupt do, and how is it handled in the source?
LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
  + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
  + "thread to stop.", this.source.getPeerId());
Don't we need to return here, since we timed out and are not clearing the batch?
Right, it's not being handled. Changing it to simply log the exceptional condition and return to the source.
If we return, then we do not clean the batch, so replication quota will be leaked.
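The leak concern can be illustrated with a small sketch. Everything here is an assumption made for the illustration: BufferQuotaDemo, its counter, and its methods are hypothetical and only mimic the idea of a shared buffer-usage counter, not the actual ReplicationSourceManager accounting. The sketch also does not address whether it is safe to release usage while a reader or shipper thread may still be running.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical model of a shared buffer-usage counter; not HBase code. */
class BufferQuotaDemo {
  // Stands in for the manager-level total of bytes held by unshipped batches.
  static final AtomicLong totalBufferUsed = new AtomicLong();

  /** Records that a batch of the given size is now held in memory. */
  static void acquire(long bytes) {
    totalBufferUsed.addAndGet(bytes);
  }

  /** Early return on timeout: the pending bytes are never given back. */
  static void terminateWithEarlyReturn(long pendingBytes, boolean waitTimedOut) {
    if (waitTimedOut) {
      return; // the counter keeps pendingBytes forever
    }
    totalBufferUsed.addAndGet(-pendingBytes);
  }

  /** Releases the pending bytes on every path, including the timeout path. */
  static void terminateAlwaysReleasing(long pendingBytes, boolean waitTimedOut) {
    try {
      if (waitTimedOut) {
        return; // the finally block below still runs
      }
    } finally {
      totalBufferUsed.addAndGet(-pendingBytes);
    }
  }
}
```

In the first variant, a timed-out termination leaves the counter permanently inflated by the size of the pending batch, which is the kind of quota leak described above.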
  }
} catch (InterruptedException e) {
  LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. "
    + "Not cleaning buffer usage: {}", this.source.getPeerId(), this.getName(), e);
Please restore the interrupt flag here (Thread.currentThread().interrupt();) and then return.
We don't do any interrupt handling in ReplicationSource. Do you still think we need this here?
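For context on the request above, this is the general Java idiom of restoring the interrupt flag after catching InterruptedException, so that code further up the call stack can still observe the interruption even if the immediate caller ignores it. A minimal sketch, assuming a hypothetical InterruptRestoreDemo class and waitForStop helper that are not part of the patch:

```java
/** Hypothetical helper showing the restore-and-return idiom; not HBase code. */
class InterruptRestoreDemo {
  /** Starts a daemon thread that sleeps until it is interrupted or times out. */
  static Thread startSleeper() {
    Thread sleeper = new Thread(() -> {
      try {
        Thread.sleep(10_000);
      } catch (InterruptedException e) {
        // Exit quietly when asked to stop.
      }
    });
    sleeper.setDaemon(true);
    sleeper.start();
    return sleeper;
  }

  /**
   * Waits for the target thread to die. Returns true if it stopped within the
   * timeout, false if it is still alive or the wait was interrupted.
   */
  static boolean waitForStop(Thread target, long timeoutMillis) {
    try {
      target.join(timeoutMillis);
      return !target.isAlive();
    } catch (InterruptedException e) {
      // Catching the exception clears the flag, so set it again before returning.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

A caller that wants to react to the interruption can check Thread.currentThread().isInterrupted() after waitForStop returns false. If no caller ever checks it, restoring the flag has no visible effect.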
apache#2546) (apache#2849) Signed-off-by: Ankit Singhal <ankit@apache.org> Signed-off-by: Josh Elser <elserj@apache.org> (cherry picked from commit fdae12d) (cherry picked from commit 3242c8a) Change-Id: I8552da6cb7b37271204e255a6ca96a8af544da48