From cdd7264f97457a9150df4a46591f2a5b691dfc0f Mon Sep 17 00:00:00 2001 From: Karthick Duraisamy Soundararaj Date: Wed, 23 Mar 2016 16:45:26 -0700 Subject: [PATCH] Introducing new scheduler framework & more unit tests --- storm/pom.xml | 7 + storm/src/main/storm/mesos/LaunchTask.java | 4 +- storm/src/main/storm/mesos/MesosNimbus.java | 145 +----- .../src/main/storm/mesos/MesosSupervisor.java | 2 + .../src/main/storm/mesos/NimbusScheduler.java | 6 +- .../mesos/schedulers/DefaultScheduler.java | 251 ++++++++++ .../schedulers/IMesosStormScheduler.java | 46 ++ .../MesosWorkerSlot.java} | 21 +- .../mesos/schedulers/OfferResources.java | 153 +++++++ .../mesos/schedulers/SchedulerUtils.java | 127 ++++++ .../storm/mesos/{ => util}/MesosCommon.java | 6 +- .../mesos/{ => util}/PrettyProtobuf.java | 15 +- .../storm/mesos/{ => util}/RotatingMap.java | 2 +- .../src/test/storm/mesos/MesosNimbusTest.java | 365 +++------------ .../storm/mesos/OfferRoleComparatorTest.java | 21 +- storm/src/test/storm/mesos/TestUtils.java | 256 +++++++++++ .../schedulers/DefaultSchedulerTest.java | 429 ++++++++++++++++++ .../mesos/schedulers/OfferResourcesTest.java | 62 +++ .../mesos/schedulers/SchedulerUtilsTest.java | 96 ++++ 19 files changed, 1550 insertions(+), 464 deletions(-) create mode 100644 storm/src/main/storm/mesos/schedulers/DefaultScheduler.java create mode 100644 storm/src/main/storm/mesos/schedulers/IMesosStormScheduler.java rename storm/src/main/storm/mesos/{OfferResources.java => schedulers/MesosWorkerSlot.java} (69%) create mode 100644 storm/src/main/storm/mesos/schedulers/OfferResources.java create mode 100644 storm/src/main/storm/mesos/schedulers/SchedulerUtils.java rename storm/src/main/storm/mesos/{ => util}/MesosCommon.java (99%) rename storm/src/main/storm/mesos/{ => util}/PrettyProtobuf.java (97%) rename storm/src/main/storm/mesos/{ => util}/RotatingMap.java (99%) create mode 100644 storm/src/test/storm/mesos/TestUtils.java create mode 100644 storm/src/test/storm/mesos/schedulers/DefaultSchedulerTest.java create mode 100644 storm/src/test/storm/mesos/schedulers/OfferResourcesTest.java create mode 100644 storm/src/test/storm/mesos/schedulers/SchedulerUtilsTest.java diff --git a/storm/pom.xml b/storm/pom.xml index 9b955a59d..b819209bb 100644 --- a/storm/pom.xml +++ b/storm/pom.xml @@ -188,6 +188,12 @@ mockito-all test + + org.mockito + mockito-all + 1.9.5 + test + org.apache.mesos ${shim} @@ -198,5 +204,6 @@ storm-shim ${project.parent.version} + diff --git a/storm/src/main/storm/mesos/LaunchTask.java b/storm/src/main/storm/mesos/LaunchTask.java index 14a129ed8..eac985dc1 100644 --- a/storm/src/main/storm/mesos/LaunchTask.java +++ b/storm/src/main/storm/mesos/LaunchTask.java @@ -19,8 +19,8 @@ import org.apache.mesos.Protos; -import static storm.mesos.PrettyProtobuf.offerToString; -import static storm.mesos.PrettyProtobuf.taskInfoToString; +import static storm.mesos.util.PrettyProtobuf.offerToString; +import static storm.mesos.util.PrettyProtobuf.taskInfoToString; class LaunchTask { private final Protos.TaskInfo task; diff --git a/storm/src/main/storm/mesos/MesosNimbus.java b/storm/src/main/storm/mesos/MesosNimbus.java index c5499474f..47d0680dd 100644 --- a/storm/src/main/storm/mesos/MesosNimbus.java +++ b/storm/src/main/storm/mesos/MesosNimbus.java @@ -51,9 +51,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.yaml.snakeyaml.Yaml; +import storm.mesos.schedulers.DefaultScheduler; +import storm.mesos.schedulers.IMesosStormScheduler; import storm.mesos.shims.CommandLineShimFactory; import storm.mesos.shims.ICommandLineShim; import storm.mesos.shims.LocalStateShim; +import storm.mesos.util.MesosCommon; +import storm.mesos.util.RotatingMap; import java.io.File; import java.io.FileInputStream; @@ -82,8 +86,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static storm.mesos.PrettyProtobuf.offerMapToString; -import static storm.mesos.PrettyProtobuf.offerToString; +import static storm.mesos.util.PrettyProtobuf.offerMapToString; public class MesosNimbus implements INimbus { public static final String CONF_EXECUTOR_URI = "mesos.executor.uri"; @@ -120,6 +123,8 @@ public class MesosNimbus implements INimbus { private Map _usedOffers; private ScheduledExecutorService timerScheduler = Executors.newScheduledThreadPool(1); + private IMesosStormScheduler _mesosStormScheduler = null; + private boolean _preferReservedResources = true; private Optional _container = Optional.absent(); private Path _generatedConfPath; @@ -132,6 +137,10 @@ private static Set listIntoSet(List l) { } } + public MesosNimbus() { + this._mesosStormScheduler = new DefaultScheduler(); + } + public static void main(String[] args) { backtype.storm.daemon.nimbus.launch(new MesosNimbus()); } @@ -318,56 +327,14 @@ protected MesosSchedulerDriver createMesosDriver() throws IOException { FrameworkInfo.Builder finfo = createFrameworkBuilder(); if ((credential = getCredential(finfo)) != null) { - driver = new MesosSchedulerDriver(_scheduler, - finfo.build(), - (String) _conf.get(CONF_MASTER_URL), - credential); + driver = new MesosSchedulerDriver(_scheduler, finfo.build(), (String) _conf.get(CONF_MASTER_URL), credential); } else { - driver = new MesosSchedulerDriver(_scheduler, - finfo.build(), - (String) _conf.get(CONF_MASTER_URL)); + driver = new MesosSchedulerDriver(_scheduler, finfo.build(), (String) _conf.get(CONF_MASTER_URL)); } return driver; } - protected OfferResources getResources(Offer offer, double executorCpu, double executorMem, double cpu, double mem) { - OfferResources resources = new OfferResources(); - - double offerCpu = 0; - double offerMem = 0; - - for (Resource r : offer.getResourcesList()) { - if (r.hasReservation()) { - // skip resources with dynamic reservations - continue; - } - if (r.getType() == Type.SCALAR) { - if (r.getName().equals("cpus")) { - offerCpu += r.getScalar().getValue(); - } else if (r.getName().equals("mem")) { - offerMem += r.getScalar().getValue(); - } - } - } - - if (offerCpu >= executorCpu + cpu && - offerMem >= executorMem + mem) { - resources.cpuSlots = (int) Math.floor((offerCpu - executorCpu) / cpu); - resources.memSlots = (int) Math.floor((offerMem - executorMem) / mem); - } - - int maxPorts = Math.min(resources.cpuSlots, resources.memSlots); - - List portList = new ArrayList<>(); - collectPorts(offer.getResourcesList(), portList, maxPorts); - resources.ports.addAll(portList); - - LOG.debug("Offer: " + offerToString(offer)); - LOG.debug("Extracted resources: {}", resources.toString()); - return resources; - } - private void collectPorts(List offers, List portList, int maxPorts) { for (Resource r : offers) { if (r.getName().equals("ports")) { @@ -389,26 +356,6 @@ private void collectPorts(List offers, List portList, int max } } - private List toSlots(Offer offer, double cpu, double mem, boolean supervisorExists) { - double executorCpuDemand = supervisorExists ? 0 : MesosCommon.executorCpu(_conf); - double executorMemDemand = supervisorExists ? 0 : MesosCommon.executorMem(_conf); - - OfferResources resources = getResources( - offer, - executorCpuDemand, - executorMemDemand, - cpu, - mem); - - List ret = new ArrayList(); - int availableSlots = Math.min(resources.cpuSlots, resources.memSlots); - availableSlots = Math.min(availableSlots, resources.ports.size()); - for (int i = 0; i < availableSlots; i++) { - ret.add(new WorkerSlot(offer.getHostname(), resources.ports.get(i))); - } - return ret; - } - /** * Method checks if all topologies that need assignment already have supervisor running on the node where the Offer * comes from. Required for more accurate available resource calculation where we can exclude supervisor's demand from @@ -444,69 +391,17 @@ public boolean isHostAccepted(String hostname) { (_disallowedHosts != null && !_disallowedHosts.contains(hostname)); } + @Override public Collection allSlotsAvailableForScheduling( - Collection existingSupervisors, Topologies topologies, Set topologiesMissingAssignments) { + Collection existingSupervisors, Topologies topologies, Set topologiesMissingAssignments) { synchronized (_offersLock) { - LOG.debug("allSlotsAvailableForScheduling: Currently have {} offers buffered {}", - _offers.size(), (_offers.size() > 0 ? (":" + offerMapToString(_offers)) : "")); - if (!topologiesMissingAssignments.isEmpty()) { - LOG.info("Topologies that need assignments: {}", topologiesMissingAssignments.toString()); - - // Revive any filtered offers - _driver.reviveOffers(); - } else { - LOG.info("Declining offers because no topologies need assignments"); - _offers.clear(); - return new ArrayList<>(); - } - } - - Double cpu = null; - Double mem = null; - // TODO: maybe this isn't the best approach. if a topology raises #cpus keeps failing, - // it will mess up scheduling on this cluster permanently - for (String id : topologiesMissingAssignments) { - TopologyDetails details = topologies.getById(id); - double tcpu = MesosCommon.topologyWorkerCpu(_conf, details); - double tmem = MesosCommon.topologyWorkerMem(_conf, details); - if (cpu == null || tcpu > cpu) { - cpu = tcpu; - } - if (mem == null || tmem > mem) { - mem = tmem; - } - } - - LOG.info("allSlotsAvailableForScheduling: pending topologies' max resource requirements per worker: cpu: {} & mem: {}", - String.valueOf(cpu), String.valueOf(mem)); - - List allSlots = new ArrayList<>(); - - if (cpu != null && mem != null) { - synchronized (_offersLock) { - for (Offer offer : _offers.newestValues()) { - boolean supervisorExists = supervisorExists(offer, existingSupervisors, topologiesMissingAssignments); - List offerSlots = toSlots(offer, cpu, mem, supervisorExists); - if (offerSlots.isEmpty()) { - _offers.clearKey(offer.getId()); - LOG.debug("Declining offer `{}' because it wasn't usable to create a slot which fits largest " + - "pending topologies' aggregate needs (max cpu: {} max mem: {})", - offerToString(offer), String.valueOf(cpu), String.valueOf(mem)); - } else { - allSlots.addAll(offerSlots); - } - } - } - } - - LOG.info("Number of available slots: {}", allSlots.size()); - if (LOG.isDebugEnabled()) { - for (WorkerSlot slot : allSlots) { - LOG.debug("available slot: {}", slot); - } + return _mesosStormScheduler.allSlotsAvailableForScheduling( + _offers, + existingSupervisors, + topologies, + topologiesMissingAssignments); } - return allSlots; } private OfferID findOffer(WorkerSlot worker) { diff --git a/storm/src/main/storm/mesos/MesosSupervisor.java b/storm/src/main/storm/mesos/MesosSupervisor.java index b4e009a42..a57b1c6ec 100644 --- a/storm/src/main/storm/mesos/MesosSupervisor.java +++ b/storm/src/main/storm/mesos/MesosSupervisor.java @@ -21,6 +21,7 @@ import backtype.storm.scheduler.ISupervisor; import backtype.storm.utils.Utils; import clojure.lang.PersistentVector; + import org.apache.mesos.Executor; import org.apache.mesos.ExecutorDriver; import org.apache.mesos.MesosExecutorDriver; @@ -37,6 +38,7 @@ import storm.mesos.logviewer.LogViewerController; import storm.mesos.shims.ILocalStateShim; import storm.mesos.shims.LocalStateShim; +import storm.mesos.util.MesosCommon; import java.io.IOException; import java.util.Collection; diff --git a/storm/src/main/storm/mesos/NimbusScheduler.java b/storm/src/main/storm/mesos/NimbusScheduler.java index 799dc3b6e..480456bd2 100644 --- a/storm/src/main/storm/mesos/NimbusScheduler.java +++ b/storm/src/main/storm/mesos/NimbusScheduler.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,7 +33,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; -import static storm.mesos.PrettyProtobuf.taskStatusToString; +import static storm.mesos.util.PrettyProtobuf.taskStatusToString; public class NimbusScheduler implements Scheduler { private MesosNimbus mesosNimbus; diff --git a/storm/src/main/storm/mesos/schedulers/DefaultScheduler.java b/storm/src/main/storm/mesos/schedulers/DefaultScheduler.java new file mode 100644 index 000000000..651d30d47 --- /dev/null +++ b/storm/src/main/storm/mesos/schedulers/DefaultScheduler.java @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.mesos.schedulers; + +import backtype.storm.scheduler.Cluster; +import backtype.storm.scheduler.ExecutorDetails; +import backtype.storm.scheduler.IScheduler; +import backtype.storm.scheduler.SupervisorDetails; +import backtype.storm.scheduler.Topologies; +import backtype.storm.scheduler.TopologyDetails; +import backtype.storm.scheduler.WorkerSlot; +import org.apache.log4j.Logger; +import org.apache.mesos.Protos; +import storm.mesos.MesosNimbus; +import storm.mesos.util.MesosCommon; +import storm.mesos.util.RotatingMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Default Scheduler used by mesos-storm framework. + */ +public class DefaultScheduler implements IScheduler, IMesosStormScheduler { + private final Logger log = Logger.getLogger(MesosNimbus.class); + private Map mesosStormConf; + private final Map mesosWorkerSlotMap = new HashMap<>(); + + @Override + public void prepare(Map conf) { + mesosStormConf = conf; + } + + /* + * Different topologies have different resource requirements in terms of cpu and memory. So when Mesos asks + * this scheduler for a list of available worker slots, we create "MesosWorkerSlot" and store them into mesosWorkerSlotMap. + * Notably, we return a list of MesosWorkerSlot objects, even though Storm is only aware of the WorkerSlot type. However, + * since a MesosWorkerSlot *is* a WorkerSlot (in the polymorphic sense), Storm treats the list as WorkerSlot objects. + * + * Note: + * 1. "MesosWorkerSlot" is the same as WorkerSlot except that it is dedicated for a topology upon creation. This means that, + * a MesosWorkerSlot belonging to one topology cannot be used to launch a worker belonging to a different topology. + * 2. Please note that this method is called before schedule is invoked. We use this opportunity to assign the MesosWorkerSlot + * to a specific topology and store the state in "mesosWorkerSlotMap". This way, when Storm later calls schedule, we can just + * look up the "mesosWorkerSlotMap" for a list of available slots for the particular topology. + * 3. Given MesosWorkerSlot extends WorkerSlot, we shouldn't have to really create a "mesosWorkerSlotMap". Instead, in the schedule + * method, we could have just upcasted the "WorkerSlot" to "MesosWorkerSlot". But this is not currently possible because storm + * passes a recreated version of WorkerSlot to schedule method instead of passing the WorkerSlot returned by this method as is. + */ + @Override + public List allSlotsAvailableForScheduling(RotatingMap offers, + Collection existingSupervisors, + Topologies topologies, Set topologiesMissingAssignments) { + if (topologiesMissingAssignments.isEmpty()) { + log.info("Declining all offers that are currently buffered because no topologies need assignments"); + offers.clear(); + return new ArrayList<>(); + } + + log.info("Topologies that need assignments: " + topologiesMissingAssignments.toString()); + + // Decline those offers that cannot be used for any of the topologies that need assignments. + for (Protos.Offer offer : offers.newestValues()) { + boolean isOfferUseful = false; + for (String currentTopology : topologiesMissingAssignments) { + boolean supervisorExists = SchedulerUtils.supervisorExists(offer.getHostname(), existingSupervisors, currentTopology); + TopologyDetails topologyDetails = topologies.getById(currentTopology); + if (SchedulerUtils.isFit(mesosStormConf, offer, topologyDetails, supervisorExists)) { + isOfferUseful = true; + break; + } + } + if (!isOfferUseful) { + log.info("Declining Offer " + offer.getId().getValue() + " because it does not fit any of the topologies that need assignments"); + offers.clearKey(offer.getId()); + } + } + + List allSlots = new ArrayList<>(); + + Map> offerResourcesListPerNode = SchedulerUtils.getOfferResourcesListPerNode(offers); + + for (String currentTopology : topologiesMissingAssignments) { + TopologyDetails topologyDetails = topologies.getById(currentTopology); + int slotsNeeded = topologyDetails.getNumWorkers(); + + double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails); + double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails); + + log.info("Trying to find " + slotsNeeded + " slots for " + topologyDetails.getId()); + if (slotsNeeded == 0) { + continue; + } + + Set nodesWithExistingSupervisors = new HashSet<>(); + for (String currentNode : offerResourcesListPerNode.keySet()) { + if (SchedulerUtils.supervisorExists(currentNode, existingSupervisors, currentTopology)) { + nodesWithExistingSupervisors.add(currentNode); + } + } + + Boolean slotFound; + do { + slotFound = false; + for (String currentNode : offerResourcesListPerNode.keySet()) { + boolean supervisorExists = nodesWithExistingSupervisors.contains(currentNode); + if (slotsNeeded == 0) { + break; + } + + for (OfferResources resources : offerResourcesListPerNode.get(currentNode)) { + boolean isFit = SchedulerUtils.isFit(mesosStormConf, resources, topologyDetails, supervisorExists); + if (isFit) { + log.info(resources.toString() + " is a fit for " + + topologyDetails.getId() + " requestedWorkerCpu: " + requestedWorkerCpu + " requestedWorkerMem: " + + requestedWorkerMem); + nodesWithExistingSupervisors.add(currentNode); + MesosWorkerSlot mesosWorkerSlot = SchedulerUtils.createWorkerSlotFromOfferResources(mesosStormConf, resources, topologyDetails, supervisorExists); + if (mesosWorkerSlot == null) { + continue; + } + String slotId = mesosWorkerSlot.getNodeId() + ":" + mesosWorkerSlot.getPort(); + mesosWorkerSlotMap.put(slotId, mesosWorkerSlot); + // Place this offer in the first bucket of the RotatingMap so that it is less likely to get rotated out + offers.put(resources.getOfferId(), resources.getOffer()); + allSlots.add(mesosWorkerSlot); + slotsNeeded--; + slotFound = true; + } else { + log.info(resources.toString() + " is not a fit for " + + topologyDetails.getId() + " requestedWorkerCpu: " + requestedWorkerCpu + " requestedWorkerMem: " + requestedWorkerMem); + } + } + } + } while (slotFound == true && slotsNeeded > 0); + } + + log.info("Number of available slots: " + allSlots.size()); + if (log.isDebugEnabled()) { + for (WorkerSlot slot : allSlots) { + log.debug("available slot: " + slot); + } + } + return allSlots; + } + + + Map> getMesosWorkerSlotPerTopology(List workerSlots) { + HashMap> perTopologySlotList = new HashMap<>(); + + for (WorkerSlot workerSlot : workerSlots) { + if (workerSlot.getNodeId() == null) { + log.warn("Unexpected: Node id is null for worker slot while scheduling"); + continue; + } + MesosWorkerSlot mesosWorkerSlot = mesosWorkerSlotMap.get(workerSlot.getNodeId() + + ":" + String.valueOf(workerSlot.getPort())); + + String topologyId = mesosWorkerSlot.getTopologyId(); + if (perTopologySlotList.get(topologyId) == null) { + perTopologySlotList.put(topologyId, new ArrayList()); + } + perTopologySlotList.get(topologyId).add(mesosWorkerSlot); + } + + return perTopologySlotList; + } + + List> executorsPerWorkerList(Cluster cluster, TopologyDetails topologyDetails, Integer slotsAvailable) { + Collection executors = cluster.getUnassignedExecutors(topologyDetails); + List> executorsPerWorkerList = new ArrayList<>(); + + for (int i = 0; i < slotsAvailable; i++) { + executorsPerWorkerList.add(new ArrayList()); + } + + List executorList = new ArrayList<>(executors); + + /* The goal of this scheduler is to mimic Storm's default version. Storm's default scheduler sorts the + * executors by their id before spreading them across the available workers. + */ + Collections.sort(executorList, new Comparator() { + public int compare(ExecutorDetails e1, ExecutorDetails e2) { + return e1.getStartTask() - e2.getStartTask(); + } + }); + + int index = -1; + for (ExecutorDetails executorDetails : executorList) { + index = ++index % slotsAvailable; + executorsPerWorkerList.get(index).add(executorDetails); + } + + return executorsPerWorkerList; + } + + /** + * Schedule function looks in the "mesosWorkerSlotMap" to determine which topology owns the particular + * WorkerSlot and assigns the executors accordingly. + */ + @Override + public void schedule(Topologies topologies, Cluster cluster) { + List workerSlots = cluster.getAvailableSlots(); + Map> perTopologySlotList = getMesosWorkerSlotPerTopology(workerSlots); + + // So far we know how many MesosSlots each of the topologies have got. Lets assign executors for each of them + for (String topologyId : perTopologySlotList.keySet()) { + TopologyDetails topologyDetails = topologies.getById(topologyId); + List mesosWorkerSlots = perTopologySlotList.get(topologyId); + + int countSlotsRequested = topologyDetails.getNumWorkers(); + int countSlotsAssigned = cluster.getAssignedNumWorkers(topologyDetails); + + if (mesosWorkerSlots.size() == 0) { + log.warn("No slots found for topology " + topologyId + " while scheduling"); + continue; + } + + int countSlotsAvailable = Math.min(mesosWorkerSlots.size(), (countSlotsRequested - countSlotsAssigned)); + + List> executorsPerWorkerList = executorsPerWorkerList(cluster, topologyDetails, countSlotsAvailable); + + for (int i = 0; i < countSlotsAvailable; i++) { + cluster.assign(mesosWorkerSlots.remove(0), topologyId, executorsPerWorkerList.remove(0)); + } + } + mesosWorkerSlotMap.clear(); + } +} diff --git a/storm/src/main/storm/mesos/schedulers/IMesosStormScheduler.java b/storm/src/main/storm/mesos/schedulers/IMesosStormScheduler.java new file mode 100644 index 000000000..c8a453052 --- /dev/null +++ b/storm/src/main/storm/mesos/schedulers/IMesosStormScheduler.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.mesos.schedulers; + + +import backtype.storm.scheduler.SupervisorDetails; +import backtype.storm.scheduler.Topologies; +import backtype.storm.scheduler.WorkerSlot; +import org.apache.mesos.Protos; +import storm.mesos.util.RotatingMap; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +/** + * A scheduler needs to implement the following interface for it to be MesosNimbus compatible. + */ +public interface IMesosStormScheduler { + + /** + * This method is invoked by Nimbus when it wants to get a list of worker slots that are available for assigning the + * topology workers. In Nimbus's view, a "WorkerSlot" is a host and port that it can use to assign a worker. + *

+ * TODO: Rotating Map itself needs to be refactored. Perhaps make RotatingMap inherit Map so users can pass in a + * map? or Make the IMesosStormScheduler itself generic so _offers could be of any type? + */ + public List allSlotsAvailableForScheduling(RotatingMap offers, + Collection existingSupervisors, + Topologies topologies, Set topologiesMissingAssignments); +} diff --git a/storm/src/main/storm/mesos/OfferResources.java b/storm/src/main/storm/mesos/schedulers/MesosWorkerSlot.java similarity index 69% rename from storm/src/main/storm/mesos/OfferResources.java rename to storm/src/main/storm/mesos/schedulers/MesosWorkerSlot.java index 1563b0e67..0ebc6c0d5 100644 --- a/storm/src/main/storm/mesos/OfferResources.java +++ b/storm/src/main/storm/mesos/schedulers/MesosWorkerSlot.java @@ -15,20 +15,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package storm.mesos; +package storm.mesos.schedulers; -import org.apache.commons.lang3.builder.ToStringBuilder; +import backtype.storm.scheduler.WorkerSlot; -import java.util.ArrayList; -import java.util.List; +public class MesosWorkerSlot extends WorkerSlot { + private String topologyId; -public class OfferResources { - int cpuSlots = 0; - int memSlots = 0; - List ports = new ArrayList<>(); + public MesosWorkerSlot(String nodeId, Number port, String topologyId) { + super(nodeId, port); + this.topologyId = topologyId; + } - @Override - public String toString() { - return ToStringBuilder.reflectionToString(this); + public String getTopologyId() { + return this.topologyId; } } diff --git a/storm/src/main/storm/mesos/schedulers/OfferResources.java b/storm/src/main/storm/mesos/schedulers/OfferResources.java new file mode 100644 index 000000000..b5359e954 --- /dev/null +++ b/storm/src/main/storm/mesos/schedulers/OfferResources.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.mesos.schedulers; + +import com.google.common.base.Joiner; +import org.apache.mesos.Protos; + +import java.util.ArrayList; +import java.util.List; + +public class OfferResources { + + private class PortRange { + public long begin; + public long end; + + public PortRange(long begin, long end) { + this.begin = begin; + this.end = end; + } + } + + private Protos.Offer offer; + private Protos.OfferID offerId; + private String hostName; + private double mem; + private double cpu; + + List portRanges = new ArrayList<>(); + + public void addPortRanges(Protos.Value.Ranges ranges) { + for (Protos.Value.Range r : ranges.getRangeList()) { + this.portRanges.add(new PortRange(r.getBegin(), r.getEnd())); + } + } + + public OfferResources(Protos.Offer offer) { + this.offer = offer; + this.offerId = offer.getId(); + double offerMem = 0; + double offerCpu = 0; + Protos.Value.Ranges portRanges = null; + + String hostName = offer.getHostname(); + for (Protos.Resource r : offer.getResourcesList()) { + if (r.hasReservation()) { + // skip resources with dynamic reservations + continue; + } + if (r.getName().equals("cpus")) { + offerCpu += r.getScalar().getValue(); + } else if (r.getName().equals("mem")) { + offerMem += r.getScalar().getValue(); + } else if (r.getName().equals("ports")) { + Protos.Value.Ranges tmp = r.getRanges(); + if (portRanges == null) { + portRanges = tmp; + continue; + } + portRanges.getRangeList().addAll(tmp.getRangeList()); + } + } + + this.hostName = hostName; + this.mem = offerMem; + this.cpu = offerCpu; + if ((portRanges != null) && (!portRanges.getRangeList().isEmpty())) { + this.addPortRanges(portRanges); + } + } + + public Protos.Offer getOffer() { + return this.offer; + } + + public Protos.OfferID getOfferId() { + return this.offerId; + } + + public String getHostName() { + return this.hostName; + } + + public double getMem() { + return this.mem; + } + + public double getCpu() { + return this.cpu; + } + + public void decCpu(double val) { + cpu -= val; + } + + public void decMem(double val) { + mem -= val; + } + + public long getPort() { + if (!hasPort()) { + return -1; + } + + for (int i = 0; i < portRanges.size(); i++) { + PortRange portRange = portRanges.get(i); + if (portRange.begin < portRange.end) { + return portRange.begin++; + } else if (portRange.begin == portRange.end) { + portRanges.remove(i); + return portRange.begin; + } + } + + return -1; + } + + public boolean hasPort() { + return (portRanges != null && !portRanges.isEmpty()); + } + + @Override + public String toString() { + List portRangeStrings = new ArrayList<>(); + + for (int i = 0; i < portRanges.size(); i++) { + if (portRanges.get(i).begin == portRanges.get(i).end) { + portRangeStrings.add(String.valueOf(portRanges.get(i).begin)); + } else { + portRangeStrings.add(String.valueOf(portRanges.get(i).begin) + "-" + String.valueOf(portRanges.get(i).end)); + } + } + return "OfferResources with offerId: " + getOfferId().getValue().toString().trim() + " from host: " + getHostName() + " mem: " + String.valueOf(mem) + + " cpu: " + String.valueOf(cpu) + + " portRanges: [" + Joiner.on(",").join(portRangeStrings) + "]"; + } +} + diff --git a/storm/src/main/storm/mesos/schedulers/SchedulerUtils.java b/storm/src/main/storm/mesos/schedulers/SchedulerUtils.java new file mode 100644 index 000000000..50b305201 --- /dev/null +++ b/storm/src/main/storm/mesos/schedulers/SchedulerUtils.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.mesos.schedulers; + +import backtype.storm.scheduler.SupervisorDetails; +import backtype.storm.scheduler.TopologyDetails; +import org.apache.log4j.Logger; +import org.apache.mesos.Protos; +import storm.mesos.util.MesosCommon; +import storm.mesos.util.RotatingMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SchedulerUtils { + + private static final Logger log = Logger.getLogger(SchedulerUtils.class); + + public static boolean isFit(Map mesosStormConf, OfferResources offerResources, TopologyDetails topologyDetails, boolean supervisorExists) { + double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails); + double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails); + + requestedWorkerCpu += supervisorExists ? 0 : MesosCommon.executorCpu(mesosStormConf); + requestedWorkerMem += supervisorExists ? 0 : MesosCommon.executorMem(mesosStormConf); + + if (requestedWorkerCpu <= offerResources.getCpu() && requestedWorkerMem <= offerResources.getMem()) { + return true; + } + return false; + } + + public static boolean isFit(Map mesosStormConf, Protos.Offer offer, TopologyDetails topologyDetails, boolean supervisorExists) { + OfferResources offerResources = new OfferResources(offer); + return isFit(mesosStormConf, offerResources, topologyDetails, supervisorExists); + } + + /** + * Method checks if all topologies that need assignment already have supervisor running on the node where the Offer + * comes from. Required for more accurate available resource calculation where we can exclude supervisor's demand from + * the Offer. + * Unfortunately because of WorkerSlot type is not topology agnostic, we need to exclude supervisor's resources only + * in case where ALL topologies in 'allSlotsAvailableForScheduling' method satisfy condition of supervisor existence + * @param offerHost hostname corresponding to the offer + * @param existingSupervisors Supervisors which already placed on the node for the Offer + * @param topologyId Topology id for which we are checking if the supervisor exists already + * @return boolean value indicating supervisor existence + */ + public static boolean supervisorExists(String offerHost, Collection existingSupervisors, + String topologyId) { + boolean supervisorExists = false; + String expectedSupervisorId = MesosCommon.supervisorId(offerHost, topologyId); + for (SupervisorDetails supervisorDetail : existingSupervisors) { + if (supervisorDetail.getId().equals(expectedSupervisorId)) { + supervisorExists = true; + } + } + return supervisorExists; + } + + public static Map> getOfferResourcesListPerNode(RotatingMap offers) { + Map> offerResourcesListPerNode = new HashMap<>(); + + for (Protos.Offer offer : offers.values()) { + String hostName = offer.getHostname(); + + List offerResourcesListForCurrentHost = offerResourcesListPerNode.get(hostName); + OfferResources offerResources = new OfferResources(offer); + if (offerResourcesListForCurrentHost == null) { + offerResourcesListPerNode.put(hostName, new ArrayList()); + } + offerResourcesListPerNode.get(hostName).add(offerResources); + log.info("Available resources at " + hostName + ": " + offerResources.toString()); + } + return offerResourcesListPerNode; + } + + public static MesosWorkerSlot createWorkerSlotFromOfferResources(Map mesosStormConf, OfferResources offerResources, + TopologyDetails topologyDetails, boolean supervisorExists) { + double requestedWorkerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails); + double requestedWorkerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails); + + requestedWorkerCpu += supervisorExists ? 0 : MesosCommon.executorCpu(mesosStormConf); + requestedWorkerMem += supervisorExists ? 0 : MesosCommon.executorMem(mesosStormConf); + + if (requestedWorkerCpu > offerResources.getCpu()) { + log.warn("Refusing to create worker slot. requestedWorkerCpu: " + requestedWorkerCpu + " but " + + "OfferedCpu: " + offerResources.getCpu() + " at node: " + offerResources.getHostName()); + return null; + } + + if (requestedWorkerMem > offerResources.getMem()) { + log.warn("Refusing to create worker slot. requestedWorkerMem: " + requestedWorkerMem + " but " + + "OfferedMem: " + offerResources.getMem() + " at node: " + offerResources.getHostName()); + return null; + } + + long port = offerResources.getPort(); + + if (port == -1) { + log.warn("Refusing to create worker slot. There are no ports available with offer " + offerResources.toString()); + return null; + } + + offerResources.decCpu(requestedWorkerCpu); + offerResources.decMem(requestedWorkerMem); + + return new MesosWorkerSlot(offerResources.getHostName(), port, topologyDetails.getId()); + } +} diff --git a/storm/src/main/storm/mesos/MesosCommon.java b/storm/src/main/storm/mesos/util/MesosCommon.java similarity index 99% rename from storm/src/main/storm/mesos/MesosCommon.java rename to storm/src/main/storm/mesos/util/MesosCommon.java index 9afcc7912..01b90fb21 100644 --- a/storm/src/main/storm/mesos/MesosCommon.java +++ b/storm/src/main/storm/mesos/util/MesosCommon.java @@ -6,16 +6,16 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -package storm.mesos; +package storm.mesos.util; import backtype.storm.scheduler.TopologyDetails; import com.google.common.base.Optional; diff --git a/storm/src/main/storm/mesos/PrettyProtobuf.java b/storm/src/main/storm/mesos/util/PrettyProtobuf.java similarity index 97% rename from storm/src/main/storm/mesos/PrettyProtobuf.java rename to storm/src/main/storm/mesos/util/PrettyProtobuf.java index cb1b20bcc..b962b3521 100644 --- a/storm/src/main/storm/mesos/PrettyProtobuf.java +++ b/storm/src/main/storm/mesos/util/PrettyProtobuf.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package storm.mesos; +package storm.mesos.util; import com.google.common.base.Function; import com.google.common.collect.Lists; @@ -78,7 +78,7 @@ public static String taskStatusToString(TaskStatus taskStatus) { /** * Pretty-print mesos protobuf TaskInfo. - * + *

* XXX(erikdw): not including command, container (+data), nor health_check. */ public static String taskInfoToString(TaskInfo task) { @@ -92,7 +92,7 @@ public static String taskInfoToString(TaskInfo task) { /** * Pretty-print mesos protobuf Offer. - * + *

* XXX(erikdw): not including slave_id, attributes, executor_ids, nor framework_id. */ public static String offerToString(Offer offer) { @@ -121,17 +121,17 @@ public static String taskInfoListToString(List tasks) { /** * Pretty-print the values in the Offer map used in MesosNimbus. - * + *

* Callers must ensure they have locked the Map first, else they could * have inconsistent output since the _offers map is touched from both * mesos-driven events and storm-driven calls. - * - * TODO(erikdw): figure out a design better that removes the need + *

+ * TODO:(erikdw): figure out a design better that removes the need * for external callers to lock before calling this method. */ public static String offerMapToString(RotatingMap offers) { List offersAsStrings = Lists.transform(new ArrayList(offers.values()), - offerToStringTransform); + offerToStringTransform); return "[\n" + StringUtils.join(offersAsStrings, ",\n") + "]"; } @@ -230,5 +230,4 @@ private static Map resourcesToOrderedMap(List resource } return map; } - } diff --git a/storm/src/main/storm/mesos/RotatingMap.java b/storm/src/main/storm/mesos/util/RotatingMap.java similarity index 99% rename from storm/src/main/storm/mesos/RotatingMap.java rename to storm/src/main/storm/mesos/util/RotatingMap.java index 0bc31f6e0..6d6a6ace3 100644 --- a/storm/src/main/storm/mesos/RotatingMap.java +++ b/storm/src/main/storm/mesos/util/RotatingMap.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package storm.mesos; +package storm.mesos.util; import java.util.ArrayList; import java.util.Collection; diff --git a/storm/src/test/storm/mesos/MesosNimbusTest.java b/storm/src/test/storm/mesos/MesosNimbusTest.java index 0035ab779..6e8388ac8 100644 --- a/storm/src/test/storm/mesos/MesosNimbusTest.java +++ b/storm/src/test/storm/mesos/MesosNimbusTest.java @@ -21,18 +21,13 @@ import backtype.storm.scheduler.Topologies; import backtype.storm.scheduler.TopologyDetails; import backtype.storm.scheduler.WorkerSlot; -import org.apache.mesos.MesosSchedulerDriver; -import org.apache.mesos.Protos.FrameworkID; import org.apache.mesos.Protos.Offer; import org.apache.mesos.Protos.OfferID; -import org.apache.mesos.Protos.Resource; -import org.apache.mesos.Protos.SlaveID; -import org.apache.mesos.Protos.Value; import org.junit.Test; -import org.mockito.Mockito; +import storm.mesos.util.MesosCommon; +import storm.mesos.util.RotatingMap; import java.net.URI; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -40,101 +35,26 @@ import static org.junit.Assert.assertEquals; +// TODO(dskarthick) : Leverage the build methods defined in TestUtils function. public class MesosNimbusTest { - private Offer buildOffer(double cpus, double mem) { - return Offer.newBuilder() - .setId(OfferID.newBuilder().setValue("derp").build()) - .setFrameworkId(FrameworkID.newBuilder().setValue("derp").build()) - .setSlaveId(SlaveID.newBuilder().setValue("derp").build()) - .setHostname("derp") - .addAllResources( - Arrays.asList( - buildScalarResource("cpus", cpus), - buildScalarResourceWithReservation("cpus", 1.0, "dynamicallyReserved"), - buildScalarResource("mem", mem) - ) - ) - .build(); - } - - private Offer buildOfferWithPorts(double cpus, double mem, int portBegin, int portEnd) { - return Offer.newBuilder() - .setId(OfferID.newBuilder().setValue("derp").build()) - .setFrameworkId(FrameworkID.newBuilder().setValue("derp").build()) - .setSlaveId(SlaveID.newBuilder().setValue("derp").build()) - .setHostname("derp") - .addAllResources( - Arrays.asList( - buildScalarResource("cpus", cpus), - buildScalarResourceWithReservation("cpus", 1.0, "dynamicallyReserved"), - buildScalarResource("mem", mem), - buildRangeResource("ports", portBegin, portEnd) - ) - ) - .build(); - } - - private Offer buildOfferWithReservation(double cpus, double mem, double reservedCpu, double reservedMem) { - return Offer.newBuilder() - .setId(OfferID.newBuilder().setValue("derp").build()) - .setFrameworkId(FrameworkID.newBuilder().setValue("derp").build()) - .setSlaveId(SlaveID.newBuilder().setValue("derp").build()) - .setHostname("derp") - .addAllResources( - Arrays.asList( - buildScalarResource("cpus", cpus), - buildScalarResourceWithReservation("cpus", 1.0, "dynamicallyReserved"), - buildScalarResource("mem", mem), - buildScalarResourceWithRole("cpus", reservedCpu, "reserved"), - buildScalarResourceWithRole("mem", reservedMem, "reserved") - ) - ) - .build(); - } - - @Test - public void testGetResources() throws Exception { - MesosNimbus mesosNimbus = new MesosNimbus(); - - Offer offer1 = buildOffer(2.0, 2.0); - OfferResources offerResources1 = mesosNimbus.getResources(offer1, 1.0, 1.0, 1.0, 1.0); - assertEquals(1, offerResources1.cpuSlots); - assertEquals(1, offerResources1.memSlots); - - Offer offer2 = buildOffer(1.0, 1.0); - OfferResources offerResources2 = mesosNimbus.getResources(offer2, 1.0, 1.0, 1.0, 1.0); - assertEquals(0, offerResources2.cpuSlots); - assertEquals(0, offerResources2.memSlots); - - Offer offer3 = buildOfferWithReservation(2.0, 2.0, 1.0, 1.0); - OfferResources offerResources3 = mesosNimbus.getResources(offer3, 1.0, 1.0, 1.0, 1.0); - assertEquals(2, offerResources3.cpuSlots); - assertEquals(2, offerResources3.memSlots); - - Offer offer4 = buildOfferWithReservation(2.0, 2.0, 1.5, 1.5); - OfferResources offerResources4 = mesosNimbus.getResources(offer4, 2.5, 2.5, 1.0, 1.0); - assertEquals(1, offerResources4.cpuSlots); - assertEquals(1, offerResources4.memSlots); - } - @Test public void testGetResourcesScalar() throws Exception { MesosNimbus mesosNimbus = new MesosNimbus(); assertEquals( - Arrays.asList(buildScalarResource("cpus", 1.0)), + Arrays.asList(TestUtils.buildScalarResource("cpus", 1.0)), mesosNimbus.getResourcesScalar( - buildResourceList(1, 2, 3, 4), + TestUtils.buildResourceList(1, 2, 3, 4), 1.0, "cpus" ) ); assertEquals( - Arrays.asList(buildScalarResource("mem", 2.0)), + Arrays.asList(TestUtils.buildScalarResource("mem", 2.0)), mesosNimbus.getResourcesScalar( - buildResourceList(1, 2, 3, 4), + TestUtils.buildResourceList(1, 2, 3, 4), 2.0, "mem" ) @@ -142,11 +62,11 @@ public void testGetResourcesScalar() throws Exception { assertEquals( Arrays.asList( - buildScalarResource("cpus", 1.0), - buildScalarResourceWithRole("cpus", 1.0, "reserved") + TestUtils.buildScalarResource("cpus", 1.0), + TestUtils.buildScalarResourceWithRole("cpus", 1.0, "reserved") ), mesosNimbus.getResourcesScalar( - buildResourceList(1, 2, 3, 4), + TestUtils.buildResourceList(1, 2, 3, 4), 2.0, "cpus" ) @@ -154,11 +74,11 @@ public void testGetResourcesScalar() throws Exception { assertEquals( Arrays.asList( - buildScalarResource("mem", 2.0), - buildScalarResourceWithRole("mem", 1.0, "reserved") + TestUtils.buildScalarResource("mem", 2.0), + TestUtils.buildScalarResourceWithRole("mem", 1.0, "reserved") ), mesosNimbus.getResourcesScalar( - buildResourceList(1, 2, 3, 4), + TestUtils.buildResourceList(1, 2, 3, 4), 3.0, "mem" ) @@ -166,11 +86,11 @@ public void testGetResourcesScalar() throws Exception { assertEquals( Arrays.asList( - buildScalarResource("cpus", 1.0), - buildScalarResourceWithRole("cpus", 3.0, "reserved") + TestUtils.buildScalarResource("cpus", 1.0), + TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved") ), mesosNimbus.getResourcesScalar( - buildResourceList(1, 2, 3, 4), + TestUtils.buildResourceList(1, 2, 3, 4), 4.0, "cpus" ) @@ -178,11 +98,11 @@ public void testGetResourcesScalar() throws Exception { assertEquals( Arrays.asList( - buildScalarResource("mem", 2.0), - buildScalarResourceWithRole("mem", 4.0, "reserved") + TestUtils.buildScalarResource("mem", 2.0), + TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") ), mesosNimbus.getResourcesScalar( - buildResourceList(1, 2, 3, 4), + TestUtils.buildResourceList(1, 2, 3, 4), 6.0, "mem" ) @@ -190,11 +110,11 @@ public void testGetResourcesScalar() throws Exception { assertEquals( Arrays.asList( - buildScalarResource("cpus", 1.0), - buildScalarResourceWithRole("cpus", 0.5, "reserved") + TestUtils.buildScalarResource("cpus", 1.0), + TestUtils.buildScalarResourceWithRole("cpus", 0.5, "reserved") ), mesosNimbus.getResourcesScalar( - buildResourceList(1, 2, 3, 4), + TestUtils.buildResourceList(1, 2, 3, 4), 1.5, "cpus" ) @@ -202,11 +122,11 @@ public void testGetResourcesScalar() throws Exception { assertEquals( Arrays.asList( - buildScalarResource("mem", 2.0), - buildScalarResourceWithRole("mem", 0.5, "reserved") + TestUtils.buildScalarResource("mem", 2.0), + TestUtils.buildScalarResourceWithRole("mem", 0.5, "reserved") ), mesosNimbus.getResourcesScalar( - buildResourceList(1, 2, 3, 4), + TestUtils.buildResourceList(1, 2, 3, 4), 2.5, "mem" ) @@ -219,12 +139,12 @@ public void testSubtractResourcesScalar() throws Exception { assertEquals( Arrays.asList( - buildScalarResource("mem", 2.0), - buildScalarResourceWithRole("cpus", 3.0, "reserved"), - buildScalarResourceWithRole("mem", 4.0, "reserved") + TestUtils.buildScalarResource("mem", 2.0), + TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved"), + TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") ), mesosNimbus.subtractResourcesScalar( - buildResourceList(1, 2, 3, 4), + TestUtils.buildResourceList(1, 2, 3, 4), 1.0, "cpus" ) @@ -232,13 +152,13 @@ public void testSubtractResourcesScalar() throws Exception { assertEquals( Arrays.asList( - buildScalarResource("cpus", 1.0), - buildScalarResource("mem", 1.0), - buildScalarResourceWithRole("cpus", 3.0, "reserved"), - buildScalarResourceWithRole("mem", 4.0, "reserved") + TestUtils.buildScalarResource("cpus", 1.0), + TestUtils.buildScalarResource("mem", 1.0), + TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved"), + TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") ), mesosNimbus.subtractResourcesScalar( - buildResourceList(1, 2, 3, 4), + TestUtils.buildResourceList(1, 2, 3, 4), 1.0, "mem" ) @@ -246,79 +166,29 @@ public void testSubtractResourcesScalar() throws Exception { assertEquals( Arrays.asList( - buildScalarResource("mem", 2.0), - buildScalarResourceWithRole("cpus", 2.5, "reserved"), - buildScalarResourceWithRole("mem", 4.0, "reserved") + TestUtils.buildScalarResource("mem", 2.0), + TestUtils.buildScalarResourceWithRole("cpus", 2.5, "reserved"), + TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") ), mesosNimbus.subtractResourcesScalar( - buildResourceList(1, 2, 3, 4), + TestUtils.buildResourceList(1, 2, 3, 4), 1.5, "cpus" ) ); assertEquals( - Arrays.asList( - buildScalarResource("cpus", 1.0), - buildScalarResourceWithRole("cpus", 3.0, "reserved"), - buildScalarResourceWithRole("mem", 3.5, "reserved") - ), - mesosNimbus.subtractResourcesScalar( - buildResourceList(1, 2, 3, 4), - 2.5, - "mem" - ) - ); - } - - private Resource buildScalarResource(String name, double value) { - return Resource.newBuilder() - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder() - .setValue(value) - .build()) - .setName(name) - .build(); - } - - private Resource buildScalarResourceWithRole(String name, double value, String role) { - return Resource.newBuilder() - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder() - .setValue(value) - .build()) - .setName(name) - .setRole(role) - .build(); - } - - private Resource buildScalarResourceWithReservation(String name, double value, String role) { - return Resource.newBuilder() - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder() - .setValue(value) - .build()) - .setName(name) - .setRole(role) - .setReservation( - Resource.ReservationInfo.newBuilder() - .setPrincipal("derp") - .build() - ) - .build(); - } - - private List buildResourceList(double cpus, double mem, double reservedCpu, double reservedMem) { - List resourceList = new ArrayList<>(); - resourceList.addAll( - Arrays.asList( - buildScalarResource("cpus", cpus), - buildScalarResource("mem", mem), - buildScalarResourceWithRole("cpus", reservedCpu, "reserved"), - buildScalarResourceWithRole("mem", reservedMem, "reserved") - ) + Arrays.asList( + TestUtils.buildScalarResource("cpus", 1.0), + TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved"), + TestUtils.buildScalarResourceWithRole("mem", 3.5, "reserved") + ), + mesosNimbus.subtractResourcesScalar( + TestUtils.buildResourceList(1, 2, 3, 4), + 2.5, + "mem" + ) ); - return resourceList; } @Test @@ -327,10 +197,10 @@ public void testGetResourcesRange() throws Exception { assertEquals( Arrays.asList( - buildRangeResource("ports", 100, 100) + TestUtils.buildRangeResource("ports", 100, 100) ), mesosNimbus.getResourcesRange( - buildRangeResourceList(100, 100), + TestUtils.buildRangeResourceList(100, 100), 100, "ports" ) @@ -338,10 +208,10 @@ public void testGetResourcesRange() throws Exception { assertEquals( Arrays.asList( - buildRangeResource("ports", 105, 105) + TestUtils.buildRangeResource("ports", 105, 105) ), mesosNimbus.getResourcesRange( - buildRangeResourceList(100, 200), + TestUtils.buildRangeResourceList(100, 200), 105, "ports" ) @@ -350,7 +220,7 @@ public void testGetResourcesRange() throws Exception { assertEquals( 0, mesosNimbus.getResourcesRange( - buildRangeResourceList(100, 100), + TestUtils.buildRangeResourceList(100, 100), 200, "ports" ).size() @@ -364,126 +234,40 @@ public void testSubtractResourcesRange() throws Exception { assertEquals( Arrays.asList( - buildScalarResource("cpus", 1.0), - buildScalarResource("mem", 2.0), - buildScalarResourceWithRole("cpus", 3.0, "reserved"), - buildScalarResourceWithRole("mem", 4.0, "reserved") + TestUtils.buildScalarResource("cpus", 1.0), + TestUtils.buildScalarResource("mem", 2.0), + TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved"), + TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") ), mesosNimbus.subtractResourcesRange( - buildRangeResourceList(100, 100), + TestUtils.buildRangeResourceList(100, 100), 100, "ports" ) ); assertEquals( - Arrays.asList( - buildMultiRangeResource("ports", 100, 104, 106, 200), - buildMultiRangeResourceWithRole("ports", 100, 104, 106, 200, "reserved"), - buildScalarResource("cpus", 1.0), - buildScalarResource("mem", 2.0), - buildScalarResourceWithRole("cpus", 3.0, "reserved"), - buildScalarResourceWithRole("mem", 4.0, "reserved") - ), - mesosNimbus.subtractResourcesRange( - buildRangeResourceList(100, 200), - 105, - "ports" - ) + Arrays.asList( + TestUtils.buildMultiRangeResource("ports", 100, 104, 106, 200), + TestUtils.buildMultiRangeResourceWithRole("ports", 100, 104, 106, 200, "reserved"), + TestUtils.buildScalarResource("cpus", 1.0), + TestUtils.buildScalarResource("mem", 2.0), + TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved"), + TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") + ), + mesosNimbus.subtractResourcesRange( + TestUtils.buildRangeResourceList(100, 200), + 105, + "ports" + ) ); } - private Resource buildRangeResource(String name, int begin, int end) { - return Resource.newBuilder() - .setType(Value.Type.RANGES) - .setRanges( - Value.Ranges.newBuilder() - .addRange(Value.Range.newBuilder() - .setBegin(begin) - .setEnd(end) - .build()) - .build() - ) - .setName(name) - .build(); - } - - private Resource buildRangeResourceWithRole(String name, int begin, int end, String role) { - return Resource.newBuilder() - .setType(Value.Type.RANGES) - .setRanges( - Value.Ranges.newBuilder() - .addRange(Value.Range.newBuilder() - .setBegin(begin) - .setEnd(end) - .build()) - .build() - ) - .setName(name) - .setRole(role) - .build(); - } - - private Resource buildMultiRangeResource(String name, int begin1, int end1, int begin2, int end2) { - return Resource.newBuilder() - .setType(Value.Type.RANGES) - .setRanges( - Value.Ranges.newBuilder() - .addRange(Value.Range.newBuilder() - .setBegin(begin1) - .setEnd(end1) - .build()) - .addRange(Value.Range.newBuilder() - .setBegin(begin2) - .setEnd(end2) - .build()) - .build() - ) - .setName(name) - .build(); - } - - private Resource buildMultiRangeResourceWithRole(String name, int begin1, int end1, int begin2, int end2, String role) { - return Resource.newBuilder() - .setType(Value.Type.RANGES) - .setRanges( - Value.Ranges.newBuilder() - .addRange(Value.Range.newBuilder() - .setBegin(begin1) - .setEnd(end1) - .build()) - .addRange(Value.Range.newBuilder() - .setBegin(begin2) - .setEnd(end2) - .build()) - .build() - ) - .setName(name) - .setRole(role) - .build(); - } - - private List buildRangeResourceList(int begin, int end) { - List resourceList = new ArrayList<>(); - resourceList.addAll( - Arrays.asList( - buildRangeResource("ports", begin, end), - buildRangeResourceWithRole("ports", begin, end, "reserved"), - buildScalarResource("cpus", 1), - buildScalarResource("mem", 2), - buildScalarResourceWithRole("cpus", 3, "reserved"), - buildScalarResourceWithRole("mem", 4, "reserved") - ) - ); - return resourceList; - } - @Test public void testComputeResourcesForSlot() throws Exception { MesosNimbus mesosNimbus = new MesosNimbus(); - mesosNimbus._driver = Mockito.any(MesosSchedulerDriver.class); mesosNimbus._configUrl = new URI("http://127.0.0.1/"); OfferID offerId = OfferID.newBuilder().setValue("derp").build(); @@ -497,7 +281,7 @@ public void expire(OfferID key, Offer val) { offers.put( offerId, - buildOfferWithPorts(2.0, 2048, 1000, 1000) + TestUtils.buildOfferWithPorts("offer1", "host1.west", 2.0, 2048, 1000, 1000) ); HashMap topologyMap = new HashMap<>(); @@ -509,7 +293,7 @@ public void expire(OfferID key, Offer val) { conf.put(MesosNimbus.CONF_EXECUTOR_URI, ""); mesosNimbus._conf = conf; - topologyMap.put("t1", new TopologyDetails("t1", conf, Mockito.any(StormTopology.class), 5)); + topologyMap.put("t1", new TopologyDetails("t1", conf, new StormTopology(), 5)); HashMap> launchList = new HashMap<>(); HashMap> slotList = new HashMap<>(); slotList.put(offerId, Arrays.asList(new WorkerSlot("", 1000))); @@ -523,12 +307,11 @@ public void expire(OfferID key, Offer val) { slotList, OfferID.newBuilder().setValue("derp").build() ); - assertEquals(1, launchList.size()); assertEquals(1, launchList.get(offerId).size()); assertEquals( - buildScalarResource("cpus", 1.0), + TestUtils.buildScalarResource("cpus", 1.0), launchList.get(offerId).get(0).getTask().getResources(0) ); diff --git a/storm/src/test/storm/mesos/OfferRoleComparatorTest.java b/storm/src/test/storm/mesos/OfferRoleComparatorTest.java index 7c9fbcb63..dcb7980a9 100644 --- a/storm/src/test/storm/mesos/OfferRoleComparatorTest.java +++ b/storm/src/test/storm/mesos/OfferRoleComparatorTest.java @@ -26,7 +26,6 @@ import org.junit.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -34,30 +33,12 @@ public class OfferRoleComparatorTest { - public Offer buildOffer() { - List resourceList = Arrays.asList( - Resource.newBuilder().setRole("*").setName("cpus").setType(Value.Type.SCALAR).setScalar(Value.Scalar.newBuilder().setValue(1).build()).build(), - Resource.newBuilder().setRole("*").setName("mem").setType(Value.Type.SCALAR).setScalar(Value.Scalar.newBuilder().setValue(1).build()).build(), - Resource.newBuilder().setRole("role").setName("cpus").setType(Value.Type.SCALAR).setScalar(Value.Scalar.newBuilder().setValue(1).build()).build(), - Resource.newBuilder().setRole("role").setName("mem").setType(Value.Type.SCALAR).setScalar(Value.Scalar.newBuilder().setValue(1).build()).build(), - Resource.newBuilder().setRole("otherRole").setName("cpus").setType(Value.Type.SCALAR).setScalar(Value.Scalar.newBuilder().setValue(1).build()).build(), - Resource.newBuilder().setRole("otherRole").setName("mem").setType(Value.Type.SCALAR).setScalar(Value.Scalar.newBuilder().setValue(1).build()).build() - ); - Collections.shuffle(resourceList); - return Offer.newBuilder() - .setId(OfferID.newBuilder().setValue("derp").build()) - .setFrameworkId(FrameworkID.newBuilder().setValue("derp").build()) - .setSlaveId(SlaveID.newBuilder().setValue("derp").build()) - .setHostname("derp") - .addAllResources(resourceList) - .build(); - } @Test public void testCompare() throws Exception { List offerResources = new ArrayList<>(); - Offer offer = buildOffer(); + Offer offer = TestUtils.buildOffer(); offerResources.addAll(offer.getResourcesList()); Collections.sort(offerResources, new ResourceRoleComparator()); diff --git a/storm/src/test/storm/mesos/TestUtils.java b/storm/src/test/storm/mesos/TestUtils.java new file mode 100644 index 000000000..d14b41347 --- /dev/null +++ b/storm/src/test/storm/mesos/TestUtils.java @@ -0,0 +1,256 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.mesos; + +import backtype.storm.generated.StormTopology; +import backtype.storm.scheduler.TopologyDetails; +import org.apache.mesos.Protos; +import storm.mesos.util.MesosCommon; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestUtils { + + public static TopologyDetails constructTopologyDetails(String topologyName, int numWorkers, double numCpus, double memSize) { + Map topologyConf = new HashMap<>(); + + StormTopology stormTopology = new StormTopology(); + TopologyDetails topologyDetails= new TopologyDetails(topologyName, topologyConf, stormTopology, numWorkers); + topologyDetails.getConf().put(MesosCommon.WORKER_CPU_CONF, Double.valueOf(numCpus)); + topologyDetails.getConf().put(MesosCommon.WORKER_MEM_CONF, Double.valueOf(memSize)); + + return topologyDetails; + } + + public static Protos.Offer buildOffer(String offerId, String hostName, double cpus, double mem) { + return Protos.Offer.newBuilder() + .setId(Protos.OfferID.newBuilder().setValue(String.valueOf(offerId)).build()) + .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("derp").build()) + .setSlaveId(Protos.SlaveID.newBuilder().setValue("derp").build()) + .setHostname(hostName) + .addAllResources( + Arrays.asList( + buildScalarResource("cpus", cpus), + buildScalarResourceWithReservation("cpus", 1.0, "dynamicallyReserved"), + buildScalarResource("mem", mem) + ) + ) + .build(); + } + + public static Protos.Offer buildOfferWithPorts(String offerId, String hostName, double cpus, double mem, int portBegin, int portEnd) { + return Protos.Offer.newBuilder() + .setId(Protos.OfferID.newBuilder().setValue(offerId).build()) + .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("derp").build()) + .setSlaveId(Protos.SlaveID.newBuilder().setValue("derp").build()) + .setHostname(hostName) + .addAllResources( + Arrays.asList( + buildScalarResource("cpus", cpus), + buildScalarResourceWithReservation("cpus", 1.0, "dynamicallyReserved"), + buildScalarResource("mem", mem), + buildRangeResource("ports", portBegin, portEnd) + ) + ) + .build(); + } + + public static Protos.Offer buildOfferWithReservation(String offerId, String hostName, double cpus, double mem, double reservedCpu, double reservedMem) { + return Protos.Offer.newBuilder() + .setId(Protos.OfferID.newBuilder().setValue(offerId).build()) + .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("derp").build()) + .setSlaveId(Protos.SlaveID.newBuilder().setValue("derp").build()) + .setHostname(hostName) + .addAllResources( + Arrays.asList( + buildScalarResource("cpus", cpus), + buildScalarResourceWithReservation("cpus", 1.0, "dynamicallyReserved"), + buildScalarResource("mem", mem), + buildScalarResourceWithRole("cpus", reservedCpu, "reserved"), + buildScalarResourceWithRole("mem", reservedMem, "reserved") + ) + ) + .build(); + } + + public static Protos.Resource buildScalarResource(String name, double value) { + return Protos.Resource.newBuilder() + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder() + .setValue(value) + .build()) + .setName(name) + .build(); + } + + public static Protos.Resource buildScalarResourceWithRole(String name, double value, String role) { + return Protos.Resource.newBuilder() + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder() + .setValue(value) + .build()) + .setName(name) + .setRole(role) + .build(); + } + + public static Protos.Resource buildScalarResourceWithReservation(String name, double value, String role) { + return Protos.Resource.newBuilder() + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder() + .setValue(value) + .build()) + .setName(name) + .setRole(role) + .setReservation( + Protos.Resource.ReservationInfo.newBuilder() + .setPrincipal("derp") + .build() + ) + .build(); + } + + public static List buildResourceList(double cpus, double mem, double reservedCpu, double reservedMem) { + List resourceList = new ArrayList<>(); + resourceList.addAll( + Arrays.asList( + buildScalarResource("cpus", cpus), + buildScalarResource("mem", mem), + buildScalarResourceWithRole("cpus", reservedCpu, "reserved"), + buildScalarResourceWithRole("mem", reservedMem, "reserved") + ) + ); + return resourceList; + } + + public static Protos.Resource buildRangeResource(String name, int begin, int end) { + return Protos.Resource.newBuilder() + .setType(Protos.Value.Type.RANGES) + .setRanges( + Protos.Value.Ranges.newBuilder() + .addRange(Protos.Value.Range.newBuilder() + .setBegin(begin) + .setEnd(end) + .build()) + .build() + ) + .setName(name) + .build(); + } + + public static Protos.Resource buildRangeResourceWithRole(String name, int begin, int end, String role) { + return Protos.Resource.newBuilder() + .setType(Protos.Value.Type.RANGES) + .setRanges( + Protos.Value.Ranges.newBuilder() + .addRange(Protos.Value.Range.newBuilder() + .setBegin(begin) + .setEnd(end) + .build()) + .build() + ) + .setName(name) + .setRole(role) + .build(); + } + + public static Protos.Resource buildMultiRangeResource(String name, int begin1, int end1, int begin2, int end2) { + return Protos.Resource.newBuilder() + .setType(Protos.Value.Type.RANGES) + .setRanges( + Protos.Value.Ranges.newBuilder() + .addRange(Protos.Value.Range.newBuilder() + .setBegin(begin1) + .setEnd(end1) + .build()) + .addRange(Protos.Value.Range.newBuilder() + .setBegin(begin2) + .setEnd(end2) + .build()) + .build() + ) + .setName(name) + .build(); + } + + public static Protos.Resource buildMultiRangeResourceWithRole(String name, int begin1, int end1, int begin2, int end2, String role) { + return Protos.Resource.newBuilder() + .setType(Protos.Value.Type.RANGES) + .setRanges( + Protos.Value.Ranges.newBuilder() + .addRange(Protos.Value.Range.newBuilder() + .setBegin(begin1) + .setEnd(end1) + .build()) + .addRange(Protos.Value.Range.newBuilder() + .setBegin(begin2) + .setEnd(end2) + .build()) + .build() + ) + .setName(name) + .setRole(role) + .build(); + } + + public static List buildRangeResourceList(int begin, int end) { + List resourceList = new ArrayList<>(); + resourceList.addAll( + Arrays.asList( + buildRangeResource("ports", begin, end), + buildRangeResourceWithRole("ports", begin, end, "reserved"), + buildScalarResource("cpus", 1), + buildScalarResource("mem", 2), + buildScalarResourceWithRole("cpus", 3, "reserved"), + buildScalarResourceWithRole("mem", 4, "reserved") + ) + ); + return resourceList; + } + + public static Protos.Offer buildOffer() { + List resourceList = Arrays.asList( + Protos.Resource + .newBuilder().setRole("*").setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(1).build()).build(), + Protos.Resource + .newBuilder().setRole("*").setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(1).build()).build(), + Protos.Resource + .newBuilder().setRole("role").setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(1).build()).build(), + Protos.Resource + .newBuilder().setRole("role").setName("mem").setType(Protos.Value.Type.SCALAR).setScalar(Protos.Value.Scalar.newBuilder().setValue(1).build()).build(), + Protos.Resource.newBuilder().setRole("otherRole").setName("cpus").setType(Protos.Value.Type.SCALAR).setScalar( + Protos.Value.Scalar.newBuilder().setValue(1).build()).build(), + Protos.Resource.newBuilder().setRole("otherRole").setName("mem").setType(Protos.Value.Type.SCALAR).setScalar( + Protos.Value.Scalar.newBuilder().setValue(1).build()).build() + ); + Collections.shuffle(resourceList); + + return Protos.Offer.newBuilder() + .setId(Protos.OfferID.newBuilder().setValue("derp").build()) + .setFrameworkId(Protos.FrameworkID.newBuilder().setValue("derp").build()) + .setSlaveId(Protos.SlaveID.newBuilder().setValue("derp").build()) + .setHostname("derp") + .addAllResources(resourceList) + .build(); + } +} diff --git a/storm/src/test/storm/mesos/schedulers/DefaultSchedulerTest.java b/storm/src/test/storm/mesos/schedulers/DefaultSchedulerTest.java new file mode 100644 index 000000000..2091d0d3b --- /dev/null +++ b/storm/src/test/storm/mesos/schedulers/DefaultSchedulerTest.java @@ -0,0 +1,429 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.mesos.schedulers; + +import backtype.storm.generated.StormTopology; +import backtype.storm.scheduler.Cluster; +import backtype.storm.scheduler.ExecutorDetails; +import backtype.storm.scheduler.SchedulerAssignment; +import backtype.storm.scheduler.SchedulerAssignmentImpl; +import backtype.storm.scheduler.SupervisorDetails; +import backtype.storm.scheduler.Topologies; +import backtype.storm.scheduler.TopologyDetails; +import backtype.storm.scheduler.WorkerSlot; +import org.apache.mesos.Protos.Offer; +import org.apache.mesos.Protos.OfferID; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Spy; +import org.mockito.internal.util.reflection.Whitebox; +import org.mockito.runners.MockitoJUnitRunner; +import storm.mesos.TestUtils; +import storm.mesos.util.MesosCommon; +import storm.mesos.util.RotatingMap; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static storm.mesos.TestUtils.buildOffer; +import static storm.mesos.TestUtils.buildOfferWithPorts; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultSchedulerTest { + + @Spy + private DefaultScheduler defaultScheduler; + private Map mesosWorkerSlotMap; + + private Topologies topologies; + private RotatingMap rotatingMap; + private Set topologiesMissingAssignments; + private Map topologyMap; + private Collection existingSupervisors; + private final String sampleTopologyId = "test-topology1-65-1442255385"; + private final String sampleHost = "host1.east"; + private final int samplePort = 3100; + + + + private TopologyDetails constructTopologyDetails(String topologyName, int numWorkers) { + Map topologyConf1 = new HashMap<>(); + + StormTopology stormTopology = new StormTopology(); + TopologyDetails topologyDetails= new TopologyDetails(topologyName, topologyConf1, stormTopology, numWorkers); + + return topologyDetails; + } + + + private Cluster getSpyCluster() { + Map supervisors = new HashMap<>(); + Map assignmentMap = new HashMap<>(); + + for (SupervisorDetails supervisorDetails : existingSupervisors) { + String nodeId = supervisorDetails.getHost(); + supervisors.put(nodeId, supervisorDetails); + } + + return spy(new Cluster(null, supervisors, assignmentMap)); + } + + private void initializeMesosWorkerSlotMap(List mesosWorkerSlots) { + mesosWorkerSlotMap = (HashMap) (Whitebox.getInternalState(defaultScheduler, "mesosWorkerSlotMap")); + for (MesosWorkerSlot mesosWorkerSlot: mesosWorkerSlots) { + mesosWorkerSlotMap.put(String.format("%s:%s", mesosWorkerSlot.getNodeId(), mesosWorkerSlot.getPort()), mesosWorkerSlot); + } + } + + private List getWorkerSlotFromMesosWorkerSlot(List mesosWorkerSlotList) { + List workerSlotList = new ArrayList<>(); + for (WorkerSlot mesosWorkerSlot : mesosWorkerSlotList) { + workerSlotList.add(new WorkerSlot(mesosWorkerSlot.getNodeId(), mesosWorkerSlot.getPort())); + } + return workerSlotList; + } + + private Set generateExecutorDetailsSet(int count) { + Set executorsToAssign = new HashSet<>(); + for (int i=0; i < count; i++) { + executorsToAssign.add(new ExecutorDetails(i, i)); + } + return executorsToAssign; + } + + private List generateMesosWorkerSlots(int count) { + List mesosWorkerSlots = new ArrayList<>(); + + for (int i=0; i mesosWorkerSlots = this.generateMesosWorkerSlots(numWorkers); + initializeMesosWorkerSlotMap(mesosWorkerSlots); + + Set executorsToAssign = this.generateExecutorDetailsSet(numExecutors); + List workerSlotList = getWorkerSlotFromMesosWorkerSlot(mesosWorkerSlots); + topologyMap.put(sampleTopologyId, TestUtils.constructTopologyDetails(sampleTopologyId, numWorkers, 0.1, 100)); + this.topologies = new Topologies(topologyMap); + + doReturn(workerSlotList).when(spyCluster).getAvailableSlots(); + doReturn(executorsToAssign).when(spyCluster).getUnassignedExecutors(any(TopologyDetails.class)); + + return spyCluster; + } + + private Map> getworkerSlotExecutorDetailsMap(Map executorDetailsWorkerSlotMap) { + Map> workerSlotExecutorDetailsMap = new HashMap<>(); + + for (ExecutorDetails executorDetails : executorDetailsWorkerSlotMap.keySet()) { + WorkerSlot workerSlot = executorDetailsWorkerSlotMap.get(executorDetails); + if (!workerSlotExecutorDetailsMap.containsKey(workerSlot)) { + workerSlotExecutorDetailsMap.put(workerSlot, new ArrayList()); + } + workerSlotExecutorDetailsMap.get(workerSlot).add(executorDetails); + } + return workerSlotExecutorDetailsMap; + } + + @Before + public void initialize() { + defaultScheduler = new DefaultScheduler(); + Map mesosStormConf = new HashMap<>(); + defaultScheduler.prepare(mesosStormConf); + + rotatingMap = new RotatingMap<>(2); + + topologiesMissingAssignments = new HashSet<>(); + topologiesMissingAssignments.add("test-topology1-65-1442255385"); + topologiesMissingAssignments.add("test-topology1-65-1442255385"); + + existingSupervisors = new ArrayList<>(); + existingSupervisors.add(new SupervisorDetails(MesosCommon.supervisorId(sampleHost, "test-topology1-65-1442255385"), sampleHost, null, null)); + existingSupervisors.add(new SupervisorDetails(MesosCommon.supervisorId(sampleHost, "test-topology10-65-1442255385"), sampleHost, null, null)); + + topologyMap = new HashMap<>(); + topologyMap.put(sampleTopologyId, TestUtils.constructTopologyDetails(sampleTopologyId, 1, 0.1, 100)); + topologies = new Topologies(topologyMap); + + mesosWorkerSlotMap = new HashMap<>(); + } + + @Test + public void testAllSlotsAvailableForSchedulingWithOneOffer() { + List workerSlotsAvailableForScheduling; + + /* Offer with no ports but enough memory and cpu*/ + Offer offer = buildOffer("offer1", sampleHost, 10, 20000); + rotatingMap.put(offer.getId(), offer); + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, topologies, topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(), 0); + + + /* Offer with no cpu but enough ports and cpu */ + offer = buildOfferWithPorts("offer1", sampleHost, 0.0, 1000, samplePort, samplePort + 1); + rotatingMap.put(offer.getId(), offer); + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, topologies, topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(),0); + + + /* Offer with no memory but enough ports and cpu */ + offer = buildOfferWithPorts("offer1", sampleHost, 0.0, 1000, samplePort, samplePort + 1); + rotatingMap.put(offer.getId(), offer); + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, topologies, topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(),0); + + + /* Offer with just enough ports, memory and cpu */ + /* Case 1 - Supervisor exists for topology test-topology1-65-1442255385 on the host */ + offer = buildOfferWithPorts("offer1", sampleHost, 0.1, 200, samplePort, samplePort + 1); + rotatingMap.put(offer.getId(), offer); + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, topologies, + topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(), 1); + + /* Case 2 - Supervisor does not exists for topology test-topology1-65-1442255385 on the host */ + offer = buildOfferWithPorts("offer1", "host-without-supervisor.east", 0.1, 200, samplePort, samplePort + 1); + rotatingMap.put(offer.getId(), offer); + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, topologies, + topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(), 0); + + /* Case 3 - Supervisor exists for topology test-topology1-65-1442255385 on the host & offer has additional resources for supervisor */ + offer = buildOfferWithPorts("offer1", "host-without-supervisor.east", 0.1 + MesosCommon.DEFAULT_EXECUTOR_CPU, 200 + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, + 3100, 3101); + rotatingMap.put(offer.getId(), offer); + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, topologies, + topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(), 1); + + /* Test default values for worker cpu and memory - This is to make sure that we account for default worker cpu and memory when the user does not pass MesosCommon.DEFAULT_WORKER_CPU && MesosCommon.DEFAULT_WORKER_MEM */ + offer = buildOfferWithPorts("offer1", "host-without-supervisor.east", MesosCommon.DEFAULT_WORKER_CPU + MesosCommon.DEFAULT_EXECUTOR_CPU, + MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 1); + rotatingMap.put(offer.getId(), offer); + TopologyDetails topologyDetails = constructTopologyDetails(sampleTopologyId, 1); + topologyMap.put(sampleTopologyId, topologyDetails); + defaultScheduler.prepare(topologyDetails.getConf()); + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, new Topologies(topologyMap), + topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(), 1); + + + /* More than 1 worker slot is required - Plenty of memory & cpu is available, only two ports are available */ + offer = buildOfferWithPorts("offer1", "host-without-supervisor.east", 10 * MesosCommon.DEFAULT_WORKER_CPU + MesosCommon.DEFAULT_EXECUTOR_CPU, + 10 * MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 1); + rotatingMap.put(offer.getId(), offer); + topologyDetails = constructTopologyDetails(sampleTopologyId, 10); + topologyMap.put(sampleTopologyId, topologyDetails); + defaultScheduler.prepare(topologyDetails.getConf()); + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, new Topologies(topologyMap), + topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(), 2); + + /* More than 1 worker slot is required - Plenty of ports & cpu is available, but memory is available for only two workers */ + offer = buildOfferWithPorts("offer1", "host-without-supervisor.east", 10 * MesosCommon.DEFAULT_WORKER_CPU + MesosCommon.DEFAULT_EXECUTOR_CPU, + 2 * MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 1); + rotatingMap.put(offer.getId(), offer); + topologyDetails = constructTopologyDetails(sampleTopologyId, 10); + topologyMap.put(sampleTopologyId, topologyDetails); + defaultScheduler.prepare(topologyDetails.getConf()); + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, new Topologies(topologyMap), + topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(), 2); + + /* More than 1 worker slot is required - Plenty of ports & memory are available, but cpu is available for only two workers */ + offer = buildOfferWithPorts("offer1", "host-without-supervisor.east", 2 * MesosCommon.DEFAULT_WORKER_CPU + MesosCommon.DEFAULT_EXECUTOR_CPU, + 10 * MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 100); + rotatingMap.put(offer.getId(), offer); + topologyDetails = constructTopologyDetails(sampleTopologyId, 10); + topologyMap.put(sampleTopologyId, topologyDetails); + defaultScheduler.prepare(topologyDetails.getConf()); + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, new Topologies(topologyMap), + topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(), 2); + + /* 10 worker slots are required - Plenty of cpu, memory & ports are available */ + offer = buildOfferWithPorts("offer1", "host-without-supervisor.east", 20 * MesosCommon.DEFAULT_WORKER_CPU + MesosCommon.DEFAULT_EXECUTOR_CPU, + 20 * MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 100); + rotatingMap.put(offer.getId(), offer); + topologyDetails = constructTopologyDetails(sampleTopologyId, 10); + topologyMap.put(sampleTopologyId, topologyDetails); + defaultScheduler.prepare(topologyDetails.getConf()); + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, new Topologies(topologyMap), + topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(), 10); + } + + @Test + public void testAllSlotsAvailableForSchedulingWithMultipleOffers() { + List workerSlotsAvailableForScheduling; + Offer offer; + TopologyDetails topologyDetails; + + /* 10 worker slots are available but offers are fragmented on one host */ + offer = buildOffer("offer1", sampleHost, 0, 1000); + rotatingMap.put(offer.getId(), offer); + offer = buildOffer("offer2", sampleHost, 10, 0); + rotatingMap.put(offer.getId(), offer); + String sampleHost2 = "host1.west"; + offer = buildOffer("offer3", sampleHost2, 0.01, 1000); + rotatingMap.put(offer.getId(), offer); + offer = buildOffer("offer4", sampleHost2, 0.1, 9000); + rotatingMap.put(offer.getId(), offer); + offer = buildOffer("offer5", sampleHost2, 0.91, 9000); + rotatingMap.put(offer.getId(), offer); + offer = buildOfferWithPorts("offer6", sampleHost2, 5 * MesosCommon.DEFAULT_WORKER_CPU + MesosCommon.DEFAULT_EXECUTOR_CPU, + 5 * MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 5); + rotatingMap.put(offer.getId(), offer); + + topologyMap.clear(); + topologyDetails = constructTopologyDetails(sampleTopologyId, 10); + topologyMap.put(sampleTopologyId, topologyDetails); + defaultScheduler.prepare(topologyDetails.getConf()); + + // Increase available cpu by a tiny fraction in order + offer = buildOfferWithPorts("offer6", sampleHost, 5 * MesosCommon.DEFAULT_WORKER_CPU + 1.1 * MesosCommon.DEFAULT_EXECUTOR_CPU, + 5 * MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 5); + rotatingMap.put(offer.getId(), offer); + + topologyMap.clear(); + topologyDetails = constructTopologyDetails(sampleTopologyId, 10); + topologyMap.put(sampleTopologyId, topologyDetails); + defaultScheduler.prepare(topologyDetails.getConf()); + + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, new Topologies(topologyMap), + topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(), 5); // Note that by increasing the executor cpu by a fraction, we are able to get 5 worker slots as we expect + + + topologyMap.clear(); + topologyDetails = constructTopologyDetails(sampleTopologyId, 10); + topologyMap.put(sampleTopologyId, topologyDetails); + defaultScheduler.prepare(topologyDetails.getConf()); + + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, new Topologies(topologyMap), + topologiesMissingAssignments); + assertEquals(workerSlotsAvailableForScheduling.size(), 5); + + offer = buildOfferWithPorts("offer7", "host2.east", 3 * MesosCommon.DEFAULT_WORKER_CPU + MesosCommon.DEFAULT_EXECUTOR_CPU, + 3 * MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 5); + rotatingMap.put(offer.getId(), offer); + + offer = buildOfferWithPorts("offer8", "host3.east", 100 * MesosCommon.DEFAULT_WORKER_CPU + MesosCommon.DEFAULT_EXECUTOR_CPU, + 100 * MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 10); + rotatingMap.put(offer.getId(), offer); + + workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, + new Topologies(topologyMap), topologiesMissingAssignments); + + assertEquals(workerSlotsAvailableForScheduling.size(), 10); + + // Make sure that the obtained worker slots are evenly spread across the available resources + Map workerCountPerHostMap = new HashMap<>(); + + for (WorkerSlot workerSlot : workerSlotsAvailableForScheduling) { + Integer tmp = workerCountPerHostMap.get(workerSlot.getNodeId()); + if (tmp == null) { + workerCountPerHostMap.put(workerSlot.getNodeId(), 1); + continue; + } + workerCountPerHostMap.put(workerSlot.getNodeId(), tmp + 1); + } + + List expectedWorkerCountArray = Arrays.asList(3, 3, 4); + List actualWorkerCountArray = Arrays.asList( + workerCountPerHostMap.get("host1.east"), + workerCountPerHostMap.get("host2.east"), + workerCountPerHostMap.get("host3.east")); + + Collections.sort(actualWorkerCountArray); + assertEquals(expectedWorkerCountArray, actualWorkerCountArray); + } + + @Test + public void testScheduleWithOneWorkerSlot() { + Cluster spyCluster = getSpyCluster(); + + List mesosWorkerSlots = this.generateMesosWorkerSlots(1); + initializeMesosWorkerSlotMap(mesosWorkerSlots); + + Set executorsToAssign = this.generateExecutorDetailsSet(4); + List workerSlotList = getWorkerSlotFromMesosWorkerSlot(mesosWorkerSlots); + + doReturn(workerSlotList).when(spyCluster).getAvailableSlots(); + doReturn(executorsToAssign).when(spyCluster).getUnassignedExecutors(any(TopologyDetails.class)); + + defaultScheduler.schedule(topologies, spyCluster); + + Set assignedExecutors = spyCluster.getAssignmentById(sampleTopologyId).getExecutors(); + assertEquals(executorsToAssign, assignedExecutors); + } + + @Test + public void testScheduleWithMultipleSlotsOnSameHost() { + Cluster spyCluster = this.getSpyCluster(3, 3); + defaultScheduler.schedule(topologies, spyCluster); + SchedulerAssignment schedulerAssignment = spyCluster.getAssignments() + .get(sampleTopologyId); + Map executorDetailsWorkerSlotMap = schedulerAssignment.getExecutorToSlot(); + /* We expect the three unassigned executors to be spread + across the three available worker slots */ + assertEquals(executorDetailsWorkerSlotMap.keySet().size(), 3); + assertEquals(executorDetailsWorkerSlotMap.values().size(), 3); + + spyCluster = this.getSpyCluster(3, 6); + defaultScheduler.schedule(topologies, spyCluster); + executorDetailsWorkerSlotMap = spyCluster.getAssignments() + .get(sampleTopologyId) + .getExecutorToSlot(); + /* We expect all executors to be scheduled across the three + available slots */ + assertEquals(executorDetailsWorkerSlotMap.keySet().size(), 6); + int workerSlotsUsed = new HashSet<>(executorDetailsWorkerSlotMap.values()).size() ; + assertEquals(workerSlotsUsed, 3); + + /* Lets make sure that the executors are evenly spread + across the worker slots in a round robin fashion */ + schedulerAssignment = spyCluster.getAssignments() + .get(sampleTopologyId); + executorDetailsWorkerSlotMap = schedulerAssignment.getExecutorToSlot(); + + Map> workerSlotExecutorDetailsMap = this.getworkerSlotExecutorDetailsMap(executorDetailsWorkerSlotMap); + + for (WorkerSlot workerSlot : workerSlotExecutorDetailsMap.keySet()) { + List executorDetails = workerSlotExecutorDetailsMap.get(workerSlot); + assertEquals(3, Math.abs(executorDetails.get(0).getStartTask() - executorDetails.get(1).getEndTask())); + } + } +} diff --git a/storm/src/test/storm/mesos/schedulers/OfferResourcesTest.java b/storm/src/test/storm/mesos/schedulers/OfferResourcesTest.java new file mode 100644 index 000000000..9140ef695 --- /dev/null +++ b/storm/src/test/storm/mesos/schedulers/OfferResourcesTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.mesos.schedulers; + +import org.apache.mesos.Protos.Offer; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; + +import static org.junit.Assert.assertEquals; +import static storm.mesos.TestUtils.buildOffer; +import static storm.mesos.TestUtils.buildOfferWithPorts; +import static storm.mesos.TestUtils.buildOfferWithReservation; + +@RunWith(MockitoJUnitRunner.class) +public class OfferResourcesTest { + + private final String sampleHost = "host1.east"; + + @Test + public void testOfferResources() throws Exception { + Offer offer = buildOfferWithReservation("offer1", sampleHost, 2, 1000, 6, 1000); + OfferResources offerResources = new OfferResources(offer); + assertEquals(8, offerResources.getCpu(), 0.0f); + assertEquals(2000, offerResources.getMem(), 0.0f); + assertEquals(sampleHost, offerResources.getHostName()); + + offer = buildOffer("offer1", sampleHost, 2.0, 2.0); + offerResources = new OfferResources(offer); + assertEquals(2, offerResources.getCpu(), 0.0); + assertEquals(2, offerResources.getMem(), 0.0); + assertEquals(sampleHost, offerResources.getHostName()); + + offer = buildOfferWithPorts("offer1", sampleHost, 2.0, 2000, 3000, 3100); + offerResources = new OfferResources(offer); + assertEquals(2, offerResources.getCpu(), 0.0); + assertEquals(2000, offerResources.getMem(), 0.0); + assertEquals(true, offerResources.hasPort()); + + offerResources.decCpu(1); + offerResources.decMem(1000); + assertEquals(1, offerResources.getCpu(), 0.0); + assertEquals(1000, offerResources.getMem(), 0.0); + assertEquals(3000, offerResources.getPort()); + assertEquals(3001, offerResources.getPort()); + } +} diff --git a/storm/src/test/storm/mesos/schedulers/SchedulerUtilsTest.java b/storm/src/test/storm/mesos/schedulers/SchedulerUtilsTest.java new file mode 100644 index 000000000..9b60b2abc --- /dev/null +++ b/storm/src/test/storm/mesos/schedulers/SchedulerUtilsTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.mesos.schedulers; + +import backtype.storm.scheduler.SupervisorDetails; +import org.apache.mesos.Protos.Offer; +import org.apache.mesos.Protos.OfferID; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.runners.MockitoJUnitRunner; +import storm.mesos.util.MesosCommon; +import storm.mesos.util.RotatingMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static storm.mesos.TestUtils.buildOffer; + +@RunWith(MockitoJUnitRunner.class) +public class SchedulerUtilsTest { + + RotatingMap rotatingMap; + private final String sampleHost = "host1.east"; + + @Before + public void initialize() { + rotatingMap = new RotatingMap<>(2); + } + + private void buildOfferAndUpdateRotatingMap(String offerId, String hostName, double cpus, double memory) { + Offer offer = buildOffer(offerId, hostName, cpus, memory); + rotatingMap.put(offer.getId(), offer); + } + + @Test + public void testSupervisorExists() throws Exception { + Collection existingSupervisors = new ArrayList<>(); + String hostName = "host1.east"; + + existingSupervisors.add(new SupervisorDetails(MesosCommon.supervisorId(hostName, "test-topology1-65-1442255385"), hostName, null)); + existingSupervisors.add(new SupervisorDetails(MesosCommon.supervisorId(hostName, "test-topology10-65-1442255385"), hostName, null)); + + assertEquals(true, SchedulerUtils.supervisorExists(hostName, existingSupervisors, "test-topology1-65-1442255385")); + assertEquals(false, SchedulerUtils.supervisorExists(hostName, existingSupervisors, "test-topology2-65-1442255385")); + } + + @Test + public void testGetOfferResourcesListPerNode() { + String hostName = sampleHost; + + buildOfferAndUpdateRotatingMap("offer1", hostName, 0, 1000); + buildOfferAndUpdateRotatingMap("offer2", hostName, 10, 0); + buildOfferAndUpdateRotatingMap("offer3", hostName, 0, 100.01); + buildOfferAndUpdateRotatingMap("offer4", hostName, 1.001, 0); + buildOfferAndUpdateRotatingMap("offer5", hostName, 0, 0.001); + buildOfferAndUpdateRotatingMap("offer6", hostName, 0.001, 0.01); + + Map> offerResourcesMap = SchedulerUtils.getOfferResourcesListPerNode(rotatingMap); + assertEquals(offerResourcesMap.size(), 1); + + List offerResources = offerResourcesMap.get("host1.east"); + assertEquals(offerResources.size(), 6); + + hostName = "host1.west"; + buildOfferAndUpdateRotatingMap("offer7", hostName, 0, 1000); + buildOfferAndUpdateRotatingMap("offer8", hostName, 10, 0); + + offerResourcesMap = SchedulerUtils.getOfferResourcesListPerNode(rotatingMap); + assertEquals(offerResourcesMap.size(), 2); + + offerResources = offerResourcesMap.get("host1.east"); + assertEquals(offerResources.size(), 6); + + offerResources = offerResourcesMap.get("host1.west"); + assertEquals(offerResources.size(), 2); + } +}