Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ _site
dependency-reduced-pom.xml
derby.log
metastore_db
storm-core/maven-eclipse.xml
12 changes: 6 additions & 6 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
(:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils])
(:use [backtype.storm.scheduler.DefaultScheduler])
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler])
(:import [backtype.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils ThriftTopologyUtils
BufferFileInputStream])
(:import [backtype.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo
ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus
KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo
ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice])
ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice ExecutorInfo])
(:import [backtype.storm.daemon Shutdownable])
(:use [backtype.storm util config log timer])
(:require [backtype.storm [cluster :as cluster] [stats :as stats]])
Expand Down Expand Up @@ -346,7 +346,7 @@
topology (read-storm-topology conf storm-id)
executor->component (->> (compute-executor->component nimbus storm-id)
(map-key (fn [[start-task end-task]]
(ExecutorDetails. (int start-task) (int end-task)))))]
(ExecutorInfo. (int start-task) (int end-task)))))]
(TopologyDetails. storm-id
topology-conf
topology
Expand Down Expand Up @@ -500,7 +500,7 @@
executor->slot (into {} (for [[executor [node port]] executor->node+port]
;; filter out the dead executors
(if (contains? alive-executors executor)
{(ExecutorDetails. (first executor)
{(ExecutorInfo. (first executor)
(second executor))
(WorkerSlot. node port)}
{})))]]
Expand Down Expand Up @@ -534,8 +534,8 @@
(map-val (fn [^SchedulerAssignment assignment]
(->> assignment
.getExecutorToSlot
(#(into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] %]
{[(.getStartTask executor) (.getEndTask executor)]
(#(into {} (for [[^ExecutorInfo executor ^WorkerSlot slot] %]
{[(.get_task_start executor) (.get_task_end executor)]
[(.getNodeId slot) (.getPort slot)]})))))
scheduler-assignments))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
(:require [backtype.storm.scheduler.EvenScheduler :as EvenScheduler])
(:import [backtype.storm.scheduler IScheduler Topologies
Cluster TopologyDetails WorkerSlot SchedulerAssignment
EvenScheduler ExecutorDetails])
EvenScheduler])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, is the original import unnecessary? And you just remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chuanlei
Yes, It's unnecessary.

(:gen-class
:implements [backtype.storm.scheduler.IScheduler]))

Expand Down Expand Up @@ -59,7 +59,7 @@
(map #(vector (.getNodeId %) (.getPort %))))
all-executors (->> topology
.getExecutors
(map #(vector (.getStartTask %) (.getEndTask %)))
(map #(vector (.get_task_start %) (.get_task_end %)))
set)
alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
alive-executors (->> alive-assigned vals (apply concat) set)
Expand Down
11 changes: 6 additions & 5 deletions storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
(:use [backtype.storm util log config])
(:require [clojure.set :as set])
(:import [backtype.storm.scheduler IScheduler Topologies
Cluster TopologyDetails WorkerSlot ExecutorDetails])
Cluster TopologyDetails WorkerSlot])
(:import [backtype.storm.generated ExecutorInfo])
(:gen-class
:implements [backtype.storm.scheduler.IScheduler]))

Expand All @@ -31,8 +32,8 @@
executor->slot (if existing-assignment
(.getExecutorToSlot existing-assignment)
{})
executor->node+port (into {} (for [[^ExecutorDetails executor ^WorkerSlot slot] executor->slot
:let [executor [(.getStartTask executor) (.getEndTask executor)]
executor->node+port (into {} (for [[^ExecutorInfo executor ^WorkerSlot slot] executor->slot
:let [executor [(.get_task_start executor) (.get_task_end executor)]
node+port [(.getNodeId slot) (.getPort slot)]]]
{executor node+port}))
alive-assigned (reverse-map executor->node+port)]
Expand All @@ -44,7 +45,7 @@
(map #(vector (.getNodeId %) (.getPort %))))
all-executors (->> topology
.getExecutors
(map #(vector (.getStartTask %) (.getEndTask %)))
(map #(vector (.get_task_start %) (.get_task_end %)))
set)
alive-assigned (get-alive-assigned-node+port->executors cluster topology-id)
total-slots-to-use (min (.getNumWorkers topology)
Expand All @@ -71,7 +72,7 @@
(doseq [[node+port executors] node+port->executors
:let [^WorkerSlot slot (WorkerSlot. (first node+port) (last node+port))
executors (for [[start-task end-task] executors]
(ExecutorDetails. start-task end-task))]]
(ExecutorInfo. start-task end-task))]]
(.assign cluster slot topology-id executors)))))

(defn -prepare [this conf]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
(:import [java.util HashSet Set List LinkedList ArrayList Map HashMap])
(:import [backtype.storm.scheduler IScheduler Topologies
Cluster TopologyDetails WorkerSlot SchedulerAssignment
EvenScheduler ExecutorDetails])
EvenScheduler])
(:gen-class
:init init
:constructors {[] []}
Expand Down
32 changes: 17 additions & 15 deletions storm-core/src/jvm/backtype/storm/scheduler/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Set;

import backtype.storm.generated.ExecutorInfo;

public class Cluster {

/**
Expand Down Expand Up @@ -137,12 +139,12 @@ public boolean needsScheduling(TopologyDetails topology) {
* @param topology
* @return
*/
public Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(TopologyDetails topology) {
Collection<ExecutorDetails> allExecutors = new HashSet(topology.getExecutors());
public Map<ExecutorInfo, String> getNeedsSchedulingExecutorToComponents(TopologyDetails topology) {
Collection<ExecutorInfo> allExecutors = new HashSet(topology.getExecutors());

SchedulerAssignment assignment = this.assignments.get(topology.getId());
if (assignment != null) {
Collection<ExecutorDetails> assignedExecutors = assignment.getExecutors();
Collection<ExecutorInfo> assignedExecutors = assignment.getExecutors();
allExecutors.removeAll(assignedExecutors);
}

Expand All @@ -155,13 +157,13 @@ public Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(Topol
* @param topology
* @return
*/
public Map<String, List<ExecutorDetails>> getNeedsSchedulingComponentToExecutors(TopologyDetails topology) {
Map<ExecutorDetails, String> executorToComponents = this.getNeedsSchedulingExecutorToComponents(topology);
Map<String, List<ExecutorDetails>> componentToExecutors = new HashMap<String, List<ExecutorDetails>>();
for (ExecutorDetails executor : executorToComponents.keySet()) {
public Map<String, List<ExecutorInfo>> getNeedsSchedulingComponentToExecutors(TopologyDetails topology) {
Map<ExecutorInfo, String> executorToComponents = this.getNeedsSchedulingExecutorToComponents(topology);
Map<String, List<ExecutorInfo>> componentToExecutors = new HashMap<String, List<ExecutorInfo>>();
for (ExecutorInfo executor : executorToComponents.keySet()) {
String component = executorToComponents.get(executor);
if (!componentToExecutors.containsKey(component)) {
componentToExecutors.put(component, new ArrayList<ExecutorDetails>());
componentToExecutors.put(component, new ArrayList<ExecutorInfo>());
}

componentToExecutors.get(component).add(executor);
Expand Down Expand Up @@ -244,16 +246,16 @@ public List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor) {
/**
* get the unassigned executors of the topology.
*/
public Collection<ExecutorDetails> getUnassignedExecutors(TopologyDetails topology) {
public Collection<ExecutorInfo> getUnassignedExecutors(TopologyDetails topology) {
if (topology == null) {
return new ArrayList<ExecutorDetails>(0);
return new ArrayList<ExecutorInfo>(0);
}

Collection<ExecutorDetails> ret = new HashSet(topology.getExecutors());
Collection<ExecutorInfo> ret = new HashSet(topology.getExecutors());

SchedulerAssignment assignment = this.getAssignmentById(topology.getId());
if (assignment != null) {
Set<ExecutorDetails> assignedExecutors = assignment.getExecutors();
Set<ExecutorInfo> assignedExecutors = assignment.getExecutors();
ret.removeAll(assignedExecutors);
}

Expand Down Expand Up @@ -283,17 +285,17 @@ public int getAssignedNumWorkers(TopologyDetails topology) {
*
* @throws RuntimeException if the specified slot is already occupied.
*/
public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorDetails> executors) {
public void assign(WorkerSlot slot, String topologyId, Collection<ExecutorInfo> executors) {
if (this.isSlotOccupied(slot)) {
throw new RuntimeException("slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied.");
}

SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl)this.getAssignmentById(topologyId);
if (assignment == null) {
assignment = new SchedulerAssignmentImpl(topologyId, new HashMap<ExecutorDetails, WorkerSlot>());
assignment = new SchedulerAssignmentImpl(topologyId, new HashMap<ExecutorInfo, WorkerSlot>());
this.assignments.put(topologyId, assignment);
} else {
for (ExecutorDetails executor : executors) {
for (ExecutorInfo executor : executors) {
if (assignment.isExecutorAssigned(executor)) {
throw new RuntimeException("the executor is already assigned, you should unassign it before assign it to another slot.");
}
Expand Down
54 changes: 0 additions & 54 deletions storm-core/src/jvm/backtype/storm/scheduler/ExecutorDetails.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Map;
import java.util.Set;

import backtype.storm.generated.ExecutorInfo;

public interface SchedulerAssignment {
/**
* Does this slot occupied by this assignment?
Expand All @@ -34,7 +36,7 @@ public interface SchedulerAssignment {
* @param executor
* @return
*/
public boolean isExecutorAssigned(ExecutorDetails executor);
public boolean isExecutorAssigned(ExecutorInfo executor);

/**
* get the topology-id this assignment is for.
Expand All @@ -46,13 +48,13 @@ public interface SchedulerAssignment {
* get the executor -> slot map.
* @return
*/
public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot();
public Map<ExecutorInfo, WorkerSlot> getExecutorToSlot();

/**
* Return the executors covered by this assignments
* @return
*/
public Set<ExecutorDetails> getExecutors();
public Set<ExecutorInfo> getExecutors();

public Set<WorkerSlot> getSlots();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Set;

import backtype.storm.generated.ExecutorInfo;

//TODO: improve this by maintaining slot -> executors as well for more efficient operations
public class SchedulerAssignmentImpl implements SchedulerAssignment {
/**
Expand All @@ -34,11 +36,11 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment {
/**
* assignment detail, a mapping from executor to <code>WorkerSlot</code>
*/
Map<ExecutorDetails, WorkerSlot> executorToSlot;
Map<ExecutorInfo, WorkerSlot> executorToSlot;

public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
public SchedulerAssignmentImpl(String topologyId, Map<ExecutorInfo, WorkerSlot> executorToSlots) {
this.topologyId = topologyId;
this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0);
this.executorToSlot = new HashMap<ExecutorInfo, WorkerSlot>(0);
if (executorToSlots != null) {
this.executorToSlot.putAll(executorToSlots);
}
Expand All @@ -54,8 +56,8 @@ public Set<WorkerSlot> getSlots() {
* @param slot
* @param executors
*/
public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
for (ExecutorDetails executor : executors) {
public void assign(WorkerSlot slot, Collection<ExecutorInfo> executors) {
for (ExecutorInfo executor : executors) {
this.executorToSlot.put(executor, slot);
}
}
Expand All @@ -65,16 +67,16 @@ public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
* @param slot
*/
public void unassignBySlot(WorkerSlot slot) {
List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
for (ExecutorDetails executor : this.executorToSlot.keySet()) {
List<ExecutorInfo> executors = new ArrayList<ExecutorInfo>();
for (ExecutorInfo executor : this.executorToSlot.keySet()) {
WorkerSlot ws = this.executorToSlot.get(executor);
if (ws.equals(slot)) {
executors.add(executor);
}
}

// remove
for (ExecutorDetails executor : executors) {
for (ExecutorInfo executor : executors) {
this.executorToSlot.remove(executor);
}
}
Expand All @@ -88,23 +90,23 @@ public boolean isSlotOccupied(WorkerSlot slot) {
return this.executorToSlot.containsValue(slot);
}

public boolean isExecutorAssigned(ExecutorDetails executor) {
public boolean isExecutorAssigned(ExecutorInfo executor) {
return this.executorToSlot.containsKey(executor);
}

public String getTopologyId() {
return this.topologyId;
}

public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot() {
public Map<ExecutorInfo, WorkerSlot> getExecutorToSlot() {
return this.executorToSlot;
}

/**
* Return the executors covered by this assignments
* @return
*/
public Set<ExecutorDetails> getExecutors() {
public Set<ExecutorInfo> getExecutors() {
return this.executorToSlot.keySet();
}
}
Loading