Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@
@node+port->socket-ref)))))
(int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))))

(defn register-queue-metrics [queues storm-conf topology-context]
(defn register-queue-metrics [queues storm-conf topology-context stats]
(doseq [[qname q] queues]
(.registerMetric topology-context (str "__" (name qname)) (StateMetric. q)
(int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
(int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))
()))

(defn skipped-max-spout!
  "Increments the skipped-max-spout counter held by the throttling metrics
  record `m` by the amount returned from (stats-rate stats)."
  [^SpoutThrottlingMetrics m stats]
  (.incrBy (.skipped-max-spout m) (stats-rate stats)))
Expand Down
21 changes: 11 additions & 10 deletions storm-core/src/clj/org/apache/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@
:producer-type :single-threaded
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
receive-queue ((:executor-receive-queue-map worker) executor-id)
]
(recursive-map
:worker worker
Expand All @@ -243,7 +244,7 @@
:component-id component-id
:open-or-prepare-was-called? (atom false)
:storm-conf storm-conf
:receive-queue ((:executor-receive-queue-map worker) executor-id)
:receive-queue receive-queue
:storm-id (:storm-id worker)
:conf (:conf worker)
:shared-executor-data (HashMap.)
Expand All @@ -257,7 +258,7 @@
:context (ClusterStateContext. DaemonType/WORKER))
:type executor-type
;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
:stats (mk-executor-stats <> (sampling-rate storm-conf))
:stats (mk-executor-stats <> (sampling-rate storm-conf) (.getMetrics receive-queue) (.getMetrics batch-transfer->worker))
:interval->task->metric-registry (HashMap.)
:task->component (:task->component worker)
:stream->component->grouper (outbound-components worker-context component-id storm-conf)
Expand Down Expand Up @@ -490,7 +491,7 @@
[component-id message-id (System/currentTimeMillis) values]))))

(defmethod mk-threads :spout [executor-data task-datas initial-credentials]
(let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
(let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called? stats]} executor-data
^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))
Expand Down Expand Up @@ -595,7 +596,7 @@
(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
(builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
:receive receive-queue}
storm-conf (:user-context task-data))
storm-conf (:user-context task-data) stats)
(when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials))

(.open spout-obj
Expand Down Expand Up @@ -781,12 +782,12 @@
(builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
:receive (:receive-queue executor-data)
:transfer (:transfer-queue (:worker executor-data))}
storm-conf user-context)
storm-conf user-context executor-stats)
(builtin-metrics/register-iconnection-client-metrics (:cached-node+port->socket (:worker executor-data)) storm-conf user-context)
(builtin-metrics/register-iconnection-server-metric (:receiver (:worker executor-data)) storm-conf user-context))
(builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
:receive (:receive-queue executor-data)}
storm-conf user-context)
storm-conf user-context executor-stats)
)

(.prepare bolt-obj
Expand Down Expand Up @@ -857,8 +858,8 @@
(.cleanup bolt))

;; TODO: refactor this to be part of an executor-specific map
(defmethod mk-executor-stats :spout [_ rate]
(stats/mk-spout-stats rate))
(defmethod mk-executor-stats :spout [_ rate receive-queue-metrics send-queue-metrics]
(stats/mk-spout-stats rate receive-queue-metrics send-queue-metrics))

(defmethod mk-executor-stats :bolt [_ rate]
(stats/mk-bolt-stats rate))
(defmethod mk-executor-stats :bolt [_ rate receive-queue-metrics send-queue-metrics]
(stats/mk-bolt-stats rate receive-queue-metrics send-queue-metrics))
76 changes: 60 additions & 16 deletions storm-core/src/clj/org/apache/storm/stats.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
ComponentPageInfo ComponentType BoltAggregateStats
ExecutorAggregateStats SpecificAggregateStats
SpoutAggregateStats TopologyPageInfo TopologyStats])
(:import [org.apache.storm.utils Utils])
(:import [org.apache.storm.utils Utils DisruptorQueue$QueueMetrics])
(:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
(:use [org.apache.storm log util])
(:use [clojure.math.numeric-tower :only [ceil]]))
Expand All @@ -33,6 +33,7 @@
;; Keys of the CommonStats fields that are rendered through value-stats.
;; :population and :rate are not listed here; the common-stats rendering code
;; merges them in separately (:population via .getTimeLatAvg, :rate as-is).
(def COMMON-FIELDS [:emitted :transferred])
;; Stats shared by bolt and spout executors.
;;   emitted / transferred - per-stream counters
;;   population            - queue population stat built by mk-queue-stats
;;   rate                  - sampling rate the stats were created with
;; Non-primitive field hints are not enforced at construction time; they only
;; steer interop calls, so they should name the type that is actually stored.
(defrecord CommonStats [^MultiCountStatAndMetric emitted
                        ^MultiCountStatAndMetric transferred
                        ;; mk-queue-stats constructs a MultiLatencyStatAndMetric and
                        ;; readers call .getTimeLatAvg on it, so hint it as such
                        ;; (it was previously hinted as MultiCountStatAndMetric).
                        ^MultiLatencyStatAndMetric population
                        rate])

(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
Expand All @@ -53,27 +54,35 @@

;; Bucket count passed to the MultiCountStatAndMetric / MultiLatencyStatAndMetric
;; constructors in mk-common-stats, mk-bolt-stats and mk-spout-stats.
(def NUM-STAT-BUCKETS 20)

(defn- mk-queue-stats
  "Builds the queue population stat for an executor: a MultiLatencyStatAndMetric
  sized by DisruptorQueue$QueueMetrics/NUM_BUCKETS, with the receive queue's
  average-population metric added under \"receive\" and the send queue's under
  \"send\". Returns the populated stat object."
  [^DisruptorQueue$QueueMetrics receive-queue-metrics ^DisruptorQueue$QueueMetrics send-queue-metrics]
  (doto (MultiLatencyStatAndMetric. DisruptorQueue$QueueMetrics/NUM_BUCKETS)
    (.add "receive" (.avgQueuePopulationMetric receive-queue-metrics))
    (.add "send" (.avgQueuePopulationMetric send-queue-metrics))))

(defn- mk-common-stats
[rate]
[rate receive-queue-metrics send-queue-metrics]
(CommonStats.
(MultiCountStatAndMetric. NUM-STAT-BUCKETS)
(MultiCountStatAndMetric. NUM-STAT-BUCKETS)
(mk-queue-stats receive-queue-metrics send-queue-metrics)
rate))

(defn mk-bolt-stats
[rate]
[rate receive-queue-metrics send-queue-metrics]
(BoltExecutorStats.
(mk-common-stats rate)
(mk-common-stats rate receive-queue-metrics send-queue-metrics)
(MultiCountStatAndMetric. NUM-STAT-BUCKETS)
(MultiCountStatAndMetric. NUM-STAT-BUCKETS)
(MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)
(MultiCountStatAndMetric. NUM-STAT-BUCKETS)
(MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)))

(defn mk-spout-stats
[rate]
[rate receive-queue-metrics send-queue-metrics]
(SpoutExecutorStats.
(mk-common-stats rate)
(mk-common-stats rate receive-queue-metrics send-queue-metrics)
(MultiCountStatAndMetric. NUM-STAT-BUCKETS)
(MultiCountStatAndMetric. NUM-STAT-BUCKETS)
(MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)))
Expand Down Expand Up @@ -189,6 +198,7 @@
[^CommonStats stats]
(merge
(value-stats stats COMMON-FIELDS)
{:population (.getTimeLatAvg (:population stats))}
{:rate (:rate stats)}))

(defn value-bolt-stats!
Expand Down Expand Up @@ -265,6 +275,7 @@
specific-stats (clojurify-specific-stats specific-stats)
common-stats (CommonStats. (.get_emitted stats)
(.get_transferred stats)
(.get_population stats)
(.get_rate stats))]
(if is_bolt?
; worker heart beat does not store the BoltExecutorStats or SpoutExecutorStats , instead it stores the result returned by render-stats!
Expand Down Expand Up @@ -295,10 +306,11 @@
[stats]
(let [specific-stats (thriftify-specific-stats stats)
rate (:rate stats)]
(ExecutorStats. (window-set-converter (:emitted stats) str)
(window-set-converter (:transferred stats) str)
specific-stats
rate)))
(doto (ExecutorStats. (window-set-converter (:emitted stats) str)
(window-set-converter (:transferred stats) str)
specific-stats
rate)
(.set_population (window-set-converter (:population stats) str)))))

(defn valid-number?
"Returns true if x is a number that is not NaN or Infinity, false otherwise"
Expand Down Expand Up @@ -445,6 +457,16 @@
:num-executors 1,
:num-tasks num-tasks,
:capacity (compute-agg-capacity statk->w->sid->num uptime)
:in-backlog (-> statk->w->sid->num
:population
str-key
(get window)
(get "receive")),
:out-backlog (-> statk->w->sid->num
:population
str-key
(get window)
(get "send")),
:cid+sid->input-stats
(merge-with
merge
Expand Down Expand Up @@ -500,6 +522,16 @@
:uptime uptime,
:num-executors 1,
:num-tasks num-tasks,
:in-backlog (-> statk->w->sid->num
:population
str-key
(get window)
(get "receive")),
:out-backlog (-> statk->w->sid->num
:population
str-key
(get window)
(get "send")),
:sid->output-stats
(merge-with
merge
Expand Down Expand Up @@ -646,7 +678,7 @@
(conj (:executor-stats acc-bolt-stats)
(merge
(select-keys bolt-stats
[:executor-id :uptime :host :port :capacity])
[:executor-id :uptime :host :port :capacity :in-backlog :out-backlog])
{:emitted (sum-streams bolt-out :emitted)
:transferred (sum-streams bolt-out :transferred)
:acked (sum-streams bolt-in :acked)
Expand Down Expand Up @@ -675,7 +707,7 @@
acked (sum-streams spout-out :acked)]
(conj (:executor-stats acc-spout-stats)
(merge
(select-keys spout-stats [:executor-id :uptime :host :port])
(select-keys spout-stats [:executor-id :uptime :host :port :in-backlog :out-backlog])
{:emitted (sum-streams spout-out :emitted)
:transferred (sum-streams spout-out :transferred)
:acked acked
Expand Down Expand Up @@ -927,6 +959,8 @@
transferred
acked
failed
in-backlog
out-backlog
num-executors] :as statk->num}]
(let [cas (CommonAggregateStats.)]
(and num-executors (.set_num_executors cas num-executors))
Expand All @@ -935,6 +969,8 @@
(and transferred (.set_transferred cas transferred))
(and acked (.set_acked cas acked))
(and failed (.set_failed cas failed))
(and in-backlog (.set_in_backlog cas in-backlog))
(and out-backlog (.set_out_backlog cas out-backlog))
(.set_common_stats s cas)))

(defn thriftify-bolt-agg-stats
Expand Down Expand Up @@ -1139,7 +1175,9 @@
:window->executed {}
:window->proc-lat-wgt-avg {}
:window->acked {}
:window->failed {}}]
:window->failed {}
:window->in-backlog {}
:window->out-backlog {}}]
(apply aggregate-comp-stats* (concat args (list init-val)))))

(defmethod aggregate-comp-stats :spout
Expand All @@ -1151,7 +1189,9 @@
:window->transferred {}
:window->comp-lat-wgt-avg {}
:window->acked {}
:window->failed {}}]
:window->failed {}
:window->in-backlog {}
:window->out-backlog {}}]
(apply aggregate-comp-stats* (concat args (list init-val)))))

(defmethod aggregate-comp-stats :default [& _] {})
Expand Down Expand Up @@ -1199,7 +1239,9 @@
:window->proc-lat-wgt-avg
:window->executed)
:window->acked (map-key str (:window->acked acc-data))
:window->failed (map-key str (:window->failed acc-data))})
:window->failed (map-key str (:window->failed acc-data))
:window->in-backlog (map-key str (:window->in-backlog acc-data))
:window->out-backlog (map-key str (:window->out-backlog acc-data))})

(defmethod post-aggregate-comp-stats :spout
[task->component
Expand Down Expand Up @@ -1266,7 +1308,9 @@
{:emitted (:window->emitted data)
:transferred (:window->transferred data)
:acked (:window->acked data)
:failed (:window->failed data)}
:failed (:window->failed data)
:in-backlog (:window->in-backlog data)
:out-backlog (:window->out-backlog data)}
(condp = (:type data)
:bolt {:execute-latency (:window->execute-latency data)
:process-latency (:window->process-latency data)
Expand Down
4 changes: 4 additions & 0 deletions storm-core/src/clj/org/apache/storm/ui/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,8 @@
"port" port
"emitted" (nil-to-zero (.get_emitted cas))
"transferred" (nil-to-zero (.get_transferred cas))
"inBacklog" (nil-to-zero (.get_in_backlog cas))
"outBacklog" (nil-to-zero (.get_out_backlog cas))
"capacity" (float-str (nil-to-zero (.get_capacity bas)))
"executeLatency" (float-str (.get_execute_latency_ms bas))
"executed" (nil-to-zero (.get_executed bas))
Expand Down Expand Up @@ -762,6 +764,8 @@
"port" port
"emitted" (nil-to-zero (.get_emitted cas))
"transferred" (nil-to-zero (.get_transferred cas))
"inBacklog" (nil-to-zero (.get_in_backlog cas))
"outBacklog" (nil-to-zero (.get_out_backlog cas))
"completeLatency" (float-str (.get_complete_latency_ms sas))
"acked" (nil-to-zero (.get_acked cas))
"failed" (nil-to-zero (.get_failed cas))
Expand Down
Loading