diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index e759b1d1887..8ad8f7b6f96 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -489,9 +489,9 @@ ;; Send sampled data to the eventlogger if the global or component level ;; debug flag is set (via nimbus api). -(defn send-to-eventlogger [executor-data task-data values component-id message-id random] - (let [c->d @(:storm-component->debug-atom executor-data) - options (get c->d component-id (get c->d (:storm-id executor-data))) +(defn send-to-eventlogger [executor-data task-data values component-id message-id ^Random random storm-id debug-atom] + (let [c->d @debug-atom + options (get c->d component-id (get c->d storm-id)) spct (if (and (not-nil? options) (:enable options)) (:samplingpct options) 0)] ;; the thread's initialized random number generator is used to generate ;; uniformily distributed random numbers. @@ -552,6 +552,8 @@ event-handler (mk-task-receiver executor-data tuple-action-fn) has-ackers? (StormCommon/hasAckers storm-conf) has-eventloggers? (StormCommon/hasEventLoggers storm-conf) + storm-id (:storm-id executor-data) + debug-atom (:storm-component->debug-atom executor-data) emitted-count (MutableLong. 0) empty-emit-streak (MutableLong. 0) spout-transfer-fn (fn [] @@ -582,7 +584,7 @@ tuple-id)] (transfer-fn out-task out-tuple))) (if has-eventloggers? - (send-to-eventlogger executor-data task-data values component-id message-id rand)) + (send-to-eventlogger executor-data task-data values component-id message-id rand storm-id debug-atom)) (if (and rooted? (not (.isEmpty out-ids))) (do @@ -748,6 +750,8 @@ (.getSourceStreamId tuple) delta))))))) has-eventloggers? (StormCommon/hasEventLoggers storm-conf) + storm-id (:storm-id executor-data) + debug-atom (:storm-component->debug-atom executor-data) bolt-transfer-fn (fn [] ;; If topology was started in inactive state, don't call prepare bolt until it's activated first. (while (not @(:storm-active-atom executor-data)) @@ -778,7 +782,7 @@ (MessageId/makeId anchors-to-ids))] (transfer-fn t tuple)))) (if has-eventloggers? - (send-to-eventlogger executor-data task-data values component-id nil rand)) + (send-to-eventlogger executor-data task-data values component-id nil rand storm-id debug-atom)) (or out-tasks [])))]] (.registerAll (:builtin-metrics task-data) storm-conf user-context) (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))