Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@
<jgrapht.version>0.9.0</jgrapht.version>
<guava.version>16.0.1</guava.version>
<netty.version>3.9.0.Final</netty.version>
<sysout-over-slf4j.version>1.0.2</sysout-over-slf4j.version>
<log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
<log4j.version>2.1</log4j.version>
<slf4j.version>1.7.7</slf4j.version>
Expand Down Expand Up @@ -829,6 +830,11 @@
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>uk.org.lidalia</groupId>
<artifactId>sysout-over-slf4j</artifactId>
<version>${sysout-over-slf4j.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
9 changes: 9 additions & 0 deletions storm-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
from the classpath, classpathDependencyExcludes, but it didn't work in practice.
This is here as a work around to place it at the beginning of the classpath
even though maven does not officially support ordering of the classpath.-->
<dependency>
<groupId>uk.org.lidalia</groupId>
<artifactId>sysout-over-slf4j</artifactId>
</dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new dependency seems OK, and it is unlikely that we will encounter some topology already using this, but could we please shade it anyways?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
Expand Down Expand Up @@ -521,6 +525,7 @@
<include>org.clojure:core.incubator</include>
<include>io.dropwizard.metrics:*</include>
<include>metrics-clojure:*</include>
<include>uk.org.lidalia:*</include>
</includes>
</artifactSet>
<relocations>
Expand Down Expand Up @@ -725,6 +730,10 @@
<pattern>metrics.utils</pattern>
<shadedPattern>org.apache.storm.shade.metrics.utils</shadedPattern>
</relocation>
<relocation>
<pattern>uk.org.lidalia</pattern>
<shadedPattern>org.apache.storm.shade.uk.org.lidalia</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.storm.maven.shade.clojure.ClojureTransformer" />
Expand Down
4 changes: 2 additions & 2 deletions storm-core/src/clj/org/apache/storm/LocalCluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
;; limitations under the License.

(ns org.apache.storm.LocalCluster
(:use [org.apache.storm testing config util])
(:use [org.apache.storm testing config])
(:import [org.apache.storm.utils Utils])
(:import [java.util Map])
(:gen-class
Expand Down Expand Up @@ -48,7 +48,7 @@
[this name conf topology]
(submit-local-topology
(:nimbus (. this state)) name conf topology)
(let [hook (get-configured-class conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)]
(let [hook (Utils/getConfiguredClass conf STORM-TOPOLOGY-SUBMISSION-NOTIFIER-PLUGIN)]
(when hook (submit-hook hook name conf topology))))


Expand Down
8 changes: 7 additions & 1 deletion storm-core/src/clj/org/apache/storm/clojure.clj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
(:import [org.apache.storm.spout SpoutOutputCollector ISpout])
(:import [org.apache.storm.utils Utils])
(:import [org.apache.storm.clojure ClojureBolt ClojureSpout])
(:import [java.util List])
(:import [java.util Collection List])
(:require [org.apache.storm [thrift :as thrift]]))

(defn direct-stream [fields]
Expand Down Expand Up @@ -153,6 +153,12 @@
(tuple-values [this collector stream]
this))

(defn- collectify
[obj]
(if (or (sequential? obj) (instance? Collection obj))
obj
[obj]))

(defnk emit-bolt! [collector values
:stream Utils/DEFAULT_STREAM_ID :anchor []]
(let [^List anchor (collectify anchor)
Expand Down
27 changes: 18 additions & 9 deletions storm-core/src/clj/org/apache/storm/cluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
(:import [org.apache.zookeeper.data Stat ACL Id]
[org.apache.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
LogConfig ProfileAction ProfileRequest NodeInfo]
[java.io Serializable])
[java.io Serializable StringWriter PrintWriter]
[java.net URLEncoder])
(:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
(:import [org.apache.curator.framework CuratorFramework])
(:import [org.apache.storm.utils Utils])
(:import [org.apache.storm.utils Utils Time])
(:import [org.apache.storm.cluster ClusterState ClusterStateContext ClusterStateListener ConnectionState])
(:import [java.security MessageDigest])
(:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
(:import [org.apache.storm.nimbus NimbusInfo])
(:import [org.apache.storm.nimbus NimbusInfo]
[org.apache.storm.zookeeper Zookeeper])
(:use [org.apache.storm util log config converter])
(:require [org.apache.storm [zookeeper :as zk]])
(:require [org.apache.storm.daemon [common :as common]]))
Expand Down Expand Up @@ -176,15 +178,15 @@

(defn error-path
[storm-id component-id]
(str (error-storm-root storm-id) "/" (url-encode component-id)))
(str (error-storm-root storm-id) "/" (URLEncoder/encode component-id)))

(def last-error-path-seg "last-error")

(defn last-error-path
[storm-id component-id]
(str (error-storm-root storm-id)
"/"
(url-encode component-id)
(URLEncoder/encode component-id)
"-"
last-error-path-seg))

Expand Down Expand Up @@ -240,6 +242,12 @@
:stats (get executor-stats t)}})))
(into {}))))

(defn- stringify-error [error]
(let [result (StringWriter.)
printer (PrintWriter. result)]
(.printStackTrace error printer)
(.toString result)))

;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
(defnk mk-storm-cluster-state
[cluster-state-spec :acls nil :context (ClusterStateContext.)]
Expand All @@ -259,7 +267,7 @@
state-id (.register
cluster-state
(fn [type path]
(let [[subtree & args] (tokenize-path path)]
(let [[subtree & args] (Zookeeper/tokenizePath path)]
(condp = subtree
ASSIGNMENTS-ROOT (if (empty? args)
(issue-callback! assignments-callback)
Expand All @@ -274,7 +282,8 @@
LOGCONFIG-ROOT (issue-map-callback! log-config-callback (first args))
BACKPRESSURE-ROOT (issue-map-callback! backpressure-callback (first args))
;; this should never happen
(exit-process! 30 "Unknown callback for subtree " subtree args)))))]
(Utils/exitProcess 30 ["Unknown callback for subtree " subtree args])
))))]
(doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
LOGCONFIG-SUBTREE]]
(.mkdirs cluster-state p acls))
Expand Down Expand Up @@ -381,7 +390,7 @@
;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
;; we avoid situations like that
(let [node+port->executors (reverse-map executor->node+port)
(let [node+port->executors (clojurify-structure (Utils/reverseMap executor->node+port))
all-heartbeats (for [[[node port] executors] node+port->executors]
(->> (get-worker-heartbeat this storm-id node port)
(convert-executor-beats executors)
Expand Down Expand Up @@ -580,7 +589,7 @@
[this storm-id component-id node port error]
(let [path (error-path storm-id component-id)
last-error-path (last-error-path storm-id component-id)
data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port})
data (thriftify-error {:time-secs (Time/currentTimeSecs) :error (stringify-error error) :host node :port port})
_ (.mkdirs cluster-state path acls)
ser-data (Utils/serialize data)
_ (.mkdirs cluster-state path acls)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

(ns org.apache.storm.cluster-state.zookeeper-state-factory
(:import [org.apache.curator.framework.state ConnectionStateListener]
[org.apache.storm.zookeeper Zookeeper])
[org.apache.storm.zookeeper Zookeeper]
[org.apache.storm.utils Utils])
(:import [org.apache.zookeeper KeeperException$NoNodeException CreateMode
Watcher$Event$EventType Watcher$Event$KeeperState]
Watcher$Event$EventType Watcher$Event$KeeperState]
[org.apache.storm.cluster ClusterState DaemonType])
(:use [org.apache.storm cluster config log util])
(:require [org.apache.storm [zookeeper :as zk]])
Expand Down Expand Up @@ -63,7 +64,7 @@

(register
[this callback]
(let [id (uuid)]
(let [id (Utils/uuid)]
(swap! callbacks assoc id callback)
id))

Expand All @@ -73,7 +74,7 @@

(set-ephemeral-node
[this path data acls]
(Zookeeper/mkdirs zk-writer (parent-path path) acls)
(Zookeeper/mkdirs zk-writer (Zookeeper/parentPath path) acls)
(if (Zookeeper/exists zk-writer path false)
(try-cause
(Zookeeper/setData zk-writer path data) ; should verify that it's ephemeral
Expand All @@ -92,7 +93,7 @@
(if (Zookeeper/exists zk-writer path false)
(Zookeeper/setData zk-writer path data)
(do
(Zookeeper/mkdirs zk-writer (parent-path path) acls)
(Zookeeper/mkdirs zk-writer (Zookeeper/parentPath path) acls)
(Zookeeper/createNode zk-writer path data CreateMode/PERSISTENT acls))))

(set-worker-hb
Expand Down
11 changes: 6 additions & 5 deletions storm-core/src/clj/org/apache/storm/command/blobstore.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
(:import [java.io InputStream OutputStream]
[org.apache.storm.generated SettableBlobMeta AccessControl AuthorizationException
KeyNotFoundException]
[org.apache.storm.blobstore BlobStoreAclHandler])
[org.apache.storm.blobstore BlobStoreAclHandler]
[org.apache.storm.utils Utils])
(:use [org.apache.storm config]
[clojure.string :only [split]]
[clojure.tools.cli :only [cli]]
[clojure.java.io :only [copy input-stream output-stream]]
[org.apache.storm blobstore log util])
[org.apache.storm blobstore log])
(:gen-class))

(defn update-blob-from-stream
Expand Down Expand Up @@ -88,10 +89,10 @@
(defn create-cli [args]
(let [[{file :file acl :acl replication-factor :replication-factor} [key] _] (cli args ["-f" "--file" :default nil]
["-a" "--acl" :default [] :parse-fn as-acl]
["-r" "--replication-factor" :default -1 :parse-fn parse-int])
["-r" "--replication-factor" :default -1 :parse-fn #(Integer/parseInt %)])
meta (doto (SettableBlobMeta. acl)
(.set_replication_factor replication-factor))]
(validate-key-name! key)
(Utils/validateKeyName key)
(log-message "Creating " key " with ACL " (pr-str (map access-control-str acl)))
(if file
(with-open [f (input-stream file)]
Expand Down Expand Up @@ -140,7 +141,7 @@
(log-message "Current replication factor " blob-replication)
blob-replication)
"--update" (let [[{replication-factor :replication-factor} [key] _]
(cli new-args ["-r" "--replication-factor" :parse-fn parse-int])]
(cli new-args ["-r" "--replication-factor" :parse-fn #(Integer/parseInt %)])]
(if (nil? replication-factor)
(throw (RuntimeException. (str "Please set the replication factor")))
(let [blob-replication (.updateBlobReplication blobstore key replication-factor)]
Expand Down
6 changes: 3 additions & 3 deletions storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.command.dev-zookeeper
(:import [org.apache.storm.utils Utils])
(:use [org.apache.storm zookeeper util config])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it better to remove util from this list where ever possible?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

(:import [org.apache.storm.utils ConfigUtils])
(:import [org.apache.storm.zookeeper Zookeeper])
Expand All @@ -23,6 +24,5 @@
(let [conf (clojurify-structure (ConfigUtils/readStormConfig))
port (conf STORM-ZOOKEEPER-PORT)
localpath (conf DEV-ZOOKEEPER-PATH)]
(rmr localpath)
(Zookeeper/mkInprocessZookeeper localpath port)
))
(Utils/forceDelete localpath)
(Zookeeper/mkInprocessZookeeper localpath port)))
12 changes: 7 additions & 5 deletions storm-core/src/clj/org/apache/storm/command/get_errors.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
[nimbus :as nimbus]
[common :as common]])
(:import [org.apache.storm.generated GetInfoOptions NumErrorsChoice
TopologySummary ErrorInfo])
TopologySummary ErrorInfo]
[org.json.simple JSONValue])
(:gen-class))

(defn get-topology-id [name topologies]
Expand All @@ -44,9 +45,10 @@
topo-id (get-topology-id name topologies)
topo-info (when (not-nil? topo-id) (.getTopologyInfoWithOpts nimbus topo-id opts))]
(if (or (nil? topo-id) (nil? topo-info))
(println (to-json {"Failure" (str "No topologies running with name " name)}))
(println (JSONValue/toJSONString {"Failure" (str "No topologies running with name " name)}))
(let [topology-name (.get_name topo-info)
topology-errors (.get_errors topo-info)]
(println (to-json (hash-map
"Topology Name" topology-name
"Comp-Errors" (get-component-errors topology-errors)))))))))
(println (JSONValue/toJSONString
(hash-map
"Topology Name" topology-name
"Comp-Errors" (get-component-errors topology-errors)))))))))
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
;; limitations under the License.
(ns org.apache.storm.command.shell-submission
(:import [org.apache.storm StormSubmitter]
[org.apache.storm.utils Utils]
[org.apache.storm.zookeeper Zookeeper])
(:use [org.apache.storm thrift util config log zookeeper])
(:require [clojure.string :as str])
Expand All @@ -31,5 +32,4 @@
no-op (.close zk-leader-elector)
jarpath (StormSubmitter/submitJar conf tmpjarpath)
args (concat args [host port jarpath])]
(exec-command! (str/join " " args))
))
(Utils/execCommand args)))
18 changes: 17 additions & 1 deletion storm-core/src/clj/org/apache/storm/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
(:import [java.io FileReader File IOException]
[org.apache.storm.generated StormTopology])
(:import [org.apache.storm Config])
(:import [org.apache.storm.utils Utils LocalState ConfigUtils])
(:import [org.apache.storm.utils Utils LocalState ConfigUtils MutableInt])
(:import [org.apache.storm.validation ConfigValidation])
(:import [org.apache.commons.io FileUtils])
(:require [clojure [string :as str]])
Expand Down Expand Up @@ -49,6 +49,22 @@
(/ 1)
int))

(defn- even-sampler
[freq]
(let [freq (int freq)
start (int 0)
r (java.util.Random.)
curr (MutableInt. -1)
target (MutableInt. (.nextInt r freq))]
(with-meta
(fn []
(let [i (.increment curr)]
(when (>= i freq)
(.set curr start)
(.set target (.nextInt r freq))))
(= (.get curr) (.get target)))
{:rate freq})))

;; TODO this function together with sampling-rate are to be replaced with Java version when util.clj is in
(defn mk-stats-sampler
[conf]
Expand Down
Loading