implement non-blocking stream server handler #503
base: main
Conversation
@naoh87 I have been busy with other things and have not forgotten about this PR. I'll look more closely into it when I find the time.
@naoh87 I have not forgotten this, however upcoming work with queues in cats-effect 3.4.x might give us better building blocks. One question: does your work here take back pressure into account? For instance, when a server can't keep up with a client that is streaming too much.
```diff
@@ -73,6 +73,12 @@ private[server] final class Fs2ServerCall[Request, Response](
     dispatcher
   )

+  def requestOnPull[F[_]]: Pipe[F, Request, Request] =
+    _.mapChunks { chunk =>
+      call.request(chunk.size)
```
I suppose `call.request` might be impure. Wouldn't it be better to delay the effect, like this:

```scala
import cats.syntax.functor._

def requestOnPull[F[_]](implicit F: Sync[F]): Pipe[F, Request, Request] =
  _.chunks.flatMap(chunk =>
    Stream.evalUnChunk(F.delay(call.request(chunk.size)).as(chunk))
  )
```
That looks good, thank you.
```scala
call    <- Fs2ServerCall.setup(opt, call)
_       <- call.request(1) // prefetch size
channel <- OneShotChannel.empty[Request]
cancel  <- start(call, impl(channel.stream.through(call.requestOnPull), headers))
```
@ahjohannessen back pressure is controlled from here by `call.requestOnPull`: the server requests the next messages from the client each time a stream chunk is pulled. The prefetch/buffer size is declared two lines above, with `_ <- call.request(1)`.
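To illustrate the mechanism described above, here is a minimal, self-contained sketch in plain Scala. `FakeCall` is a hypothetical stand-in for grpc-java's `ServerCall`; only the demand bookkeeping of `request(n)` is modelled, not the real transport or fs2 streams.

```scala
// Hypothetical model of pull-driven flow control: the client may only send
// as many messages as the server has granted via request(n), and the server
// grants more demand only when the downstream consumer pulls a chunk.
object PullBackPressureSketch {
  final class FakeCall {
    private var outstanding = 0                  // messages the client may still send
    def request(n: Int): Unit = outstanding += n // server grants more demand
    def canDeliver: Boolean = outstanding > 0
    def deliver(): Unit = {                      // client sends one message
      require(canDeliver, "client exceeded granted demand")
      outstanding -= 1
    }
  }

  def main(args: Array[String]): Unit = {
    val call = new FakeCall
    call.request(1)            // prefetch, as in `_ <- call.request(1)`
    call.deliver()             // first client message arrives
    assert(!call.canDeliver)   // client is now blocked...
    call.request(1)            // ...until a chunk is pulled (requestOnPull)
    assert(call.canDeliver)
    println("back pressure ok")
  }
}
```

The point of the sketch: demand never grows unless the consumer pulls, so a slow server bounds how far ahead the client can get.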
Server-to-client message back pressure is still missing, the same as on the main branch.
I think a server-to-client back pressure implementation may cause performance issues. grpc-java says servers are free to ignore this, and the main branch does:
https://github.com/grpc/grpc-java/blob/v1.46.0/api/src/main/java/io/grpc/ServerCall.java#L100
I think this feature should be opt-in.
I think the new cats-effect 3.4.x queue will not work as well as this PR, because the new CE3 queue is implemented as MPMC, but we only need SPSC. Also, the new implementation does not reduce the `Dispatcher[F]` cost at `onMessage`.
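For illustration, here is a minimal single-producer/single-consumer buffer in plain Scala. It is a hypothetical sketch, not the PR's `OneShotChannel`, but it shows the SPSC idea being argued for: with exactly one producer and one consumer, a single atomic cell suffices, avoiding the heavier coordination a general-purpose MPMC queue needs.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical SPSC sketch: the producer conses items onto a single atomic
// cell, and the consumer atomically drains everything buffered so far.
final class SpscBuffer[A] {
  private val cell = new AtomicReference[List[A]](Nil)

  // producer side: prepend; the CAS loop only ever races with the drainer
  def offer(a: A): Unit = {
    var cur = cell.get()
    while (!cell.compareAndSet(cur, a :: cur)) cur = cell.get()
  }

  // consumer side: take all buffered items at once, in arrival order
  def drain(): List[A] = cell.getAndSet(Nil).reverse
}
```

For example, after `offer(1); offer(2)`, a single `drain()` returns `List(1, 2)` and leaves the buffer empty.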
Ok. I think we should try to use as much code from cats-effect/fs2 as possible. That reduces the amount of code we have to maintain. I am thinking mostly about
What this PR does

ClientStreaming benchmark

Method: request 100 messages with `ghq --cpu=4 -z 60s --connections=5 ...`, comparing #503 against main.