diff --git a/src/main/scala/mesosphere/marathon/core/instance/Instance.scala b/src/main/scala/mesosphere/marathon/core/instance/Instance.scala
index 595847a1863..fd7adb70636 100644
--- a/src/main/scala/mesosphere/marathon/core/instance/Instance.scala
+++ b/src/main/scala/mesosphere/marathon/core/instance/Instance.scala
@@ -49,6 +49,11 @@ case class Instance(
   def isDropped: Boolean = state.condition == Condition.Dropped
   def isTerminated: Boolean = state.condition.isTerminal
   def isActive: Boolean = state.condition.isActive
+  def hasReservation: Boolean =
+    tasksMap.values.exists {
+      case _: Task.ReservedTask => true
+      case _ => false
+    }
 
   override def mergeFromProto(message: Protos.Json): Instance = {
     Json.parse(message.getJson).as[Instance]
diff --git a/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolver.scala b/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolver.scala
index 99bc5317f60..0afcb54cf94 100644
--- a/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolver.scala
+++ b/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolver.scala
@@ -42,6 +42,9 @@ private[marathon] class InstanceUpdateOpResolver(
     case op: ReservationTimeout =>
       updateExistingInstance(op.instanceId)(updater.reservationTimeout(_, clock.now()))
 
+    case op: ForceSuspended =>
+      updateExistingInstance(op.instanceId)(updater.forceSuspend(_, clock.now()))
+
     case op: Reserve =>
       createInstance(op.instanceId)(updater.reserve(op, clock.now()))
 
diff --git a/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOperation.scala b/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOperation.scala
index 91a6fd4ba55..b17d2aed9d0 100644
--- a/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOperation.scala
+++ b/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOperation.scala
@@ -36,6 +36,12 @@ object InstanceUpdateOperation {
     override def possibleNewState: Option[Instance] = Some(instance)
   }
 
+  /** Force a resident task back into the Reserved state. This is how an unreachable resident task is killed while its reservation is retained. */
+  case class ForceSuspended(instance: Instance) extends InstanceUpdateOperation {
+    override def instanceId: Instance.Id = instance.instanceId
+    override def possibleNewState: Option[Instance] = Some(instance)
+  }
+
   case class LaunchOnReservation(
     instanceId: Instance.Id,
     runSpecVersion: Timestamp,
diff --git a/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdater.scala b/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdater.scala
index ed68b578d5a..b93d3153a99 100644
--- a/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdater.scala
+++ b/src/main/scala/mesosphere/marathon/core/instance/update/InstanceUpdater.scala
@@ -135,4 +135,47 @@ object InstanceUpdater extends StrictLogging {
   private[marathon] def revert(instance: Instance): InstanceUpdateEffect = {
     InstanceUpdateEffect.Update(instance, oldState = None, events = Nil)
   }
+
+  // TODO(pods): this only supports apps (single-container instances)
+  private[marathon] def forceSuspend(instance: Instance, now: Timestamp): InstanceUpdateEffect = {
+    val reservedTask: Option[Task.Reserved] =
+      instance.tasksMap.values.collectFirst {
+        case t: Task.Reserved =>
+          t
+        case t: Task.LaunchedOnReservation =>
+          Task.Reserved(
+            taskId = t.taskId,
+            runSpecVersion = t.runSpecVersion,
+            reservation = t.reservation,
+            status =
+              Task.Status(
+                stagedAt = now,
+                startedAt = None,
+                mesosStatus = None,
+                condition = Condition.Reserved,
+                networkInfo = t.status.networkInfo))
+      }
+
+    if (!instance.hasReservation) {
+      InstanceUpdateEffect.Failure("Only reserved instances can be force suspended")
+    } else if (reservedTask.isEmpty) {
+      InstanceUpdateEffect.Failure(s"No reserved tasks for ${instance.instanceId}")
+    } else if (reservedTask == instance.tasksMap.values.headOption) {
+      InstanceUpdateEffect.Noop(instance.instanceId)
+    } else {
+      val updated = instance.copy(
+        tasksMap = reservedTask.map { t => t.taskId -> t }.toMap,
+        agentInfo = instance.agentInfo.copy(agentId = None),
+        state = Instance.InstanceState(
+          condition = Condition.Reserved,
+          since = now,
+          activeSince = None,
+          healthy = None))
+
+      val events = eventsGenerator.events(
+        updated, task = reservedTask, now, previousCondition = Some(instance.state.condition))
+
+      InstanceUpdateEffect.Update(updated, oldState = Some(instance), events)
+    }
+  }
 }
diff --git a/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillAction.scala b/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillAction.scala
new file mode 100644
index 00000000000..e1dd5ec26fa
--- /dev/null
+++ b/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillAction.scala
@@ -0,0 +1,34 @@
+package mesosphere.marathon
+package core.task.termination.impl
+
+/**
+ * Possible actions that can be chosen in order to `kill` a given instance.
+ * Depending on the instance's state this can be one of
+ * - [[KillAction.ExpungeFromState]]
+ * - [[KillAction.TransitionToReserved]]
+ * - [[KillAction.IssueKillRequest]]
+ */
+private[termination] sealed trait KillAction
+
+private[termination] object KillAction {
+  /**
+   * Any normal, reachable and stateless instance will simply be killed via the scheduler driver.
+   */
+  case object IssueKillRequest extends KillAction
+
+  /**
+   * If an instance has associated reservations and persistent volumes, killing it should transition
+   * to the Reserved state. Marathon will thus retain knowledge about the reserved resources and will
+   * be able to re-use them when trying to launch a new instance.
+   */
+  case object TransitionToReserved extends KillAction
+
+  /**
+   * If an instance is Unreachable, killing the related Mesos task is impossible. In order to get rid
+   * of the instance, processing this action expunges its metadata from state. If the instance is
+   * reported to be non-terminal in the future, it will be killed.
+   *
+   * Note: stateful instances with associated reservations must be treated using [[TransitionToReserved]].
+   */
+  case object ExpungeFromState extends KillAction
+}
diff --git a/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillActionResolver.scala b/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillActionResolver.scala
new file mode 100644
index 00000000000..b471e130bcf
--- /dev/null
+++ b/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillActionResolver.scala
@@ -0,0 +1,45 @@
+package mesosphere.marathon
+package core.task.termination.impl
+
+import com.typesafe.scalalogging.StrictLogging
+
+/**
+ * Responsible for resolving the relevant [[KillAction]] for an instance that should be killed.
+ */
+private[termination] object KillActionResolver extends StrictLogging {
+
+  /**
+   * Computes the [[KillAction]] based on the instance's state.
+   */
+  def computeAction(toKill: ToKill): KillAction = {
+    val instanceId = toKill.instanceId
+    val taskIds = toKill.taskIdsToKill
+    val hasReservations = toKill.maybeInstance.fold(false)(_.hasReservation)
+
+    // TODO(PODS): align this with other Terminal/Unreachable/whatever extractors
+    val isLost = toKill.maybeInstance.fold(false) { instance =>
+      instance.isGone || instance.isUnknown || instance.isDropped || instance.isUnreachable || instance.isUnreachableInactive
+    }
+
+    // An instance will be expunged once all tasks are terminal, so this case is highly unlikely.
+    // Should it ever occur, the instance is still cleaned up below (expunged, or moved back to Reserved).
+    val allTerminal: Boolean = taskIds.isEmpty
+
+    if (isLost || allTerminal) {
+      val msg = if (isLost) "it is lost" else "all its tasks are terminal"
+      if (hasReservations) {
+        logger.info(s"Transitioning ${instanceId} to Reserved because it has reservations and ${msg}")
+        // we will eventually be notified of a taskStatusUpdate after the instance has been updated
+        KillAction.TransitionToReserved
+      } else {
+        logger.warn(s"Expunging ${instanceId} from state because ${msg}")
+        // we will eventually be notified of a taskStatusUpdate after the instance has been expunged
+        KillAction.ExpungeFromState
+      }
+    } else {
+      val knownOrNot = if (toKill.maybeInstance.isDefined) "known" else "unknown"
+      logger.warn("Killing {} {} of instance {}", knownOrNot, taskIds.mkString(","), instanceId)
+      KillAction.IssueKillRequest
+    }
+  }
+}
diff --git a/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillServiceActor.scala b/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillServiceActor.scala
index b845798cec0..c986079a509 100644
--- a/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillServiceActor.scala
+++ b/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillServiceActor.scala
@@ -6,7 +6,6 @@ import akka.actor.{ Actor, ActorLogging, Cancellable, Props }
 import akka.stream.ActorMaterializer
 import mesosphere.marathon.core.base.Clock
 import mesosphere.marathon.core.task.termination.KillConfig
-import mesosphere.marathon.state.Timestamp
 import mesosphere.marathon.core.task.Task
 import mesosphere.marathon.core.task.tracker.TaskStateOpProcessor
 import mesosphere.marathon.core.event.{ InstanceChanged, UnknownInstanceTerminated }
@@ -30,7 +29,7 @@ import scala.concurrent.{ Future, Promise }
  * number of retries is exceeded, the instance will be expunged from state similar to a
  * lost instance.
  *
- * For each kill request, a child [[InstanceKillProgressActor]] will be spawned, which
+ * For each kill request, a [[KillStreamWatcher]] will be created, which
  * is supposed to watch the progress and complete a given promise when all watched
 * instances are reportedly terminal.
 *
@@ -143,30 +142,21 @@ private[impl] class KillServiceActor(
     val instanceId = toKill.instanceId
     val taskIds = toKill.taskIdsToKill
 
-    // TODO(PODS): align this with other Terminal/Unreachable/whatever extractors
-    val isLost: Boolean = toKill.maybeInstance.fold(false) { instance =>
-      instance.isGone || instance.isUnknown || instance.isDropped || instance.isUnreachable || instance.isUnreachableInactive
-    }
+    KillActionResolver.computeAction(toKill) match {
+      case KillAction.IssueKillRequest =>
+        driverHolder.driver.foreach { driver =>
+          taskIds.map(_.mesosTaskId).foreach(driver.killTask)
+        }
+        val attempts = inFlight.get(toKill.instanceId).fold(1)(_.attempts + 1)
+        inFlight.update(toKill.instanceId, ToKill(instanceId, taskIds, toKill.maybeInstance, attempts, issued = clock.now()))
 
-    // An instance will be expunged once all tasks are terminal. Therefore, this case is
-    // highly unlikely. Should it ever occur, this will still expunge the instance to clean up.
-    val allTerminal: Boolean = taskIds.isEmpty
+      case KillAction.TransitionToReserved =>
+        toKill.maybeInstance.foreach(instance => stateOpProcessor.process(InstanceUpdateOperation.ForceSuspended(instance)))
 
-    if (isLost || allTerminal) {
-      val msg = if (isLost) "it is lost" else "all its tasks are terminal"
-      log.warning("Expunging {} from state because {}", instanceId, msg)
-      // we will eventually be notified of a taskStatusUpdate after the instance has been expunged
-      stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(toKill.instanceId))
-    } else {
-      val knownOrNot = if (toKill.maybeInstance.isDefined) "known" else "unknown"
-      log.warning("Killing {} {} of instance {}", knownOrNot, taskIds.mkString(","), instanceId)
-      driverHolder.driver.foreach { driver =>
-        taskIds.map(_.mesosTaskId).foreach(driver.killTask)
-      }
+      case KillAction.ExpungeFromState =>
+        stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(toKill.instanceId))
     }
-    val attempts = inFlight.get(toKill.instanceId).fold(1)(_.attempts + 1)
-    inFlight.update(toKill.instanceId, ToKill(instanceId, taskIds, toKill.maybeInstance, attempts, issued = clock.now()))
 
     instancesToKill.remove(instanceId)
   }
 
@@ -199,21 +189,6 @@ private[termination] object KillServiceActor {
   sealed trait InternalRequest
   case object Retry extends InternalRequest
 
-  /**
-   * Metadata used to track which instances to kill and how many attempts have been made
-   * @param instanceId id of the instance to kill
-   * @param taskIdsToKill ids of the tasks to kill
-   * @param maybeInstance the instance, if available
-   * @param attempts the number of kill attempts
-   * @param issued the time of the last issued kill request
-   */
-  case class ToKill(
-    instanceId: Instance.Id,
-    taskIdsToKill: Seq[Task.Id],
-    maybeInstance: Option[Instance],
-    attempts: Int,
-    issued: Timestamp = Timestamp.zero)
-
   def props(
     driverHolder: MarathonSchedulerDriverHolder,
     stateOpProcessor: TaskStateOpProcessor,
diff --git a/src/main/scala/mesosphere/marathon/core/task/termination/impl/ToKill.scala b/src/main/scala/mesosphere/marathon/core/task/termination/impl/ToKill.scala
new file mode 100644
index 00000000000..a1f1fe6eeef
--- /dev/null
+++ b/src/main/scala/mesosphere/marathon/core/task/termination/impl/ToKill.scala
@@ -0,0 +1,23 @@
+package mesosphere.marathon
+package core.task.termination.impl
+
+import mesosphere.marathon.Seq
+import mesosphere.marathon.core.instance.Instance
+import mesosphere.marathon.core.task.Task
+import mesosphere.marathon.state.Timestamp
+
+/**
+ * Metadata used to track which instances to kill and how many attempts have been made
+ *
+ * @param instanceId id of the instance to kill
+ * @param taskIdsToKill ids of the tasks to kill
+ * @param maybeInstance the instance, if available
+ * @param attempts the number of kill attempts
+ * @param issued the time of the last issued kill request
+ */
+private[termination] case class ToKill(
+  instanceId: Instance.Id,
+  taskIdsToKill: Seq[Task.Id],
+  maybeInstance: Option[Instance],
+  attempts: Int,
+  issued: Timestamp = Timestamp.zero)
diff --git a/src/test/scala/mesosphere/marathon/core/instance/TestTaskBuilder.scala b/src/test/scala/mesosphere/marathon/core/instance/TestTaskBuilder.scala
index a625cc9d7ff..96056aa028b 100644
--- a/src/test/scala/mesosphere/marathon/core/instance/TestTaskBuilder.scala
+++ b/src/test/scala/mesosphere/marathon/core/instance/TestTaskBuilder.scala
@@ -59,6 +59,11 @@ case class TestTaskBuilder(
     this.copy(task = Some(TestTaskBuilder.Helper.residentLaunchedTask(instance.instanceId.runSpecId, localVolumeIds: _*).copy(taskId = Task.Id.forInstanceId(instance.instanceId, None))))
   }
 
+  def taskResidentUnreachable(localVolumeIds: Task.LocalVolumeId*) = {
+    val instance = instanceBuilder.getInstance()
+    this.copy(task = Some(TestTaskBuilder.Helper.residentUnreachableTask(instance.instanceId.runSpecId, localVolumeIds: _*).copy(taskId = Task.Id.forInstanceId(instance.instanceId, None))))
+  }
+
   def taskRunning(containerName: Option[String] = None, stagedAt: Timestamp = now, startedAt: Timestamp = now) = {
     val instance = instanceBuilder.getInstance()
     this.copy(task = Some(TestTaskBuilder.Helper.runningTask(
@@ -294,6 +299,21 @@ object TestTaskBuilder {
        reservation = Task.Reservation(localVolumeIds.to[Seq], Task.Reservation.State.Launched))
    }
 
+    def residentUnreachableTask(appId: PathId, localVolumeIds: Task.LocalVolumeId*) = {
+      val now = Timestamp.now()
+      Task.LaunchedOnReservation(
+        taskId = Task.Id.forRunSpec(appId),
+        runSpecVersion = now,
+        status = Task.Status(
+          stagedAt = now,
+          startedAt = Some(now),
+          mesosStatus = None,
+          condition = Condition.Unreachable,
+          networkInfo = NetworkInfoPlaceholder()
+        ),
+        reservation = Task.Reservation(localVolumeIds.to[Seq], Task.Reservation.State.Launched))
+    }
+
     def startingTaskForApp(instanceId: Instance.Id, appVersion: Timestamp = Timestamp(1), stagedAt: Long = 2, container: Option[MesosContainer] = None): Task.LaunchedEphemeral =
       startingTask(
         Task.Id.forInstanceId(instanceId, container),
diff --git a/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolverTest.scala b/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolverTest.scala
index 293ab95f290..470c1cd7fec 100644
--- a/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolverTest.scala
+++ b/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolverTest.scala
@@ -12,6 +12,7 @@ import mesosphere.marathon.core.task.tracker.InstanceTracker
 import mesosphere.marathon.core.task.{ Task, TaskCondition }
 import mesosphere.marathon.state.{ PathId, Timestamp }
 import org.apache.mesos
+import org.scalatest.Inside
 
 import scala.collection.immutable.Seq
 import scala.concurrent.Future
@@ -21,7 +22,7 @@ import scala.concurrent.Future
  *
 * More tests are in [[mesosphere.marathon.tasks.InstanceTrackerImplTest]]
 */
-class InstanceUpdateOpResolverTest extends UnitTest {
+class InstanceUpdateOpResolverTest extends UnitTest with Inside {
 
   import scala.concurrent.ExecutionContext.Implicits.global
 
@@ -218,11 +219,11 @@ class InstanceUpdateOpResolverTest extends UnitTest {
     "Processing a Reserve for an existing instanceId" in new Fixture {
-      instanceTracker.instance(existingReservedInstance.instanceId) returns Future.successful(Some(existingReservedInstance))
-      val stateChange = updateOpResolver.resolve(InstanceUpdateOperation.Reserve(existingReservedInstance)).futureValue
+      instanceTracker.instance(reservedInstance.instanceId) returns Future.successful(Some(reservedInstance))
+      val stateChange = updateOpResolver.resolve(InstanceUpdateOperation.Reserve(reservedInstance)).futureValue
 
       When("call taskTracker.task")
-      verify(instanceTracker).instance(existingReservedInstance.instanceId)
+      verify(instanceTracker).instance(reservedInstance.instanceId)
 
       Then("result in a Failure")
       stateChange shouldBe a[InstanceUpdateEffect.Failure]
@@ -230,11 +231,42 @@
       verifyNoMoreInteractions()
     }
 
+    "result in failure when processing a ForceSuspended for a non-reserved task" in new Fixture {
+      instanceTracker.instance(existingInstance.instanceId) returns Future.successful(Some(existingInstance))
+      updateOpResolver.
+        resolve(InstanceUpdateOperation.ForceSuspended(existingInstance)).
+        futureValue.
+        shouldBe(a[InstanceUpdateEffect.Failure])
+    }
+
+    "result in Noop when processing a ForceSuspended for a Reserved (not-launched) task" in new Fixture {
+      instanceTracker.instance(reservedInstance.instanceId) returns Future.successful(Some(reservedInstance))
+      updateOpResolver.
+        resolve(InstanceUpdateOperation.ForceSuspended(reservedInstance)).
+        futureValue.
+        shouldBe(a[InstanceUpdateEffect.Noop])
+    }
+
+    "result in Suspend update when processing a ForceSuspended for a Reserved, launched task" in new Fixture {
+      instanceTracker.instance(reservedLaunchedInstance.instanceId) returns Future.successful(Some(reservedLaunchedInstance))
+      val result = updateOpResolver.
+        resolve(InstanceUpdateOperation.ForceSuspended(reservedLaunchedInstance)).
+        futureValue
+
+      inside(result) {
+        case update: InstanceUpdateEffect.Update =>
+          update.instance.state.condition.shouldBe(Condition.Reserved)
+          update.instance.tasksMap.values.head.shouldBe(a[Task.Reserved])
+          update.oldState.shouldBe(Some(reservedLaunchedInstance))
+          update.events.head.eventType.shouldBe("status_update_event")
+      }
+    }
+
     "Revert" in new Fixture {
-      val stateChange = updateOpResolver.resolve(InstanceUpdateOperation.Revert(existingReservedInstance)).futureValue
+      val stateChange = updateOpResolver.resolve(InstanceUpdateOperation.Revert(reservedInstance)).futureValue
 
       When("result in an Update")
-      stateChange shouldEqual InstanceUpdateEffect.Update(existingReservedInstance, None, events = Nil)
+      stateChange shouldEqual InstanceUpdateEffect.Update(reservedInstance, None, events = Nil)
 
       Then("not query the taskTracker all")
       verifyNoMoreInteractions()
@@ -384,14 +416,17 @@
     val instanceTracker = mock[InstanceTracker]
     val updateOpResolver = new InstanceUpdateOpResolver(instanceTracker, clock)
 
-    val appId = PathId("/app")
-    val existingInstance: Instance = TestInstanceBuilder.newBuilder(appId).addTaskRunning().getInstance()
-    val existingTask: Task.LaunchedEphemeral = existingInstance.appTask
+    lazy val appId = PathId("/app")
+    lazy val existingInstance: Instance = TestInstanceBuilder.newBuilder(appId).addTaskRunning().getInstance()
+    lazy val existingTask: Task.LaunchedEphemeral = existingInstance.appTask
+
+    lazy val reservedInstance = TestInstanceBuilder.newBuilder(appId).addTaskReserved().getInstance()
+    lazy val existingReservedTask: Task.Reserved = reservedInstance.appTask
+
+    lazy val reservedLaunchedInstance: Instance = TestInstanceBuilder.newBuilder(appId).addTaskResidentLaunched().getInstance()
 
-    val existingReservedInstance = TestInstanceBuilder.newBuilder(appId).addTaskReserved().getInstance()
-    val existingReservedTask: Task.Reserved = existingReservedInstance.appTask
-    val notExistingInstanceId = Instance.Id.forRunSpec(appId)
-    val unreachableInstance = TestInstanceBuilder.newBuilder(appId).addTaskUnreachable().getInstance()
+    lazy val notExistingInstanceId = Instance.Id.forRunSpec(appId)
+    lazy val unreachableInstance = TestInstanceBuilder.newBuilder(appId).addTaskUnreachable().getInstance()
 
     def verifyNoMoreInteractions(): Unit = {
       noMoreInteractions(instanceTracker)
diff --git a/src/test/scala/mesosphere/marathon/core/task/termination/impl/KillActionResolverTest.scala b/src/test/scala/mesosphere/marathon/core/task/termination/impl/KillActionResolverTest.scala
new file mode 100644
index 00000000000..d14ca1dd53c
--- /dev/null
+++ b/src/test/scala/mesosphere/marathon/core/task/termination/impl/KillActionResolverTest.scala
@@ -0,0 +1,53 @@
+package mesosphere.marathon
+package core.task.termination.impl
+
+import mesosphere.UnitTest
+import mesosphere.marathon.core.base.ConstantClock
+import mesosphere.marathon.core.instance.{ Instance, TestInstanceBuilder }
+import mesosphere.marathon.core.task.Task.LocalVolumeId
+import mesosphere.marathon.state.PathId
+import org.scalatest.prop.TableDrivenPropertyChecks
+
+class KillActionResolverTest extends UnitTest with TableDrivenPropertyChecks {
+
+  val clock = ConstantClock()
+  val appId = PathId("/test")
+
+  lazy val localVolumeId = LocalVolumeId(appId, "unwanted-persistent-volume", "uuid1")
+  lazy val residentLaunchedInstance: Instance = TestInstanceBuilder.newBuilder(appId).
+    addTaskResidentLaunched(localVolumeId).
+    getInstance()
+
+  lazy val residentUnreachableInstance: Instance = TestInstanceBuilder.newBuilder(appId).
+    addTaskWithBuilder().
+    taskResidentUnreachable(localVolumeId).
+    build().
+    getInstance()
+
+  lazy val unreachableInstance: Instance = TestInstanceBuilder.newBuilder(appId).addTaskUnreachable().getInstance()
+  lazy val runningInstance: Instance = TestInstanceBuilder.newBuilder(appId).addTaskLaunched().getInstance()
+
+  def createToKill(instance: Instance) = ToKill(instance.instanceId, instance.tasksMap.keysIterator.to[Seq], Some(instance), attempts = 0)
+
+  "computeAction" when {
+    // format: OFF
+    Table(
+      ("name"                             , "instance", "expected"),
+      ("an unreachable reserved instance" , residentUnreachableInstance, KillAction.TransitionToReserved),
+      ("a running reserved instance"      , residentLaunchedInstance, KillAction.IssueKillRequest),
+      ("an unreachable ephemeral instance", unreachableInstance, KillAction.ExpungeFromState),
+      ("a running ephemeral instance"     , runningInstance, KillAction.IssueKillRequest)
+    ).
+    // format: ON
+    foreach {
+      case (name, instance, expected) =>
+        s"killing ${name}" should {
+          s"result in ${expected}" in {
+            val toKill = createToKill(instance)
+            KillActionResolver.computeAction(toKill).
+              shouldBe(expected)
+          }
+        }
+    }
+  }
+}
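
The decision table exercised by KillActionResolverTest above can be summarized as a small, self-contained sketch. The names below (KillDecisionSketch, InstanceView, Action, decide) are illustrative stand-ins, not Marathon classes; only the rule itself mirrors KillActionResolver.computeAction from this diff.

// Standalone Scala sketch of the kill-action decision (assumed simplification, not Marathon code).
object KillDecisionSketch {

  sealed trait Action
  case object IssueKillRequest extends Action
  case object TransitionToReserved extends Action
  case object ExpungeFromState extends Action

  /** Minimal view of what computeAction inspects:
    * isLost         - Gone/Unknown/Dropped/Unreachable/UnreachableInactive
    * hasReservation - at least one task holds a resource reservation (resident task)
    * taskIdsToKill  - ids of tasks that could still be killed; empty means all are terminal
    */
  final case class InstanceView(isLost: Boolean, hasReservation: Boolean, taskIdsToKill: Seq[String])

  def decide(view: InstanceView): Action =
    if (view.isLost || view.taskIdsToKill.isEmpty) {
      // Nothing can be killed via Mesos: either keep the reservation or drop all state.
      if (view.hasReservation) TransitionToReserved else ExpungeFromState
    } else {
      // Normal, reachable instance: ask the scheduler driver to kill its tasks.
      IssueKillRequest
    }

  def main(args: Array[String]): Unit = {
    // Mirrors the four rows of the test's decision table.
    assert(decide(InstanceView(isLost = true, hasReservation = true, taskIdsToKill = Seq("task-1"))) == TransitionToReserved)
    assert(decide(InstanceView(isLost = false, hasReservation = true, taskIdsToKill = Seq("task-1"))) == IssueKillRequest)
    assert(decide(InstanceView(isLost = true, hasReservation = false, taskIdsToKill = Seq("task-1"))) == ExpungeFromState)
    assert(decide(InstanceView(isLost = false, hasReservation = false, taskIdsToKill = Seq("task-1"))) == IssueKillRequest)
    println("kill-action decision table verified")
  }
}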