diff --git a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj index 3104c528092..dcfa8d83257 100644 --- a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj +++ b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj @@ -15,8 +15,10 @@ ;; limitations under the License. (ns org.apache.storm.cluster-state.zookeeper-state-factory - (:import [org.apache.curator.framework.state ConnectionStateListener]) - (:import [org.apache.zookeeper KeeperException$NoNodeException] + (:import [org.apache.curator.framework.state ConnectionStateListener] + [org.apache.storm.zookeeper Zookeeper]) + (:import [org.apache.zookeeper KeeperException$NoNodeException CreateMode + Watcher$Event$EventType Watcher$Event$KeeperState] [org.apache.storm.cluster ClusterState DaemonType]) (:use [org.apache.storm cluster config log util]) (:require [org.apache.storm [zookeeper :as zk]]) @@ -25,7 +27,7 @@ (defn -mkState [this conf auth-conf acls context] (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)] - (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls) + (Zookeeper/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls) (.close zk)) (let [callbacks (atom {}) active (atom true) @@ -36,9 +38,9 @@ :root (conf STORM-ZOOKEEPER-ROOT) :watcher (fn [state type path] (when @active - (when-not (= :connected state) + (when-not (= Watcher$Event$KeeperState/SyncConnected state) (log-warn "Received event " state ":" type ":" path " with disconnected Writer Zookeeper.")) - (when-not (= :none type) + (when-not (= Watcher$Event$EventType/None type) (doseq [callback (vals @callbacks)] (callback type path)))))) is-nimbus? (= (.getDaemonType context) DaemonType/NIMBUS) @@ -50,9 +52,9 @@ :root (conf STORM-ZOOKEEPER-ROOT) :watcher (fn [state type path] (when @active - (when-not (= :connected state) + (when-not (= Watcher$Event$KeeperState/SyncConnected state) (log-warn "Received event " state ":" type ":" path " with disconnected Reader Zookeeper.")) - (when-not (= :none type) + (when-not (= Watcher$Event$EventType/None type) (doseq [callback (vals @callbacks)] (callback type path)))))) zk-writer)] @@ -71,27 +73,27 @@ (set-ephemeral-node [this path data acls] - (zk/mkdirs zk-writer (parent-path path) acls) - (if (zk/exists zk-writer path false) + (Zookeeper/mkdirs zk-writer (parent-path path) acls) + (if (Zookeeper/exists zk-writer path false) (try-cause - (zk/set-data zk-writer path data) ; should verify that it's ephemeral + (Zookeeper/setData zk-writer path data) ; should verify that it's ephemeral (catch KeeperException$NoNodeException e (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data") - (zk/create-node zk-writer path data :ephemeral acls))) - (zk/create-node zk-writer path data :ephemeral acls))) + (Zookeeper/createNode zk-writer path data CreateMode/EPHEMERAL acls))) + (Zookeeper/createNode zk-writer path data CreateMode/EPHEMERAL acls))) (create-sequential [this path data acls] - (zk/create-node zk-writer path data :sequential acls)) + (Zookeeper/createNode zk-writer path data CreateMode/PERSISTENT_SEQUENTIAL acls)) (set-data [this path data acls] ;; note: this does not turn off any existing watches - (if (zk/exists zk-writer path false) - (zk/set-data zk-writer path data) + (if (Zookeeper/exists zk-writer path false) + (Zookeeper/setData zk-writer path data) (do - (zk/mkdirs zk-writer (parent-path path) acls) - (zk/create-node zk-writer path data :persistent acls)))) + (Zookeeper/mkdirs zk-writer (parent-path path) acls) + (Zookeeper/createNode zk-writer path data CreateMode/PERSISTENT acls)))) (set-worker-hb [this path data acls] @@ -99,7 +101,7 @@ (delete-node [this path] - (zk/delete-node zk-writer path)) + (Zookeeper/deleteNode zk-writer path)) (delete-worker-hb [this path] @@ -107,15 +109,15 @@ (get-data [this path watch?] - (zk/get-data zk-reader path watch?)) + (Zookeeper/getData zk-reader path watch?)) (get-data-with-version [this path watch?] - (zk/get-data-with-version zk-reader path watch?)) + (Zookeeper/getDataWithVersion zk-reader path watch?)) (get-version [this path watch?] - (zk/get-version zk-reader path watch?)) + (Zookeeper/getVersion zk-reader path watch?)) (get-worker-hb [this path watch?] @@ -123,7 +125,7 @@ (get-children [this path watch?] - (zk/get-children zk-reader path watch?)) + (Zookeeper/getChildren zk-reader path watch?)) (get-worker-hb-children [this path watch?] @@ -131,11 +133,11 @@ (mkdirs [this path acls] - (zk/mkdirs zk-writer path acls)) + (Zookeeper/mkdirs zk-writer path acls)) (node-exists [this path watch?] - (zk/exists-node? zk-reader path watch?)) + (Zookeeper/existsNode zk-reader path watch?)) (add-listener [this listener] @@ -143,15 +145,15 @@ (stateChanged [this client newState] (.stateChanged listener client newState)))] - (zk/add-listener zk-reader curator-listener))) + (Zookeeper/addListener zk-reader curator-listener))) (sync-path [this path] - (zk/sync-path zk-writer path)) + (Zookeeper/syncPath zk-writer path)) (delete-node-blobstore [this path nimbus-host-port-info] - (zk/delete-node-blobstore zk-writer path nimbus-host-port-info)) + (Zookeeper/deleteNodeBlobstore zk-writer path nimbus-host-port-info)) (close [this] diff --git a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj index b781e1cde1c..ef9ecbbf375 100644 --- a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj +++ b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj @@ -16,6 +16,7 @@ (ns org.apache.storm.command.dev-zookeeper (:use [org.apache.storm zookeeper util config]) (:import [org.apache.storm.utils ConfigUtils]) + (:import [org.apache.storm.zookeeper Zookeeper]) (:gen-class)) (defn -main [& args] @@ -23,5 +24,5 @@ port (conf STORM-ZOOKEEPER-PORT) localpath (conf DEV-ZOOKEEPER-PATH)] (rmr localpath) - (mk-inprocess-zookeeper localpath :port port) + (Zookeeper/mkInprocessZookeeper localpath port) )) diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj index 54048490ef1..8a5eb213d3d 100644 --- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj +++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj @@ -14,7 +14,8 @@ ;; See the License for the specific language governing permissions and ;; limitations under the License. (ns org.apache.storm.command.shell-submission - (:import [org.apache.storm StormSubmitter]) + (:import [org.apache.storm StormSubmitter] + [org.apache.storm.zookeeper Zookeeper]) (:use [org.apache.storm thrift util config log zookeeper]) (:require [clojure.string :as str]) (:import [org.apache.storm.utils ConfigUtils]) @@ -23,7 +24,7 @@ (defn -main [^String tmpjarpath & args] (let [conf (clojurify-structure (ConfigUtils/readStormConfig)) - zk-leader-elector (zk-leader-elector conf) + zk-leader-elector (Zookeeper/zkLeaderElector conf) leader-nimbus (.getLeader zk-leader-elector) host (.getHost leader-nimbus) port (.getPort leader-nimbus) diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index d871f8134e8..de5a14ea501 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -55,6 +55,7 @@ [stats :as stats]]) (:require [clojure.set :as set]) (:import [org.apache.storm.daemon.common StormBase Assignment]) + (:import [org.apache.storm.zookeeper Zookeeper]) (:use [org.apache.storm.daemon common]) (:use [org.apache.storm config]) (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms]) @@ -195,7 +196,7 @@ (exit-process! 20 "Error when processing an event") )) :scheduler (mk-scheduler conf inimbus) - :leader-elector (zk-leader-elector conf) + :leader-elector (Zookeeper/zkLeaderElector conf) :id->sched-status (atom {}) :node-id->resources (atom {}) ;;resources of supervisors :id->resources (atom {}) ;;resources of topologies diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj index 7d76eeb18a6..cc786590e87 100644 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@ -22,7 +22,8 @@ [worker :as worker] [executor :as executor]]) (:require [org.apache.storm [process-simulator :as psim]]) - (:import [org.apache.commons.io FileUtils]) + (:import [org.apache.commons.io FileUtils] + [org.apache.storm.zookeeper Zookeeper]) (:import [java.io File]) (:import [java.util HashMap ArrayList]) (:import [java.util.concurrent.atomic AtomicInteger]) @@ -134,7 +135,7 @@ (defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false] (let [zk-tmp (local-temp-path) [zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS) - (zk/mk-inprocess-zookeeper zk-tmp)) + (Zookeeper/mkInprocessZookeeper zk-tmp nil)) daemon-conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) {TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true ZMQ-LINGER-MILLIS 0 @@ -203,7 +204,7 @@ (if (not-nil? (:zookeeper cluster-map)) (do (log-message "Shutting down in process zookeeper") - (zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map)) + (Zookeeper/shutdownInprocessZookeeper (:zookeeper cluster-map)) (log-message "Done shutting down in process zookeeper"))) (doseq [t @(:tmp-dirs cluster-map)] (log-message "Deleting temporary path " t) @@ -288,11 +289,11 @@ (defmacro with-inprocess-zookeeper [port-sym & body] `(with-local-tmp [tmp#] - (let [[~port-sym zks#] (zk/mk-inprocess-zookeeper tmp#)] + (let [[~port-sym zks#] (Zookeeper/mkInprocessZookeeper tmp# nil)] (try ~@body (finally - (zk/shutdown-inprocess-zookeeper zks#)))))) + (Zookeeper/shutdownInprocessZookeeper zks#)))))) (defn submit-local-topology [nimbus storm-name conf topology] diff --git a/storm-core/src/clj/org/apache/storm/zookeeper.clj b/storm-core/src/clj/org/apache/storm/zookeeper.clj index 8a223cdc1f5..413ffd6571d 100644 --- a/storm-core/src/clj/org/apache/storm/zookeeper.clj +++ b/storm-core/src/clj/org/apache/storm/zookeeper.clj @@ -30,21 +30,10 @@ (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo]) (:import [java.io File]) (:import [java.util List Map]) + (:import [org.apache.storm.zookeeper Zookeeper ZkKeeperStates ZkEventTypes]) (:import [org.apache.storm.utils Utils ZookeeperAuthInfo]) (:use [org.apache.storm util log config])) -(def zk-keeper-states - {Watcher$Event$KeeperState/Disconnected :disconnected - Watcher$Event$KeeperState/SyncConnected :connected - Watcher$Event$KeeperState/AuthFailed :auth-failed - Watcher$Event$KeeperState/Expired :expired}) - -(def zk-event-types - {Watcher$Event$EventType/None :none - Watcher$Event$EventType/NodeCreated :node-created - Watcher$Event$EventType/NodeDeleted :node-deleted - Watcher$Event$EventType/NodeDataChanged :node-data-changed - Watcher$Event$EventType/NodeChildrenChanged :node-children-changed}) (defn- default-watcher [state type path] @@ -57,15 +46,15 @@ :auth-conf nil] (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))] (.. fk - (getCuratorListenable) - (addListener - (reify CuratorListener - (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e] - (when (= (.getType e) CuratorEventType/WATCHED) - (let [^WatchedEvent event (.getWatchedEvent e)] - (watcher (zk-keeper-states (.getState event)) - (zk-event-types (.getType event)) - (.getPath event)))))))) + (getCuratorListenable) + (addListener + (reify CuratorListener + (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e] + (when (= (.getType e) CuratorEventType/WATCHED) + (let [^WatchedEvent event (.getWatchedEvent e)] + (watcher (.getState event) + (.getType event) + (.getPath event)))))))) ;; (.. fk ;; (getUnhandledErrorListenable) ;; (addListener @@ -84,225 +73,3 @@ (.start fk) fk)) -(def zk-create-modes - {:ephemeral CreateMode/EPHEMERAL - :persistent CreateMode/PERSISTENT - :sequential CreateMode/PERSISTENT_SEQUENTIAL}) - -(defn create-node - ([^CuratorFramework zk ^String path ^bytes data mode acls] - (let [mode (zk-create-modes mode)] - (try - (.. zk (create) (creatingParentsIfNeeded) (withMode mode) (withACL acls) (forPath (normalize-path path) data)) - (catch Exception e (throw (wrap-in-runtime e)))))) - ([^CuratorFramework zk ^String path ^bytes data acls] - (create-node zk path data :persistent acls))) - -(defn exists-node? - [^CuratorFramework zk ^String path watch?] - ((complement nil?) - (try - (if watch? - (.. zk (checkExists) (watched) (forPath (normalize-path path))) - (.. zk (checkExists) (forPath (normalize-path path)))) - (catch Exception e (throw (wrap-in-runtime e)))))) - -(defnk delete-node - [^CuratorFramework zk ^String path] - (let [path (normalize-path path)] - (when (exists-node? zk path false) - (try-cause (.. zk (delete) (deletingChildrenIfNeeded) (forPath (normalize-path path))) - (catch KeeperException$NoNodeException e - ;; do nothing - (log-message "exception" e) - ) - (catch Exception e (throw (wrap-in-runtime e))))))) - -(defn mkdirs - [^CuratorFramework zk ^String path acls] - (let [path (normalize-path path)] - (when-not (or (= path "/") (exists-node? zk path false)) - (mkdirs zk (parent-path path) acls) - (try-cause - (create-node zk path (barr 7) :persistent acls) - (catch KeeperException$NodeExistsException e - ;; this can happen when multiple clients doing mkdir at same time - )) - ))) - -(defn sync-path - [^CuratorFramework zk ^String path] - (try - (.. zk (sync) (forPath (normalize-path path))) - (catch Exception e (throw (wrap-in-runtime e))))) - - -(defn add-listener [^CuratorFramework zk ^ConnectionStateListener listener] - (.. zk (getConnectionStateListenable) (addListener listener))) - -(defn get-data - [^CuratorFramework zk ^String path watch?] - (let [path (normalize-path path)] - (try-cause - (if (exists-node? zk path watch?) - (if watch? - (.. zk (getData) (watched) (forPath path)) - (.. zk (getData) (forPath path)))) - (catch KeeperException$NoNodeException e - ;; this is fine b/c we still have a watch from the successful exists call - nil ) - (catch Exception e (throw (wrap-in-runtime e)))))) - -(defn get-data-with-version - [^CuratorFramework zk ^String path watch?] - (let [stats (org.apache.zookeeper.data.Stat. ) - path (normalize-path path)] - (try-cause - (if-let [data - (if (exists-node? zk path watch?) - (if watch? - (.. zk (getData) (watched) (storingStatIn stats) (forPath path)) - (.. zk (getData) (storingStatIn stats) (forPath path))))] - {:data data - :version (.getVersion stats)}) - (catch KeeperException$NoNodeException e - ;; this is fine b/c we still have a watch from the successful exists call - nil )))) - -(defn get-version -[^CuratorFramework zk ^String path watch?] - (if-let [stats - (if watch? - (.. zk (checkExists) (watched) (forPath (normalize-path path))) - (.. zk (checkExists) (forPath (normalize-path path))))] - (.getVersion stats) - nil)) - -(defn get-children - [^CuratorFramework zk ^String path watch?] - (try - (if watch? - (.. zk (getChildren) (watched) (forPath (normalize-path path))) - (.. zk (getChildren) (forPath (normalize-path path)))) - (catch Exception e (throw (wrap-in-runtime e))))) - -(defn delete-node-blobstore - "Deletes the state inside the zookeeper for a key, for which the - contents of the key starts with nimbus host port information" - [^CuratorFramework zk ^String parent-path ^String host-port-info] - (let [parent-path (normalize-path parent-path) - child-path-list (if (exists-node? zk parent-path false) - (into [] (get-children zk parent-path false)) - [])] - (doseq [child child-path-list] - (when (.startsWith child host-port-info) - (log-debug "delete-node " "child" child) - (delete-node zk (str parent-path "/" child)))))) - -(defn set-data - [^CuratorFramework zk ^String path ^bytes data] - (try - (.. zk (setData) (forPath (normalize-path path) data)) - (catch Exception e (throw (wrap-in-runtime e))))) - -(defn exists - [^CuratorFramework zk ^String path watch?] - (exists-node? zk path watch?)) - -(defnk mk-inprocess-zookeeper - [localdir :port nil] - (let [localfile (File. localdir) - zk (ZooKeeperServer. localfile localfile 2000) - [retport factory] - (loop [retport (if port port 2000)] - (if-let [factory-tmp - (try-cause - (doto (NIOServerCnxnFactory.) - (.configure (InetSocketAddress. retport) 0)) - (catch BindException e - (when (> (inc retport) (if port port 65535)) - (throw (RuntimeException. - "No port is available to launch an inprocess zookeeper.")))))] - [retport factory-tmp] - (recur (inc retport))))] - (log-message "Starting inprocess zookeeper at port " retport " and dir " localdir) - (.startup factory zk) - [retport factory])) - -(defn shutdown-inprocess-zookeeper - [handle] - (.shutdown handle)) - -(defn- to-NimbusInfo [^Participant participant] - (let - [id (if (clojure.string/blank? (.getId participant)) - (throw (RuntimeException. "No nimbus leader participant host found, have you started your nimbus hosts?")) - (.getId participant)) - nimbus-info (NimbusInfo/parse id)] - (.setLeader nimbus-info (.isLeader participant)) - nimbus-info)) - -(defn leader-latch-listener-impl - "Leader latch listener that will be invoked when we either gain or lose leadership" - [conf zk leader-latch] - (let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))] - (reify LeaderLatchListener - (^void isLeader[this] - (log-message (str hostname " gained leadership"))) - (^void notLeader[this] - (log-message (str hostname " lost leadership.")))))) - -(defn zk-leader-elector - "Zookeeper Implementation of ILeaderElector." - [conf] - (let [servers (conf STORM-ZOOKEEPER-SERVERS) - zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf) - leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock") - id (.toHostPortString (NimbusInfo/fromConf conf)) - leader-latch (atom (LeaderLatch. zk leader-lock-path id)) - leader-latch-listener (atom (leader-latch-listener-impl conf zk @leader-latch)) - ] - (reify ILeaderElector - (prepare [this conf] - (log-message "no-op for zookeeper implementation")) - - (^void addToLeaderLockQueue [this] - ;if this latch is already closed, we need to create new instance. - (if (.equals LeaderLatch$State/CLOSED (.getState @leader-latch)) - (do - (reset! leader-latch (LeaderLatch. zk leader-lock-path id)) - (reset! leader-latch-listener (leader-latch-listener-impl conf zk @leader-latch)) - (log-message "LeaderLatch was in closed state. Resetted the leaderLatch and listeners.") - )) - - ;Only if the latch is not already started we invoke start. - (if (.equals LeaderLatch$State/LATENT (.getState @leader-latch)) - (do - (.addListener @leader-latch @leader-latch-listener) - (.start @leader-latch) - (log-message "Queued up for leader lock.")) - (log-message "Node already in queue for leader lock."))) - - (^void removeFromLeaderLockQueue [this] - ;Only started latches can be closed. - (if (.equals LeaderLatch$State/STARTED (.getState @leader-latch)) - (do - (.close @leader-latch) - (log-message "Removed from leader lock queue.")) - (log-message "leader latch is not started so no removeFromLeaderLockQueue needed."))) - - (^boolean isLeader [this] - (.hasLeadership @leader-latch)) - - (^NimbusInfo getLeader [this] - (to-NimbusInfo (.getLeader @leader-latch))) - - (^List getAllNimbuses [this] - (let [participants (.getParticipants @leader-latch)] - (map (fn [^Participant participant] - (to-NimbusInfo participant)) - participants))) - - (^void close[this] - (log-message "closing zookeeper connection of leader elector.") - (.close zk))))) diff --git a/storm-core/src/jvm/org/apache/storm/callback/Callback.java b/storm-core/src/jvm/org/apache/storm/callback/Callback.java new file mode 100644 index 00000000000..29b97619817 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/callback/Callback.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.callback; + +public interface Callback { + public Object execute(T... args); +} diff --git a/storm-core/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java b/storm-core/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java new file mode 100644 index 00000000000..043dd0c3393 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/callback/DefaultWatcherCallBack.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.callback; + +import org.apache.storm.zookeeper.ZkEventTypes; +import org.apache.storm.zookeeper.ZkKeeperStates; +import org.apache.zookeeper.Watcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultWatcherCallBack implements WatcherCallBack { + + private static Logger LOG = LoggerFactory.getLogger(DefaultWatcherCallBack.class); + + @Override + public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) { + LOG.debug("Zookeeper state update: {}, {}, {}", ZkKeeperStates.getStateName(state), ZkEventTypes.getTypeName(type), path); + } + +} diff --git a/storm-core/src/jvm/org/apache/storm/callback/WatcherCallBack.java b/storm-core/src/jvm/org/apache/storm/callback/WatcherCallBack.java new file mode 100644 index 00000000000..41a50ec864c --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/callback/WatcherCallBack.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.callback; + +import org.apache.zookeeper.Watcher; + +public interface WatcherCallBack { + public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path); +} diff --git a/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java b/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java index 3c729ec90c4..e8789df6ae4 100644 --- a/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java +++ b/storm-core/src/jvm/org/apache/storm/nimbus/ILeaderElector.java @@ -37,14 +37,14 @@ public interface ILeaderElector extends Closeable { * check isLeader() to perform any leadership action. This method can be called * multiple times so it needs to be idempotent. */ - void addToLeaderLockQueue(); + void addToLeaderLockQueue() throws Exception; /** * Removes the caller from the leader lock queue. If the caller is leader * also releases the lock. This method can be called multiple times so it needs * to be idempotent. */ - void removeFromLeaderLockQueue(); + void removeFromLeaderLockQueue() throws Exception; /** * @@ -62,7 +62,7 @@ public interface ILeaderElector extends Closeable { * * @return list of current nimbus addresses, includes leader. */ - List getAllNimbuses(); + List getAllNimbuses()throws Exception; /** * Method called to allow for cleanup. once close this object can not be reused. diff --git a/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedZookeeper.java b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedZookeeper.java new file mode 100644 index 00000000000..ca54f6f0929 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/testing/staticmocking/MockedZookeeper.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.testing.staticmocking; + +import org.apache.storm.zookeeper.Zookeeper; + +public class MockedZookeeper implements AutoCloseable { + + public MockedZookeeper(Zookeeper inst) { + Zookeeper.setInstance(inst); + } + + @Override + public void close() throws Exception { + Zookeeper.resetInstance(); + } +} diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java index 9d0c7c610a7..380f4dd340c 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java @@ -17,6 +17,7 @@ */ package org.apache.storm.utils; +import org.apache.commons.io.FileUtils; import org.apache.storm.Config; import org.apache.storm.blobstore.BlobStore; import org.apache.storm.blobstore.BlobStoreAclHandler; @@ -48,6 +49,7 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; +import org.eclipse.jetty.util.log.Log; import org.json.simple.JSONValue; import org.json.simple.parser.ParseException; import org.slf4j.Logger; @@ -55,25 +57,7 @@ import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.SafeConstructor; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.FileReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.PrintStream; -import java.io.RandomAccessFile; -import java.io.Serializable; +import java.io.*; import java.net.URL; import java.net.URLDecoder; import java.nio.ByteBuffer; @@ -1041,9 +1025,9 @@ public String getBackupConnectionString() throws Exception { .connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT))) .sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT))) .retryPolicy(new StormBoundedExponentialBackoffRetry( - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)), - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)), - Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)))); + Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)), + Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING)), + Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)))); if (auth != null && auth.scheme != null && auth.payload != null) { builder.authorization(auth.scheme, auth.payload); @@ -1382,5 +1366,13 @@ public static TopologyInfo getTopologyInfo(String name, String asUser, Map storm public static int toPositive(int number) { return number & Integer.MAX_VALUE; } + + public static RuntimeException wrapInRuntime(Exception e){ + if (e instanceof RuntimeException){ + return (RuntimeException)e; + }else { + return new RuntimeException(e); + } + } } diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java new file mode 100644 index 00000000000..7923b7e9e25 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/zookeeper/LeaderElectorImp.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.framework.recipes.leader.Participant; +import org.apache.storm.nimbus.ILeaderElector; +import org.apache.storm.nimbus.NimbusInfo; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class LeaderElectorImp implements ILeaderElector { + private static Logger LOG = LoggerFactory.getLogger(LeaderElectorImp.class); + private final Map conf; + private final List servers; + private final CuratorFramework zk; + private final String leaderlockPath; + private final String id; + private final AtomicReference leaderLatch; + private final AtomicReference leaderLatchListener; + + public LeaderElectorImp(Map conf, List servers, CuratorFramework zk, String leaderlockPath, String id, AtomicReference leaderLatch, + AtomicReference leaderLatchListener) { + this.conf = conf; + this.servers = servers; + this.zk = zk; + this.leaderlockPath = leaderlockPath; + this.id = id; + this.leaderLatch = leaderLatch; + this.leaderLatchListener = leaderLatchListener; + } + + @Override + public void prepare(Map conf) { + // no-op for zookeeper implementation + } + + @Override + public void addToLeaderLockQueue() throws Exception { + // if this latch is already closed, we need to create new instance. + if (LeaderLatch.State.CLOSED.equals(leaderLatch.get().getState())) { + leaderLatch.set(new LeaderLatch(zk, leaderlockPath)); + leaderLatchListener.set(Zookeeper.leaderLatchListenerImpl(conf, zk, leaderLatch.get())); + LOG.info("LeaderLatch was in closed state. Resetted the leaderLatch and listeners."); + } + // Only if the latch is not already started we invoke start + if (LeaderLatch.State.LATENT.equals(leaderLatch.get().getState())) { + leaderLatch.get().addListener(leaderLatchListener.get()); + leaderLatch.get().start(); + LOG.info("Queued up for leader lock."); + } else { + LOG.info("Node already in queue for leader lock."); + } + } + + @Override + // Only started latches can be closed. + public void removeFromLeaderLockQueue() throws Exception { + if (LeaderLatch.State.STARTED.equals(leaderLatch.get().getState())) { + leaderLatch.get().close(); + LOG.info("Removed from leader lock queue."); + } else { + LOG.info("leader latch is not started so no removeFromLeaderLockQueue needed."); + } + } + + @Override + public boolean isLeader() throws Exception { + return leaderLatch.get().hasLeadership(); + } + + @Override + public NimbusInfo getLeader() { + try { + return Zookeeper.toNimbusInfo(leaderLatch.get().getLeader()); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + } + + @Override + public List getAllNimbuses() throws Exception { + List nimbusInfos = new ArrayList<>(); + Collection participants = leaderLatch.get().getParticipants(); + for (Participant participant : participants) { + nimbusInfos.add(Zookeeper.toNimbusInfo(participant)); + } + return nimbusInfos; + } + + @Override + public void close() { + LOG.info("closing zookeeper connection of leader elector."); + zk.close(); + } +} diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/ZkEventTypes.java b/storm-core/src/jvm/org/apache/storm/zookeeper/ZkEventTypes.java new file mode 100644 index 00000000000..52475583a94 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/zookeeper/ZkEventTypes.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.zookeeper; + +import org.apache.zookeeper.Watcher; + +import java.util.HashMap; + +public class ZkEventTypes { + + private static HashMap map; + + static { + map = new HashMap(); + + map.put(Watcher.Event.EventType.None, ":none"); + map.put(Watcher.Event.EventType.NodeCreated, ":node-created"); + map.put(Watcher.Event.EventType.NodeDeleted, ":node-deleted"); + map.put(Watcher.Event.EventType.NodeDataChanged, ":node-data-changed"); + map.put(Watcher.Event.EventType.NodeChildrenChanged, ":node-children-changed"); + + } + + public static String getTypeName(Watcher.Event.EventType type) { + return map.get(type); + } + +} diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/ZkKeeperStates.java b/storm-core/src/jvm/org/apache/storm/zookeeper/ZkKeeperStates.java new file mode 100644 index 00000000000..66dc23157e9 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/zookeeper/ZkKeeperStates.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.zookeeper; + +import org.apache.zookeeper.Watcher; + +import java.util.HashMap; + +public class ZkKeeperStates { + + private static HashMap map; + + static { + map = new HashMap(); + + map.put(Watcher.Event.KeeperState.AuthFailed, ":auth-failed"); + map.put(Watcher.Event.KeeperState.SyncConnected, ":connected"); + map.put(Watcher.Event.KeeperState.Disconnected, ":disconnected"); + map.put(Watcher.Event.KeeperState.Expired, ":expired"); + } + + public static String getStateName(Watcher.Event.KeeperState state) { + return map.get(state); + } + +} diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java new file mode 100644 index 00000000000..f1c7f323706 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.zookeeper; + +import clojure.lang.APersistentMap; +import clojure.lang.PersistentArrayMap; +import clojure.lang.RT; +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorListener; +import org.apache.curator.framework.api.UnhandledErrorListener; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.framework.recipes.leader.Participant; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.storm.Config; +import org.apache.storm.callback.DefaultWatcherCallBack; +import org.apache.storm.callback.WatcherCallBack; +import org.apache.storm.nimbus.ILeaderElector; +import org.apache.storm.nimbus.NimbusInfo; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.ZookeeperAuthInfo; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class Zookeeper { + private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class); + + // A singleton instance allows us to mock delegated static methods in our + // tests by subclassing. + private static final Zookeeper INSTANCE = new Zookeeper(); + private static Zookeeper _instance = INSTANCE; + + /** + * Provide an instance of this class for delegates to use. To mock out + * delegated methods, provide an instance of a subclass that overrides the + * implementation of the delegated method. + * + * @param u a Zookeeper instance + */ + public static void setInstance(Zookeeper u) { + _instance = u; + } + + /** + * Resets the singleton instance to the default. This is helpful to reset + * the class to its original functionality when mocking is no longer + * desired. + */ + public static void resetInstance() { + _instance = INSTANCE; + } + + public static CuratorFramework mkClient(Map conf, List servers, Object port, String root) { + return mkClient(conf, servers, port, root, new DefaultWatcherCallBack()); + } + + public static CuratorFramework mkClient(Map conf, List servers, Object port, Map authConf) { + return mkClient(conf, servers, port, "", new DefaultWatcherCallBack(), authConf); + } + + public static CuratorFramework mkClient(Map conf, List servers, Object port, String root, Map authConf) { + return mkClient(conf, servers, port, root, new DefaultWatcherCallBack(), authConf); + } + + public static CuratorFramework mkClient(Map conf, List servers, Object port, String root, final WatcherCallBack watcher, Map authConf) { + CuratorFramework fk; + if (authConf != null) { + fk = Utils.newCurator(conf, servers, port, root, new ZookeeperAuthInfo(authConf)); + } else { + fk = Utils.newCurator(conf, servers, port, root); + } + + fk.getCuratorListenable().addListener(new CuratorListener() { + @Override + public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception { + if (e.getType().equals(CuratorEventType.WATCHED)) { + WatchedEvent event = e.getWatchedEvent(); + watcher.execute(event.getState(), event.getType(), event.getPath()); + } + } + }); + fk.start(); + return fk; + } + + /** + * connect ZK, register Watch/unhandle Watch + * + * @return + */ + public static CuratorFramework mkClient(Map conf, List servers, Object port, String root, final WatcherCallBack watcher) { + return mkClient(conf, servers, port, root, watcher, null); + } + + public static String createNode(CuratorFramework zk, String path, byte[] data, org.apache.zookeeper.CreateMode mode, List acls) { + String ret = null; + try { + String npath = normalizePath(path); + ret = zk.create().creatingParentsIfNeeded().withMode(mode).withACL(acls).forPath(npath, data); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + return ret; + } + + public static String createNode(CuratorFramework zk, String path, byte[] data, List acls){ + return createNode(zk, path, data, org.apache.zookeeper.CreateMode.PERSISTENT, acls); + } + + public static boolean existsNode(CuratorFramework zk, String path, boolean watch){ + Stat stat = null; + try { + if (watch) { + stat = zk.checkExists().watched().forPath(normalizePath(path)); + } else { + stat = zk.checkExists().forPath(normalizePath(path)); + } + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + return stat != null; + } + + public static void deleteNode(CuratorFramework zk, String path){ + try { + String npath = normalizePath(path); + if (existsNode(zk, npath, false)) { + zk.delete().deletingChildrenIfNeeded().forPath(normalizePath(path)); + } + } catch (Exception e) { + if (exceptionCause(KeeperException.NodeExistsException.class, e)) { + // do nothing + LOG.info("delete {} failed.", path, e); + } else { + throw Utils.wrapInRuntime(e); + } + } + } + + public static void mkdirs(CuratorFramework zk, String path, List acls) { + _instance.mkdirsImpl(zk, path, acls); + } + + public void mkdirsImpl(CuratorFramework zk, String path, List acls) { + String npath = normalizePath(path); + if (npath.equals("/")) { + return; + } + if (existsNode(zk, npath, false)) { + return; + } + byte[] byteArray = new byte[1]; + byteArray[0] = (byte) 7; + try { + createNode(zk, npath, byteArray, org.apache.zookeeper.CreateMode.PERSISTENT, acls); + } catch (Exception e) { + if (exceptionCause(KeeperException.NodeExistsException.class, e)) { + // this can happen when multiple clients doing mkdir at same time + } + } + } + + public static void syncPath(CuratorFramework zk, String path){ + try { + zk.sync().forPath(normalizePath(path)); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + } + + public static void addListener(CuratorFramework zk, ConnectionStateListener listener) { + zk.getConnectionStateListenable().addListener(listener); + } + + public static byte[] getData(CuratorFramework zk, String path, boolean watch){ + try { + String npath = normalizePath(path); + if (existsNode(zk, npath, watch)) { + if (watch) { + return zk.getData().watched().forPath(npath); + } else { + return zk.getData().forPath(npath); + } + } + } catch (Exception e) { + if (exceptionCause(KeeperException.NoNodeException.class, e)) { + // this is fine b/c we still have a watch from the successful exists call + } else { + throw Utils.wrapInRuntime(e); + } + } + return null; + } + + public static Integer getVersion(CuratorFramework zk, String path, boolean watch) throws Exception { + String npath = normalizePath(path); + Stat stat = null; + if (existsNode(zk, npath, watch)) { + if (watch) { + stat = zk.checkExists().watched().forPath(npath); + } else { + stat = zk.checkExists().forPath(npath); + } + } + return stat == null ? null : Integer.valueOf(stat.getVersion()); + } + + public static List getChildren(CuratorFramework zk, String path, boolean watch) { + try { + String npath = normalizePath(path); + if (watch) { + return zk.getChildren().watched().forPath(npath); + } else { + return zk.getChildren().forPath(npath); + } + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + } + + // Deletes the state inside the zookeeper for a key, for which the + // contents of the key starts with nimbus host port information + public static void deleteNodeBlobstore(CuratorFramework zk, String parentPath, String hostPortInfo){ + String normalizedPatentPath = normalizePath(parentPath); + List childPathList = null; + if (existsNode(zk, normalizedPatentPath, false)) { + childPathList = getChildren(zk, normalizedPatentPath, false); + for (String child : childPathList) { + if (child.startsWith(hostPortInfo)) { + LOG.debug("deleteNode child {}", child); + deleteNode(zk, normalizedPatentPath + "/" + child); + } + } + } + } + + public static Stat setData(CuratorFramework zk, String path, byte[] data){ + try { + String npath = normalizePath(path); + return zk.setData().forPath(npath, data); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + } + + public static boolean exists(CuratorFramework zk, String path, boolean watch){ + return existsNode(zk, path, watch); + } + + public static List mkInprocessZookeeper(String localdir, Integer port) throws Exception { + File localfile = new File(localdir); + ZooKeeperServer zk = new ZooKeeperServer(localfile, localfile, 2000); + NIOServerCnxnFactory factory = null; + int report = 2000; + int limitPort = 65535; + if (port != null) { + report = port; + limitPort = port; + } + while (true) { + try { + factory = new NIOServerCnxnFactory(); + factory.configure(new InetSocketAddress(report), 0); + break; + } catch (BindException e) { + report++; + if (report > limitPort) { + throw new RuntimeException("No port is available to launch an inprocess zookeeper"); + } + } + } + LOG.info("Starting inprocess zookeeper at port {} and dir {}", report, localdir); + factory.startup(zk); + return Arrays.asList((Object)new Long(report), (Object)factory); + } + + public static void shutdownInprocessZookeeper(NIOServerCnxnFactory handle) { + handle.shutdown(); + } + + public static NimbusInfo toNimbusInfo(Participant participant) { + String id = participant.getId(); + if (StringUtils.isBlank(id)) { + throw new RuntimeException("No nimbus leader participant host found, have you started your nimbus hosts?"); + } + NimbusInfo nimbusInfo = NimbusInfo.parse(id); + nimbusInfo.setLeader(participant.isLeader()); + return nimbusInfo; + } + + // Leader latch listener that will be invoked when we either gain or lose leadership + public static LeaderLatchListener leaderLatchListenerImpl(Map conf, CuratorFramework zk, LeaderLatch leaderLatch) throws UnknownHostException { + final String hostName = InetAddress.getLocalHost().getCanonicalHostName(); + return new LeaderLatchListener() { + @Override + public void isLeader() { + LOG.info("{} gained leadership", hostName); + } + + @Override + public void notLeader() { + LOG.info("{} lost leadership.", hostName); + } + }; + } + + public static ILeaderElector zkLeaderElector(Map conf) throws UnknownHostException { + return _instance.zkLeaderElectorImpl(conf); + } + + protected ILeaderElector zkLeaderElectorImpl(Map conf) throws UnknownHostException { + List servers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS); + Object port = conf.get(Config.STORM_ZOOKEEPER_PORT); + CuratorFramework zk = mkClient(conf, servers, port, "", conf); + String leaderLockPath = conf.get(Config.STORM_ZOOKEEPER_ROOT) + "/leader-lock"; + String id = NimbusInfo.fromConf(conf).toHostPortString(); + AtomicReference leaderLatchAtomicReference = new AtomicReference<>(new LeaderLatch(zk, leaderLockPath, id)); + AtomicReference leaderLatchListenerAtomicReference = + new AtomicReference<>(leaderLatchListenerImpl(conf, zk, leaderLatchAtomicReference.get())); + return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference, leaderLatchListenerAtomicReference); + } + + // To update @return to be a Map + public static APersistentMap getDataWithVersion(CuratorFramework zk, String path, boolean watch) { + APersistentMap map = null; + try { + byte[] bytes = null; + Stat stats = new Stat(); + String npath = normalizePath(path); + if (existsNode(zk, npath, watch)) { + if (watch) { + bytes = zk.getData().storingStatIn(stats).watched().forPath(npath); + } else { + bytes = zk.getData().storingStatIn(stats).forPath(npath); + } + if (bytes != null) { + int version = stats.getVersion(); + map = new PersistentArrayMap(new Object[] { RT.keyword(null, "data"), bytes, RT.keyword(null, "version"), version }); + } + } + } catch (Exception e) { + if (exceptionCause(KeeperException.NoNodeException.class, e)) { + // this is fine b/c we still have a watch from the successful exists call + } else { + Utils.wrapInRuntime(e); + } + } + return map; + } + + public static List tokenizePath(String path) { + String[] toks = path.split("/"); + java.util.ArrayList rtn = new ArrayList(); + for (String str : toks) { + if (!str.isEmpty()) { + rtn.add(str); + } + } + return rtn; + } + + public static String toksToPath(List toks) { + StringBuffer buff = new StringBuffer(); + buff.append("/"); + int size = toks.size(); + for (int i = 0; i < size; i++) { + buff.append(toks.get(i)); + if (i < (size - 1)) { + buff.append("/"); + } + } + return buff.toString(); + } + + public static String normalizePath(String path) { + String rtn = toksToPath(tokenizePath(path)); + return rtn; + } + + // To remove exceptionCause if port Utils.try-cause to java + public static boolean exceptionCause(Class klass, Throwable t) { + boolean ret = false; + Throwable throwable = t; + while (throwable != null) { + if (throwable.getClass() == klass) { + ret = true; + break; + } + throwable = throwable.getCause(); + } + return ret; + } + +} diff --git a/storm-core/test/clj/org/apache/storm/cluster_test.clj b/storm-core/test/clj/org/apache/storm/cluster_test.clj index 7834b54d707..ffd913e7c07 100644 --- a/storm-core/test/clj/org/apache/storm/cluster_test.clj +++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj @@ -18,12 +18,14 @@ [org.apache.storm.nimbus NimbusInfo]) (:import [org.apache.storm.daemon.common Assignment StormBase SupervisorInfo]) (:import [org.apache.storm.generated NimbusSummary]) - (:import [org.apache.zookeeper ZooDefs ZooDefs$Ids]) + (:import [org.apache.zookeeper ZooDefs ZooDefs$Ids Watcher$Event$EventType]) (:import [org.mockito Mockito]) (:import [org.mockito.exceptions.base MockitoAssertionError]) (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder]) (:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo ConfigUtils]) (:import [org.apache.storm.cluster ClusterState]) + (:import [org.apache.storm.zookeeper Zookeeper]) + (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) (:require [org.apache.storm [zookeeper :as zk]]) (:require [conjure.core]) (:use [conjure core]) @@ -128,7 +130,7 @@ (is (= nil @state1-last-cb)) (is (= nil @state2-last-cb)) (.set-data state2 "/root" (barr 2) ZooDefs$Ids/OPEN_ACL_UNSAFE) - (is (= {:type :node-data-changed :path "/root"} (read-and-reset! state2-last-cb))) + (is (= {:type Watcher$Event$EventType/NodeDataChanged :path "/root"} (read-and-reset! state2-last-cb))) (is (= nil @state1-last-cb)) (.set-data state2 "/root" (barr 3) ZooDefs$Ids/OPEN_ACL_UNSAFE) @@ -136,34 +138,34 @@ (.get-data state2 "/root" true) (.get-data state2 "/root" false) (.delete-node state1 "/root") - (is (= {:type :node-deleted :path "/root"} (read-and-reset! state2-last-cb))) + (is (= {:type Watcher$Event$EventType/NodeDeleted :path "/root"} (read-and-reset! state2-last-cb))) (.get-data state2 "/root" true) (.set-ephemeral-node state1 "/root" (barr 1 2 3 4) ZooDefs$Ids/OPEN_ACL_UNSAFE) - (is (= {:type :node-created :path "/root"} (read-and-reset! state2-last-cb))) + (is (= {:type Watcher$Event$EventType/NodeCreated :path "/root"} (read-and-reset! state2-last-cb))) (.get-children state1 "/" true) (.set-data state2 "/a" (barr 9) ZooDefs$Ids/OPEN_ACL_UNSAFE) (is (= nil @state2-last-cb)) - (is (= {:type :node-children-changed :path "/"} (read-and-reset! state1-last-cb))) + (is (= {:type Watcher$Event$EventType/NodeChildrenChanged :path "/"} (read-and-reset! state1-last-cb))) (.get-data state2 "/root" true) (.set-ephemeral-node state1 "/root" (barr 1 2) ZooDefs$Ids/OPEN_ACL_UNSAFE) - (is (= {:type :node-data-changed :path "/root"} (read-and-reset! state2-last-cb))) + (is (= {:type Watcher$Event$EventType/NodeDataChanged :path "/root"} (read-and-reset! state2-last-cb))) (.mkdirs state1 "/ccc" ZooDefs$Ids/OPEN_ACL_UNSAFE) (.get-children state1 "/ccc" true) (.get-data state2 "/ccc/b" true) (.set-data state2 "/ccc/b" (barr 8) ZooDefs$Ids/OPEN_ACL_UNSAFE) - (is (= {:type :node-created :path "/ccc/b"} (read-and-reset! state2-last-cb))) - (is (= {:type :node-children-changed :path "/ccc"} (read-and-reset! state1-last-cb))) + (is (= {:type Watcher$Event$EventType/NodeCreated :path "/ccc/b"} (read-and-reset! state2-last-cb))) + (is (= {:type Watcher$Event$EventType/NodeChildrenChanged :path "/ccc"} (read-and-reset! state1-last-cb))) (.get-data state2 "/root" true) (.get-data state2 "/root2" true) (.close state1) - (is (= {:type :node-deleted :path "/root"} (read-and-reset! state2-last-cb))) + (is (= {:type Watcher$Event$EventType/NodeDeleted :path "/root"} (read-and-reset! state2-last-cb))) (.set-data state2 "/root2" (barr 9) ZooDefs$Ids/OPEN_ACL_UNSAFE) - (is (= {:type :node-created :path "/root2"} (read-and-reset! state2-last-cb))) + (is (= {:type Watcher$Event$EventType/NodeCreated :path "/root2"} (read-and-reset! state2-last-cb))) (.close state2) ))) @@ -308,14 +310,15 @@ (deftest test-cluster-state-default-acls (testing "The default ACLs are empty." - (stubbing [zk/mkdirs nil - zk/mk-client (reify CuratorFramework (^void close [this] nil))] - (mk-distributed-cluster-state {}) - (verify-call-times-for zk/mkdirs 1) - (verify-first-call-args-for-indices zk/mkdirs [2] nil)) + (let [zk-mock (Mockito/mock Zookeeper)] + ;; No need for when clauses because we just want to return nil + (with-open [_ (MockedZookeeper. zk-mock)] + (stubbing [zk/mk-client (reify CuratorFramework (^void close [this] nil))] + (mk-distributed-cluster-state {}) + (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil))))) (stubbing [mk-distributed-cluster-state (reify ClusterState (register [this callback] nil) (mkdirs [this path acls] nil))] - (mk-storm-cluster-state {}) - (verify-call-times-for mk-distributed-cluster-state 1) - (verify-first-call-args-for-indices mk-distributed-cluster-state [4] nil)))) + (mk-storm-cluster-state {}) + (verify-call-times-for mk-distributed-cluster-state 1) + (verify-first-call-args-for-indices mk-distributed-cluster-state [4] nil)))) diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj index 0e19ef86d7f..19c6f596442 100644 --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@ -21,6 +21,7 @@ (:import [org.apache.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter TestPlannerSpout TestPlannerBolt] [org.apache.storm.nimbus InMemoryTopologyActionNotifier]) + (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) (:import [org.apache.storm.scheduler INimbus]) (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo]) (:import [org.apache.storm.testing.staticmocking MockedConfigUtils]) @@ -31,6 +32,7 @@ (:import [java.util HashMap]) (:import [java.io File]) (:import [org.apache.storm.utils Time Utils ConfigUtils]) + (:import [org.apache.storm.zookeeper Zookeeper]) (:import [org.apache.commons.io FileUtils]) (:use [org.apache.storm testing MockAutoCred util config log timer zookeeper]) (:use [org.apache.storm.daemon common]) @@ -1019,7 +1021,8 @@ (deftest test-cleans-corrupt (with-inprocess-zookeeper zk-port (with-local-tmp [nimbus-dir] - (stubbing [zk-leader-elector (mock-leader-elector)] + (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] + (zkLeaderElectorImpl [conf] (mock-leader-elector))))] (letlocals (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) {STORM-ZOOKEEPER-SERVERS ["localhost"] @@ -1090,7 +1093,8 @@ "Tests that leader actions can only be performed by master and non leader fails to perform the same actions." (with-inprocess-zookeeper zk-port (with-local-tmp [nimbus-dir] - (stubbing [zk-leader-elector (mock-leader-elector)] + (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] + (zkLeaderElectorImpl [conf] (mock-leader-elector))))] (letlocals (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) {STORM-ZOOKEEPER-SERVERS ["localhost"] @@ -1103,7 +1107,9 @@ {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} {})) - (stubbing [zk-leader-elector (mock-leader-elector :is-leader false)] + (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] + (zkLeaderElectorImpl [conf] (mock-leader-elector :is-leader false))))] + (letlocals (bind non-leader-cluster-state (cluster/mk-storm-cluster-state conf)) (bind non-leader-nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) @@ -1343,7 +1349,9 @@ expected-acls nimbus/NIMBUS-ZK-ACLS fake-inimbus (reify INimbus (getForcedScheduler [this] nil))] (with-open [_ (proxy [MockedConfigUtils] [] - (nimbusTopoHistoryStateImpl [conf] nil))] + (nimbusTopoHistoryStateImpl [conf] nil)) + zk-le (MockedZookeeper. (proxy [Zookeeper] [] + (zkLeaderElectorImpl [conf] nil)))] (stubbing [mk-authorization-handler nil cluster/mk-storm-cluster-state nil nimbus/file-cache-map nil @@ -1352,7 +1360,6 @@ uptime-computer nil new-instance nil mk-timer nil - zk-leader-elector nil nimbus/mk-scheduler nil] (nimbus/nimbus-data auth-conf fake-inimbus) (verify-call-times-for cluster/mk-storm-cluster-state 1) @@ -1411,7 +1418,8 @@ (deftest test-topology-action-notifier (with-inprocess-zookeeper zk-port (with-local-tmp [nimbus-dir] - (stubbing [zk-leader-elector (mock-leader-elector)] + (with-open [_ (MockedZookeeper. (proxy [Zookeeper] [] + (zkLeaderElectorImpl [conf] (mock-leader-elector))))] (letlocals (bind conf (merge (clojurify-structure (ConfigUtils/readStormConfig)) {STORM-ZOOKEEPER-SERVERS ["localhost"] diff --git a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java index 712537a825c..8445e6a58c5 100644 --- a/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java +++ b/storm-core/test/jvm/org/apache/storm/blobstore/BlobStoreTest.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file