Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for CallCredentials in akka-http backend #1646

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,31 @@ package akka.grpc.internal

import java.net.InetSocketAddress
import java.security.SecureRandom
import java.util.concurrent.CompletionStage
import java.util.concurrent.{ CompletionStage, Executor }
import scala.concurrent.duration._
import akka.{ Done, NotUsed }
import akka.actor.ClassicActorSystemProvider
import akka.annotation.InternalApi
import akka.event.LoggingAdapter
import akka.grpc.GrpcProtocol.GrpcProtocolReader
import akka.grpc.{ GrpcClientSettings, GrpcResponseMetadata, GrpcSingleResponse, ProtobufSerializer }
import akka.grpc.scaladsl.MetadataEntry
import akka.http.scaladsl.model.HttpEntity.{ Chunk, Chunked, LastChunk, Strict }
import akka.http.scaladsl.{ ClientTransport, ConnectionContext, Http }
import akka.http.scaladsl.model.{ AttributeKey, HttpHeader, HttpRequest, HttpResponse, RequestResponseAssociation, Uri }
import akka.http.scaladsl.settings.ClientConnectionSettings
import akka.stream.{ Materializer, OverflowStrategy }
import akka.stream.scaladsl.{ Keep, Sink, Source }
import akka.util.ByteString
import io.grpc.{ CallOptions, MethodDescriptor, Status, StatusRuntimeException }
import io.grpc.{
Attributes,
CallCredentials,
CallOptions,
MethodDescriptor,
SecurityLevel,
Status,
StatusRuntimeException
}

import javax.net.ssl.{ KeyManager, SSLContext, TrustManager }
import scala.collection.immutable
Expand Down Expand Up @@ -157,6 +166,10 @@ object AkkaHttpClientUtils {
}
}

private val appExecutor = new Executor {
def execute(command: Runnable): Unit = ec.execute(command)
}

override def invokeWithMetadata[I, O](
source: Source[I, NotUsed],
headers: MetadataImpl,
Expand All @@ -166,12 +179,62 @@ object AkkaHttpClientUtils {
implicit val serializer: ProtobufSerializer[I] = descriptor
val deserializer: ProtobufSerializer[O] = descriptor
val scheme = if (settings.useTls) "https" else "http"
val httpRequest = GrpcRequestHelpers(
Uri(
s"${scheme}://${settings.overrideAuthority.getOrElse(settings.serviceName)}/" + descriptor.getFullMethodName),
GrpcEntityHelpers.metadataHeaders(headers.entries),
source)
responseToSource(singleRequest(httpRequest), deserializer)
val authority = settings.overrideAuthority.getOrElse(settings.serviceName)

val metadataPromise = Promise[immutable.List[(String, MetadataEntry)]]()

Option(options.getCredentials()) match {
case Some(callCredentials) =>
val requestInfo = new CallCredentials.RequestInfo {
val getAuthority = authority
val getMethodDescriptor = descriptor
val getSecurityLevel = if (settings.useTls) SecurityLevel.PRIVACY_AND_INTEGRITY else SecurityLevel.NONE
val getTransportAttrs = Attributes.EMPTY
}

val applier = new CallCredentials.MetadataApplier {
def apply(metadataToMerge: io.grpc.Metadata): Unit = {
if (metadataToMerge eq null) {
throw new IllegalArgumentException("Expected non null io.grpc.Metadata")
}
val isSuccess = metadataPromise.trySuccess(
headers.entries ++
MetadataImpl.scalaMetadataFromGoogleGrpcMetadata(metadataToMerge).asList)
if (!isSuccess) {
throw new IllegalArgumentException("apply() or fail() already called")
}
}

def fail(status: Status): Unit = {
if (status eq null) {
throw new IllegalArgumentException("Expected non null io.grpc.Status")
}
val isFailure = metadataPromise.tryFailure(new StatusRuntimeException(status))
if (!isFailure) {
throw new IllegalArgumentException("apply() or fail() already called")
}
}
}

callCredentials.applyRequestMetadata(requestInfo, appExecutor, applier)

case None =>
metadataPromise.success(headers.entries)
}

val futureResponse = {
metadataPromise.future.flatMap { metadata =>
val httpRequest =
GrpcRequestHelpers(
Uri(s"${scheme}://${authority}/" + descriptor.getFullMethodName),
GrpcEntityHelpers.metadataHeaders(metadata),
source)

singleRequest(httpRequest)
}
}

responseToSource(futureResponse, deserializer)
}
}
}
Expand Down