diff --git a/storm/src/main/storm/mesos/MesosNimbus.java b/storm/src/main/storm/mesos/MesosNimbus.java index 47d0680dd..ca706ce5c 100644 --- a/storm/src/main/storm/mesos/MesosNimbus.java +++ b/storm/src/main/storm/mesos/MesosNimbus.java @@ -53,6 +53,7 @@ import org.yaml.snakeyaml.Yaml; import storm.mesos.schedulers.DefaultScheduler; import storm.mesos.schedulers.IMesosStormScheduler; +import storm.mesos.schedulers.OfferResources; import storm.mesos.shims.CommandLineShimFactory; import storm.mesos.shims.ICommandLineShim; import storm.mesos.shims.LocalStateShim; @@ -87,6 +88,8 @@ import java.util.concurrent.TimeUnit; import static storm.mesos.util.PrettyProtobuf.offerMapToString; +import static storm.mesos.util.PrettyProtobuf.taskInfoListToString; +import static storm.mesos.util.PrettyProtobuf.offerIDListToString; public class MesosNimbus implements INimbus { public static final String CONF_EXECUTOR_URI = "mesos.executor.uri"; @@ -94,7 +97,6 @@ public class MesosNimbus implements INimbus { public static final String CONF_MASTER_FAILOVER_TIMEOUT_SECS = "mesos.master.failover.timeout.secs"; public static final String CONF_MESOS_ALLOWED_HOSTS = "mesos.allowed.hosts"; public static final String CONF_MESOS_DISALLOWED_HOSTS = "mesos.disallowed.hosts"; - public static final String CONF_MESOS_ROLE = "mesos.framework.role"; public static final String CONF_MESOS_PRINCIPAL = "mesos.framework.principal"; public static final String CONF_MESOS_SECRET_FILE = "mesos.framework.secret.file"; @@ -121,8 +123,7 @@ public class MesosNimbus implements INimbus { private RotatingMap _offers; private LocalFileServer _httpServer; private Map _usedOffers; - private ScheduledExecutorService timerScheduler = - Executors.newScheduledThreadPool(1); + private ScheduledExecutorService timerScheduler = Executors.newScheduledThreadPool(1); private IMesosStormScheduler _mesosStormScheduler = null; private boolean _preferReservedResources = true; @@ -356,34 +357,6 @@ private void collectPorts(List offers, List portList, int max } } - /** - * Method checks if all topologies that need assignment already have supervisor running on the node where the Offer - * comes from. Required for more accurate available resource calculation where we can exclude supervisor's demand from - * the Offer. - * Unfortunately because of WorkerSlot type is not topology agnostic, we need to exclude supervisor's resources only - * in case where ALL topologies in 'allSlotsAvailableForScheduling' method satisfy condition of supervisor existence - * - * @param offer Offer - * @param existingSupervisors Supervisors which already placed on the node for the Offer - * @param topologiesMissingAssignments Topology ids required assignment - * @return Boolean value indicating supervisor existence - */ - private boolean supervisorExists( - Offer offer, Collection existingSupervisors, Set topologiesMissingAssignments) { - boolean alreadyExists = true; - for (String topologyId : topologiesMissingAssignments) { - String offerHost = offer.getHostname(); - boolean exists = false; - for (SupervisorDetails d : existingSupervisors) { - if (d.getId().equals(MesosCommon.supervisorId(offerHost, topologyId))) { - exists = true; - } - } - alreadyExists = (alreadyExists && exists); - } - return alreadyExists; - } - public boolean isHostAccepted(String hostname) { return (_allowedHosts == null && _disallowedHosts == null) || @@ -404,53 +377,6 @@ public Collection allSlotsAvailableForScheduling( } } - private OfferID findOffer(WorkerSlot worker) { - int port = worker.getPort(); - for (Offer offer : _offers.values()) { - if (offer.getHostname().equals(worker.getNodeId())) { - List r = getResourcesRange(offer.getResourcesList(), port, "ports"); - if (r != null) return offer.getId(); - } - } - // Still haven't found the slot? Maybe it's an offer we already used. - return null; - } - - protected List getResourcesScalar(final List offerResources, - final double value, - final String name) { - List resources = new ArrayList<>(); - double valueNeeded = value; - for (Resource r : offerResources) { - if (r.hasReservation()) { - // skip resources with dynamic reservations - continue; - } - if (r.getType() == Type.SCALAR && - r.getName().equals(name)) { - if (r.getScalar().getValue() > valueNeeded) { - resources.add( - r.toBuilder() - .setScalar(Scalar.newBuilder().setValue(valueNeeded)) - .build() - ); - return resources; - } else if (Math.abs(r.getScalar().getValue() - valueNeeded) < 0.0001) { // check if zero - resources.add( - r.toBuilder() - .setScalar(Scalar.newBuilder().setValue(valueNeeded)) - .build() - ); - return resources; - } else { - resources.add(r.toBuilder().build()); - valueNeeded -= r.getScalar().getValue(); - } - } - } - return resources; - } - protected List subtractResourcesScalar(final List offerResources, final double value, final String name) { @@ -480,32 +406,6 @@ protected List subtractResourcesScalar(final List offerResou return resources; } - protected List getResourcesRange(final List offerResources, - final long value, - final String name) { - for (Resource r : offerResources) { - if (r.hasReservation()) { - // skip reserved resources - continue; - } - if (r.getType() == Type.RANGES && r.getName().equals(name)) { - for (Range range : r.getRanges().getRangeList()) { - if (value >= range.getBegin() && value <= range.getEnd()) { - return Arrays.asList(r.toBuilder() - .setRanges( - Ranges.newBuilder() - .addRange( - Range.newBuilder().setBegin(value).setEnd(value).build() - ).build() - ) - .build() - ); - } - } - } - } - return new ArrayList<>(); - } protected List subtractResourcesRange(final List offerResources, final long value, @@ -556,225 +456,188 @@ protected List subtractResourcesRange(final List offerResour return resources; } - @Override - public void assignSlots(Topologies topologies, Map> slots) { - if (slots.size() == 0) { - LOG.debug("assignSlots: no slots passed in, nothing to do"); - return; - } - for (Map.Entry> topologyToSlots : slots.entrySet()) { - String topologyId = topologyToSlots.getKey(); - for (WorkerSlot slot : topologyToSlots.getValue()) { - TopologyDetails details = topologies.getById(topologyId); - LOG.debug("assignSlots: topologyId: {} worker being assigned to slot: {} with workerCpu: {} workerMem: {}", - topologyId, slot, MesosCommon.topologyWorkerCpu(_conf, details), MesosCommon.topologyWorkerMem(_conf, details)); - } + + + + private String getLogViewerConfig() { + String logViewerConfig = null; + // Find port for the logviewer + return " -c " + MesosCommon.AUTO_START_LOGVIEWER_CONF + "=true"; + } + + private ExecutorInfo.Builder getExecutorInfoBuilder(TopologyDetails details, String executorDataStr, + String executorName, + List executorCpuResources, List executorMemResources, List executorPortsResources, String extraConfig) { + String configUri; + try { + configUri = new URL(_configUrl.toURL(), + _configUrl.getPath() + "/storm.yaml").toString(); + } catch (MalformedURLException e) { + throw new RuntimeException(e); } - synchronized (_offersLock) { - computeLaunchList(topologies, slots); + + ExecutorInfo.Builder executorInfoBuilder = ExecutorInfo.newBuilder(); + + executorInfoBuilder + .setName(executorName) + .setExecutorId(ExecutorID.newBuilder().setValue(details.getId())) + .setData(ByteString.copyFromUtf8(executorDataStr)) + .addAllResources(executorCpuResources) + .addAllResources(executorMemResources) + .addAllResources(executorPortsResources); + + ICommandLineShim commandLineShim = CommandLineShimFactory.makeCommandLineShim(_container.isPresent(), extraConfig); + if (_container.isPresent()) { + executorInfoBuilder.setCommand(CommandInfo.newBuilder() + .addUris(URI.newBuilder().setValue(configUri)) + .setValue(commandLineShim.getCommandLine())) + .setContainer(ContainerInfo.newBuilder() + .setType(ContainerInfo.Type.DOCKER) + .setDocker(ContainerInfo.DockerInfo.newBuilder() + .setImage(_container.get()) + .setNetwork(ContainerInfo.DockerInfo.Network.HOST) + .setForcePullImage(true) + .build() + ).build()); + } else { + executorInfoBuilder.setCommand(CommandInfo.newBuilder() + .addUris(URI.newBuilder().setValue((String) _conf.get(CONF_EXECUTOR_URI))) + .addUris(URI.newBuilder().setValue(configUri)) + .setValue(commandLineShim.getCommandLine())); } + + return executorInfoBuilder; } - protected void computeLaunchList(Topologies topologies, Map> slots) { - Map> toLaunch = new HashMap<>(); + + + public Map> getTasksToLaunch(Topologies topologies, + Map> slots, + Map offerResourcesPerNode) { + Map> tasksToLaunchPerNode = new HashMap<>(); + for (String topologyId : slots.keySet()) { - Map> slotList = new HashMap<>(); - for (WorkerSlot slot : slots.get(topologyId)) { - OfferID id = findOffer(slot); - if (!slotList.containsKey(id)) { - slotList.put(id, new ArrayList()); + Collection slotList = slots.get(topologyId); + TopologyDetails details = topologies.getById(topologyId); + boolean subtractedExecutorResources = false; + + double workerCpu = MesosCommon.topologyWorkerCpu(_conf, details); + double workerMem = MesosCommon.topologyWorkerMem(_conf, details); + double executorCpu = MesosCommon.executorCpu(_conf); + double executorMem = MesosCommon.executorMem(_conf); + + for (WorkerSlot slot : slotList) { + OfferResources offerResources = offerResourcesPerNode.get(slot.getNodeId()); + String workerPrefix = ""; + if (_conf.containsKey(MesosCommon.WORKER_NAME_PREFIX)) { + workerPrefix = MesosCommon.getWorkerPrefix(_conf, details); } - slotList.get(id).add(slot); - } - for (OfferID id : slotList.keySet()) { - computeResourcesForSlot(_offers, topologies, toLaunch, topologyId, slotList, id); - } - } + List ids = offerResources.getOfferIds(); - for (OfferID id : toLaunch.keySet()) { - List tasks = toLaunch.get(id); - List launchList = new ArrayList<>(); + if (ids.isEmpty()) { + LOG.warn("Unable to find offer for slot: " + slot + " as it is no longer in the RotatingMap of offers, " + + " topology " + topologyId + " will no longer be scheduled on this slot"); + } - LOG.info("Launching tasks for offerId: {} : {}", id.getValue(), launchTaskListToString(tasks)); - for (LaunchTask t : tasks) { - launchList.add(t.getTask()); - _usedOffers.put(t.getTask().getTaskId(), t.getOffer()); - } - List launchOffer = new ArrayList<>(); - launchOffer.add(id); - _driver.launchTasks(launchOffer, launchList); - _offers.remove(id); - } - } + TaskID taskId = TaskID.newBuilder() + .setValue(MesosCommon.taskId(slot.getNodeId(), slot.getPort())) + .build(); - protected void computeResourcesForSlot(final RotatingMap offers, - Topologies topologies, - Map> toLaunch, - String topologyId, - Map> slotList, - OfferID id) { - Offer offer = offers.get(id); - List workerSlots = slotList.get(id); - boolean usingExistingOffer = false; - boolean subtractedExecutorResources = false; - - for (WorkerSlot slot : workerSlots) { - TopologyDetails details = topologies.getById(topologyId); - String workerPrefix = ""; - if (_conf.containsKey(MesosCommon.WORKER_NAME_PREFIX)) { - workerPrefix = MesosCommon.getWorkerPrefix(_conf, details); - } - TaskID taskId = TaskID.newBuilder() - .setValue(MesosCommon.taskId(workerPrefix + slot.getNodeId(), slot.getPort())) - .build(); - - if ((id == null || offer == null) && _usedOffers.containsKey(taskId)) { - offer = _usedOffers.get(taskId); - if (offer != null) { - id = offer.getId(); - usingExistingOffer = true; - } - } - if (id != null && offer != null) { - if (!toLaunch.containsKey(id)) { - toLaunch.put(id, new ArrayList()); + if (!subtractedExecutorResources) { + workerCpu += executorCpu; + workerMem += executorMem; } - double workerCpu = MesosCommon.topologyWorkerCpu(_conf, details); - double workerMem = MesosCommon.topologyWorkerMem(_conf, details); - double executorCpu = MesosCommon.executorCpu(_conf); - double executorMem = MesosCommon.executorMem(_conf); Map executorData = new HashMap(); executorData.put(MesosCommon.SUPERVISOR_ID, MesosCommon.supervisorId(slot.getNodeId(), details.getId())); executorData.put(MesosCommon.ASSIGNMENT_ID, workerPrefix + slot.getNodeId()); - Offer.Builder newBuilder = Offer.newBuilder(); - newBuilder.mergeFrom(offer); - newBuilder.clearResources(); + if (!subtractedExecutorResources) { + workerCpu -= executorCpu; + workerMem -= executorMem; + subtractedExecutorResources = true; + } - Offer.Builder existingBuilder = Offer.newBuilder(); - existingBuilder.mergeFrom(offer); - existingBuilder.clearResources(); - String extraConfig = ""; + String topologyAndNodeId = details.getId() + " | " + slot.getNodeId(); + String executorName = "storm-supervisor | " + topologyAndNodeId; + String taskName = "storm-worker | " + topologyAndNodeId + ":" + slot.getPort(); + String executorDataStr = JSONValue.toJSONString(executorData); - List offerResources = new ArrayList<>(); - offerResources.addAll(offer.getResourcesList()); - // Prefer reserved resources? - if (_preferReservedResources) { - Collections.sort(offerResources, new ResourceRoleComparator()); + // The fact that we are here implies that the resources are available for the worker in the host. + // So we dont have to check if the resources are actually available before proceding. + if (executorCpu > offerResources.getCpu() || executorMem > offerResources.getMem()) { + LOG.error(String.format("Unable to launch worker %s. Required executorCpu: %d, Required executorMem: %d. Available OfferResources : %s", offerResources.getHostName(), executorCpu, executorMem, offerResources)); + continue; } + List executorCpuResources = offerResources.getResourcesListScalar(executorCpu, "cpu", MesosCommon.getRole(_conf)); + List executorMemResources = offerResources.getResourcesListScalar(executorMem, "mem", MesosCommon.getRole(_conf)); + List executorPortResources = new ArrayList<>(); - List executorCpuResources = getResourcesScalar(offerResources, executorCpu, "cpus"); - List executorMemResources = getResourcesScalar(offerResources, executorMem, "mem"); - List executorPortsResources = null; - if (!subtractedExecutorResources) { - offerResources = subtractResourcesScalar(offerResources, executorCpu, "cpus"); - offerResources = subtractResourcesScalar(offerResources, executorMem, "mem"); - subtractedExecutorResources = true; - } - List workerCpuResources = getResourcesScalar(offerResources, workerCpu, "cpus"); - offerResources = subtractResourcesScalar(offerResources, workerCpu, "cpus"); - List workerMemResources = getResourcesScalar(offerResources, workerMem, "mem"); - offerResources = subtractResourcesScalar(offerResources, workerMem, "mem"); - List workerPortsResources = getResourcesRange(offerResources, slot.getPort(), "ports"); - offerResources = subtractResourcesRange(offerResources, slot.getPort(), "ports"); - - // Find port for the logviewer - if (!subtractedExecutorResources && MesosCommon.startLogViewer(_conf)) { - List portList = new ArrayList<>(); - collectPorts(offerResources, portList, 1); - int port = Optional.fromNullable((Number) _conf.get(Config.LOGVIEWER_PORT)).or(8000).intValue(); - executorPortsResources = getResourcesRange(offerResources, port, "ports"); - if (!executorPortsResources.isEmpty()) { - // Was the port available? - extraConfig = " -c " + MesosCommon.AUTO_START_LOGVIEWER_CONF + "=true"; - offerResources = subtractResourcesRange(offerResources, port, "ports"); + String extraConfig = ""; + if (MesosCommon.autoStartLogViewer(_conf)) { + long port = Optional.fromNullable((Number) _conf.get(Config.LOGVIEWER_PORT)).or(8000).intValue();; + List logviewerPortResources = offerResources.getResourcesRange(port, "ports"); + if (logviewerPortResources != null) { + extraConfig = getLogViewerConfig(); + executorPortResources.addAll(logviewerPortResources); } + LOG.error("Unable to launch logviewer on worker {}:{}. Port could not be found. Available OfferResources : {}", offerResources.getHostName(), port, offerResources); } - Offer remainingOffer = existingBuilder.addAllResources(offerResources).build(); - // Update the remaining offer list - offers.put(id, remainingOffer); + ExecutorInfo.Builder executorInfoBuilder = getExecutorInfoBuilder(details, executorDataStr, executorName, executorCpuResources, executorMemResources, executorPortResources, extraConfig); - String configUri; - try { - configUri = new URL(_configUrl.toURL(), - _configUrl.getPath() + "/storm.yaml").toString(); - } catch (MalformedURLException e) { - throw new RuntimeException(e); + TaskInfo task = TaskInfo.newBuilder() + .setTaskId(taskId) + .setName(taskName) + .setSlaveId(offerResources.getSlaveId()) + .setExecutor(executorInfoBuilder.build()) + .addAllResources(offerResources.getResourcesListScalar(workerCpu, "cpus", MesosCommon.getRole(_conf))) + .addAllResources(offerResources.getResourcesListScalar(workerMem, "mem", MesosCommon.getRole(_conf))) + .addAllResources(offerResources.getResourcesRange("ports")) + .build(); + + List taskInfoList = tasksToLaunchPerNode.get(slot.getNodeId()); + if (taskInfoList == null) { + taskInfoList = new ArrayList<>(); + tasksToLaunchPerNode.put(slot.getNodeId(), taskInfoList); } + taskInfoList.add(task); + } + } - String delimiter = MesosCommon.getMesosComponentNameDelimiter(_conf, details); - String topologyAndNodeId = details.getId() + delimiter + slot.getNodeId(); - String executorName = "storm-supervisor" + delimiter + topologyAndNodeId; - String taskName = "storm-worker" + delimiter + topologyAndNodeId + ":" + slot.getPort(); - String executorDataStr = JSONValue.toJSONString(executorData); - ExecutorInfo.Builder executorInfoBuilder = ExecutorInfo.newBuilder(); - executorInfoBuilder - .setName(executorName) - .setExecutorId(ExecutorID.newBuilder().setValue(details.getId())) - .setData(ByteString.copyFromUtf8(executorDataStr)) - .addAllResources(executorCpuResources) - .addAllResources(executorMemResources); - if (executorPortsResources != null) { - executorInfoBuilder.addAllResources(executorPortsResources); - } - ICommandLineShim commandLineShim = CommandLineShimFactory.makeCommandLineShim(_container.isPresent(), extraConfig); - if (_container.isPresent()) { - executorInfoBuilder - .setCommand(CommandInfo.newBuilder() - .addUris(URI.newBuilder().setValue(configUri)) - .setValue(commandLineShim.getCommandLine())) - .setContainer( - ContainerInfo.newBuilder() - .setType(ContainerInfo.Type.DOCKER) - .setDocker( - ContainerInfo.DockerInfo.newBuilder() - .setImage(_container.get()) - .setNetwork(ContainerInfo.DockerInfo.Network.HOST) - .setForcePullImage(true) - .build() - ) - .build() - ); - } else { - executorInfoBuilder - .setCommand(CommandInfo.newBuilder() - .addUris(URI.newBuilder().setValue((String) _conf.get(CONF_EXECUTOR_URI))) - .addUris(URI.newBuilder().setValue(configUri)) - .setValue(commandLineShim.getCommandLine())); - } + return tasksToLaunchPerNode; + } - LOG.info("Launching task with Mesos Executor data: < {} >", executorDataStr); - TaskInfo task = TaskInfo.newBuilder() - .setName(taskName) - .setTaskId(taskId) - .setSlaveId(offer.getSlaveId()) - .setExecutor(executorInfoBuilder.build()) - .addAllResources(workerCpuResources) - .addAllResources(workerMemResources) - .addAllResources(workerPortsResources) - .build(); + @Override + public void assignSlots(Topologies topologies, Map> slots) { + synchronized (_offersLock) { + if (slots.size() == 0) { + LOG.info("assignSlots: no slots passed in, nothing to do"); + return; + } - Offer newOffer = offer.toBuilder() - .addAllResources(task.getResourcesList()).build(); + Map offerResourcesPerNode = MesosCommon.getConsolidatedOfferResourcesPerNode(_conf, _offers); + Map> tasksToLaunchPerNode = getTasksToLaunch(topologies, slots, offerResourcesPerNode); - LOG.debug("Launching task: {}", task.toString()); + for (String node : tasksToLaunchPerNode.keySet()) { + List offerIDList = offerResourcesPerNode.get(node).getOfferIds(); + List taskInfoList = tasksToLaunchPerNode.get(node); - toLaunch.get(id).add(new LaunchTask(task, newOffer)); - } + LOG.info("Using offerIDs: " + offerIDListToString(offerIDList) + " on host: " + node + " to launch tasks: " + taskInfoListToString(taskInfoList)); - if (usingExistingOffer) { - _driver.killTask(taskId); + _driver.launchTasks(offerIDList, taskInfoList); + for (OfferID offerID: offerIDList) { + _offers.remove(offerID); + } } } } private FrameworkInfo.Builder createFrameworkBuilder() throws IOException { Number failoverTimeout = Optional.fromNullable((Number) _conf.get(CONF_MASTER_FAILOVER_TIMEOUT_SECS)).or(24 * 7 * 3600); - String role = Optional.fromNullable((String) _conf.get(CONF_MESOS_ROLE)).or("*"); + String role = MesosCommon.getRole(_conf); Boolean checkpoint = Optional.fromNullable((Boolean) _conf.get(CONF_MESOS_CHECKPOINT)).or(false); String frameworkName = Optional.fromNullable((String) _conf.get(CONF_MESOS_FRAMEWORK_NAME)).or("Storm!!!"); diff --git a/storm/src/main/storm/mesos/MesosSupervisor.java b/storm/src/main/storm/mesos/MesosSupervisor.java index a57b1c6ec..15e8a20d6 100644 --- a/storm/src/main/storm/mesos/MesosSupervisor.java +++ b/storm/src/main/storm/mesos/MesosSupervisor.java @@ -136,7 +136,7 @@ public void killedWorker(int port) { } protected boolean startLogViewer(Map conf) { - return MesosCommon.startLogViewer(conf); + return MesosCommon.autoStartLogViewer(conf); } class StormExecutor implements Executor { diff --git a/storm/src/main/storm/mesos/schedulers/DefaultScheduler.java b/storm/src/main/storm/mesos/schedulers/DefaultScheduler.java index 651d30d47..17808e22b 100644 --- a/storm/src/main/storm/mesos/schedulers/DefaultScheduler.java +++ b/storm/src/main/storm/mesos/schedulers/DefaultScheduler.java @@ -74,6 +74,8 @@ public List allSlotsAvailableForScheduling(RotatingMap existingSupervisors, Topologies topologies, Set topologiesMissingAssignments) { if (topologiesMissingAssignments.isEmpty()) { + // TODO(ksoundararaj): Should we rotate irrespective of whether or not the topologiesMissingAssignments is empty? + offers.rotate(); log.info("Declining all offers that are currently buffered because no topologies need assignments"); offers.clear(); return new ArrayList<>(); @@ -81,26 +83,9 @@ public List allSlotsAvailableForScheduling(RotatingMap allSlots = new ArrayList<>(); - Map> offerResourcesListPerNode = SchedulerUtils.getOfferResourcesListPerNode(offers); + Map offerResourcesPerNode = MesosCommon.getConsolidatedOfferResourcesPerNode(mesosStormConf, offers); for (String currentTopology : topologiesMissingAssignments) { TopologyDetails topologyDetails = topologies.getById(currentTopology); @@ -115,46 +100,45 @@ public List allSlotsAvailableForScheduling(RotatingMap nodesWithExistingSupervisors = new HashSet<>(); - for (String currentNode : offerResourcesListPerNode.keySet()) { + for (String currentNode : offerResourcesPerNode.keySet()) { if (SchedulerUtils.supervisorExists(currentNode, existingSupervisors, currentTopology)) { nodesWithExistingSupervisors.add(currentNode); } } + //TODO(ksoundararaj) : Give priority to static reservations before scheduling workers across nodes Boolean slotFound; do { slotFound = false; - for (String currentNode : offerResourcesListPerNode.keySet()) { - boolean supervisorExists = nodesWithExistingSupervisors.contains(currentNode); - if (slotsNeeded == 0) { - break; - } - - for (OfferResources resources : offerResourcesListPerNode.get(currentNode)) { - boolean isFit = SchedulerUtils.isFit(mesosStormConf, resources, topologyDetails, supervisorExists); + for (String currentNode : offerResourcesPerNode.keySet()) { + for (OfferResources offerResources : offerResourcesPerNode.values()) { + if (slotsNeeded == 0) { + break; + } + boolean supervisorExists = nodesWithExistingSupervisors.contains(currentNode); + boolean isFit = SchedulerUtils.isFit(mesosStormConf, offerResources, topologyDetails, supervisorExists); if (isFit) { - log.info(resources.toString() + " is a fit for " + + log.info(offerResources.toString() + " is a fit for " + topologyDetails.getId() + " requestedWorkerCpu: " + requestedWorkerCpu + " requestedWorkerMem: " + requestedWorkerMem); nodesWithExistingSupervisors.add(currentNode); - MesosWorkerSlot mesosWorkerSlot = SchedulerUtils.createWorkerSlotFromOfferResources(mesosStormConf, resources, topologyDetails, supervisorExists); + MesosWorkerSlot mesosWorkerSlot = SchedulerUtils.createWorkerSlotFromOfferResources(mesosStormConf, offerResources, topologyDetails, supervisorExists); if (mesosWorkerSlot == null) { continue; } String slotId = mesosWorkerSlot.getNodeId() + ":" + mesosWorkerSlot.getPort(); mesosWorkerSlotMap.put(slotId, mesosWorkerSlot); - // Place this offer in the first bucket of the RotatingMap so that it is less likely to get rotated out - offers.put(resources.getOfferId(), resources.getOffer()); allSlots.add(mesosWorkerSlot); slotsNeeded--; slotFound = true; } else { - log.info(resources.toString() + " is not a fit for " + + log.info(offerResources.toString() + " is not a fit for " + topologyDetails.getId() + " requestedWorkerCpu: " + requestedWorkerCpu + " requestedWorkerMem: " + requestedWorkerMem); } } } } while (slotFound == true && slotsNeeded > 0); + log.info("Number of available slots for " + topologyDetails.getId() + ": " + allSlots.size()); } log.info("Number of available slots: " + allSlots.size()); diff --git a/storm/src/main/storm/mesos/schedulers/OfferResources.java b/storm/src/main/storm/mesos/schedulers/OfferResources.java index b5359e954..f56fd0904 100644 --- a/storm/src/main/storm/mesos/schedulers/OfferResources.java +++ b/storm/src/main/storm/mesos/schedulers/OfferResources.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,12 +18,17 @@ package storm.mesos.schedulers; import com.google.common.base.Joiner; +import org.apache.log4j.Logger; import org.apache.mesos.Protos; +import storm.mesos.ResourceRoleComparator; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; public class OfferResources { + private final Logger log = Logger.getLogger(OfferResources.class); private class PortRange { public long begin; @@ -35,11 +40,12 @@ public PortRange(long begin, long end) { } } - private Protos.Offer offer; - private Protos.OfferID offerId; + private List offers; private String hostName; - private double mem; - private double cpu; + private double unreservedCpu; + private double unreservedMem; + private double reservedCpu; + private double reservedMem; List portRanges = new ArrayList<>(); @@ -50,24 +56,33 @@ public void addPortRanges(Protos.Value.Ranges ranges) { } public OfferResources(Protos.Offer offer) { - this.offer = offer; - this.offerId = offer.getId(); - double offerMem = 0; - double offerCpu = 0; + this.offers = new ArrayList<>(); + this.offers.add(offer); + Protos.Value.Ranges portRanges = null; String hostName = offer.getHostname(); + for (Protos.Resource r : offer.getResourcesList()) { if (r.hasReservation()) { // skip resources with dynamic reservations continue; } + if (r.getName().equals("cpus")) { - offerCpu += r.getScalar().getValue(); + if (!r.getRole().equals("*")) { + reservedCpu += r.getScalar().getValue(); + continue; + } + unreservedCpu += r.getScalar().getValue(); } else if (r.getName().equals("mem")) { - offerMem += r.getScalar().getValue(); + if (!r.getRole().equals("*")) { + reservedMem += r.getScalar().getValue(); + continue; + } + unreservedMem += r.getScalar().getValue(); } else if (r.getName().equals("ports")) { - Protos.Value.Ranges tmp = r.getRanges(); + Protos.Value.Ranges tmp = r.getRanges(); if (portRanges == null) { portRanges = tmp; continue; @@ -77,19 +92,200 @@ public OfferResources(Protos.Offer offer) { } this.hostName = hostName; - this.mem = offerMem; - this.cpu = offerCpu; if ((portRanges != null) && (!portRanges.getRangeList().isEmpty())) { this.addPortRanges(portRanges); } } - public Protos.Offer getOffer() { - return this.offer; + public void merge(Protos.Offer offer) { + this.offers.add(offer); + + Protos.Value.Ranges portRanges = null; + + String hostName = offer.getHostname(); + if (!this.hostName.equals(hostName)) { + log.error("OfferConsolidationError: Offer from " + this.hostName + " should not be merged with " + hostName); + return; + } + + // TODO(ksoundararaj) : dry this code + for (Protos.Resource r : offer.getResourcesList()) { + if (r.getName().equals("cpus")) { + if (!r.getRole().equals("*")) { + reservedCpu += r.getScalar().getValue(); + continue; + } + unreservedMem += r.getScalar().getValue(); + } else if (r.getName().equals("mem")) { + if (!r.getRole().equals("*")) { + reservedMem += r.getScalar().getValue(); + continue; + } + unreservedMem += r.getScalar().getValue(); + } else if (r.getName().equals("ports")) { + portRanges = r.getRanges(); + } + } + + if ((portRanges != null) && (!portRanges.getRangeList().isEmpty())) { + this.addPortRanges(portRanges); + } + } + + public List getResourcesListScalar(final double value, final String name, String frameworkRole) { + + double valueNeeded = value; + List retVal = new ArrayList<>(); + + switch (name) { + case "cpu": + if ((unreservedCpu + reservedCpu) < valueNeeded) { + // TODO(ksoundararaj): Throw ResourceUnavailableException + return null; + } + double tmp = 0.0d; + if (reservedCpu > valueNeeded) { + tmp = valueNeeded; + reservedCpu -= valueNeeded; + valueNeeded = 0; + } else if (reservedCpu < valueNeeded) { + tmp = reservedCpu; + reservedCpu = 0; + valueNeeded -= reservedCpu; + } + retVal.add(Protos.Resource.newBuilder() + .setName(name) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(tmp)) + .setRole(frameworkRole) + .build()); + + if (valueNeeded > 0) { + unreservedCpu -= valueNeeded; + } + retVal.add(Protos.Resource.newBuilder() + .setName(name) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(valueNeeded)) + .setRole(frameworkRole) + .build()); + break; + case "mem": + if ((unreservedMem + reservedMem) < valueNeeded) { + // TODO(ksoundararaj): Throw ResourceUnavailableException + return null; + } + tmp = 0; + if (reservedMem > valueNeeded) { + tmp = valueNeeded; + reservedMem -= valueNeeded; + valueNeeded = 0; + } else if (reservedMem < valueNeeded) { + tmp = reservedMem; + reservedMem = 0; + valueNeeded -= reservedMem; + } + retVal.add(Protos.Resource.newBuilder() + .setName(name) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(tmp)) + .setRole(frameworkRole) + .build()); + + if (valueNeeded > 0) { + unreservedMem -= valueNeeded; + } + retVal.add(Protos.Resource.newBuilder() + .setName(name) + .setType(Protos.Value.Type.SCALAR) + .setScalar(Protos.Value.Scalar.newBuilder().setValue(valueNeeded)) + .setRole(frameworkRole) + .build()); + break; + default: + return null; + } + return retVal; + } + + + public List getResourcesRange(final long value, final String name) { + List resourceList = getResourceList(); + List retVal = null; + + if (name.equals("ports")) { + long portNumber = getPort(value); + if (portNumber == -1) { + // TODO(ksoundararaj) : Throw ResourceNotAvailableException instead of returning null + return null; + } + for (Protos.Resource r : resourceList) { + if (r.getType() == Protos.Value.Type.RANGES && r.getName().equals(name)) { + for (Protos.Value.Range range : r.getRanges().getRangeList()) { + if (value >= range.getBegin() && value <= range.getEnd()) { + retVal = Arrays.asList(r.toBuilder() + .setRanges( + Protos.Value.Ranges.newBuilder() + .addRange( + Protos.Value.Range.newBuilder().setBegin(value).setEnd(value).build() + ).build() + ).build() + ); + } + } + } + } + } + return retVal; + } + + // TODO(ksoundararaj): Dry this code + public List getResourcesRange(final String name) { + List resourceList = getResourceList(); + List retVal = null; + + if (name.equals("ports")) { + long portNumber = getPort(); + for (Protos.Resource r : resourceList) { + if (r.getType() == Protos.Value.Type.RANGES && r.getName().equals(name)) { + for (Protos.Value.Range range : r.getRanges().getRangeList()) { + if (portNumber >= range.getBegin() && portNumber <= range.getEnd()) { + retVal = Arrays.asList(r.toBuilder() + .setRanges( + Protos.Value.Ranges.newBuilder() + .addRange( + Protos.Value.Range.newBuilder().setBegin(portNumber).setEnd(portNumber).build() + ).build() + ).build() + ); + } + } + } + } + } + return retVal; + } + + public List getResourceList() { + List availableResources = new ArrayList<>(); + for (Protos.Offer offer : offers) { + availableResources.addAll(offer.getResourcesList()); + } + Collections.sort(availableResources, new ResourceRoleComparator()); + return availableResources; + } + + public Protos.SlaveID getSlaveId() { + return offers.get(0).getSlaveId(); } - public Protos.OfferID getOfferId() { - return this.offerId; + public List getOfferIds() { + List offerIDList = new ArrayList<>(); + + for (Protos.Offer offer : offers) { + offerIDList.add(offer.getId()); + } + return offerIDList; } public String getHostName() { @@ -97,19 +293,34 @@ public String getHostName() { } public double getMem() { - return this.mem; + return this.reservedMem + this.unreservedMem; } public double getCpu() { - return this.cpu; + return this.reservedCpu + this.unreservedCpu; } - public void decCpu(double val) { - cpu -= val; + + public double decMem(double value) { + // TODO(ksoundararaj) : Make sure the value doesnt go -ve + if (reservedMem > value) { + reservedMem -= value; + } else if (reservedMem < value) { + reservedMem = 0; + unreservedMem -= value - reservedMem; + } + return this.reservedMem + this.unreservedMem; } - public void decMem(double val) { - mem -= val; + public double decCpu(double value) { + // TODO(ksoundararaj) : Make sure the value doesnt go -ve + if (reservedCpu > value) { + reservedCpu -= value; + } else if (reservedCpu < value) { + reservedCpu = 0; + unreservedCpu -= value - reservedCpu; + } + return this.reservedCpu + this.unreservedCpu; } public long getPort() { @@ -130,12 +341,42 @@ public long getPort() { return -1; } + public long getPort(long portNumber) { + boolean portFound = false; + + for (int i = 0; i < portRanges.size(); i++) { + PortRange portRange = portRanges.get(i); + + if (portNumber > portRange.begin && portNumber < portRange.end) { + portRanges.add(new PortRange(portRange.begin, portNumber - 1)); + portRanges.add(new PortRange(portNumber - 1, portRange.end)); + portFound = true; + } else if (portRange.begin == portRange.end) { + portFound = true; + } + + if (portFound) { + portRanges.remove(i); + return portNumber; + } + } + + return -1; + } + public boolean hasPort() { return (portRanges != null && !portRanges.isEmpty()); } - @Override - public String toString() { + private String offersToString(List offers) { + List offerIds = new ArrayList<>(); + for (Protos.Offer offer : offers) { + offerIds.add(offer.getId().getValue()); + } + return "[" + Joiner.on(".").join(offerIds) + "]"; + } + + private String portRangesToString(List portRanges) { List portRangeStrings = new ArrayList<>(); for (int i = 0; i < portRanges.size(); i++) { @@ -145,9 +386,15 @@ public String toString() { portRangeStrings.add(String.valueOf(portRanges.get(i).begin) + "-" + String.valueOf(portRanges.get(i).end)); } } - return "OfferResources with offerId: " + getOfferId().getValue().toString().trim() + " from host: " + getHostName() + " mem: " + String.valueOf(mem) + - " cpu: " + String.valueOf(cpu) + - " portRanges: [" + Joiner.on(",").join(portRangeStrings) + "]"; + + return "[" + Joiner.on(",").join(portRangeStrings) + "]"; + } + + @Override + public String toString() { + return "OfferResources with offerIds: " + offersToString(offers) + " host: " + getHostName() + " mem: " + String.valueOf(unreservedMem + reservedMem) + + " cpu: " + String.valueOf(unreservedCpu + reservedCpu) + + " portRanges: " + portRangesToString(portRanges); } } diff --git a/storm/src/main/storm/mesos/util/MesosCommon.java b/storm/src/main/storm/mesos/util/MesosCommon.java index 01b90fb21..668facf40 100644 --- a/storm/src/main/storm/mesos/util/MesosCommon.java +++ b/storm/src/main/storm/mesos/util/MesosCommon.java @@ -21,6 +21,8 @@ import com.google.common.base.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.mesos.Protos; +import storm.mesos.schedulers.OfferResources; import java.util.HashMap; import java.util.Map; @@ -38,6 +40,7 @@ public class MesosCommon { public static final String WORKER_NAME_PREFIX = "topology.mesos.worker.prefix"; public static final String WORKER_NAME_PREFIX_DELIMITER = "topology.mesos.worker.prefix.delimiter"; public static final String MESOS_COMPONENT_NAME_DELIMITER = "topology.mesos.component.name.delimiter"; + public static final String CONF_MESOS_ROLE = "mesos.framework.role"; public static final double DEFAULT_WORKER_CPU = 1; public static final double DEFAULT_WORKER_MEM_MB = 1000; @@ -82,7 +85,7 @@ public static String supervisorId(String nodeid, String topologyId) { return nodeid + "-" + topologyId; } - public static boolean startLogViewer(Map conf) { + public static boolean autoStartLogViewer(Map conf) { return Optional.fromNullable((Boolean) conf.get(AUTO_START_LOGVIEWER_CONF)).or(true); } @@ -92,6 +95,25 @@ public static int portFromTaskId(String taskId) { return Integer.parseInt(port); } + public static Map getConsolidatedOfferResourcesPerNode(Map conf, RotatingMap offers) { + Map offerResourcesPerNode = new HashMap<>(); + + for (Protos.Offer offer : offers.values()) { + String hostName = offer.getHostname(); + + OfferResources offerResourcesForHost = offerResourcesPerNode.get(hostName); + if (offerResourcesForHost == null) { + offerResourcesForHost = new OfferResources(offer); + offerResourcesPerNode.put(hostName, offerResourcesForHost); + } else { + offerResourcesForHost.merge(offer); + } + LOG.info("Available resources at " + hostName + ": " + offerResourcesForHost.toString()); + } + + return offerResourcesPerNode; + } + public static int getSuicideTimeout(Map conf) { return Optional.fromNullable((Number) conf.get(SUICIDE_CONF)) .or(DEFAULT_SUICIDE_TIMEOUT_SECS).intValue(); @@ -124,4 +146,8 @@ public static double executorMem(Map conf) { return Optional.fromNullable((Number) conf.get(EXECUTOR_MEM_CONF)) .or(DEFAULT_EXECUTOR_MEM_MB).doubleValue(); } + + public static String getRole(Map conf) { + return Optional.fromNullable((String) conf.get(CONF_MESOS_ROLE)).or("*"); + } } diff --git a/storm/src/main/storm/mesos/util/PrettyProtobuf.java b/storm/src/main/storm/mesos/util/PrettyProtobuf.java index b962b3521..56a32a22d 100644 --- a/storm/src/main/storm/mesos/util/PrettyProtobuf.java +++ b/storm/src/main/storm/mesos/util/PrettyProtobuf.java @@ -135,6 +135,24 @@ public static String offerMapToString(RotatingMap offers) { return "[\n" + StringUtils.join(offersAsStrings, ",\n") + "]"; } + /** + * Pretty-print the list of OfferIDs. + */ + public static String offerIDListToString(List offerIDList) { + List offerIDsAsStrings = Lists.transform(offerIDList, offerIDToStringTransform); + return "[" + StringUtils.join(offerIDsAsStrings, ", ") + "]"; + } + + /** + * Wrapper around getTrimmedString which allows using gauva's transform utility. + */ + private static Function offerIDToStringTransform = + new Function() { + public String apply(OfferID o) { + return o.getValue().toString(); + } + }; + /** * Wrapper around offerToString which allows using gauva's transform utility. */ diff --git a/storm/src/test/storm/mesos/MesosNimbusTest.java b/storm/src/test/storm/mesos/MesosNimbusTest.java deleted file mode 100644 index 6e8388ac8..000000000 --- a/storm/src/test/storm/mesos/MesosNimbusTest.java +++ /dev/null @@ -1,320 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.mesos; - -import backtype.storm.generated.StormTopology; -import backtype.storm.scheduler.Topologies; -import backtype.storm.scheduler.TopologyDetails; -import backtype.storm.scheduler.WorkerSlot; -import org.apache.mesos.Protos.Offer; -import org.apache.mesos.Protos.OfferID; -import org.junit.Test; -import storm.mesos.util.MesosCommon; -import storm.mesos.util.RotatingMap; - -import java.net.URI; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -// TODO(dskarthick) : Leverage the build methods defined in TestUtils function. -public class MesosNimbusTest { - - @Test - public void testGetResourcesScalar() throws Exception { - MesosNimbus mesosNimbus = new MesosNimbus(); - - assertEquals( - Arrays.asList(TestUtils.buildScalarResource("cpus", 1.0)), - mesosNimbus.getResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 1.0, - "cpus" - ) - ); - - assertEquals( - Arrays.asList(TestUtils.buildScalarResource("mem", 2.0)), - mesosNimbus.getResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 2.0, - "mem" - ) - ); - - assertEquals( - Arrays.asList( - TestUtils.buildScalarResource("cpus", 1.0), - TestUtils.buildScalarResourceWithRole("cpus", 1.0, "reserved") - ), - mesosNimbus.getResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 2.0, - "cpus" - ) - ); - - assertEquals( - Arrays.asList( - TestUtils.buildScalarResource("mem", 2.0), - TestUtils.buildScalarResourceWithRole("mem", 1.0, "reserved") - ), - mesosNimbus.getResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 3.0, - "mem" - ) - ); - - assertEquals( - Arrays.asList( - TestUtils.buildScalarResource("cpus", 1.0), - TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved") - ), - mesosNimbus.getResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 4.0, - "cpus" - ) - ); - - assertEquals( - Arrays.asList( - TestUtils.buildScalarResource("mem", 2.0), - TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") - ), - mesosNimbus.getResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 6.0, - "mem" - ) - ); - - assertEquals( - Arrays.asList( - TestUtils.buildScalarResource("cpus", 1.0), - TestUtils.buildScalarResourceWithRole("cpus", 0.5, "reserved") - ), - mesosNimbus.getResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 1.5, - "cpus" - ) - ); - - assertEquals( - Arrays.asList( - TestUtils.buildScalarResource("mem", 2.0), - TestUtils.buildScalarResourceWithRole("mem", 0.5, "reserved") - ), - mesosNimbus.getResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 2.5, - "mem" - ) - ); - } - - @Test - public void testSubtractResourcesScalar() throws Exception { - MesosNimbus mesosNimbus = new MesosNimbus(); - - assertEquals( - Arrays.asList( - TestUtils.buildScalarResource("mem", 2.0), - TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved"), - TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") - ), - mesosNimbus.subtractResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 1.0, - "cpus" - ) - ); - - assertEquals( - Arrays.asList( - TestUtils.buildScalarResource("cpus", 1.0), - TestUtils.buildScalarResource("mem", 1.0), - TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved"), - TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") - ), - mesosNimbus.subtractResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 1.0, - "mem" - ) - ); - - assertEquals( - Arrays.asList( - TestUtils.buildScalarResource("mem", 2.0), - TestUtils.buildScalarResourceWithRole("cpus", 2.5, "reserved"), - TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") - ), - mesosNimbus.subtractResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 1.5, - "cpus" - ) - ); - - assertEquals( - Arrays.asList( - TestUtils.buildScalarResource("cpus", 1.0), - TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved"), - TestUtils.buildScalarResourceWithRole("mem", 3.5, "reserved") - ), - mesosNimbus.subtractResourcesScalar( - TestUtils.buildResourceList(1, 2, 3, 4), - 2.5, - "mem" - ) - ); - } - - @Test - public void testGetResourcesRange() throws Exception { - MesosNimbus mesosNimbus = new MesosNimbus(); - - assertEquals( - Arrays.asList( - TestUtils.buildRangeResource("ports", 100, 100) - ), - mesosNimbus.getResourcesRange( - TestUtils.buildRangeResourceList(100, 100), - 100, - "ports" - ) - ); - - assertEquals( - Arrays.asList( - TestUtils.buildRangeResource("ports", 105, 105) - ), - mesosNimbus.getResourcesRange( - TestUtils.buildRangeResourceList(100, 200), - 105, - "ports" - ) - ); - - assertEquals( - 0, - mesosNimbus.getResourcesRange( - TestUtils.buildRangeResourceList(100, 100), - 200, - "ports" - ).size() - ); - } - - - @Test - public void testSubtractResourcesRange() throws Exception { - MesosNimbus mesosNimbus = new MesosNimbus(); - - assertEquals( - Arrays.asList( - TestUtils.buildScalarResource("cpus", 1.0), - TestUtils.buildScalarResource("mem", 2.0), - TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved"), - TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") - ), - mesosNimbus.subtractResourcesRange( - TestUtils.buildRangeResourceList(100, 100), - 100, - "ports" - ) - ); - - assertEquals( - Arrays.asList( - TestUtils.buildMultiRangeResource("ports", 100, 104, 106, 200), - TestUtils.buildMultiRangeResourceWithRole("ports", 100, 104, 106, 200, "reserved"), - TestUtils.buildScalarResource("cpus", 1.0), - TestUtils.buildScalarResource("mem", 2.0), - TestUtils.buildScalarResourceWithRole("cpus", 3.0, "reserved"), - TestUtils.buildScalarResourceWithRole("mem", 4.0, "reserved") - ), - mesosNimbus.subtractResourcesRange( - TestUtils.buildRangeResourceList(100, 200), - 105, - "ports" - ) - ); - } - - - @Test - public void testComputeResourcesForSlot() throws Exception { - MesosNimbus mesosNimbus = new MesosNimbus(); - - mesosNimbus._configUrl = new URI("http://127.0.0.1/"); - - OfferID offerId = OfferID.newBuilder().setValue("derp").build(); - RotatingMap offers = new RotatingMap<>( - new RotatingMap.ExpiredCallback() { - @Override - public void expire(OfferID key, Offer val) { - } - } - ); - - offers.put( - offerId, - TestUtils.buildOfferWithPorts("offer1", "host1.west", 2.0, 2048, 1000, 1000) - ); - - HashMap topologyMap = new HashMap<>(); - Map conf = new HashMap<>(); - conf.put(MesosCommon.WORKER_CPU_CONF, 1); - conf.put(MesosCommon.WORKER_MEM_CONF, 1024); - conf.put(MesosCommon.EXECUTOR_CPU_CONF, 1); - conf.put(MesosCommon.EXECUTOR_MEM_CONF, 1024); - conf.put(MesosNimbus.CONF_EXECUTOR_URI, ""); - mesosNimbus._conf = conf; - - topologyMap.put("t1", new TopologyDetails("t1", conf, new StormTopology(), 5)); - HashMap> launchList = new HashMap<>(); - HashMap> slotList = new HashMap<>(); - slotList.put(offerId, Arrays.asList(new WorkerSlot("", 1000))); - Topologies topologies = new Topologies(topologyMap); - - mesosNimbus.computeResourcesForSlot( - offers, - topologies, - launchList, - "t1", - slotList, - OfferID.newBuilder().setValue("derp").build() - ); - assertEquals(1, launchList.size()); - assertEquals(1, launchList.get(offerId).size()); - - assertEquals( - TestUtils.buildScalarResource("cpus", 1.0), - launchList.get(offerId).get(0).getTask().getResources(0) - ); - - assertEquals(0, offers.get(offerId).getResourcesCount()); - } -} diff --git a/storm/src/test/storm/mesos/schedulers/DefaultSchedulerTest.java b/storm/src/test/storm/mesos/schedulers/DefaultSchedulerTest.java index 2091d0d3b..49c4ec819 100644 --- a/storm/src/test/storm/mesos/schedulers/DefaultSchedulerTest.java +++ b/storm/src/test/storm/mesos/schedulers/DefaultSchedulerTest.java @@ -303,8 +303,7 @@ public void testAllSlotsAvailableForSchedulingWithMultipleOffers() { rotatingMap.put(offer.getId(), offer); offer = buildOffer("offer5", sampleHost2, 0.91, 9000); rotatingMap.put(offer.getId(), offer); - offer = buildOfferWithPorts("offer6", sampleHost2, 5 * MesosCommon.DEFAULT_WORKER_CPU + MesosCommon.DEFAULT_EXECUTOR_CPU, - 5 * MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 5); + offer = buildOfferWithPorts("offer6", sampleHost2, 0, 0, samplePort, samplePort + 5); rotatingMap.put(offer.getId(), offer); topologyMap.clear(); @@ -312,8 +311,7 @@ public void testAllSlotsAvailableForSchedulingWithMultipleOffers() { topologyMap.put(sampleTopologyId, topologyDetails); defaultScheduler.prepare(topologyDetails.getConf()); - // Increase available cpu by a tiny fraction in order - offer = buildOfferWithPorts("offer6", sampleHost, 5 * MesosCommon.DEFAULT_WORKER_CPU + 1.1 * MesosCommon.DEFAULT_EXECUTOR_CPU, + offer = buildOfferWithPorts("offer6", sampleHost, 5 * MesosCommon.DEFAULT_WORKER_CPU + 1 * MesosCommon.DEFAULT_EXECUTOR_CPU, 5 * MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 5); rotatingMap.put(offer.getId(), offer); @@ -324,8 +322,8 @@ public void testAllSlotsAvailableForSchedulingWithMultipleOffers() { workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, new Topologies(topologyMap), topologiesMissingAssignments); - assertEquals(workerSlotsAvailableForScheduling.size(), 5); // Note that by increasing the executor cpu by a fraction, we are able to get 5 worker slots as we expect - + // We only have one offer with 6 ports + assertEquals(workerSlotsAvailableForScheduling.size(), 6); topologyMap.clear(); topologyDetails = constructTopologyDetails(sampleTopologyId, 10); @@ -334,7 +332,7 @@ public void testAllSlotsAvailableForSchedulingWithMultipleOffers() { workerSlotsAvailableForScheduling = defaultScheduler.allSlotsAvailableForScheduling(rotatingMap, existingSupervisors, new Topologies(topologyMap), topologiesMissingAssignments); - assertEquals(workerSlotsAvailableForScheduling.size(), 5); + assertEquals(workerSlotsAvailableForScheduling.size(), 6); offer = buildOfferWithPorts("offer7", "host2.east", 3 * MesosCommon.DEFAULT_WORKER_CPU + MesosCommon.DEFAULT_EXECUTOR_CPU, 3 * MesosCommon.DEFAULT_WORKER_MEM_MB + MesosCommon.DEFAULT_EXECUTOR_MEM_MB, samplePort, samplePort + 5);