Skip to content

Commit

Permalink
Add support for CallCredentials in akka-http backend
Browse files Browse the repository at this point in the history
Merge metadata generated by CallCredentials callback into headers at
request time.
  • Loading branch information
dwhjames committed Aug 3, 2022
1 parent 9c613f4 commit 170d97e
Showing 1 changed file with 71 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,31 @@ package akka.grpc.internal

import java.net.InetSocketAddress
import java.security.SecureRandom
import java.util.concurrent.CompletionStage
import java.util.concurrent.{ CompletionStage, Executor }
import scala.concurrent.duration._
import akka.{ Done, NotUsed }
import akka.actor.ClassicActorSystemProvider
import akka.annotation.InternalApi
import akka.event.LoggingAdapter
import akka.grpc.GrpcProtocol.GrpcProtocolReader
import akka.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcSingleResponse, ProtobufSerializer }
import akka.grpc.scaladsl.MetadataEntry
import akka.http.scaladsl.model.HttpEntity.{ Chunk, Chunked, LastChunk, Strict }
import akka.http.scaladsl.{ ClientTransport, ConnectionContext, Http }
import akka.http.scaladsl.model.{ AttributeKey, HttpHeader, HttpRequest, HttpResponse, RequestResponseAssociation, Uri }
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.{ Materializer, OverflowStrategy }
import akka.stream.scaladsl.{ Keep, Sink, Source }
import akka.util.ByteString
import io.grpc.{ CallOptions, MethodDescriptor, Status, StatusRuntimeException }
import io.grpc.{
Attributes,
CallCredentials,
CallOptions,
MethodDescriptor,
SecurityLevel,
Status,
StatusRuntimeException
}

import javax.net.ssl.{ KeyManager, SSLContext, TrustManager }
import scala.collection.immutable
Expand Down Expand Up @@ -157,6 +166,10 @@ object AkkaHttpClientUtils {
}
}

private val appExecutor = new Executor {
def execute(command: Runnable): Unit = ec.execute(command)
}

override def invokeWithMetadata[I, O](
source: Source[I, NotUsed],
headers: MetadataImpl,
Expand All @@ -166,12 +179,62 @@ object AkkaHttpClientUtils {
implicit val serializer: ProtobufSerializer[I] = descriptor
val deserializer: ProtobufSerializer[O] = descriptor
val scheme = if (settings.useTls) "https" else "http"
val httpRequest = GrpcRequestHelpers(
Uri(
s"${scheme}://${settings.overrideAuthority.getOrElse(settings.serviceName)}/" + descriptor.getFullMethodName),
GrpcEntityHelpers.metadataHeaders(headers.entries),
source)
responseToSource(singleRequest(httpRequest), deserializer)
val authority = settings.overrideAuthority.getOrElse(settings.serviceName)

val metadataPromise = Promise[immutable.List[(String, MetadataEntry)]]()

Option(options.getCredentials()) match {
case Some(callCredentials) =>
val requestInfo = new CallCredentials.RequestInfo {
val getAuthority = authority
val getMethodDescriptor = descriptor
val getSecurityLevel = if (settings.useTls) SecurityLevel.PRIVACY_AND_INTEGRITY else SecurityLevel.NONE
val getTransportAttrs = Attributes.EMPTY
}

val applier = new CallCredentials.MetadataApplier {
def apply(metadataToMerge: io.grpc.Metadata): Unit = {
if (metadataToMerge eq null) {
throw new IllegalArgumentException("Expected non null io.grpc.Metadata")
}
val isSuccess = metadataPromise.trySuccess(
headers.entries ++
MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(metadataToMerge).asList)
if (!isSuccess) {
throw new IllegalArgumentException("apply() or fail() already called")
}
}

def fail(status: Status): Unit = {
if (status eq null) {
throw new IllegalArgumentException("Expected non null io.grpc.Status")
}
val isFailure = metadataPromise.tryFailure(new StatusRuntimeException(status))
if (!isFailure) {
throw new IllegalArgumentException("apply() or fail() already called")
}
}
}

callCredentials.applyRequestMetadata(requestInfo, appExecutor, applier)

case None =>
metadataPromise.success(headers.entries)
}

val futureResponse = {
metadataPromise.future.flatMap { metadata =>
val httpRequest =
GrpcRequestHelpers(
Uri(s"${scheme}://${authority}/" + descriptor.getFullMethodName),
GrpcEntityHelpers.metadataHeaders(metadata),
source)

singleRequest(httpRequest)
}
}

responseToSource(futureResponse, deserializer)
}
}
}
Expand Down

0 comments on commit 170d97e

Please sign in to comment.