Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions storm-core/src/clj/org/apache/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
[cluster :as cluster] [disruptor :as disruptor] [stats :as stats]])
(:require [org.apache.storm.daemon [task :as task]])
(:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
(:require [clojure.set :as set]))
(:require [clojure.set :as set]
[org.apache.storm.util :as util]))

(defn- mk-fields-grouper
[^Fields out-fields ^Fields group-fields ^List target-tasks]
Expand Down Expand Up @@ -277,6 +278,7 @@
:spout-throttling-metrics (if (= executor-type :spout)
(builtin-metrics/make-spout-throttling-data)
nil)
:start-time-in-secs (util/current-time-secs)
;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function?
)))

Expand Down Expand Up @@ -395,7 +397,7 @@
(reify
RunningExecutor
(render-stats [this]
(stats/render-stats! (:stats executor-data)))
(stats/render-stats! (:stats executor-data) (:start-time-in-secs executor-data)))
(get-executor-id [this]
executor-id)
(credentials-changed [this creds]
Expand Down
114 changes: 95 additions & 19 deletions storm-core/src/clj/org/apache/storm/stats.clj
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,35 @@
(value-stats stats SPOUT-FIELDS)
{:type :spout}))

(defn update-values [m f & args]
(into {} (for [[k v] m] [k (apply f v args)])))

(defn values-divided-by [kv-pairs t]
(update-values kv-pairs / (max 1.0 (double t))))

(defn convert-count-to-throughput
[stat eclipsed-time-in-secs]
(into {} (for [[time buckets] stat]
[time (values-divided-by buckets (if (= time ":all-time") eclipsed-time-in-secs (min (Integer/parseInt time) eclipsed-time-in-secs)))])))

(defn remove-component-id [buckets]
(into {} (for [[time pair] buckets]
{time (into {} (for [[[component stream] v] pair]
{stream v}))})))

(defmulti render-stats! class-selector)

(defmethod render-stats! SpoutExecutorStats
[stats]
(value-spout-stats! stats))
[stats start-time-in-secs]
(let [stats (value-spout-stats! stats)
eclipsed-time-in-secs (- (current-time-secs) start-time-in-secs)]
(assoc stats :throughput (convert-count-to-throughput (:emitted stats) eclipsed-time-in-secs))))

(defmethod render-stats! BoltExecutorStats
[stats]
(value-bolt-stats! stats))
[stats start-time-in-secs]
(let [stats (value-bolt-stats! stats)
eclipsed-time-in-secs (- (current-time-secs) start-time-in-secs)]
(assoc stats :throughput (remove-component-id (convert-count-to-throughput (:executed stats) eclipsed-time-in-secs)))))

(defmulti cleanup-stats! class-selector)

Expand Down Expand Up @@ -271,8 +291,8 @@
; worker heart beat does not store the BoltExecutorStats or SpoutExecutorStats , instead it stores the result returned by render-stats!
; which flattens the BoltExecutorStats/SpoutExecutorStats by extracting values from all atoms and merging all values inside :common to top
;level map we are pretty much doing the same here.
(dissoc (merge common-stats {:type :bolt} (apply ->BoltExecutorStats (into [nil] specific-stats))) :common)
(dissoc (merge common-stats {:type :spout} (apply ->SpoutExecutorStats (into [nil] specific-stats))) :common)
(dissoc (merge common-stats {:type :bolt} {:throughput (.get_throughput stats)} (apply ->BoltExecutorStats (into [nil] specific-stats))) :common)
(dissoc (merge common-stats {:type :spout} {:throughput (.get_throughput stats)} (apply ->SpoutExecutorStats (into [nil] specific-stats))) :common)
)))

(defmethod thriftify-specific-stats :bolt
Expand All @@ -295,11 +315,13 @@
(defn thriftify-executor-stats
[stats]
(let [specific-stats (thriftify-specific-stats stats)
rate (:rate stats)]
(ExecutorStats. (window-set-converter (:emitted stats) str)
(window-set-converter (:transferred stats) str)
specific-stats
rate)))
rate (:rate stats)
executor-stats (ExecutorStats. (window-set-converter (:emitted stats) str)
(window-set-converter (:transferred stats) str)
specific-stats
rate)]
(.set_throughput executor-stats(window-set-converter (:throughput stats) str))
executor-stats))

(defn valid-number?
"Returns true if x is a number that is not NaN or Infinity, false otherwise"
Expand Down Expand Up @@ -481,7 +503,12 @@
:transferred
str-key
(get window)
handle-sys-components-fn)})}))
handle-sys-components-fn)
:throughput (-> statk->w->sid->num
:throughput
str-key
(get window)
handle-sys-components-fn)})}))

(defn agg-pre-merge-comp-page-spout
[{exec-id :exec-id
Expand Down Expand Up @@ -526,11 +553,11 @@
str-key
(get window)
handle-sys-components-fn)
:transferred (-> statk->w->sid->num
:transferred
str-key
(get window)
handle-sys-components-fn)}))}))
:throughput (-> statk->w->sid->num
:throughput
str-key
(get window)
handle-sys-components-fn)}))}))

(defn agg-pre-merge-topo-page-bolt
[{comp-id :comp-id
Expand Down Expand Up @@ -571,6 +598,13 @@
handle-sys-components-fn
vals
sum)
:throughput (-> statk->w->sid->num
:throughput
str-key
(get window)
handle-sys-components-fn
vals
sum)
:capacity (compute-agg-capacity statk->w->sid->num uptime)
:acked (-> statk->w->sid->num
:acked
Expand Down Expand Up @@ -619,6 +653,13 @@
handle-sys-components-fn
vals
sum)
:throughput (-> statk->w->sid->num
:throughput
str-key
(get window)
handle-sys-components-fn
vals
sum)
:failed (-> statk->w->sid->num
:failed
str-key
Expand Down Expand Up @@ -650,6 +691,7 @@
[:executor-id :uptime :host :port :capacity])
{:emitted (sum-streams bolt-out :emitted)
:transferred (sum-streams bolt-out :transferred)
:throughput (sum-streams bolt-out :throughput)
:acked (sum-streams bolt-in :acked)
:failed (sum-streams bolt-in :failed)
:executed executed}
Expand Down Expand Up @@ -679,6 +721,7 @@
(select-keys spout-stats [:executor-id :uptime :host :port])
{:emitted (sum-streams spout-out :emitted)
:transferred (sum-streams spout-out :transferred)
:throughput (sum-streams spout-out :throughput)
:acked acked
:failed (sum-streams spout-out :failed)}
{:complete-latency (if (and acked (pos? acked))
Expand All @@ -694,6 +737,8 @@
:emitted (sum-or-0 (:emitted acc-bolt-stats) (:emitted bolt-stats))
:transferred (sum-or-0 (:transferred acc-bolt-stats)
(:transferred bolt-stats))
:throughput (sum-or-0 (:throughput acc-bolt-stats)
(:throughput bolt-stats))
:capacity (max-or-0 (:capacity acc-bolt-stats) (:capacity bolt-stats))
;; We sum average latency totals here to avoid dividing at each step.
;; Compute the average latencies by dividing the total by the count.
Expand All @@ -711,6 +756,7 @@
:num-tasks (sum-or-0 (:num-tasks acc-spout-stats) (:num-tasks spout-stats))
:emitted (sum-or-0 (:emitted acc-spout-stats) (:emitted spout-stats))
:transferred (sum-or-0 (:transferred acc-spout-stats) (:transferred spout-stats))
:throughput (sum-or-0 (:throughput acc-spout-stats) (:throughput spout-stats))
;; We sum average latency totals here to avoid dividing at each step.
;; Compute the average latencies by dividing the total by the count.
:completeLatencyTotal (sum-or-0 (:completeLatencyTotal acc-spout-stats)
Expand All @@ -733,6 +779,7 @@
spout-id->stats
window->emitted
window->transferred
window->throughput
window->comp-lat-wgt-avg
window->acked
window->failed] :as acc-stats}
Expand Down Expand Up @@ -765,6 +812,10 @@
(map-val handle-sys-components-fn)
aggregate-count-streams
(merge-with + window->transferred))
:window->throughput (->> (:throughput stats)
(map-val handle-sys-components-fn)
aggregate-count-streams
(merge-with + window->throughput))
:window->comp-lat-wgt-avg (merge-with +
window->comp-lat-wgt-avg
w->compLatWgtAvg)
Expand Down Expand Up @@ -864,6 +915,7 @@
:spout-id->stats {}
:window->emitted {}
:window->transferred {}
:window->throughput {}
:window->comp-lat-wgt-avg {}
:window->acked {}
:window->failed {}}
Expand Down Expand Up @@ -914,6 +966,7 @@
(assoc :lastError (last-err-fn id)))]))
:window->emitted (map-key str (:window->emitted acc-data))
:window->transferred (map-key str (:window->transferred acc-data))
:throughput (map-key str (:window->throughput acc-data))
:window->complete-latency
(compute-weighted-averages-per-window acc-data
:window->comp-lat-wgt-avg
Expand All @@ -926,6 +979,7 @@
{:keys [num-tasks
emitted
transferred
throughput
acked
failed
num-executors] :as statk->num}]
Expand All @@ -934,6 +988,7 @@
(and num-tasks (.set_num_tasks cas num-tasks))
(and emitted (.set_emitted cas emitted))
(and transferred (.set_transferred cas transferred))
(and throughput (.set_throughput cas throughput))
(and acked (.set_acked cas acked))
(and failed (.set_failed cas failed))
(.set_common_stats s cas)))
Expand Down Expand Up @@ -983,6 +1038,7 @@
bolt-id->stats
window->emitted
window->transferred
window->throughput
window->complete-latency
window->acked
window->failed]} data
Expand All @@ -999,6 +1055,7 @@
topology-stats (doto (TopologyStats.)
(.set_window_to_emitted window->emitted)
(.set_window_to_transferred window->transferred)
(.set_window_to_throughput window->throughput)
(.set_window_to_complete_latencies_ms
window->complete-latency)
(.set_window_to_acked window->acked)
Expand Down Expand Up @@ -1101,6 +1158,10 @@
(map-val handle-sys-components-fn)
aggregate-count-streams
(merge-with + (:window->transferred acc-stats)))
:window->throughput (->> (:throughput new-stats)
(map-val handle-sys-components-fn)
aggregate-count-streams
(merge-with + (:window->throughput acc-stats)))
:window->exec-lat-wgt-avg (merge-with +
(:window->exec-lat-wgt-avg acc-stats)
w->execLatWgtAvg)
Expand Down Expand Up @@ -1130,6 +1191,10 @@
(map-val handle-sys-components-fn)
aggregate-count-streams
(merge-with + (:window->emitted acc-stats)))
:window->throughput (->> (:throughput new-stats)
(map-val handle-sys-components-fn)
aggregate-count-streams
(merge-with + (:window->throughput acc-stats)))
:window->transferred (->> (:transferred new-stats)
(map-val handle-sys-components-fn)
aggregate-count-streams
Expand Down Expand Up @@ -1183,6 +1248,7 @@
:executor-stats []
:window->emitted {}
:window->transferred {}
:window->throughput {}
:window->exec-lat-wgt-avg {}
:window->executed {}
:window->proc-lat-wgt-avg {}
Expand All @@ -1196,6 +1262,7 @@
:sid->output-stats {}
:executor-stats []
:window->emitted {}
:window->throughput {}
:window->transferred {}
:window->comp-lat-wgt-avg {}
:window->acked {}
Expand Down Expand Up @@ -1237,6 +1304,7 @@
:executor-stats (:executor-stats (:stats acc-data))
:window->emitted (map-key str (:window->emitted acc-data))
:window->transferred (map-key str (:window->transferred acc-data))
:window->throughput (map-key str (:window->throughput acc-data))
:window->execute-latency
(compute-weighted-averages-per-window acc-data
:window->exec-lat-wgt-avg
Expand Down Expand Up @@ -1271,6 +1339,7 @@
:executor-stats (:executor-stats (:stats acc-data))
:window->emitted (map-key str (:window->emitted acc-data))
:window->transferred (map-key str (:window->transferred acc-data))
:window->throughput (map-key str (:window->throughput acc-data))
:window->complete-latency
(compute-weighted-averages-per-window acc-data
:window->comp-lat-wgt-avg
Expand Down Expand Up @@ -1313,6 +1382,7 @@
(merge
{:emitted (:window->emitted data)
:transferred (:window->transferred data)
:throughput (:window->throughput data)
:acked (:window->acked data)
:failed (:window->failed data)}
(condp = (:type data)
Expand Down Expand Up @@ -1422,6 +1492,9 @@
transferred (:transferred stream-summary)
transferred (into {} (for [[window stat] transferred]
{window (filter-key filter-fn stat)}))
throughput (:throughput stream-summary)
throughput (into {} (for [[window stat] throughput]
{window (filter-key filter-fn stat)}))
stream-summary (-> stream-summary (dissoc :emitted) (assoc :emitted emitted))
stream-summary (-> stream-summary (dissoc :transferred) (assoc :transferred transferred))]
stream-summary))
Expand All @@ -1437,7 +1510,8 @@
(defn aggregate-common-stats
[stats-seq]
{:emitted (aggregate-counts (map #(.get_emitted ^ExecutorStats %) stats-seq))
:transferred (aggregate-counts (map #(.get_transferred ^ExecutorStats %) stats-seq))})
:transferred (aggregate-counts (map #(.get_transferred ^ExecutorStats %) stats-seq))
:throughput (aggregate-counts (map #(.get_throughput ^ExecutorStats %) stats-seq))})

(defn aggregate-bolt-stats
[stats-seq include-sys?]
Expand Down Expand Up @@ -1491,6 +1565,7 @@
:failed (aggregate-count-streams (:failed stats))
:emitted (aggregate-count-streams (:emitted stats))
:transferred (aggregate-count-streams (:transferred stats))
:throughput (aggregate-count-streams (:throughput stats))
:complete-latencies (aggregate-avg-streams (:complete-latencies stats)
(:acked stats))})

Expand All @@ -1507,6 +1582,7 @@
:failed (aggregate-count-streams (:failed stats))
:emitted (aggregate-count-streams (:emitted stats))
:transferred (aggregate-count-streams (:transferred stats))
:throughput (aggregate-count-streams (:throughput stats))
:process-latencies (aggregate-avg-streams (:process-latencies stats)
(:acked stats))
:executed (aggregate-count-streams (:executed stats))
Expand Down Expand Up @@ -1554,7 +1630,7 @@
;; Include only keys that will be used. We want to count acked and
;; failed only for the "tuple trees," so we do not include those keys
;; from the bolt executors.
[:emitted :transferred])
[:emitted :transferred :throughput])
agg-spout-stats)))

(defn error-subset
Expand Down
Loading