Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Added scheduler plugin functionality (#5421)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsydd authored and unterstein committed Sep 15, 2017
1 parent df8ef75 commit 3e61a17
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 4 deletions.
10 changes: 10 additions & 0 deletions docs/docs/plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ Please see the [RunSpecValidator](https://github.com/mesosphere/marathon/blob/ma

Marathon and the `RunSpecValidator` use the [Accord](https://github.com/wix/accord) validation library which is useful to understand when creating validator rules.

## Scheduler

#### mesosphere.marathon.plugin.scheduler.SchedulerPlugin

This plugin allows to reject offers. Possible use-cases are:
* Maintenance. Mark agent as going to maintenance and reject new offers from it.
* Analytics. If task fails, for example, 5 times for 5 minutes, we can assume that it will fail again and reject new offers for it.
* Binding to agents. For example, agents can be marked as included into primary or secondary group. Task can be marked with group name.
Plugin can schedule task deployment to primary agents. If all primary agents are busy, task can be scheduled to secondary agents

## Notes on Plugins

* The plugins allow for the extension of behavior in Marathon. They do NOT allow for replacement or removal of existing functionality.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package mesosphere.marathon
package plugin.scheduler

import mesosphere.marathon.plugin.RunSpec
import mesosphere.marathon.plugin.plugin.Plugin
import org.apache.mesos.Protos.Offer

/**
* Allows to use external logic to decline offers.
*/
trait SchedulerPlugin extends Plugin {

/**
* @return true if offer matches
*/
def isMatch(offer: Offer, runSpec: RunSpec): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import mesosphere.marathon.core.plugin.PluginManager
import mesosphere.marathon.core.pod.PodDefinition
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.core.task.state.NetworkInfo
import mesosphere.marathon.plugin.scheduler.SchedulerPlugin
import mesosphere.marathon.plugin.task.RunSpecTaskProcessor
import mesosphere.marathon.plugin.{ ApplicationSpec, PodSpec }
import mesosphere.marathon.state._
Expand All @@ -39,6 +40,8 @@ class InstanceOpFactoryImpl(
new InstanceOpFactoryHelper(principalOpt, roleOpt)
}

private[this] val schedulerPlugins: Seq[SchedulerPlugin] = pluginManager.plugins[SchedulerPlugin]

private[this] lazy val runSpecTaskProc: RunSpecTaskProcessor = combine(
pluginManager.plugins[RunSpecTaskProcessor].toIndexedSeq)

Expand Down Expand Up @@ -153,7 +156,8 @@ class InstanceOpFactoryImpl(
val resourceMatchResponse =
ResourceMatcher.matchResources(
offer, runSpec, instancesToConsiderForConstraints,
ResourceSelector.reservedWithLabels(rolesToConsider, reservationLabels)
ResourceSelector.reservedWithLabels(rolesToConsider, reservationLabels),
schedulerPlugins
)

resourceMatchResponse match {
Expand All @@ -179,7 +183,8 @@ class InstanceOpFactoryImpl(
}

val resourceMatchResponse =
ResourceMatcher.matchResources(offer, runSpec, instances.valuesIterator.toStream, ResourceSelector.reservable)
ResourceMatcher.matchResources(offer, runSpec, instances.valuesIterator.toStream, ResourceSelector.reservable,
schedulerPlugins)
resourceMatchResponse match {
case matches: ResourceMatchResponse.Match =>
val instanceOp = reserveAndCreateVolumes(request.frameworkId, runSpec, offer, matches.resourceMatch)
Expand Down
7 changes: 5 additions & 2 deletions src/main/scala/mesosphere/mesos/ResourceMatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mesosphere.mesos
import com.typesafe.scalalogging.StrictLogging
import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.launcher.impl.TaskLabels
import mesosphere.marathon.plugin.scheduler.SchedulerPlugin
import mesosphere.marathon.state.{ DiskSource, DiskType, PersistentVolume, ResourceRole, RunSpec }
import mesosphere.marathon.stream.Implicits._
import mesosphere.marathon.tasks.{ PortsMatch, PortsMatcher, ResourceUtil }
Expand Down Expand Up @@ -129,7 +130,7 @@ object ResourceMatcher extends StrictLogging {
* the reservation.
*/
def matchResources(offer: Offer, runSpec: RunSpec, knownInstances: => Seq[Instance],
selector: ResourceSelector): ResourceMatchResponse = {
selector: ResourceSelector, schedulerPlugins: Seq[SchedulerPlugin] = Seq.empty): ResourceMatchResponse = {

val groupedResources: Map[Role, Seq[Protos.Resource]] = offer.getResourcesList.groupBy(_.getName).map { case (k, v) => k -> v.to[Seq] }

Expand Down Expand Up @@ -194,7 +195,9 @@ object ResourceMatcher extends StrictLogging {
badConstraints.isEmpty
}

val resourceMatchOpt = if (scalarMatchResults.forall(_.matches) && meetsAllConstraints) {
val resourceMatchOpt = if (scalarMatchResults.forall(_.matches)
&& meetsAllConstraints
&& schedulerPlugins.forall(_.isMatch(offer, runSpec))) {
portsMatchOpt match {
case Some(portsMatch) =>
Some(ResourceMatch(scalarMatchResults.collect { case m: ScalarMatch => m }, portsMatch))
Expand Down

0 comments on commit 3e61a17

Please sign in to comment.