diff --git a/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj b/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj index 14d0132ce0e..79f65afae47 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj @@ -83,10 +83,11 @@ @node+port->socket-ref))))) (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))) -(defn register-queue-metrics [queues storm-conf topology-context] +(defn register-queue-metrics [queues storm-conf topology-context stats] (doseq [[qname q] queues] (.registerMetric topology-context (str "__" (name qname)) (StateMetric. q) - (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))))) + (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))) + ())) (defn skipped-max-spout! [^SpoutThrottlingMetrics m stats] (-> m .skipped-max-spout (.incrBy (stats-rate stats)))) diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 07925b8aedc..bafed3b92d2 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -234,6 +234,7 @@ :producer-type :single-threaded :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) + receive-queue ((:executor-receive-queue-map worker) executor-id) ] (recursive-map :worker worker @@ -243,7 +244,7 @@ :component-id component-id :open-or-prepare-was-called? (atom false) :storm-conf storm-conf - :receive-queue ((:executor-receive-queue-map worker) executor-id) + :receive-queue receive-queue :storm-id (:storm-id worker) :conf (:conf worker) :shared-executor-data (HashMap.) @@ -257,7 +258,7 @@ :context (ClusterStateContext. DaemonType/WORKER)) :type executor-type ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field) - :stats (mk-executor-stats <> (sampling-rate storm-conf)) + :stats (mk-executor-stats <> (sampling-rate storm-conf) (.getMetrics receive-queue) (.getMetrics batch-transfer->worker)) :interval->task->metric-registry (HashMap.) :task->component (:task->component worker) :stream->component->grouper (outbound-components worker-context component-id storm-conf) @@ -490,7 +491,7 @@ [component-id message-id (System/currentTimeMillis) values])))) (defmethod mk-threads :spout [executor-data task-datas initial-credentials] - (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data + (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called? stats]} executor-data ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf) max-spout-pending (executor-max-spout-pending storm-conf (count task-datas)) ^Integer max-spout-pending (if max-spout-pending (int max-spout-pending)) @@ -595,7 +596,7 @@ (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data)) (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data) :receive receive-queue} - storm-conf (:user-context task-data)) + storm-conf (:user-context task-data) stats) (when (instance? ICredentialsListener spout-obj) (.setCredentials spout-obj initial-credentials)) (.open spout-obj @@ -781,12 +782,12 @@ (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data) :receive (:receive-queue executor-data) :transfer (:transfer-queue (:worker executor-data))} - storm-conf user-context) + storm-conf user-context executor-stats) (builtin-metrics/register-iconnection-client-metrics (:cached-node+port->socket (:worker executor-data)) storm-conf user-context) (builtin-metrics/register-iconnection-server-metric (:receiver (:worker executor-data)) storm-conf user-context)) (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data) :receive (:receive-queue executor-data)} - storm-conf user-context) + storm-conf user-context executor-stats) ) (.prepare bolt-obj @@ -857,8 +858,8 @@ (.cleanup bolt)) ;; TODO: refactor this to be part of an executor-specific map -(defmethod mk-executor-stats :spout [_ rate] - (stats/mk-spout-stats rate)) +(defmethod mk-executor-stats :spout [_ rate receive-queue-metrics send-queue-metrics] + (stats/mk-spout-stats rate receive-queue-metrics send-queue-metrics)) -(defmethod mk-executor-stats :bolt [_ rate] - (stats/mk-bolt-stats rate)) +(defmethod mk-executor-stats :bolt [_ rate receive-queue-metrics send-queue-metrics] + (stats/mk-bolt-stats rate receive-queue-metrics send-queue-metrics)) diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj index 26a4eb44cf6..dd7731d9c18 100644 --- a/storm-core/src/clj/org/apache/storm/stats.clj +++ b/storm-core/src/clj/org/apache/storm/stats.clj @@ -23,7 +23,7 @@ ComponentPageInfo ComponentType BoltAggregateStats ExecutorAggregateStats SpecificAggregateStats SpoutAggregateStats TopologyPageInfo TopologyStats]) - (:import [org.apache.storm.utils Utils]) + (:import [org.apache.storm.utils Utils DisruptorQueue$QueueMetrics]) (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric]) (:use [org.apache.storm log util]) (:use [clojure.math.numeric-tower :only [ceil]])) @@ -33,6 +33,7 @@ (def COMMON-FIELDS [:emitted :transferred]) (defrecord CommonStats [^MultiCountStatAndMetric emitted ^MultiCountStatAndMetric transferred + ^MultiCountStatAndMetric population rate]) (def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies]) @@ -53,17 +54,25 @@ (def NUM-STAT-BUCKETS 20) +(defn- mk-queue-stats + [^DisruptorQueue$QueueMetrics receive-queue-metrics ^DisruptorQueue$QueueMetrics send-queue-metrics] + (let [population (MultiLatencyStatAndMetric. DisruptorQueue$QueueMetrics/NUM_BUCKETS)] + (.add population "receive" (.avgQueuePopulationMetric receive-queue-metrics)) + (.add population "send" (.avgQueuePopulationMetric send-queue-metrics)) + population)) + (defn- mk-common-stats - [rate] + [rate receive-queue-metrics send-queue-metrics] (CommonStats. (MultiCountStatAndMetric. NUM-STAT-BUCKETS) (MultiCountStatAndMetric. NUM-STAT-BUCKETS) + (mk-queue-stats receive-queue-metrics send-queue-metrics) rate)) (defn mk-bolt-stats - [rate] + [rate receive-queue-metrics send-queue-metrics] (BoltExecutorStats. - (mk-common-stats rate) + (mk-common-stats rate receive-queue-metrics send-queue-metrics) (MultiCountStatAndMetric. NUM-STAT-BUCKETS) (MultiCountStatAndMetric. NUM-STAT-BUCKETS) (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS) @@ -71,9 +80,9 @@ (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS))) (defn mk-spout-stats - [rate] + [rate receive-queue-metrics send-queue-metrics] (SpoutExecutorStats. - (mk-common-stats rate) + (mk-common-stats rate receive-queue-metrics send-queue-metrics) (MultiCountStatAndMetric. NUM-STAT-BUCKETS) (MultiCountStatAndMetric. NUM-STAT-BUCKETS) (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS))) @@ -189,6 +198,7 @@ [^CommonStats stats] (merge (value-stats stats COMMON-FIELDS) + {:population (.getTimeLatAvg (:population stats))} {:rate (:rate stats)})) (defn value-bolt-stats! @@ -265,6 +275,7 @@ specific-stats (clojurify-specific-stats specific-stats) common-stats (CommonStats. (.get_emitted stats) (.get_transferred stats) + (.get_population stats) (.get_rate stats))] (if is_bolt? ; worker heart beat does not store the BoltExecutorStats or SpoutExecutorStats , instead it stores the result returned by render-stats! @@ -295,10 +306,11 @@ [stats] (let [specific-stats (thriftify-specific-stats stats) rate (:rate stats)] - (ExecutorStats. (window-set-converter (:emitted stats) str) - (window-set-converter (:transferred stats) str) - specific-stats - rate))) + (doto (ExecutorStats. (window-set-converter (:emitted stats) str) + (window-set-converter (:transferred stats) str) + specific-stats + rate) + (.set_population (window-set-converter (:population stats) str))))) (defn valid-number? "Returns true if x is a number that is not NaN or Infinity, false otherwise" @@ -445,6 +457,16 @@ :num-executors 1, :num-tasks num-tasks, :capacity (compute-agg-capacity statk->w->sid->num uptime) + :in-backlog (-> statk->w->sid->num + :population + str-key + (get window) + (get "receive")), + :out-backlog (-> statk->w->sid->num + :population + str-key + (get window) + (get "send")), :cid+sid->input-stats (merge-with merge @@ -500,6 +522,16 @@ :uptime uptime, :num-executors 1, :num-tasks num-tasks, + :in-backlog (-> statk->w->sid->num + :population + str-key + (get window) + (get "receive")), + :out-backlog (-> statk->w->sid->num + :population + str-key + (get window) + (get "send")), :sid->output-stats (merge-with merge @@ -646,7 +678,7 @@ (conj (:executor-stats acc-bolt-stats) (merge (select-keys bolt-stats - [:executor-id :uptime :host :port :capacity]) + [:executor-id :uptime :host :port :capacity :in-backlog :out-backlog]) {:emitted (sum-streams bolt-out :emitted) :transferred (sum-streams bolt-out :transferred) :acked (sum-streams bolt-in :acked) @@ -675,7 +707,7 @@ acked (sum-streams spout-out :acked)] (conj (:executor-stats acc-spout-stats) (merge - (select-keys spout-stats [:executor-id :uptime :host :port]) + (select-keys spout-stats [:executor-id :uptime :host :port :in-backlog :out-backlog]) {:emitted (sum-streams spout-out :emitted) :transferred (sum-streams spout-out :transferred) :acked acked @@ -927,6 +959,8 @@ transferred acked failed + in-backlog + out-backlog num-executors] :as statk->num}] (let [cas (CommonAggregateStats.)] (and num-executors (.set_num_executors cas num-executors)) @@ -935,6 +969,8 @@ (and transferred (.set_transferred cas transferred)) (and acked (.set_acked cas acked)) (and failed (.set_failed cas failed)) + (and in-backlog (.set_in_backlog cas in-backlog)) + (and out-backlog (.set_out_backlog cas out-backlog)) (.set_common_stats s cas))) (defn thriftify-bolt-agg-stats @@ -1139,7 +1175,9 @@ :window->executed {} :window->proc-lat-wgt-avg {} :window->acked {} - :window->failed {}}] + :window->failed {} + :window->in-backlog {} + :window->out-backlog {}}] (apply aggregate-comp-stats* (concat args (list init-val))))) (defmethod aggregate-comp-stats :spout @@ -1151,7 +1189,9 @@ :window->transferred {} :window->comp-lat-wgt-avg {} :window->acked {} - :window->failed {}}] + :window->failed {} + :window->in-backlog {} + :window->out-backlog {}}] (apply aggregate-comp-stats* (concat args (list init-val))))) (defmethod aggregate-comp-stats :default [& _] {}) @@ -1199,7 +1239,9 @@ :window->proc-lat-wgt-avg :window->executed) :window->acked (map-key str (:window->acked acc-data)) - :window->failed (map-key str (:window->failed acc-data))}) + :window->failed (map-key str (:window->failed acc-data)) + :window->in-backlog (map-key str (:window->in-backlog acc-data)) + :window->out-backlog (map-key str (:window->out-backlog acc-data))}) (defmethod post-aggregate-comp-stats :spout [task->component @@ -1266,7 +1308,9 @@ {:emitted (:window->emitted data) :transferred (:window->transferred data) :acked (:window->acked data) - :failed (:window->failed data)} + :failed (:window->failed data) + :in-backlog (:window->in-backlog data) + :out-backlog (:window->out-backlog data)} (condp = (:type data) :bolt {:execute-latency (:window->execute-latency data) :process-latency (:window->process-latency data) diff --git a/storm-core/src/clj/org/apache/storm/ui/core.clj b/storm-core/src/clj/org/apache/storm/ui/core.clj index 1ab2d3398d3..3dc77a0f75c 100644 --- a/storm-core/src/clj/org/apache/storm/ui/core.clj +++ b/storm-core/src/clj/org/apache/storm/ui/core.clj @@ -734,6 +734,8 @@ "port" port "emitted" (nil-to-zero (.get_emitted cas)) "transferred" (nil-to-zero (.get_transferred cas)) + "inBacklog" (nil-to-zero (.get_in_backlog cas)) + "outBacklog" (nil-to-zero (.get_out_backlog cas)) "capacity" (float-str (nil-to-zero (.get_capacity bas))) "executeLatency" (float-str (.get_execute_latency_ms bas)) "executed" (nil-to-zero (.get_executed bas)) @@ -762,6 +764,8 @@ "port" port "emitted" (nil-to-zero (.get_emitted cas)) "transferred" (nil-to-zero (.get_transferred cas)) + "inBacklog" (nil-to-zero (.get_in_backlog cas)) + "outBacklog" (nil-to-zero (.get_out_backlog cas)) "completeLatency" (float-str (.get_complete_latency_ms sas)) "acked" (nil-to-zero (.get_acked cas)) "failed" (nil-to-zero (.get_failed cas)) diff --git a/storm-core/src/jvm/org/apache/storm/generated/CommonAggregateStats.java b/storm-core/src/jvm/org/apache/storm/generated/CommonAggregateStats.java index 93bcb171d8f..acaa30ec606 100644 --- a/storm-core/src/jvm/org/apache/storm/generated/CommonAggregateStats.java +++ b/storm-core/src/jvm/org/apache/storm/generated/CommonAggregateStats.java @@ -23,35 +23,26 @@ */ package org.apache.storm.generated; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.protocol.TTupleProtocol; import org.apache.thrift.scheme.IScheme; import org.apache.thrift.scheme.SchemeFactory; import org.apache.thrift.scheme.StandardScheme; - import org.apache.thrift.scheme.TupleScheme; -import org.apache.thrift.protocol.TTupleProtocol; -import org.apache.thrift.protocol.TProtocolException; -import org.apache.thrift.EncodingUtils; -import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; -import org.apache.thrift.server.AbstractNonblockingServer.*; -import java.util.List; + import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; +import java.util.BitSet; +import java.util.Collections; import java.util.EnumMap; -import java.util.Set; -import java.util.HashSet; import java.util.EnumSet; -import java.util.Collections; -import java.util.BitSet; -import java.nio.ByteBuffer; -import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import javax.annotation.Generated; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-05-07") public class CommonAggregateStats implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("CommonAggregateStats"); @@ -61,6 +52,8 @@ public class CommonAggregateStats implements org.apache.thrift.TBase, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -74,6 +67,8 @@ public class CommonAggregateStats implements org.apache.thrift.TBase byName = new HashMap(); @@ -109,6 +106,10 @@ public static _Fields findByThriftId(int fieldId) { return ACKED; case 6: // FAILED return FAILED; + case 7: // IN_BACKLOG + return IN_BACKLOG; + case 8: // OUT_BACKLOG + return OUT_BACKLOG; default: return null; } @@ -155,8 +156,10 @@ public String getFieldName() { private static final int __TRANSFERRED_ISSET_ID = 3; private static final int __ACKED_ISSET_ID = 4; private static final int __FAILED_ISSET_ID = 5; + private static final int __IN_BACKLOG_ISSET_ID = 6; + private static final int __OUT_BACKLOG_ISSET_ID = 7; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.NUM_EXECUTORS,_Fields.NUM_TASKS,_Fields.EMITTED,_Fields.TRANSFERRED,_Fields.ACKED,_Fields.FAILED}; + private static final _Fields optionals[] = {_Fields.NUM_EXECUTORS,_Fields.NUM_TASKS,_Fields.EMITTED,_Fields.TRANSFERRED,_Fields.ACKED,_Fields.FAILED,_Fields.IN_BACKLOG,_Fields.OUT_BACKLOG}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -172,6 +175,10 @@ public String getFieldName() { new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); tmpMap.put(_Fields.FAILED, new org.apache.thrift.meta_data.FieldMetaData("failed", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))); + tmpMap.put(_Fields.IN_BACKLOG, new org.apache.thrift.meta_data.FieldMetaData("in_backlog", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + tmpMap.put(_Fields.OUT_BACKLOG, new org.apache.thrift.meta_data.FieldMetaData("out_backlog", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CommonAggregateStats.class, metaDataMap); } @@ -190,6 +197,8 @@ public CommonAggregateStats(CommonAggregateStats other) { this.transferred = other.transferred; this.acked = other.acked; this.failed = other.failed; + this.in_backlog = other.in_backlog; + this.out_backlog = other.out_backlog; } public CommonAggregateStats deepCopy() { @@ -210,6 +219,10 @@ public void clear() { this.acked = 0; set_failed_isSet(false); this.failed = 0; + set_in_backlog_isSet(false); + this.in_backlog = 0; + set_out_backlog_isSet(false); + this.out_backlog = 0; } public int get_num_executors() { @@ -344,6 +357,50 @@ public void set_failed_isSet(boolean value) { __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __FAILED_ISSET_ID, value); } + public int get_in_backlog() { + return this.in_backlog; + } + + public void set_in_backlog(int in_backlog) { + this.in_backlog = in_backlog; + set_in_backlog_isSet(true); + } + + public void unset_in_backlog() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __IN_BACKLOG_ISSET_ID); + } + + /** Returns true if field in_backlog is set (has been assigned a value) and false otherwise */ + public boolean is_set_in_backlog() { + return EncodingUtils.testBit(__isset_bitfield, __IN_BACKLOG_ISSET_ID); + } + + public void set_in_backlog_isSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __IN_BACKLOG_ISSET_ID, value); + } + + public int get_out_backlog() { + return this.out_backlog; + } + + public void set_out_backlog(int out_backlog) { + this.out_backlog = out_backlog; + set_out_backlog_isSet(true); + } + + public void unset_out_backlog() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __OUT_BACKLOG_ISSET_ID); + } + + /** Returns true if field out_backlog is set (has been assigned a value) and false otherwise */ + public boolean is_set_out_backlog() { + return EncodingUtils.testBit(__isset_bitfield, __OUT_BACKLOG_ISSET_ID); + } + + public void set_out_backlog_isSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __OUT_BACKLOG_ISSET_ID, value); + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case NUM_EXECUTORS: @@ -394,6 +451,22 @@ public void setFieldValue(_Fields field, Object value) { } break; + case IN_BACKLOG: + if (value == null) { + unset_in_backlog(); + } else { + set_in_backlog((Integer)value); + } + break; + + case OUT_BACKLOG: + if (value == null) { + unset_out_backlog(); + } else { + set_out_backlog((Integer)value); + } + break; + } } @@ -417,6 +490,12 @@ public Object getFieldValue(_Fields field) { case FAILED: return get_failed(); + case IN_BACKLOG: + return get_in_backlog(); + + case OUT_BACKLOG: + return get_out_backlog(); + } throw new IllegalStateException(); } @@ -440,6 +519,10 @@ public boolean isSet(_Fields field) { return is_set_acked(); case FAILED: return is_set_failed(); + case IN_BACKLOG: + return is_set_in_backlog(); + case OUT_BACKLOG: + return is_set_out_backlog(); } throw new IllegalStateException(); } @@ -511,6 +594,24 @@ public boolean equals(CommonAggregateStats that) { return false; } + boolean this_present_in_backlog = true && this.is_set_in_backlog(); + boolean that_present_in_backlog = true && that.is_set_in_backlog(); + if (this_present_in_backlog || that_present_in_backlog) { + if (!(this_present_in_backlog && that_present_in_backlog)) + return false; + if (this.in_backlog != that.in_backlog) + return false; + } + + boolean this_present_out_backlog = true && this.is_set_out_backlog(); + boolean that_present_out_backlog = true && that.is_set_out_backlog(); + if (this_present_out_backlog || that_present_out_backlog) { + if (!(this_present_out_backlog && that_present_out_backlog)) + return false; + if (this.out_backlog != that.out_backlog) + return false; + } + return true; } @@ -548,6 +649,16 @@ public int hashCode() { if (present_failed) list.add(failed); + boolean present_in_backlog = true && (is_set_in_backlog()); + list.add(present_in_backlog); + if (present_in_backlog) + list.add(in_backlog); + + boolean present_out_backlog = true && (is_set_out_backlog()); + list.add(present_out_backlog); + if (present_out_backlog) + list.add(out_backlog); + return list.hashCode(); } @@ -619,6 +730,26 @@ public int compareTo(CommonAggregateStats other) { return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_in_backlog()).compareTo(other.is_set_in_backlog()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_in_backlog()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.in_backlog, other.in_backlog); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(is_set_out_backlog()).compareTo(other.is_set_out_backlog()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_out_backlog()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.out_backlog, other.out_backlog); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -674,6 +805,18 @@ public String toString() { sb.append(this.failed); first = false; } + if (is_set_in_backlog()) { + if (!first) sb.append(", "); + sb.append("in_backlog:"); + sb.append(this.in_backlog); + first = false; + } + if (is_set_out_backlog()) { + if (!first) sb.append(", "); + sb.append("out_backlog:"); + sb.append(this.out_backlog); + first = false; + } sb.append(")"); return sb.toString(); } @@ -767,6 +910,22 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, CommonAggregateStat org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 7: // IN_BACKLOG + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.in_backlog = iprot.readI32(); + struct.set_in_backlog_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 8: // OUT_BACKLOG + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.out_backlog = iprot.readI32(); + struct.set_out_backlog_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -810,6 +969,16 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, CommonAggregateSta oprot.writeI64(struct.failed); oprot.writeFieldEnd(); } + if (struct.is_set_in_backlog()) { + oprot.writeFieldBegin(IN_BACKLOG_FIELD_DESC); + oprot.writeI32(struct.in_backlog); + oprot.writeFieldEnd(); + } + if (struct.is_set_out_backlog()) { + oprot.writeFieldBegin(OUT_BACKLOG_FIELD_DESC); + oprot.writeI32(struct.out_backlog); + oprot.writeFieldEnd(); + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -846,7 +1015,13 @@ public void write(org.apache.thrift.protocol.TProtocol prot, CommonAggregateStat if (struct.is_set_failed()) { optionals.set(5); } - oprot.writeBitSet(optionals, 6); + if (struct.is_set_in_backlog()) { + optionals.set(6); + } + if (struct.is_set_out_backlog()) { + optionals.set(7); + } + oprot.writeBitSet(optionals, 8); if (struct.is_set_num_executors()) { oprot.writeI32(struct.num_executors); } @@ -865,12 +1040,18 @@ public void write(org.apache.thrift.protocol.TProtocol prot, CommonAggregateStat if (struct.is_set_failed()) { oprot.writeI64(struct.failed); } + if (struct.is_set_in_backlog()) { + oprot.writeI32(struct.in_backlog); + } + if (struct.is_set_out_backlog()) { + oprot.writeI32(struct.out_backlog); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, CommonAggregateStats struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; - BitSet incoming = iprot.readBitSet(6); + BitSet incoming = iprot.readBitSet(8); if (incoming.get(0)) { struct.num_executors = iprot.readI32(); struct.set_num_executors_isSet(true); @@ -895,6 +1076,14 @@ public void read(org.apache.thrift.protocol.TProtocol prot, CommonAggregateStats struct.failed = iprot.readI64(); struct.set_failed_isSet(true); } + if (incoming.get(6)) { + struct.in_backlog = iprot.readI32(); + struct.set_in_backlog_isSet(true); + } + if (incoming.get(7)) { + struct.out_backlog = iprot.readI32(); + struct.set_out_backlog_isSet(true); + } } } diff --git a/storm-core/src/jvm/org/apache/storm/generated/ExecutorStats.java b/storm-core/src/jvm/org/apache/storm/generated/ExecutorStats.java index 14c68edf745..3095aa57866 100644 --- a/storm-core/src/jvm/org/apache/storm/generated/ExecutorStats.java +++ b/storm-core/src/jvm/org/apache/storm/generated/ExecutorStats.java @@ -23,35 +23,26 @@ */ package org.apache.storm.generated; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.protocol.TTupleProtocol; import org.apache.thrift.scheme.IScheme; import org.apache.thrift.scheme.SchemeFactory; import org.apache.thrift.scheme.StandardScheme; - import org.apache.thrift.scheme.TupleScheme; -import org.apache.thrift.protocol.TTupleProtocol; -import org.apache.thrift.protocol.TProtocolException; -import org.apache.thrift.EncodingUtils; -import org.apache.thrift.TException; -import org.apache.thrift.async.AsyncMethodCallback; -import org.apache.thrift.server.AbstractNonblockingServer.*; -import java.util.List; + import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; +import java.util.BitSet; +import java.util.Collections; import java.util.EnumMap; -import java.util.Set; -import java.util.HashSet; import java.util.EnumSet; -import java.util.Collections; -import java.util.BitSet; -import java.nio.ByteBuffer; -import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import javax.annotation.Generated; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) -@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)") +@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-05-07") public class ExecutorStats implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ExecutorStats"); @@ -59,6 +50,7 @@ public class ExecutorStats implements org.apache.thrift.TBase, SchemeFactory> schemes = new HashMap, SchemeFactory>(); static { @@ -70,13 +62,15 @@ public class ExecutorStats implements org.apache.thrift.TBase> transferred; // required private ExecutorSpecificStats specific; // required private double rate; // required + private Map> population; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { EMITTED((short)1, "emitted"), TRANSFERRED((short)2, "transferred"), SPECIFIC((short)3, "specific"), - RATE((short)4, "rate"); + RATE((short)4, "rate"), + POPULATION((short)5, "population"); private static final Map byName = new HashMap(); @@ -99,6 +93,8 @@ public static _Fields findByThriftId(int fieldId) { return SPECIFIC; case 4: // RATE return RATE; + case 5: // POPULATION + return POPULATION; default: return null; } @@ -141,6 +137,7 @@ public String getFieldName() { // isset id assignments private static final int __RATE_ISSET_ID = 0; private byte __isset_bitfield = 0; + private static final _Fields optionals[] = {_Fields.POPULATION}; public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -160,6 +157,12 @@ public String getFieldName() { new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, ExecutorSpecificStats.class))); tmpMap.put(_Fields.RATE, new org.apache.thrift.meta_data.FieldMetaData("rate", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE))); + tmpMap.put(_Fields.POPULATION, new org.apache.thrift.meta_data.FieldMetaData("population", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), + new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING), + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.DOUBLE))))); metaDataMap = Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ExecutorStats.class, metaDataMap); } @@ -220,6 +223,21 @@ public ExecutorStats(ExecutorStats other) { this.specific = new ExecutorSpecificStats(other.specific); } this.rate = other.rate; + if (other.is_set_population()) { + Map> __this__population = new HashMap>(other.population.size()); + for (Map.Entry> other_element : other.population.entrySet()) { + + String other_element_key = other_element.getKey(); + Map other_element_value = other_element.getValue(); + + String __this__population_copy_key = other_element_key; + + Map __this__population_copy_value = new HashMap(other_element_value); + + __this__population.put(__this__population_copy_key, __this__population_copy_value); + } + this.population = __this__population; + } } public ExecutorStats deepCopy() { @@ -233,6 +251,7 @@ public void clear() { this.specific = null; set_rate_isSet(false); this.rate = 0.0; + this.population = null; } public int get_emitted_size() { @@ -348,6 +367,40 @@ public void set_rate_isSet(boolean value) { __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __RATE_ISSET_ID, value); } + public int get_population_size() { + return (this.population == null) ? 0 : this.population.size(); + } + + public void put_to_population(String key, Map val) { + if (this.population == null) { + this.population = new HashMap>(); + } + this.population.put(key, val); + } + + public Map> get_population() { + return this.population; + } + + public void set_population(Map> population) { + this.population = population; + } + + public void unset_population() { + this.population = null; + } + + /** Returns true if field population is set (has been assigned a value) and false otherwise */ + public boolean is_set_population() { + return this.population != null; + } + + public void set_population_isSet(boolean value) { + if (!value) { + this.population = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case EMITTED: @@ -382,6 +435,14 @@ public void setFieldValue(_Fields field, Object value) { } break; + case POPULATION: + if (value == null) { + unset_population(); + } else { + set_population((Map>)value); + } + break; + } } @@ -399,6 +460,9 @@ public Object getFieldValue(_Fields field) { case RATE: return get_rate(); + case POPULATION: + return get_population(); + } throw new IllegalStateException(); } @@ -418,6 +482,8 @@ public boolean isSet(_Fields field) { return is_set_specific(); case RATE: return is_set_rate(); + case POPULATION: + return is_set_population(); } throw new IllegalStateException(); } @@ -471,6 +537,15 @@ public boolean equals(ExecutorStats that) { return false; } + boolean this_present_population = true && this.is_set_population(); + boolean that_present_population = true && that.is_set_population(); + if (this_present_population || that_present_population) { + if (!(this_present_population && that_present_population)) + return false; + if (!this.population.equals(that.population)) + return false; + } + return true; } @@ -498,6 +573,11 @@ public int hashCode() { if (present_rate) list.add(rate); + boolean present_population = true && (is_set_population()); + list.add(present_population); + if (present_population) + list.add(population); + return list.hashCode(); } @@ -549,6 +629,16 @@ public int compareTo(ExecutorStats other) { return lastComparison; } } + lastComparison = Boolean.valueOf(is_set_population()).compareTo(other.is_set_population()); + if (lastComparison != 0) { + return lastComparison; + } + if (is_set_population()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.population, other.population); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -596,6 +686,16 @@ public String toString() { sb.append("rate:"); sb.append(this.rate); first = false; + if (is_set_population()) { + if (!first) sb.append(", "); + sb.append("population:"); + if (this.population == null) { + sb.append("null"); + } else { + sb.append(this.population); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -738,6 +838,38 @@ public void read(org.apache.thrift.protocol.TProtocol iprot, ExecutorStats struc org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 5: // POPULATION + if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { + { + org.apache.thrift.protocol.TMap _map292 = iprot.readMapBegin(); + struct.population = new HashMap>(2*_map292.size); + String _key293; + Map _val294; + for (int _i295 = 0; _i295 < _map292.size; ++_i295) + { + _key293 = iprot.readString(); + { + org.apache.thrift.protocol.TMap _map296 = iprot.readMapBegin(); + _val294 = new HashMap(2*_map296.size); + String _key297; + double _val298; + for (int _i299 = 0; _i299 < _map296.size; ++_i299) + { + _key297 = iprot.readString(); + _val298 = iprot.readDouble(); + _val294.put(_key297, _val298); + } + iprot.readMapEnd(); + } + struct.population.put(_key293, _val294); + } + iprot.readMapEnd(); + } + struct.set_population_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -755,15 +887,15 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, ExecutorStats stru oprot.writeFieldBegin(EMITTED_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.emitted.size())); - for (Map.Entry> _iter292 : struct.emitted.entrySet()) + for (Map.Entry> _iter300 : struct.emitted.entrySet()) { - oprot.writeString(_iter292.getKey()); + oprot.writeString(_iter300.getKey()); { - oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, _iter292.getValue().size())); - for (Map.Entry _iter293 : _iter292.getValue().entrySet()) + oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, _iter300.getValue().size())); + for (Map.Entry _iter301 : _iter300.getValue().entrySet()) { - oprot.writeString(_iter293.getKey()); - oprot.writeI64(_iter293.getValue()); + oprot.writeString(_iter301.getKey()); + oprot.writeI64(_iter301.getValue()); } oprot.writeMapEnd(); } @@ -776,15 +908,15 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, ExecutorStats stru oprot.writeFieldBegin(TRANSFERRED_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.transferred.size())); - for (Map.Entry> _iter294 : struct.transferred.entrySet()) + for (Map.Entry> _iter302 : struct.transferred.entrySet()) { - oprot.writeString(_iter294.getKey()); + oprot.writeString(_iter302.getKey()); { - oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, _iter294.getValue().size())); - for (Map.Entry _iter295 : _iter294.getValue().entrySet()) + oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, _iter302.getValue().size())); + for (Map.Entry _iter303 : _iter302.getValue().entrySet()) { - oprot.writeString(_iter295.getKey()); - oprot.writeI64(_iter295.getValue()); + oprot.writeString(_iter303.getKey()); + oprot.writeI64(_iter303.getValue()); } oprot.writeMapEnd(); } @@ -801,6 +933,29 @@ public void write(org.apache.thrift.protocol.TProtocol oprot, ExecutorStats stru oprot.writeFieldBegin(RATE_FIELD_DESC); oprot.writeDouble(struct.rate); oprot.writeFieldEnd(); + if (struct.population != null) { + if (struct.is_set_population()) { + oprot.writeFieldBegin(POPULATION_FIELD_DESC); + { + oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.population.size())); + for (Map.Entry> _iter304 : struct.population.entrySet()) + { + oprot.writeString(_iter304.getKey()); + { + oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.DOUBLE, _iter304.getValue().size())); + for (Map.Entry _iter305 : _iter304.getValue().entrySet()) + { + oprot.writeString(_iter305.getKey()); + oprot.writeDouble(_iter305.getValue()); + } + oprot.writeMapEnd(); + } + } + oprot.writeMapEnd(); + } + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -820,86 +975,108 @@ public void write(org.apache.thrift.protocol.TProtocol prot, ExecutorStats struc TTupleProtocol oprot = (TTupleProtocol) prot; { oprot.writeI32(struct.emitted.size()); - for (Map.Entry> _iter296 : struct.emitted.entrySet()) + for (Map.Entry> _iter306 : struct.emitted.entrySet()) { - oprot.writeString(_iter296.getKey()); + oprot.writeString(_iter306.getKey()); { - oprot.writeI32(_iter296.getValue().size()); - for (Map.Entry _iter297 : _iter296.getValue().entrySet()) + oprot.writeI32(_iter306.getValue().size()); + for (Map.Entry _iter307 : _iter306.getValue().entrySet()) { - oprot.writeString(_iter297.getKey()); - oprot.writeI64(_iter297.getValue()); + oprot.writeString(_iter307.getKey()); + oprot.writeI64(_iter307.getValue()); } } } } { oprot.writeI32(struct.transferred.size()); - for (Map.Entry> _iter298 : struct.transferred.entrySet()) + for (Map.Entry> _iter308 : struct.transferred.entrySet()) { - oprot.writeString(_iter298.getKey()); + oprot.writeString(_iter308.getKey()); { - oprot.writeI32(_iter298.getValue().size()); - for (Map.Entry _iter299 : _iter298.getValue().entrySet()) + oprot.writeI32(_iter308.getValue().size()); + for (Map.Entry _iter309 : _iter308.getValue().entrySet()) { - oprot.writeString(_iter299.getKey()); - oprot.writeI64(_iter299.getValue()); + oprot.writeString(_iter309.getKey()); + oprot.writeI64(_iter309.getValue()); } } } } struct.specific.write(oprot); oprot.writeDouble(struct.rate); + BitSet optionals = new BitSet(); + if (struct.is_set_population()) { + optionals.set(0); + } + oprot.writeBitSet(optionals, 1); + if (struct.is_set_population()) { + { + oprot.writeI32(struct.population.size()); + for (Map.Entry> _iter310 : struct.population.entrySet()) + { + oprot.writeString(_iter310.getKey()); + { + oprot.writeI32(_iter310.getValue().size()); + for (Map.Entry _iter311 : _iter310.getValue().entrySet()) + { + oprot.writeString(_iter311.getKey()); + oprot.writeDouble(_iter311.getValue()); + } + } + } + } + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, ExecutorStats struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; { - org.apache.thrift.protocol.TMap _map300 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32()); - struct.emitted = new HashMap>(2*_map300.size); - String _key301; - Map _val302; - for (int _i303 = 0; _i303 < _map300.size; ++_i303) + org.apache.thrift.protocol.TMap _map312 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32()); + struct.emitted = new HashMap>(2*_map312.size); + String _key313; + Map _val314; + for (int _i315 = 0; _i315 < _map312.size; ++_i315) { - _key301 = iprot.readString(); + _key313 = iprot.readString(); { - org.apache.thrift.protocol.TMap _map304 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); - _val302 = new HashMap(2*_map304.size); - String _key305; - long _val306; - for (int _i307 = 0; _i307 < _map304.size; ++_i307) + org.apache.thrift.protocol.TMap _map316 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); + _val314 = new HashMap(2*_map316.size); + String _key317; + long _val318; + for (int _i319 = 0; _i319 < _map316.size; ++_i319) { - _key305 = iprot.readString(); - _val306 = iprot.readI64(); - _val302.put(_key305, _val306); + _key317 = iprot.readString(); + _val318 = iprot.readI64(); + _val314.put(_key317, _val318); } } - struct.emitted.put(_key301, _val302); + struct.emitted.put(_key313, _val314); } } struct.set_emitted_isSet(true); { - org.apache.thrift.protocol.TMap _map308 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32()); - struct.transferred = new HashMap>(2*_map308.size); - String _key309; - Map _val310; - for (int _i311 = 0; _i311 < _map308.size; ++_i311) + org.apache.thrift.protocol.TMap _map320 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32()); + struct.transferred = new HashMap>(2*_map320.size); + String _key321; + Map _val322; + for (int _i323 = 0; _i323 < _map320.size; ++_i323) { - _key309 = iprot.readString(); + _key321 = iprot.readString(); { - org.apache.thrift.protocol.TMap _map312 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); - _val310 = new HashMap(2*_map312.size); - String _key313; - long _val314; - for (int _i315 = 0; _i315 < _map312.size; ++_i315) + org.apache.thrift.protocol.TMap _map324 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); + _val322 = new HashMap(2*_map324.size); + String _key325; + long _val326; + for (int _i327 = 0; _i327 < _map324.size; ++_i327) { - _key313 = iprot.readString(); - _val314 = iprot.readI64(); - _val310.put(_key313, _val314); + _key325 = iprot.readString(); + _val326 = iprot.readI64(); + _val322.put(_key325, _val326); } } - struct.transferred.put(_key309, _val310); + struct.transferred.put(_key321, _val322); } } struct.set_transferred_isSet(true); @@ -908,6 +1085,33 @@ public void read(org.apache.thrift.protocol.TProtocol prot, ExecutorStats struct struct.set_specific_isSet(true); struct.rate = iprot.readDouble(); struct.set_rate_isSet(true); + BitSet incoming = iprot.readBitSet(1); + if (incoming.get(0)) { + { + org.apache.thrift.protocol.TMap _map328 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32()); + struct.population = new HashMap>(2*_map328.size); + String _key329; + Map _val330; + for (int _i331 = 0; _i331 < _map328.size; ++_i331) + { + _key329 = iprot.readString(); + { + org.apache.thrift.protocol.TMap _map332 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32()); + _val330 = new HashMap(2*_map332.size); + String _key333; + double _val334; + for (int _i335 = 0; _i335 < _map332.size; ++_i335) + { + _key333 = iprot.readString(); + _val334 = iprot.readDouble(); + _val330.put(_key333, _val334); + } + } + struct.population.put(_key329, _val330); + } + } + struct.set_population_isSet(true); + } } } diff --git a/storm-core/src/jvm/org/apache/storm/metric/internal/MultiCountStatAndMetric.java b/storm-core/src/jvm/org/apache/storm/metric/internal/MultiCountStatAndMetric.java index 874387c2434..fd6be75e603 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/internal/MultiCountStatAndMetric.java +++ b/storm-core/src/jvm/org/apache/storm/metric/internal/MultiCountStatAndMetric.java @@ -17,13 +17,13 @@ */ package org.apache.storm.metric.internal; -import java.util.Map; -import java.util.List; +import org.apache.storm.metric.api.IMetric; + import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.storm.metric.api.IMetric; - /** * Acts as a MultiCount Metric, but keeps track of approximate counts * for the last 10 mins, 3 hours, 1 day, and all time. for the same keys diff --git a/storm-core/src/jvm/org/apache/storm/metric/internal/MultiLatencyStatAndMetric.java b/storm-core/src/jvm/org/apache/storm/metric/internal/MultiLatencyStatAndMetric.java index 1003b00cc23..9abd2450434 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/internal/MultiLatencyStatAndMetric.java +++ b/storm-core/src/jvm/org/apache/storm/metric/internal/MultiLatencyStatAndMetric.java @@ -17,13 +17,13 @@ */ package org.apache.storm.metric.internal; -import java.util.Map; -import java.util.List; +import org.apache.storm.metric.api.IMetric; + import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.apache.storm.metric.api.IMetric; - /** * Acts as a Latnecy Metric for multiple keys, but keeps track of approximate counts * for the last 10 mins, 3 hours, 1 day, and all time. for the same keys @@ -39,6 +39,14 @@ public MultiLatencyStatAndMetric(int numBuckets) { _numBuckets = numBuckets; } + public synchronized void add(T key, LatencyStatAndMetric latencyStatAndMetric) { + LatencyStatAndMetric c = _lat.get(key); + if (c != null) { + throw new IllegalArgumentException(key + " is already registered as latencyStatAndMetric"); + } + _lat.put(key, latencyStatAndMetric); + } + LatencyStatAndMetric get(T key) { LatencyStatAndMetric c = _lat.get(key); if (c == null) { diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 2a6feadf673..37c459cf4b7 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -31,6 +31,7 @@ import com.lmax.disruptor.dsl.ProducerType; import org.apache.storm.metric.api.IStatefulObject; +import org.apache.storm.metric.internal.LatencyStatAndMetric; import org.apache.storm.metric.internal.RateTracker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -286,8 +287,12 @@ public void close() { * This inner class provides methods to access the metrics of the disruptor queue. */ public class QueueMetrics { + public final static int NUM_BUCKETS = 20; + private final RateTracker _rateTracker = new RateTracker(10000, 10); + private final LatencyStatAndMetric _avgQueuePopulationMetric = new LatencyStatAndMetric(NUM_BUCKETS); + public long writePos() { return _buffer.getCursor(); } @@ -312,6 +317,10 @@ public float pctFull() { return (1.0F * population() / capacity()); } + public LatencyStatAndMetric avgQueuePopulationMetric() { + return _avgQueuePopulationMetric; + } + public Object getState() { Map state = new HashMap(); @@ -341,8 +350,14 @@ public void notifyArrivals(long counts) { _rateTracker.notify(counts); } + public void recordPopulation() { + //TODO: has extra lock + _avgQueuePopulationMetric.record(_metrics.population() + _overflowCount.get()); + } + public void close() { _rateTracker.close(); + _avgQueuePopulationMetric.close(); } } @@ -474,6 +489,7 @@ private void publishDirectSingle(Object obj, boolean block) throws InsufficientC m.set(obj); _buffer.publish(at); _metrics.notifyArrivals(1); + _metrics.recordPopulation(); } private void publishDirect(ArrayList objs, boolean block) throws InsufficientCapacityException { @@ -494,6 +510,7 @@ private void publishDirect(ArrayList objs, boolean block) throws Insuffi } _buffer.publish(begin, end); _metrics.notifyArrivals(size); + _metrics.recordPopulation(); } } diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift index 5a7169b63af..a52d574f44d 100644 --- a/storm-core/src/storm.thrift +++ b/storm-core/src/storm.thrift @@ -224,6 +224,7 @@ struct ExecutorStats { 2: required map> transferred; 3: required ExecutorSpecificStats specific; 4: required double rate; + 5: optional map> population; } struct ExecutorInfo { @@ -271,6 +272,8 @@ struct CommonAggregateStats { 4: optional i64 transferred; 5: optional i64 acked; 6: optional i64 failed; +7: optional i32 in_backlog; +8: optional i32 out_backlog; } struct SpoutAggregateStats { diff --git a/storm-core/src/ui/public/templates/component-page-template.html b/storm-core/src/ui/public/templates/component-page-template.html index 472a769bfbf..d217be04f57 100644 --- a/storm-core/src/ui/public/templates/component-page-template.html +++ b/storm-core/src/ui/public/templates/component-page-template.html @@ -198,6 +198,16 @@

Executors ({{windowHint}})

Transferred + + + Input backlog + + + + + Output backlog + + Complete latency (ms) @@ -225,6 +235,8 @@

Executors ({{windowHint}})

{{emitted}} {{transferred}} + {{inBacklog}} + {{outBacklog}} {{completeLatency}} {{acked}} {{failed}} @@ -474,6 +486,16 @@

Executors ({{windowHint}})

Transferred
+ + + Input backlog + + + + + Output backlog + + Capacity (last 10m) @@ -516,6 +538,8 @@

Executors ({{windowHint}})

{{emitted}} {{transferred}} + {{inBacklog}} + {{outBacklog}} {{capacity}} {{executeLatency}} {{executed}}