Remove unbounded queue on server-side stream reception #553
Conversation
I will review when back from holidays. The only branch that is maintained is main.

No worries, thanks. I've closed the 0.x PRs to focus on main 👍

ping @ahjohannessen (no worries if you just haven't gotten to it, just pinging in case you're like me and forget what you were doing when you come back from vacation ;) )

@timbertson Did you look at this as inspiration for your PR? Also, there is another PR that accounts for backpressure here. Not sure what is the best approach forward.
@ahjohannessen yes, the first commit in this PR is largely a port of #39. I don't believe #503 accounts for backpressure: in the thread, the author describes the backpressure semantics as unchanged. I disagree with this approach - if you disregard backpressure then you could still write a correct app, but you would have to implement your own backpressure out-of-band (or provision enough memory that your app won't die). Having the library make this choice for you is clearly suboptimal, and quite surprising given how important backpressure is in fs2.

There will be merge conflicts if we want both changes; I'm not sure what order you'd prefer. If you want to merge that PR first then I will take on the work of updating this PR, but I didn't want to do that prematurely. My (biased) preference would be to focus on correctness (i.e. this PR) before performance, as that other PR would probably just result in my app dying quicker 😅
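To make the backpressure point concrete, here is a minimal sketch (hypothetical names, not this PR's actual code) of bounding server-side reception: instead of buffering every `onMessage` callback in an unbounded queue, the next message is only requested from grpc-java once the previous one has been pulled off a bounded queue, so consumer demand drives transport-level flow control.

```scala
import cats.effect.Async
import cats.effect.std.Queue
import cats.syntax.all._
import fs2.Stream

// Stand-in for ServerCall.request(1) / ClientCall.request(1).
trait CallControl { def requestOne(): Unit }

// Hypothetical sketch: the listener offers into a bounded queue, and each
// dequeued element triggers request(1), pulling demand from the consumer.
def boundedIngest[F[_]: Async, A](
    call: CallControl,
    capacity: Int
): F[(A => F[Unit], Stream[F, A])] =
  Queue.bounded[F, A](capacity).map { q =>
    // offered from the listener; semantically blocks when the queue is full
    val onMessage: A => F[Unit] = a => q.offer(a)
    val messages = Stream.repeatEval(q.take).evalTap(_ => Async[F].delay(call.requestOne()))
    (onMessage, messages)
  }
```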
```scala
import cats.implicits._
import cats.effect.kernel.Concurrent
import cats.effect.std.Dispatcher
import io.grpc.{ClientCall, Metadata, Status}

class Fs2StreamClientCallListener[F[_], Response] private (
    ingest: StreamIngest[F, Response],
    signalReadiness: SyncIO[Unit],
```
Any special reason for `SyncIO`? Have you considered `SignallingRef` from fs2?
I'm not super fluent in the guts of cats/fs2, so happy to take recommendations.
`SyncIO` seems useful somewhere in the mix, because:

- the invocations from the gRPC listeners need to run synchronously, so we need either `SyncIO` or a `Dispatcher`
- the outgoing unary call types set `signalReadiness = SyncIO.unit`, because there's no need to respect readiness with only one outgoing message.
  - It'd be wasteful to call a dispatcher in that case just to run `F.unit`.
  - Though now that I think about it, a single outgoing message probably never triggers the `onReady` code path, so maybe it doesn't matter? 🤷

I refactored to use `SignallingRef`, which seems nicer, thanks: a1668d7. As part of that I removed the `SyncIO` from the `StreamOutput` class, but it's still used in various listeners due to the above reasoning.

If you prefer consistency and aren't worried about `onReady` for unary calls being a bit wasteful (since it's probably never called), I think I can just use `F` across the board.
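The tradeoff above can be sketched roughly like this (hypothetical names, not the library's actual listener): grpc-java invokes callbacks such as `onReady` synchronously on its own thread, so the handler must either be a `SyncIO` run in place, or an `F[Unit]` submitted through a `Dispatcher`.

```scala
import cats.effect.SyncIO
import cats.effect.std.Dispatcher

// Hypothetical sketch of the two ways to run an effect from a synchronous
// grpc-java callback such as onReady().
final class ReadinessHandler[F[_]](
    signalReadiness: SyncIO[Unit], // SyncIO.unit for unary calls: nothing to signal
    dispatcher: Dispatcher[F],
    signalReadinessF: F[Unit]
) {
  // Option 1: run a SyncIO directly on the grpc thread (cheap, no dispatcher).
  def onReadyViaSyncIO(): Unit = signalReadiness.unsafeRunSync()

  // Option 2: submit an F[Unit] through a Dispatcher (works for any F, but heavier).
  def onReadyViaDispatcher(): Unit = dispatcher.unsafeRunSync(signalReadinessF)
}
```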
```scala
// before
def startCall(call: ServerCall[Request, Response], headers: Metadata): ServerCall.Listener[Request] =
  startCallSync(call, opt)(call => req => call.stream(impl(req, headers), dispatcher)).unsafeRunSync()

// after
def startCall(call: ServerCall[Request, Response], headers: Metadata): ServerCall.Listener[Request] = {
  val outputStream = dispatcher.unsafeRunSync(StreamOutput.server(call, dispatcher))
```
Any way to reduce usage of `dispatcher.unsafeRunSync`? It is generally harmful for performance in java-grpc.
I don't think so. Here the gRPC `startCall` interface demands a synchronous return value, but we can't build the `Ref[F, _]` or `SignallingRef[F, _]` needed by `StreamOutput` without a dispatcher.

I could hoist it up, i.e. return an `F[ServerCallHandler[_, _]]`, and then `startCall` could synchronously turn it into a full `StreamOutput` by supplying the parts needed from `ClientCall`. But tracing the call chain, everything is synchronous all the way up to `unaryToStreamingCall`, which I believe is invoked by the generated code, so we'd need to change that to be able to handle an `F[ServerCallHandler[Request, Response]]`.

I'm hoping that since this is only needed for streaming use cases, the overhead will matter less than it does for unary calls.
```scala
    c: ClientCall[Request, Response],
    dispatcher: Dispatcher[F]
)(implicit F: Async[F]): F[StreamOutput[F, Request]] = {
  Ref[F].of(Option.empty[Deferred[F, Unit]]).map { waiting =>
```
Have you considered `Ref[SyncIO]` instead, to avoid the dispatcher?
With the `SignallingRef` refactor I've removed the dispatcher from this code, but it has only moved. We need a dispatcher somewhere in the code path, because the goal is to resume a suspended message send in `F`, and we can't do that from `SyncIO`.
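The "resume a suspended send" step might be sketched like this (hypothetical and simplified, not the PR's actual code): the sender parks on a `Deferred` until the transport is ready, and the synchronous `onReady` callback completes that `Deferred` through a `Dispatcher`.

```scala
import cats.effect.{Async, Deferred, Ref}
import cats.effect.std.Dispatcher
import cats.syntax.all._

// Hypothetical sketch: a sender suspends on a Deferred; the synchronous grpc
// onReady() callback completes it via a Dispatcher. Simplified: it ignores
// the race where onReady fires before awaitReady registers its Deferred.
final class ReadySignal[F[_]: Async](
    waiting: Ref[F, Option[Deferred[F, Unit]]],
    dispatcher: Dispatcher[F]
) {
  // Called from F-land: park until onReady fires.
  def awaitReady: F[Unit] =
    Deferred[F, Unit].flatMap(d => waiting.set(Some(d)) *> d.get)

  // Called synchronously from the grpc thread.
  def onReady(): Unit =
    dispatcher.unsafeRunSync(
      waiting.getAndSet(None).flatMap(_.traverse_(_.complete(()).void))
    )
}
```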
@timbertson It would be interesting to see if your changes affect performance. Might be a good idea to do benchmarks as @naoh87 did here.
I don't think it should affect the unaryToUnary case, since there are no significant code changes in that path. If grpc-java calls `onReady()` for a unary call, that should be the only difference. I did a few runs with v2.4.11:

This branch:

(this is using Docker for Mac, so I'm not sure how stable the results are)
@timbertson Could you handle this? I am thinking that we could first cut a version with a suffix like -RC1 and let people test that out for a while, WDYT?

Sounds good to me! 👍

@timbertson Seems like CI is still failing

Ah yep, sorry. It doesn't seem to run when I push; do you have to approve each run or something? I see "1 workflow awaiting approval"

Seems like. It is a setting for
No worries. I've also raised a PR in my own repo to get faster CI feedback. I may need help figuring out what to do with these mima issues though, e.g. I added a constructor argument to

Arguably this class should be package-private (I don't think user code should ever call it), so I'm not sure if we care. Is it even possible to add a second backwards-compatible constructor in Scala?
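For what it's worth, Scala does let you add a parameter while keeping the old signature callable, via a secondary constructor. A sketch with hypothetical names (the real classes and mima behaviour may differ):

```scala
import cats.effect.std.Dispatcher

trait StreamOutput[F[_]] // stand-in for the real class

// Hypothetical sketch: the new primary constructor takes an extra argument,
// while the old signature is preserved as a secondary constructor, so
// existing call sites (and their bytecode descriptors) still resolve.
class Listener[F[_]](dispatcher: Dispatcher[F], streamOutput: Option[StreamOutput[F]]) {
  def this(dispatcher: Dispatcher[F]) = this(dispatcher, None)
}
```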
Make it so and mark it in mima. We can do a major version and increase Scala to 3.2 in the same go.

I've just marked the two altered classes for now. It might be good to do a sweep over all non-public classes so this doesn't happen again in the future, but I'd prefer to do that in a separate PR if needed; this diff is already plenty big.

Thanks @ahjohannessen 🙂

Great, flow control was finally dealt with :) good job
This is #545 but on top of #552 (i.e. targeting main). As in the original PR, the first commit is #552; the second commit contains the changes specific to this PR.