diff --git a/.gitignore b/.gitignore index 02816a120e8..1577b096a11 100644 --- a/.gitignore +++ b/.gitignore @@ -35,3 +35,4 @@ _site dependency-reduced-pom.xml derby.log metastore_db +storm-core/maven-eclipse.xml diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index c88e36bf92d..8bb8aacbf54 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -26,13 +26,13 @@ (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils]) (:use [backtype.storm.scheduler.DefaultScheduler]) (:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails - Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) + Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler]) (:import [backtype.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils ThriftTopologyUtils BufferFileInputStream]) (:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo - ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice]) + ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice ExecutorInfo]) (:import [backtype.storm.daemon Shutdownable]) (:use [backtype.storm util config log timer]) (:require [backtype.storm [cluster :as cluster] [stats :as stats]]) @@ -346,7 +346,7 @@ topology (read-storm-topology conf storm-id) executor->component (->> (compute-executor->component nimbus storm-id) (map-key (fn [[start-task end-task]] - (ExecutorDetails. (int start-task) (int end-task)))))] + (ExecutorInfo. (int start-task) (int end-task)))))] (TopologyDetails. storm-id topology-conf topology @@ -500,7 +500,7 @@ executor->slot (into {} (for [[executor [node port]] executor->node+port] ;; filter out the dead executors (if (contains? alive-executors executor) - {(ExecutorDetails. (first executor) + {(ExecutorInfo. (first executor) (second executor)) (WorkerSlot. node port)} {})))]] @@ -534,8 +534,8 @@ (map-val (fn [^SchedulerAssignment assignment] (->> assignment .getExecutorToSlot - (#(into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] %] - {[(.getStartTask executor) (.getEndTask executor)] + (#(into {} (for [[^ExecutorInfo executor ^WorkerSlot slot] %] + {[(.get_task_start executor) (.get_task_end executor)] [(.getNodeId slot) (.getPort slot)]}))))) scheduler-assignments)) diff --git a/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj index 1198eb6bac2..688395d2474 100644 --- a/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj +++ b/storm-core/src/clj/backtype/storm/scheduler/DefaultScheduler.clj @@ -18,7 +18,7 @@ (:require [backtype.storm.scheduler.EvenScheduler :as EvenScheduler]) (:import [backtype.storm.scheduler IScheduler Topologies Cluster TopologyDetails WorkerSlot SchedulerAssignment - EvenScheduler ExecutorDetails]) + EvenScheduler]) (:gen-class :implements [backtype.storm.scheduler.IScheduler])) @@ -59,7 +59,7 @@ (map #(vector (.getNodeId %) (.getPort %)))) all-executors (->> topology .getExecutors - (map #(vector (.getStartTask %) (.getEndTask %))) + (map #(vector (.get_task_start %) (.get_task_end %))) set) alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id) alive-executors (->> alive-assigned vals (apply concat) set) diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj index 25ba03b1b85..c1afaf120c8 100644 --- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj +++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj @@ -17,7 +17,8 @@ (:use [backtype.storm util log config]) (:require [clojure.set :as set]) (:import [backtype.storm.scheduler IScheduler Topologies - Cluster TopologyDetails WorkerSlot ExecutorDetails]) + Cluster TopologyDetails WorkerSlot]) + (:import [backtype.storm.generated ExecutorInfo]) (:gen-class :implements [backtype.storm.scheduler.IScheduler])) @@ -31,8 +32,8 @@ executor->slot (if existing-assignment (.getExecutorToSlot existing-assignment) {}) - executor->node+port (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] executor->slot - :let [executor [(.getStartTask executor) (.getEndTask executor)] + executor->node+port (into {} (for [[^ExecutorInfo executor ^WorkerSlot slot] executor->slot + :let [executor [(.get_task_start executor) (.get_task_end executor)] node+port [(.getNodeId slot) (.getPort slot)]]] {executor node+port})) alive-assigned (reverse-map executor->node+port)] @@ -44,7 +45,7 @@ (map #(vector (.getNodeId %) (.getPort %)))) all-executors (->> topology .getExecutors - (map #(vector (.getStartTask %) (.getEndTask %))) + (map #(vector (.get_task_start %) (.get_task_end %))) set) alive-assigned (get-alive-assigned-node+port->executors cluster topology-id) total-slots-to-use (min (.getNumWorkers topology) @@ -71,7 +72,7 @@ (doseq [[node+port executors] node+port->executors :let [^WorkerSlot slot (WorkerSlot. (first node+port) (last node+port)) executors (for [[start-task end-task] executors] - (ExecutorDetails. start-task end-task))]] + (ExecutorInfo. start-task end-task))]] (.assign cluster slot topology-id executors))))) (defn -prepare [this conf] diff --git a/storm-core/src/clj/backtype/storm/scheduler/IsolationScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/IsolationScheduler.clj index c6cf8d9ad7d..c75eab37b73 100644 --- a/storm-core/src/clj/backtype/storm/scheduler/IsolationScheduler.clj +++ b/storm-core/src/clj/backtype/storm/scheduler/IsolationScheduler.clj @@ -19,7 +19,7 @@ (:import [java.util HashSet Set List LinkedList ArrayList Map HashMap]) (:import [backtype.storm.scheduler IScheduler Topologies Cluster TopologyDetails WorkerSlot SchedulerAssignment - EvenScheduler ExecutorDetails]) + EvenScheduler]) (:gen-class :init init :constructors {[] []} diff --git a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java index e0c7cc72945..575bfff4bb0 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/Cluster.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Set; +import backtype.storm.generated.ExecutorInfo; + public class Cluster { /** @@ -137,12 +139,12 @@ public boolean needsScheduling(TopologyDetails topology) { * @param topology * @return */ - public Map getNeedsSchedulingExecutorToComponents(TopologyDetails topology) { - Collection allExecutors = new HashSet(topology.getExecutors()); + public Map getNeedsSchedulingExecutorToComponents(TopologyDetails topology) { + Collection allExecutors = new HashSet(topology.getExecutors()); SchedulerAssignment assignment = this.assignments.get(topology.getId()); if (assignment != null) { - Collection assignedExecutors = assignment.getExecutors(); + Collection assignedExecutors = assignment.getExecutors(); allExecutors.removeAll(assignedExecutors); } @@ -155,13 +157,13 @@ public Map getNeedsSchedulingExecutorToComponents(Topol * @param topology * @return */ - public Map> getNeedsSchedulingComponentToExecutors(TopologyDetails topology) { - Map executorToComponents = this.getNeedsSchedulingExecutorToComponents(topology); - Map> componentToExecutors = new HashMap>(); - for (ExecutorDetails executor : executorToComponents.keySet()) { + public Map> getNeedsSchedulingComponentToExecutors(TopologyDetails topology) { + Map executorToComponents = this.getNeedsSchedulingExecutorToComponents(topology); + Map> componentToExecutors = new HashMap>(); + for (ExecutorInfo executor : executorToComponents.keySet()) { String component = executorToComponents.get(executor); if (!componentToExecutors.containsKey(component)) { - componentToExecutors.put(component, new ArrayList()); + componentToExecutors.put(component, new ArrayList()); } componentToExecutors.get(component).add(executor); @@ -244,16 +246,16 @@ public List getAssignableSlots(SupervisorDetails supervisor) { /** * get the unassigned executors of the topology. */ - public Collection getUnassignedExecutors(TopologyDetails topology) { + public Collection getUnassignedExecutors(TopologyDetails topology) { if (topology == null) { - return new ArrayList(0); + return new ArrayList(0); } - Collection ret = new HashSet(topology.getExecutors()); + Collection ret = new HashSet(topology.getExecutors()); SchedulerAssignment assignment = this.getAssignmentById(topology.getId()); if (assignment != null) { - Set assignedExecutors = assignment.getExecutors(); + Set assignedExecutors = assignment.getExecutors(); ret.removeAll(assignedExecutors); } @@ -283,17 +285,17 @@ public int getAssignedNumWorkers(TopologyDetails topology) { * * @throws RuntimeException if the specified slot is already occupied. */ - public void assign(WorkerSlot slot, String topologyId, Collection executors) { + public void assign(WorkerSlot slot, String topologyId, Collection executors) { if (this.isSlotOccupied(slot)) { throw new RuntimeException("slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied."); } SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl)this.getAssignmentById(topologyId); if (assignment == null) { - assignment = new SchedulerAssignmentImpl(topologyId, new HashMap()); + assignment = new SchedulerAssignmentImpl(topologyId, new HashMap()); this.assignments.put(topologyId, assignment); } else { - for (ExecutorDetails executor : executors) { + for (ExecutorInfo executor : executors) { if (assignment.isExecutorAssigned(executor)) { throw new RuntimeException("the executor is already assigned, you should unassign it before assign it to another slot."); } diff --git a/storm-core/src/jvm/backtype/storm/scheduler/ExecutorDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/ExecutorDetails.java deleted file mode 100644 index bcf4aca47d2..00000000000 --- a/storm-core/src/jvm/backtype/storm/scheduler/ExecutorDetails.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package backtype.storm.scheduler; - -public class ExecutorDetails { - int startTask; - int endTask; - - public ExecutorDetails(int startTask, int endTask){ - this.startTask = startTask; - this.endTask = endTask; - } - - public int getStartTask() { - return startTask; - } - - public int getEndTask() { - return endTask; - } - - public boolean equals(Object other) { - if (other == null || !(other instanceof ExecutorDetails)) { - return false; - } - - ExecutorDetails executor = (ExecutorDetails)other; - return (this.startTask == executor.startTask) && (this.endTask == executor.endTask); - } - - public int hashCode() { - return this.startTask + 13 * this.endTask; - } - - @Override - public String toString() { - return "[" + this.startTask + ", " + this.endTask + "]"; - } -} diff --git a/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignment.java b/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignment.java index 0212e48a24f..d0010323db5 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignment.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignment.java @@ -20,6 +20,8 @@ import java.util.Map; import java.util.Set; +import backtype.storm.generated.ExecutorInfo; + public interface SchedulerAssignment { /** * Does this slot occupied by this assignment? @@ -34,7 +36,7 @@ public interface SchedulerAssignment { * @param executor * @return */ - public boolean isExecutorAssigned(ExecutorDetails executor); + public boolean isExecutorAssigned(ExecutorInfo executor); /** * get the topology-id this assignment is for. @@ -46,13 +48,13 @@ public interface SchedulerAssignment { * get the executor -> slot map. * @return */ - public Map getExecutorToSlot(); + public Map getExecutorToSlot(); /** * Return the executors covered by this assignments * @return */ - public Set getExecutors(); + public Set getExecutors(); public Set getSlots(); } \ No newline at end of file diff --git a/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java b/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java index 08af4b704b3..32a05c0c3fd 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/SchedulerAssignmentImpl.java @@ -25,6 +25,8 @@ import java.util.Map; import java.util.Set; +import backtype.storm.generated.ExecutorInfo; + //TODO: improve this by maintaining slot -> executors as well for more efficient operations public class SchedulerAssignmentImpl implements SchedulerAssignment { /** @@ -34,11 +36,11 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { /** * assignment detail, a mapping from executor to WorkerSlot */ - Map executorToSlot; + Map executorToSlot; - public SchedulerAssignmentImpl(String topologyId, Map executorToSlots) { + public SchedulerAssignmentImpl(String topologyId, Map executorToSlots) { this.topologyId = topologyId; - this.executorToSlot = new HashMap(0); + this.executorToSlot = new HashMap(0); if (executorToSlots != null) { this.executorToSlot.putAll(executorToSlots); } @@ -54,8 +56,8 @@ public Set getSlots() { * @param slot * @param executors */ - public void assign(WorkerSlot slot, Collection executors) { - for (ExecutorDetails executor : executors) { + public void assign(WorkerSlot slot, Collection executors) { + for (ExecutorInfo executor : executors) { this.executorToSlot.put(executor, slot); } } @@ -65,8 +67,8 @@ public void assign(WorkerSlot slot, Collection executors) { * @param slot */ public void unassignBySlot(WorkerSlot slot) { - List executors = new ArrayList(); - for (ExecutorDetails executor : this.executorToSlot.keySet()) { + List executors = new ArrayList(); + for (ExecutorInfo executor : this.executorToSlot.keySet()) { WorkerSlot ws = this.executorToSlot.get(executor); if (ws.equals(slot)) { executors.add(executor); @@ -74,7 +76,7 @@ public void unassignBySlot(WorkerSlot slot) { } // remove - for (ExecutorDetails executor : executors) { + for (ExecutorInfo executor : executors) { this.executorToSlot.remove(executor); } } @@ -88,7 +90,7 @@ public boolean isSlotOccupied(WorkerSlot slot) { return this.executorToSlot.containsValue(slot); } - public boolean isExecutorAssigned(ExecutorDetails executor) { + public boolean isExecutorAssigned(ExecutorInfo executor) { return this.executorToSlot.containsKey(executor); } @@ -96,7 +98,7 @@ public String getTopologyId() { return this.topologyId; } - public Map getExecutorToSlot() { + public Map getExecutorToSlot() { return this.executorToSlot; } @@ -104,7 +106,7 @@ public Map getExecutorToSlot() { * Return the executors covered by this assignments * @return */ - public Set getExecutors() { + public Set getExecutors() { return this.executorToSlot.keySet(); } } \ No newline at end of file diff --git a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java index 6daf4edae35..2cc13ad41a0 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/TopologyDetails.java @@ -22,6 +22,7 @@ import java.util.Map; import backtype.storm.Config; +import backtype.storm.generated.ExecutorInfo; import backtype.storm.generated.StormTopology; @@ -29,7 +30,7 @@ public class TopologyDetails { String topologyId; Map topologyConf; StormTopology topology; - Map executorToComponent; + Map executorToComponent; int numWorkers; public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers) { @@ -39,9 +40,9 @@ public TopologyDetails(String topologyId, Map topologyConf, StormTopology topolo this.numWorkers = numWorkers; } - public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers, Map executorToComponents) { + public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers, Map executorToComponents) { this(topologyId, topologyConf, topology, numWorkers); - this.executorToComponent = new HashMap(0); + this.executorToComponent = new HashMap(0); if (executorToComponents != null) { this.executorToComponent.putAll(executorToComponents); } @@ -67,13 +68,13 @@ public StormTopology getTopology() { return topology; } - public Map getExecutorToComponent() { + public Map getExecutorToComponent() { return this.executorToComponent; } - public Map selectExecutorToComponent(Collection executors) { - Map ret = new HashMap(executors.size()); - for (ExecutorDetails executor : executors) { + public Map selectExecutorToComponent(Collection executors) { + Map ret = new HashMap(executors.size()); + for (ExecutorInfo executor : executors) { String compId = this.executorToComponent.get(executor); if (compId != null) { ret.put(executor, compId); @@ -83,7 +84,7 @@ public Map selectExecutorToComponent(Collection getExecutors() { + public Collection getExecutors() { return this.executorToComponent.keySet(); } } diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java index 883c65fe656..45ca90f1f23 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/Node.java @@ -29,8 +29,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import backtype.storm.generated.ExecutorInfo; import backtype.storm.scheduler.Cluster; -import backtype.storm.scheduler.ExecutorDetails; import backtype.storm.scheduler.SchedulerAssignment; import backtype.storm.scheduler.SupervisorDetails; import backtype.storm.scheduler.WorkerSlot; @@ -223,7 +223,7 @@ public void freeTopology(String topId, Cluster cluster) { * @param executors the executors to run in that slot. * @param cluster the cluster to be updated */ - public void assign(String topId, Collection executors, + public void assign(String topId, Collection executors, Cluster cluster) { if (!_isAlive) { throw new IllegalStateException("Trying to adding to a dead node " + _nodeId); diff --git a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java index 21d1577a47c..36c59f83254 100644 --- a/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java +++ b/storm-core/src/jvm/backtype/storm/scheduler/multitenant/NodePool.java @@ -32,8 +32,8 @@ import org.slf4j.LoggerFactory; import backtype.storm.Config; +import backtype.storm.generated.ExecutorInfo; import backtype.storm.scheduler.Cluster; -import backtype.storm.scheduler.ExecutorDetails; import backtype.storm.scheduler.SchedulerAssignment; import backtype.storm.scheduler.TopologyDetails; import backtype.storm.scheduler.WorkerSlot; @@ -61,9 +61,9 @@ public NodeAndSlotCounts(int nodes, int slots) { */ public static class RoundRobinSlotScheduler { private Map> _nodeToComps; - private HashMap> _spreadToSchedule; - private LinkedList> _slots; - private Set _lastSlot; + private HashMap> _spreadToSchedule; + private LinkedList> _slots; + private Set _lastSlot; private Cluster _cluster; private String _topId; @@ -79,14 +79,14 @@ public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, _topId = td.getId(); _cluster = cluster; - Map execToComp = td.getExecutorToComponent(); + Map execToComp = td.getExecutorToComponent(); SchedulerAssignment assignment = _cluster.getAssignmentById(_topId); _nodeToComps = new HashMap>(); if (assignment != null) { - Map execToSlot = assignment.getExecutorToSlot(); + Map execToSlot = assignment.getExecutorToSlot(); - for (Entry entry: execToSlot.entrySet()) { + for (Entry entry: execToSlot.entrySet()) { String nodeId = entry.getValue().getNodeId(); Set comps = _nodeToComps.get(nodeId); if (comps == null) { @@ -97,27 +97,27 @@ public RoundRobinSlotScheduler(TopologyDetails td, int slotsToUse, } } - _spreadToSchedule = new HashMap>(); + _spreadToSchedule = new HashMap>(); List spreadComps = (List)td.getConf().get(Config.TOPOLOGY_SPREAD_COMPONENTS); if (spreadComps != null) { for (String comp: spreadComps) { - _spreadToSchedule.put(comp, new ArrayList()); + _spreadToSchedule.put(comp, new ArrayList()); } } - _slots = new LinkedList>(); + _slots = new LinkedList>(); for (int i = 0; i < slotsToUse; i++) { - _slots.add(new HashSet()); + _slots.add(new HashSet()); } int at = 0; - for (Entry> entry: _cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) { + for (Entry> entry: _cluster.getNeedsSchedulingComponentToExecutors(td).entrySet()) { LOG.debug("Scheduling for {}", entry.getKey()); if (_spreadToSchedule.containsKey(entry.getKey())) { LOG.debug("Saving {} for spread...",entry.getKey()); _spreadToSchedule.get(entry.getKey()).addAll(entry.getValue()); } else { - for (ExecutorDetails ed: entry.getValue()) { + for (ExecutorInfo ed: entry.getValue()) { LOG.debug("Assigning {} {} to slot {}", new Object[]{entry.getKey(), ed, at}); _slots.get(at).add(ed); at++; @@ -139,10 +139,10 @@ public boolean assignSlotTo(Node n) { if (_slots.isEmpty()) { return false; } - Set slot = _slots.pop(); + Set slot = _slots.pop(); if (slot == _lastSlot) { //The last slot fill it up - for (Entry> entry: _spreadToSchedule.entrySet()) { + for (Entry> entry: _spreadToSchedule.entrySet()) { if (entry.getValue().size() > 0) { slot.addAll(entry.getValue()); } @@ -154,7 +154,7 @@ public boolean assignSlotTo(Node n) { nodeComps = new HashSet(); _nodeToComps.put(nodeId, nodeComps); } - for (Entry> entry: _spreadToSchedule.entrySet()) { + for (Entry> entry: _spreadToSchedule.entrySet()) { if (entry.getValue().size() > 0) { String comp = entry.getKey(); if (!nodeComps.contains(comp)) { diff --git a/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj index d347ed59e75..c91a60f0a26 100644 --- a/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj +++ b/storm-core/test/clj/backtype/storm/scheduler/multitenant_scheduler_test.clj @@ -17,8 +17,8 @@ (:use [clojure test]) (:use [backtype.storm config testing log]) (:require [backtype.storm.daemon [nimbus :as nimbus]]) - (:import [backtype.storm.generated StormTopology]) - (:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails + (:import [backtype.storm.generated StormTopology ExecutorInfo]) + (:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot SchedulerAssignmentImpl Topologies TopologyDetails]) (:import [backtype.storm.scheduler.multitenant Node NodePool FreePool DefaultPool IsolatedPool MultitenantScheduler])) @@ -31,7 +31,7 @@ (defn to-top-map [topologies] (into {} (for [top topologies] {(.getId top) top}))) -(defn ed [id] (ExecutorDetails. (int id) (int id))) +(defn ed [id] (ExecutorInfo. (int id) (int id))) (defn mk-ed-map [arg] (into {} @@ -53,25 +53,25 @@ (is (= 4 (.totalSlotsFree node))) (is (= 0 (.totalSlotsUsed node))) (is (= 4 (.totalSlots node))) - (.assign node "topology1" (list (ExecutorDetails. 1 1)) cluster) + (.assign node "topology1" (list (ExecutorInfo. 1 1)) cluster) (is (= 1 (.size (.getRunningTopologies node)))) (is (= false (.isTotallyFree node))) (is (= 3 (.totalSlotsFree node))) (is (= 1 (.totalSlotsUsed node))) (is (= 4 (.totalSlots node))) - (.assign node "topology1" (list (ExecutorDetails. 2 2)) cluster) + (.assign node "topology1" (list (ExecutorInfo. 2 2)) cluster) (is (= 1 (.size (.getRunningTopologies node)))) (is (= false (.isTotallyFree node))) (is (= 2 (.totalSlotsFree node))) (is (= 2 (.totalSlotsUsed node))) (is (= 4 (.totalSlots node))) - (.assign node "topology2" (list (ExecutorDetails. 1 1)) cluster) + (.assign node "topology2" (list (ExecutorInfo. 1 1)) cluster) (is (= 2 (.size (.getRunningTopologies node)))) (is (= false (.isTotallyFree node))) (is (= 1 (.totalSlotsFree node))) (is (= 3 (.totalSlotsUsed node))) (is (= 4 (.totalSlots node))) - (.assign node "topology2" (list (ExecutorDetails. 2 2)) cluster) + (.assign node "topology2" (list (ExecutorInfo. 2 2)) cluster) (is (= 2 (.size (.getRunningTopologies node)))) (is (= false (.isTotallyFree node))) (is (= 0 (.totalSlotsFree node))) @@ -91,7 +91,7 @@ node-map (Node/getAllNodesFrom cluster) free-pool (FreePool. )] ;; assign one node so it is not in the pool - (.assign (.get node-map "super0") "topology1" (list (ExecutorDetails. 1 1)) cluster) + (.assign (.get node-map "super0") "topology1" (list (ExecutorInfo. 1 1)) cluster) (.init free-pool cluster node-map) (is (= 4 (.nodesAvailable free-pool))) (is (= (* 4 4) (.slotsAvailable free-pool))) @@ -691,10 +691,10 @@ ["bolt2" 10 15] ["bolt3" 15 20]])) existing-assignments { - "topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorDetails. 0 5) (WorkerSlot. "super0" 1) - (ExecutorDetails. 5 10) (WorkerSlot. "super0" 20) - (ExecutorDetails. 10 15) (WorkerSlot. "super0" 1) - (ExecutorDetails. 15 20) (WorkerSlot. "super0" 1)}) + "topology1" (SchedulerAssignmentImpl. "topology1" {(ExecutorInfo. 0 5) (WorkerSlot. "super0" 1) + (ExecutorInfo. 5 10) (WorkerSlot. "super0" 20) + (ExecutorInfo. 10 15) (WorkerSlot. "super0" 1) + (ExecutorInfo. 15 20) (WorkerSlot. "super0" 1)}) } cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments) node-map (Node/getAllNodesFrom cluster) @@ -739,8 +739,8 @@ 1 (mk-ed-map [["spout21" 2 3]])) worker-slot-with-multiple-assignments (WorkerSlot. "super1" 1) - existing-assignments {"topology2" (SchedulerAssignmentImpl. "topology2" {(ExecutorDetails. 1 1) worker-slot-with-multiple-assignments}) - "topology3" (SchedulerAssignmentImpl. "topology3" {(ExecutorDetails. 2 2) worker-slot-with-multiple-assignments})} + existing-assignments {"topology2" (SchedulerAssignmentImpl. "topology2" {(ExecutorInfo. 1 1) worker-slot-with-multiple-assignments}) + "topology3" (SchedulerAssignmentImpl. "topology3" {(ExecutorInfo. 2 2) worker-slot-with-multiple-assignments})} cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments) topologies (Topologies. (to-top-map [topology1 topology2 topology3])) conf {MULTITENANT-SCHEDULER-USER-POOLS {"userA" 2 "userB" 1}} @@ -768,7 +768,7 @@ (mk-ed-map [["spout11" 0 1]])) existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1" - {(ExecutorDetails. 0 0) (WorkerSlot. "super0" port-not-reported-by-supervisor)})} + {(ExecutorInfo. 0 0) (WorkerSlot. "super0" port-not-reported-by-supervisor)})} cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments) topologies (Topologies. (to-top-map [topology1])) conf {} @@ -803,12 +803,12 @@ worker-slot-with-multiple-assignments (WorkerSlot. dead-supervisor 1) existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1" - {(ExecutorDetails. 0 0) worker-slot-with-multiple-assignments - (ExecutorDetails. 1 1) (WorkerSlot. dead-supervisor 3)}) + {(ExecutorInfo. 0 0) worker-slot-with-multiple-assignments + (ExecutorInfo. 1 1) (WorkerSlot. dead-supervisor 3)}) "topology2" (SchedulerAssignmentImpl. "topology2" - {(ExecutorDetails. 4 4) worker-slot-with-multiple-assignments - (ExecutorDetails. 5 5) (WorkerSlot. dead-supervisor port-not-reported-by-supervisor)})} + {(ExecutorInfo. 4 4) worker-slot-with-multiple-assignments + (ExecutorInfo. 5 5) (WorkerSlot. dead-supervisor port-not-reported-by-supervisor)})} cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments) topologies (Topologies. (to-top-map [topology1 topology2])) conf {} diff --git a/storm-core/test/clj/backtype/storm/scheduler_test.clj b/storm-core/test/clj/backtype/storm/scheduler_test.clj index 45acf75e874..7ba91972cb6 100644 --- a/storm-core/test/clj/backtype/storm/scheduler_test.clj +++ b/storm-core/test/clj/backtype/storm/scheduler_test.clj @@ -18,32 +18,32 @@ (:use [backtype.storm config testing]) (:use [backtype.storm.scheduler EvenScheduler]) (:require [backtype.storm.daemon [nimbus :as nimbus]]) - (:import [backtype.storm.generated StormTopology]) - (:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails + (:import [backtype.storm.generated StormTopology ExecutorInfo]) + (:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot SchedulerAssignmentImpl Topologies TopologyDetails])) (defn clojurify-executor->slot [executorToSlot] (into {} (for [[executor slot] executorToSlot] - {[(.getStartTask executor) (.getEndTask executor)] + {[(.get_task_start executor) (.get_task_end executor)] [(.getNodeId slot) (.getPort slot)]}))) (defn clojurify-executor->comp [executorToComp] (into {} (for [[executor component] executorToComp] - {[(.getStartTask executor) (.getEndTask executor)] component}))) + {[(.get_task_start executor) (.get_task_end executor)] component}))) (defn clojurify-component->executors [compToExecutor] (into {} (for [[component executors] compToExecutor - :let [new-executors (set (map #(vector (.getStartTask %) (.getEndTask %)) executors))]] + :let [new-executors (set (map #(vector (.get_task_start %) (.get_task_end %)) executors))]] {component new-executors}))) (deftest test-supervisor-details - (let [executor->slot {(ExecutorDetails. (int 1) (int 5)) (WorkerSlot. "supervisor1" (int 1)) - (ExecutorDetails. (int 6) (int 10)) (WorkerSlot. "supervisor2" (int 2))} + (let [executor->slot {(ExecutorInfo. (int 1) (int 5)) (WorkerSlot. "supervisor1" (int 1)) + (ExecutorInfo. (int 6) (int 10)) (WorkerSlot. "supervisor2" (int 2))} topology-id "topology1" assignment (SchedulerAssignmentImpl. topology-id executor->slot)] ;; test assign (.assign assignment (WorkerSlot. "supervisor1" 1) - (list (ExecutorDetails. (int 11) (int 15)) (ExecutorDetails. (int 16) (int 20)))) + (list (ExecutorInfo. (int 11) (int 15)) (ExecutorInfo. (int 16) (int 20)))) (is (= {[1 5] ["supervisor1" 1] [6 10] ["supervisor2" 2] [11 15] ["supervisor1" 1] @@ -54,8 +54,8 @@ (is (= true (.isSlotOccupied assignment (WorkerSlot. "supervisor1" (int 1))))) ;; test isExecutorAssigned - (is (= true (.isExecutorAssigned assignment (ExecutorDetails. (int 1) (int 5))))) - (is (= false (.isExecutorAssigned assignment (ExecutorDetails. (int 21) (int 25))))) + (is (= true (.isExecutorAssigned assignment (ExecutorInfo. (int 1) (int 5))))) + (is (= false (.isExecutorAssigned assignment (ExecutorInfo. (int 21) (int 25))))) ;; test unassignBySlot (.unassignBySlot assignment (WorkerSlot. "supervisor1" (int 1))) @@ -65,8 +65,8 @@ )) (deftest test-topologies - (let [executor1 (ExecutorDetails. (int 1) (int 5)) - executor2 (ExecutorDetails. (int 6) (int 10)) + (let [executor1 (ExecutorInfo. (int 1) (int 5)) + executor2 (ExecutorInfo. (int 6) (int 10)) topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) 1 {executor1 "spout1" executor2 "bolt1"}) @@ -91,13 +91,14 @@ (deftest test-cluster (let [supervisor1 (SupervisorDetails. "supervisor1" "192.168.0.1" (list ) (map int (list 1 3 5 7 9))) supervisor2 (SupervisorDetails. "supervisor2" "192.168.0.2" (list ) (map int (list 2 4 6 8 10))) - executor1 (ExecutorDetails. (int 1) (int 5)) - executor2 (ExecutorDetails. (int 6) (int 10)) - executor3 (ExecutorDetails. (int 11) (int 15)) - executor11 (ExecutorDetails. (int 100) (int 105)) - executor12 (ExecutorDetails. (int 106) (int 110)) - executor21 (ExecutorDetails. (int 201) (int 205)) - executor22 (ExecutorDetails. (int 206) (int 210)) + executor1 (ExecutorInfo. (int 1) (int 5)) + executor2 (ExecutorInfo. (int 6) (int 10)) + executor3 (ExecutorInfo. (int 11) (int 15)) + executor11 (ExecutorInfo. (int 100) (int 105)) + executor12 (ExecutorInfo. (int 106) (int 110)) + executor21 (ExecutorInfo. (int 201) (int 205)) + executor22 (ExecutorInfo. (int 206) (int 210)) + ;; topology1 needs scheduling: executor3 is NOT assigned a slot. topology1 (TopologyDetails. "topology1" {TOPOLOGY-NAME "topology-name-1"} (StormTopology.) @@ -169,7 +170,7 @@ (empty? (.getNeedsSchedulingExecutorToComponents cluster topology3)))) ;; test Cluster.getNeedsSchedulingComponentToExecutors - (is (= {"bolt2" #{[(.getStartTask executor3) (.getEndTask executor3)]}} + (is (= {"bolt2" #{[(.get_task_start executor3) (.get_task_end executor3)]}} (clojurify-component->executors (.getNeedsSchedulingComponentToExecutors cluster topology1)))) (is (= true (empty? (.getNeedsSchedulingComponentToExecutors cluster topology2))))