Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate offers from the same host into OfferResources. #111

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
433 changes: 148 additions & 285 deletions storm/src/main/storm/mesos/MesosNimbus.java

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion storm/src/main/storm/mesos/MesosSupervisor.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void killedWorker(int port) {
}

protected boolean startLogViewer(Map conf) {
return MesosCommon.startLogViewer(conf);
return MesosCommon.autoStartLogViewer(conf);
}

class StormExecutor implements Executor {
Expand Down
48 changes: 16 additions & 32 deletions storm/src/main/storm/mesos/schedulers/DefaultScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,33 +74,18 @@ public List<WorkerSlot> allSlotsAvailableForScheduling(RotatingMap<Protos.OfferI
Collection<SupervisorDetails> existingSupervisors,
Topologies topologies, Set<String> topologiesMissingAssignments) {
if (topologiesMissingAssignments.isEmpty()) {
// TODO(ksoundararaj): Should we rotate irrespective of whether or not the topologiesMissingAssignments is empty?
offers.rotate();
log.info("Declining all offers that are currently buffered because no topologies need assignments");
offers.clear();
return new ArrayList<>();
}

log.info("Topologies that need assignments: " + topologiesMissingAssignments.toString());

// Decline those offers that cannot be used for any of the topologies that need assignments.
for (Protos.Offer offer : offers.newestValues()) {
boolean isOfferUseful = false;
for (String currentTopology : topologiesMissingAssignments) {
boolean supervisorExists = SchedulerUtils.supervisorExists(offer.getHostname(), existingSupervisors, currentTopology);
TopologyDetails topologyDetails = topologies.getById(currentTopology);
if (SchedulerUtils.isFit(mesosStormConf, offer, topologyDetails, supervisorExists)) {
isOfferUseful = true;
break;
}
}
if (!isOfferUseful) {
log.info("Declining Offer " + offer.getId().getValue() + " because it does not fit any of the topologies that need assignments");
offers.clearKey(offer.getId());
}
}

List<WorkerSlot> allSlots = new ArrayList<>();

Map<String, List<OfferResources>> offerResourcesListPerNode = SchedulerUtils.getOfferResourcesListPerNode(offers);
Map<String, OfferResources> offerResourcesPerNode = MesosCommon.getConsolidatedOfferResourcesPerNode(mesosStormConf, offers);

for (String currentTopology : topologiesMissingAssignments) {
TopologyDetails topologyDetails = topologies.getById(currentTopology);
Expand All @@ -115,46 +100,45 @@ public List<WorkerSlot> allSlotsAvailableForScheduling(RotatingMap<Protos.OfferI
}

Set<String> nodesWithExistingSupervisors = new HashSet<>();
for (String currentNode : offerResourcesListPerNode.keySet()) {
for (String currentNode : offerResourcesPerNode.keySet()) {
if (SchedulerUtils.supervisorExists(currentNode, existingSupervisors, currentTopology)) {
nodesWithExistingSupervisors.add(currentNode);
}
}

//TODO(ksoundararaj) : Give priority to static reservations before scheduling workers across nodes
Boolean slotFound;
do {
slotFound = false;
for (String currentNode : offerResourcesListPerNode.keySet()) {
boolean supervisorExists = nodesWithExistingSupervisors.contains(currentNode);
if (slotsNeeded == 0) {
break;
}

for (OfferResources resources : offerResourcesListPerNode.get(currentNode)) {
boolean isFit = SchedulerUtils.isFit(mesosStormConf, resources, topologyDetails, supervisorExists);
for (String currentNode : offerResourcesPerNode.keySet()) {
for (OfferResources offerResources : offerResourcesPerNode.values()) {
if (slotsNeeded == 0) {
break;
}
boolean supervisorExists = nodesWithExistingSupervisors.contains(currentNode);
boolean isFit = SchedulerUtils.isFit(mesosStormConf, offerResources, topologyDetails, supervisorExists);
if (isFit) {
log.info(resources.toString() + " is a fit for " +
log.info(offerResources.toString() + " is a fit for " +
topologyDetails.getId() + " requestedWorkerCpu: " + requestedWorkerCpu + " requestedWorkerMem: " +
requestedWorkerMem);
nodesWithExistingSupervisors.add(currentNode);
MesosWorkerSlot mesosWorkerSlot = SchedulerUtils.createWorkerSlotFromOfferResources(mesosStormConf, resources, topologyDetails, supervisorExists);
MesosWorkerSlot mesosWorkerSlot = SchedulerUtils.createWorkerSlotFromOfferResources(mesosStormConf, offerResources, topologyDetails, supervisorExists);
if (mesosWorkerSlot == null) {
continue;
}
String slotId = mesosWorkerSlot.getNodeId() + ":" + mesosWorkerSlot.getPort();
mesosWorkerSlotMap.put(slotId, mesosWorkerSlot);
// Place this offer in the first bucket of the RotatingMap so that it is less likely to get rotated out
offers.put(resources.getOfferId(), resources.getOffer());
allSlots.add(mesosWorkerSlot);
slotsNeeded--;
slotFound = true;
} else {
log.info(resources.toString() + " is not a fit for " +
log.info(offerResources.toString() + " is not a fit for " +
topologyDetails.getId() + " requestedWorkerCpu: " + requestedWorkerCpu + " requestedWorkerMem: " + requestedWorkerMem);
}
}
}
} while (slotFound == true && slotsNeeded > 0);
log.info("Number of available slots for " + topologyDetails.getId() + ": " + allSlots.size());
}

log.info("Number of available slots: " + allSlots.size());
Expand Down
Loading