diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c5f390c9a..c650a41fb 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -17,7 +17,7 @@ object Dependencies { // We don't force Akka updates because downstream projects can upgrade // themselves. For more information see // https://doc.akka.io//docs/akka/current/project/downstream-upgrade-strategy.html - val akka = "2.9.0" + val akka = "2.9.1-M1+11-809c4920-SNAPSHOT" val akkaBinary = "2.9" val akkaHttp = "10.6.0" val akkaHttpBinary = "10.6" diff --git a/runtime/src/main/resources/reference.conf b/runtime/src/main/resources/reference.conf index 25d8b47ac..c3729a3ba 100644 --- a/runtime/src/main/resources/reference.conf +++ b/runtime/src/main/resources/reference.conf @@ -17,6 +17,14 @@ akka.grpc.client."*" { # timeout for service discovery resolving resolve-timeout = 1s + + # Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries + # if the discovery mechanism supports that. Expected use is for client side load-balancing, to detect new services + # to load balance across. The default value "off" disables periodic refresh and instead only does refresh when + # the client implementation decides to. + # + # Currently only supported by the Netty client backend. + refresh-interval = off } # port to use if service-discovery-mechism is static or service discovery does not return a port diff --git a/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala b/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala index bd9742bc2..fd6fd2928 100644 --- a/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala +++ b/runtime/src/main/scala/akka/grpc/GrpcClientSettings.scala @@ -15,10 +15,12 @@ import com.typesafe.config.{ Config, ConfigValueFactory } import io.grpc.CallCredentials import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider -import javax.net.ssl.{ SSLContext, TrustManager } +import java.util.Optional +import javax.net.ssl.{ SSLContext, TrustManager } import scala.collection.immutable import scala.concurrent.duration.{ Duration, _ } +import scala.jdk.OptionConverters.RichOptional object GrpcClientSettings { @@ -148,7 +150,9 @@ object GrpcClientSettings { getOptionalString(clientConfiguration, "user-agent"), clientConfiguration.getBoolean("use-tls"), getOptionalString(clientConfiguration, "load-balancing-policy"), - clientConfiguration.getString("backend")) + clientConfiguration.getString("backend"), + identity, + getOptionalDuration(clientConfiguration, "discovery.refresh-interval")) private def getOptionalString(config: Config, path: String): Option[String] = config.getString(path) match { @@ -162,6 +166,12 @@ object GrpcClientSettings { case other => Some(other) } + private def getOptionalDuration(config: Config, path: String): Option[FiniteDuration] = + config.getString(path) match { + case "off" => None + case _ => Some(config.getDuration(path).asScala) + } + private def getPotentiallyInfiniteDuration(underlying: Config, path: String): Duration = Helpers.toRootLowerCase(underlying.getString(path)) match { case "infinite" => Duration.Inf @@ -196,7 +206,8 @@ final class GrpcClientSettings private ( val useTls: Boolean, val loadBalancingPolicy: Option[String], val backend: String, - val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity) { + val channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = identity, + val discoveryRefreshInterval: Option[FiniteDuration]) { require( sslContext.isEmpty || trustManager.isEmpty, "Configuring the sslContext or the trustManager is mutually exclusive") @@ -284,6 +295,28 @@ final class GrpcClientSettings private ( def withBackend(value: String): GrpcClientSettings = copy(backend = value) + /** + * Scala API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries + * if the discovery mechanism supports that. The default value `None` disables periodic refresh and instead + * only does refresh when the client implementation decides to. + * + * Currently only supported by the Netty client backend. + */ + @ApiMayChange + def withDiscoveryRefreshInterval(refreshInterval: Option[FiniteDuration]): GrpcClientSettings = + copy(discoveryRefreshInterval = refreshInterval) + + /** + * Java API: Set this to a duration to trigger periodic refresh of the resolved endpoints, evicting cached entries + * if the discovery mechanism supports that. The default value `None` disables periodic refresh and instead + * only does refresh when the client implementation decides to. + * + * Currently only supported by the Netty client backend. + */ + @ApiMayChange + def withDiscoveryRefreshInterval(refreshInterval: Optional[java.time.Duration]): GrpcClientSettings = + copy(discoveryRefreshInterval = refreshInterval.map(_.asScala).toScala) + private def copy( serviceName: String = serviceName, servicePortName: Option[String] = servicePortName, @@ -301,8 +334,8 @@ final class GrpcClientSettings private ( connectionAttempts: Option[Int] = connectionAttempts, loadBalancingPolicy: Option[String] = loadBalancingPolicy, backend: String = backend, - channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides) - : GrpcClientSettings = + channelBuilderOverrides: NettyChannelBuilder => NettyChannelBuilder = channelBuilderOverrides, + discoveryRefreshInterval: Option[FiniteDuration] = discoveryRefreshInterval): GrpcClientSettings = new GrpcClientSettings( callCredentials = callCredentials, serviceDiscovery = serviceDiscovery, @@ -321,5 +354,6 @@ final class GrpcClientSettings private ( connectionAttempts = connectionAttempts, loadBalancingPolicy = loadBalancingPolicy, backend = backend, - channelBuilderOverrides = channelBuilderOverrides) + channelBuilderOverrides = channelBuilderOverrides, + discoveryRefreshInterval = discoveryRefreshInterval) } diff --git a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala index c299ddf3e..949dd6033 100644 --- a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala +++ b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolver.scala @@ -4,55 +4,101 @@ package akka.grpc.internal -import java.net.{ InetAddress, InetSocketAddress, UnknownHostException } +import akka.actor.ActorSystem +import akka.actor.Cancellable +import akka.annotation.InternalApi +import java.net.{ InetAddress, InetSocketAddress, UnknownHostException } import akka.discovery.ServiceDiscovery.ResolvedTarget import akka.discovery.{ Lookup, ServiceDiscovery } +import akka.event.Logging import akka.grpc.GrpcClientSettings import io.grpc.{ Attributes, EquivalentAddressGroup, NameResolver, Status } import io.grpc.NameResolver.Listener +import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ ExecutionContext, Promise } import scala.util.{ Failure, Success } -class AkkaDiscoveryNameResolver( +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class AkkaDiscoveryNameResolver( discovery: ServiceDiscovery, defaultPort: Int, serviceName: String, portName: Option[String], protocol: Option[String], - resolveTimeout: FiniteDuration)(implicit val ec: ExecutionContext) + resolveTimeout: FiniteDuration, + refreshInterval: Option[FiniteDuration])(implicit val ec: ExecutionContext, system: ActorSystem) extends NameResolver { + + private final val log = Logging(system, getClass) + override def getServiceAuthority: String = serviceName - val listener: Promise[Listener] = Promise() + private val listener: Promise[Listener] = Promise() + + // initialized after first resolve if needed + private val refreshTask = new AtomicReference[Cancellable] override def start(l: Listener): Unit = { + log.debug("Name resolver for {} started", serviceName) listener.trySuccess(l) - lookup(l) + lookup(l, evict = false) } - override def refresh(): Unit = + override def refresh(): Unit = refresh(false) + + private def refresh(evict: Boolean): Unit = listener.future.onComplete { - case Success(l) => lookup(l) + case Success(l) => + log.debug("Name resolver for {} refreshing", serviceName) + lookup(l, evict) case Failure(_) => // We never fail this promise } - def lookup(listener: Listener): Unit = { - discovery.lookup(Lookup(serviceName, portName, protocol), resolveTimeout).onComplete { + def lookup(listener: Listener, evict: Boolean): Unit = { + val request = { + val l = Lookup(serviceName, portName, protocol) + if (evict) l.withDiscardCache + else l + } + val result = discovery.lookup(request, resolveTimeout) + + result.onComplete { case Success(result) => try { + if (log.isDebugEnabled) + log.debug( + "Successful service discovery for service {}, found addresses: {}", + serviceName, + result.addresses.mkString(", ")) listener.onAddresses(addresses(result.addresses), Attributes.EMPTY) } catch { case e: UnknownHostException => - // TODO at least log + log.warning(e, s"Unknown host for service $serviceName") listener.onError(Status.UNKNOWN.withDescription(e.getMessage)) } case Failure(e) => - // TODO at least log + log.warning(e, s"Service discovery failed for service $serviceName") listener.onError(Status.UNKNOWN.withDescription(e.getMessage)) } + + // initialize refresh timer after first lookup, if configured + if (refreshInterval.isDefined && refreshTask.get() == null) { + result.onComplete { _ => + refreshInterval.foreach { interval => + val cancellable = system.scheduler.scheduleWithFixedDelay(interval, interval)(() => refresh(evict = true)) + if (!refreshTask.compareAndSet(null, cancellable)) { + // concurrent update beat us to it, there already is a scheduled task + cancellable.cancel() + } + } + } + } } @throws[UnknownHostException] @@ -67,16 +113,25 @@ class AkkaDiscoveryNameResolver( .asJava } - override def shutdown(): Unit = () + override def shutdown(): Unit = { + val refreshCancellable = refreshTask.get() + if (refreshCancellable ne null) refreshCancellable.cancel() + } } -object AkkaDiscoveryNameResolver { - def apply(settings: GrpcClientSettings)(implicit ec: ExecutionContext): AkkaDiscoveryNameResolver = +/** + * INTERNAL API + */ +@InternalApi +private[akka] object AkkaDiscoveryNameResolver { + def apply( + settings: GrpcClientSettings)(implicit ec: ExecutionContext, system: ActorSystem): AkkaDiscoveryNameResolver = new AkkaDiscoveryNameResolver( settings.serviceDiscovery, settings.defaultPort, settings.serviceName, settings.servicePortName, settings.serviceProtocol, - settings.resolveTimeout) + settings.resolveTimeout, + settings.discoveryRefreshInterval) } diff --git a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala index 995e98cfc..02f5609cb 100644 --- a/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala +++ b/runtime/src/main/scala/akka/grpc/internal/AkkaDiscoveryNameResolverProvider.scala @@ -4,21 +4,28 @@ package akka.grpc.internal -import java.net.URI +import akka.actor.ActorSystem +import akka.annotation.InternalApi +import java.net.URI import akka.discovery.ServiceDiscovery import io.grpc.{ NameResolver, NameResolverProvider } import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration -class AkkaDiscoveryNameResolverProvider( +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class AkkaDiscoveryNameResolverProvider( discovery: ServiceDiscovery, defaultPort: Int, serviceName: String, portName: Option[String], protocol: Option[String], - resolveTimeout: FiniteDuration)(implicit ec: ExecutionContext) + resolveTimeout: FiniteDuration, + refreshInterval: Option[FiniteDuration])(implicit ec: ExecutionContext, system: ActorSystem) extends NameResolverProvider { override def isAvailable: Boolean = true @@ -27,6 +34,13 @@ class AkkaDiscoveryNameResolverProvider( override def getDefaultScheme: String = "http" override def newNameResolver(targetUri: URI, args: NameResolver.Args): AkkaDiscoveryNameResolver = { - new AkkaDiscoveryNameResolver(discovery, defaultPort, serviceName, portName, protocol, resolveTimeout) + new AkkaDiscoveryNameResolver( + discovery, + defaultPort, + serviceName, + portName, + protocol, + resolveTimeout, + refreshInterval) } } diff --git a/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala b/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala index 4922e4a5a..bd7da7c0f 100644 --- a/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala +++ b/runtime/src/main/scala/akka/grpc/internal/ChannelUtils.scala @@ -38,7 +38,7 @@ object ChannelUtils { implicit sys: ClassicActorSystemProvider): InternalChannel = { settings.backend match { case "netty" => - NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher) + NettyClientUtils.createChannel(settings, log)(sys.classicSystem.dispatcher, sys.classicSystem) case "akka-http" => AkkaHttpClientUtils.createChannel(settings, log) case _ => throw new IllegalArgumentException(s"Unexpected backend [${settings.backend}]") diff --git a/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala b/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala index c89997b05..55f594485 100644 --- a/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala +++ b/runtime/src/main/scala/akka/grpc/internal/NettyClientUtils.scala @@ -14,6 +14,7 @@ import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source import akka.Done import akka.NotUsed +import akka.actor.ActorSystem import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts import io.grpc.netty.shaded.io.grpc.netty.NegotiationType import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder @@ -48,7 +49,8 @@ object NettyClientUtils { */ @InternalApi def createChannel(settings: GrpcClientSettings, log: LoggingAdapter)( - implicit ec: ExecutionContext): InternalChannel = { + implicit ec: ExecutionContext, + system: ActorSystem): InternalChannel = { @nowarn("msg=deprecated") var builder = @@ -66,7 +68,8 @@ object NettyClientUtils { settings.serviceName, settings.servicePortName, settings.serviceProtocol, - settings.resolveTimeout)) + settings.resolveTimeout, + settings.discoveryRefreshInterval)) if (!settings.useTls) builder = builder.usePlaintext()