-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reimplement light server runtime #496
Conversation
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
*/ | ||
|
||
package fs2.grpc.internal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Place implement to internal package for binary compat control.
I think Fs2ServerCallHandler I/F is what we really need to care about.
Is this right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that is exposed to codegen - everything else should be package private.
33f708e
to
c9356cd
Compare
BenchmarkI benchmarked with
This PR reduce 75% performance difference from Akka-gRPC This PR
fs2-grpc 2.4.4
Akka-gRPC
|
I think the usage of unsafe code is a bit too much. The general mantra for fs2-grpc is to have code that is easy to reason about. I think you can come very far using constructs like |
This library provides some unsafe constructs: https://github.com/davenverse/condemned/blob/main/core/src/main/scala/io/chrisdavenport/condemned/UnsafeDeferred.scala - But I think most of SyncIO + non-blocking methods on dispatcher should get us a long way, the cats-effect constructs are also under more eyes and have higher test coverage. |
private def mkListener[Request, Response]( | ||
run: Request => SyncIO[Cancel], | ||
call: Fs2StatefulServerCall[Request, Response], | ||
state: Ref[SyncIO, State[Request]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I replaced var to Ref for your advice.
f3be05f
to
393eca6
Compare
state.get.flatMap(_.cancel.getOrElse(SyncIO.unit)).unsafeRunSync() | ||
|
||
override def onMessage(message: Request): Unit = | ||
state.get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
access instead of get and use modify from it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's not necessary to use access.
Because all ServerCall.Listener methods called simultaneously.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean synchroniously?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Iam sorry about failed to choose English word.
The caller is free to call an instance from multiple threads, but only one call simultaneously
import UnsafeChannel.State | ||
import scala.collection.immutable.Queue | ||
|
||
final class UnsafeChannel[A] extends AtomicReference[State[A]](State.Consumed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you use Ref to implement the state machine?
d494558
to
0d0ca45
Compare
import scala.collection.immutable.Queue | ||
|
||
@nowarn | ||
final class UnsafeChannel[A](val ref: Ref[SyncIO, State[A]]) extends AnyVal { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also interesting: https://github.com/davenverse/process/blob/main/core/shared/src/main/scala/io/chrisdavenport/process/structures/UnsafeByteQueue.scala
Seems like some building blocks for unsafe interop are missing in cats-effect
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know that.
Methods name comes from fs2 one.
I made this parts for removing unnecessary context shift.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok :)
It would probably be a good idea to do a PR that only focuses on unaryTo as the other scenarios have more moving parts and more involved. |
@ahjohannessen I was wondering if you have had checked the change. |
@naoh87 I have not had time yet, will look soon |
} | ||
} | ||
|
||
final class Fs2StatefulServerCall[Request, Response]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is a bit confusing to have this and Fs2ServerCall
I need a computer in order to do a review :) on my phone now. |
Starting to look good :) I have not yet had time to study the code. I wonder what kind of benchmarks and flamegraphs this has compared to main and grpc-java. |
|
||
import Fs2ServerCall.Cancel | ||
|
||
def stream[F[_]](response: Stream[F, Response], dispatcher: Dispatcher[F])(implicit F: Async[F]): SyncIO[Cancel] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def stream[F[_]](response: Stream[F, Response], dispatcher: Dispatcher[F])(implicit F: Async[F]): SyncIO[Cancel] = | |
def stream[F[_]](response: Stream[F, Response], dispatcher: Dispatcher[F])(implicit F: Sync[F]): SyncIO[Cancel] = |
dispatcher | ||
) | ||
|
||
def unary[F[_]](response: F[Response], dispatcher: Dispatcher[F])(implicit F: Async[F]): SyncIO[Cancel] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def unary[F[_]](response: F[Response], dispatcher: Dispatcher[F])(implicit F: Async[F]): SyncIO[Cancel] = | |
def unary[F[_]](response: F[Response], dispatcher: Dispatcher[F])(implicit F: Sync[F]): SyncIO[Cancel] = |
} | ||
|
||
private def closeStreamF[F[_]](status: Status, metadata: Metadata)(implicit F: Sync[F]): F[Unit] = | ||
F.delay(call.close(status, metadata)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
F.delay(call.close(status, metadata)) | |
close(status, metadata).to[F] |
def close(status: Status, metadata: Metadata): SyncIO[Unit] = | ||
SyncIO(call.close(status, metadata)) | ||
|
||
private def run[F[_]](completed: F[Unit], dispatcher: Dispatcher[F])(implicit F: Sync[F]): SyncIO[Cancel] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps define a local helper inside run
to improve readability:
def handleError(t: Throwable): F[Unit] = t match {
case ex: StatusException => closeStreamF(ex.getStatus, Option(ex.getTrailers).getOrElse(new Metadata()))
case ex: StatusRuntimeException => closeStreamF(ex.getStatus, Option(ex.getTrailers).getOrElse(new Metadata()))
case ex => closeStreamF(Status.INTERNAL.withDescription(ex.getMessage).withCause(ex), new Metadata())
}
Then:
case Outcome.Errored(e) => handleError(e)
}) | ||
SyncIO { | ||
cancel() | ||
() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternative: SyncIO(cancel()).void
|
||
} | ||
|
||
def unary[F[_]: Async, Request, Response]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def unary[F[_]: Async, Request, Response]( | |
def unary[F[_]: Sync, Request, Response]( |
startCallSync(call, opt)(call => req => call.unary(impl(req, headers), dispatcher)).unsafeRunSync() | ||
} | ||
|
||
def stream[F[_]: Async, Request, Response]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def stream[F[_]: Async, Request, Response]( | |
def stream[F[_]: Sync, Request, Response]( |
|
||
package fs2.grpc.internal.server | ||
|
||
import cats.effect.Async |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import cats.effect.Async | |
import cats.effect.Sync |
)(f: Fs2ServerCall[Request, Response] => Request => SyncIO[Cancel]): SyncIO[ServerCall.Listener[Request]] = { | ||
for { | ||
call <- Fs2ServerCall.setup(options, call) | ||
_ <- call.request(2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps a comment to why two requests are done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this is what grpc-java does:
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
// sends more than 1 requests, ServerCall will catch it...
from ServerCalls - A future maintainer would be happy to learn why this is.
import io.grpc._ | ||
import fs2._ | ||
|
||
object Fs2ServerCall { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
object Fs2ServerCall { | |
private[server] object Fs2ServerCall { |
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | ||
*/ | ||
|
||
package fs2.grpc.internal.server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move internal
in under server
.
} | ||
} | ||
|
||
final class Fs2ServerCall[Request, Response]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final class Fs2ServerCall[Request, Response]( | |
private[server] final class Fs2ServerCall[Request, Response]( |
import fs2.grpc.server.ServerOptions | ||
import io.grpc._ | ||
|
||
object Fs2UnaryServerCallHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
object Fs2UnaryServerCallHandler { | |
private[server] object Fs2UnaryServerCallHandler { |
run( | ||
response.pull.peek1 | ||
.flatMap { | ||
case Some((_, tail)) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling this tail
implies that head of response
is not in there, which it is tl.cons(hd))
, i.e.:
uncons.flatMap {
case None => Pull.pure(None)
case Some((hd, tl)) => Pull.pure(Some((hd(0), tl.cons(hd))))
}
Suggestion: rename tail
to stream
or similar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good.
6dd15b8
to
fe1fe39
Compare
Unary Benchmark1-cpu time restricted server benchmark on Linux(kernel 5.4.0). #496
v2.4.4
ScalaPB
ServerStreaming BenchmarkBenchmark with same condition with unary benchmark Stream.emit(HelloReply(request.request)).repeatN(100) ScalaPB implement (0 until 100).foreach(_ => responseObserver.onNext(HelloReply(request.request)))
responseObserver.onCompleted() #496
v2.4.4
ScalaPB
|
import fs2.grpc.server.ServerCallOptions | ||
import io.grpc._ | ||
|
||
private[server] object Fs2ServerCall { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps place this inside the handler as it is only a helper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use this for streaming handler after this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
def init[A](cb: A => SyncIO[Cancel]): SyncIO[Ref[SyncIO, Context[A]]] = | ||
Ref[SyncIO].of[Context[A]](BeforeCall(cb, None)) | ||
} | ||
case class BeforeCall[A](cb: A => SyncIO[Cancel], request: Option[A]) extends Context[A] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This name is a bit hard to understand when looking at the fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about BeforeCall(callback: ..., received: ...)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to split that into two cases?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry. I don't speak English well.
I couldn't remember what two case indicate.
Could you mind tell me another easy explanation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two cases express A => SyncIO[Cancel]
will not called after Called if two cases means BeforeCall
and Called
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it makes sense to split it up more, but the cases are at least
- cancelled
- request message pending
- request message received and half-closed pending
- request message received and half-closed received
- call-completed
- …
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is a few state to hold over Listener methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that mean it is more better split like
BeforeReceive
,Received
,Called
?
It looks good.
Yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there is a few state to hold over Listener methods.
Not sure what you mean by that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ref is used by saving state over ServerCall.Listener method.
I thought this doesn't introduce a bunch of state.
|
||
import Fs2ServerCall.Cancel | ||
|
||
sealed trait Context[A] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this not more like a call status? I wonder if more explicit cases would make the code easier to reason about
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you think CallerState
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is more descriptive 👍
I think we are good, if you rebase and squash I’ll merge. Do the extra states have a measurable impact on req/s ? |
72cf19c
to
78462a1
Compare
I rebased to one commit and theres was no measurable performance impact. |
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client | ||
// sends more than 1 requests, ServerCall will catch it. | ||
_ <- call.request(2) | ||
ctx <- CallerState.init(f(call)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
state
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
state.set(Cancelled()) >> call.close(status, new Metadata()) | ||
} | ||
|
||
def unary[F[_]: Async, Request, Response]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Async needed?
startCallSync(call, opt)(call => req => call.unary(impl(req, headers), dispatcher)).unsafeRunSync() | ||
} | ||
|
||
def stream[F[_]: Async, Request, Response]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Async needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sync is enough.
920a3a3
to
fecc2ae
Compare
fecc2ae
to
c6b4894
Compare
@naoh87 I have cut a release with your changes. Btw, |
@naoh87 Might be interesting to PR an update to https://github.com/LesnyRumcajs/grpc_bench with latest release :) |
I will do it later. |
This runtime call to 1 unsafeRun* / RPC.
This PR come from #493
Background #386