Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ZClient fast shutdown #2410

Merged
merged 4 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion zio-http/src/main/scala/zio/http/ZClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ object ZClient {

val default: ZLayer[Any, Throwable, Client] = {
implicit val trace: Trace = Trace.empty
(ZLayer.succeed(Config.default) ++ ZLayer.succeed(NettyConfig.default) ++
(ZLayer.succeed(Config.default) ++ ZLayer.succeed(NettyConfig.defaultWithFastShutdown) ++
DnsResolver.default) >>> live
}

Expand Down
64 changes: 39 additions & 25 deletions zio-http/src/main/scala/zio/http/netty/EventLoopGroups.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package zio.http.netty

import java.time.temporal.TemporalUnit
import java.util.concurrent.Executor

import scala.concurrent.duration.TimeUnit

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

Expand All @@ -33,52 +36,63 @@ import io.netty.incubator.channel.uring.IOUringEventLoopGroup
object EventLoopGroups {
trait Config extends ChannelType.Config {
def nThreads: Int
def shutdownQuietPeriod: Long
def shutdownTimeOut: Long

def shutdownTimeUnit: TimeUnit
}

def nio(nThreads: Int)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(ZIO.succeed(new NioEventLoopGroup(nThreads)))
def nio(config: Config)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(config: Config, ZIO.succeed(new NioEventLoopGroup(config.nThreads)))

def nio(nThreads: Int, executor: Executor)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(ZIO.succeed(new NioEventLoopGroup(nThreads, executor)))
def nio(config: Config, executor: Executor)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(config, ZIO.succeed(new NioEventLoopGroup(config.nThreads, executor)))

def make(eventLoopGroup: UIO[EventLoopGroup])(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
ZIO.acquireRelease(eventLoopGroup)(ev => NettyFutureExecutor.executed(ev.shutdownGracefully).orDie)
def make(config: Config, eventLoopGroup: UIO[EventLoopGroup])(implicit
trace: Trace,
): ZIO[Scope, Nothing, EventLoopGroup] =
ZIO.acquireRelease(eventLoopGroup)(ev =>
NettyFutureExecutor
.executed(ev.shutdownGracefully(config.shutdownQuietPeriod, config.shutdownTimeOut, config.shutdownTimeUnit))
.orDie,
)

def epoll(nThreads: Int)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(ZIO.succeed(new EpollEventLoopGroup(nThreads)))
def epoll(config: Config)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(config, ZIO.succeed(new EpollEventLoopGroup(config.nThreads)))

def kqueue(nThreads: Int)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(ZIO.succeed(new KQueueEventLoopGroup(nThreads)))
def kqueue(config: Config)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(config, ZIO.succeed(new KQueueEventLoopGroup(config.nThreads)))

def epoll(nThreads: Int, executor: Executor)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(ZIO.succeed(new EpollEventLoopGroup(nThreads, executor)))
def epoll(config: Config, executor: Executor)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(config, ZIO.succeed(new EpollEventLoopGroup(config.nThreads, executor)))

def uring(nThread: Int)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(ZIO.succeed(new IOUringEventLoopGroup(nThread)))
def uring(config: Config)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(config, ZIO.succeed(new IOUringEventLoopGroup(config.nThreads)))

def uring(nThread: Int, executor: Executor)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(ZIO.succeed(new IOUringEventLoopGroup(nThread, executor)))
def uring(config: Config, executor: Executor)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] =
make(config, ZIO.succeed(new IOUringEventLoopGroup(config.nThreads, executor)))

def default(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] = make(
def default(config: Config)(implicit trace: Trace): ZIO[Scope, Nothing, EventLoopGroup] = make(
config,
ZIO.succeed(new DefaultEventLoopGroup()),
)

implicit val trace: Trace = Trace.empty

val live: ZLayer[Config, Nothing, EventLoopGroup] =
ZLayer.scoped {
ZIO.service[Config].flatMap { config =>
ZIO.serviceWithZIO[Config] { config =>
config.channelType match {
case ChannelType.NIO => nio(config.nThreads)
case ChannelType.EPOLL => epoll(config.nThreads)
case ChannelType.KQUEUE => kqueue(config.nThreads)
case ChannelType.URING => uring(config.nThreads)
case ChannelType.NIO => nio(config)
case ChannelType.EPOLL => epoll(config)
case ChannelType.KQUEUE => kqueue(config)
case ChannelType.URING => uring(config)
case ChannelType.AUTO =>
if (Epoll.isAvailable)
epoll(config.nThreads)
epoll(config)
else if (KQueue.isAvailable)
kqueue(config.nThreads)
else nio(config.nThreads)
kqueue(config)
else nio(config)
}
}
}
Expand Down
28 changes: 23 additions & 5 deletions zio-http/src/main/scala/zio/http/netty/NettyConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@

package zio.http.netty

import zio.Config
import zio.stacktracer.TracingImplicits.disableAutoTrace
import java.util.concurrent.TimeUnit

import zio.{Config, Duration}

import zio.http.netty.NettyConfig.LeakDetectionLevel

import io.netty.util.ResourceLeakDetector

final case class NettyConfig(
leakDetectionLevel: LeakDetectionLevel,
channelType: ChannelType,
nThreads: Int,
shutdownQuietPeriodDuration: Duration,
shutdownTimeoutDuration: Duration,
) extends EventLoopGroups.Config { self =>

def channelType(channelType: ChannelType): NettyConfig = self.copy(channelType = channelType)
Expand All @@ -41,6 +45,10 @@ final case class NettyConfig(
*/
def maxThreads(nThreads: Int): NettyConfig = self.copy(nThreads = nThreads)

val shutdownTimeUnit: TimeUnit = TimeUnit.MILLISECONDS

val shutdownQuietPeriod: Long = shutdownQuietPeriodDuration.toMillis
val shutdownTimeOut: Long = shutdownTimeoutDuration.toMillis
}

object NettyConfig {
Expand All @@ -57,15 +65,25 @@ object NettyConfig {
case other => Left(Config.Error.InvalidData(message = s"Invalid channel type: $other"))
}
.withDefault(NettyConfig.default.channelType) ++
Config.int("max-threads").withDefault(NettyConfig.default.nThreads)).map {
case (leakDetectionLevel, channelType, maxThreads) =>
NettyConfig(leakDetectionLevel, channelType, maxThreads)
Config.int("max-threads").withDefault(NettyConfig.default.nThreads) ++
Config.duration("shutdown-quiet-period").withDefault(NettyConfig.default.shutdownQuietPeriodDuration) ++
Config.duration("shutdown-timeout").withDefault(NettyConfig.default.shutdownTimeoutDuration)).map {
case (leakDetectionLevel, channelType, maxThreads, quietPeriod, timeout) =>
NettyConfig(leakDetectionLevel, channelType, maxThreads, quietPeriod, timeout)
}

lazy val default: NettyConfig = NettyConfig(
LeakDetectionLevel.SIMPLE,
ChannelType.AUTO,
0,
// Defaults taken from io.netty.util.concurrent.AbstractEventExecutor
Duration.fromSeconds(2),
Duration.fromSeconds(15),
)

lazy val defaultWithFastShutdown: NettyConfig = default.copy(
shutdownQuietPeriodDuration = Duration.fromMillis(50),
shutdownTimeoutDuration = Duration.fromMillis(250),
)

sealed trait LeakDetectionLevel {
Expand Down
47 changes: 47 additions & 0 deletions zio-http/src/test/scala/zio/http/ClientLayerSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package zio.http

import java.util.concurrent.TimeUnit

import zio.test.Assertion.{isGreaterThan, isLessThan, isWithin}
import zio.test.{TestAspect, assertZIO}
import zio.{Clock, ZIO, ZLayer, durationInt}

import zio.http.Client
import zio.http.netty.NettyConfig

object ClientLayerSpec extends ZIOHttpSpec {

def clientLayerSpec = suite("ClientLayerSpec")(
test("default client should shutdown within 250 ms") {
val timeDifference = for {
startTime <- ZIO.scoped {
Client.default.build *>
Clock.currentTime(TimeUnit.MILLISECONDS)
}
endTime <- Clock.currentTime(TimeUnit.MILLISECONDS)
} yield endTime - startTime
assertZIO[Any, Throwable, Long](timeDifference)(isWithin(50L, 250L))
} @@ TestAspect.withLiveClock,
test("netty client should allow customizing quiet period for client shutdown") {
val customNettyConfig =
NettyConfig.default
.copy(shutdownQuietPeriodDuration = 2900.millis, shutdownTimeoutDuration = 3100.millis)
val customClientLayer =
(ZLayer.succeed(Client.Config.default) ++ ZLayer.succeed(customNettyConfig) ++
DnsResolver.default) >>> Client.live

val timeDifference = for {
startTime <- ZIO.scoped {
customClientLayer.build *> Clock.currentTime(TimeUnit.MILLISECONDS)
}
endTime <- Clock.currentTime(TimeUnit.MILLISECONDS)
} yield endTime - startTime
assertZIO[Any, Throwable, Long](timeDifference)(
isWithin(2900L, 3100L),
)
} @@ TestAspect.withLiveClock,
)

override def spec = suite("ClientLayer")(clientLayerSpec)

}