Introducing new scheduler framework & more unit tests
Karthick Duraisamy Soundararaj committed Mar 23, 2016
1 parent 2585500 commit cdd7264
Showing 19 changed files with 1,550 additions and 464 deletions.
7 changes: 7 additions & 0 deletions storm/pom.xml
@@ -188,6 +188,12 @@
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>${shim}</artifactId>
@@ -198,5 +204,6 @@
<artifactId>storm-shim</artifactId>
<version>${project.parent.version}</version>
</dependency>

</dependencies>
</project>
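
The mockito-all dependency added above backs the "more unit tests" half of this commit. As a rough illustration only, this is the style of mock-based test that dependency enables; the test class, method, and the interaction being verified are hypothetical stand-ins, not code from this commit:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.apache.mesos.SchedulerDriver;
import org.junit.Test;

public class ExampleMockitoTest {
  @Test
  public void verifiesAnInteractionAgainstAMockedMesosDriver() {
    // Mock the Mesos driver so the test needs no running cluster.
    SchedulerDriver driver = mock(SchedulerDriver.class);

    // A real test would hand `driver` to the code under test; the sketch
    // invokes the call directly to stay self-contained.
    driver.reviveOffers();

    // Assert that the expected call was made on the mock.
    verify(driver).reviveOffers();
  }
}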
4 changes: 2 additions & 2 deletions storm/src/main/storm/mesos/LaunchTask.java
@@ -19,8 +19,8 @@

import org.apache.mesos.Protos;

import static storm.mesos.PrettyProtobuf.offerToString;
import static storm.mesos.PrettyProtobuf.taskInfoToString;
import static storm.mesos.util.PrettyProtobuf.offerToString;
import static storm.mesos.util.PrettyProtobuf.taskInfoToString;

class LaunchTask {
private final Protos.TaskInfo task;
145 changes: 20 additions & 125 deletions storm/src/main/storm/mesos/MesosNimbus.java
@@ -51,9 +51,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import storm.mesos.schedulers.DefaultScheduler;
import storm.mesos.schedulers.IMesosStormScheduler;
import storm.mesos.shims.CommandLineShimFactory;
import storm.mesos.shims.ICommandLineShim;
import storm.mesos.shims.LocalStateShim;
import storm.mesos.util.MesosCommon;
import storm.mesos.util.RotatingMap;

import java.io.File;
import java.io.FileInputStream;
@@ -82,8 +86,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static storm.mesos.PrettyProtobuf.offerMapToString;
import static storm.mesos.PrettyProtobuf.offerToString;
import static storm.mesos.util.PrettyProtobuf.offerMapToString;

public class MesosNimbus implements INimbus {
public static final String CONF_EXECUTOR_URI = "mesos.executor.uri";
Expand Down Expand Up @@ -120,6 +123,8 @@ public class MesosNimbus implements INimbus {
private Map<TaskID, Offer> _usedOffers;
private ScheduledExecutorService timerScheduler =
Executors.newScheduledThreadPool(1);
private IMesosStormScheduler _mesosStormScheduler = null;

private boolean _preferReservedResources = true;
private Optional<String> _container = Optional.absent();
private Path _generatedConfPath;
@@ -132,6 +137,10 @@ private static Set listIntoSet(List l) {
}
}

public MesosNimbus() {
this._mesosStormScheduler = new DefaultScheduler();
}

public static void main(String[] args) {
backtype.storm.daemon.nimbus.launch(new MesosNimbus());
}
@@ -318,56 +327,14 @@ protected MesosSchedulerDriver createMesosDriver() throws IOException {
FrameworkInfo.Builder finfo = createFrameworkBuilder();

if ((credential = getCredential(finfo)) != null) {
driver = new MesosSchedulerDriver(_scheduler,
finfo.build(),
(String) _conf.get(CONF_MASTER_URL),
credential);
driver = new MesosSchedulerDriver(_scheduler, finfo.build(), (String) _conf.get(CONF_MASTER_URL), credential);
} else {
driver = new MesosSchedulerDriver(_scheduler,
finfo.build(),
(String) _conf.get(CONF_MASTER_URL));
driver = new MesosSchedulerDriver(_scheduler, finfo.build(), (String) _conf.get(CONF_MASTER_URL));
}

return driver;
}

protected OfferResources getResources(Offer offer, double executorCpu, double executorMem, double cpu, double mem) {
OfferResources resources = new OfferResources();

double offerCpu = 0;
double offerMem = 0;

for (Resource r : offer.getResourcesList()) {
if (r.hasReservation()) {
// skip resources with dynamic reservations
continue;
}
if (r.getType() == Type.SCALAR) {
if (r.getName().equals("cpus")) {
offerCpu += r.getScalar().getValue();
} else if (r.getName().equals("mem")) {
offerMem += r.getScalar().getValue();
}
}
}

if (offerCpu >= executorCpu + cpu &&
offerMem >= executorMem + mem) {
resources.cpuSlots = (int) Math.floor((offerCpu - executorCpu) / cpu);
resources.memSlots = (int) Math.floor((offerMem - executorMem) / mem);
}

int maxPorts = Math.min(resources.cpuSlots, resources.memSlots);

List<Integer> portList = new ArrayList<>();
collectPorts(offer.getResourcesList(), portList, maxPorts);
resources.ports.addAll(portList);

LOG.debug("Offer: " + offerToString(offer));
LOG.debug("Extracted resources: {}", resources.toString());
return resources;
}

private void collectPorts(List<Resource> offers, List<Integer> portList, int maxPorts) {
for (Resource r : offers) {
if (r.getName().equals("ports")) {
@@ -389,26 +356,6 @@ private void collectPorts(List<Resource> offers, List<Integer> portList, int maxPorts) {
}
}

private List<WorkerSlot> toSlots(Offer offer, double cpu, double mem, boolean supervisorExists) {
double executorCpuDemand = supervisorExists ? 0 : MesosCommon.executorCpu(_conf);
double executorMemDemand = supervisorExists ? 0 : MesosCommon.executorMem(_conf);

OfferResources resources = getResources(
offer,
executorCpuDemand,
executorMemDemand,
cpu,
mem);

List<WorkerSlot> ret = new ArrayList<WorkerSlot>();
int availableSlots = Math.min(resources.cpuSlots, resources.memSlots);
availableSlots = Math.min(availableSlots, resources.ports.size());
for (int i = 0; i < availableSlots; i++) {
ret.add(new WorkerSlot(offer.getHostname(), resources.ports.get(i)));
}
return ret;
}

/**
* Method checks if all topologies that need assignment already have supervisor running on the node where the Offer
* comes from. Required for more accurate available resource calculation where we can exclude supervisor's demand from
@@ -444,69 +391,17 @@ public boolean isHostAccepted(String hostname) {
(_disallowedHosts != null && !_disallowedHosts.contains(hostname));
}


@Override
public Collection<WorkerSlot> allSlotsAvailableForScheduling(
Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments) {
Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments) {
synchronized (_offersLock) {
LOG.debug("allSlotsAvailableForScheduling: Currently have {} offers buffered {}",
_offers.size(), (_offers.size() > 0 ? (":" + offerMapToString(_offers)) : ""));
if (!topologiesMissingAssignments.isEmpty()) {
LOG.info("Topologies that need assignments: {}", topologiesMissingAssignments.toString());

// Revive any filtered offers
_driver.reviveOffers();
} else {
LOG.info("Declining offers because no topologies need assignments");
_offers.clear();
return new ArrayList<>();
}
}

Double cpu = null;
Double mem = null;
// TODO: maybe this isn't the best approach. if a topology raises #cpus keeps failing,
// it will mess up scheduling on this cluster permanently
for (String id : topologiesMissingAssignments) {
TopologyDetails details = topologies.getById(id);
double tcpu = MesosCommon.topologyWorkerCpu(_conf, details);
double tmem = MesosCommon.topologyWorkerMem(_conf, details);
if (cpu == null || tcpu > cpu) {
cpu = tcpu;
}
if (mem == null || tmem > mem) {
mem = tmem;
}
}

LOG.info("allSlotsAvailableForScheduling: pending topologies' max resource requirements per worker: cpu: {} & mem: {}",
String.valueOf(cpu), String.valueOf(mem));

List<WorkerSlot> allSlots = new ArrayList<>();

if (cpu != null && mem != null) {
synchronized (_offersLock) {
for (Offer offer : _offers.newestValues()) {
boolean supervisorExists = supervisorExists(offer, existingSupervisors, topologiesMissingAssignments);
List<WorkerSlot> offerSlots = toSlots(offer, cpu, mem, supervisorExists);
if (offerSlots.isEmpty()) {
_offers.clearKey(offer.getId());
LOG.debug("Declining offer `{}' because it wasn't usable to create a slot which fits largest " +
"pending topologies' aggregate needs (max cpu: {} max mem: {})",
offerToString(offer), String.valueOf(cpu), String.valueOf(mem));
} else {
allSlots.addAll(offerSlots);
}
}
}
}

LOG.info("Number of available slots: {}", allSlots.size());
if (LOG.isDebugEnabled()) {
for (WorkerSlot slot : allSlots) {
LOG.debug("available slot: {}", slot);
}
return _mesosStormScheduler.allSlotsAvailableForScheduling(
_offers,
existingSupervisors,
topologies,
topologiesMissingAssignments);
}
return allSlots;
}

private OfferID findOffer(WorkerSlot worker) {
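
With the changes above, MesosNimbus delegates slot computation to the new scheduler framework named in the commit title. The schedulers package itself is among the file diffs not shown on this page, so the following is only a sketch of what storm.mesos.schedulers.IMesosStormScheduler plausibly looks like, inferred from the single call site in allSlotsAvailableForScheduling above; the exact parameter and return types are assumptions:

package storm.mesos.schedulers;

import backtype.storm.scheduler.SupervisorDetails;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.WorkerSlot;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import storm.mesos.util.RotatingMap;

import java.util.Collection;
import java.util.List;
import java.util.Set;

public interface IMesosStormScheduler {
  // Report every worker slot that the currently buffered offers could support
  // for the topologies still needing assignments. The RotatingMap<OfferID, Offer>
  // parameter type is an assumption, matching the RotatingMap import added to
  // MesosNimbus above.
  List<WorkerSlot> allSlotsAvailableForScheduling(RotatingMap<OfferID, Offer> offers,
                                                  Collection<SupervisorDetails> existingSupervisors,
                                                  Topologies topologies,
                                                  Set<String> topologiesMissingAssignments);
}

Putting the slot arithmetic behind an interface, with DefaultScheduler as the first implementation, replaces the inline getResources/toSlots logic removed above and makes that logic testable in isolation, consistent with the commit's "more unit tests" theme.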
2 changes: 2 additions & 0 deletions storm/src/main/storm/mesos/MesosSupervisor.java
@@ -21,6 +21,7 @@
import backtype.storm.scheduler.ISupervisor;
import backtype.storm.utils.Utils;
import clojure.lang.PersistentVector;

import org.apache.mesos.Executor;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.MesosExecutorDriver;
@@ -37,6 +38,7 @@
import storm.mesos.logviewer.LogViewerController;
import storm.mesos.shims.ILocalStateShim;
import storm.mesos.shims.LocalStateShim;
import storm.mesos.util.MesosCommon;

import java.io.IOException;
import java.util.Collection;
6 changes: 3 additions & 3 deletions storm/src/main/storm/mesos/NimbusScheduler.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -33,7 +33,7 @@
import java.util.List;
import java.util.concurrent.CountDownLatch;

import static storm.mesos.PrettyProtobuf.taskStatusToString;
import static storm.mesos.util.PrettyProtobuf.taskStatusToString;

public class NimbusScheduler implements Scheduler {
private MesosNimbus mesosNimbus;
[Diffs for the remaining 14 changed files were not loaded in this view.]
