Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Refine logic on how to treat instances that shall be killed.
Browse files Browse the repository at this point in the history
Summary:
This will transition unreachable or reachable stateful instances to Reserved when they're about to be killed.

Fixes #5207

Also-By: tharper@mesosphere.com

Test Plan: sbt test

Reviewers: unterstein, jasongilanfarr, jenkins, meichstedt

Reviewed By: jenkins

Subscribers: jdef, marathon-team

Differential Revision: https://phabricator.mesosphere.com/D529
  • Loading branch information
meichstedt authored and Tim Harper committed Feb 22, 2017
1 parent 0205b29 commit ef6bd77
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ case class Instance(
def isDropped: Boolean = state.condition == Condition.Dropped
def isTerminated: Boolean = state.condition.isTerminal
def isActive: Boolean = state.condition.isActive
def hasReservation =
tasksMap.values.exists {
case _: Task.ReservedTask => true
case _ => false
}

override def mergeFromProto(message: Protos.Json): Instance = {
Json.parse(message.getJson).as[Instance]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ private[marathon] class InstanceUpdateOpResolver(
case op: ReservationTimeout =>
updateExistingInstance(op.instanceId)(updater.reservationTimeout(_, clock.now()))

case op: ForceSuspended =>
updateExistingInstance(op.instanceId)(updater.forceSuspend(_, clock.now()))

case op: Reserve =>
createInstance(op.instanceId)(updater.reserve(op, clock.now()))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ object InstanceUpdateOperation {
override def possibleNewState: Option[Instance] = Some(instance)
}

/** Force a reserved task back into reserved state. This is how we kill an unreachable resident task */
case class ForceSuspended(instance: Instance) extends InstanceUpdateOperation {
override def instanceId: Instance.Id = instance.instanceId
override def possibleNewState: Option[Instance] = Some(instance)
}

case class LaunchOnReservation(
instanceId: Instance.Id,
runSpecVersion: Timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,47 @@ object InstanceUpdater extends StrictLogging {
private[marathon] def revert(instance: Instance): InstanceUpdateEffect = {
InstanceUpdateEffect.Update(instance, oldState = None, events = Nil)
}

//TODO(pods): this only supports apps (single-container instances)
private[marathon] def forceSuspend(instance: Instance, now: Timestamp): InstanceUpdateEffect = {
val reservedTask: Option[Task.Reserved] =
instance.tasksMap.values.collectFirst {
case t: Task.Reserved =>
t
case t: Task.LaunchedOnReservation =>
Task.Reserved(
taskId = t.taskId,
runSpecVersion = t.runSpecVersion,
reservation = t.reservation,
status =
Task.Status(
stagedAt = now,
startedAt = None,
mesosStatus = None,
condition = Condition.Reserved,
networkInfo = t.status.networkInfo))
}

if (!instance.hasReservation) {
InstanceUpdateEffect.Failure("Only reserved instances can be force suspended")
} else if (reservedTask.isEmpty) {
InstanceUpdateEffect.Failure(s"No reserved tasks for ${instance.instanceId}")
} else if (reservedTask == instance.tasksMap.values.headOption) {
InstanceUpdateEffect.Noop(instance.instanceId)
} else {
val updated = instance.copy(
tasksMap = reservedTask.map { t => t.taskId -> t }.toMap,
agentInfo = instance.agentInfo.copy(agentId = None),
state = Instance.InstanceState(
condition = Condition.Reserved,
since = now,
activeSince = None,
healthy = None))

val events = eventsGenerator.events(
updated, task = reservedTask, now, previousCondition = Some(instance.state.condition))

InstanceUpdateEffect.Update(updated, oldState = Some(instance), events)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package mesosphere.marathon
package core.task.termination.impl

/**
* Possible actions that can be chosen in order to `kill` a given instance.
* Depending on the instance's state this can be one of
* - [[KillAction.ExpungeFromState]]
* - [[KillAction.TransitionToReserved]]
* - [[KillAction.IssueKillRequest]]
*/
private[termination] sealed trait KillAction

private[termination] object KillAction {
/**
* Any normal, reachable and stateless instance will simply be killed via the scheduler driver.
*/
case object IssueKillRequest extends KillAction

/**
* If an instance has associated reservations and persistent volumes, killing it should transition
* to the Reserved state. Marathon will thus retain knowledge about the reserved resources and will
* be able to re-use them when trying to launch a new instance.
*/
case object TransitionToReserved extends KillAction

/**
* In case of an instance being Unreachable, killing the related Mesos task is impossible.
* In order to get rid of the instance, processing this action expunges the metadata from
* state. If the instance is reported to be non-terminal in the future, it will be killed.
*
* Note: stateful instances with associated reservations must be treated using [[TransitionToReserved]].
*/
case object ExpungeFromState extends KillAction
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package mesosphere.marathon
package core.task.termination.impl

import com.typesafe.scalalogging.StrictLogging

/**
* Responsible for resolving the relevant [[KillAction]] for an instance that should be killed.
*/
private[termination] object KillActionResolver extends StrictLogging {

/**
* Computes the [[KillAction]] based on the instance's state.
*/
def computeAction(toKill: ToKill): KillAction = {
val instanceId = toKill.instanceId
val taskIds = toKill.taskIdsToKill
val hasReservations = toKill.maybeInstance.fold(false)(_.hasReservation)

// TODO(PODS): align this with other Terminal/Unreachable/whatever extractors
val isLost = toKill.maybeInstance.fold(false) { instance =>
instance.isGone || instance.isUnknown || instance.isDropped || instance.isUnreachable || instance.isUnreachableInactive
}

// An instance will be expunged once all tasks are terminal. Therefore, this case is
// highly unlikely. Should it ever occur, this will still expunge the instance to clean up.
val allTerminal: Boolean = taskIds.isEmpty

if (isLost || allTerminal) {
val msg = if (isLost) "it is lost" else "all its tasks are terminal"
if (hasReservations) {
logger.info(s"Transitioning ${instanceId} to Reserved because it has reservations and ${msg}")
// we will eventually be notified of a taskStatusUpdate after the instance has been updated
KillAction.TransitionToReserved
} else {
logger.warn(s"Expunging ${instanceId} from state because ${msg}")
// we will eventually be notified of a taskStatusUpdate after the instance has been expunged
KillAction.ExpungeFromState
}
} else {
val knownOrNot = if (toKill.maybeInstance.isDefined) "known" else "unknown"
logger.warn("Killing {} {} of instance {}", knownOrNot, taskIds.mkString(","), instanceId)
KillAction.IssueKillRequest
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import akka.actor.{ Actor, ActorLogging, Cancellable, Props }
import akka.stream.ActorMaterializer
import mesosphere.marathon.core.base.Clock
import mesosphere.marathon.core.task.termination.KillConfig
import mesosphere.marathon.state.Timestamp
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.core.task.tracker.TaskStateOpProcessor
import mesosphere.marathon.core.event.{ InstanceChanged, UnknownInstanceTerminated }
Expand All @@ -30,7 +29,7 @@ import scala.concurrent.{ Future, Promise }
* number of retries is exceeded, the instance will be expunged from state similar to a
* lost instance.
*
* For each kill request, a child [[InstanceKillProgressActor]] will be spawned, which
* For each kill request, a [[KillStreamWatcher]] will be created, which
* is supposed to watch the progress and complete a given promise when all watched
* instances are reportedly terminal.
*
Expand Down Expand Up @@ -143,30 +142,21 @@ private[impl] class KillServiceActor(
val instanceId = toKill.instanceId
val taskIds = toKill.taskIdsToKill

// TODO(PODS): align this with other Terminal/Unreachable/whatever extractors
val isLost: Boolean = toKill.maybeInstance.fold(false) { instance =>
instance.isGone || instance.isUnknown || instance.isDropped || instance.isUnreachable || instance.isUnreachableInactive
}
KillActionResolver.computeAction(toKill) match {
case KillAction.IssueKillRequest =>
driverHolder.driver.foreach { driver =>
taskIds.map(_.mesosTaskId).foreach(driver.killTask)
}
val attempts = inFlight.get(toKill.instanceId).fold(1)(_.attempts + 1)
inFlight.update(toKill.instanceId, ToKill(instanceId, taskIds, toKill.maybeInstance, attempts, issued = clock.now()))

// An instance will be expunged once all tasks are terminal. Therefore, this case is
// highly unlikely. Should it ever occur, this will still expunge the instance to clean up.
val allTerminal: Boolean = taskIds.isEmpty
case KillAction.TransitionToReserved =>
toKill.maybeInstance.foreach(instance => stateOpProcessor.process(InstanceUpdateOperation.ForceSuspended(instance)))

if (isLost || allTerminal) {
val msg = if (isLost) "it is lost" else "all its tasks are terminal"
log.warning("Expunging {} from state because {}", instanceId, msg)
// we will eventually be notified of a taskStatusUpdate after the instance has been expunged
stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(toKill.instanceId))
} else {
val knownOrNot = if (toKill.maybeInstance.isDefined) "known" else "unknown"
log.warning("Killing {} {} of instance {}", knownOrNot, taskIds.mkString(","), instanceId)
driverHolder.driver.foreach { driver =>
taskIds.map(_.mesosTaskId).foreach(driver.killTask)
}
case KillAction.ExpungeFromState =>
stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(toKill.instanceId))
}

val attempts = inFlight.get(toKill.instanceId).fold(1)(_.attempts + 1)
inFlight.update(toKill.instanceId, ToKill(instanceId, taskIds, toKill.maybeInstance, attempts, issued = clock.now()))
instancesToKill.remove(instanceId)
}

Expand Down Expand Up @@ -199,21 +189,6 @@ private[termination] object KillServiceActor {
sealed trait InternalRequest
case object Retry extends InternalRequest

/**
* Metadata used to track which instances to kill and how many attempts have been made
* @param instanceId id of the instance to kill
* @param taskIdsToKill ids of the tasks to kill
* @param maybeInstance the instance, if available
* @param attempts the number of kill attempts
* @param issued the time of the last issued kill request
*/
case class ToKill(
instanceId: Instance.Id,
taskIdsToKill: Seq[Task.Id],
maybeInstance: Option[Instance],
attempts: Int,
issued: Timestamp = Timestamp.zero)

def props(
driverHolder: MarathonSchedulerDriverHolder,
stateOpProcessor: TaskStateOpProcessor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package mesosphere.marathon
package core.task.termination.impl

import mesosphere.marathon.Seq
import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.state.Timestamp

/**
* Metadata used to track which instances to kill and how many attempts have been made
*
* @param instanceId id of the instance to kill
* @param taskIdsToKill ids of the tasks to kill
* @param maybeInstance the instance, if available
* @param attempts the number of kill attempts
* @param issued the time of the last issued kill request
*/
private[termination] case class ToKill(
instanceId: Instance.Id,
taskIdsToKill: Seq[Task.Id],
maybeInstance: Option[Instance],
attempts: Int,
issued: Timestamp = Timestamp.zero)
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ case class TestTaskBuilder(
this.copy(task = Some(TestTaskBuilder.Helper.residentLaunchedTask(instance.instanceId.runSpecId, localVolumeIds: _*).copy(taskId = Task.Id.forInstanceId(instance.instanceId, None))))
}

def taskResidentUnreachable(localVolumeIds: Task.LocalVolumeId*) = {
val instance = instanceBuilder.getInstance()
this.copy(task = Some(TestTaskBuilder.Helper.residentUnreachableTask(instance.instanceId.runSpecId, localVolumeIds: _*).copy(taskId = Task.Id.forInstanceId(instance.instanceId, None))))
}

def taskRunning(containerName: Option[String] = None, stagedAt: Timestamp = now, startedAt: Timestamp = now) = {
val instance = instanceBuilder.getInstance()
this.copy(task = Some(TestTaskBuilder.Helper.runningTask(
Expand Down Expand Up @@ -294,6 +299,21 @@ object TestTaskBuilder {
reservation = Task.Reservation(localVolumeIds.to[Seq], Task.Reservation.State.Launched))
}

def residentUnreachableTask(appId: PathId, localVolumeIds: Task.LocalVolumeId*) = {
val now = Timestamp.now()
Task.LaunchedOnReservation(
taskId = Task.Id.forRunSpec(appId),
runSpecVersion = now,
status = Task.Status(
stagedAt = now,
startedAt = Some(now),
mesosStatus = None,
condition = Condition.Unreachable,
networkInfo = NetworkInfoPlaceholder()
),
reservation = Task.Reservation(localVolumeIds.to[Seq], Task.Reservation.State.Launched))
}

def startingTaskForApp(instanceId: Instance.Id, appVersion: Timestamp = Timestamp(1), stagedAt: Long = 2, container: Option[MesosContainer] = None): Task.LaunchedEphemeral =
startingTask(
Task.Id.forInstanceId(instanceId, container),
Expand Down
Loading

0 comments on commit ef6bd77

Please sign in to comment.