-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add lightweight toUnary client runtime #506
Conversation
6002735
to
b552e9e
Compare
object ReceiveState { | ||
def init[F[_]: Sync, R]( | ||
callback: Either[Throwable, R] => Unit, | ||
pf: PartialFunction[StatusRuntimeException, Exception] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
error adapter is Throwable. I will take a look tomorrow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, then we are good. I thought I noticed a Throwable signature somewhere
7380a60
to
542ee92
Compare
19dd92c
to
90fe995
Compare
90fe995
to
aa8bfca
Compare
import fs2.grpc.client.ClientOptions | ||
import io.grpc._ | ||
|
||
object Fs2UnaryCallHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
object Fs2UnaryCallHandler { | |
private[client] object Fs2UnaryCallHandler { |
def sendError(error: Throwable): SyncIO[ReceiveState[R]] = | ||
SyncIO(callback(Left(error))).as(new Done[R]) | ||
|
||
def done(): SyncIO[ReceiveState[R]] = SyncIO(callback(Right(message))).as(new Done[R]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def done(): SyncIO[ReceiveState[R]] = SyncIO(callback(Right(message))).as(new Done[R]) | |
def done: SyncIO[ReceiveState[R]] = SyncIO(callback(Right(message))).as(new Done[R]) |
Ref.in(new PendingMessage[R]({ | ||
case r: Right[Throwable, R] => callback(r) | ||
case Left(e: StatusRuntimeException) => callback(Left(pf.lift(e).getOrElse(e))) | ||
case l: Left[Throwable, R] => callback(l) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps Left(NonFatal(e)) => callback(e)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never happens fatal error because this callback is called by ClientCall.Listener.
override def onClose(status: Status, trailers: Metadata): Unit = { | ||
if (status.isOk) { | ||
state.get.flatMap { | ||
case expected: PendingHalfClose[Response] => expected.done().flatMap(state.set) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case expected: PendingHalfClose[Response] => expected.done().flatMap(state.set) | |
case expected: PendingHalfClose[Response] => | |
expected.done().flatMap(state.set) |
What this PR do?
This PR eliminate using Dispatcher[F] from toUnary client runtime.
Benchmark
50 pararell request in 30sec with 2core from MBP to other server
This PR
main