From d6bce1b7999321bf59ada2e8884e0fe048010010 Mon Sep 17 00:00:00 2001 From: Dmitry Tsydzik Date: Mon, 4 Sep 2017 17:05:18 +0300 Subject: [PATCH 1/2] Added scheduler plugin functionality --- docs/docs/plugin.md | 10 ++++++++++ .../plugin/scheduler/SchedulerPlugin.scala | 19 +++++++++++++++++++ .../launcher/impl/InstanceOpFactoryImpl.scala | 9 +++++++-- .../mesosphere/mesos/ResourceMatcher.scala | 7 +++++-- 4 files changed, 41 insertions(+), 4 deletions(-) create mode 100644 plugin-interface/src/main/scala/mesosphere/marathon/plugin/scheduler/SchedulerPlugin.scala diff --git a/docs/docs/plugin.md b/docs/docs/plugin.md index a5fbfa47430..57b6858e39c 100644 --- a/docs/docs/plugin.md +++ b/docs/docs/plugin.md @@ -190,6 +190,16 @@ Please see the [RunSpecValidator](https://github.com/mesosphere/marathon/blob/ma Marathon and the `RunSpecValidator` use the [Accord](https://github.com/wix/accord) validation library which is useful to understand when creating validator rules. +## Scheduler + +#### mesosphere.marathon.plugin.scheduler.SchedulerPlugin + +This plugin allows to reject offers. Possible use-cases are: +* Maintenance. Mark agent as going to maintenance and reject new offers from it. +* Analytics. If task fails, for example, 5 times for 5 minutes, we can assume that it will fail again and reject new offers for it. +* Binding to agents. For example, agents can be marked as included into primary or secondary group. Task can be marked with group name. + Plugin can schedule task deployment to primary agents. If all primary agents are busy, task can be scheduled to secondary agents + ## Notes on Plugins * The plugins allow for the extension of behavior in Marathon. They do NOT allow for replacement or removal of existing functionality. diff --git a/plugin-interface/src/main/scala/mesosphere/marathon/plugin/scheduler/SchedulerPlugin.scala b/plugin-interface/src/main/scala/mesosphere/marathon/plugin/scheduler/SchedulerPlugin.scala new file mode 100644 index 00000000000..1f1da8292d7 --- /dev/null +++ b/plugin-interface/src/main/scala/mesosphere/marathon/plugin/scheduler/SchedulerPlugin.scala @@ -0,0 +1,19 @@ +package mesosphere.marathon +package plugin.scheduler + +import mesosphere.marathon.plugin.RunSpec +import mesosphere.marathon.plugin.plugin.Plugin +import org.apache.mesos.Protos.Offer + +/** + * Allows to use external logic to reject offers + * + * @author Dmitry Tsydzik. + * @since 6/15/17. + */ +trait SchedulerPlugin extends Plugin { + /** + * @return true if offer is accepted + */ + def isMatch(offer: Offer, runSpec: RunSpec): Boolean +} diff --git a/src/main/scala/mesosphere/marathon/core/launcher/impl/InstanceOpFactoryImpl.scala b/src/main/scala/mesosphere/marathon/core/launcher/impl/InstanceOpFactoryImpl.scala index c8b34d4b43e..e92120bf6cc 100644 --- a/src/main/scala/mesosphere/marathon/core/launcher/impl/InstanceOpFactoryImpl.scala +++ b/src/main/scala/mesosphere/marathon/core/launcher/impl/InstanceOpFactoryImpl.scala @@ -13,6 +13,7 @@ import mesosphere.marathon.core.plugin.PluginManager import mesosphere.marathon.core.pod.PodDefinition import mesosphere.marathon.core.task.Task import mesosphere.marathon.core.task.state.NetworkInfo +import mesosphere.marathon.plugin.scheduler.SchedulerPlugin import mesosphere.marathon.plugin.task.RunSpecTaskProcessor import mesosphere.marathon.plugin.{ ApplicationSpec, PodSpec } import mesosphere.marathon.state._ @@ -39,6 +40,8 @@ class InstanceOpFactoryImpl( new InstanceOpFactoryHelper(principalOpt, roleOpt) } + private[this] val schedulerPlugins: Seq[SchedulerPlugin] = pluginManager.plugins[SchedulerPlugin] + private[this] lazy val runSpecTaskProc: RunSpecTaskProcessor = combine( pluginManager.plugins[RunSpecTaskProcessor].toIndexedSeq) @@ -153,7 +156,8 @@ class InstanceOpFactoryImpl( val resourceMatchResponse = ResourceMatcher.matchResources( offer, runSpec, instancesToConsiderForConstraints, - ResourceSelector.reservedWithLabels(rolesToConsider, reservationLabels) + ResourceSelector.reservedWithLabels(rolesToConsider, reservationLabels), + schedulerPlugins ) resourceMatchResponse match { @@ -179,7 +183,8 @@ class InstanceOpFactoryImpl( } val resourceMatchResponse = - ResourceMatcher.matchResources(offer, runSpec, instances.valuesIterator.toStream, ResourceSelector.reservable) + ResourceMatcher.matchResources(offer, runSpec, instances.valuesIterator.toStream, ResourceSelector.reservable, + schedulerPlugins) resourceMatchResponse match { case matches: ResourceMatchResponse.Match => val instanceOp = reserveAndCreateVolumes(request.frameworkId, runSpec, offer, matches.resourceMatch) diff --git a/src/main/scala/mesosphere/mesos/ResourceMatcher.scala b/src/main/scala/mesosphere/mesos/ResourceMatcher.scala index b72df2a005e..22465cda801 100644 --- a/src/main/scala/mesosphere/mesos/ResourceMatcher.scala +++ b/src/main/scala/mesosphere/mesos/ResourceMatcher.scala @@ -3,6 +3,7 @@ package mesosphere.mesos import com.typesafe.scalalogging.StrictLogging import mesosphere.marathon.core.instance.Instance import mesosphere.marathon.core.launcher.impl.TaskLabels +import mesosphere.marathon.plugin.scheduler.SchedulerPlugin import mesosphere.marathon.state.{ DiskSource, DiskType, PersistentVolume, ResourceRole, RunSpec } import mesosphere.marathon.stream.Implicits._ import mesosphere.marathon.tasks.{ PortsMatch, PortsMatcher, ResourceUtil } @@ -129,7 +130,7 @@ object ResourceMatcher extends StrictLogging { * the reservation. */ def matchResources(offer: Offer, runSpec: RunSpec, knownInstances: => Seq[Instance], - selector: ResourceSelector): ResourceMatchResponse = { + selector: ResourceSelector, schedulerPlugins: Seq[SchedulerPlugin] = Seq.empty): ResourceMatchResponse = { val groupedResources: Map[Role, Seq[Protos.Resource]] = offer.getResourcesList.groupBy(_.getName).map { case (k, v) => k -> v.to[Seq] } @@ -194,7 +195,9 @@ object ResourceMatcher extends StrictLogging { badConstraints.isEmpty } - val resourceMatchOpt = if (scalarMatchResults.forall(_.matches) && meetsAllConstraints) { + val resourceMatchOpt = if (scalarMatchResults.forall(_.matches) + && meetsAllConstraints + && schedulerPlugins.forall(_.isMatch(offer, runSpec))) { portsMatchOpt match { case Some(portsMatch) => Some(ResourceMatch(scalarMatchResults.collect { case m: ScalarMatch => m }, portsMatch)) From 59421c7eed0ae518cbc2193edab8d08d7bf62e90 Mon Sep 17 00:00:00 2001 From: Johannes Unterstein Date: Wed, 13 Sep 2017 14:06:12 +0200 Subject: [PATCH 2/2] Updated docs --- .../marathon/plugin/scheduler/SchedulerPlugin.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/plugin-interface/src/main/scala/mesosphere/marathon/plugin/scheduler/SchedulerPlugin.scala b/plugin-interface/src/main/scala/mesosphere/marathon/plugin/scheduler/SchedulerPlugin.scala index 1f1da8292d7..397335c174e 100644 --- a/plugin-interface/src/main/scala/mesosphere/marathon/plugin/scheduler/SchedulerPlugin.scala +++ b/plugin-interface/src/main/scala/mesosphere/marathon/plugin/scheduler/SchedulerPlugin.scala @@ -6,14 +6,12 @@ import mesosphere.marathon.plugin.plugin.Plugin import org.apache.mesos.Protos.Offer /** - * Allows to use external logic to reject offers - * - * @author Dmitry Tsydzik. - * @since 6/15/17. + * Allows to use external logic to decline offers. */ trait SchedulerPlugin extends Plugin { + /** - * @return true if offer is accepted + * @return true if offer matches */ def isMatch(offer: Offer, runSpec: RunSpec): Boolean }