Async read from socket #17868
Conversation
a96eb7e to faa5b71
src/IO/ReadBufferFromPocoSocket.cpp (outdated)
```cpp
{
    //fiber->fd = socket.impl()->sockfd();
    //fiber->timeout = socket.impl()->getReceiveTimeout();
    *fiber = std::move(*fiber).resume();
```
So when the read would block, we (1) resume the caller fiber. Before that, we save into the processor state:
- the current call state as a fiber
- the socket descriptor for the caller to do epoll on
- a mark that the processor result is Async

(2) After that, the caller sees that the processor result is Async and adds the corresponding file descriptor to its epoll set.
(3) When epoll succeeds, we see for which descriptor it fired, and resume the corresponding fiber we created at step (1).
Am I getting this right?
Yes.
Also, epoll is executed in the PipelineExecutor, in a separate thread. When it succeeds, we continue execution of the processor, which continues fiber execution. And the next read should return some bytes.
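A minimal sketch of this flow, assuming a boost::context::fiber design like the snippet above; the names `ReadContext`, `onWouldBlock`, `registerInEpoll` and `onSocketReady` are illustrative, not the PR's actual code:

```cpp
#include <boost/context/fiber.hpp>
#include <sys/epoll.h>
#include <utility>

namespace ctx = boost::context;

/// State saved into the processor when a read would block.
struct ReadContext
{
    ctx::fiber fiber;       /// the suspended read call (step 1)
    int fd = -1;            /// socket descriptor for the caller to epoll on
    bool is_async = false;  /// "the processor result is Async"
};

/// (1) Inside the reading fiber: the socket would block, so we save the
/// descriptor, mark the result as Async and resume the caller fiber.
/// The resume() call suspends us right here until step (3).
void onWouldBlock(ReadContext & rc, int sockfd, ctx::fiber & caller)
{
    rc.fd = sockfd;
    rc.is_async = true;
    caller = std::move(caller).resume();
}

/// (2) The caller sees Async and adds the descriptor to its epoll set.
void registerInEpoll(int epoll_fd, ReadContext & rc)
{
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.ptr = &rc;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, rc.fd, &ev);
}

/// (3) epoll reported this descriptor as ready: resume the saved fiber;
/// the suspended read retries and should now return some bytes.
void onSocketReady(ReadContext & rc)
{
    rc.fiber = std::move(rc.fiber).resume();
}
```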
```cpp
read_context.is_read_in_progress = true;
read_context.packet = connections.receivePacketUnlocked(&sink);
read_context.is_read_in_progress = false;
```
Never thought I'd suggest adding a callback instead of removing one, but there's always a first time: did you consider injecting a callback into the connection's read, one that would set this flag and also save the corresponding socket descriptor? You're injecting the fiber there anyway.
I can barely understand this control flow... At least with a callback you'd be saving all the resume data (socket, flag, fiber) in one place, not in three different ones.
Hm, ok. I will try it.
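A rough sketch of what such an injected callback could look like; the names `AsyncCallback`, `ResumeData` and `makeAsyncCallback` are assumptions, not the PR's code. The point is that the flag, the descriptor and (eventually) the fiber all end up saved in one place:

```cpp
#include <functional>

/// All the resume data kept together, as the reviewer suggests.
struct ResumeData
{
    bool is_read_in_progress = false;
    int fd = -1;
    /// the suspended fiber would be stored here as well
};

/// Callback invoked by the connection's read path just before it would block.
using AsyncCallback = std::function<void(int /* socket fd */)>;

/// The caller installs one lambda that captures everything needed to resume.
AsyncCallback makeAsyncCallback(ResumeData & data)
{
    return [&data](int fd)
    {
        data.is_read_in_progress = true;
        data.fd = fd;
    };
}
```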
```cpp
{
    std::lock_guard guard(fiber_lock);
    if (!fiber)
        return false;
```
No reading fiber to resume means 'end of data', so we return false, which also means 'end of data', right?
Yes.
Also, we can delete the fiber in the `cancel` call, so this is an indication that we should finish.
```cpp
return false;

{
    std::lock_guard guard(fiber_lock);
```
Why is this one needed? The fiber saves the state of one exchange with the remote server, which can only be logically processed in a sequential fashion. Where does the multi-threaded concurrent access arise?
That's in the `cancel` method, which destroys the fiber.
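A sketch of the locking scheme being discussed, assuming a boost::context::fiber and the `fiber_lock` mutex from the snippet above (`resumeIfAlive` and `cancel` are illustrative names):

```cpp
#include <boost/context/fiber.hpp>
#include <mutex>
#include <utility>

std::mutex fiber_lock;
boost::context::fiber fiber;

/// The resume path: no fiber left means cancel() already ran,
/// which we treat the same as 'end of data'.
bool resumeIfAlive()
{
    std::lock_guard guard(fiber_lock);
    if (!fiber)
        return false;
    fiber = std::move(fiber).resume();
    return true;
}

/// cancel() may be called from another thread, hence the lock:
/// it destroys the suspended fiber so the next resume attempt returns false.
void cancel()
{
    std::lock_guard guard(fiber_lock);
    fiber = {};
}
```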
```cpp
if (is_pipe_alarmed)
    return false;

if (has_timer_alarm && !is_socket_ready)
```
Ah, so the timer is for checking the timeouts, OK. Maybe one giant timer per the entire pipeline executor would suffice (i.e. it would go to the RemoteIOQueue aka AsyncTaskQueue). This would save us some wakeups -- I imagine this might matter if you have a big cluster with a lot of queries running.
Another thing that would require periodic wakeups is our main task -- switching over to other connections if the first one is slow. But you're not implementing it here yet, right?
> Maybe one giant timer per the entire pipeline executor would suffice

I had the same idea, but there are considerations:
- different sockets may have different timeouts
- if the timeout for one socket is exceeded, we may not notice it in case the other sockets are fine
- there may be other async processors with different logic

> This would save us some wakeups

Actually, I think quite the opposite. Now we restart the timer after every read, so the timer only signals if the timeout is really exceeded.

> switching over to other connections if the first one is slow. But you're not implementing it here yet, right?

Right)
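A sketch of the per-socket timer behavior described above, assuming a Linux timerfd that is re-armed after every read (function names are illustrative, not the PR's code):

```cpp
#include <sys/timerfd.h>
#include <cstdint>
#include <ctime>

/// One non-blocking timer per socket; its descriptor can live in the same
/// epoll set as the socket itself.
int createTimeoutTimer()
{
    return timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
}

/// Re-arming after every read means the timer fires only when the receive
/// timeout for this particular socket is really exceeded.
void restartTimer(int timer_fd, uint64_t timeout_sec)
{
    itimerspec spec{};  /// it_interval left zero: one-shot timer
    spec.it_value.tv_sec = static_cast<time_t>(timeout_sec);
    timerfd_settime(timer_fd, 0, &spec, nullptr);
}
```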
```diff
@@ -207,7 +204,7 @@ class IProcessor
      * Note that it can fire many events in EventCounter while doing its job,
      * and you have to wait for next event (or do something else) every time when 'prepare' returned Wait.
      */
-    virtual void schedule(EventCounter & /*watch*/)
+    virtual int schedule()
```
The comment should be updated to reflect the new return value.
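One possible wording for the updated comment (an assumption about intent, not the author's final text), reflecting that `schedule()` now returns a descriptor for the executor's epoll set instead of firing events on an EventCounter:

```cpp
/** Called when 'prepare' returned Async.
  * Returns a file descriptor; the executor adds it to its epoll set and
  * resumes the processor once the descriptor becomes ready.
  */
virtual int schedule();
```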
I get the general idea; looks OK to merge after adding the comments about the things we discussed above. Also, there are some commented-out lines here and there.
```diff
@@ -28,10 +28,23 @@ bool ReadBufferFromPocoSocket::nextImpl()
     ssize_t bytes_read = 0;
     Stopwatch watch;
 
+    int flags = 0;
+    if (async_callback)
+        flags |= MSG_DONTWAIT;
```
Note that `async_socket_for_remote` is completely broken with unbundled poco, since it cannot handle `MSG_DONTWAIT` correctly.
But IIUC the arcadia build does not support distributed queries, so it is not a problem there?
Indeed.
@azat, does it cause any problem for you? I think I can just read from the socket manually and avoid using poco in this case.
> @azat does it cause any problem for you?

For development I'm using an unbundled build (since it is faster), and I was using unbundled poco there, but now I've switched to bundled poco, so it is not a problem anymore.

> I think I can just read from the socket manually and avoid using poco in this case.

Maybe it is worth it, since this patch already has some low-level code anyway (`epoll_ctl` and similar), although I'm not sure.
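A sketch of what "reading from the socket manually" could look like, bypassing poco's recv wrapper (illustrative names; assumes a POSIX socket descriptor):

```cpp
#include <sys/socket.h>
#include <cerrno>
#include <cstddef>

enum class ReadResult { Ok, Eof, WouldBlock, Error };

/// Non-blocking read: on EAGAIN/EWOULDBLOCK the caller would suspend the
/// fiber and add sockfd to the epoll set, exactly as in the async path.
ReadResult tryReadNonBlocking(int sockfd, char * buf, size_t size, ssize_t & bytes_read)
{
    bytes_read = ::recv(sockfd, buf, size, MSG_DONTWAIT);
    if (bytes_read > 0)
        return ReadResult::Ok;
    if (bytes_read == 0)
        return ReadResult::Eof;
    if (errno == EAGAIN || errno == EWOULDBLOCK)
        return ReadResult::WouldBlock;
    return ReadResult::Error;
}
```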
Will it close #9900?
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Support for async tasks in `PipelineExecutor`. Initial support of async sockets for remote queries.