diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj index 810b3c3c0f3..11471bda1cf 100644 --- a/storm-core/src/clj/org/apache/storm/cluster.clj +++ b/storm-core/src/clj/org/apache/storm/cluster.clj @@ -88,6 +88,7 @@ (update-storm! [this storm-id new-elems]) (remove-storm-base! [this storm-id]) (set-assignment! [this storm-id info]) + (remove-assignment! [this storm-id]) ;; sets up information related to key consisting of nimbus ;; host:port and version info of the blob (setup-blobstore! [this key nimbusInfo versionInfo]) @@ -566,6 +567,11 @@ (let [thrift-assignment (thriftify-assignment info)] (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls))) + (remove-assignment! + [this storm-id] + (log-debug "removing assignment for storm " storm-id) + (.delete_node cluster-state (assignment-path storm-id))) + (remove-blobstore-key! [this blob-key] (log-debug "removing key" blob-key) diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 112d00b75b1..dbea3abeb7e 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -291,13 +291,40 @@ (assoc-non-nil :component->executors executor-overrides)) }))) +(declare try-read-storm-topology) +(declare get-version-for-key) + +(defn update-storm-code-tasks [nimbus storm-id component->executors] + (let [subject (get-subject) + blob-store (:blob-store nimbus) + ^StormTopology topology (try-read-storm-topology storm-id blob-store) + old-components (all-components topology) + storm-cluster-state (:storm-cluster-state nimbus) + code-key (master-stormcode-key storm-id) + nimbus-host-port-info (:nimbus-host-port-info nimbus)] + (doseq [[comp num-tasks] component->executors + :let [component (get old-components comp) + component-common (if (some? component) (.get_common component) nil)]] + (when (some? component-common) + (.set_parallelism_hint component-common num-tasks) + (.set_json_conf component-common + (->> {TOPOLOGY-TASKS num-tasks} + (merge (component-conf component)) + to-json)) + (log-message "Override " storm-id " component: " comp " json conf to " (.get_json_conf component-common) " for rebalancing"))) + (.deleteBlob blob-store code-key subject) + (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) + (if (instance? LocalFsBlobStore blob-store) + (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info (:conf nimbus)))))) + (defn do-rebalance [nimbus storm-id status storm-base] (let [rebalance-options (:topology-action-options storm-base)] (.update-storm! (:storm-cluster-state nimbus) storm-id (-> {:topology-action-options nil} (assoc-non-nil :component->executors (:component->executors rebalance-options)) - (assoc-non-nil :num-workers (:num-workers rebalance-options))))) + (assoc-non-nil :num-workers (:num-workers rebalance-options)))) + (update-storm-code-tasks nimbus storm-id (:component->executors rebalance-options))) (mk-assignments nimbus :scratch-topology-id storm-id)) (defn state-transitions [nimbus storm-id status storm-base] @@ -643,7 +670,7 @@ task->component (storm-task-info topology storm-conf)] (if (nil? component->executors) [] - (->> (storm-task-info topology storm-conf) + (->> task->component reverse-map (map-val sort) (join-maps component->executors) @@ -969,14 +996,16 @@ topologies (locking (:submit-lock nimbus) (into {} (for [tid (.active-storms storm-cluster-state)] {tid (read-topology-details nimbus tid)}))) topologies (Topologies. topologies) + ;; for the topology which wants rebalance (specified by the scratch-topology-id) + ;; we remove its assignment, meaning that all the slots occupied by its assignment + ;; will be treated as free slot in the scheduler code + ;; and supervisors will kill all the old workers for rebalancing topology. + _ (when-not (nil? scratch-topology-id) + (.remove-assignment! storm-cluster-state scratch-topology-id)) ;; read all the assignments assigned-topology-ids (.assignments storm-cluster-state nil) existing-assignments (into {} (for [tid assigned-topology-ids] - ;; for the topology which wants rebalance (specified by the scratch-topology-id) - ;; we exclude its assignment, meaning that all the slots occupied by its assignment - ;; will be treated as free slot in the scheduler code. - (when (or (nil? scratch-topology-id) (not= tid scratch-topology-id)) - {tid (.assignment-info storm-cluster-state tid nil)})))] + {tid (.assignment-info storm-cluster-state tid nil)}))] ;; make the new assignments for topologies (locking (:sched-lock nimbus) (let [ new-scheduler-assignments (compute-new-scheduler-assignments @@ -1163,7 +1192,8 @@ (defn- component-parallelism [storm-conf component] (let [storm-conf (merge storm-conf (component-conf component)) - num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component)) + ;;We should consider user defined parallelism first + num-tasks (or (num-start-executors component) (storm-conf TOPOLOGY-TASKS)) max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM) ] (if max-parallelism