Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions storm-core/src/clj/org/apache/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -489,9 +489,9 @@

;; Send sampled data to the eventlogger if the global or component level
;; debug flag is set (via nimbus api).
(defn send-to-eventlogger [executor-data task-data values component-id message-id random]
(let [c->d @(:storm-component->debug-atom executor-data)
options (get c->d component-id (get c->d (:storm-id executor-data)))
(defn send-to-eventlogger [executor-data task-data values component-id message-id ^Random random storm-id debug-atom]
(let [c->d @debug-atom
options (get c->d component-id (get c->d storm-id))
spct (if (and (not-nil? options) (:enable options)) (:samplingpct options) 0)]
;; the thread's initialized random number generator is used to generate
;; uniformily distributed random numbers.
Expand Down Expand Up @@ -552,6 +552,8 @@
event-handler (mk-task-receiver executor-data tuple-action-fn)
has-ackers? (StormCommon/hasAckers storm-conf)
has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
storm-id (:storm-id executor-data)
debug-atom (:storm-component->debug-atom executor-data)
emitted-count (MutableLong. 0)
empty-emit-streak (MutableLong. 0)
spout-transfer-fn (fn []
Expand Down Expand Up @@ -582,7 +584,7 @@
tuple-id)]
(transfer-fn out-task out-tuple)))
(if has-eventloggers?
(send-to-eventlogger executor-data task-data values component-id message-id rand))
(send-to-eventlogger executor-data task-data values component-id message-id rand storm-id debug-atom))
(if (and rooted?
(not (.isEmpty out-ids)))
(do
Expand Down Expand Up @@ -748,6 +750,8 @@
(.getSourceStreamId tuple)
delta)))))))
has-eventloggers? (StormCommon/hasEventLoggers storm-conf)
storm-id (:storm-id executor-data)
debug-atom (:storm-component->debug-atom executor-data)
bolt-transfer-fn (fn []
;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
(while (not @(:storm-active-atom executor-data))
Expand Down Expand Up @@ -778,7 +782,7 @@
(MessageId/makeId anchors-to-ids))]
(transfer-fn t tuple))))
(if has-eventloggers?
(send-to-eventlogger executor-data task-data values component-id nil rand))
(send-to-eventlogger executor-data task-data values component-id nil rand storm-id debug-atom))
(or out-tasks [])))]]
(.registerAll (:builtin-metrics task-data) storm-conf user-context)
(when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
Expand Down