Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
;; limitations under the License.

(ns org.apache.storm.cluster-state.zookeeper-state-factory
(:import [org.apache.curator.framework.state ConnectionStateListener])
(:import [org.apache.zookeeper KeeperException$NoNodeException]
(:import [org.apache.curator.framework.state ConnectionStateListener]
[org.apache.storm.zookeeper Zookeeper])
(:import [org.apache.zookeeper KeeperException$NoNodeException CreateMode
Watcher$Event$EventType Watcher$Event$KeeperState]
[org.apache.storm.cluster ClusterState DaemonType])
(:use [org.apache.storm cluster config log util])
(:require [org.apache.storm [zookeeper :as zk]])
Expand All @@ -25,7 +27,7 @@

(defn -mkState [this conf auth-conf acls context]
(let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)]
(zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls)
(Zookeeper/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls)
(.close zk))
(let [callbacks (atom {})
active (atom true)
Expand All @@ -36,9 +38,9 @@
:root (conf STORM-ZOOKEEPER-ROOT)
:watcher (fn [state type path]
(when @active
(when-not (= :connected state)
(when-not (= Watcher$Event$KeeperState/SyncConnected state)
(log-warn "Received event " state ":" type ":" path " with disconnected Writer Zookeeper."))
(when-not (= :none type)
(when-not (= Watcher$Event$EventType/None type)
(doseq [callback (vals @callbacks)]
(callback type path))))))
is-nimbus? (= (.getDaemonType context) DaemonType/NIMBUS)
Expand All @@ -50,9 +52,9 @@
:root (conf STORM-ZOOKEEPER-ROOT)
:watcher (fn [state type path]
(when @active
(when-not (= :connected state)
(when-not (= Watcher$Event$KeeperState/SyncConnected state)
(log-warn "Received event " state ":" type ":" path " with disconnected Reader Zookeeper."))
(when-not (= :none type)
(when-not (= Watcher$Event$EventType/None type)
(doseq [callback (vals @callbacks)]
(callback type path))))))
zk-writer)]
Expand All @@ -71,87 +73,87 @@

(set-ephemeral-node
[this path data acls]
(zk/mkdirs zk-writer (parent-path path) acls)
(if (zk/exists zk-writer path false)
(Zookeeper/mkdirs zk-writer (parent-path path) acls)
(if (Zookeeper/exists zk-writer path false)
(try-cause
(zk/set-data zk-writer path data) ; should verify that it's ephemeral
(Zookeeper/setData zk-writer path data) ; should verify that it's ephemeral
(catch KeeperException$NoNodeException e
(log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
(zk/create-node zk-writer path data :ephemeral acls)))
(zk/create-node zk-writer path data :ephemeral acls)))
(Zookeeper/createNode zk-writer path data CreateMode/EPHEMERAL acls)))
(Zookeeper/createNode zk-writer path data CreateMode/EPHEMERAL acls)))

(create-sequential
[this path data acls]
(zk/create-node zk-writer path data :sequential acls))
(Zookeeper/createNode zk-writer path data CreateMode/PERSISTENT_SEQUENTIAL acls))

(set-data
[this path data acls]
;; note: this does not turn off any existing watches
(if (zk/exists zk-writer path false)
(zk/set-data zk-writer path data)
(if (Zookeeper/exists zk-writer path false)
(Zookeeper/setData zk-writer path data)
(do
(zk/mkdirs zk-writer (parent-path path) acls)
(zk/create-node zk-writer path data :persistent acls))))
(Zookeeper/mkdirs zk-writer (parent-path path) acls)
(Zookeeper/createNode zk-writer path data CreateMode/PERSISTENT acls))))

(set-worker-hb
[this path data acls]
(.set_data this path data acls))

(delete-node
[this path]
(zk/delete-node zk-writer path))
(Zookeeper/deleteNode zk-writer path))

(delete-worker-hb
[this path]
(.delete_node this path))

(get-data
[this path watch?]
(zk/get-data zk-reader path watch?))
(Zookeeper/getData zk-reader path watch?))

(get-data-with-version
[this path watch?]
(zk/get-data-with-version zk-reader path watch?))
(Zookeeper/getDataWithVersion zk-reader path watch?))

(get-version
[this path watch?]
(zk/get-version zk-reader path watch?))
(Zookeeper/getVersion zk-reader path watch?))

(get-worker-hb
[this path watch?]
(.get_data this path watch?))

(get-children
[this path watch?]
(zk/get-children zk-reader path watch?))
(Zookeeper/getChildren zk-reader path watch?))

(get-worker-hb-children
[this path watch?]
(.get_children this path watch?))

(mkdirs
[this path acls]
(zk/mkdirs zk-writer path acls))
(Zookeeper/mkdirs zk-writer path acls))

(node-exists
[this path watch?]
(zk/exists-node? zk-reader path watch?))
(Zookeeper/existsNode zk-reader path watch?))

(add-listener
[this listener]
(let [curator-listener (reify ConnectionStateListener
(stateChanged
[this client newState]
(.stateChanged listener client newState)))]
(zk/add-listener zk-reader curator-listener)))
(Zookeeper/addListener zk-reader curator-listener)))

(sync-path
[this path]
(zk/sync-path zk-writer path))
(Zookeeper/syncPath zk-writer path))

(delete-node-blobstore
[this path nimbus-host-port-info]
(zk/delete-node-blobstore zk-writer path nimbus-host-port-info))
(Zookeeper/deleteNodeBlobstore zk-writer path nimbus-host-port-info))

(close
[this]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
(ns org.apache.storm.command.dev-zookeeper
(:use [org.apache.storm zookeeper util config])
(:import [org.apache.storm.utils ConfigUtils])
(:import [org.apache.storm.zookeeper Zookeeper])
(:gen-class))

(defn -main [& args]
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))
port (conf STORM-ZOOKEEPER-PORT)
localpath (conf DEV-ZOOKEEPER-PATH)]
(rmr localpath)
(mk-inprocess-zookeeper localpath :port port)
(Zookeeper/mkInprocessZookeeper localpath port)
))
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.command.shell-submission
(:import [org.apache.storm StormSubmitter])
(:import [org.apache.storm StormSubmitter]
[org.apache.storm.zookeeper Zookeeper])
(:use [org.apache.storm thrift util config log zookeeper])
(:require [clojure.string :as str])
(:import [org.apache.storm.utils ConfigUtils])
Expand All @@ -23,7 +24,7 @@

(defn -main [^String tmpjarpath & args]
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))
zk-leader-elector (zk-leader-elector conf)
zk-leader-elector (Zookeeper/zkLeaderElector conf)
leader-nimbus (.getLeader zk-leader-elector)
host (.getHost leader-nimbus)
port (.getPort leader-nimbus)
Expand Down
3 changes: 2 additions & 1 deletion storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
[stats :as stats]])
(:require [clojure.set :as set])
(:import [org.apache.storm.daemon.common StormBase Assignment])
(:import [org.apache.storm.zookeeper Zookeeper])
(:use [org.apache.storm.daemon common])
(:use [org.apache.storm config])
(:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
Expand Down Expand Up @@ -195,7 +196,7 @@
(exit-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
:leader-elector (zk-leader-elector conf)
:leader-elector (Zookeeper/zkLeaderElector conf)
:id->sched-status (atom {})
:node-id->resources (atom {}) ;;resources of supervisors
:id->resources (atom {}) ;;resources of topologies
Expand Down
11 changes: 6 additions & 5 deletions storm-core/src/clj/org/apache/storm/testing.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
[worker :as worker]
[executor :as executor]])
(:require [org.apache.storm [process-simulator :as psim]])
(:import [org.apache.commons.io FileUtils])
(:import [org.apache.commons.io FileUtils]
[org.apache.storm.zookeeper Zookeeper])
(:import [java.io File])
(:import [java.util HashMap ArrayList])
(:import [java.util.concurrent.atomic AtomicInteger])
Expand Down Expand Up @@ -134,7 +135,7 @@
(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024 :nimbus-daemon false]
(let [zk-tmp (local-temp-path)
[zk-port zk-handle] (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
(zk/mk-inprocess-zookeeper zk-tmp))
(Zookeeper/mkInprocessZookeeper zk-tmp nil))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also needs to be updated similarly to below.

(let [zk-tmp (local-temp-path)
       zk-handle (if-not (contains? daemon-conf STORM-ZOOKEEPER-SERVERS)
                          (Zookeeper/mkInprocessZookeeper zk-tmp nil))
       zk-port (.getLocalPort zk-handle)

daemon-conf (merge (clojurify-structure (ConfigUtils/readStormConfig))
{TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true
ZMQ-LINGER-MILLIS 0
Expand Down Expand Up @@ -203,7 +204,7 @@
(if (not-nil? (:zookeeper cluster-map))
(do
(log-message "Shutting down in process zookeeper")
(zk/shutdown-inprocess-zookeeper (:zookeeper cluster-map))
(Zookeeper/shutdownInprocessZookeeper (:zookeeper cluster-map))
(log-message "Done shutting down in process zookeeper")))
(doseq [t @(:tmp-dirs cluster-map)]
(log-message "Deleting temporary path " t)
Expand Down Expand Up @@ -288,11 +289,11 @@
(defmacro with-inprocess-zookeeper
[port-sym & body]
`(with-local-tmp [tmp#]
(let [[~port-sym zks#] (zk/mk-inprocess-zookeeper tmp#)]
(let [[~port-sym zks#] (Zookeeper/mkInprocessZookeeper tmp# nil)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a drop in replacement because mkInprocessZookeeper is returning just zks# not ~port-sym. You might be able to make this work by changing it to.

   (let [zks# (Zookeeper/mkInprocessZookeeper tmp# nil)
          ~port-sym (.getLocalPort zks#)]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did this make it through unit tests?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It didn't I fixed it in a different way. I made Zookeeper/mkInprocessZookeeper match zk/mk-inprocess-zookeeper instead of returning just the factory and trying to pull the port out of the factory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhhh, okay. This is the part you were talking about. So this is resolved.

(try
~@body
(finally
(zk/shutdown-inprocess-zookeeper zks#))))))
(Zookeeper/shutdownInprocessZookeeper zks#))))))

(defn submit-local-topology
[nimbus storm-name conf topology]
Expand Down
Loading