Track worker launch times in the supervisor.

What the hunks below change in supervisor.clj:

  * Add is-worker-launchtime-timed-out?, which reports whether (now - lt)
    exceeds the SUPERVISOR-WORKER-START-TIMEOUT-SECS entry of conf.
  * Add :worker-launchtime-atom (an atom holding a map) next to
    :worker-thread-pids-atom. An entry {:launchtime <secs> :port <port>} is
    stored under the worker id when a worker is launched, and the entry is
    dissoc'ed just before the "Shut down ..." message is logged.
  * In the code that logs "Syncing processes":
      - workers in state :not-started are kept alongside :valid ones;
      - keep-ports takes the port from the heartbeat when there is one,
        otherwise from the recorded launch entry, and drops nil ports;
      - a :not-started worker is shut down only when it has no recorded
        launch time or its launch time has timed out; every state other than
        :valid and :not-started is shut down as before;
      - the wait-for-workers-launch call is removed.

NOTE(review): keepers is filtered on state only, so a :not-started worker
whose launch has already timed out still contributes its port to keep-ports
during the pass that shuts it down. Its executors therefore look like they are
not in reassign-executors until a later pass -- confirm this delay is intended.

diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index b8242ca565e..790ec979920 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -158,6 +158,10 @@
   (defn remove-dead-worker [worker]
     (swap! dead-workers disj worker)))
 
+(defn is-worker-launchtime-timed-out? [now lt conf]
+  (> (- now lt)
+     (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)))
+
 (defn is-worker-hb-timed-out? [now hb conf]
   (> (- now (:time-secs hb))
      (conf SUPERVISOR-WORKER-TIMEOUT-SECS)))
@@ -301,6 +305,7 @@
             (log-warn-error e "Failed to cleanup pid dir: " pid " for worker " id". Will retry later")))))
     ;; on windows, the supervisor may still holds the lock on the worker directory
     (try-cleanup-worker conf id))
+  (swap! (:worker-launchtime-atom supervisor) dissoc id)
   (log-message "Shut down " (:supervisor-id supervisor) ":" id))
 
 (def SUPERVISOR-ZK-ACLS
@@ -315,6 +320,7 @@
    :uptime (uptime-computer)
    :version STORM-VERSION
    :worker-thread-pids-atom (atom {})
+   :worker-launchtime-atom (atom {})
    :storm-cluster-state (cluster/mk-storm-cluster-state conf
                                                         :acls (when
                                                                 (Utils/isZkAuthenticationConfiguredStormServer conf)
@@ -382,6 +388,7 @@
                                         port
                                         id
                                         mem-onheap)
+                         (swap! (:worker-launchtime-atom supervisor) assoc id {:launchtime (current-time-secs) :port port })
                          [id port])
                        (do
                          (log-message "Missing topology storm code, so can't launch worker with assignment "
@@ -396,9 +403,14 @@
         now (current-time-secs)
         allocated (read-allocated-workers supervisor assigned-executors now)
         keepers (filter-val
-                 (fn [[state _]] (= state :valid))
+                 (fn [[state _]] (or (= state :not-started) (= state :valid)))
                  allocated)
-        keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
+        keep-ports (->> keepers
+                        (map (fn [[id [_ hb]]]
+                               (:port (or hb
+                                          (get @(:worker-launchtime-atom supervisor) id)))))
+                        (filter not-nil?)
+                        (set))
         reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
         new-worker-ids (into
                         {}
@@ -415,23 +427,35 @@
     ;; 6. wait for workers launch
 
     (log-debug "Syncing processes")
+    (log-debug "Keepers: " keepers)
+    (log-debug "Keep ports: " keep-ports)
+    (log-debug "Reassigned executors: " reassign-executors)
     (log-debug "Assigned executors: " assigned-executors)
     (log-debug "Allocated: " allocated)
     (doseq [[id [state heartbeat]] allocated]
-      (when (not= :valid state)
-        (log-message
-         "Shutting down and clearing state for id " id
-         ". Current supervisor time: " now
-         ". State: " state
-         ", Heartbeat: " (pr-str heartbeat))
-        (shutdown-worker supervisor id)))
+      (let
+        [worker-launchtime (:launchtime (@(:worker-launchtime-atom supervisor) id))]
+        (when
+          (or
+            (and (not= :valid state)
+                 (not= :not-started state))
+            (and (= :not-started state)
+                 (or (nil? worker-launchtime)
+                     (is-worker-launchtime-timed-out? now worker-launchtime conf))))
+          (when (= :not-started state)
+            (log-message "Worker " id " failed to start"))
+          (log-message
+           "Shutting down and clearing state for id " id
+           ". Current supervisor time: " now
+           ". State: " state
+           ", Heartbeat: " (pr-str heartbeat))
+          (shutdown-worker supervisor id))))
     (let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)]
       (ls-approved-workers! local-state
                         (merge
                          (select-keys (ls-approved-workers local-state)
                                       (keys keepers))
-                         valid-new-worker-ids))
-      (wait-for-workers-launch conf (keys valid-new-worker-ids)))))
+                         valid-new-worker-ids)))))
 
 (defn assigned-storm-ids-from-port-assignments [assignment]
   (->> assignment
@@ -1154,7 +1178,7 @@
         (if run-worker-as-user
           (worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir))
           (launch-process command :environment topology-worker-environment :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir)))
-        )))
+        (swap! (:worker-launchtime-atom supervisor) assoc worker-id {:launchtime (current-time-secs) :port port }))))
 
 ;; local implementation