From e57d2c2e1062a19a5feb38bd5afb9a6cfc0aea4a Mon Sep 17 00:00:00 2001 From: Daniel James Date: Wed, 3 Aug 2022 00:21:26 -0400 Subject: [PATCH] Add support for CallCredentials in akka-http backend Merge metadata generated by CallCredentials callback into headers at request time. --- .../grpc/internal/AkkaHttpClientUtils.scala | 79 +++++++++++++++++-- 1 file changed, 71 insertions(+), 8 deletions(-) diff --git a/runtime/src/main/scala/akka/grpc/internal/AkkaHttpClientUtils.scala b/runtime/src/main/scala/akka/grpc/internal/AkkaHttpClientUtils.scala index 80a357af4..62b92aac9 100644 --- a/runtime/src/main/scala/akka/grpc/internal/AkkaHttpClientUtils.scala +++ b/runtime/src/main/scala/akka/grpc/internal/AkkaHttpClientUtils.scala @@ -6,7 +6,7 @@ package akka.grpc.internal import java.net.InetSocketAddress import java.security.SecureRandom -import java.util.concurrent.CompletionStage +import java.util.concurrent.{ CompletionStage, Executor } import scala.concurrent.duration._ import akka.{ Done, NotUsed } import akka.actor.ClassicActorSystemProvider @@ -14,6 +14,7 @@ import akka.annotation.InternalApi import akka.event.LoggingAdapter import akka.grpc.GrpcProtocol.GrpcProtocolReader import akka.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcSingleResponse, ProtobufSerializer } +import akka.grpc.scaladsl.MetadataEntry import akka.http.scaladsl.model.HttpEntity.{ Chunk, Chunked, LastChunk, Strict } import akka.http.scaladsl.{ ClientTransport, ConnectionContext, Http } import akka.http.scaladsl.model.{ AttributeKey, HttpHeader, HttpRequest, HttpResponse, RequestResponseAssociation, Uri } @@ -21,7 +22,15 @@ import akka.http.scaladsl.settings.ClientConnectionSettings import akka.stream.{ Materializer, OverflowStrategy } import akka.stream.scaladsl.{ Keep, Sink, Source } import akka.util.ByteString -import io.grpc.{ CallOptions, MethodDescriptor, Status, StatusRuntimeException } +import io.grpc.{ + Attributes, + CallCredentials, + CallOptions, + MethodDescriptor, + SecurityLevel, + Status, + StatusRuntimeException +} import javax.net.ssl.{ KeyManager, SSLContext, TrustManager } import scala.collection.immutable @@ -157,6 +166,10 @@ object AkkaHttpClientUtils { } } + private val appExecutor = new Executor { + def execute(command: Runnable): Unit = ec.execute(command) + } + override def invokeWithMetadata[I, O]( source: Source[I, NotUsed], headers: MetadataImpl, @@ -166,12 +179,62 @@ object AkkaHttpClientUtils { implicit val serializer: ProtobufSerializer[I] = descriptor val deserializer: ProtobufSerializer[O] = descriptor val scheme = if (settings.useTls) "https" else "http" - val httpRequest = GrpcRequestHelpers( - Uri( - s"${scheme}://${settings.overrideAuthority.getOrElse(settings.serviceName)}/" + descriptor.getFullMethodName), - GrpcEntityHelpers.metadataHeaders(headers.entries), - source) - responseToSource(singleRequest(httpRequest), deserializer) + val authority = settings.overrideAuthority.getOrElse(settings.serviceName) + + val metadataPromise = Promise[immutable.List[(String, MetadataEntry)]]() + + Option(options.getCredentials()) match { + case Some(callCredentials) => + val requestInfo = new CallCredentials.RequestInfo { + val getAuthority = authority + val getMethodDescriptor = descriptor + val getSecurityLevel = if (settings.useTls) SecurityLevel.PRIVACY_AND_INTEGRITY else SecurityLevel.NONE + val getTransportAttrs = Attributes.EMPTY + } + + val applier = new CallCredentials.MetadataApplier { + def apply(metadataToMerge: io.grpc.Metadata): Unit = { + if (metadataToMerge eq null) { + throw new IllegalArgumentException("Expected non null io.grpc.Metadata") + } + val isSuccess = metadataPromise.trySuccess( + headers.entries ++ + MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(metadataToMerge).asList) + if (!isSuccess) { + throw new IllegalArgumentException("apply() or fail() already called") + } + } + + def fail(status: Status): Unit = { + if (status eq null) { + throw new IllegalArgumentException("Expected non null io.grpc.Status") + } + val isFailure = metadataPromise.tryFailure(new StatusRuntimeException(status)) + if (!isFailure) { + throw new IllegalArgumentException("apply() or fail() already called") + } + } + } + + callCredentials.applyRequestMetadata(requestInfo, appExecutor, applier) + + case None => + metadataPromise.success(headers.entries) + } + + val futureResponse = { + metadataPromise.future.flatMap { metadata => + val httpRequest = + GrpcRequestHelpers( + Uri(s"${scheme}://${authority}/" + descriptor.getFullMethodName), + GrpcEntityHelpers.metadataHeaders(metadata), + source) + + singleRequest(httpRequest) + } + } + + responseToSource(futureResponse, deserializer) } } }