Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@
(defn remove-dead-worker [worker]
(swap! dead-workers disj worker)))

(defn is-worker-launchtime-timed-out?
  "Returns true when the time elapsed between `lt` (the recorded launch time
  of a worker) and `now` is strictly greater than the value stored in `conf`
  under SUPERVISOR-WORKER-START-TIMEOUT-SECS.

  `now` and `lt` must be numbers in the same unit as the configured timeout
  (callers in this file pass values from `current-time-secs`)."
  [now lt conf]
  (let [elapsed-since-launch (- now lt)
        start-timeout (conf SUPERVISOR-WORKER-START-TIMEOUT-SECS)]
    (> elapsed-since-launch start-timeout)))

(defn is-worker-hb-timed-out?
  "Returns true when the heartbeat `hb` is stale: the difference between
  `now` and the heartbeat's `:time-secs` entry is strictly greater than the
  value stored in `conf` under SUPERVISOR-WORKER-TIMEOUT-SECS.

  `hb` is expected to be a map carrying a numeric `:time-secs` key."
  [now hb conf]
  (let [last-beat-secs (:time-secs hb)
        heartbeat-age (- now last-beat-secs)
        hb-timeout (conf SUPERVISOR-WORKER-TIMEOUT-SECS)]
    (> heartbeat-age hb-timeout)))
Expand Down Expand Up @@ -301,6 +305,7 @@
(log-warn-error e "Failed to cleanup pid dir: " pid " for worker " id". Will retry later")))))
;; On Windows, the supervisor may still hold the lock on the worker directory
(try-cleanup-worker conf id))
(swap! (:worker-launchtime-atom supervisor) dissoc id)
(log-message "Shut down " (:supervisor-id supervisor) ":" id))

(def SUPERVISOR-ZK-ACLS
Expand All @@ -315,6 +320,7 @@
:uptime (uptime-computer)
:version STORM-VERSION
:worker-thread-pids-atom (atom {})
:worker-launchtime-atom (atom {})
:storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
(Utils/isZkAuthenticationConfiguredStormServer
conf)
Expand Down Expand Up @@ -382,6 +388,7 @@
port
id
mem-onheap)
(swap! (:worker-launchtime-atom supervisor) assoc id {:launchtime (current-time-secs) :port port })
[id port])
(do
(log-message "Missing topology storm code, so can't launch worker with assignment "
Expand All @@ -396,9 +403,14 @@
now (current-time-secs)
allocated (read-allocated-workers supervisor assigned-executors now)
keepers (filter-val
(fn [[state _]] (= state :valid))
(fn [[state _]] (or (= state :not-started) (= state :valid)))
allocated)
keep-ports (set (for [[id [_ hb]] keepers] (:port hb)))
keep-ports (->> keepers
(map (fn [[id [_ hb]]]
(:port (or hb
(get @(:worker-launchtime-atom supervisor) id)))))
(filter not-nil?)
(set))
reassign-executors (select-keys-pred (complement keep-ports) assigned-executors)
new-worker-ids (into
{}
Expand All @@ -415,23 +427,35 @@
;; 6. wait for workers launch

(log-debug "Syncing processes")
(log-debug "Keepers: " keepers)
(log-debug "Keep ports: " keep-ports)
(log-debug "Reassigned executors: " reassign-executors)
(log-debug "Assigned executors: " assigned-executors)
(log-debug "Allocated: " allocated)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we removed this, we should also remove the wait-for-workers-launch private functions.

(doseq [[id [state heartbeat]] allocated]
(when (not= :valid state)
(log-message
"Shutting down and clearing state for id " id
". Current supervisor time: " now
". State: " state
", Heartbeat: " (pr-str heartbeat))
(shutdown-worker supervisor id)))
(let
[worker-launchtime (:launchtime (@(:worker-launchtime-atom supervisor) id))]
(when
(or
(and (not= :valid state)
(not= :not-started state))
(and (= :not-started state)
(or (nil? worker-launchtime)
(is-worker-launchtime-timed-out? now worker-launchtime conf))))
(if (= :not-started state)
(log-message "Worker " id " failed to start"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log-error?

(log-message
"Shutting down and clearing state for id " id
". Current supervisor time: " now
". State: " state
", Heartbeat: " (pr-str heartbeat))
(shutdown-worker supervisor id))))
(let [valid-new-worker-ids (get-valid-new-worker-ids conf supervisor reassign-executors new-worker-ids)]
(ls-approved-workers! local-state
(merge
(select-keys (ls-approved-workers local-state)
(keys keepers))
valid-new-worker-ids))
(wait-for-workers-launch conf (keys valid-new-worker-ids)))))
valid-new-worker-ids)))))

(defn assigned-storm-ids-from-port-assignments [assignment]
(->> assignment
Expand Down Expand Up @@ -1154,7 +1178,7 @@
(if run-worker-as-user
(worker-launcher conf user ["worker" worker-dir (write-script worker-dir command :environment topology-worker-environment)] :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir))
(launch-process command :environment topology-worker-environment :log-prefix log-prefix :exit-code-callback callback :directory (File. worker-dir)))
)))
(swap! (:worker-launchtime-atom supervisor) assoc worker-id {:launchtime (current-time-secs) :port port }))))

;; local implementation

Expand Down