Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Don't destroy persistent volumes when killing unreachable resident tasks
Browse files Browse the repository at this point in the history
Summary:
Killing an unreachable resident task now does nothing, rather than
destroying its reservations.

Fixes #5207

Also-By: tharper@mesosphere.com

Test Plan: sbt test

Reviewers: unterstein, meichstedt, jasongilanfarr, jenkins

Reviewed By: meichstedt, jasongilanfarr, jenkins

Subscribers: jdef, marathon-team

Differential Revision: https://phabricator.mesosphere.com/D529
  • Loading branch information
meichstedt authored and timcharper committed Mar 9, 2017
1 parent 62bb2c7 commit 5f34abd
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ case class Instance(
// True when the instance's current condition equals Condition.Dropped.
def isDropped: Boolean = state.condition == Condition.Dropped
// True when the instance's current condition reports itself as terminal.
def isTerminated: Boolean = state.condition.isTerminal
// True when the instance's current condition reports itself as active.
def isActive: Boolean = state.condition.isActive
/**
  * Whether at least one task in `tasksMap` is a [[Task.ReservedTask]].
  *
  * The result type is declared explicitly, like the sibling condition accessors,
  * so the public signature does not depend on type inference.
  */
def hasReservation: Boolean =
  tasksMap.values.exists {
    case _: Task.ReservedTask => true
    case _ => false
  }

override def mergeFromProto(message: Protos.Json): Instance = {
Json.parse(message.getJson).as[Instance]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package mesosphere.marathon
package core.task.termination.impl

import com.typesafe.scalalogging.StrictLogging
import mesosphere.marathon.core.condition.Condition
import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.task.Task

/**
  * Possible actions that can be chosen in order to `kill` a given instance.
  *
  * Which action is chosen depends on the instance's state; it is always exactly one of
  *  - [[KillAction.ExpungeFromState]]
  *  - [[KillAction.Noop]]
  *  - [[KillAction.IssueKillRequest]]
  *
  * The trait is sealed, so matches over a [[KillAction]] are checked for exhaustiveness.
  */
private[termination] sealed trait KillAction

private[termination] object KillAction extends StrictLogging {
  /**
    * Issue a kill request for the instance's tasks. This is chosen for every instance that
    * still has tasks to kill and whose condition is not one of those that won't respond to a
    * kill; that includes resident (reserved) instances as long as they are reachable.
    */
  case object IssueKillRequest extends KillAction

  /**
    * Do nothing. This is currently what we do for unreachable tasks with reservations. See #5261
    */
  case object Noop extends KillAction

  /**
    * In case of an instance being Unreachable, killing the related Mesos task is impossible.
    * In order to get rid of the instance, processing this action expunges the metadata from
    * state. If the instance is reported to be non-terminal in the future, it will be killed.
    */
  case object ExpungeFromState extends KillAction

  /**
    * Returns `true` for conditions in which we can NOT expect the task to report a terminal
    * state after sending a kill signal, i.e. the task won't respond to a kill.
    */
  private val wontRespondToKill: Condition => Boolean = {
    import Condition._
    Set(
      Unknown, Unreachable, UnreachableInactive,
      // TODO: it should be safe to remove these from this list, because
      // 1) all taskId's should be removed at this point, because Gone & Dropped are terminal.
      // 2) Killing a Gone / Dropped task will cause it to be in a terminal state.
      // 3) Killing a Gone / Dropped task may result in no status change at all.
      // 4) Either way, we end up in a terminal state.
      // However, we didn't want to risk changing behavior in a point release. So they remain here.
      Dropped, Gone
    )
  }

  /**
    * Computes the [[KillAction]] based on the instance's state.
    *
    * If the instance can't be reached, issuing a kill request won't cause the instance to
    * progress towards a terminal state; Mesos will simply re-send the current state. Our current
    * behavior, for ephemeral instances, is to simply delete any knowledge that the instance might
    * be running, such that if it is reported by Mesos later we will kill it (that could be
    * improved).
    *
    * Decision table:
    *  - unkillable condition or no task ids, with a reservation    -> [[Noop]]
    *  - unkillable condition or no task ids, without a reservation -> [[ExpungeFromState]]
    *  - any other case                                             -> [[IssueKillRequest]]
    *
    * An unknown instance (`knownInstance` is `None`) is treated as having no reservation and no
    * unkillable condition.
    *
    * @param instanceId    id of the instance to kill; only used for logging here
    * @param taskIds       ids of the tasks that should be killed; empty means nothing is left to kill
    * @param knownInstance the instance as currently known, if any
    * @return the action the caller should carry out
    */
  def apply(instanceId: Instance.Id, taskIds: Iterable[Task.Id], knownInstance: Option[Instance]): KillAction = {
    val hasReservations = knownInstance.exists(_.hasReservation)

    // TODO(PODS): align this with other Terminal/Unreachable/whatever extractors
    val maybeCondition = knownInstance.map(_.state.condition)
    val isUnkillable = maybeCondition.exists(wontRespondToKill)

    // Ephemeral instances are expunged once all tasks are terminal, it's unlikely for this to be true for them.
    // Resident tasks, however, could be in this state if scaled down, or, if kill is attempted between recovery.
    val allTerminal: Boolean = taskIds.isEmpty

    if (isUnkillable || allTerminal) {
      val msg = if (isUnkillable)
        s"it is ${maybeCondition.fold("unknown")(_.toString)}"
      else
        "none of its tasks are running"
      if (hasReservations) {
        // Expunging would drop the reservation/persistent volume metadata, so leave it untouched.
        logger.info(
          s"Ignoring kill request for ${instanceId}; killing it while ${msg} is unsupported")
        KillAction.Noop
      } else {
        logger.warn(s"Expunging ${instanceId} from state because ${msg}")
        // we will eventually be notified of a taskStatusUpdate after the instance has been expunged
        KillAction.ExpungeFromState
      }
    } else {
      val knownOrNot = if (knownInstance.isDefined) "known" else "unknown"
      logger.warn("Killing {} {} of instance {}", knownOrNot, taskIds.mkString(","), instanceId)
      KillAction.IssueKillRequest
    }
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import akka.Done
import akka.actor.{ Actor, ActorLogging, Cancellable, Props }
import mesosphere.marathon.core.base.Clock
import mesosphere.marathon.core.task.termination.KillConfig
import mesosphere.marathon.state.Timestamp
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.core.task.tracker.TaskStateOpProcessor
import mesosphere.marathon.core.event.{ InstanceChanged, UnknownInstanceTerminated }
Expand All @@ -15,6 +14,7 @@ import mesosphere.marathon.core.task.termination.InstanceChangedPredicates.consi
import mesosphere.marathon.core.task.Task.Id

import scala.collection.mutable
import mesosphere.marathon.state.Timestamp
import scala.concurrent.Promise
import scala.util.Try

Expand All @@ -29,7 +29,7 @@ import scala.util.Try
* number of retries is exceeded, the instance will be expunged from state similar to a
* lost instance.
*
* For each kill request, a child [[InstanceKillProgressActor]] will be spawned, which
* For each kill request, a [[KillStreamWatcher]] will be created, which
* is supposed to watch the progress and complete a given promise when all watched
* instances are reportedly terminal.
*
Expand Down Expand Up @@ -142,30 +142,22 @@ private[impl] class KillServiceActor(
val instanceId = toKill.instanceId
val taskIds = toKill.taskIdsToKill

// TODO(PODS): align this with other Terminal/Unreachable/whatever extractors
val isLost: Boolean = toKill.maybeInstance.fold(false) { instance =>
instance.isGone || instance.isUnknown || instance.isDropped || instance.isUnreachable || instance.isUnreachableInactive
}
KillAction(toKill.instanceId, toKill.taskIdsToKill, toKill.maybeInstance) match {
case KillAction.Noop =>
()

// An instance will be expunged once all tasks are terminal. Therefore, this case is
// highly unlikely. Should it ever occur, this will still expunge the instance to clean up.
val allTerminal: Boolean = taskIds.isEmpty
case KillAction.IssueKillRequest =>
driverHolder.driver.foreach { driver =>
taskIds.map(_.mesosTaskId).foreach(driver.killTask)
}
val attempts = inFlight.get(toKill.instanceId).fold(1)(_.attempts + 1)
inFlight.update(
toKill.instanceId, ToKill(instanceId, taskIds, toKill.maybeInstance, attempts, issued = clock.now()))

if (isLost || allTerminal) {
val msg = if (isLost) "it is lost" else "all its tasks are terminal"
log.warning("Expunging {} from state because {}", instanceId, msg)
// we will eventually be notified of a taskStatusUpdate after the instance has been expunged
stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(toKill.instanceId))
} else {
val knownOrNot = if (toKill.maybeInstance.isDefined) "known" else "unknown"
log.warning("Killing {} {} of instance {}", knownOrNot, taskIds.mkString(","), instanceId)
driverHolder.driver.foreach { driver =>
taskIds.map(_.mesosTaskId).foreach(driver.killTask)
}
case KillAction.ExpungeFromState =>
stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(toKill.instanceId))
}

val attempts = inFlight.get(toKill.instanceId).fold(1)(_.attempts + 1)
inFlight.update(toKill.instanceId, ToKill(instanceId, taskIds, toKill.maybeInstance, attempts, issued = clock.now()))
instancesToKill.remove(instanceId)
}

Expand Down Expand Up @@ -198,8 +190,16 @@ private[termination] object KillServiceActor {
sealed trait InternalRequest
case object Retry extends InternalRequest

/**
  * Creates the [[Props]] used to instantiate a [[KillServiceActor]].
  *
  * All four arguments are passed unchanged to the actor's constructor.
  */
def props(
  driverHolder: MarathonSchedulerDriverHolder,
  stateOpProcessor: TaskStateOpProcessor,
  config: KillConfig,
  clock: Clock): Props = Props(
  new KillServiceActor(driverHolder, stateOpProcessor, config, clock))

/**
* Metadata used to track which instances to kill and how many attempts have been made
*
* @param instanceId id of the instance to kill
* @param taskIdsToKill ids of the tasks to kill
* @param maybeInstance the instance, if available
Expand All @@ -212,13 +212,6 @@ private[termination] object KillServiceActor {
maybeInstance: Option[Instance],
attempts: Int,
issued: Timestamp = Timestamp.zero)

def props(
driverHolder: MarathonSchedulerDriverHolder,
stateOpProcessor: TaskStateOpProcessor,
config: KillConfig,
clock: Clock): Props = Props(
new KillServiceActor(driverHolder, stateOpProcessor, config, clock))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ private[tracker] class InstanceOpProcessorImpl(

override def process(op: Operation)(implicit ec: ExecutionContext): Future[Unit] = {
val stateChange = stateOpResolver.resolve(op.op)

stateChange.flatMap {
case change: InstanceUpdateEffect.Expunge =>
// Used for task termination or as a result from a UpdateStatus action.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import mesosphere.marathon.core.task.tracker.InstanceTracker
import mesosphere.marathon.core.task.{ Task, TaskCondition }
import mesosphere.marathon.state.{ PathId, Timestamp }
import org.apache.mesos
import org.scalatest.Inside

import scala.collection.immutable.Seq
import scala.concurrent.Future
Expand All @@ -20,7 +21,7 @@ import scala.concurrent.Future
*
* More tests are in [[mesosphere.marathon.tasks.InstanceTrackerImplTest]]
*/
class InstanceUpdateOpResolverTest extends UnitTest {
class InstanceUpdateOpResolverTest extends UnitTest with Inside {
import scala.concurrent.ExecutionContext.Implicits.global

"ForceExpunge for an unknown task" should {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package mesosphere.marathon
package core.task.termination.impl

import mesosphere.UnitTest
import mesosphere.marathon.core.base.ConstantClock
import mesosphere.marathon.core.instance.{ Instance, TestInstanceBuilder }
import mesosphere.marathon.core.task.Task.LocalVolumeId
import mesosphere.marathon.state.PathId
import org.scalatest.prop.TableDrivenPropertyChecks

/**
  * Table-driven checks of the [[KillAction]] computed for resident (reserved) and ephemeral
  * instances, each in a running and in an unreachable condition.
  */
class KillActionTest extends UnitTest with TableDrivenPropertyChecks {

  val clock = ConstantClock()
  val appId = PathId("/test")

  lazy val localVolumeId = LocalVolumeId(appId, "unwanted-persistent-volume", "uuid1")

  // Resident fixtures: both carry a task bound to `localVolumeId`.
  lazy val residentLaunchedInstance: Instance =
    TestInstanceBuilder.newBuilder(appId).addTaskResidentLaunched(localVolumeId).getInstance()

  lazy val residentUnreachableInstance: Instance =
    TestInstanceBuilder.newBuilder(appId)
      .addTaskWithBuilder()
      .taskResidentUnreachable(localVolumeId)
      .build()
      .getInstance()

  // Ephemeral fixtures: no local volume involved.
  lazy val unreachableInstance: Instance =
    TestInstanceBuilder.newBuilder(appId).addTaskUnreachable().getInstance()

  lazy val runningInstance: Instance =
    TestInstanceBuilder.newBuilder(appId).addTaskLaunched().getInstance()

  "computeKillAction" when {
    // One row per scenario: description, instance under test, expected action.
    val scenarios = Table(
      ("name", "instance", "expected"),
      ("an unreachable reserved instance", residentUnreachableInstance, KillAction.Noop),
      ("a running reserved instance", residentLaunchedInstance, KillAction.IssueKillRequest),
      ("an unreachable ephemeral instance", unreachableInstance, KillAction.ExpungeFromState),
      ("a running ephemeral instance", runningInstance, KillAction.IssueKillRequest)
    )

    scenarios.foreach {
      case (name, instance, expected) =>
        s"killing $name" should {
          s"result in $expected" in {
            val action = KillAction(instance.instanceId, instance.tasksMap.keys, Some(instance))
            action shouldBe expected
          }
        }
    }
  }
}

0 comments on commit 5f34abd

Please sign in to comment.