Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support setting deadline per-call #1838

Merged
merged 6 commits into from
Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# New APIs for setting deadline per call

ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.javadsl.SingleResponseRequestBuilder.setDeadline")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.javadsl.StreamResponseRequestBuilder.setDeadline")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.scaladsl.SingleResponseRequestBuilder.setDeadline")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.grpc.scaladsl.StreamResponseRequestBuilder.setDeadline")
78 changes: 76 additions & 2 deletions runtime/src/main/scala/akka/grpc/internal/RequestBuilderImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

package akka.grpc.internal

import java.util.concurrent.CompletionStage

import java.time.{ Duration => JDuration }
import java.util.concurrent.{ CompletionStage, TimeUnit }
import akka.NotUsed
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.dispatch.ExecutionContexts
import akka.grpc.scaladsl.SingleResponseRequestBuilder
import akka.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcServiceException, GrpcSingleResponse }
import akka.stream.{ Graph, Materializer, SourceShape }
import akka.stream.javadsl.{ Source => JavaSource }
Expand All @@ -17,6 +18,7 @@ import akka.util.ByteString
import io.grpc._

import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{ ExecutionContext, Future }

/**
Expand Down Expand Up @@ -52,6 +54,15 @@ final class ScalaUnaryRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): ScalaUnaryRequestBuilder[I, O] =
new ScalaUnaryRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: Duration): SingleResponseRequestBuilder[I, O] =
new ScalaUnaryRequestBuilder[I, O](
descriptor,
channel,
if (!deadline.isFinite) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -84,6 +95,15 @@ final class JavaUnaryRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): JavaUnaryRequestBuilder[I, O] =
new JavaUnaryRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: JDuration): JavaUnaryRequestBuilder[I, O] =
new JavaUnaryRequestBuilder[I, O](
descriptor,
channel,
if (deadline == null) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -152,6 +172,15 @@ final class ScalaClientStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): ScalaClientStreamingRequestBuilder[I, O] =
new ScalaClientStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: Duration): ScalaClientStreamingRequestBuilder[I, O] =
new ScalaClientStreamingRequestBuilder[I, O](
descriptor,
channel,
if (!deadline.isFinite) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -195,6 +224,15 @@ final class JavaClientStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): JavaClientStreamingRequestBuilder[I, O] =
new JavaClientStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: JDuration): JavaClientStreamingRequestBuilder[I, O] =
new JavaClientStreamingRequestBuilder[I, O](
descriptor,
channel,
if (deadline == null) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -242,6 +280,15 @@ final class ScalaServerStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): ScalaServerStreamingRequestBuilder[I, O] =
new ScalaServerStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: Duration): ScalaServerStreamingRequestBuilder[I, O] =
new ScalaServerStreamingRequestBuilder[I, O](
descriptor,
channel,
if (!deadline.isFinite) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -285,6 +332,15 @@ final class JavaServerStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): JavaServerStreamingRequestBuilder[I, O] =
new JavaServerStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: JDuration): JavaServerStreamingRequestBuilder[I, O] =
new JavaServerStreamingRequestBuilder[I, O](
descriptor,
channel,
if (deadline == null) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -333,6 +389,15 @@ final class ScalaBidirectionalStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): ScalaBidirectionalStreamingRequestBuilder[I, O] =
new ScalaBidirectionalStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: Duration): ScalaBidirectionalStreamingRequestBuilder[I, O] =
new ScalaBidirectionalStreamingRequestBuilder[I, O](
descriptor,
channel,
if (!deadline.isFinite) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down Expand Up @@ -377,6 +442,15 @@ final class JavaBidirectionalStreamingRequestBuilder[I, O](

override def withHeaders(headers: MetadataImpl): JavaBidirectionalStreamingRequestBuilder[I, O] =
new JavaBidirectionalStreamingRequestBuilder[I, O](descriptor, channel, defaultOptions, settings, headers)

override def setDeadline(deadline: JDuration): JavaBidirectionalStreamingRequestBuilder[I, O] =
new JavaBidirectionalStreamingRequestBuilder[I, O](
descriptor,
channel,
if (deadline == null) defaultOptions.withDeadline(null)
else defaultOptions.withDeadlineAfter(deadline.toMillis, TimeUnit.MILLISECONDS),
settings,
headers)
}

/**
Expand Down
13 changes: 13 additions & 0 deletions runtime/src/main/scala/akka/grpc/javadsl/RequestBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.grpc.javadsl

import java.time.Duration
import java.util.concurrent.CompletionStage

import akka.NotUsed
Expand Down Expand Up @@ -48,6 +49,12 @@ trait SingleResponseRequestBuilder[Req, Res] {
* Invoke the gRPC method with the additional metadata added and provide access to response metadata
*/
def invokeWithMetadata(request: Req): CompletionStage[GrpcSingleResponse[Res]]

/**
* Set the deadline for this call
* @return A new request builder, that will use the supplied deadline when invoked
*/
def setDeadline(deadline: Duration): SingleResponseRequestBuilder[Req, Res]
}

/**
Expand Down Expand Up @@ -86,4 +93,10 @@ trait StreamResponseRequestBuilder[Req, Res] {
* Invoke the gRPC method with the additional metadata added and provide access to response metadata
*/
def invokeWithMetadata(request: Req): Source[Res, CompletionStage[GrpcResponseMetadata]]

/**
* Set the deadline for this call
* @return A new request builder, that will use the supplied deadline when invoked
*/
def setDeadline(deadline: Duration): StreamResponseRequestBuilder[Req, Res]
}
13 changes: 13 additions & 0 deletions runtime/src/main/scala/akka/grpc/scaladsl/RequestBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration.Duration

/**
* Request builder for requests providing per call specific metadata capabilities in
Expand Down Expand Up @@ -48,6 +49,12 @@ trait SingleResponseRequestBuilder[Req, Res] {
* Invoke the gRPC method with the additional metadata added and provide access to response metadata
*/
def invokeWithMetadata(request: Req): Future[GrpcSingleResponse[Res]]

/**
* Set the deadline for this call
* @return A new request builder, that will use the supplied deadline when invoked
*/
def setDeadline(deadline: Duration): SingleResponseRequestBuilder[Req, Res]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use FiniteDuration here instead. Always somewhat confusing what non finite Duration means.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went to make this change but found that GrpcClientSettings#deadline is already typed as Duration and supports supplying infinite, which means no timeout -- in NettyClientUtils it's implemented as follows:

  @InternalApi private[akka] def callOptionsWithDeadline(
      defaultOptions: CallOptions,
      settings: GrpcClientSettings): CallOptions =
    settings.deadline match {
      case d: FiniteDuration => defaultOptions.withDeadlineAfter(d.toMillis, TimeUnit.MILLISECONDS)
      case _                 => defaultOptions
    }

...so, I updated the call sites to check isFinite/null (Scala/Java) and clear timeout if its infinite, which I think is more consistent with how the settings are parsed. Let me know what you think though - happy to make it FiniteDuration if you think that's better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good, thanks

}

/**
Expand Down Expand Up @@ -88,4 +95,10 @@ trait StreamResponseRequestBuilder[Req, Res] {
* Invoke the gRPC method with the additional metadata added and provide access to response metadata
*/
def invokeWithMetadata(request: Req): Source[Res, Future[GrpcResponseMetadata]]

/**
* Set the deadline for this call
* @return A new request builder, that will use the supplied deadline when invoked
*/
def setDeadline(deadline: Duration): StreamResponseRequestBuilder[Req, Res]
}