Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions storm-core/src/clj/org/apache/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@

(defn mk-executor [worker executor-id initial-credentials]
(let [executor-data (mk-executor-data worker executor-id)
storm-conf (:storm-conf executor-data)
_ (log-message "Loading executor " (:component-id executor-data) ":" (pr-str executor-id))
task-datas (->> executor-data
:task-ids
Expand All @@ -375,19 +376,21 @@
report-error-and-die (:report-error-and-die executor-data)
component-id (:component-id executor-data)

backpressure-enabled (storm-conf TOPOLOGY-BACKPRESSURE-ENABLE)
throttle-on (and backpressure-enabled @(:throttle-on (:worker executor-data)))

disruptor-handler (mk-disruptor-backpressure-handler executor-data)
_ (.registerBackpressureCallback (:receive-queue executor-data) disruptor-handler)
_ (-> (.setHighWaterMark (:receive-queue executor-data) ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
(.setLowWaterMark ((:storm-conf executor-data) BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
(.setEnableBackpressure ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)))
_ (-> (.setHighWaterMark (:receive-queue executor-data) (storm-conf BACKPRESSURE-DISRUPTOR-HIGH-WATERMARK))
(.setLowWaterMark (storm-conf BACKPRESSURE-DISRUPTOR-LOW-WATERMARK))
(.setEnableBackpressure backpressure-enabled))

;; starting the batch-transfer->worker ensures that anything publishing to that queue
;; doesn't block (because it's a single threaded queue and the caching/consumer started
;; trick isn't thread-safe)
system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
handlers (with-error-reaction report-error-and-die
(mk-threads executor-data task-datas initial-credentials))
(mk-threads executor-data task-datas initial-credentials throttle-on))
threads (concat handlers system-threads)]
(setup-ticks! worker executor-data)

Expand Down Expand Up @@ -438,16 +441,15 @@
(when time-delta
(stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))

(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
(let [storm-conf (:storm-conf executor-data)
^ISpout spout (:object task-data)
(defn- ack-spout-msg [storm-conf stats task-data msg-id tuple-info time-delta id]
(let [^ISpout spout (:object task-data)
task-id (:task-id task-data)]
(when (= true (storm-conf TOPOLOGY-DEBUG))
(log-message "SPOUT Acking message " id " " msg-id))
(.ack spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
(when time-delta
(stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
(stats/spout-acked-tuple! stats (:stream tuple-info) time-delta))))

(defn mk-task-receiver [executor-data tuple-action-fn]
(let [task-ids (:task-ids executor-data)
Expand Down Expand Up @@ -491,7 +493,7 @@
EVENTLOGGER-STREAM-ID
[component-id message-id (System/currentTimeMillis) values]))))

(defmethod mk-threads :spout [executor-data task-datas initial-credentials]
(defmethod mk-threads :spout [executor-data task-datas initial-credentials throttle-on]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is logically wrong. Throttle-on flag is a boolean that you have to obtain dynamically from the executor-data in run time, not that you can pass in as a constant when making the spout thread at the begining.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this change will effectively disable backpressure.

@roshannaik
We still need to look up that value in spout async loop.
One sketching idea for workaround is that we may not need to look up that value for each iteration, which is based on precondition that it may also not hurt when spout notices about throttle slightly later.

Btw, I guess when nextTuple() takes some amount of time (say, about to 1 ms, not speed of light but still not slow), looking up that value may be not hurt overall performance. When we don't have data for emit, we sleep 1ms in loop for default configuration.

topology.spout.wait.strategy: "org.apache.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms: 1

(let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
Expand All @@ -501,6 +503,7 @@
rand (Random. (Utils/secureRandomLong))
^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)
debug? (= true (storm-conf TOPOLOGY-DEBUG))
stats (executor-data :stats)

pending (RotatingMap.
2 ;; microoptimize for performance of .size method
Expand Down Expand Up @@ -531,7 +534,7 @@
(throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(condp = stream-id
ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
ACKER-ACK-STREAM-ID (ack-spout-msg storm-conf stats (get task-datas task-id)
spout-id tuple-finished-info time-delta id)
ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta "FAIL-STREAM" id)
Expand Down Expand Up @@ -588,7 +591,7 @@
ACKER-INIT-STREAM-ID
[root-id (Utils/bitXorVals out-ids) task-id]))
(when message-id
(ack-spout-msg executor-data task-data message-id
(ack-spout-msg storm-conf stats task-data message-id
{:stream out-stream-id :values values}
(if (sampler) 0) "0:")))
(or out-tasks [])
Expand Down Expand Up @@ -627,9 +630,9 @@

(let [active? @(:storm-active-atom executor-data)
curr-count (.get emitted-count)
backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
throttle-on (and backpressure-enabled
@(:throttle-on (:worker executor-data)))
; backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
; throttle-on (and backpressure-enabled
; @(:throttle-on (:worker executor-data)))
reached-max-spout-pending (and max-spout-pending
(>= (.size pending) max-spout-pending))
]
Expand Down Expand Up @@ -684,7 +687,7 @@
(let [curr (or (.get pending key) (long 0))]
(.put pending key (bit-xor curr id))))

(defmethod mk-threads :bolt [executor-data task-datas initial-credentials]
(defmethod mk-threads :bolt [executor-data task-datas initial-credentials throttle-on]
(let [storm-conf (:storm-conf executor-data)
execute-sampler (mk-stats-sampler storm-conf)
executor-stats (:stats executor-data)
Expand Down