Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Initial stab at making deployment plans cheaper.
Browse files Browse the repository at this point in the history
Summary:
- Should make deployment plans cheaper to create
  by making dependencyOrderedSteps only consider
  apps that actually changed.
- Running into issues with getting JMH to not throw
  Incompatible Class Change issues.

Test Plan:
```
for i in `seq 1 5000`; do
http :8080/v2/apps <<EOF
{ "id": "/$i", "cmd": "cat", "dependencies": ["/$((i - 1))"] }
EOF
done
```

Profiled that. shared in #marathon-dev.

Reviewers: unterstein, zen-dog, aquamatthias, jasongilanfarr, jenkins

Reviewed By: unterstein, jasongilanfarr, jenkins

Subscribers: jeschkies, marathon-team

Differential Revision: https://phabricator.mesosphere.com/D476
  • Loading branch information
unterstein authored and jeschkies committed Feb 10, 2017
1 parent fdecd32 commit 94efcee
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package mesosphere.marathon.stream

import org.openjdk.jmh.annotations.{ Scope, State }

import scala.collection.immutable.Seq

@State(Scope.Benchmark)
object ScalaConversionsState {
val small: Seq[Int] = 0.to(100)
val medium: Seq[Int] = 0.to(1000)
val large: Seq[Int] = 0.to(10000)
val veryLarge: Seq[Int] = 0.to(1000 * 1000)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package mesosphere.marahon.benchmarks.streams
package mesosphere.marathon
package stream

import java.util
import java.util.concurrent.TimeUnit
Expand All @@ -7,19 +8,8 @@ import mesosphere.marathon.stream.Implicits._
import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Fork, Mode, OutputTimeUnit, Scope, State }
import org.openjdk.jmh.infra.Blackhole

import scala.collection.immutable.Seq
import scala.collection.JavaConverters._

import collection.JavaConverters._

@State(Scope.Benchmark)
object ScalaConversionsState {
val small: Seq[Int] = 0.to(100)
val medium: Seq[Int] = 0.to(1000)
val large: Seq[Int] = 0.to(10000)
val veryLarge: Seq[Int] = 0.to(1000 * 1000)
}

@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime))
@Fork(1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package mesosphere.marathon
package upgrade

import java.util.concurrent.TimeUnit

import mesosphere.marathon.state.AppDefinition.AppKey
import mesosphere.marathon.state.Group.GroupKey
import mesosphere.marathon.state.PathId._
import mesosphere.marathon.state._
import org.openjdk.jmh.annotations.{ Group => _, _ }
import org.openjdk.jmh.infra.Blackhole

import scala.collection.breakOut
import scala.util.Random

@State(Scope.Benchmark)
object DependencyGraphBenchmark {
val r = new Random(1000)

val superGroupIds = 0 to 4 // no interdependencies here
val groupIds = 0 to 5
val appIds = 0 to 10
val version1 = VersionInfo.forNewConfig(Timestamp(1))
val version2 = VersionInfo.forNewConfig(Timestamp(2))

val superGroups: Map[GroupKey, Group] = superGroupIds.map { superGroupId =>

val paths: Vector[Vector[PathId]] =
groupIds.map { groupId =>
appIds.map { appId =>
s"/supergroup-${superGroupId}/group-${groupId}/app-${appId}".toPath
}.toVector
}(breakOut)

val appDefs: Map[AppKey, AppDefinition] =
groupIds.flatMap { groupId =>
appIds.map { appId =>
val dependencies = for {
depGroupId <- groupIds if depGroupId < groupId
depAppId <- appIds
if r.nextBoolean
} yield paths(depGroupId)(depAppId)

val path = paths(groupId)(appId)
path -> AppDefinition(
id = path,
dependencies = dependencies.toSet,
labels = Map("ID" -> appId.toString),
versionInfo = version1
)
}(breakOut)
}(breakOut)

val subGroups: Map[GroupKey, Group] = groupIds.map { groupId =>
val id = s"supergroup-${superGroupId}/group-${groupId}".toPath
id -> Group(
id = id,
transitiveAppsById = appDefs,
transitivePodsById = Map.empty)
}(breakOut)

val id = s"/supergroup-${superGroupId}".toPath
id -> Group(
id = id,
groupsById = subGroups,
transitiveAppsById = subGroups.flatMap(_._2.transitiveAppsById)(breakOut),
transitivePodsById = Map.empty)
}(breakOut)

val rootGroup = RootGroup(
groupsById = superGroups)

val upgraded = RootGroup(
groupsById = superGroups.map {
case (superGroupId, superGroup) =>
if (superGroupId == "/supergroup-0".toPath) {
superGroupId -> Group(
id = superGroupId,
groupsById = superGroup.groupsById.map {
case (id, subGroup) =>
id -> Group(
id = id,
transitiveAppsById = subGroup.transitiveAppsById.mapValues(_.copy(versionInfo = version2)),
transitivePodsById = Map.empty)
},
transitiveAppsById = superGroup.groupsById.flatMap { case (_, group) => group.transitiveAppsById.mapValues(_.copy(versionInfo = version2)) },
transitivePodsById = Map.empty
)
} else {
superGroupId -> superGroup
}
}(breakOut)
)
}

@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime))
@Fork(1)
class DependencyGraphBenchmark {
import DependencyGraphBenchmark._

@Benchmark
def deploymentPlanDependencySpeed(hole: Blackhole): Unit = {
val deployment = DeploymentPlan(rootGroup, upgraded)
hole.consume(deployment)
}
}
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,6 @@ lazy val benchmark = (project in file("benchmark"))
.dependsOn(marathon % "compile->compile; test->test")
.settings(
testOptions in Test += Tests.Argument(TestFrameworks.JUnit),
libraryDependencies ++= Dependencies.benchmark
libraryDependencies ++= Dependencies.benchmark,
generatorType in Jmh := "asm"
)

26 changes: 21 additions & 5 deletions src/main/scala/mesosphere/marathon/upgrade/DeploymentPlan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,11 @@ object DeploymentPlan {
DeploymentPlan(UUID.randomUUID().toString, RootGroup.empty, RootGroup.empty, Nil, Timestamp.now())

/**
* Perform a "layered" topological sort of all of the run specs.
* Perform a "layered" topological sort of all of the run specs that are going to be deployed.
* The "layered" aspect groups the run specs that have the same length of dependencies for parallel deployment.
*/
private[upgrade] def runSpecsGroupedByLongestPath(
affectedRunSpecIds: Set[PathId],
rootGroup: RootGroup): SortedMap[Int, Set[RunSpec]] = {

import org.jgrapht.DirectedGraph
Expand All @@ -203,7 +204,7 @@ object DeploymentPlan {

}

val unsortedEquivalenceClasses = rootGroup.transitiveRunSpecs.groupBy { runSpec =>
val unsortedEquivalenceClasses = rootGroup.transitiveRunSpecs.filter(spec => affectedRunSpecIds.contains(spec.id)).groupBy { runSpec =>
longestPathFromVertex(rootGroup.dependencyGraph, runSpec).length
}

Expand All @@ -214,11 +215,11 @@ object DeploymentPlan {
* Returns a sequence of deployment steps, the order of which is derived
* from the topology of the target group's dependency graph.
*/
def dependencyOrderedSteps(original: RootGroup, target: RootGroup,
def dependencyOrderedSteps(original: RootGroup, target: RootGroup, affectedIds: Set[PathId],
toKill: Map[PathId, Seq[Instance]]): Seq[DeploymentStep] = {
val originalRunSpecs: Map[PathId, RunSpec] = original.transitiveRunSpecsById

val runsByLongestPath: SortedMap[Int, Set[RunSpec]] = runSpecsGroupedByLongestPath(target)
val runsByLongestPath: SortedMap[Int, Set[RunSpec]] = runSpecsGroupedByLongestPath(affectedIds, target)

runsByLongestPath.values.map { (equivalenceClass: Set[RunSpec]) =>
val actions: Set[DeploymentAction] = equivalenceClass.flatMap { (newSpec: RunSpec) =>
Expand Down Expand Up @@ -288,6 +289,21 @@ object DeploymentPlan {
}(collection.breakOut)
)

// applications that are either new or the specs are different should be considered for the dependency graph
val addedOrChanged: Set[PathId] = targetRuns.flatMap {
case (runSpecId, spec) =>
if (!originalRuns.containsKey(runSpecId) ||
(originalRuns.containsKey(runSpecId) && originalRuns(runSpecId) != spec)) {
// the above could be optimized/refined further by checking the version info. The tests are actually
// really bad about structuring this correctly though, so for now, we just make sure that
// the specs are different (or brand new)
Some(runSpecId)
} else {
None
}
}(collection.breakOut)
val affectedApplications = addedOrChanged ++ (originalRuns.keySet -- targetRuns.keySet)

// 3. For each runSpec in each dependency class,
//
// A. If this runSpec is new, scale to the target number of instances.
Expand All @@ -300,7 +316,7 @@ object DeploymentPlan {
// the old runSpec or the new runSpec, whichever is less.
// ii. Restart the runSpec, up to the new target number of instances.
//
steps ++= dependencyOrderedSteps(original, target, toKill)
steps ++= dependencyOrderedSteps(original, target, affectedApplications, toKill)

// Build the result.
val result = DeploymentPlan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DeploymentPlanTest extends UnitTest with GroupCreation {
)))

When("the group's apps are grouped by the longest outbound path")
val partitionedApps = DeploymentPlan.runSpecsGroupedByLongestPath(rootGroup)
val partitionedApps = DeploymentPlan.runSpecsGroupedByLongestPath(Set(a.id, b.id, c.id, d.id), rootGroup)

Then("three equivalence classes should be computed")
partitionedApps should have size 3
Expand Down Expand Up @@ -72,7 +72,7 @@ class DeploymentPlanTest extends UnitTest with GroupCreation {
)

When("the group's apps are grouped by the longest outbound path")
val partitionedApps = DeploymentPlan.runSpecsGroupedByLongestPath(rootGroup)
val partitionedApps = DeploymentPlan.runSpecsGroupedByLongestPath(rootGroup.transitiveAppsById.keySet, rootGroup)

Then("three equivalence classes should be computed")
partitionedApps should have size 4
Expand Down

0 comments on commit 94efcee

Please sign in to comment.