Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions storm-core/src/clj/org/apache/storm/cluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
(update-storm! [this storm-id new-elems])
(remove-storm-base! [this storm-id])
(set-assignment! [this storm-id info])
(remove-assignment! [this storm-id])
;; sets up information related to key consisting of nimbus
;; host:port and version info of the blob
(setup-blobstore! [this key nimbusInfo versionInfo])
Expand Down Expand Up @@ -566,6 +567,11 @@
(let [thrift-assignment (thriftify-assignment info)]
(.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls)))

(remove-assignment!
[this storm-id]
(log-debug "removing assignment for storm " storm-id)
(.delete_node cluster-state (assignment-path storm-id)))

(remove-blobstore-key!
[this blob-key]
(log-debug "removing key" blob-key)
Expand Down
46 changes: 38 additions & 8 deletions storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,40 @@
(assoc-non-nil :component->executors executor-overrides))
})))

(declare try-read-storm-topology)
(declare get-version-for-key)

(defn update-storm-code-tasks [nimbus storm-id component->executors]
(let [subject (get-subject)
blob-store (:blob-store nimbus)
^StormTopology topology (try-read-storm-topology storm-id blob-store)
old-components (all-components topology)
storm-cluster-state (:storm-cluster-state nimbus)
code-key (master-stormcode-key storm-id)
nimbus-host-port-info (:nimbus-host-port-info nimbus)]
(doseq [[comp num-tasks] component->executors
:let [component (get old-components comp)
component-common (if (some? component) (.get_common component) nil)]]
(when (some? component-common)
(.set_parallelism_hint component-common num-tasks)
(.set_json_conf component-common
(->> {TOPOLOGY-TASKS num-tasks}
(merge (component-conf component))
to-json))
(log-message "Override " storm-id " component: " comp " json conf to " (.get_json_conf component-common) " for rebalancing")))
(.deleteBlob blob-store code-key subject)
(.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
(if (instance? LocalFsBlobStore blob-store)
(.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info (:conf nimbus))))))

(defn do-rebalance [nimbus storm-id status storm-base]
(let [rebalance-options (:topology-action-options storm-base)]
(.update-storm! (:storm-cluster-state nimbus)
storm-id
(-> {:topology-action-options nil}
(assoc-non-nil :component->executors (:component->executors rebalance-options))
(assoc-non-nil :num-workers (:num-workers rebalance-options)))))
(assoc-non-nil :num-workers (:num-workers rebalance-options))))
(update-storm-code-tasks nimbus storm-id (:component->executors rebalance-options)))
(mk-assignments nimbus :scratch-topology-id storm-id))

(defn state-transitions [nimbus storm-id status storm-base]
Expand Down Expand Up @@ -643,7 +670,7 @@
task->component (storm-task-info topology storm-conf)]
(if (nil? component->executors)
[]
(->> (storm-task-info topology storm-conf)
(->> task->component
reverse-map
(map-val sort)
(join-maps component->executors)
Expand Down Expand Up @@ -969,14 +996,16 @@
topologies (locking (:submit-lock nimbus) (into {} (for [tid (.active-storms storm-cluster-state)]
{tid (read-topology-details nimbus tid)})))
topologies (Topologies. topologies)
;; for the topology which wants rebalance (specified by the scratch-topology-id)
;; we remove its assignment, meaning that all the slots occupied by its assignment
;; will be treated as free slot in the scheduler code
;; and supervisors will kill all the old workers for rebalancing topology.
_ (when-not (nil? scratch-topology-id)
(.remove-assignment! storm-cluster-state scratch-topology-id))
;; read all the assignments
assigned-topology-ids (.assignments storm-cluster-state nil)
existing-assignments (into {} (for [tid assigned-topology-ids]
;; for the topology which wants rebalance (specified by the scratch-topology-id)
;; we exclude its assignment, meaning that all the slots occupied by its assignment
;; will be treated as free slot in the scheduler code.
(when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
{tid (.assignment-info storm-cluster-state tid nil)})))]
{tid (.assignment-info storm-cluster-state tid nil)}))]
;; make the new assignments for topologies
(locking (:sched-lock nimbus) (let [
new-scheduler-assignments (compute-new-scheduler-assignments
Expand Down Expand Up @@ -1163,7 +1192,8 @@

(defn- component-parallelism [storm-conf component]
(let [storm-conf (merge storm-conf (component-conf component))
num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
;;We should consider user defined parallelism first
num-tasks (or (num-start-executors component) (storm-conf TOPOLOGY-TASKS))
max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
]
(if max-parallelism
Expand Down