From de4374fd1da4bb0a647f125b91f2f15644c0b2f7 Mon Sep 17 00:00:00 2001 From: chenyuzhao Date: Thu, 12 Jan 2017 17:21:55 +0800 Subject: [PATCH 1/8] refactor rebalance to allow modifying component parallelism freely --- .../clj/org/apache/storm/daemon/nimbus.clj | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 112d00b75b1..1121220f8ef 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -291,13 +291,38 @@ (assoc-non-nil :component->executors executor-overrides)) }))) +(declare try-read-storm-topology) +(declare get-version-for-key) + +(defn update-storm-code-parallelism [nimbus storm-id component->executors] + (let [subject (get-subject) + blob-store (:blob-store nimbus) + ^StormTopology topology (.deepCopy (try-read-storm-topology storm-id blob-store)) + old-components (all-components topology) + storm-cluster-state (:storm-cluster-state nimbus) + code-key (master-stormcode-key storm-id) + nimbus-host-port-info (:nimbus-host-port-info nimbus)] + (doseq [[comp num-tasks] component->executors + :let [component (comp old-components) + component-common (.get_common component)]] + (.set_parallelism_hint component-common num-tasks) + (.set_json_conf component-common + (->> {TOPOLOGY-TASKS num-tasks} + (merge (component-conf component)) + to-json))) + (.deleteBlob blob-store code-key subject) + (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) + (if (instance? LocalFsBlobStore blob-store) + (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info (:conf nimbus)))))) + (defn do-rebalance [nimbus storm-id status storm-base] (let [rebalance-options (:topology-action-options storm-base)] (.update-storm! 
(:storm-cluster-state nimbus) storm-id (-> {:topology-action-options nil} (assoc-non-nil :component->executors (:component->executors rebalance-options)) - (assoc-non-nil :num-workers (:num-workers rebalance-options))))) + (assoc-non-nil :num-workers (:num-workers rebalance-options)))) + (update-storm-code-parallelism nimbus storm-id (:component->executors rebalance-options))) (mk-assignments nimbus :scratch-topology-id storm-id)) (defn state-transitions [nimbus storm-id status storm-base] @@ -643,7 +668,7 @@ task->component (storm-task-info topology storm-conf)] (if (nil? component->executors) [] - (->> (storm-task-info topology storm-conf) + (->> task->component reverse-map (map-val sort) (join-maps component->executors) @@ -1163,7 +1188,8 @@ (defn- component-parallelism [storm-conf component] (let [storm-conf (merge storm-conf (component-conf component)) - num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component)) + ;;We should consider user defined parallelism first + num-tasks (or (num-start-executors component) (storm-conf TOPOLOGY-TASKS)) max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM) ] (if max-parallelism From 1384071d58020d03c018d9ce98638ee330539300 Mon Sep 17 00:00:00 2001 From: chenyuzhao Date: Fri, 13 Jan 2017 15:22:09 +0800 Subject: [PATCH 2/8] format code --- .../src/clj/org/apache/storm/daemon/nimbus.clj | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 1121220f8ef..4e1fe899d99 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -303,13 +303,13 @@ code-key (master-stormcode-key storm-id) nimbus-host-port-info (:nimbus-host-port-info nimbus)] (doseq [[comp num-tasks] component->executors - :let [component (comp old-components) + :let [component (get old-components comp) component-common (.get_common 
component)]] (.set_parallelism_hint component-common num-tasks) (.set_json_conf component-common - (->> {TOPOLOGY-TASKS num-tasks} - (merge (component-conf component)) - to-json))) + (->> {TOPOLOGY-TASKS num-tasks} + (merge (component-conf component)) + to-json))) (.deleteBlob blob-store code-key subject) (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) (if (instance? LocalFsBlobStore blob-store) @@ -318,10 +318,10 @@ (defn do-rebalance [nimbus storm-id status storm-base] (let [rebalance-options (:topology-action-options storm-base)] (.update-storm! (:storm-cluster-state nimbus) - storm-id - (-> {:topology-action-options nil} - (assoc-non-nil :component->executors (:component->executors rebalance-options)) - (assoc-non-nil :num-workers (:num-workers rebalance-options)))) + storm-id + (-> {:topology-action-options nil} + (assoc-non-nil :component->executors (:component->executors rebalance-options)) + (assoc-non-nil :num-workers (:num-workers rebalance-options)))) (update-storm-code-parallelism nimbus storm-id (:component->executors rebalance-options))) (mk-assignments nimbus :scratch-topology-id storm-id)) From 8b49f4d2e6e3f952c2a4d979d42166a44280fcc3 Mon Sep 17 00:00:00 2001 From: chenyuzhao Date: Fri, 13 Jan 2017 15:50:27 +0800 Subject: [PATCH 3/8] resolve conflict --- storm-core/src/clj/org/apache/storm/daemon/nimbus.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 4e1fe899d99..5fc036a1517 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -319,7 +319,7 @@ (let [rebalance-options (:topology-action-options storm-base)] (.update-storm! 
(:storm-cluster-state nimbus) storm-id - (-> {:topology-action-options nil} + (-> {} (assoc-non-nil :component->executors (:component->executors rebalance-options)) (assoc-non-nil :num-workers (:num-workers rebalance-options)))) (update-storm-code-parallelism nimbus storm-id (:component->executors rebalance-options))) From 6e6dcfc8501fa084f3bbae037f624db17c8c81c0 Mon Sep 17 00:00:00 2001 From: chenyuzhao Date: Fri, 13 Jan 2017 15:52:44 +0800 Subject: [PATCH 4/8] resolve conflict --- storm-core/src/clj/org/apache/storm/daemon/nimbus.clj | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 5fc036a1517..d5101f20cf3 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -318,10 +318,10 @@ (defn do-rebalance [nimbus storm-id status storm-base] (let [rebalance-options (:topology-action-options storm-base)] (.update-storm! 
(:storm-cluster-state nimbus) - storm-id - (-> {} - (assoc-non-nil :component->executors (:component->executors rebalance-options)) - (assoc-non-nil :num-workers (:num-workers rebalance-options)))) + storm-id + (-> {} + (assoc-non-nil :component->executors (:component->executors rebalance-options)) + (assoc-non-nil :num-workers (:num-workers rebalance-options)))) (update-storm-code-parallelism nimbus storm-id (:component->executors rebalance-options))) (mk-assignments nimbus :scratch-topology-id storm-id)) From d55a38f817bbc1768863e8e0d4e791a594afa463 Mon Sep 17 00:00:00 2001 From: chenyuzhao Date: Fri, 13 Jan 2017 16:12:11 +0800 Subject: [PATCH 5/8] resolve conflict --- storm-core/src/clj/org/apache/storm/daemon/nimbus.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index d5101f20cf3..0152c55e4d9 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -319,7 +319,7 @@ (let [rebalance-options (:topology-action-options storm-base)] (.update-storm! 
(:storm-cluster-state nimbus) storm-id - (-> {} + (-> {:topology-action-options nil} (assoc-non-nil :component->executors (:component->executors rebalance-options)) (assoc-non-nil :num-workers (:num-workers rebalance-options)))) (update-storm-code-parallelism nimbus storm-id (:component->executors rebalance-options))) From 2c08d598244fb1ec7485aa98aaf2b594784d66f2 Mon Sep 17 00:00:00 2001 From: chenyuzhao Date: Fri, 13 Jan 2017 17:46:12 +0800 Subject: [PATCH 6/8] format code --- storm-core/src/clj/org/apache/storm/daemon/nimbus.clj | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 0152c55e4d9..4ae7a94783d 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -307,9 +307,9 @@ component-common (.get_common component)]] (.set_parallelism_hint component-common num-tasks) (.set_json_conf component-common - (->> {TOPOLOGY-TASKS num-tasks} - (merge (component-conf component)) - to-json))) + (->> {TOPOLOGY-TASKS num-tasks} + (merge (component-conf component)) + to-json))) (.deleteBlob blob-store code-key subject) (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) (if (instance? 
LocalFsBlobStore blob-store) From dd7bb34ac8f68fe002d84d9e51ebf5445cab8938 Mon Sep 17 00:00:00 2001 From: chenyuzhao Date: Wed, 18 Jan 2017 12:18:50 +0800 Subject: [PATCH 7/8] change rebalance strategy to remove all the old workers --- .../src/clj/org/apache/storm/cluster.clj | 6 ++++ .../clj/org/apache/storm/daemon/nimbus.clj | 29 ++++++++++--------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj index 810b3c3c0f3..11471bda1cf 100644 --- a/storm-core/src/clj/org/apache/storm/cluster.clj +++ b/storm-core/src/clj/org/apache/storm/cluster.clj @@ -88,6 +88,7 @@ (update-storm! [this storm-id new-elems]) (remove-storm-base! [this storm-id]) (set-assignment! [this storm-id info]) + (remove-assignment! [this storm-id]) ;; sets up information related to key consisting of nimbus ;; host:port and version info of the blob (setup-blobstore! [this key nimbusInfo versionInfo]) @@ -566,6 +567,11 @@ (let [thrift-assignment (thriftify-assignment info)] (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls))) + (remove-assignment! + [this storm-id] + (log-debug "removing assignment for storm " storm-id) + (.delete_node cluster-state (assignment-path storm-id))) + (remove-blobstore-key! 
[this blob-key] (log-debug "removing key" blob-key) diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 4ae7a94783d..6554c0b05c7 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -294,7 +294,7 @@ (declare try-read-storm-topology) (declare get-version-for-key) -(defn update-storm-code-parallelism [nimbus storm-id component->executors] +(defn update-storm-code-tasks [nimbus storm-id component->executors] (let [subject (get-subject) blob-store (:blob-store nimbus) ^StormTopology topology (.deepCopy (try-read-storm-topology storm-id blob-store)) @@ -304,12 +304,13 @@ nimbus-host-port-info (:nimbus-host-port-info nimbus)] (doseq [[comp num-tasks] component->executors :let [component (get old-components comp) - component-common (.get_common component)]] - (.set_parallelism_hint component-common num-tasks) - (.set_json_conf component-common - (->> {TOPOLOGY-TASKS num-tasks} - (merge (component-conf component)) - to-json))) + component-common (if (some? component) (.get_common component) nil)]] + (when (some? component-common) + (.set_parallelism_hint component-common num-tasks) + (.set_json_conf component-common + (->> {TOPOLOGY-TASKS num-tasks} + (merge (component-conf component)) + to-json)))) (.deleteBlob blob-store code-key subject) (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) (if (instance? 
LocalFsBlobStore blob-store) @@ -322,7 +323,7 @@ (-> {:topology-action-options nil} (assoc-non-nil :component->executors (:component->executors rebalance-options)) (assoc-non-nil :num-workers (:num-workers rebalance-options)))) - (update-storm-code-parallelism nimbus storm-id (:component->executors rebalance-options))) + (update-storm-code-tasks nimbus storm-id (:component->executors rebalance-options))) (mk-assignments nimbus :scratch-topology-id storm-id)) (defn state-transitions [nimbus storm-id status storm-base] @@ -994,14 +995,16 @@ topologies (locking (:submit-lock nimbus) (into {} (for [tid (.active-storms storm-cluster-state)] {tid (read-topology-details nimbus tid)}))) topologies (Topologies. topologies) + ;; for the topology which wants rebalance (specified by the scratch-topology-id) + ;; we remove its assignment, meaning that all the slots occupied by its assignment + ;; will be treated as free slot in the scheduler code + ;; and supervisors will kill all the old workers for rebalancing topology. + _ (when-not (nil? scratch-topology-id) + (.remove-assignment! storm-cluster-state scratch-topology-id)) ;; read all the assignments assigned-topology-ids (.assignments storm-cluster-state nil) existing-assignments (into {} (for [tid assigned-topology-ids] - ;; for the topology which wants rebalance (specified by the scratch-topology-id) - ;; we exclude its assignment, meaning that all the slots occupied by its assignment - ;; will be treated as free slot in the scheduler code. - (when (or (nil? 
scratch-topology-id) (not= tid scratch-topology-id)) - {tid (.assignment-info storm-cluster-state tid nil)})))] + {tid (.assignment-info storm-cluster-state tid nil)}))] ;; make the new assignments for topologies (locking (:sched-lock nimbus) (let [ new-scheduler-assignments (compute-new-scheduler-assignments From 9adf23112b75a8c19ea6ba704907b5fb9f0d6cd0 Mon Sep 17 00:00:00 2001 From: chenyuzhao Date: Tue, 14 Feb 2017 11:19:52 +0800 Subject: [PATCH 8/8] add log --- storm-core/src/clj/org/apache/storm/daemon/nimbus.clj | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 6554c0b05c7..dbea3abeb7e 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -297,7 +297,7 @@ (defn update-storm-code-tasks [nimbus storm-id component->executors] (let [subject (get-subject) blob-store (:blob-store nimbus) - ^StormTopology topology (.deepCopy (try-read-storm-topology storm-id blob-store)) + ^StormTopology topology (try-read-storm-topology storm-id blob-store) old-components (all-components topology) storm-cluster-state (:storm-cluster-state nimbus) code-key (master-stormcode-key storm-id) @@ -310,7 +310,8 @@ (.set_json_conf component-common (->> {TOPOLOGY-TASKS num-tasks} (merge (component-conf component)) - to-json)))) + to-json)) + (log-message "Override " storm-id " component: " comp " json conf to " (.get_json_conf component-common) " for rebalancing"))) (.deleteBlob blob-store code-key subject) (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) (if (instance? LocalFsBlobStore blob-store)