Skip to content

Commit

Permalink
feat: Periodic client discovery refresh #1152 (#1886)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored Feb 28, 2024
1 parent 5b92da0 commit 02e1335
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 41 deletions.
2 changes: 1 addition & 1 deletion plugin-tester-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ repositories {

def scalaVersion = org.gradle.util.VersionNumber.parse(System.getenv("TRAVIS_SCALA_VERSION") ?: "2.13")
def scalaBinaryVersion = "${scalaVersion.major}.${scalaVersion.minor}"
def akkaVersion = "2.9.0"
def akkaVersion = "2.9.1"

dependencies {
implementation group: 'ch.megard', name: "akka-http-cors_${scalaBinaryVersion}", version: '1.1.3'
Expand Down
2 changes: 1 addition & 1 deletion plugin-tester-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<maven-dependency-plugin.version>3.1.2</maven-dependency-plugin.version>
<maven-exec-plugin.version>3.0.0</maven-exec-plugin.version>
<akka.http.cors.version>1.1.0</akka.http.cors.version>
<akka.version>2.9.0</akka.version>
<akka.version>2.9.1</akka.version>
<grpc.version>1.60.2</grpc.version> <!-- checked synced by VersionSyncCheckPlugin -->
<project.encoding>UTF-8</project.encoding>
<build-helper-maven-plugin>3.3.0</build-helper-maven-plugin>
Expand Down
2 changes: 1 addition & 1 deletion plugin-tester-scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ repositories {

def scalaVersion = org.gradle.util.VersionNumber.parse(System.getenv("TRAVIS_SCALA_VERSION") ?: "2.13")
def scalaBinaryVersion = "${scalaVersion.major}.${scalaVersion.minor}"
def akkaVersion = "2.9.0"
def akkaVersion = "2.9.1"

dependencies {
implementation group: 'ch.megard', name: "akka-http-cors_${scalaBinaryVersion}", version: '1.1.3'
Expand Down
2 changes: 1 addition & 1 deletion plugin-tester-scala/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<properties>
<maven.compiler.release>11</maven.compiler.release>
<akka.version>2.9.0</akka.version>
<akka.version>2.9.1</akka.version>
<akka.http.cors.version>0.4.2</akka.http.cors.version>
<grpc.version>1.60.2</grpc.version> <!-- checked synced by VersionSyncCheckPlugin -->
<project.encoding>UTF-8</project.encoding>
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object Dependencies {
// We don't force Akka updates because downstream projects can upgrade
// themselves. For more information see
// https://doc.akka.io//docs/akka/current/project/downstream-upgrade-strategy.html
val akka = "2.9.0"
val akka = "2.9.1"
val akkaBinary = "2.9"
val akkaHttp = "10.6.0"
val akkaHttpBinary = "10.6"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# private
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.GrpcClientSettings.this")

# internal
ProblemFilters.exclude[FinalClassProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.apply")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.listener")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.lookup")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolver.apply")
ProblemFilters.exclude[FinalClassProblem]("akka.grpc.internal.AkkaDiscoveryNameResolverProvider")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.AkkaDiscoveryNameResolverProvider.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.NettyClientUtils.createChannel")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.grpc.internal.NettyClientUtils.createChannel")
8 changes: 8 additions & 0 deletions runtime/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ akka.grpc.client."*" {

# timeout for service discovery resolving
resolve-timeout = 1s

# Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
# if the discovery mechanism supports that. Expected use is for client side load-balancing, to detect new services
# to load balance across. The default value "off" disables periodic refresh and instead only does refresh when
# the client implementation decides to.
#
# Currently only supported by the Netty client backend.
refresh-interval = off
}

# port to use if service-discovery-mechism is static or service discovery does not return a port
Expand Down
44 changes: 38 additions & 6 deletions runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import com.typesafe.config.{ Config, ConfigValueFactory }
import io.grpc.CallCredentials
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider
import javax.net.ssl.{ SSLContext, TrustManager }

import javax.net.ssl.{ SSLContext, TrustManager }
import scala.collection.immutable
import scala.concurrent.duration.{ Duration, _ }

Expand Down Expand Up @@ -148,7 +148,9 @@ object GrpcClientSettings {
getOptionalString(clientConfiguration, "user-agent"),
clientConfiguration.getBoolean("use-tls"),
getOptionalString(clientConfiguration, "load-balancing-policy"),
clientConfiguration.getString("backend"))
clientConfiguration.getString("backend"),
identity,
getOptionalDuration(clientConfiguration, "service-discovery.refresh-interval"))

private def getOptionalString(config: Config, path: String): Option[String] =
config.getString(path) match {
Expand All @@ -162,6 +164,12 @@ object GrpcClientSettings {
case other => Some(other)
}

private def getOptionalDuration(config: Config, path: String): Option[FiniteDuration] =
Helpers.toRootLowerCase(config.getString(path)) match {
case "off" => None
case _ => Some(config.getDuration(path).asScala)
}

private def getPotentiallyInfiniteDuration(underlying: Config, path: String): Duration =
Helpers.toRootLowerCase(underlying.getString(path)) match {
case "infinite" => Duration.Inf
Expand Down Expand Up @@ -196,7 +204,8 @@ final class GrpcClientSettings private (
val useTls: Boolean,
val loadBalancingPolicy: Option[String],
val backend: String,
val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity) {
val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity,
val discoveryRefreshInterval: Option[FiniteDuration]) {
require(
sslContext.isEmpty || trustManager.isEmpty,
"Configuring the sslContext or the trustManager is mutually exclusive")
Expand Down Expand Up @@ -284,6 +293,28 @@ final class GrpcClientSettings private (
def withBackend(value: String): GrpcClientSettings =
copy(backend = value)

/**
* Scala API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
* if the discovery mechanism supports that. The default is no periodic refresh and instead
* * only does refresh when the client implementation decides to.
*
* Currently only supported by the Netty client backend.
*/
@ApiMayChange
def withDiscoveryRefreshInterval(refreshInterval: FiniteDuration): GrpcClientSettings =
copy(discoveryRefreshInterval = Some(refreshInterval))

/**
* Java API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries
* if the discovery mechanism supports that. The default is no periodic refresh and instead
* only does refresh when the client implementation decides to.
*
* Currently only supported by the Netty client backend.
*/
@ApiMayChange
def withDiscoveryRefreshInterval(refreshInterval: java.time.Duration): GrpcClientSettings =
copy(discoveryRefreshInterval = Some(refreshInterval.asScala))

private def copy(
serviceName: String = serviceName,
servicePortName: Option[String] = servicePortName,
Expand All @@ -301,8 +332,8 @@ final class GrpcClientSettings private (
connectionAttempts: Option[Int] = connectionAttempts,
loadBalancingPolicy: Option[String] = loadBalancingPolicy,
backend: String = backend,
channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides)
: GrpcClientSettings =
channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides,
discoveryRefreshInterval: Option[FiniteDuration] = discoveryRefreshInterval): GrpcClientSettings =
new GrpcClientSettings(
callCredentials = callCredentials,
serviceDiscovery = serviceDiscovery,
Expand All @@ -321,5 +352,6 @@ final class GrpcClientSettings private (
connectionAttempts = connectionAttempts,
loadBalancingPolicy = loadBalancingPolicy,
backend = backend,
channelBuilderOverrides = channelBuilderOverrides)
channelBuilderOverrides = channelBuilderOverrides,
discoveryRefreshInterval = discoveryRefreshInterval)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,101 @@

package akka.grpc.internal

import java.net.{ InetAddress, InetSocketAddress, UnknownHostException }
import akka.actor.ActorSystem
import akka.actor.Cancellable
import akka.annotation.InternalApi

import java.net.{ InetAddress, InetSocketAddress, UnknownHostException }
import akka.discovery.ServiceDiscovery.ResolvedTarget
import akka.discovery.{ Lookup, ServiceDiscovery }
import akka.event.Logging
import akka.grpc.GrpcClientSettings
import io.grpc.{ Attributes, EquivalentAddressGroup, NameResolver, Status }
import io.grpc.NameResolver.Listener

import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Promise }
import scala.util.{ Failure, Success }

class AkkaDiscoveryNameResolver(
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class AkkaDiscoveryNameResolver(
discovery: ServiceDiscovery,
defaultPort: Int,
serviceName: String,
portName: Option[String],
protocol: Option[String],
resolveTimeout: FiniteDuration)(implicit val ec: ExecutionContext)
resolveTimeout: FiniteDuration,
refreshInterval: Option[FiniteDuration])(implicit val ec: ExecutionContext, system: ActorSystem)
extends NameResolver {

private final val log = Logging(system, "akka.grpc.internal.AkkaDiscoveryNameResolver")

override def getServiceAuthority: String = serviceName

val listener: Promise[Listener] = Promise()
private val listener: Promise[Listener] = Promise()

// initialized after first resolve if needed
private val refreshTask = new AtomicReference[Cancellable]

override def start(l: Listener): Unit = {
log.debug("Name resolver for {} started", serviceName)
listener.trySuccess(l)
lookup(l)
lookup(l, evict = false)
}

override def refresh(): Unit =
override def refresh(): Unit = refresh(false)

private def refresh(evict: Boolean): Unit =
listener.future.onComplete {
case Success(l) => lookup(l)
case Success(l) =>
log.debug("Name resolver for {} refreshing", serviceName)
lookup(l, evict)
case Failure(_) => // We never fail this promise
}

def lookup(listener: Listener): Unit = {
discovery.lookup(Lookup(serviceName, portName, protocol), resolveTimeout).onComplete {
def lookup(listener: Listener, evict: Boolean): Unit = {
val request = {
val l = Lookup(serviceName, portName, protocol)
if (evict) l.withDiscardCache
else l
}
val result = discovery.lookup(request, resolveTimeout)

result.onComplete {
case Success(result) =>
try {
if (log.isDebugEnabled)
log.debug(
"Successful service discovery for service {}, found addresses: {}",
serviceName,
result.addresses.mkString(", "))
listener.onAddresses(addresses(result.addresses), Attributes.EMPTY)
} catch {
case e: UnknownHostException =>
// TODO at least log
log.warning(e, "Unknown host for service {}", serviceName)
listener.onError(Status.UNKNOWN.withDescription(e.getMessage))
}
case Failure(e) =>
// TODO at least log
log.warning(e, "Service discovery failed for service {}", serviceName)
listener.onError(Status.UNKNOWN.withDescription(e.getMessage))
}

// initialize refresh timer after first lookup, if configured
if (refreshInterval.isDefined && refreshTask.get() == null) {
result.onComplete { _ =>
refreshInterval.foreach { interval =>
val cancellable = system.scheduler.scheduleWithFixedDelay(interval, interval)(() => refresh(evict = true))
if (!refreshTask.compareAndSet(null, cancellable)) {
// concurrent update beat us to it, there already is a scheduled task
cancellable.cancel()
}
}
}
}
}

@throws[UnknownHostException]
Expand All @@ -67,16 +113,25 @@ class AkkaDiscoveryNameResolver(
.asJava
}

override def shutdown(): Unit = ()
override def shutdown(): Unit = {
val refreshCancellable = refreshTask.get()
if (refreshCancellable ne null) refreshCancellable.cancel()
}
}

object AkkaDiscoveryNameResolver {
def apply(settings: GrpcClientSettings)(implicit ec: ExecutionContext): AkkaDiscoveryNameResolver =
/**
* INTERNAL API
*/
@InternalApi
private[akka] object AkkaDiscoveryNameResolver {
def apply(
settings: GrpcClientSettings)(implicit ec: ExecutionContext, system: ActorSystem): AkkaDiscoveryNameResolver =
new AkkaDiscoveryNameResolver(
settings.serviceDiscovery,
settings.defaultPort,
settings.serviceName,
settings.servicePortName,
settings.serviceProtocol,
settings.resolveTimeout)
settings.resolveTimeout,
settings.discoveryRefreshInterval)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,28 @@

package akka.grpc.internal

import java.net.URI
import akka.actor.ActorSystem
import akka.annotation.InternalApi

import java.net.URI
import akka.discovery.ServiceDiscovery
import io.grpc.{ NameResolver, NameResolverProvider }

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

class AkkaDiscoveryNameResolverProvider(
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class AkkaDiscoveryNameResolverProvider(
discovery: ServiceDiscovery,
defaultPort: Int,
serviceName: String,
portName: Option[String],
protocol: Option[String],
resolveTimeout: FiniteDuration)(implicit ec: ExecutionContext)
resolveTimeout: FiniteDuration,
refreshInterval: Option[FiniteDuration])(implicit ec: ExecutionContext, system: ActorSystem)
extends NameResolverProvider {
override def isAvailable: Boolean = true

Expand All @@ -27,6 +34,13 @@ class AkkaDiscoveryNameResolverProvider(
override def getDefaultScheme: String = "http"

override def newNameResolver(targetUri: URI, args: NameResolver.Args): AkkaDiscoveryNameResolver = {
new AkkaDiscoveryNameResolver(discovery, defaultPort, serviceName, portName, protocol, resolveTimeout)
new AkkaDiscoveryNameResolver(
discovery,
defaultPort,
serviceName,
portName,
protocol,
resolveTimeout,
refreshInterval)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object ChannelUtils {
implicit sys: ClassicActorSystemProvider): InternalChannel = {
settings.backend match {
case "netty" =>
NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher)
NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher, sys.classicSystem)
case "akka-http" =>
AkkaHttpClientUtils.createChannel(settings, log)
case _ => throw new IllegalArgumentException(s"Unexpected backend [${settings.backend}]")
Expand Down
Loading

0 comments on commit 02e1335

Please sign in to comment.