From 1455fdd9e86c2aa2183b443f9ef4e39084e2d75d Mon Sep 17 00:00:00 2001 From: Matthias Eichstedt Date: Tue, 21 Feb 2017 13:11:15 -0700 Subject: [PATCH] Don't destroy persistent volumes when killing unreachable resident tasks Summary: Killing an unreachable resident task will do nothing, rather than destroy the reservations. Fixes #5207 Also-By: tharper@mesosphere.com Test Plan: sbt test Reviewers: unterstein, meichstedt, jasongilanfarr, jenkins Reviewed By: meichstedt, jasongilanfarr, jenkins Subscribers: jdef, marathon-team Differential Revision: https://phabricator.mesosphere.com/D529 --- .../marathon/core/instance/Instance.scala | 5 + .../task/termination/impl/KillAction.scala | 95 +++++++++++++++++++ .../termination/impl/KillServiceActor.scala | 51 +++++----- .../impl/InstanceOpProcessorImpl.scala | 1 + .../update/InstanceUpdateOpResolverTest.scala | 3 +- .../termination/impl/KillActionTest.scala | 49 ++++++++++ 6 files changed, 174 insertions(+), 30 deletions(-) create mode 100644 src/main/scala/mesosphere/marathon/core/task/termination/impl/KillAction.scala create mode 100644 src/test/scala/mesosphere/marathon/core/task/termination/impl/KillActionTest.scala diff --git a/src/main/scala/mesosphere/marathon/core/instance/Instance.scala b/src/main/scala/mesosphere/marathon/core/instance/Instance.scala index a0b05e45ffe..5d5eb9f893d 100644 --- a/src/main/scala/mesosphere/marathon/core/instance/Instance.scala +++ b/src/main/scala/mesosphere/marathon/core/instance/Instance.scala @@ -49,6 +49,11 @@ case class Instance( def isDropped: Boolean = state.condition == Condition.Dropped def isTerminated: Boolean = state.condition.isTerminal def isActive: Boolean = state.condition.isActive + def hasReservation: Boolean = + tasksMap.values.exists { + case _: Task.ReservedTask => true + case _ => false + } override def mergeFromProto(message: Protos.Json): Instance = { Json.parse(message.getJson).as[Instance] diff --git 
a/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillAction.scala b/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillAction.scala new file mode 100644 index 00000000000..e387fe8a659 --- /dev/null +++ b/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillAction.scala @@ -0,0 +1,95 @@ +package mesosphere.marathon +package core.task.termination.impl + +import com.typesafe.scalalogging.StrictLogging +import mesosphere.marathon.core.condition.Condition +import mesosphere.marathon.core.instance.Instance +import mesosphere.marathon.core.task.Task + +/** + * Possible actions that can be chosen in order to `kill` a given instance. + * Depending on the instance's state this can be one of + * - [[KillAction.ExpungeFromState]] + * - [[KillAction.Noop]] + * - [[KillAction.IssueKillRequest]] + */ +private[termination] sealed trait KillAction + +private[termination] object KillAction extends StrictLogging { + /** + * Any normal, reachable and stateless instance will simply be killed via the scheduler driver. + */ + case object IssueKillRequest extends KillAction + + /** + * Do nothing. This is currently what we do for unreachable tasks with reservations. See #5261 + */ + case object Noop extends KillAction + + /** + * In case of an instance being Unreachable, killing the related Mesos task is impossible. + * In order to get rid of the instance, processing this action expunges the metadata from + * state. If the instance is reported to be non-terminal in the future, it will be killed. + */ + case object ExpungeFromState extends KillAction + + /* returns true for conditions in which we cannot expect the task to report a terminal state after sending a kill signal */ + private val wontRespondToKill: Condition => Boolean = { + import Condition._ + Set( + Unknown, Unreachable, UnreachableInactive, + // TODO: it should be safe to remove these from this list, because + // 1) all taskIds should be removed at this point, because Gone & Dropped are terminal. 
+ // 2) Killing a Gone / Dropped task will cause it to be in a terminal state. + // 3) Killing a Gone / Dropped task may result in no status change at all. + // 4) Either way, we end up in a terminal state. + // However, we didn't want to risk changing behavior in a point release. So they remain here. + Dropped, Gone + ) + } + + /** + * Computes the [[KillAction]] based on the instance's state. + * + * If the instance can't be reached, issuing a kill request won't cause the instance to progress towards a terminal + * state; Mesos will simply re-send the current state. Our current behavior, for ephemeral instances, is to simply delete any + * knowledge that the instance might be running, such that if it is reported by Mesos later we will kill it. (That + * could be improved.) + * + * If the instance is lost _and_ has reservations, we do nothing. + * + * Any other case -> issue a kill request. + */ + def apply(instanceId: Instance.Id, taskIds: Iterable[Task.Id], knownInstance: Option[Instance]): KillAction = { + val hasReservations = knownInstance.fold(false)(_.hasReservation) + + // TODO(PODS): align this with other Terminal/Unreachable/whatever extractors + val maybeCondition = knownInstance.map(_.state.condition) + val isUnkillable = maybeCondition.fold(false)(wontRespondToKill) + + // Ephemeral instances are expunged once all tasks are terminal, it's unlikely for this to be true for them. + // Resident tasks, however, could be in this state if scaled down, or, if kill is attempted between recovery. 
+ val allTerminal: Boolean = taskIds.isEmpty + + if (isUnkillable || allTerminal) { + val msg = if (isUnkillable) + s"it is ${maybeCondition.fold("unknown")(_.toString)}" + else + "none of its tasks are running" + if (hasReservations) { + logger.info( + s"Ignoring kill request for ${instanceId}; killing it while ${msg} is unsupported") + KillAction.Noop + } else { + logger.warn(s"Expunging ${instanceId} from state because ${msg}") + // we will eventually be notified of a taskStatusUpdate after the instance has been expunged + KillAction.ExpungeFromState + } + } else { + val knownOrNot = if (knownInstance.isDefined) "known" else "unknown" + logger.warn("Killing {} {} of instance {}", knownOrNot, taskIds.mkString(","), instanceId) + KillAction.IssueKillRequest + } + } + +} diff --git a/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillServiceActor.scala b/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillServiceActor.scala index a6ab850382b..1bf368071a4 100644 --- a/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillServiceActor.scala +++ b/src/main/scala/mesosphere/marathon/core/task/termination/impl/KillServiceActor.scala @@ -5,7 +5,6 @@ import akka.Done import akka.actor.{ Actor, ActorLogging, Cancellable, Props } import mesosphere.marathon.core.base.Clock import mesosphere.marathon.core.task.termination.KillConfig -import mesosphere.marathon.state.Timestamp import mesosphere.marathon.core.task.Task import mesosphere.marathon.core.task.tracker.TaskStateOpProcessor import mesosphere.marathon.core.event.{ InstanceChanged, UnknownInstanceTerminated } @@ -15,6 +14,7 @@ import mesosphere.marathon.core.task.termination.InstanceChangedPredicates.consi import mesosphere.marathon.core.task.Task.Id import scala.collection.mutable +import mesosphere.marathon.state.Timestamp import scala.concurrent.Promise import scala.util.Try @@ -29,7 +29,7 @@ import scala.util.Try * number of retries is exceeded, the instance will be expunged 
from state similar to a * lost instance. * - * For each kill request, a child [[InstanceKillProgressActor]] will be spawned, which + * For each kill request, a [[KillStreamWatcher]] will be created, which * is supposed to watch the progress and complete a given promise when all watched * instances are reportedly terminal. * @@ -142,30 +142,22 @@ private[impl] class KillServiceActor( val instanceId = toKill.instanceId val taskIds = toKill.taskIdsToKill - // TODO(PODS): align this with other Terminal/Unreachable/whatever extractors - val isLost: Boolean = toKill.maybeInstance.fold(false) { instance => - instance.isGone || instance.isUnknown || instance.isDropped || instance.isUnreachable || instance.isUnreachableInactive - } + KillAction(toKill.instanceId, toKill.taskIdsToKill, toKill.maybeInstance) match { + case KillAction.Noop => + () - // An instance will be expunged once all tasks are terminal. Therefore, this case is - // highly unlikely. Should it ever occur, this will still expunge the instance to clean up. 
- val allTerminal: Boolean = taskIds.isEmpty + case KillAction.IssueKillRequest => + driverHolder.driver.foreach { driver => + taskIds.map(_.mesosTaskId).foreach(driver.killTask) + } + val attempts = inFlight.get(toKill.instanceId).fold(1)(_.attempts + 1) + inFlight.update( + toKill.instanceId, ToKill(instanceId, taskIds, toKill.maybeInstance, attempts, issued = clock.now())) - if (isLost || allTerminal) { - val msg = if (isLost) "it is lost" else "all its tasks are terminal" - log.warning("Expunging {} from state because {}", instanceId, msg) - // we will eventually be notified of a taskStatusUpdate after the instance has been expunged - stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(toKill.instanceId)) - } else { - val knownOrNot = if (toKill.maybeInstance.isDefined) "known" else "unknown" - log.warning("Killing {} {} of instance {}", knownOrNot, taskIds.mkString(","), instanceId) - driverHolder.driver.foreach { driver => - taskIds.map(_.mesosTaskId).foreach(driver.killTask) - } + case KillAction.ExpungeFromState => + stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(toKill.instanceId)) } - val attempts = inFlight.get(toKill.instanceId).fold(1)(_.attempts + 1) - inFlight.update(toKill.instanceId, ToKill(instanceId, taskIds, toKill.maybeInstance, attempts, issued = clock.now())) instancesToKill.remove(instanceId) } @@ -198,8 +190,16 @@ private[termination] object KillServiceActor { sealed trait InternalRequest case object Retry extends InternalRequest + def props( + driverHolder: MarathonSchedulerDriverHolder, + stateOpProcessor: TaskStateOpProcessor, + config: KillConfig, + clock: Clock): Props = Props( + new KillServiceActor(driverHolder, stateOpProcessor, config, clock)) + /** * Metadata used to track which instances to kill and how many attempts have been made + * * @param instanceId id of the instance to kill * @param taskIdsToKill ids of the tasks to kill * @param maybeInstance the instance, if available @@ -212,13 +212,6 @@ 
private[termination] object KillServiceActor { maybeInstance: Option[Instance], attempts: Int, issued: Timestamp = Timestamp.zero) - - def props( - driverHolder: MarathonSchedulerDriverHolder, - stateOpProcessor: TaskStateOpProcessor, - config: KillConfig, - clock: Clock): Props = Props( - new KillServiceActor(driverHolder, stateOpProcessor, config, clock)) } /** diff --git a/src/main/scala/mesosphere/marathon/core/task/tracker/impl/InstanceOpProcessorImpl.scala b/src/main/scala/mesosphere/marathon/core/task/tracker/impl/InstanceOpProcessorImpl.scala index 6711448279a..597fa4520fb 100644 --- a/src/main/scala/mesosphere/marathon/core/task/tracker/impl/InstanceOpProcessorImpl.scala +++ b/src/main/scala/mesosphere/marathon/core/task/tracker/impl/InstanceOpProcessorImpl.scala @@ -27,6 +27,7 @@ private[tracker] class InstanceOpProcessorImpl( override def process(op: Operation)(implicit ec: ExecutionContext): Future[Unit] = { val stateChange = stateOpResolver.resolve(op.op) + stateChange.flatMap { case change: InstanceUpdateEffect.Expunge => // Used for task termination or as a result from a UpdateStatus action. 
diff --git a/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolverTest.scala b/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolverTest.scala index cde6ac6d875..7dd71f170b0 100644 --- a/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolverTest.scala +++ b/src/test/scala/mesosphere/marathon/core/instance/update/InstanceUpdateOpResolverTest.scala @@ -11,6 +11,7 @@ import mesosphere.marathon.core.task.tracker.InstanceTracker import mesosphere.marathon.core.task.{ Task, TaskCondition } import mesosphere.marathon.state.{ PathId, Timestamp } import org.apache.mesos +import org.scalatest.Inside import scala.collection.immutable.Seq import scala.concurrent.Future @@ -20,7 +21,7 @@ import scala.concurrent.Future * * More tests are in [[mesosphere.marathon.tasks.InstanceTrackerImplTest]] */ -class InstanceUpdateOpResolverTest extends UnitTest { +class InstanceUpdateOpResolverTest extends UnitTest with Inside { import scala.concurrent.ExecutionContext.Implicits.global "ForceExpunge for an unknown task" should { diff --git a/src/test/scala/mesosphere/marathon/core/task/termination/impl/KillActionTest.scala b/src/test/scala/mesosphere/marathon/core/task/termination/impl/KillActionTest.scala new file mode 100644 index 00000000000..b3c5fb9fac8 --- /dev/null +++ b/src/test/scala/mesosphere/marathon/core/task/termination/impl/KillActionTest.scala @@ -0,0 +1,49 @@ +package mesosphere.marathon +package core.task.termination.impl + +import mesosphere.UnitTest +import mesosphere.marathon.core.base.ConstantClock +import mesosphere.marathon.core.instance.{ Instance, TestInstanceBuilder } +import mesosphere.marathon.core.task.Task.LocalVolumeId +import mesosphere.marathon.state.PathId +import org.scalatest.prop.TableDrivenPropertyChecks + +class KillActionTest extends UnitTest with TableDrivenPropertyChecks { + + val clock = ConstantClock() + val appId = PathId("/test") + + lazy val localVolumeId = 
LocalVolumeId(appId, "unwanted-persistent-volume", "uuid1") + lazy val residentLaunchedInstance: Instance = TestInstanceBuilder.newBuilder(appId). + addTaskResidentLaunched(localVolumeId). + getInstance() + + lazy val residentUnreachableInstance: Instance = TestInstanceBuilder.newBuilder(appId). + addTaskWithBuilder(). + taskResidentUnreachable(localVolumeId). + build(). + getInstance() + + lazy val unreachableInstance: Instance = TestInstanceBuilder.newBuilder(appId).addTaskUnreachable().getInstance() + lazy val runningInstance: Instance = TestInstanceBuilder.newBuilder(appId).addTaskLaunched().getInstance() + + "computeKillAction" when { + Table( + ("name", "instance", "expected"), + ("an unreachable reserved instance", residentUnreachableInstance, KillAction.Noop), + ("a running reserved instance", residentLaunchedInstance, KillAction.IssueKillRequest), + ("an unreachable ephemeral instance", unreachableInstance, KillAction.ExpungeFromState), + ("a running ephemeral instance", runningInstance, KillAction.IssueKillRequest) + ). + foreach { + case (name, instance, expected) => + s"killing ${name}" should { + s"result in ${expected}" in { + KillAction( + instance.instanceId, instance.tasksMap.keys, Some(instance)). + shouldBe(expected) + } + } + } + } +}