Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide Additional String substitutions for *.worker.childopts #765

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion storm-core/project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
:target-path "target"
:javac-options ["-target" "1.6" "-source" "1.6"]
:profiles {:dev {:resource-paths ["src/dev"]
:dependencies [[org.mockito/mockito-all "1.9.5"]]}
:dependencies [[org.mockito/mockito-all "1.9.5"]
[org.clojure/clojure-contrib "1.2.0"]]}
:release {}
:lib {}
}
Expand Down
18 changes: 15 additions & 3 deletions storm-core/src/clj/backtype/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,19 @@
(FileUtils/moveDirectory (File. tmproot) (File. stormroot))
))

(defn replace-childopts-tags-by-ids
[childopts worker-id storm-id port]
(
let [replacement-map {"%ID%" (str port)
"%WORKER-ID%" (str worker-id)
"%STORM-ID%" (str storm-id)
"%WORKER-PORT%" (str port)}]
(if-not (nil? childopts)
(reduce (fn [string entry]
(apply clojure.string/replace string entry))
childopts replacement-map)
nil)
))

(defmethod launch-worker
:distributed [supervisor storm-id port worker-id]
Expand All @@ -409,9 +422,8 @@
stormjar (supervisor-stormjar-path stormroot)
storm-conf (read-supervisor-storm-conf conf storm-id)
classpath (add-to-classpath (current-classpath) [stormjar])
childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS))
"%ID%"
(str port))
childopts (replace-childopts-tags-by-ids (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS))
worker-id storm-id port)
logfilename (str "worker-" port ".log")
command (str "java -server " childopts
" -Djava.library.path=" (conf JAVA-LIBRARY-PATH)
Expand Down
3 changes: 2 additions & 1 deletion storm-core/src/jvm/backtype/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,8 @@ public class Config extends HashMap<String, Object> {

/**
* The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced
* with an identifier for this worker.
* with an identifier for this worker. Also, "%WORKER-ID%", "%STORM-ID%" and "%WORKER-PORT%" are
* replaced with appropriate runtime values for this worker.
*/
public static final String WORKER_CHILDOPTS = "worker.childopts";
public static final Object WORKER_CHILDOPTS_SCHEMA = String.class;
Expand Down
91 changes: 91 additions & 0 deletions storm-core/test/clj/backtype/storm/supervisor_test.clj
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(ns backtype.storm.supervisor-test
(:use [clojure test])
(:require [clojure.contrib [string :as contrib-str]])
(:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
(:use [backtype.storm bootstrap testing])
(:use [backtype.storm.daemon common])
Expand Down Expand Up @@ -240,3 +241,93 @@
;; TODO just do reassign, and check that cleans up worker states after killing but doesn't get rid of downloaded code
)

(defn found? [sub-str input-str] (
contrib-str/substring? sub-str (str input-str)
))
(defn not-found? [sub-str input-str]
(complement (found? sub-str input-str)))

(deftest test-replace-childopts-tags-by-ids-happy-path
(testing "worker-launcher replaces ids in childopts"
(let [ worker-id "w-01"
storm-id "s-01"
port 9999
childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%STORM-ID%-%WORKER-ID%-%WORKER-PORT%.log"
]
(def childopts-with-ids (supervisor/replace-childopts-tags-by-ids childopts worker-id storm-id port))
(is (not-found? "%WORKER-ID%" childopts-with-ids))
(is (found? "w-01" childopts-with-ids))
(is (not-found? "%STORM-ID%" childopts-with-ids))
(is (found? "s-01" childopts-with-ids))
(is (not-found? "%WORKER-PORT%" childopts-with-ids))
(is (found? "-9999." childopts-with-ids))
(is (not-found? "%ID%" childopts-with-ids))
(is (found? "worker-9999" childopts-with-ids) (str childopts-with-ids))
)))

(deftest test-replace-childopts-tags-by-ids-storm-id-alone
(testing "worker-launcher replaces ids in childopts"
(let [ worker-id "w-01"
storm-id "s-01"
port 9999
childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%STORM-ID%.log"]
(def childopts-with-ids (supervisor/replace-childopts-tags-by-ids childopts worker-id storm-id port))
(is (not-found? "%WORKER-ID%" childopts-with-ids))
(is (not-found? "w-01" childopts-with-ids))
(is (not-found? "%STORM-ID%" childopts-with-ids))
(is (found? "s-01" childopts-with-ids))
(is (not-found? "%WORKER-PORT%" childopts-with-ids))
(is (not-found? "-9999." childopts-with-ids))
(is (not-found? "%ID%" childopts-with-ids))
(is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids)) )))

(deftest test-replace-childopts-tags-by-ids-no-keys
(testing "worker-launcher has no ids to replace in childopts"
(let [ worker-id "w-01"
storm-id "s-01"
port 9999
childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker.log"]
(def childopts-with-ids (supervisor/replace-childopts-tags-by-ids childopts worker-id storm-id port))
(is (not-found? "%WORKER-ID%" childopts-with-ids))
(is (not-found? "w-01" childopts-with-ids))
(is (not-found? "%STORM-ID%" childopts-with-ids))
(is (not-found? "s-01" childopts-with-ids))
(is (not-found? "%WORKER-PORT%" childopts-with-ids))
(is (not-found? "-9999." childopts-with-ids))
(is (not-found? "%ID%" childopts-with-ids))
(is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids)) )))

(deftest test-replace-childopts-tags-by-ids-nil-childopts
(testing "worker-launcher has nil childopts"
(let [ worker-id "w-01"
storm-id "s-01"
port 9999
childopts nil]
(def childopts-with-ids (supervisor/replace-childopts-tags-by-ids childopts worker-id storm-id port))
(is (not-found? "%WORKER-ID%" childopts-with-ids))
(is (not-found? "w-01" childopts-with-ids))
(is (not-found? "%STORM-ID%" childopts-with-ids))
(is (not-found? "s-01" childopts-with-ids))
(is (not-found? "%WORKER-PORT%" childopts-with-ids))
(is (not-found? "-9999." childopts-with-ids))
(is (not-found? "%ID%" childopts-with-ids))
(is (not-found? "worker-9999" childopts-with-ids) (str childopts-with-ids))
)))

(deftest test-replace-childopts-tags-by-ids-nil-ids
(testing "worker-launcher has nil ids"
(let [ worker-id nil
storm-id "s-01"
port 9999
childopts "-Xloggc:/home/y/lib/storm/current/logs/gc.worker-%ID%-%STORM-ID%-%WORKER-ID%-%WORKER-PORT%.log"]
(def childopts-with-ids (supervisor/replace-childopts-tags-by-ids childopts worker-id storm-id port))
(is (not-found? "%WORKER-ID%" childopts-with-ids))
(is (not-found? "w-01" childopts-with-ids))
(is (not-found? "%STORM-ID%" childopts-with-ids))
(is (found? "s-01" childopts-with-ids))
(is (not-found? "%WORKER-PORT%" childopts-with-ids))
(is (found? "-9999." childopts-with-ids))
(is (not-found? "%ID%" childopts-with-ids))
(is (found? "worker-9999" childopts-with-ids) (str childopts-with-ids))
)))