-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Send file chunks asynchronously in peer recovery #39769
Conversation
Pinging @elastic/es-distributed |
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM just a few comments/questions :)
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left some comments and questions
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
Outdated
Show resolved
Hide resolved
server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
Outdated
Show resolved
Hide resolved
if (error.get() == null) { | ||
cancellableThreads.execute(() -> requestSeqIdTracker.waitForOpsToComplete(requestSeqIdTracker.getMaxSeqNo())); | ||
|
||
synchronized FileChunk readChunk(final byte[] buffer) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand why we parallelize the reading on top of a single file and then synchronize all of it. This doesn't make sense to me. I think we should build the model on top of the file and chuck head of time. ie. if we want to read with N threads in parallel then chunk the file up in N pieces and send them all in parallel. That means we must write them in the correct places on the other side as well but blocking on the read side here is not making much sense to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option is to have a multiplexer if you want to make use of the parallelism between sending and reading. We need some kind of threadpool and a task queue for that. Once I am done reading a chunk I put it on a queue and read the next chunk. Another worker can then pick it up and send it. If the queue fills up we add more threads until we saturate. Or we do reading and sending in the same thread but notify others that another chunk can be read. But there is so much blocking going on here I feel like we didn't make the right design decisions?
@s1monw Thanks for reviewing. We introduced SeqId to FileChunk in #36981. The idea was to send up to N consecutive file chunks without waiting for replies from the recovery target. We used SeqId instead of Semaphore to make sure that the recovery target won't buffer more than N chunks in memory in any situation. This change reduces the recovery time significantly without using any extra thread. This PR makes #36981 non-blocking with these keys: (1) maintain the recovery time, (2) does not require extra threads, (3) never block any thread. Here we use a semaphore to ensure that only one thread can read file chunks. Other threads can quickly check this condition and exit without being blocked. I hope this clarifies the approach. As I said in the PR description, I am open to suggestions. |
I looked at the non-blocking version and it's more intuitive here. I would still like to have a comment what we are trying to do with the seqIds etc. What confuses me is the partial synchronization |
r -> { | ||
recycledBuffers.addFirst(buffer); | ||
requestSeqIdTracker.markSeqNoAsCompleted(chunk.seqId); | ||
sendFileChunks(listener); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we make sure we always fork here somehow? I am a bit worried that we are ending up with a stack overflow? Like we can assert that we don't have sendFileChunks
in the stacktrace for instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I opened #39988 for this.
I am closing this PR and will open a new one. @original-brownbear @s1monw Thanks for looking. |
With this change, peer recovery will send file chunks asynchronously and concurrently. Recovery with compression enabled should be faster with this implementation. I will run a recovery benchmark after we agree on the approach.
Relates #36981