Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions storm-core/src/clj/backtype/storm/converter.clj
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
(.set_scheduler_meta (:scheduler-meta supervisor-info))
(.set_uptime_secs (long (:uptime-secs supervisor-info)))
(.set_version (:version supervisor-info))
(.set_system_stats (:system-stats supervisor-info))
))

(defn clojurify-supervisor-info [^SupervisorInfo supervisor-info]
Expand All @@ -41,7 +42,8 @@
(if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info)))
(if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info)))
(.get_uptime_secs supervisor-info)
(.get_version supervisor-info))))
(.get_version supervisor-info)
(if-let [system_stats (.get_system_stats supervisor-info)] (into {} system_stats)))))

(defn thriftify-assignment [assignment]
(doto (Assignment.)
Expand Down Expand Up @@ -185,6 +187,7 @@
:executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb)))
:uptime (.get_uptime_secs worker-hb)
:time-secs (.get_time_secs worker-hb)
:system-stats (.get_system_stats worker-hb)
}
{}))

Expand All @@ -194,7 +197,8 @@
(.set_uptime_secs (:uptime worker-hb))
(.set_storm_id (:storm-id worker-hb))
(.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb))))
(.set_time_secs (:time-secs worker-hb)))))
(.set_time_secs (:time-secs worker-hb))
(.set_system_stats (:system-stats worker-hb)))))

(defn clojurify-error [^ErrorInfo error]
(if error
Expand Down
2 changes: 1 addition & 1 deletion storm-core/src/clj/backtype/storm/daemon/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
;; component->executors is a map from spout/bolt id to number of executors for that component
;; StormBase is a plain record with one field per name in the vector below.
;; NOTE(review): presumably the per-topology state kept by the daemons -- the meaning of
;; the fields other than component->executors is not shown here; confirm against callers.
(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status])

(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version])
(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version system-stats])

;; Protocol with a single one-argument method, waiting?, dispatched on `this`.
;; NOTE(review): the trailing `?` suggests a boolean predicate, but no implementation
;; is visible here -- confirm the contract with the implementing types.
(defprotocol DaemonCommon
(waiting? [this]))
Expand Down
1 change: 1 addition & 0 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,7 @@
(count (:used-ports info))
id) ]
(when-let [version (:version info)] (.set_version sup-sum version))
(when-let [system-stats (:system-stats info)] (.set_system_stats sup-sum system-stats))
sup-sum
))
nimbus-uptime ((:uptime nimbus))
Expand Down
4 changes: 3 additions & 1 deletion storm-core/src/clj/backtype/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@
(Utils/isZkAuthenticationConfiguredStormServer
conf)
SUPERVISOR-ZK-ACLS))
:system-stats-fn (mk-system-stats-fn)
:local-state (supervisor-state conf)
:supervisor-id (.getSupervisorId isupervisor)
:assignment-id (.getAssignmentId isupervisor)
Expand Down Expand Up @@ -511,7 +512,8 @@
(.getMetadata isupervisor)
(conf SUPERVISOR-SCHEDULER-META)
((:uptime supervisor))
(:version supervisor))))]
(:version supervisor)
((:system-stats-fn supervisor)))))]
(heartbeat-fn)

;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
Expand Down
4 changes: 4 additions & 0 deletions storm-core/src/clj/backtype/storm/daemon/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
(:import [backtype.storm.security.auth AuthUtils])
(:import [javax.security.auth Subject])
(:import [java.security PrivilegedExceptionAction])
(:use [backtype.storm.util])

(:gen-class))

(defmulti mk-suicide-fn cluster-mode)
Expand Down Expand Up @@ -61,6 +63,7 @@
:executor-stats stats
:uptime ((:uptime worker))
:time-secs (current-time-secs)
:system-stats ((:system-stats-fn worker))
}]
;; do the zookeeper heartbeat
(.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb)
Expand Down Expand Up @@ -253,6 +256,7 @@
:receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT)
:transfer-fn (mk-transfer-fn <>)
:assignment-versions assignment-versions
:system-stats-fn (mk-system-stats-fn)
)))

(defn- endpoint->string [[node port]]
Expand Down
31 changes: 31 additions & 0 deletions storm-core/src/clj/backtype/storm/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,21 @@
(throw (RuntimeException. (str "Got unexpected process name: " name))))
(first split)))

(defn memory-bean
  "Looks up the JVM's MemoryMXBean through ManagementFactory and returns it."
  []
  (. ManagementFactory (getMemoryMXBean)))

(defn heap-usage
  "Asks the supplied MemoryMXBean for its current heap memory usage and
  returns whatever .getHeapMemoryUsage yields."
  [bean]
  (. bean (getHeapMemoryUsage)))

(defn non-heap-usage
  "Asks the supplied MemoryMXBean for its current non-heap memory usage and
  returns whatever .getNonHeapMemoryUsage yields."
  [bean]
  (. bean (getNonHeapMemoryUsage)))

(defn exec-command! [command]
(let [[comm-str & args] (seq (.split command " "))
command (CommandLine. comm-str)]
Expand Down Expand Up @@ -1065,3 +1080,19 @@
(assoc coll k (apply str (repeat (count (coll k)) "#")))
coll))

(defn mk-system-stats-fn
  "Returns a zero-argument function that returns the system stats of the JVM
  process as a map from stat name (string) to value.

  The map holds the init/used/committed/max figures of the heap and non-heap
  memory usage, each coerced to a double (bytes), plus \"cpuUtil\" as returned
  by Utils/getCpuUtil.

  The memory bean is looked up once, when this factory is called. Every call of
  the returned function takes exactly one heap and one non-heap usage snapshot,
  so the four figures of each group describe the same instant."
  []
  ;; Named mx-bean so the local does not shadow the memory-bean function.
  (let [mx-bean (memory-bean)]
    (fn []
      ;; One snapshot per group: the figures stay mutually consistent and the
      ;; bean is queried twice per call instead of eight times.
      (let [^java.lang.management.MemoryUsage heap (heap-usage mx-bean)
            ^java.lang.management.MemoryUsage non-heap (non-heap-usage mx-bean)]
        {"heap_initBytes" (double (.getInit heap))
         "heap_usedBytes" (double (.getUsed heap))
         "heap_committedBytes" (double (.getCommitted heap))
         "heap_maxBytes" (double (.getMax heap))
         "nonHeap_initBytes" (double (.getInit non-heap))
         "nonHeap_usedBytes" (double (.getUsed non-heap))
         "nonHeap_committedBytes" (double (.getCommitted non-heap))
         "nonHeap_maxBytes" (double (.getMax non-heap))
         "cpuUtil" (Utils/getCpuUtil)}))))

Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.slf4j.LoggerFactory;

@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-2-6")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-7-1")
public class AlreadyAliveException extends TException implements org.apache.thrift.TBase<AlreadyAliveException, AlreadyAliveException._Fields>, java.io.Serializable, Cloneable, Comparable<AlreadyAliveException> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AlreadyAliveException");

Expand Down
Loading