diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index e2380b74ce5..03db8552acf 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -28,7 +28,7 @@ (:import [org.apache.storm.grouping CustomStreamGrouping]) (:import [org.apache.storm.task WorkerTopologyContext IBolt OutputCollector IOutputCollector]) (:import [org.apache.storm.generated GlobalStreamId]) - (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread]) + (:import [org.apache.storm.utils Utils ConfigUtils TupleUtils MutableObject RotatingMap RotatingMap$ExpiredCallback MutableLong Time DisruptorQueue WorkerBackpressureThread DisruptorBackpressureCallback]) (:import [com.lmax.disruptor InsufficientCapacityException]) (:import [org.apache.storm.serialization KryoTupleSerializer]) (:import [org.apache.storm.daemon Shutdownable]) @@ -38,9 +38,10 @@ (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping]) (:import [java.lang Thread Thread$UncaughtExceptionHandler] [java.util.concurrent ConcurrentLinkedQueue] - [org.json.simple JSONValue]) + [org.json.simple JSONValue] + [com.lmax.disruptor.dsl ProducerType]) (:require [org.apache.storm [thrift :as thrift] - [cluster :as cluster] [disruptor :as disruptor] [stats :as stats]]) + [cluster :as cluster] [stats :as stats]]) (:require [org.apache.storm.daemon [task :as task]]) (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]) (:require [clojure.set :as set])) @@ -223,7 +224,7 @@ (let [val (AddressedTuple. task tuple)] (when (= true (storm-conf TOPOLOGY-DEBUG)) (log-message "TRANSFERING tuple " val)) - (disruptor/publish batch-transfer->worker val)))) + (.publish ^DisruptorQueue batch-transfer->worker val)))) (defn mk-executor-data [worker executor-id] (let [worker-context (worker-context worker) @@ -231,13 +232,13 @@ component-id (.getComponentId worker-context (first task-ids)) storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id) executor-type (executor-type worker-context component-id) - batch-transfer->worker (disruptor/disruptor-queue + batch-transfer->worker (DisruptorQueue. (str "executor" executor-id "-send-queue") + ProducerType/SINGLE (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :producer-type :single-threaded - :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) - :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) ] (recursive-map :worker worker @@ -286,14 +287,14 @@ (defn- mk-disruptor-backpressure-handler [executor-data] "make a handler for the executor's receive disruptor queue to check highWaterMark and lowWaterMark for backpressure" - (disruptor/disruptor-backpressure-handler - (fn [] + (reify DisruptorBackpressureCallback + (highWaterMark [this] "When receive queue is above highWaterMark" (if (not @(:backpressure executor-data)) (do (reset! (:backpressure executor-data) true) (log-debug "executor " (:executor-id executor-data) " is congested, set backpressure flag true") (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data)))))) - (fn [] + (lowWaterMark [this] "When receive queue is below lowWaterMark" (if @(:backpressure executor-data) (do (reset! (:backpressure executor-data) false) @@ -305,16 +306,19 @@ cached-emit (MutableObject. (ArrayList.)) storm-conf (:storm-conf executor-data) serializer (KryoTupleSerializer. storm-conf (:worker-context executor-data)) + ^DisruptorQueue batch-transfer-queue (:batch-transfer-queue executor-data) + handler (reify com.lmax.disruptor.EventHandler + (onEvent [this o seq-id batch-end?] + (let [^ArrayList alist (.getObject cached-emit)] + (.add alist o) + (when batch-end? + (worker-transfer-fn serializer alist) + (.setObject cached-emit (ArrayList.)))))) ] - (disruptor/consume-loop* - (:batch-transfer-queue executor-data) - (disruptor/handler [o seq-id batch-end?] - (let [^ArrayList alist (.getObject cached-emit)] - (.add alist o) - (when batch-end? - (worker-transfer-fn serializer alist) - (.setObject cached-emit (ArrayList.))))) - :uncaught-exception-handler (:report-error-and-die executor-data)))) + (Utils/asyncLoop + (fn [] (.consumeBatchWhenAvailable batch-transfer-queue handler) 0) + (.getName batch-transfer-queue) + (:uncaught-exception-handler (:report-error-and-die executor-data))))) (defn setup-metrics! [executor-data] (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data @@ -326,7 +330,7 @@ interval (fn [] (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID))]] - (disruptor/publish receive-queue val))))))) + (.publish ^DisruptorQueue receive-queue val))))))) (defn metrics-tick [executor-data task-data ^TupleImpl tuple] @@ -367,7 +371,7 @@ tick-time-secs (fn [] (let [val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [tick-time-secs] Constants/SYSTEM_TASK_ID Constants/SYSTEM_TICK_STREAM_ID))]] - (disruptor/publish receive-queue val)))))))) + (.publish ^DisruptorQueue receive-queue val)))))))) (defn mk-executor [worker executor-id initial-credentials] (let [executor-data (mk-executor-data worker executor-id) @@ -410,15 +414,15 @@ (let [receive-queue (:receive-queue executor-data) context (:worker-context executor-data) val [(AddressedTuple. AddressedTuple/BROADCAST_DEST (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID))]] - (disruptor/publish receive-queue val))) + (.publish ^DisruptorQueue receive-queue val))) (get-backpressure-flag [this] @(:backpressure executor-data)) Shutdownable (shutdown [this] (log-message "Shutting down executor " component-id ":" (pr-str executor-id)) - (disruptor/halt-with-interrupt! (:receive-queue executor-data)) - (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data)) + (.haltWithInterrupt ^DisruptorQueue (:receive-queue executor-data)) + (.haltWithInterrupt ^DisruptorQueue (:batch-transfer-queue executor-data)) (doseq [t threads] (.interrupt t) (.join t)) @@ -460,8 +464,8 @@ (let [task-ids (:task-ids executor-data) debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG))) ] - (disruptor/clojure-handler - (fn [tuple-batch sequence-id end-of-batch?] + (reify com.lmax.disruptor.EventHandler + (onEvent [this tuple-batch sequence-id end-of-batch?] (fast-list-iter [^AddressedTuple addressed-tuple tuple-batch] (let [^TupleImpl tuple (.getTuple addressed-tuple) task-id (.getDest addressed-tuple)] @@ -623,7 +627,7 @@ (fn [] ;; This design requires that spouts be non-blocking - (disruptor/consume-batch receive-queue event-handler) + (.consumeBatch ^DisruptorQueue receive-queue event-handler) (let [active? @(:storm-active-atom executor-data) curr-count (.get emitted-count) @@ -841,7 +845,7 @@ (let [receive-queue (:receive-queue executor-data) event-handler (mk-task-receiver executor-data tuple-action-fn)] (fn [] - (disruptor/consume-batch-when-available receive-queue event-handler) + (.consumeBatchWhenAvailable ^DisruptorQueue receive-queue event-handler) 0)))] ;; TODO: can get any SubscribedState objects out of the context now diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index b2bdcdb7ba0..1f530aced65 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -19,16 +19,16 @@ (:require [clj-time.core :as time]) (:require [clj-time.coerce :as coerce]) (:require [org.apache.storm.daemon [executor :as executor]]) - (:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]]) + (:require [org.apache.storm [cluster :as cluster]]) (:require [clojure.set :as set]) (:require [org.apache.storm.messaging.loader :as msg-loader]) (:import [java.util.concurrent Executors] [org.apache.storm.hooks IWorkerHook BaseWorkerHook] [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J]) + (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time WorkerBackpressureCallback DisruptorBackpressureCallback]) (:import [java.util ArrayList HashMap] [java.util.concurrent.locks ReentrantReadWriteLock]) (:import [org.apache.commons.io FileUtils]) - (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time]) (:import [org.apache.storm.grouping LoadMapping]) (:import [org.apache.storm.messaging TransportFactory]) (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status]) @@ -125,7 +125,7 @@ (fast-map-iter [[short-executor pairs] grouped] (let [q (short-executor-receive-queue-map short-executor)] (if q - (disruptor/publish q pairs) + (.publish ^DisruptorQueue q pairs) (log-warn "Received invalid messages for unknown tasks. Dropping... ") ))))))) @@ -136,8 +136,8 @@ (defn- mk-backpressure-handler [executors] "make a handler that checks and updates worker's backpressure flag" - (disruptor/worker-backpressure-handler - (fn [worker] + (reify WorkerBackpressureCallback + (onEvent [this worker] (let [storm-id (:storm-id worker) assignment-id (:assignment-id worker) port (:port worker) @@ -156,11 +156,11 @@ (defn- mk-disruptor-backpressure-handler [worker] "make a handler for the worker's send disruptor queue to check highWaterMark and lowWaterMark for backpressure" - (disruptor/disruptor-backpressure-handler - (fn [] + (reify DisruptorBackpressureCallback + (highWaterMark [this] (reset! (:transfer-backpressure worker) true) (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker))) - (fn [] + (lowWaterMark [this] (reset! (:transfer-backpressure worker) false) (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger worker))))) @@ -192,7 +192,7 @@ ))))) (when (not (.isEmpty local)) (local-transfer local)) - (when (not (.isEmpty remoteMap)) (disruptor/publish transfer-queue remoteMap))))] + (when (not (.isEmpty remoteMap)) (.publish ^DisruptorQueue transfer-queue remoteMap))))] (if try-serialize-local (do (log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)") @@ -204,11 +204,11 @@ (defn- mk-receive-queue-map [storm-conf executors] (->> executors ;; TODO: this depends on the type of executor - (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e) - (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE) - (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) - :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))])) + (map (fn [e] [e (DisruptorQueue. (str "receive-queue" e) + (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE) + (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))])) (into {}) )) @@ -248,10 +248,11 @@ (defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state] (let [assignment-versions (atom {}) executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) - transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) + transfer-queue (DisruptorQueue. "worker-transfer-queue" + (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) - :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) executor-receive-queue-map (mk-receive-queue-map storm-conf executors) receive-queue-map (->> executor-receive-queue-map @@ -438,21 +439,20 @@ ;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues (defn mk-transfer-tuples-handler [worker] - (let [^DisruptorQueue transfer-queue (:transfer-queue worker) + (let [^DisruptorQueue transfer-queue (:transfer-queue worker) drainer (TransferDrainer.) node+port->socket (:cached-node+port->socket worker) task->node+port (:cached-task->node+port worker) endpoint-socket-lock (:endpoint-socket-lock worker) ] - (disruptor/clojure-handler - (fn [packets _ batch-end?] + (reify com.lmax.disruptor.EventHandler + (onEvent [this packets seqId batch-end?] (.add drainer packets) - (when batch-end? (read-locked endpoint-socket-lock - (let [node+port->socket @node+port->socket - task->node+port @task->node+port] - (.send drainer task->node+port node+port->socket))) + (let [node+port->socket @node+port->socket + task->node+port @task->node+port] + (.send drainer task->node+port node+port->socket))) (.clear drainer)))))) ;; Check whether this messaging connection is ready to send data @@ -658,7 +658,9 @@ transfer-tuples (mk-transfer-tuples-handler worker) - transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples) + transfer-thread (Utils/asyncLoop + (fn [] + (.consumeBatchWhenAvailable ^DisruptorQueue (:transfer-queue worker) transfer-tuples) 0)) disruptor-handler (mk-disruptor-backpressure-handler worker) _ (.registerBackpressureCallback (:transfer-queue worker) disruptor-handler) @@ -690,7 +692,7 @@ ;;in which case it's a noop (.term ^IContext (:mq-context worker)) (log-message "Shutting down transfer thread") - (disruptor/halt-with-interrupt! (:transfer-queue worker)) + (.haltWithInterrupt ^DisruptorQueue (:transfer-queue worker)) (.interrupt transfer-thread) (.join transfer-thread) diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj b/storm-core/src/clj/org/apache/storm/disruptor.clj deleted file mode 100644 index e2211c0a401..00000000000 --- a/storm-core/src/clj/org/apache/storm/disruptor.clj +++ /dev/null @@ -1,89 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns org.apache.storm.disruptor - (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback Utils]) - (:import [com.lmax.disruptor.dsl ProducerType]) - (:require [clojure [string :as str]]) - (:require [clojure [set :as set]]) - (:use [clojure walk]) - (:use [org.apache.storm util log])) - -(def PRODUCER-TYPE - {:multi-threaded ProducerType/MULTI - :single-threaded ProducerType/SINGLE}) - -(defnk disruptor-queue - [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1] - (DisruptorQueue. queue-name - (PRODUCER-TYPE producer-type) buffer-size - timeout batch-size batch-timeout)) - -(defn clojure-handler - [afn] - (reify com.lmax.disruptor.EventHandler - (onEvent - [this o seq-id batchEnd?] - (afn o seq-id batchEnd?)))) - -(defn disruptor-backpressure-handler - [afn-high-wm afn-low-wm] - (reify DisruptorBackpressureCallback - (highWaterMark - [this] - (afn-high-wm)) - (lowWaterMark - [this] - (afn-low-wm)))) - -(defn worker-backpressure-handler - [afn] - (reify WorkerBackpressureCallback - (onEvent - [this o] - (afn o)))) - -(defmacro handler - [& args] - `(clojure-handler (fn ~@args))) - -(defn publish - [^DisruptorQueue q o] - (.publish q o)) - -(defn consume-batch - [^DisruptorQueue queue handler] - (.consumeBatch queue handler)) - -(defn consume-batch-when-available - [^DisruptorQueue queue handler] - (.consumeBatchWhenAvailable queue handler)) - -(defn halt-with-interrupt! - [^DisruptorQueue queue] - (.haltWithInterrupt queue)) - -(defnk consume-loop* - [^DisruptorQueue queue handler - :uncaught-exception-handler nil] - (Utils/asyncLoop - (fn [] (consume-batch-when-available queue handler) 0) - (.getName queue) - uncaught-exception-handler)) - -(defmacro consume-loop [queue & handler-args] - `(let [handler# (handler ~@handler-args)] - (consume-loop* ~queue handler#))) diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 19aba06fc3d..4482297430d 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -30,6 +30,11 @@ import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.storm.metric.api.IStatefulObject; +import org.apache.storm.metric.internal.RateTracker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -46,12 +51,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.metric.api.IStatefulObject; -import org.apache.storm.metric.internal.RateTracker; - /** * A single consumer queue that uses the LMAX Disruptor. They key to the performance is * the ability to catch up to the producer by processing tuples in batches. @@ -381,6 +380,10 @@ public DisruptorQueue(String queueName, ProducerType type, int size, long readTi _flusher.start(); } + public DisruptorQueue(String queueName, int size, long readTimeout, int inputBatchSize, long flushInterval) { + this(queueName, ProducerType.MULTI, size, readTimeout, inputBatchSize, flushInterval); + } + public String getName() { return _queueName; }