From 5d55e6ce6d0e3201b3cac175dce0033c50b2a5c6 Mon Sep 17 00:00:00 2001 From: Tim Harper Date: Tue, 21 Feb 2017 22:05:50 -0700 Subject: [PATCH] Use non-deployment-interacting KillService during kill-and-wipe Summary: Expunge caused a deployment, which caused MarathonSchedulerActor to lock for the app. This led to the subsequent kill request to MarathonSchedulerActor to fail. Fixes #5155 Backport of 560ba8bd Test Plan: sbt test Reviewers: aquamatthias, meichstedt, jenkins Subscribers: marathon-team Differential Revision: https://phabricator.mesosphere.com/D548 --- .../marathon/MarathonSchedulerService.scala | 6 +-- .../mesosphere/marathon/api/TaskKiller.scala | 53 +++++++++++-------- .../marathon/api/TaskKillerTest.scala | 11 ++-- .../api/v2/SpecInstancesResourceTest.scala | 7 ++- .../marathon/api/v2/TasksResourceTest.scala | 6 ++- 5 files changed, 52 insertions(+), 31 deletions(-) diff --git a/src/main/scala/mesosphere/marathon/MarathonSchedulerService.scala b/src/main/scala/mesosphere/marathon/MarathonSchedulerService.scala index b22d960fcb7..21c001c8aec 100644 --- a/src/main/scala/mesosphere/marathon/MarathonSchedulerService.scala +++ b/src/main/scala/mesosphere/marathon/MarathonSchedulerService.scala @@ -140,10 +140,10 @@ class MarathonSchedulerService @Inject() ( Await.result(groupManager.appVersion(appId, version.toOffsetDateTime), config.zkTimeoutDuration) } - def killTasks( + def killInstances( appId: PathId, - tasks: Seq[Instance]): Unit = { - schedulerActor ! KillTasks(appId, tasks) + instances: Seq[Instance]): Unit = { + schedulerActor ! KillTasks(appId, instances) } //Begin Service interface diff --git a/src/main/scala/mesosphere/marathon/api/TaskKiller.scala b/src/main/scala/mesosphere/marathon/api/TaskKiller.scala index 1258f834b5a..2161f268a38 100644 --- a/src/main/scala/mesosphere/marathon/api/TaskKiller.scala +++ b/src/main/scala/mesosphere/marathon/api/TaskKiller.scala @@ -3,9 +3,11 @@ package api import javax.inject.Inject +import akka.Done import mesosphere.marathon.core.group.GroupManager import mesosphere.marathon.core.instance.Instance import mesosphere.marathon.core.instance.update.InstanceUpdateOperation +import mesosphere.marathon.core.task.termination.{ KillReason, KillService } import mesosphere.marathon.core.task.tracker.{ InstanceTracker, TaskStateOpProcessor } import mesosphere.marathon.plugin.auth.{ Authenticator, Authorizer, Identity, UpdateRunSpec } import mesosphere.marathon.state._ @@ -24,7 +26,8 @@ class TaskKiller @Inject() ( service: MarathonSchedulerService, val config: MarathonConf, val authenticator: Authenticator, - val authorizer: Authorizer) extends AuthResource { + val authorizer: Authorizer, + killService: KillService) extends AuthResource { private[this] val log = LoggerFactory.getLogger(getClass) @@ -38,32 +41,36 @@ class TaskKiller @Inject() ( case Some(runSpec) => checkAuthorization(UpdateRunSpec, runSpec) async { // linter:ignore:UnnecessaryElseBranch - val allTasks = await(instanceTracker.specInstances(runSpecId)) - val foundTasks = findToKill(allTasks) - - if (wipe) await(expunge(foundTasks)) - - val launchedTasks = foundTasks.filter(_.isLaunched) - if (launchedTasks.nonEmpty) service.killTasks(runSpecId, launchedTasks) - // Return killed *and* expunged tasks. - // The user only cares that all tasks won't exist eventually. That's why we send all tasks back and not just - // the killed tasks. - foundTasks + val allInstances = await(instanceTracker.specInstances(runSpecId)) + val foundInstances = findToKill(allInstances) + val launchedInstances = foundInstances.filter(_.isLaunched) + + if (wipe) { + val done1 = await(expunge(foundInstances)) + val done2 = await(killService.killInstances(launchedInstances, KillReason.KillingTasksViaApi)) + } else { + if (launchedInstances.nonEmpty) service.killInstances(runSpecId, launchedInstances) + } + // Return killed *and* expunged instances. + // The user only cares that all instances won't exist eventually. That's why we send all instances back and + // not just the killed instances. + foundInstances } case None => Future.failed(PathNotFoundException(runSpecId)) } } - private[this] def expunge(tasks: Seq[Instance]): Future[Unit] = { + private[this] def expunge(instances: Seq[Instance]): Future[Done] = { // Note: We process all instances sequentially. - tasks.foldLeft(Future.successful(())) { (resultSoFar, nextInstance) => + instances.foldLeft(Future.successful(Done)) { (resultSoFar, nextInstance) => resultSoFar.flatMap { _ => log.info("Expunging {}", nextInstance.instanceId) - stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(nextInstance.instanceId)).map(_ => ()).recover { + stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(nextInstance.instanceId)).map(_ => Done).recover { case NonFatal(cause) => log.info("Failed to expunge {}, got: {}", Array[Object](nextInstance.instanceId, cause): _*) + Done } } } @@ -82,13 +89,13 @@ class TaskKiller @Inject() ( @SuppressWarnings(Array("all")) // async/await def killAndScale( - appTasks: Map[PathId, Seq[Instance]], + appInstances: Map[PathId, Seq[Instance]], force: Boolean)(implicit identity: Identity): Future[DeploymentPlan] = { def scaleApp(app: AppDefinition): AppDefinition = { checkAuthorization(UpdateRunSpec, app) - appTasks.get(app.id).fold(app) { tasks => - // only count active tasks that did not already receive a kill request. - val toKillCount = tasks.count(i => i.isActive && !i.isKilling) + appInstances.get(app.id).fold(app) { instances => + // only count active instances that did not already receive a kill request. + val toKillCount = instances.count(i => i.isActive && !i.isKilling) // make sure we never scale below zero instances. app.copy(instances = math.max(0, app.instances - toKillCount)) } @@ -96,18 +103,18 @@ class TaskKiller @Inject() ( val version = Timestamp.now() - def killTasks = groupManager.updateRoot( + def killDeployment = groupManager.updateRoot( _.updateTransitiveApps(PathId.empty, scaleApp, version), version = version, force = force, - toKill = appTasks + toKill = appInstances ) async { val allInstances = await(instanceTracker.instancesBySpec()).instancesMap //TODO: The exception does not take multiple ids. - appTasks.keys.find(!allInstances.contains(_)).map(id => throw PathNotFoundException(id)) - await(killTasks) + appInstances.keys.find(!allInstances.contains(_)).map(id => throw PathNotFoundException(id)) + await(killDeployment) } } } diff --git a/src/test/scala/mesosphere/marathon/api/TaskKillerTest.scala b/src/test/scala/mesosphere/marathon/api/TaskKillerTest.scala index e8a695229c0..41faf76da26 100644 --- a/src/test/scala/mesosphere/marathon/api/TaskKillerTest.scala +++ b/src/test/scala/mesosphere/marathon/api/TaskKillerTest.scala @@ -1,9 +1,11 @@ package mesosphere.marathon package api +import akka.Done import mesosphere.marathon.core.group.GroupManager import mesosphere.marathon.core.instance.update.{ InstanceUpdateEffect, InstanceUpdateOperation } import mesosphere.marathon.core.instance.{ Instance, TestInstanceBuilder } +import mesosphere.marathon.core.task.termination.{ KillReason, KillService } import mesosphere.marathon.core.task.tracker.InstanceTracker.InstancesBySpec import mesosphere.marathon.core.task.tracker.{ InstanceTracker, TaskStateOpProcessor } import mesosphere.marathon.state._ @@ -103,7 +105,7 @@ class TaskKillerTest extends MarathonSpec }) result.futureValue shouldEqual tasksToKill - verify(f.service, times(1)).killTasks(appId, tasksToKill) + verify(f.service, times(1)).killInstances(appId, tasksToKill) } test("Kill and scale w/o force should fail if there is a deployment") { @@ -140,6 +142,7 @@ class TaskKillerTest extends MarathonSpec val expungeRunning = InstanceUpdateOperation.ForceExpunge(runningInstance.instanceId) val expungeReserved = InstanceUpdateOperation.ForceExpunge(reservedInstance.instanceId) + when(f.killService.killInstances(launchedInstances, KillReason.KillingTasksViaApi)).thenReturn(Future(Done)) when(f.groupManager.runSpec(appId)).thenReturn(Future.successful(Some(AppDefinition(appId)))) when(f.tracker.specInstances(appId)).thenReturn(Future.successful(instancesToKill)) when(f.stateOpProcessor.process(expungeRunning)).thenReturn(Future.successful(InstanceUpdateEffect.Expunge(runningInstance, events = Nil))) @@ -151,7 +154,7 @@ class TaskKillerTest extends MarathonSpec }, wipe = true) result.futureValue shouldEqual instancesToKill // only task1 is killed - verify(f.service, times(1)).killTasks(appId, launchedInstances) + verify(f.killService, times(1)).killInstances(launchedInstances, KillReason.KillingTasksViaApi) // all found instances are expunged and the launched instance is eventually expunged again verify(f.stateOpProcessor, atLeastOnce).process(expungeRunning) verify(f.stateOpProcessor).process(expungeReserved) @@ -161,12 +164,14 @@ class TaskKillerTest extends MarathonSpec val tracker: InstanceTracker = mock[InstanceTracker] val stateOpProcessor: TaskStateOpProcessor = mock[TaskStateOpProcessor] val service: MarathonSchedulerService = mock[MarathonSchedulerService] + val killService: KillService = mock[KillService] val groupManager: GroupManager = mock[GroupManager] val config: MarathonConf = mock[MarathonConf] when(config.zkTimeoutDuration).thenReturn(1.second) - val taskKiller: TaskKiller = new TaskKiller(tracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth) + val taskKiller: TaskKiller = new TaskKiller( + tracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth, killService) } } diff --git a/src/test/scala/mesosphere/marathon/api/v2/SpecInstancesResourceTest.scala b/src/test/scala/mesosphere/marathon/api/v2/SpecInstancesResourceTest.scala index fff04dde9e9..c7ebeec6b57 100644 --- a/src/test/scala/mesosphere/marathon/api/v2/SpecInstancesResourceTest.scala +++ b/src/test/scala/mesosphere/marathon/api/v2/SpecInstancesResourceTest.scala @@ -8,6 +8,7 @@ import mesosphere.marathon.core.group.GroupManager import mesosphere.marathon.core.health.HealthCheckManager import mesosphere.marathon.core.instance.{ Instance, TestInstanceBuilder } import mesosphere.marathon.core.task.Task +import mesosphere.marathon.core.task.termination.KillService import mesosphere.marathon.core.task.tracker.{ InstanceTracker, TaskStateOpProcessor } import mesosphere.marathon.plugin.auth.Identity import mesosphere.marathon.state.PathId._ @@ -336,12 +337,14 @@ class SpecInstancesResourceTest extends MarathonSpec with Matchers with GivenWhe var service: MarathonSchedulerService = _ var taskTracker: InstanceTracker = _ var stateOpProcessor: TaskStateOpProcessor = _ + var killService: KillService = _ var taskKiller: TaskKiller = _ var healthCheckManager: HealthCheckManager = _ var config: MarathonConf = _ var groupManager: GroupManager = _ var appsTaskResource: AppTasksResource = _ var auth: TestAuthFixture = _ + implicit var identity: Identity = _ before { @@ -349,6 +352,7 @@ class SpecInstancesResourceTest extends MarathonSpec with Matchers with GivenWhe service = mock[MarathonSchedulerService] taskTracker = mock[InstanceTracker] stateOpProcessor = mock[TaskStateOpProcessor] + killService = mock[KillService] taskKiller = mock[TaskKiller] healthCheckManager = mock[HealthCheckManager] config = mock[MarathonConf] @@ -368,7 +372,8 @@ class SpecInstancesResourceTest extends MarathonSpec with Matchers with GivenWhe } private[this] def useRealTaskKiller(): Unit = { - taskKiller = new TaskKiller(taskTracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth) + taskKiller = new TaskKiller(taskTracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth, + killService) appsTaskResource = new AppTasksResource( taskTracker, taskKiller, diff --git a/src/test/scala/mesosphere/marathon/api/v2/TasksResourceTest.scala b/src/test/scala/mesosphere/marathon/api/v2/TasksResourceTest.scala index 09ddaabb2d9..c6188515487 100644 --- a/src/test/scala/mesosphere/marathon/api/v2/TasksResourceTest.scala +++ b/src/test/scala/mesosphere/marathon/api/v2/TasksResourceTest.scala @@ -10,6 +10,7 @@ import mesosphere.marathon.core.group.GroupManager import mesosphere.marathon.core.health.HealthCheckManager import mesosphere.marathon.core.instance.{ Instance, TestInstanceBuilder } import mesosphere.marathon.core.task.Task +import mesosphere.marathon.core.task.termination.KillService import mesosphere.marathon.core.task.tracker.{ InstanceTracker, TaskStateOpProcessor } import mesosphere.marathon.plugin.auth.Identity import mesosphere.marathon.state.PathId.StringPathId @@ -273,7 +274,8 @@ class TasksResourceTest extends MarathonSpec with GivenWhenThen with Matchers wi val taskId3 = Task.Id.forRunSpec(appId).idString val body = s"""{"ids": ["$taskId1", "$taskId2", "$taskId3"]}""".getBytes - taskKiller = new TaskKiller(taskTracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth) + taskKiller = new TaskKiller(taskTracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth, + killService) taskResource = new TasksResource( taskTracker, taskKiller, @@ -321,6 +323,7 @@ class TasksResourceTest extends MarathonSpec with GivenWhenThen with Matchers wi var config: MarathonConf = _ var groupManager: GroupManager = _ var healthCheckManager: HealthCheckManager = _ + var killService: KillService = _ var taskResource: TasksResource = _ var auth: TestAuthFixture = _ implicit var identity: Identity = _ @@ -330,6 +333,7 @@ class TasksResourceTest extends MarathonSpec with GivenWhenThen with Matchers wi service = mock[MarathonSchedulerService] taskTracker = mock[InstanceTracker] stateOpProcessor = mock[TaskStateOpProcessor] + killService = mock[KillService] taskKiller = mock[TaskKiller] config = mock[MarathonConf] groupManager = mock[GroupManager]