diff --git a/distage/distage-core-api/src/main/scala/izumi/distage/model/effect/QuasiIO.scala b/distage/distage-core-api/src/main/scala/izumi/distage/model/effect/QuasiIO.scala index f0c7840ef8..d6b7dca5d6 100644 --- a/distage/distage-core-api/src/main/scala/izumi/distage/model/effect/QuasiIO.scala +++ b/distage/distage-core-api/src/main/scala/izumi/distage/model/effect/QuasiIO.scala @@ -38,6 +38,8 @@ trait QuasiIO[F[_]] extends QuasiApplicative[F] { */ def maybeSuspend[A](eff: => A): F[A] + def maybeSuspendEither[A](eff: => Either[Throwable, A]): F[A] + /** A stronger version of `handleErrorWith`, the difference is that * this will _also_ intercept Throwable defects in `ZIO`, not only typed errors */ @@ -103,6 +105,10 @@ object QuasiIO extends LowPriorityQuasiIOInstances { override def flatMap[A, B](a: A)(f: A => Identity[B]): Identity[B] = f(a) override def maybeSuspend[A](eff: => A): Identity[A] = eff + override def maybeSuspendEither[A](eff: => Either[Throwable, A]): Identity[A] = eff match { + case Left(err) => throw err + case Right(v) => v + } override def suspendF[A](effAction: => A): Identity[A] = effAction override def definitelyRecover[A](fa: => Identity[A])(recover: Throwable => Identity[A]): Identity[A] = { try { fa } @@ -153,6 +159,7 @@ object QuasiIO extends LowPriorityQuasiIOInstances { override def flatMap[A, B](fa: F[E, A])(f: A => F[E, B]): F[E, B] = F.flatMap(fa)(f) override def maybeSuspend[A](eff: => A): F[E, A] = F.syncThrowable(eff) + override def maybeSuspendEither[A](eff: => Either[Throwable, A]): F[Throwable, A] = F.syncThrowable(eff).flatMap(F.fromEither(_)) override def suspendF[A](effAction: => F[Throwable, A]): F[Throwable, A] = F.suspend(effAction) override def definitelyRecover[A](action: => F[E, A])(recover: Throwable => F[E, A]): F[E, A] = { F.suspend(action).sandbox.catchAll(recover apply _.toThrowable) @@ -202,6 +209,7 @@ private[effect] sealed trait LowPriorityQuasiIOInstances { override def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B] = F.flatMap(fa)(f) override def maybeSuspend[A](eff: => A): F[A] = F.delay(eff) + override def maybeSuspendEither[A](eff: => Either[Throwable, A]): F[A] = F.defer(F.fromEither(eff)) override def suspendF[A](effAction: => F[A]): F[A] = F.defer(effAction) override def definitelyRecover[A](action: => F[A])(recover: Throwable => F[A]): F[A] = { F.handleErrorWith(F.defer(action))(recover) diff --git a/distage/distage-core/src/test/scala/izumi/distage/fixtures/ResourceCases.scala b/distage/distage-core/src/test/scala/izumi/distage/fixtures/ResourceCases.scala index 36a0114f86..c33f395792 100644 --- a/distage/distage-core/src/test/scala/izumi/distage/fixtures/ResourceCases.scala +++ b/distage/distage-core/src/test/scala/izumi/distage/fixtures/ResourceCases.scala @@ -166,6 +166,14 @@ object ResourceCases { override def pure[A](a: A): Suspend2[E, A] = Suspend2(a) override def fail[A](t: => Throwable): Suspend2[E, A] = Suspend2[A](throw t) override def maybeSuspend[A](eff: => A): Suspend2[E, A] = Suspend2(eff) + override def maybeSuspendEither[A](eff: => Either[Throwable, A]): Suspend2[E, A] = { + Suspend2( + eff match { + case Left(err) => throw err + case Right(v) => v + } + ) + } override def definitelyRecover[A](fa: => Suspend2[E, A])(recover: Throwable => Suspend2[E, A]): Suspend2[E, A] = { Suspend2( () => diff --git a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/ContainerNetworkDef.scala b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/ContainerNetworkDef.scala index a77625ce5e..1c9d5b4b32 100644 --- a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/ContainerNetworkDef.scala +++ b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/ContainerNetworkDef.scala @@ -55,7 +55,7 @@ object ContainerNetworkDef { ) extends Lifecycle.Basic[F, ContainerNetwork[T]] { import client.rawClient - private[this] val prefix: String = prefixName.camelToUnderscores.drop(1).replace("$", "") + private[this] val prefix: String = prefixName.camelToUnderscores.replace("$", "") private[this] val networkLabels: Map[String, String] = Map( DockerConst.Labels.reuseLabel -> Docker.shouldReuse(config.reuse, client.clientConfig.globalReuse).toString, s"${DockerConst.Labels.networkDriverPrefix}.${config.driver}" -> true.toString, diff --git a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/ContainerResource.scala b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/ContainerResource.scala index 5ce2b3962e..7e53f1a845 100644 --- a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/ContainerResource.scala +++ b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/ContainerResource.scala @@ -1,13 +1,12 @@ package izumi.distage.docker -import java.util.concurrent.{TimeUnit, TimeoutException} - import com.github.dockerjava.api.command.InspectContainerResponse +import com.github.dockerjava.api.exception.NotFoundException import com.github.dockerjava.api.model._ import izumi.distage.docker.ContainerResource.PortDecl import izumi.distage.docker.Docker._ import izumi.distage.docker.DockerClientWrapper.ContainerDestroyMeta -import izumi.distage.docker.healthcheck.ContainerHealthCheck.HealthCheckResult.GoodHealthcheck +import izumi.distage.docker.healthcheck.ContainerHealthCheck.HealthCheckResult.GoodHealth import izumi.distage.docker.healthcheck.ContainerHealthCheck.{HealthCheckResult, VerifiedContainerConnectivity} import izumi.distage.framework.model.exceptions.IntegrationCheckException import izumi.distage.model.definition.Lifecycle @@ -21,9 +20,11 @@ import izumi.fundamentals.platform.network.IzSockets import izumi.fundamentals.platform.strings.IzString._ import izumi.logstage.api.IzLogger +import java.util.concurrent.{TimeUnit, TimeoutException} import scala.annotation.nowarn import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ +import scala.util.{Failure, Success, Try} case class ContainerResource[F[_], T]( config: Docker.ContainerConfig[T], @@ -77,29 +78,20 @@ case class ContainerResource[F[_], T]( def await(container: DockerContainer[T], attempt: Int): F[DockerContainer[T]] = { F.maybeSuspend { logger.debug(s"Awaiting until alive: $container...") - try { - val status = rawClient.inspectContainerCmd(container.id.name).exec() - if (status.getState.getExitCodeLong == 0L && config.ports.isEmpty) { - logger.debug(s"Container has exited but that was a singleshot container $container...") - Right(HealthCheckResult.Available) - } else if (status.getState.getRunning) { - logger.debug(s"Trying healthcheck on running $container...") - Right(config.healthCheck.check(logger, container)) - } else { - Left(new RuntimeException(s"$container exited, status: ${status.getState}")) - } - } catch { - case t: Throwable => - Left(t) + inspectContainerAndGetState(container.id.name).map { + config.healthCheck.check(logger, container, _) } }.flatMap { - case Right(HealthCheckResult.Available) => + case Right(HealthCheckResult.Terminated(failure)) => + F.fail(new RuntimeException(s"$container terminated with failure: $failure")) + + case Right(HealthCheckResult.Good) => F.maybeSuspend { logger.info(s"Continuing without port checks: $container") container } - case Right(status: HealthCheckResult.AvailableOnPorts) if status.allTCPPortsAccessible => + case Right(status: HealthCheckResult.GoodOnPorts) if status.allTCPPortsAccessible => F.maybeSuspend { val out = container.copy(availablePorts = VerifiedContainerConnectivity.HasAvailablePorts(status.availablePorts)) logger.info(s"Looks good: ${out -> "container"}") @@ -114,10 +106,10 @@ case class ContainerResource[F[_], T]( P.sleep(config.healthCheckInterval).flatMap(_ => await(container, next)) } else { last match { - case HealthCheckResult.Unavailable => + case HealthCheckResult.Bad => F.fail(new TimeoutException(s"Health checks failed after $maxAttempts attempts, no diagnostics available: $container")) - case HealthCheckResult.UnavailableWithMeta(unavailablePorts, unverifiedPorts) => + case HealthCheckResult.BadWithMeta(unavailablePorts, unverifiedPorts) => val sb = new StringBuilder() sb.append(s"Health checks failed after $maxAttempts attempts: $container\n") if (unverifiedPorts.nonEmpty) { @@ -146,8 +138,11 @@ case class ContainerResource[F[_], T]( F.fail(new TimeoutException(sb.toString())) - case impossible: GoodHealthcheck => + case impossible: GoodHealth => F.fail(new TimeoutException(s"BUG: good healthcheck $impossible while health checks failed after $maxAttempts attempts: $container")) + + case HealthCheckResult.Terminated(failure) => + F.fail(new RuntimeException(s"Unexpected condition: $container terminated with failure: $failure")) } } @@ -266,9 +261,40 @@ case class ContainerResource[F[_], T]( val adjustedEnv = portsEnv ++ config.env for { + _ <- F.when(config.autoPull) { + F.maybeSuspendEither { + val existedImages = rawClient + .listImagesCmd().exec() + .asScala + .flatMap(i => Option(i.getRepoTags).fold(List.empty[String])(_.toList)) + .toSet + // test if image exists + // docker official images may be pulled with or without `library` user prefix, but it being saved locally without prefix + if (existedImages.contains(config.image) || existedImages.contains(config.image.replace("library/", ""))) { + logger.info(s"Skipping pull of `${config.image}`. Already exist.") + Right(()) + } else { + logger.info(s"Going to pull `${config.image}`...") + // try to pull image with timeout. If pulling was timed out - return [IntegrationCheckException] to skip tests. + Try { + rawClient + .pullImageCmd(config.image) + .start() + .awaitCompletion(config.pullTimeout.toMillis, TimeUnit.MILLISECONDS) + } match { + case Success(pulled) if pulled => // pulled successfully + Right(()) + case Success(_) => // timed out + Left(new IntegrationCheckException(NonEmptyList(ResourceCheck.ResourceUnavailable(s"Image `${config.image}` pull timeout exception.", None)))) + case Failure(t) => // failure occurred (e.g. rate limiter failure) + Left(new IntegrationCheckException(NonEmptyList(ResourceCheck.ResourceUnavailable(s"Image pulling failed due to: ${t.getMessage}", Some(t))))) + } + } + } + } out <- F.maybeSuspend { @nowarn("msg=method.*Bind.*deprecated") - val cmd = Value(baseCmd) + val createContainerCmd = Value(baseCmd) .mut(config.name)(_.withName(_)) .mut(ports.nonEmpty)(_.withExposedPorts(ports.map(_.binding.getExposedPort).asJava)) .mut(ports.nonEmpty)(_.withPortBindings(ports.map(_.binding).asJava)) @@ -282,18 +308,8 @@ case class ContainerResource[F[_], T]( .map(c => c.withHostConfig(c.getHostConfig.withAutoRemove(config.autoRemove))) .get - if (config.alwaysPull) { - logger.info(s"Going to pull `${config.image}`...") - rawClient - .pullImageCmd(config.image) - .start() - .awaitCompletion(config.pullTimeout.toMillis, TimeUnit.MILLISECONDS); - } else { - logger.info(s"Skipping explicit pull of `${config.image}`") - } - logger.debug(s"Going to create container from image `${config.image}`...") - val res = cmd.exec() + val res = createContainerCmd.exec() logger.debug(s"Going to start container ${res.getId -> "id"}...") rawClient.startContainerCmd(res.getId).exec() @@ -335,6 +351,20 @@ case class ContainerResource[F[_], T]( } yield result } + private[this] def inspectContainerAndGetState(containerId: String): Either[Throwable, ContainerState] = { + try { + val status = rawClient.inspectContainerCmd(containerId).exec() + status.getState match { + case s if s.getRunning => Right(ContainerState.Running) + case s if s.getExitCodeLong == 0L => Right(ContainerState.SuccessfullyExited) + case s => Right(ContainerState.Failed(s.getExitCodeLong)) + } + } catch { + case _: NotFoundException => Right(ContainerState.NotFound) + case t: Throwable => Left(t) + } + } + private def mapContainerPorts(inspection: InspectContainerResponse): Either[UnmappedPorts, ReportedContainerConnectivity] = { val network = inspection.getNetworkSettings val labels = inspection.getConfig.getLabels diff --git a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/Docker.scala b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/Docker.scala index 51effe9793..a528c45778 100644 --- a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/Docker.scala +++ b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/Docker.scala @@ -130,7 +130,7 @@ object Docker { pullTimeout: FiniteDuration = FiniteDuration(120, TimeUnit.SECONDS), healthCheck: ContainerHealthCheck[T] = ContainerHealthCheck.portCheck[T], portProbeTimeout: FiniteDuration = FiniteDuration(200, TimeUnit.MILLISECONDS), - alwaysPull: Boolean = true, + autoPull: Boolean = true, ) { def tcpPorts: Set[DockerPort] = ports.collect { case t: DockerPort.TCPBase => t: DockerPort }.toSet def udpPorts: Set[DockerPort] = ports.collect { case t: DockerPort.UDPBase => t: DockerPort }.toSet @@ -224,4 +224,11 @@ object Docker { override def toString: String = s"{host: $dockerHost; addresses=$containerAddressesV4; ports=$dockerPorts}" } + sealed trait ContainerState + object ContainerState { + case object Running extends ContainerState + case object SuccessfullyExited extends ContainerState + case object NotFound extends ContainerState + final case class Failed(status: Long) extends ContainerState + } } diff --git a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/bundled/PostgresFlyWayDocker.scala b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/bundled/PostgresFlyWayDocker.scala index 16ab39f03e..52629ce32f 100644 --- a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/bundled/PostgresFlyWayDocker.scala +++ b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/bundled/PostgresFlyWayDocker.scala @@ -51,7 +51,7 @@ object PostgresFlyWayDocker extends ContainerDef { Config( image = "flyway/flyway:6.0-alpine", ports = Seq.empty, - healthCheck = ContainerHealthCheck.succeed, + healthCheck = ContainerHealthCheck.exited(canBeDestroyed = true), reuse = DockerReusePolicy.ReuseEnabled, autoRemove = false, ) diff --git a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/ContainerExitedCheck.scala b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/ContainerExitedCheck.scala new file mode 100644 index 0000000000..dd760ffbe0 --- /dev/null +++ b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/ContainerExitedCheck.scala @@ -0,0 +1,25 @@ +package izumi.distage.docker.healthcheck +import izumi.distage.docker.Docker.ContainerState +import izumi.distage.docker.healthcheck.ContainerHealthCheck.HealthCheckResult +import izumi.distage.docker.{Docker, DockerContainer} +import izumi.logstage.api.IzLogger + +final class ContainerExitedCheck[Tag](canBeDestroyed: Boolean) extends ContainerHealthCheck[Tag] { + override def check(logger: IzLogger, container: DockerContainer[Tag], state: Docker.ContainerState): HealthCheckResult = { + state match { + case ContainerState.Running => + logger.debug(s"$container still running, marked as unavailable.") + HealthCheckResult.Bad + case ContainerState.SuccessfullyExited => + logger.debug(s"$container successfully exited, marked available.") + HealthCheckResult.Good + case ContainerState.NotFound if canBeDestroyed => + logger.debug(s"$container was destroyed, marked available.") + HealthCheckResult.Good + case ContainerState.NotFound => + HealthCheckResult.Terminated("Container not found.") + case ContainerState.Failed(status) => + HealthCheckResult.Terminated(s"Container terminated with non zero code. Code=$status") + } + } +} diff --git a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/ContainerHealthCheck.scala b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/ContainerHealthCheck.scala index 868f32801e..ce56684b64 100644 --- a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/ContainerHealthCheck.scala +++ b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/ContainerHealthCheck.scala @@ -7,19 +7,19 @@ import izumi.fundamentals.collections.nonempty.{NonEmptyList, NonEmptyMap} import izumi.logstage.api.IzLogger trait ContainerHealthCheck[Tag] { - def check(logger: IzLogger, container: DockerContainer[Tag]): HealthCheckResult + def check(logger: IzLogger, container: DockerContainer[Tag], state: ContainerState): HealthCheckResult final def ++(next: HealthCheckResult => ContainerHealthCheck[Tag]): ContainerHealthCheck[Tag] = this combine next final def combine(next: HealthCheckResult => ContainerHealthCheck[Tag]): ContainerHealthCheck[Tag] = { - (logger: IzLogger, container: DockerContainer[Tag]) => - next(check(logger, container)).check(logger, container) + (logger: IzLogger, container: DockerContainer[Tag], state: ContainerState) => + next(check(logger, container, state)).check(logger, container, state) } - final def combineOnPorts(next: HealthCheckResult.AvailableOnPorts => ContainerHealthCheck[Tag]): ContainerHealthCheck[Tag] = { - (logger: IzLogger, container: DockerContainer[Tag]) => - check(logger, container) match { - case thisCheckResult: HealthCheckResult.AvailableOnPorts => - next(thisCheckResult).check(logger, container) match { - case HealthCheckResult.Available => thisCheckResult + final def combineOnPorts(next: HealthCheckResult.GoodOnPorts => ContainerHealthCheck[Tag]): ContainerHealthCheck[Tag] = { + (logger: IzLogger, container: DockerContainer[Tag], state: ContainerState) => + check(logger, container, state) match { + case thisCheckResult: HealthCheckResult.GoodOnPorts => + next(thisCheckResult).check(logger, container, state) match { + case HealthCheckResult.Good => thisCheckResult case other => other } case other => other @@ -43,27 +43,33 @@ object ContainerHealthCheck { portCheck.combineOnPorts(new PostgreSqlProtocolCheck(_, port, user, password)) } - def succeed[T]: ContainerHealthCheck[T] = (_, _) => HealthCheckResult.Available + def succeed[T]: ContainerHealthCheck[T] = (_, _, _) => HealthCheckResult.Good + + def exited[T](canBeDestroyed: Boolean): ContainerHealthCheck[T] = { + new ContainerExitedCheck[T](canBeDestroyed) + } sealed trait HealthCheckResult object HealthCheckResult { - sealed trait BadHealthcheck extends HealthCheckResult - case object Unavailable extends BadHealthcheck + sealed trait BadHealth extends HealthCheckResult + case object Bad extends BadHealth + + final case class BadWithMeta( + unavailablePorts: UnavailablePorts, + unverifiedPorts: Set[DockerPort], + ) extends BadHealth - sealed trait GoodHealthcheck extends HealthCheckResult - case object Available extends GoodHealthcheck + sealed trait GoodHealth extends HealthCheckResult + case object Good extends GoodHealth - final case class AvailableOnPorts( + final case class GoodOnPorts( availablePorts: AvailablePorts, unavailablePorts: UnavailablePorts, unverifiedPorts: Set[DockerPort], allTCPPortsAccessible: Boolean, - ) extends GoodHealthcheck + ) extends GoodHealth - final case class UnavailableWithMeta( - unavailablePorts: UnavailablePorts, - unverifiedPorts: Set[DockerPort], - ) extends BadHealthcheck + final case class Terminated(failure: String) extends HealthCheckResult } final case class AvailablePorts(availablePorts: NonEmptyMap[DockerPort, NonEmptyList[AvailablePort]]) { @@ -109,4 +115,18 @@ object ContainerHealthCheck { override def firstOption(port: DockerPort): Option[AvailablePort] = None } } + + + def checkIfRunning(state: ContainerState)(performCheck: => HealthCheckResult): HealthCheckResult = { + state match { + case ContainerState.Running => + performCheck + case ContainerState.SuccessfullyExited => + HealthCheckResult.Terminated("Container unexpectedly exited with code 0.") + case ContainerState.NotFound => + HealthCheckResult.Terminated("Container not found.") + case ContainerState.Failed(status) => + HealthCheckResult.Terminated(s"Container terminated with non zero code. Code=$status") + } + } } diff --git a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/ContainerHealthCheckBase.scala b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/ContainerHealthCheckBase.scala index c94fecd94f..f5605272e9 100644 --- a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/ContainerHealthCheckBase.scala +++ b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/ContainerHealthCheckBase.scala @@ -9,24 +9,24 @@ import izumi.fundamentals.platform.strings.IzString._ import izumi.logstage.api.IzLogger abstract class ContainerHealthCheckBase[Tag] extends ContainerHealthCheck[Tag] { - override final def check(logger: IzLogger, container: DockerContainer[Tag]): HealthCheckResult = { - - val tcpPorts: Map[DockerPort.TCPBase, NonEmptyList[ServicePort]] = - container.connectivity.dockerPorts - .collect { - case (port: DockerPort.TCPBase, bindings) => - (port, bindings) - } - - val udpPorts: Map[DockerPort.UDPBase, NonEmptyList[ServicePort]] = - container.connectivity.dockerPorts - .collect { - case (port: DockerPort.UDP, bindings) => - (port, bindings) - } + override final def check(logger: IzLogger, container: DockerContainer[Tag], state: ContainerState): HealthCheckResult = { + ContainerHealthCheck.checkIfRunning(state) { + val tcpPorts: Map[DockerPort.TCPBase, NonEmptyList[ServicePort]] = + container.connectivity.dockerPorts + .collect { + case (port: DockerPort.TCPBase, bindings) => + (port, bindings) + } - perform(logger, container, tcpPorts, udpPorts) + val udpPorts: Map[DockerPort.UDPBase, NonEmptyList[ServicePort]] = + container.connectivity.dockerPorts + .collect { + case (port: DockerPort.UDP, bindings) => + (port, bindings) + } + perform(logger, container, tcpPorts, udpPorts) + } } protected def perform( diff --git a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/HttpGetCheck.scala b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/HttpGetCheck.scala index 88032d9970..65a6afabd9 100644 --- a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/HttpGetCheck.scala +++ b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/HttpGetCheck.scala @@ -1,42 +1,43 @@ package izumi.distage.docker.healthcheck import java.net.{HttpURLConnection, URL} - -import izumi.distage.docker.Docker.DockerPort +import izumi.distage.docker.Docker.{ContainerState, DockerPort} import izumi.distage.docker.DockerContainer import izumi.distage.docker.healthcheck.ContainerHealthCheck.HealthCheckResult import izumi.logstage.api.IzLogger final class HttpGetCheck[Tag]( - portStatus: HealthCheckResult.AvailableOnPorts, + portStatus: HealthCheckResult.GoodOnPorts, port: DockerPort, useHttps: Boolean, ) extends ContainerHealthCheck[Tag] { - override def check(logger: IzLogger, container: DockerContainer[Tag]): ContainerHealthCheck.HealthCheckResult = { - portStatus.availablePorts.firstOption(port) match { - case Some(availablePort) if portStatus.allTCPPortsAccessible => - val protocol = if (useHttps) "https" else "http" - val url = new URL(s"$protocol://${availablePort.hostV4}:${availablePort.port}") - logger.info(s"Checking docker port $port via $url for $container. Will try to establish HTTP connection.") - try { - val connection = url.openConnection().asInstanceOf[HttpURLConnection] - connection.setRequestMethod("GET") - connection.setConnectTimeout(container.containerConfig.portProbeTimeout.toMillis.toInt) - val responseCode = connection.getResponseCode - if (responseCode != -1) { - logger.info(s"HTTP connection was successfully established with $port.") - ContainerHealthCheck.HealthCheckResult.Available - } else { - logger.info(s"Cannot establish HTTP connection with $port. Wrong protocol.") - ContainerHealthCheck.HealthCheckResult.Unavailable + override def check(logger: IzLogger, container: DockerContainer[Tag], state: ContainerState): HealthCheckResult = { + ContainerHealthCheck.checkIfRunning(state) { + portStatus.availablePorts.firstOption(port) match { + case Some(availablePort) if portStatus.allTCPPortsAccessible => + val protocol = if (useHttps) "https" else "http" + val url = new URL(s"$protocol://${availablePort.hostV4}:${availablePort.port}") + logger.info(s"Checking docker port $port via $url for $container. Will try to establish HTTP connection.") + try { + val connection = url.openConnection().asInstanceOf[HttpURLConnection] + connection.setRequestMethod("GET") + connection.setConnectTimeout(container.containerConfig.portProbeTimeout.toMillis.toInt) + val responseCode = connection.getResponseCode + if (responseCode != -1) { + logger.info(s"HTTP connection was successfully established with $port.") + HealthCheckResult.Good + } else { + logger.info(s"Cannot establish HTTP connection with $port. Wrong protocol.") + HealthCheckResult.Bad + } + } catch { + case failure: Throwable => + logger.warn(s"Cannot establish HTTP connection with $port due to $failure") + HealthCheckResult.Bad } - } catch { - case failure: Throwable => - logger.warn(s"Cannot establish HTTP connection with $port due to $failure") - ContainerHealthCheck.HealthCheckResult.Unavailable - } - case _ => - ContainerHealthCheck.HealthCheckResult.Unavailable + case _ => + HealthCheckResult.Bad + } } } } diff --git a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/PostgreSqlProtocolCheck.scala b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/PostgreSqlProtocolCheck.scala index 1f4f62f571..25c41ea62b 100644 --- a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/PostgreSqlProtocolCheck.scala +++ b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/PostgreSqlProtocolCheck.scala @@ -1,53 +1,55 @@ package izumi.distage.docker.healthcheck -import java.net.{InetSocketAddress, Socket} -import java.nio.{Buffer, ByteBuffer} - -import izumi.distage.docker.Docker.DockerPort +import izumi.distage.docker.Docker.{ContainerState, DockerPort} import izumi.distage.docker.DockerContainer import izumi.distage.docker.healthcheck.ContainerHealthCheck.HealthCheckResult import izumi.fundamentals.platform.strings.IzString._ import izumi.logstage.api.IzLogger +import java.net.{InetSocketAddress, Socket} +import java.nio.{Buffer, ByteBuffer} + final class PostgreSqlProtocolCheck[Tag]( - portStatus: HealthCheckResult.AvailableOnPorts, + portStatus: HealthCheckResult.GoodOnPorts, port: DockerPort, userName: String, databaseName: String, ) extends ContainerHealthCheck[Tag] { - override def check(logger: IzLogger, container: DockerContainer[Tag]): ContainerHealthCheck.HealthCheckResult = { - portStatus.availablePorts.firstOption(port) match { - case Some(availablePort) if portStatus.allTCPPortsAccessible => - val startupMessage = genStartupMessage() - val socket = new Socket() - try { - socket.connect(new InetSocketAddress(availablePort.hostV4, availablePort.port), container.containerConfig.portProbeTimeout.toMillis.toInt) - logger.info(s"Checking PostgreSQL protocol on $port for $container. ${startupMessage.toIterable.toHex -> "Startup message"}.") - val out = socket.getOutputStream - val in = socket.getInputStream - out.write(startupMessage) - val messageType = { - val outByte = Array[Byte](0) - in.read(outByte, 0, 1) - new String(outByte) - } - // first byte of response message should be `R` char - // every authentication message from PostgreSQL starts with `R` - if (messageType == "R") { - logger.info(s"PostgreSQL protocol on $port is available.") - ContainerHealthCheck.HealthCheckResult.Available - } else { - logger.info(s"PostgreSQL protocol on $port unavailable due to unknown message type: $messageType.") - ContainerHealthCheck.HealthCheckResult.Unavailable + override def check(logger: IzLogger, container: DockerContainer[Tag], state: ContainerState): HealthCheckResult = { + ContainerHealthCheck.checkIfRunning(state) { + portStatus.availablePorts.firstOption(port) match { + case Some(availablePort) if portStatus.allTCPPortsAccessible => + val startupMessage = genStartupMessage() + val socket = new Socket() + try { + socket.connect(new InetSocketAddress(availablePort.hostV4, availablePort.port), container.containerConfig.portProbeTimeout.toMillis.toInt) + logger.info(s"Checking PostgreSQL protocol on $port for $container. ${startupMessage.toIterable.toHex -> "Startup message"}.") + val out = socket.getOutputStream + val in = socket.getInputStream + out.write(startupMessage) + val messageType = { + val outByte = Array[Byte](0) + in.read(outByte, 0, 1) + new String(outByte) + } + // first byte of response message should be `R` char + // every authentication message from PostgreSQL starts with `R` + if (messageType == "R") { + logger.info(s"PostgreSQL protocol on $port is available.") + HealthCheckResult.Good + } else { + logger.info(s"PostgreSQL protocol on $port unavailable due to unknown message type: $messageType.") + HealthCheckResult.Bad + } + } catch { + case failure: Throwable => + logger.warn(s"PostgreSQL protocol on $port unavailable due to unexpected exception. $failure") + HealthCheckResult.Bad + } finally { + socket.close() } - } catch { - case failure: Throwable => - logger.warn(s"PostgreSQL protocol on $port unavailable due to unexpected exception. $failure") - ContainerHealthCheck.HealthCheckResult.Unavailable - } finally { - socket.close() - } - case _ => ContainerHealthCheck.HealthCheckResult.Unavailable + case _ => HealthCheckResult.Bad + } } } diff --git a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/TCPContainerHealthCheck.scala b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/TCPContainerHealthCheck.scala index 8d3c542153..8f1271ba5b 100644 --- a/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/TCPContainerHealthCheck.scala +++ b/distage/distage-framework-docker/src/main/scala/izumi/distage/docker/healthcheck/TCPContainerHealthCheck.scala @@ -61,7 +61,7 @@ class TCPContainerHealthCheck[Tag] extends ContainerHealthCheckBase[Tag] { case Some(value) => val available = AvailablePorts(value) val allTCPPortsAvailable = tcpPortsGood(container, available) - HealthCheckResult.AvailableOnPorts( + HealthCheckResult.GoodOnPorts( available, errored, udpPorts.keySet.map(p => p: DockerPort), @@ -70,9 +70,9 @@ class TCPContainerHealthCheck[Tag] extends ContainerHealthCheckBase[Tag] { case None => if (container.containerConfig.tcpPorts.isEmpty) { - HealthCheckResult.Available + HealthCheckResult.Good } else { - HealthCheckResult.UnavailableWithMeta( + HealthCheckResult.BadWithMeta( errored, udpPorts.keySet.map(p => p: DockerPort), ) diff --git a/distage/distage-framework-docker/src/test/scala/izumi/distage/testkit/docker/DistageTestDockerBIO.scala b/distage/distage-framework-docker/src/test/scala/izumi/distage/testkit/docker/DistageTestDockerBIO.scala index 047dde84ff..fbd3b76399 100644 --- a/distage/distage-framework-docker/src/test/scala/izumi/distage/testkit/docker/DistageTestDockerBIO.scala +++ b/distage/distage-framework-docker/src/test/scala/izumi/distage/testkit/docker/DistageTestDockerBIO.scala @@ -19,7 +19,6 @@ abstract class DistageTestDockerBIO extends Spec2[IO] { "distage test runner should start only one container for reusable" should { "support docker resources" in { - // FIXME: verifier exit code is not checked, the test isn't reliable // TODO: additionally check flyway outcome with doobie (service: PgSvcExample, verifier: Lifecycle[IO[Throwable, ?], ReuseCheckContainer.Container]) => for { diff --git a/distage/distage-framework-docker/src/test/scala/izumi/distage/testkit/docker/fixtures/ReusedOneshotContainer.scala b/distage/distage-framework-docker/src/test/scala/izumi/distage/testkit/docker/fixtures/ReusedOneshotContainer.scala index 022d2ac389..2c6b9c0ca9 100644 --- a/distage/distage-framework-docker/src/test/scala/izumi/distage/testkit/docker/fixtures/ReusedOneshotContainer.scala +++ b/distage/distage-framework-docker/src/test/scala/izumi/distage/testkit/docker/fixtures/ReusedOneshotContainer.scala @@ -1,10 +1,10 @@ package izumi.distage.testkit.docker.fixtures import java.util.UUID - import distage.{ModuleDef, TagK} import izumi.distage.docker.ContainerDef import izumi.distage.docker.Docker.{DockerReusePolicy, Mount} +import izumi.distage.docker.healthcheck.ContainerHealthCheck import izumi.distage.model.definition.Lifecycle object ReusedOneshotContainer extends ContainerDef { @@ -13,9 +13,10 @@ object ReusedOneshotContainer extends ContainerDef { image = "alpine:3.12.0", ports = Seq(), mounts = Seq(CmdContainerModule.stateFileMount), - entrypoint = Seq("sh", "-c", s"echo `date` >> ${CmdContainerModule.stateFilePath}"), + entrypoint = Seq("sh", "-c", s"sleep 1; echo `date` >> ${CmdContainerModule.stateFilePath}"), reuse = DockerReusePolicy.ReuseEnabled, autoRemove = false, // we need this container to be preserved after exit as a marker + healthCheck = ContainerHealthCheck.exited(canBeDestroyed = false), ) } } @@ -29,6 +30,7 @@ object ReuseCheckContainer extends ContainerDef { entrypoint = Seq("sh", "-c", s"if [[ $$(cat ${CmdContainerModule.stateFilePath} | wc -l | awk '{print $$1}') == 1 ]]; then exit 0; else exit 42; fi"), reuse = DockerReusePolicy.ReuseDisabled, autoRemove = false, // we need this container to be preserved after exit as a marker + healthCheck = ContainerHealthCheck.exited(canBeDestroyed = false), ) } }