From 94efcee727612559ca227af4adce3a458281ee2c Mon Sep 17 00:00:00 2001 From: Johannes Unterstein Date: Fri, 10 Feb 2017 13:38:36 +0100 Subject: [PATCH] Initial stab at making deployment plans cheaper. Summary: - Should make deployment plans cheaper to create by making dependencyOrderedSteps only consider apps that actually changed. - Running into issues with getting JMH to not throw Incompatible Class Change issues. Test Plan: ``` for i in `seq 1 5000`; do http :8080/v2/apps < marathon}/streams/ScalaConversions.scala (81%) create mode 100644 benchmark/src/main/scala/mesosphere/marathon/upgrade/DependencyGraphBenchmark.scala diff --git a/benchmark/src/main/scala/mesosphere/marathon/stream/ScalaConversionsState.scala b/benchmark/src/main/scala/mesosphere/marathon/stream/ScalaConversionsState.scala new file mode 100644 index 00000000000..2e8e958afeb --- /dev/null +++ b/benchmark/src/main/scala/mesosphere/marathon/stream/ScalaConversionsState.scala @@ -0,0 +1,13 @@ +package mesosphere.marathon.stream + +import org.openjdk.jmh.annotations.{ Scope, State } + +import scala.collection.immutable.Seq + +@State(Scope.Benchmark) +object ScalaConversionsState { + val small: Seq[Int] = 0.to(100) + val medium: Seq[Int] = 0.to(1000) + val large: Seq[Int] = 0.to(10000) + val veryLarge: Seq[Int] = 0.to(1000 * 1000) +} diff --git a/benchmark/src/main/scala/mesosphere/marahon/benchmarks/streams/ScalaConversions.scala b/benchmark/src/main/scala/mesosphere/marathon/streams/ScalaConversions.scala similarity index 81% rename from benchmark/src/main/scala/mesosphere/marahon/benchmarks/streams/ScalaConversions.scala rename to benchmark/src/main/scala/mesosphere/marathon/streams/ScalaConversions.scala index 7ebd4daecb9..393d7feaffa 100644 --- a/benchmark/src/main/scala/mesosphere/marahon/benchmarks/streams/ScalaConversions.scala +++ b/benchmark/src/main/scala/mesosphere/marathon/streams/ScalaConversions.scala @@ -1,4 +1,5 @@ -package mesosphere.marahon.benchmarks.streams +package mesosphere.marathon +package stream import java.util import java.util.concurrent.TimeUnit @@ -7,19 +8,8 @@ import mesosphere.marathon.stream.Implicits._ import org.openjdk.jmh.annotations.{ Benchmark, BenchmarkMode, Fork, Mode, OutputTimeUnit, Scope, State } import org.openjdk.jmh.infra.Blackhole -import scala.collection.immutable.Seq import scala.collection.JavaConverters._ -import collection.JavaConverters._ - -@State(Scope.Benchmark) -object ScalaConversionsState { - val small: Seq[Int] = 0.to(100) - val medium: Seq[Int] = 0.to(1000) - val large: Seq[Int] = 0.to(10000) - val veryLarge: Seq[Int] = 0.to(1000 * 1000) -} - @OutputTimeUnit(TimeUnit.MICROSECONDS) @BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime)) @Fork(1) diff --git a/benchmark/src/main/scala/mesosphere/marathon/upgrade/DependencyGraphBenchmark.scala b/benchmark/src/main/scala/mesosphere/marathon/upgrade/DependencyGraphBenchmark.scala new file mode 100644 index 00000000000..5543306c84c --- /dev/null +++ b/benchmark/src/main/scala/mesosphere/marathon/upgrade/DependencyGraphBenchmark.scala @@ -0,0 +1,107 @@ +package mesosphere.marathon +package upgrade + +import java.util.concurrent.TimeUnit + +import mesosphere.marathon.state.AppDefinition.AppKey +import mesosphere.marathon.state.Group.GroupKey +import mesosphere.marathon.state.PathId._ +import mesosphere.marathon.state._ +import org.openjdk.jmh.annotations.{ Group => _, _ } +import org.openjdk.jmh.infra.Blackhole + +import scala.collection.breakOut +import scala.util.Random + +@State(Scope.Benchmark) +object DependencyGraphBenchmark { + val r = new Random(1000) + + val superGroupIds = 0 to 4 // no interdependencies here + val groupIds = 0 to 5 + val appIds = 0 to 10 + val version1 = VersionInfo.forNewConfig(Timestamp(1)) + val version2 = VersionInfo.forNewConfig(Timestamp(2)) + + val superGroups: Map[GroupKey, Group] = superGroupIds.map { superGroupId => + + val paths: Vector[Vector[PathId]] = + groupIds.map { groupId => + appIds.map { appId => + s"/supergroup-${superGroupId}/group-${groupId}/app-${appId}".toPath + }.toVector + }(breakOut) + + val appDefs: Map[AppKey, AppDefinition] = + groupIds.flatMap { groupId => + appIds.map { appId => + val dependencies = for { + depGroupId <- groupIds if depGroupId < groupId + depAppId <- appIds + if r.nextBoolean + } yield paths(depGroupId)(depAppId) + + val path = paths(groupId)(appId) + path -> AppDefinition( + id = path, + dependencies = dependencies.toSet, + labels = Map("ID" -> appId.toString), + versionInfo = version1 + ) + }(breakOut) + }(breakOut) + + val subGroups: Map[GroupKey, Group] = groupIds.map { groupId => + val id = s"supergroup-${superGroupId}/group-${groupId}".toPath + id -> Group( + id = id, + transitiveAppsById = appDefs, + transitivePodsById = Map.empty) + }(breakOut) + + val id = s"/supergroup-${superGroupId}".toPath + id -> Group( + id = id, + groupsById = subGroups, + transitiveAppsById = subGroups.flatMap(_._2.transitiveAppsById)(breakOut), + transitivePodsById = Map.empty) + }(breakOut) + + val rootGroup = RootGroup( + groupsById = superGroups) + + val upgraded = RootGroup( + groupsById = superGroups.map { + case (superGroupId, superGroup) => + if (superGroupId == "/supergroup-0".toPath) { + superGroupId -> Group( + id = superGroupId, + groupsById = superGroup.groupsById.map { + case (id, subGroup) => + id -> Group( + id = id, + transitiveAppsById = subGroup.transitiveAppsById.mapValues(_.copy(versionInfo = version2)), + transitivePodsById = Map.empty) + }, + transitiveAppsById = superGroup.groupsById.flatMap { case (_, group) => group.transitiveAppsById.mapValues(_.copy(versionInfo = version2)) }, + transitivePodsById = Map.empty + ) + } else { + superGroupId -> superGroup + } + }(breakOut) + ) +} + +@OutputTimeUnit(TimeUnit.MICROSECONDS) +@BenchmarkMode(Array(Mode.Throughput, Mode.AverageTime)) +@Fork(1) +class DependencyGraphBenchmark { + import DependencyGraphBenchmark._ + + @Benchmark + def deploymentPlanDependencySpeed(hole: Blackhole): Unit = { + val deployment = DeploymentPlan(rootGroup, upgraded) + hole.consume(deployment) + } +} diff --git a/build.sbt b/build.sbt index 97f30d98ce9..463128918db 100644 --- a/build.sbt +++ b/build.sbt @@ -279,6 +279,6 @@ lazy val benchmark = (project in file("benchmark")) .dependsOn(marathon % "compile->compile; test->test") .settings( testOptions in Test += Tests.Argument(TestFrameworks.JUnit), - libraryDependencies ++= Dependencies.benchmark + libraryDependencies ++= Dependencies.benchmark, + generatorType in Jmh := "asm" ) - diff --git a/src/main/scala/mesosphere/marathon/upgrade/DeploymentPlan.scala b/src/main/scala/mesosphere/marathon/upgrade/DeploymentPlan.scala index fd9b06ecf20..818969fa09c 100644 --- a/src/main/scala/mesosphere/marathon/upgrade/DeploymentPlan.scala +++ b/src/main/scala/mesosphere/marathon/upgrade/DeploymentPlan.scala @@ -179,10 +179,11 @@ object DeploymentPlan { DeploymentPlan(UUID.randomUUID().toString, RootGroup.empty, RootGroup.empty, Nil, Timestamp.now()) /** - * Perform a "layered" topological sort of all of the run specs. + * Perform a "layered" topological sort of all of the run specs that are going to be deployed. * The "layered" aspect groups the run specs that have the same length of dependencies for parallel deployment. */ private[upgrade] def runSpecsGroupedByLongestPath( + affectedRunSpecIds: Set[PathId], rootGroup: RootGroup): SortedMap[Int, Set[RunSpec]] = { import org.jgrapht.DirectedGraph @@ -203,7 +204,7 @@ object DeploymentPlan { } - val unsortedEquivalenceClasses = rootGroup.transitiveRunSpecs.groupBy { runSpec => + val unsortedEquivalenceClasses = rootGroup.transitiveRunSpecs.filter(spec => affectedRunSpecIds.contains(spec.id)).groupBy { runSpec => longestPathFromVertex(rootGroup.dependencyGraph, runSpec).length } @@ -214,11 +215,11 @@ object DeploymentPlan { * Returns a sequence of deployment steps, the order of which is derived * from the topology of the target group's dependency graph. */ - def dependencyOrderedSteps(original: RootGroup, target: RootGroup, + def dependencyOrderedSteps(original: RootGroup, target: RootGroup, affectedIds: Set[PathId], toKill: Map[PathId, Seq[Instance]]): Seq[DeploymentStep] = { val originalRunSpecs: Map[PathId, RunSpec] = original.transitiveRunSpecsById - val runsByLongestPath: SortedMap[Int, Set[RunSpec]] = runSpecsGroupedByLongestPath(target) + val runsByLongestPath: SortedMap[Int, Set[RunSpec]] = runSpecsGroupedByLongestPath(affectedIds, target) runsByLongestPath.values.map { (equivalenceClass: Set[RunSpec]) => val actions: Set[DeploymentAction] = equivalenceClass.flatMap { (newSpec: RunSpec) => @@ -288,6 +289,21 @@ object DeploymentPlan { }(collection.breakOut) ) + // applications that are either new or the specs are different should be considered for the dependency graph + val addedOrChanged: Set[PathId] = targetRuns.flatMap { + case (runSpecId, spec) => + if (!originalRuns.containsKey(runSpecId) || + (originalRuns.containsKey(runSpecId) && originalRuns(runSpecId) != spec)) { + // the above could be optimized/refined further by checking the version info. The tests are actually + // really bad about structuring this correctly though, so for now, we just make sure that + // the specs are different (or brand new) + Some(runSpecId) + } else { + None + } + }(collection.breakOut) + val affectedApplications = addedOrChanged ++ (originalRuns.keySet -- targetRuns.keySet) + // 3. For each runSpec in each dependency class, // // A. If this runSpec is new, scale to the target number of instances. @@ -300,7 +316,7 @@ object DeploymentPlan { // the old runSpec or the new runSpec, whichever is less. // ii. Restart the runSpec, up to the new target number of instances. // - steps ++= dependencyOrderedSteps(original, target, toKill) + steps ++= dependencyOrderedSteps(original, target, affectedApplications, toKill) // Build the result. val result = DeploymentPlan( diff --git a/src/test/scala/mesosphere/marathon/upgrade/DeploymentPlanTest.scala b/src/test/scala/mesosphere/marathon/upgrade/DeploymentPlanTest.scala index 437133867f8..4309819805a 100644 --- a/src/test/scala/mesosphere/marathon/upgrade/DeploymentPlanTest.scala +++ b/src/test/scala/mesosphere/marathon/upgrade/DeploymentPlanTest.scala @@ -40,7 +40,7 @@ class DeploymentPlanTest extends UnitTest with GroupCreation { ))) When("the group's apps are grouped by the longest outbound path") - val partitionedApps = DeploymentPlan.runSpecsGroupedByLongestPath(rootGroup) + val partitionedApps = DeploymentPlan.runSpecsGroupedByLongestPath(Set(a.id, b.id, c.id, d.id), rootGroup) Then("three equivalence classes should be computed") partitionedApps should have size 3 @@ -72,7 +72,7 @@ class DeploymentPlanTest extends UnitTest with GroupCreation { ) When("the group's apps are grouped by the longest outbound path") - val partitionedApps = DeploymentPlan.runSpecsGroupedByLongestPath(rootGroup) + val partitionedApps = DeploymentPlan.runSpecsGroupedByLongestPath(rootGroup.transitiveAppsById.keySet, rootGroup) Then("three equivalence classes should be computed") partitionedApps should have size 4