From f845bc5b250070e334ca19c5da5037acf998f7ea Mon Sep 17 00:00:00 2001 From: Valdemar Grange Date: Thu, 13 Oct 2022 13:39:38 +0200 Subject: [PATCH] fix: don't rethrow exceptions after handling message --- .../client/internal/Fs2UnaryCallHandler.scala | 2 ++ .../grpc/server/Fs2ServerCallListener.scala | 13 +++++---- .../grpc/server/internal/Fs2ServerCall.scala | 14 ++++++---- .../scala/fs2/grpc/server/ServerSuite.scala | 28 ++++++++++++++++++- 4 files changed, 46 insertions(+), 11 deletions(-) diff --git a/runtime/src/main/scala/fs2/grpc/client/internal/Fs2UnaryCallHandler.scala b/runtime/src/main/scala/fs2/grpc/client/internal/Fs2UnaryCallHandler.scala index 005e9c67..70958ec5 100644 --- a/runtime/src/main/scala/fs2/grpc/client/internal/Fs2UnaryCallHandler.scala +++ b/runtime/src/main/scala/fs2/grpc/client/internal/Fs2UnaryCallHandler.scala @@ -27,6 +27,7 @@ import cats.effect.syntax.all._ import cats.effect.{Sync, SyncIO} import cats.syntax.flatMap._ import cats.syntax.functor._ +import cats.syntax.applicativeError._ import fs2._ import fs2.grpc.client.ClientOptions import fs2.grpc.shared.StreamOutput @@ -153,6 +154,7 @@ private[client] object Fs2UnaryCallHandler { case Outcome.Errored(e) => F.delay(call.cancel(e.getMessage, e)) case Outcome.Canceled() => onCancel(call) } + .handleError(_ => ()) .start .map(sending => Some(sending.cancel >> onCancel(call))) } diff --git a/runtime/src/main/scala/fs2/grpc/server/Fs2ServerCallListener.scala b/runtime/src/main/scala/fs2/grpc/server/Fs2ServerCallListener.scala index 98692636..c931493e 100644 --- a/runtime/src/main/scala/fs2/grpc/server/Fs2ServerCallListener.scala +++ b/runtime/src/main/scala/fs2/grpc/server/Fs2ServerCallListener.scala @@ -56,11 +56,14 @@ private[server] trait Fs2ServerCallListener[F[_], G[_], Request, Response] { call.sendHeaders(headers) *> call.request(1) *> sendResponse.compile.drain private def unsafeRun(f: F[Unit])(implicit F: Async[F]): Unit = { - val bracketed = F.guaranteeCase(f) { - case Outcome.Succeeded(_) => call.closeStream(Status.OK, new Metadata()) - case Outcome.Canceled() => call.closeStream(Status.CANCELLED, new Metadata()) - case Outcome.Errored(t) => reportError(t) - } + val bracketed = + F.handleError { + F.guaranteeCase(f) { + case Outcome.Succeeded(_) => call.closeStream(Status.OK, new Metadata()) + case Outcome.Canceled() => call.closeStream(Status.CANCELLED, new Metadata()) + case Outcome.Errored(t) => reportError(t) + } + }(_ => ()) // Exceptions are reported by closing the call dispatcher.unsafeRunAndForget(F.race(bracketed, isCancelled.get)) diff --git a/runtime/src/main/scala/fs2/grpc/server/internal/Fs2ServerCall.scala b/runtime/src/main/scala/fs2/grpc/server/internal/Fs2ServerCall.scala index e71426c3..008f7e40 100644 --- a/runtime/src/main/scala/fs2/grpc/server/internal/Fs2ServerCall.scala +++ b/runtime/src/main/scala/fs2/grpc/server/internal/Fs2ServerCall.scala @@ -85,11 +85,15 @@ private[server] final class Fs2ServerCall[Request, Response]( private def run[F[_]](completed: F[Unit], dispatcher: Dispatcher[F])(implicit F: Sync[F]): SyncIO[Cancel] = { SyncIO { - val cancel = dispatcher.unsafeRunCancelable(F.guaranteeCase(completed) { - case Outcome.Succeeded(_) => close(Status.OK, new Metadata()).to[F] - case Outcome.Errored(e) => handleError(e).to[F] - case Outcome.Canceled() => close(Status.CANCELLED, new Metadata()).to[F] - }) + val cancel = dispatcher.unsafeRunCancelable( + F.handleError { + F.guaranteeCase(completed) { + case Outcome.Succeeded(_) => close(Status.OK, new Metadata()).to[F] + case Outcome.Errored(e) => handleError(e).to[F] + case Outcome.Canceled() => close(Status.CANCELLED, new Metadata()).to[F] + } + }(_ => ()) + ) SyncIO(cancel()).void } } diff --git a/runtime/src/test/scala/fs2/grpc/server/ServerSuite.scala b/runtime/src/test/scala/fs2/grpc/server/ServerSuite.scala index 5f67458b..2eabf325 100644 --- a/runtime/src/test/scala/fs2/grpc/server/ServerSuite.scala +++ b/runtime/src/test/scala/fs2/grpc/server/ServerSuite.scala @@ -206,7 +206,32 @@ class ServerSuite extends Fs2GrpcSuite { assertEquals(dummy.currentStatus.get.isOk, true) } - runTest("messages to streamingToStreaming with error") { (tc, d) => + runTest("messages to streamingToStreaming with error") { (tc, d0) => + @volatile var errorInDispatcher = false + val d = new Dispatcher[IO] { + import scala.concurrent._ + def unsafeToFutureCancelable[A](fa: IO[A]): (Future[A], () => Future[Unit]) = { + // d0.unsafeToFutureCancelable(fa) + implicit val parasitic: ExecutionContext = new ExecutionContext { + def execute(runnable: Runnable) = runnable.run() + def reportFailure(t: Throwable) = t.printStackTrace() + } + + val (fut, cancel) = d0.unsafeToFutureCancelable(fa) + val reported = fut.transform( + identity, + t => { + errorInDispatcher = true + t + } + ) + (reported, cancel) + } + + override def unsafeRunSync[A](fa: IO[A]): A = + d0.unsafeRunSync(fa.onError(_ => IO { errorInDispatcher = true })) + } + val dummy = new DummyServerCall val error = new RuntimeException("hello") @@ -225,6 +250,7 @@ class ServerSuite extends Fs2GrpcSuite { assertEquals(dummy.messages.toList, List(1, 2, 0)) assertEquals(dummy.currentStatus.isDefined, true) assertEquals(dummy.currentStatus.get.isOk, false) + assert(!errorInDispatcher, "no error should be encountered by the dispatcher") } runTest("streamingToStreaming send respects isReady") { (tc, d) =>