Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Added scheduler plugin functionality #5421

Merged
merged 2 commits into from
Sep 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/docs/plugin.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,16 @@ Please see the [RunSpecValidator](https://github.com/mesosphere/marathon/blob/ma

Marathon and the `RunSpecValidator` use the [Accord](https://github.com/wix/accord) validation library which is useful to understand when creating validator rules.

## Scheduler

#### mesosphere.marathon.plugin.scheduler.SchedulerPlugin

This plugin allows to reject offers. Possible use-cases are:
* Maintenance. Mark agent as going to maintenance and reject new offers from it.
* Analytics. If task fails, for example, 5 times for 5 minutes, we can assume that it will fail again and reject new offers for it.
* Binding to agents. For example, agents can be marked as included into primary or secondary group. Task can be marked with group name.
Plugin can schedule task deployment to primary agents. If all primary agents are busy, task can be scheduled to secondary agents

## Notes on Plugins

* The plugins allow for the extension of behavior in Marathon. They do NOT allow for replacement or removal of existing functionality.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package mesosphere.marathon
package plugin.scheduler

import mesosphere.marathon.plugin.RunSpec
import mesosphere.marathon.plugin.plugin.Plugin
import org.apache.mesos.Protos.Offer

/**
* Allows to use external logic to decline offers.
*/
trait SchedulerPlugin extends Plugin {

/**
* @return true if offer matches
*/
def isMatch(offer: Offer, runSpec: RunSpec): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import mesosphere.marathon.core.plugin.PluginManager
import mesosphere.marathon.core.pod.PodDefinition
import mesosphere.marathon.core.task.Task
import mesosphere.marathon.core.task.state.NetworkInfo
import mesosphere.marathon.plugin.scheduler.SchedulerPlugin
import mesosphere.marathon.plugin.task.RunSpecTaskProcessor
import mesosphere.marathon.plugin.{ ApplicationSpec, PodSpec }
import mesosphere.marathon.state._
Expand All @@ -39,6 +40,8 @@ class InstanceOpFactoryImpl(
new InstanceOpFactoryHelper(principalOpt, roleOpt)
}

private[this] val schedulerPlugins: Seq[SchedulerPlugin] = pluginManager.plugins[SchedulerPlugin]

private[this] lazy val runSpecTaskProc: RunSpecTaskProcessor = combine(
pluginManager.plugins[RunSpecTaskProcessor].toIndexedSeq)

Expand Down Expand Up @@ -153,7 +156,8 @@ class InstanceOpFactoryImpl(
val resourceMatchResponse =
ResourceMatcher.matchResources(
offer, runSpec, instancesToConsiderForConstraints,
ResourceSelector.reservedWithLabels(rolesToConsider, reservationLabels)
ResourceSelector.reservedWithLabels(rolesToConsider, reservationLabels),
schedulerPlugins
)

resourceMatchResponse match {
Expand All @@ -179,7 +183,8 @@ class InstanceOpFactoryImpl(
}

val resourceMatchResponse =
ResourceMatcher.matchResources(offer, runSpec, instances.valuesIterator.toStream, ResourceSelector.reservable)
ResourceMatcher.matchResources(offer, runSpec, instances.valuesIterator.toStream, ResourceSelector.reservable,
schedulerPlugins)
resourceMatchResponse match {
case matches: ResourceMatchResponse.Match =>
val instanceOp = reserveAndCreateVolumes(request.frameworkId, runSpec, offer, matches.resourceMatch)
Expand Down
7 changes: 5 additions & 2 deletions src/main/scala/mesosphere/mesos/ResourceMatcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mesosphere.mesos
import com.typesafe.scalalogging.StrictLogging
import mesosphere.marathon.core.instance.Instance
import mesosphere.marathon.core.launcher.impl.TaskLabels
import mesosphere.marathon.plugin.scheduler.SchedulerPlugin
import mesosphere.marathon.state.{ DiskSource, DiskType, PersistentVolume, ResourceRole, RunSpec }
import mesosphere.marathon.stream.Implicits._
import mesosphere.marathon.tasks.{ PortsMatch, PortsMatcher, ResourceUtil }
Expand Down Expand Up @@ -129,7 +130,7 @@ object ResourceMatcher extends StrictLogging {
* the reservation.
*/
def matchResources(offer: Offer, runSpec: RunSpec, knownInstances: => Seq[Instance],
selector: ResourceSelector): ResourceMatchResponse = {
selector: ResourceSelector, schedulerPlugins: Seq[SchedulerPlugin] = Seq.empty): ResourceMatchResponse = {

val groupedResources: Map[Role, Seq[Protos.Resource]] = offer.getResourcesList.groupBy(_.getName).map { case (k, v) => k -> v.to[Seq] }

Expand Down Expand Up @@ -194,7 +195,9 @@ object ResourceMatcher extends StrictLogging {
badConstraints.isEmpty
}

val resourceMatchOpt = if (scalarMatchResults.forall(_.matches) && meetsAllConstraints) {
val resourceMatchOpt = if (scalarMatchResults.forall(_.matches)
&& meetsAllConstraints
&& schedulerPlugins.forall(_.isMatch(offer, runSpec))) {
portsMatchOpt match {
case Some(portsMatch) =>
Some(ResourceMatch(scalarMatchResults.collect { case m: ScalarMatch => m }, portsMatch))
Expand Down