Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion storm-core/src/clj/org/apache/storm/ui/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@
(get-in
(clojurify-structure (StatsUtil/spoutStreamsStats spout-summs true))
["complete-latencies" window]))
:throughput (if bolt-summs
(get-in
(clojurify-structure (StatsUtil/boltStreamsStats bolt-summs true))
[:throughput window])
(get-in
(clojurify-structure (StatsUtil/spoutStreamsStats spout-summs true))
[:throughput window]))
:transferred (or
(get-in
(clojurify-structure (StatsUtil/spoutStreamsStats spout-summs true))
Expand Down Expand Up @@ -566,6 +573,7 @@
"emitted" (get-in stats [:emitted w])
"transferred" (get-in stats [:transferred w])
"completeLatency" (StatsUtil/floatStr (get-in stats [:complete-latencies w]))
"throughput" (StatsUtil/floatStr (get-in stats [:throughput w]))
"acked" (get-in stats [:acked w])
"failed" (get-in stats [:failed w])})))

Expand Down Expand Up @@ -612,6 +620,7 @@
"tasks" (.get_num_tasks common-stats)
"emitted" (.get_emitted common-stats)
"transferred" (.get_transferred common-stats)
"throughput" (StatsUtil/floatStr (.get_throughput common-stats))
"acked" (.get_acked common-stats)
"failed" (.get_failed common-stats)
"requestedMemOnHeap" (.get (.get_resources_map common-stats) Config/TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB)
Expand Down Expand Up @@ -656,6 +665,7 @@
{:emitted (.get_window_to_emitted topo-stats)
:transferred (.get_window_to_transferred topo-stats)
:complete-latencies (.get_window_to_complete_latencies_ms topo-stats)
:throughput (.get_window_to_throughput topo-stats)
:acked (.get_window_to_acked topo-stats)
:failed (.get_window_to_failed topo-stats)}
topo-stats (topology-stats window stat->window->number)
Expand Down Expand Up @@ -857,6 +867,7 @@
"windowPretty" (window-hint window)
"emitted" (.get_emitted comm-s)
"transferred" (.get_transferred comm-s)
"throughput" (StatsUtil/floatStr (.get_throughput comm-s))
"acked" (.get_acked comm-s)
"failed" (.get_failed comm-s)
"executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bolt-s))
Expand All @@ -873,6 +884,7 @@
"windowPretty" (window-hint window)
"emitted" (.get_emitted comm-s)
"transferred" (.get_transferred comm-s)
"throughput" (StatsUtil/floatStr (.get_throughput comm-s))
"acked" (.get_acked comm-s)
"failed" (.get_failed comm-s)
"completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms spout-s))}))
Expand Down Expand Up @@ -900,7 +912,8 @@
(let [^CommonAggregateStats cas (.get_common_stats stats)]
{"stream" stream-id
"emitted" (Utils/nullToZero (.get_emitted cas))
"transferred" (Utils/nullToZero (.get_transferred cas))}))
"transferred" (Utils/nullToZero (.get_transferred cas))
"throughput" (StatsUtil/floatStr (.get_throughput cas))}))

(defmethod unpack-comp-output-stat ComponentType/SPOUT
[[stream-id ^ComponentAggregateStats stats]]
Expand All @@ -910,6 +923,7 @@
{"stream" stream-id
"emitted" (Utils/nullToZero (.get_emitted cas))
"transferred" (Utils/nullToZero (.get_transferred cas))
"throughput" (StatsUtil/floatStr (.get_throughput cas))
"completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms spout-s))
"acked" (Utils/nullToZero (.get_acked cas))
"failed" (Utils/nullToZero (.get_failed cas))}))
Expand Down Expand Up @@ -937,6 +951,7 @@
"port" port
"emitted" (Utils/nullToZero (.get_emitted cas))
"transferred" (Utils/nullToZero (.get_transferred cas))
"throughput" (StatsUtil/floatStr (.get_throughput cas))
"capacity" (StatsUtil/floatStr (Utils/nullToZero (.get_capacity bas)))
"executeLatency" (StatsUtil/floatStr (.get_execute_latency_ms bas))
"executed" (Utils/nullToZero (.get_executed bas))
Expand Down Expand Up @@ -965,6 +980,7 @@
"port" port
"emitted" (Utils/nullToZero (.get_emitted cas))
"transferred" (Utils/nullToZero (.get_transferred cas))
"throughput" (StatsUtil/floatStr (.get_throughput cas))
"completeLatency" (StatsUtil/floatStr (.get_complete_latency_ms sas))
"acked" (Utils/nullToZero (.get_acked cas))
"failed" (Utils/nullToZero (.get_failed cas))
Expand Down
Loading