Skip to content

Commit

Permalink
feat: Periodic client discovery refresh #1152
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Feb 19, 2024
1 parent 8600fd8 commit ca3a50a
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 29 deletions.
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Dependencies {
// We don't force Akka updates because downstream projects can upgrade
// themselves. For more information see
// https://doc.akka.io//docs/akka/current/project/downstream-upgrade-strategy.html
val akka = "2.9.0"
val akka = "2.9.1-M1+11-809c4920-SNAPSHOT"
val akkaBinary = "2.9"
val akkaHttp = "10.6.0"
val akkaHttpBinary = "10.6"
Expand Down
8 changes: 8 additions & 0 deletions runtime/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ akka.grpc.client."*" {

# timeout for service discovery resolving
resolve-timeout = 1s

# Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
# if the discovery mechanism supports that. Expected use is for client side load-balancing, to detect new services
# to load balance across. The default value "off" disables periodic refresh and instead only does refresh when
# the client implementation decides to.
#
# Currently only supported by the Netty client backend.
refresh-interval = off
}

# port to use if service-discovery-mechism is static or service discovery does not return a port
Expand Down
46 changes: 40 additions & 6 deletions runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import com.typesafe.config.{ Config, ConfigValueFactory }
import io.grpc.CallCredentials
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider
import javax.net.ssl.{ SSLContext, TrustManager }

import java.util.Optional
import javax.net.ssl.{ SSLContext, TrustManager }
import scala.collection.immutable
import scala.concurrent.duration.{ Duration, _ }
import scala.jdk.OptionConverters.RichOptional

object GrpcClientSettings {

Expand Down Expand Up @@ -148,7 +150,9 @@ object GrpcClientSettings {
getOptionalString(clientConfiguration, "user-agent"),
clientConfiguration.getBoolean("use-tls"),
getOptionalString(clientConfiguration, "load-balancing-policy"),
clientConfiguration.getString("backend"))
clientConfiguration.getString("backend"),
identity,
getOptionalDuration(clientConfiguration, "discovery.refresh-interval"))

private def getOptionalString(config: Config, path: String): Option[String] =
config.getString(path) match {
Expand All @@ -162,6 +166,12 @@ object GrpcClientSettings {
case other => Some(other)
}

private def getOptionalDuration(config: Config, path: String): Option[FiniteDuration] =
config.getString(path) match {
case "off" => None
case _ => Some(config.getDuration(path).asScala)
}

private def getPotentiallyInfiniteDuration(underlying: Config, path: String): Duration =
Helpers.toRootLowerCase(underlying.getString(path)) match {
case "infinite" => Duration.Inf
Expand Down Expand Up @@ -196,7 +206,8 @@ final class GrpcClientSettings private (
val useTls: Boolean,
val loadBalancingPolicy: Option[String],
val backend: String,
val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity) {
val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity,
val discoveryRefreshInterval: Option[FiniteDuration]) {
require(
sslContext.isEmpty || trustManager.isEmpty,
"Configuring the sslContext or the trustManager is mutually exclusive")
Expand Down Expand Up @@ -284,6 +295,28 @@ final class GrpcClientSettings private (
def withBackend(value: String): GrpcClientSettings =
copy(backend = value)

/**
* Scala API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
* if the discovery mechanism supports that. The default value `None` disables periodic refresh and instead
* only does refresh when the client implementation decides to.
*
* Currently only supported by the Netty client backend.
*/
@ApiMayChange
def withDiscoveryRefreshInterval(refreshInterval: Option[FiniteDuration]): GrpcClientSettings =
copy(discoveryRefreshInterval = refreshInterval)

/**
* Java API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
* if the discovery mechanism supports that. The default value `None` disables periodic refresh and instead
* only does refresh when the client implementation decides to.
*
* Currently only supported by the Netty client backend.
*/
@ApiMayChange
def withDiscoveryRefreshInterval(refreshInterval: Optional[java.time.Duration]): GrpcClientSettings =
copy(discoveryRefreshInterval = refreshInterval.map(_.asScala).toScala)

private def copy(
serviceName: String = serviceName,
servicePortName: Option[String] = servicePortName,
Expand All @@ -301,8 +334,8 @@ final class GrpcClientSettings private (
connectionAttempts: Option[Int] = connectionAttempts,
loadBalancingPolicy: Option[String] = loadBalancingPolicy,
backend: String = backend,
channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides)
: GrpcClientSettings =
channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides,
discoveryRefreshInterval: Option[FiniteDuration] = discoveryRefreshInterval): GrpcClientSettings =
new GrpcClientSettings(
callCredentials = callCredentials,
serviceDiscovery = serviceDiscovery,
Expand All @@ -321,5 +354,6 @@ final class GrpcClientSettings private (
connectionAttempts = connectionAttempts,
loadBalancingPolicy = loadBalancingPolicy,
backend = backend,
channelBuilderOverrides = channelBuilderOverrides)
channelBuilderOverrides = channelBuilderOverrides,
discoveryRefreshInterval = discoveryRefreshInterval)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,101 @@

package akka.grpc.internal

import java.net.{ InetAddress, InetSocketAddress, UnknownHostException }
import akka.actor.ActorSystem
import akka.actor.Cancellable
import akka.annotation.InternalApi

import java.net.{ InetAddress, InetSocketAddress, UnknownHostException }
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.discovery.{ Lookup, ServiceDiscovery }
import akka.event.Logging
import akka.grpc.GrpcClientSettings
import io.grpc.{ Attributes, EquivalentAddressGroup, NameResolver, Status }
import io.grpc.NameResolver.Listener

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Promise }
import scala.util.{ Failure, Success }

class AkkaDiscoveryNameResolver(
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class AkkaDiscoveryNameResolver(
discovery: ServiceDiscovery,
defaultPort: Int,
serviceName: String,
portName: Option[String],
protocol: Option[String],
resolveTimeout: FiniteDuration)(implicit val ec: ExecutionContext)
resolveTimeout: FiniteDuration,
refreshInterval: Option[FiniteDuration])(implicit val ec: ExecutionContext, system: ActorSystem)
extends NameResolver {

private final val log = Logging(system, getClass)

override def getServiceAuthority: String = serviceName

val listener: Promise[Listener] = Promise()
private val listener: Promise[Listener] = Promise()

// initialized after first resolve if needed
private val refreshTask = new AtomicReference[Cancellable]

override def start(l: Listener): Unit = {
log.debug("Name resolver for {} started", serviceName)
listener.trySuccess(l)
lookup(l)
lookup(l, evict = false)
}

override def refresh(): Unit =
override def refresh(): Unit = refresh(false)

private def refresh(evict: Boolean): Unit =
listener.future.onComplete {
case Success(l) => lookup(l)
case Success(l) =>
log.debug("Name resolver for {} refreshing", serviceName)
lookup(l, evict)
case Failure(_) => // We never fail this promise
}

def lookup(listener: Listener): Unit = {
discovery.lookup(Lookup(serviceName, portName, protocol), resolveTimeout).onComplete {
def lookup(listener: Listener, evict: Boolean): Unit = {
val request = {
val l = Lookup(serviceName, portName, protocol)
if (evict) l.withDiscardCache
else l
}
val result = discovery.lookup(request, resolveTimeout)

result.onComplete {
case Success(result) =>
try {
if (log.isDebugEnabled)
log.debug(
"Successful service discovery for service {}, found addresses: {}",
serviceName,
result.addresses.mkString(", "))
listener.onAddresses(addresses(result.addresses), Attributes.EMPTY)
} catch {
case e: UnknownHostException =>
// TODO at least log
log.warning(e, s"Unknown host for service $serviceName")
listener.onError(Status.UNKNOWN.withDescription(e.getMessage))
}
case Failure(e) =>
// TODO at least log
log.warning(e, s"Service discovery failed for service $serviceName")
listener.onError(Status.UNKNOWN.withDescription(e.getMessage))
}

// initialize refresh timer after first lookup, if configured
if (refreshInterval.isDefined && refreshTask.get() == null) {
result.onComplete { _ =>
refreshInterval.foreach { interval =>
val cancellable = system.scheduler.scheduleWithFixedDelay(interval, interval)(() => refresh(evict = true))
if (!refreshTask.compareAndSet(null, cancellable)) {
// concurrent update beat us to it, there already is a scheduled task
cancellable.cancel()
}
}
}
}
}

@throws[UnknownHostException]
Expand All @@ -67,16 +113,25 @@ class AkkaDiscoveryNameResolver(
.asJava
}

override def shutdown(): Unit = ()
override def shutdown(): Unit = {
val refreshCancellable = refreshTask.get()
if (refreshCancellable ne null) refreshCancellable.cancel()
}
}

object AkkaDiscoveryNameResolver {
def apply(settings: GrpcClientSettings)(implicit ec: ExecutionContext): AkkaDiscoveryNameResolver =
/**
* INTERNAL API
*/
@InternalApi
private[akka] object AkkaDiscoveryNameResolver {
def apply(
settings: GrpcClientSettings)(implicit ec: ExecutionContext, system: ActorSystem): AkkaDiscoveryNameResolver =
new AkkaDiscoveryNameResolver(
settings.serviceDiscovery,
settings.defaultPort,
settings.serviceName,
settings.servicePortName,
settings.serviceProtocol,
settings.resolveTimeout)
settings.resolveTimeout,
settings.discoveryRefreshInterval)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,28 @@

package akka.grpc.internal

import java.net.URI
import akka.actor.ActorSystem
import akka.annotation.InternalApi

import java.net.URI
import akka.discovery.ServiceDiscovery
import io.grpc.{ NameResolver, NameResolverProvider }

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

class AkkaDiscoveryNameResolverProvider(
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class AkkaDiscoveryNameResolverProvider(
discovery: ServiceDiscovery,
defaultPort: Int,
serviceName: String,
portName: Option[String],
protocol: Option[String],
resolveTimeout: FiniteDuration)(implicit ec: ExecutionContext)
resolveTimeout: FiniteDuration,
refreshInterval: Option[FiniteDuration])(implicit ec: ExecutionContext, system: ActorSystem)
extends NameResolverProvider {
override def isAvailable: Boolean = true

Expand All @@ -27,6 +34,13 @@ class AkkaDiscoveryNameResolverProvider(
override def getDefaultScheme: String = "http"

override def newNameResolver(targetUri: URI, args: NameResolver.Args): AkkaDiscoveryNameResolver = {
new AkkaDiscoveryNameResolver(discovery, defaultPort, serviceName, portName, protocol, resolveTimeout)
new AkkaDiscoveryNameResolver(
discovery,
defaultPort,
serviceName,
portName,
protocol,
resolveTimeout,
refreshInterval)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object ChannelUtils {
implicit sys: ClassicActorSystemProvider): InternalChannel = {
settings.backend match {
case "netty" =>
NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher)
NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher, sys.classicSystem)
case "akka-http" =>
AkkaHttpClientUtils.createChannel(settings, log)
case _ => throw new IllegalArgumentException(s"Unexpected backend [${settings.backend}]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Source
import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
Expand Down Expand Up @@ -48,7 +49,8 @@ object NettyClientUtils {
*/
@InternalApi
def createChannel(settings: GrpcClientSettings, log: LoggingAdapter)(
implicit ec: ExecutionContext): InternalChannel = {
implicit ec: ExecutionContext,
system: ActorSystem): InternalChannel = {

@nowarn("msg=deprecated")
var builder =
Expand All @@ -66,7 +68,8 @@ object NettyClientUtils {
settings.serviceName,
settings.servicePortName,
settings.serviceProtocol,
settings.resolveTimeout))
settings.resolveTimeout,
settings.discoveryRefreshInterval))

if (!settings.useTls)
builder = builder.usePlaintext()
Expand Down

0 comments on commit ca3a50a

Please sign in to comment.