
Use Akka Http flow connection instead of singleRequest to avoid termination warning #273

Closed
NicolasRouquette opened this issue May 6, 2019 · 3 comments

Comments

@NicolasRouquette
Contributor

With the Kubernetes Job API, it is necessary to poll a job's state periodically to determine its completion status. With Akka Typed actors, this can be done as follows:

import java.util.UUID

import akka.NotUsed
import akka.actor.typed._
import akka.actor.typed.scaladsl._
import akka.actor.typed.scaladsl.adapter._
import akka.stream.ActorMaterializer
import skuber.api.client.KubernetesClient
import skuber.{Container, Pod, RestartPolicy, k8sInit}
import skuber.batch.Job
import skuber.json.batch.format._

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object KubeProcess {

  sealed trait Protocol

  // Wait for a while before checking the job status
  case object Wait extends Protocol

  // Check the job status
  case object CheckJob extends Protocol

  case class JobCompletedSuccessfully(m: String) extends Protocol
  case class JobCompletedUnsuccessfully(error: String) extends Protocol

  def main(args: Array[String]): Unit = {

    implicit val system = akka.actor.ActorSystem("KubeJobSystem")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val ec: ExecutionContext = system.dispatcher

    val k8s: KubernetesClient = k8sInit

    val piContainer = Container(
      name = "pi",
      image = "perl",
      command = List("perl", "-Mbignum=bpi", "-wle", "print bpi(3000)"))

    val piSpec = Pod
      .Spec()
      .addContainer(piContainer)
      .withRestartPolicy(RestartPolicy.Never)

    val piTemplateSpec = Pod.Template.Spec.named("pi").withPodSpec(piSpec)

    // Make sure the job name is unique to ensure it can be created.
    val jobName = s"pi-${UUID.randomUUID()}"
    val piJob: Job = Job(jobName).withTemplate(piTemplateSpec)
    val main: Behavior[NotUsed] =
      Behaviors.setup { context =>

        // kick off the job on the cluster
        val jobCreateFut = k8s create piJob
        jobCreateFut onComplete {
          case Success(j: Job) =>
            import scala.concurrent.duration._
            val kubeBehavior = context.spawn[Protocol](behavior(k8s, j, 3.seconds), "KubeJob")
            context.watch(kubeBehavior)
            kubeBehavior ! Wait

          case Failure(ex) =>
            System.err.println("Failed to create job: " + ex)
            ex.printStackTrace(System.err)
            k8s.close
            system.terminate().foreach { _ =>
              System.exit(1)
            }
        }

        Behaviors.receiveSignal {
          case (c, Terminated(ref)) =>
            c.log.info(s"Terminated: $ref")
            Behaviors.stopped { () =>
              c.log.info(s"Terminate!")
              k8s.close
              materializer.shutdown()
              c.system.terminate()
            }
        }
      }

    system.spawn(main, "main")
  }

  def behavior(k8s: KubernetesClient,
               job: Job,
               delay: FiniteDuration): Behavior[Protocol] =
    Behaviors.receive[Protocol] {
      case (c, message) =>
        implicit val ec: ExecutionContext = c.executionContext
        message match {

          case Wait =>
            akka.pattern
              .after(delay, c.system.scheduler)(Future.successful(()))
              .onComplete {
                case Success(_) =>
                  c.self ! CheckJob

                case Failure(t) =>
                  c.self ! JobCompletedUnsuccessfully(t.toString)
              }
            Behaviors.same

          case CheckJob =>
            k8s.get[Job](job.metadata.name).onComplete {
              case Success(jobState) =>
                jobState.status match {
                  case None =>
                    c.self ! Wait
                  case Some(s) =>
                    s.completionTime match {
                      case None =>
                        c.self ! Wait
                      case Some(t) =>
                        if (s.succeeded.nonEmpty)
                          c.self ! JobCompletedSuccessfully(t.toString)
                        else
                          c.self ! JobCompletedUnsuccessfully(t.toString)
                    }
                }

              case Failure(t) =>
                c.self ! JobCompletedUnsuccessfully(t.toString)
            }
            Behaviors.same

          case JobCompletedSuccessfully(message) =>
            c.log.info(s"Success: $message")
            Behaviors.stopped

          case JobCompletedUnsuccessfully(message) =>
            c.log.error(s"Failed: $message")
            Behaviors.stopped
        }
    }

}

With skuber 2.2.0, this produces output that looks like this:

[INFO] [05/06/2019 09:39:41.582] [main] [skuber.api] Using following context for connecting to Kubernetes cluster: Context(Cluster(v1,https://192.168.99.100:8443,false,Some(Left(/home/rouquett/.minikube/ca.crt))),CertAuth(clientCertificate=/home/rouquett/.minikube/client.crt clientKey=/home/rouquett/.minikube/client.key userName= ),Namespace(Namespace,v1,ObjectMeta(default,,,,,,None,None,None,Map(),Map(),List(),0,None,None),None,None))
[INFO] [05/06/2019 09:39:42.391] [KubeJobSystem-akka.actor.default-dispatcher-2] [skuber.api] [ { reqId=d40dde47-2e0f-485f-afe5-3f0f92149ee6} } - about to send HTTP request: POST https://192.168.99.100:8443/apis/batch/v1/namespaces/default/jobs]
[INFO] [05/06/2019 09:39:42.746] [KubeJobSystem-akka.actor.default-dispatcher-5] [skuber.api] [ { reqId=d40dde47-2e0f-485f-afe5-3f0f92149ee6} } - received response with HTTP status 201]
[INFO] [05/06/2019 09:39:45.820] [KubeJobSystem-akka.actor.default-dispatcher-4] [skuber.api] [ { reqId=262b0eef-6466-4d1c-83dc-c9c2b25e91be} } - about to send HTTP request: GET https://192.168.99.100:8443/apis/batch/v1/namespaces/default/jobs/pi-634c802b-597d-4e68-9daf-bd0509f40ec5]
[INFO] [05/06/2019 09:39:45.826] [KubeJobSystem-akka.actor.default-dispatcher-3] [skuber.api] [ { reqId=262b0eef-6466-4d1c-83dc-c9c2b25e91be} } - received response with HTTP status 200]
[INFO] [05/06/2019 09:39:48.850] [KubeJobSystem-akka.actor.default-dispatcher-2] [skuber.api] [ { reqId=88ed82e4-e3dc-4eec-87f7-648dace9c083} } - about to send HTTP request: GET https://192.168.99.100:8443/apis/batch/v1/namespaces/default/jobs/pi-634c802b-597d-4e68-9daf-bd0509f40ec5]
[INFO] [05/06/2019 09:39:48.854] [KubeJobSystem-akka.actor.default-dispatcher-3] [skuber.api] [ { reqId=88ed82e4-e3dc-4eec-87f7-648dace9c083} } - received response with HTTP status 200]
[INFO] [05/06/2019 09:39:51.880] [KubeJobSystem-akka.actor.default-dispatcher-2] [skuber.api] [ { reqId=4d22c982-5a52-48f7-9063-34ac85ed5ea8} } - about to send HTTP request: GET https://192.168.99.100:8443/apis/batch/v1/namespaces/default/jobs/pi-634c802b-597d-4e68-9daf-bd0509f40ec5]
[INFO] [05/06/2019 09:39:51.885] [KubeJobSystem-akka.actor.default-dispatcher-5] [skuber.api] [ { reqId=4d22c982-5a52-48f7-9063-34ac85ed5ea8} } - received response with HTTP status 200]
[INFO] [05/06/2019 09:39:54.910] [KubeJobSystem-akka.actor.default-dispatcher-3] [skuber.api] [ { reqId=4208d9cb-4642-44c0-9b29-6b1f5ac5912f} } - about to send HTTP request: GET https://192.168.99.100:8443/apis/batch/v1/namespaces/default/jobs/pi-634c802b-597d-4e68-9daf-bd0509f40ec5]
[INFO] [05/06/2019 09:39:54.915] [KubeJobSystem-akka.actor.default-dispatcher-3] [skuber.api] [ { reqId=4208d9cb-4642-44c0-9b29-6b1f5ac5912f} } - received response with HTTP status 200]
[INFO] [05/06/2019 09:39:54.920] [KubeJobSystem-akka.actor.default-dispatcher-5] [akka://KubeJobSystem/user/main/KubeJob] Success: 2019-05-06T16:39:52Z
[INFO] [05/06/2019 09:39:54.924] [KubeJobSystem-akka.actor.default-dispatcher-2] [akka://KubeJobSystem/user/main] Terminated: Actor[akka://KubeJobSystem/user/main/KubeJob#1743425372]
[INFO] [05/06/2019 09:39:54.925] [KubeJobSystem-akka.actor.default-dispatcher-2] [akka://KubeJobSystem/user/main] Terminate!
[ERROR] [05/06/2019 09:39:54.939] [KubeJobSystem-akka.actor.default-dispatcher-2] [akka.actor.ActorSystemImpl(KubeJobSystem)] Outgoing request stream error (akka.stream.AbruptTerminationException: Processor actor [Actor[akka://KubeJobSystem/system/StreamSupervisor-1/flow-0-0-PoolFlow#1742495294]] terminated abruptly)

The annoying part is the last error, which is a known problem in Akka HTTP; see akka/akka-http#497 (comment).

Fortunately, it is relatively easy to avoid this by switching from the `singleRequest` connection-pool API to a flow-based (graph) connection.
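As a rough illustration of what that switch means (a sketch only, not skuber's actual internals; the host, port, and object name are made up for the example), a request can be routed through a host connection pool flow whose lifecycle is tied to the materialized stream, instead of going through `Http().singleRequest`:

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.util.Try

// Hypothetical helper: issue a single request via a cached host connection
// pool flow rather than Http().singleRequest.
object FlowBasedRequest {
  def request(req: HttpRequest)(implicit system: ActorSystem,
                                mat: ActorMaterializer): Future[Try[HttpResponse]] = {
    // The pool flow pairs each request with a correlation token (Unit here,
    // since we only send one request through it).
    val poolFlow =
      Http().cachedHostConnectionPoolHttps[Unit](host = "192.168.99.100", port = 8443)
    Source.single(req -> ())
      .via(poolFlow)
      .map { case (tryResponse, _) => tryResponse }
      .runWith(Sink.head)
  }
}
```

Because the requests flow through a materialized graph, the stream can be completed and drained before the actor system shuts down, which is what the `singleRequest` path does not give you directly.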

@NicolasRouquette
Contributor Author

@doriordan Can you please review the PR for this? #274

@doriordan
Owner

@NicolasRouquette I plan to review this by tomorrow

@NicolasRouquette
Contributor Author

@doriordan convinced me this was a bad idea. It is still unclear, though, how to deal with the abrupt termination.
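One possible mitigation (an assumption on my part, not a confirmed fix for this issue) is to drain Akka HTTP's connection pools before terminating the actor system, so the pool actors are not killed while a request is in flight:

```scala
import akka.actor.{ActorSystem, Terminated}
import akka.http.scaladsl.Http

import scala.concurrent.{ExecutionContext, Future}

// Sketch: shut down all client connection pools first, then terminate the
// actor system, instead of calling system.terminate() directly.
def shutdownCleanly(system: ActorSystem)(implicit ec: ExecutionContext): Future[Terminated] =
  Http()(system)
    .shutdownAllConnectionPools()      // drain pools so no request is cut off
    .flatMap(_ => system.terminate()) // only then tear down the system
```

In the example above, this would replace the bare `k8s.close` / `system.terminate()` sequence in the `Terminated` signal handler.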
