Skip to content

Commit

Permalink
Feature: healthcheck for docker container exit (#1464)
Browse files Browse the repository at this point in the history
* Healthcheck for exited containers added; Fixed Network namings.

* Update namings; Add pull skipping if image already exist, catch errors during image pulling to throw IntegrationCheckException and skip tests.

* Remove alwaysPull; test if image is available under the docker user; handle pulling timeout properly to fail fast.

* maybeSuspendEither

* add autoPull option
  • Loading branch information
Caparow authored Apr 27, 2021
1 parent 6366fdc commit d8db087
Show file tree
Hide file tree
Showing 14 changed files with 245 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ trait QuasiIO[F[_]] extends QuasiApplicative[F] {
*/
def maybeSuspend[A](eff: => A): F[A]

def maybeSuspendEither[A](eff: => Either[Throwable, A]): F[A]

/** A stronger version of `handleErrorWith`, the difference is that
* this will _also_ intercept Throwable defects in `ZIO`, not only typed errors
*/
Expand Down Expand Up @@ -103,6 +105,10 @@ object QuasiIO extends LowPriorityQuasiIOInstances {
override def flatMap[A, B](a: A)(f: A => Identity[B]): Identity[B] = f(a)

override def maybeSuspend[A](eff: => A): Identity[A] = eff
override def maybeSuspendEither[A](eff: => Either[Throwable, A]): Identity[A] = eff match {
case Left(err) => throw err
case Right(v) => v
}
override def suspendF[A](effAction: => A): Identity[A] = effAction
override def definitelyRecover[A](fa: => Identity[A])(recover: Throwable => Identity[A]): Identity[A] = {
try { fa }
Expand Down Expand Up @@ -153,6 +159,7 @@ object QuasiIO extends LowPriorityQuasiIOInstances {
override def flatMap[A, B](fa: F[E, A])(f: A => F[E, B]): F[E, B] = F.flatMap(fa)(f)

override def maybeSuspend[A](eff: => A): F[E, A] = F.syncThrowable(eff)
override def maybeSuspendEither[A](eff: => Either[Throwable, A]): F[Throwable, A] = F.syncThrowable(eff).flatMap(F.fromEither(_))
override def suspendF[A](effAction: => F[Throwable, A]): F[Throwable, A] = F.suspend(effAction)
override def definitelyRecover[A](action: => F[E, A])(recover: Throwable => F[E, A]): F[E, A] = {
F.suspend(action).sandbox.catchAll(recover apply _.toThrowable)
Expand Down Expand Up @@ -202,6 +209,7 @@ private[effect] sealed trait LowPriorityQuasiIOInstances {
override def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B] = F.flatMap(fa)(f)

override def maybeSuspend[A](eff: => A): F[A] = F.delay(eff)
override def maybeSuspendEither[A](eff: => Either[Throwable, A]): F[A] = F.defer(F.fromEither(eff))
override def suspendF[A](effAction: => F[A]): F[A] = F.defer(effAction)
override def definitelyRecover[A](action: => F[A])(recover: Throwable => F[A]): F[A] = {
F.handleErrorWith(F.defer(action))(recover)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ object ResourceCases {
override def pure[A](a: A): Suspend2[E, A] = Suspend2(a)
override def fail[A](t: => Throwable): Suspend2[E, A] = Suspend2[A](throw t)
override def maybeSuspend[A](eff: => A): Suspend2[E, A] = Suspend2(eff)
override def maybeSuspendEither[A](eff: => Either[Throwable, A]): Suspend2[E, A] = {
Suspend2(
eff match {
case Left(err) => throw err
case Right(v) => v
}
)
}
override def definitelyRecover[A](fa: => Suspend2[E, A])(recover: Throwable => Suspend2[E, A]): Suspend2[E, A] = {
Suspend2(
() =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object ContainerNetworkDef {
) extends Lifecycle.Basic[F, ContainerNetwork[T]] {
import client.rawClient

private[this] val prefix: String = prefixName.camelToUnderscores.drop(1).replace("$", "")
private[this] val prefix: String = prefixName.camelToUnderscores.replace("$", "")
private[this] val networkLabels: Map[String, String] = Map(
DockerConst.Labels.reuseLabel -> Docker.shouldReuse(config.reuse, client.clientConfig.globalReuse).toString,
s"${DockerConst.Labels.networkDriverPrefix}.${config.driver}" -> true.toString,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package izumi.distage.docker

import java.util.concurrent.{TimeUnit, TimeoutException}

import com.github.dockerjava.api.command.InspectContainerResponse
import com.github.dockerjava.api.exception.NotFoundException
import com.github.dockerjava.api.model._
import izumi.distage.docker.ContainerResource.PortDecl
import izumi.distage.docker.Docker._
import izumi.distage.docker.DockerClientWrapper.ContainerDestroyMeta
import izumi.distage.docker.healthcheck.ContainerHealthCheck.HealthCheckResult.GoodHealthcheck
import izumi.distage.docker.healthcheck.ContainerHealthCheck.HealthCheckResult.GoodHealth
import izumi.distage.docker.healthcheck.ContainerHealthCheck.{HealthCheckResult, VerifiedContainerConnectivity}
import izumi.distage.framework.model.exceptions.IntegrationCheckException
import izumi.distage.model.definition.Lifecycle
Expand All @@ -21,9 +20,11 @@ import izumi.fundamentals.platform.network.IzSockets
import izumi.fundamentals.platform.strings.IzString._
import izumi.logstage.api.IzLogger

import java.util.concurrent.{TimeUnit, TimeoutException}
import scala.annotation.nowarn
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

case class ContainerResource[F[_], T](
config: Docker.ContainerConfig[T],
Expand Down Expand Up @@ -77,29 +78,20 @@ case class ContainerResource[F[_], T](
def await(container: DockerContainer[T], attempt: Int): F[DockerContainer[T]] = {
F.maybeSuspend {
logger.debug(s"Awaiting until alive: $container...")
try {
val status = rawClient.inspectContainerCmd(container.id.name).exec()
if (status.getState.getExitCodeLong == 0L && config.ports.isEmpty) {
logger.debug(s"Container has exited but that was a singleshot container $container...")
Right(HealthCheckResult.Available)
} else if (status.getState.getRunning) {
logger.debug(s"Trying healthcheck on running $container...")
Right(config.healthCheck.check(logger, container))
} else {
Left(new RuntimeException(s"$container exited, status: ${status.getState}"))
}
} catch {
case t: Throwable =>
Left(t)
inspectContainerAndGetState(container.id.name).map {
config.healthCheck.check(logger, container, _)
}
}.flatMap {
case Right(HealthCheckResult.Available) =>
case Right(HealthCheckResult.Terminated(failure)) =>
F.fail(new RuntimeException(s"$container terminated with failure: $failure"))

case Right(HealthCheckResult.Good) =>
F.maybeSuspend {
logger.info(s"Continuing without port checks: $container")
container
}

case Right(status: HealthCheckResult.AvailableOnPorts) if status.allTCPPortsAccessible =>
case Right(status: HealthCheckResult.GoodOnPorts) if status.allTCPPortsAccessible =>
F.maybeSuspend {
val out = container.copy(availablePorts = VerifiedContainerConnectivity.HasAvailablePorts(status.availablePorts))
logger.info(s"Looks good: ${out -> "container"}")
Expand All @@ -114,10 +106,10 @@ case class ContainerResource[F[_], T](
P.sleep(config.healthCheckInterval).flatMap(_ => await(container, next))
} else {
last match {
case HealthCheckResult.Unavailable =>
case HealthCheckResult.Bad =>
F.fail(new TimeoutException(s"Health checks failed after $maxAttempts attempts, no diagnostics available: $container"))

case HealthCheckResult.UnavailableWithMeta(unavailablePorts, unverifiedPorts) =>
case HealthCheckResult.BadWithMeta(unavailablePorts, unverifiedPorts) =>
val sb = new StringBuilder()
sb.append(s"Health checks failed after $maxAttempts attempts: $container\n")
if (unverifiedPorts.nonEmpty) {
Expand Down Expand Up @@ -146,8 +138,11 @@ case class ContainerResource[F[_], T](

F.fail(new TimeoutException(sb.toString()))

case impossible: GoodHealthcheck =>
case impossible: GoodHealth =>
F.fail(new TimeoutException(s"BUG: good healthcheck $impossible while health checks failed after $maxAttempts attempts: $container"))

case HealthCheckResult.Terminated(failure) =>
F.fail(new RuntimeException(s"Unexpected condition: $container terminated with failure: $failure"))
}
}

Expand Down Expand Up @@ -266,9 +261,40 @@ case class ContainerResource[F[_], T](
val adjustedEnv = portsEnv ++ config.env

for {
_ <- F.when(config.autoPull) {
F.maybeSuspendEither {
val existedImages = rawClient
.listImagesCmd().exec()
.asScala
.flatMap(i => Option(i.getRepoTags).fold(List.empty[String])(_.toList))
.toSet
// test if image exists
// docker official images may be pulled with or without `library` user prefix, but it being saved locally without prefix
if (existedImages.contains(config.image) || existedImages.contains(config.image.replace("library/", ""))) {
logger.info(s"Skipping pull of `${config.image}`. Already exist.")
Right(())
} else {
logger.info(s"Going to pull `${config.image}`...")
// try to pull image with timeout. If pulling was timed out - return [IntegrationCheckException] to skip tests.
Try {
rawClient
.pullImageCmd(config.image)
.start()
.awaitCompletion(config.pullTimeout.toMillis, TimeUnit.MILLISECONDS)
} match {
case Success(pulled) if pulled => // pulled successfully
Right(())
case Success(_) => // timed out
Left(new IntegrationCheckException(NonEmptyList(ResourceCheck.ResourceUnavailable(s"Image `${config.image}` pull timeout exception.", None))))
case Failure(t) => // failure occurred (e.g. rate limiter failure)
Left(new IntegrationCheckException(NonEmptyList(ResourceCheck.ResourceUnavailable(s"Image pulling failed due to: ${t.getMessage}", Some(t)))))
}
}
}
}
out <- F.maybeSuspend {
@nowarn("msg=method.*Bind.*deprecated")
val cmd = Value(baseCmd)
val createContainerCmd = Value(baseCmd)
.mut(config.name)(_.withName(_))
.mut(ports.nonEmpty)(_.withExposedPorts(ports.map(_.binding.getExposedPort).asJava))
.mut(ports.nonEmpty)(_.withPortBindings(ports.map(_.binding).asJava))
Expand All @@ -282,18 +308,8 @@ case class ContainerResource[F[_], T](
.map(c => c.withHostConfig(c.getHostConfig.withAutoRemove(config.autoRemove)))
.get

if (config.alwaysPull) {
logger.info(s"Going to pull `${config.image}`...")
rawClient
.pullImageCmd(config.image)
.start()
.awaitCompletion(config.pullTimeout.toMillis, TimeUnit.MILLISECONDS);
} else {
logger.info(s"Skipping explicit pull of `${config.image}`")
}

logger.debug(s"Going to create container from image `${config.image}`...")
val res = cmd.exec()
val res = createContainerCmd.exec()

logger.debug(s"Going to start container ${res.getId -> "id"}...")
rawClient.startContainerCmd(res.getId).exec()
Expand Down Expand Up @@ -335,6 +351,20 @@ case class ContainerResource[F[_], T](
} yield result
}

private[this] def inspectContainerAndGetState(containerId: String): Either[Throwable, ContainerState] = {
try {
val status = rawClient.inspectContainerCmd(containerId).exec()
status.getState match {
case s if s.getRunning => Right(ContainerState.Running)
case s if s.getExitCodeLong == 0L => Right(ContainerState.SuccessfullyExited)
case s => Right(ContainerState.Failed(s.getExitCodeLong))
}
} catch {
case _: NotFoundException => Right(ContainerState.NotFound)
case t: Throwable => Left(t)
}
}

private def mapContainerPorts(inspection: InspectContainerResponse): Either[UnmappedPorts, ReportedContainerConnectivity] = {
val network = inspection.getNetworkSettings
val labels = inspection.getConfig.getLabels
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ object Docker {
pullTimeout: FiniteDuration = FiniteDuration(120, TimeUnit.SECONDS),
healthCheck: ContainerHealthCheck[T] = ContainerHealthCheck.portCheck[T],
portProbeTimeout: FiniteDuration = FiniteDuration(200, TimeUnit.MILLISECONDS),
alwaysPull: Boolean = true,
autoPull: Boolean = true,
) {
def tcpPorts: Set[DockerPort] = ports.collect { case t: DockerPort.TCPBase => t: DockerPort }.toSet
def udpPorts: Set[DockerPort] = ports.collect { case t: DockerPort.UDPBase => t: DockerPort }.toSet
Expand Down Expand Up @@ -224,4 +224,11 @@ object Docker {
override def toString: String = s"{host: $dockerHost; addresses=$containerAddressesV4; ports=$dockerPorts}"
}

sealed trait ContainerState
object ContainerState {
case object Running extends ContainerState
case object SuccessfullyExited extends ContainerState
case object NotFound extends ContainerState
final case class Failed(status: Long) extends ContainerState
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object PostgresFlyWayDocker extends ContainerDef {
Config(
image = "flyway/flyway:6.0-alpine",
ports = Seq.empty,
healthCheck = ContainerHealthCheck.succeed,
healthCheck = ContainerHealthCheck.exited(canBeDestroyed = true),
reuse = DockerReusePolicy.ReuseEnabled,
autoRemove = false,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package izumi.distage.docker.healthcheck
import izumi.distage.docker.Docker.ContainerState
import izumi.distage.docker.healthcheck.ContainerHealthCheck.HealthCheckResult
import izumi.distage.docker.{Docker, DockerContainer}
import izumi.logstage.api.IzLogger

final class ContainerExitedCheck[Tag](canBeDestroyed: Boolean) extends ContainerHealthCheck[Tag] {
override def check(logger: IzLogger, container: DockerContainer[Tag], state: Docker.ContainerState): HealthCheckResult = {
state match {
case ContainerState.Running =>
logger.debug(s"$container still running, marked as unavailable.")
HealthCheckResult.Bad
case ContainerState.SuccessfullyExited =>
logger.debug(s"$container successfully exited, marked available.")
HealthCheckResult.Good
case ContainerState.NotFound if canBeDestroyed =>
logger.debug(s"$container was destroyed, marked available.")
HealthCheckResult.Good
case ContainerState.NotFound =>
HealthCheckResult.Terminated("Container not found.")
case ContainerState.Failed(status) =>
HealthCheckResult.Terminated(s"Container terminated with non zero code. Code=$status")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ import izumi.fundamentals.collections.nonempty.{NonEmptyList, NonEmptyMap}
import izumi.logstage.api.IzLogger

trait ContainerHealthCheck[Tag] {
def check(logger: IzLogger, container: DockerContainer[Tag]): HealthCheckResult
def check(logger: IzLogger, container: DockerContainer[Tag], state: ContainerState): HealthCheckResult

final def ++(next: HealthCheckResult => ContainerHealthCheck[Tag]): ContainerHealthCheck[Tag] = this combine next
final def combine(next: HealthCheckResult => ContainerHealthCheck[Tag]): ContainerHealthCheck[Tag] = {
(logger: IzLogger, container: DockerContainer[Tag]) =>
next(check(logger, container)).check(logger, container)
(logger: IzLogger, container: DockerContainer[Tag], state: ContainerState) =>
next(check(logger, container, state)).check(logger, container, state)
}
final def combineOnPorts(next: HealthCheckResult.AvailableOnPorts => ContainerHealthCheck[Tag]): ContainerHealthCheck[Tag] = {
(logger: IzLogger, container: DockerContainer[Tag]) =>
check(logger, container) match {
case thisCheckResult: HealthCheckResult.AvailableOnPorts =>
next(thisCheckResult).check(logger, container) match {
case HealthCheckResult.Available => thisCheckResult
final def combineOnPorts(next: HealthCheckResult.GoodOnPorts => ContainerHealthCheck[Tag]): ContainerHealthCheck[Tag] = {
(logger: IzLogger, container: DockerContainer[Tag], state: ContainerState) =>
check(logger, container, state) match {
case thisCheckResult: HealthCheckResult.GoodOnPorts =>
next(thisCheckResult).check(logger, container, state) match {
case HealthCheckResult.Good => thisCheckResult
case other => other
}
case other => other
Expand All @@ -43,27 +43,33 @@ object ContainerHealthCheck {
portCheck.combineOnPorts(new PostgreSqlProtocolCheck(_, port, user, password))
}

def succeed[T]: ContainerHealthCheck[T] = (_, _) => HealthCheckResult.Available
def succeed[T]: ContainerHealthCheck[T] = (_, _, _) => HealthCheckResult.Good

def exited[T](canBeDestroyed: Boolean): ContainerHealthCheck[T] = {
new ContainerExitedCheck[T](canBeDestroyed)
}

sealed trait HealthCheckResult
object HealthCheckResult {
sealed trait BadHealthcheck extends HealthCheckResult
case object Unavailable extends BadHealthcheck
sealed trait BadHealth extends HealthCheckResult
case object Bad extends BadHealth

final case class BadWithMeta(
unavailablePorts: UnavailablePorts,
unverifiedPorts: Set[DockerPort],
) extends BadHealth

sealed trait GoodHealthcheck extends HealthCheckResult
case object Available extends GoodHealthcheck
sealed trait GoodHealth extends HealthCheckResult
case object Good extends GoodHealth

final case class AvailableOnPorts(
final case class GoodOnPorts(
availablePorts: AvailablePorts,
unavailablePorts: UnavailablePorts,
unverifiedPorts: Set[DockerPort],
allTCPPortsAccessible: Boolean,
) extends GoodHealthcheck
) extends GoodHealth

final case class UnavailableWithMeta(
unavailablePorts: UnavailablePorts,
unverifiedPorts: Set[DockerPort],
) extends BadHealthcheck
final case class Terminated(failure: String) extends HealthCheckResult
}

final case class AvailablePorts(availablePorts: NonEmptyMap[DockerPort, NonEmptyList[AvailablePort]]) {
Expand Down Expand Up @@ -109,4 +115,18 @@ object ContainerHealthCheck {
override def firstOption(port: DockerPort): Option[AvailablePort] = None
}
}


def checkIfRunning(state: ContainerState)(performCheck: => HealthCheckResult): HealthCheckResult = {
state match {
case ContainerState.Running =>
performCheck
case ContainerState.SuccessfullyExited =>
HealthCheckResult.Terminated("Container unexpectedly exited with code 0.")
case ContainerState.NotFound =>
HealthCheckResult.Terminated("Container not found.")
case ContainerState.Failed(status) =>
HealthCheckResult.Terminated(s"Container terminated with non zero code. Code=$status")
}
}
}
Loading

0 comments on commit d8db087

Please sign in to comment.