Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Use non-deployment-interacting KillService during kill-and-wipe #5242

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ class MarathonSchedulerService @Inject() (
Await.result(groupManager.appVersion(appId, version.toOffsetDateTime), config.zkTimeoutDuration)
}

def killTasks(
def killInstances(
appId: PathId,
tasks: Seq[Instance]): Unit = {
schedulerActor ! KillTasks(appId, tasks)
instances: Seq[Instance]): Unit = {
schedulerActor ! KillTasks(appId, instances)
}

//Begin Service interface
Expand Down
53 changes: 30 additions & 23 deletions src/main/scala/mesosphere/marathon/api/TaskKiller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package api

import javax.inject.Inject

import akka.Done
import mesosphere.marathon.core.group.GroupManager
import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.instance.update.InstanceUpdateOperation
import mesosphere.marathon.core.task.termination.{ KillReason, KillService }
import mesosphere.marathon.core.task.tracker.{ InstanceTracker, TaskStateOpProcessor }
import mesosphere.marathon.plugin.auth.{ Authenticator, Authorizer, Identity, UpdateRunSpec }
import mesosphere.marathon.state._
Expand All @@ -24,7 +26,8 @@ class TaskKiller @Inject() (
service: MarathonSchedulerService,
val config: MarathonConf,
val authenticator: Authenticator,
val authorizer: Authorizer) extends AuthResource {
val authorizer: Authorizer,
killService: KillService) extends AuthResource {

private[this] val log = LoggerFactory.getLogger(getClass)

Expand All @@ -38,32 +41,36 @@ class TaskKiller @Inject() (
case Some(runSpec) =>
checkAuthorization(UpdateRunSpec, runSpec)
async { // linter:ignore:UnnecessaryElseBranch
val allTasks = await(instanceTracker.specInstances(runSpecId))
val foundTasks = findToKill(allTasks)

if (wipe) await(expunge(foundTasks))

val launchedTasks = foundTasks.filter(_.isLaunched)
if (launchedTasks.nonEmpty) service.killTasks(runSpecId, launchedTasks)
// Return killed *and* expunged tasks.
// The user only cares that all tasks won't exist eventually. That's why we send all tasks back and not just
// the killed tasks.
foundTasks
val allInstances = await(instanceTracker.specInstances(runSpecId))
val foundInstances = findToKill(allInstances)
val launchedInstances = foundInstances.filter(_.isLaunched)

if (wipe) {
val done1 = await(expunge(foundInstances))
val done2 = await(killService.killInstances(launchedInstances, KillReason.KillingTasksViaApi))
} else {
if (launchedInstances.nonEmpty) service.killInstances(runSpecId, launchedInstances)
}
// Return killed *and* expunged instances.
// The user only cares that all instances won't exist eventually. That's why we send all instances back and
// not just the killed instances.
foundInstances
}

case None => Future.failed(PathNotFoundException(runSpecId))
}
}

private[this] def expunge(tasks: Seq[Instance]): Future[Unit] = {
private[this] def expunge(instances: Seq[Instance]): Future[Done] = {
// Note: We process all instances sequentially.

tasks.foldLeft(Future.successful(())) { (resultSoFar, nextInstance) =>
instances.foldLeft(Future.successful(Done)) { (resultSoFar, nextInstance) =>
resultSoFar.flatMap { _ =>
log.info("Expunging {}", nextInstance.instanceId)
stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(nextInstance.instanceId)).map(_ => ()).recover {
stateOpProcessor.process(InstanceUpdateOperation.ForceExpunge(nextInstance.instanceId)).map(_ => Done).recover {
case NonFatal(cause) =>
log.info("Failed to expunge {}, got: {}", Array[Object](nextInstance.instanceId, cause): _*)
Done
}
}
}
Expand All @@ -82,32 +89,32 @@ class TaskKiller @Inject() (

@SuppressWarnings(Array("all")) // async/await
def killAndScale(
appTasks: Map[PathId, Seq[Instance]],
appInstances: Map[PathId, Seq[Instance]],
force: Boolean)(implicit identity: Identity): Future[DeploymentPlan] = {
def scaleApp(app: AppDefinition): AppDefinition = {
checkAuthorization(UpdateRunSpec, app)
appTasks.get(app.id).fold(app) { tasks =>
// only count active tasks that did not already receive a kill request.
val toKillCount = tasks.count(i => i.isActive && !i.isKilling)
appInstances.get(app.id).fold(app) { instances =>
// only count active instances that did not already receive a kill request.
val toKillCount = instances.count(i => i.isActive && !i.isKilling)
// make sure we never scale below zero instances.
app.copy(instances = math.max(0, app.instances - toKillCount))
}
}

val version = Timestamp.now()

def killTasks = groupManager.updateRoot(
def killDeployment = groupManager.updateRoot(
_.updateTransitiveApps(PathId.empty, scaleApp, version),
version = version,
force = force,
toKill = appTasks
toKill = appInstances
)

async {
val allInstances = await(instanceTracker.instancesBySpec()).instancesMap
//TODO: The exception does not take multiple ids.
appTasks.keys.find(!allInstances.contains(_)).map(id => throw PathNotFoundException(id))
await(killTasks)
appInstances.keys.find(!allInstances.contains(_)).map(id => throw PathNotFoundException(id))
await(killDeployment)
}
}
}
11 changes: 8 additions & 3 deletions src/test/scala/mesosphere/marathon/api/TaskKillerTest.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package mesosphere.marathon
package api

import akka.Done
import mesosphere.marathon.core.group.GroupManager
import mesosphere.marathon.core.instance.update.{ InstanceUpdateEffect, InstanceUpdateOperation }
import mesosphere.marathon.core.instance.{ Instance, TestInstanceBuilder }
import mesosphere.marathon.core.task.termination.{ KillReason, KillService }
import mesosphere.marathon.core.task.tracker.InstanceTracker.InstancesBySpec
import mesosphere.marathon.core.task.tracker.{ InstanceTracker, TaskStateOpProcessor }
import mesosphere.marathon.state._
Expand Down Expand Up @@ -103,7 +105,7 @@ class TaskKillerTest extends MarathonSpec
})

result.futureValue shouldEqual tasksToKill
verify(f.service, times(1)).killTasks(appId, tasksToKill)
verify(f.service, times(1)).killInstances(appId, tasksToKill)
}

test("Kill and scale w/o force should fail if there is a deployment") {
Expand Down Expand Up @@ -140,6 +142,7 @@ class TaskKillerTest extends MarathonSpec
val expungeRunning = InstanceUpdateOperation.ForceExpunge(runningInstance.instanceId)
val expungeReserved = InstanceUpdateOperation.ForceExpunge(reservedInstance.instanceId)

when(f.killService.killInstances(launchedInstances, KillReason.KillingTasksViaApi)).thenReturn(Future(Done))
when(f.groupManager.runSpec(appId)).thenReturn(Future.successful(Some(AppDefinition(appId))))
when(f.tracker.specInstances(appId)).thenReturn(Future.successful(instancesToKill))
when(f.stateOpProcessor.process(expungeRunning)).thenReturn(Future.successful(InstanceUpdateEffect.Expunge(runningInstance, events = Nil)))
Expand All @@ -151,7 +154,7 @@ class TaskKillerTest extends MarathonSpec
}, wipe = true)
result.futureValue shouldEqual instancesToKill
// only task1 is killed
verify(f.service, times(1)).killTasks(appId, launchedInstances)
verify(f.killService, times(1)).killInstances(launchedInstances, KillReason.KillingTasksViaApi)
// all found instances are expunged and the launched instance is eventually expunged again
verify(f.stateOpProcessor, atLeastOnce).process(expungeRunning)
verify(f.stateOpProcessor).process(expungeReserved)
Expand All @@ -161,12 +164,14 @@ class TaskKillerTest extends MarathonSpec
val tracker: InstanceTracker = mock[InstanceTracker]
val stateOpProcessor: TaskStateOpProcessor = mock[TaskStateOpProcessor]
val service: MarathonSchedulerService = mock[MarathonSchedulerService]
val killService: KillService = mock[KillService]
val groupManager: GroupManager = mock[GroupManager]

val config: MarathonConf = mock[MarathonConf]
when(config.zkTimeoutDuration).thenReturn(1.second)

val taskKiller: TaskKiller = new TaskKiller(tracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth)
val taskKiller: TaskKiller = new TaskKiller(
tracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth, killService)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import mesosphere.marathon.core.group.GroupManager
import mesosphere.marathon.core.health.HealthCheckManager
import mesosphere.marathon.core.instance.{ Instance, TestInstanceBuilder }
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.core.task.termination.KillService
import mesosphere.marathon.core.task.tracker.{ InstanceTracker, TaskStateOpProcessor }
import mesosphere.marathon.plugin.auth.Identity
import mesosphere.marathon.state.PathId._
Expand Down Expand Up @@ -336,19 +337,22 @@ class SpecInstancesResourceTest extends MarathonSpec with Matchers with GivenWhe
var service: MarathonSchedulerService = _
var taskTracker: InstanceTracker = _
var stateOpProcessor: TaskStateOpProcessor = _
var killService: KillService = _
var taskKiller: TaskKiller = _
var healthCheckManager: HealthCheckManager = _
var config: MarathonConf = _
var groupManager: GroupManager = _
var appsTaskResource: AppTasksResource = _
var auth: TestAuthFixture = _

implicit var identity: Identity = _

before {
auth = new TestAuthFixture
service = mock[MarathonSchedulerService]
taskTracker = mock[InstanceTracker]
stateOpProcessor = mock[TaskStateOpProcessor]
killService = mock[KillService]
taskKiller = mock[TaskKiller]
healthCheckManager = mock[HealthCheckManager]
config = mock[MarathonConf]
Expand All @@ -368,7 +372,8 @@ class SpecInstancesResourceTest extends MarathonSpec with Matchers with GivenWhe
}

private[this] def useRealTaskKiller(): Unit = {
taskKiller = new TaskKiller(taskTracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth)
taskKiller = new TaskKiller(taskTracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth,
killService)
appsTaskResource = new AppTasksResource(
taskTracker,
taskKiller,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import mesosphere.marathon.core.group.GroupManager
import mesosphere.marathon.core.health.HealthCheckManager
import mesosphere.marathon.core.instance.{ Instance, TestInstanceBuilder }
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.core.task.termination.KillService
import mesosphere.marathon.core.task.tracker.{ InstanceTracker, TaskStateOpProcessor }
import mesosphere.marathon.plugin.auth.Identity
import mesosphere.marathon.state.PathId.StringPathId
Expand Down Expand Up @@ -273,7 +274,8 @@ class TasksResourceTest extends MarathonSpec with GivenWhenThen with Matchers wi
val taskId3 = Task.Id.forRunSpec(appId).idString
val body = s"""{"ids": ["$taskId1", "$taskId2", "$taskId3"]}""".getBytes

taskKiller = new TaskKiller(taskTracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth)
taskKiller = new TaskKiller(taskTracker, stateOpProcessor, groupManager, service, config, auth.auth, auth.auth,
killService)
taskResource = new TasksResource(
taskTracker,
taskKiller,
Expand Down Expand Up @@ -321,6 +323,7 @@ class TasksResourceTest extends MarathonSpec with GivenWhenThen with Matchers wi
var config: MarathonConf = _
var groupManager: GroupManager = _
var healthCheckManager: HealthCheckManager = _
var killService: KillService = _
var taskResource: TasksResource = _
var auth: TestAuthFixture = _
implicit var identity: Identity = _
Expand All @@ -330,6 +333,7 @@ class TasksResourceTest extends MarathonSpec with GivenWhenThen with Matchers wi
service = mock[MarathonSchedulerService]
taskTracker = mock[InstanceTracker]
stateOpProcessor = mock[TaskStateOpProcessor]
killService = mock[KillService]
taskKiller = mock[TaskKiller]
config = mock[MarathonConf]
groupManager = mock[GroupManager]
Expand Down