diff --git a/bin/storm b/bin/storm index 809a83a6eb8..3410aa1a85e 100755 --- a/bin/storm +++ b/bin/storm @@ -30,24 +30,6 @@ while [ -h "${PRG}" ]; do fi done -# find python >= 2.6 -if [ -a /usr/bin/python2.6 ]; then - PYTHON=/usr/bin/python2.6 -fi - -if [ -z "$PYTHON" ]; then - PYTHON=/usr/bin/python -fi - -# check for version -majversion=`$PYTHON -V 2>&1 | awk '{print $2}' | cut -d'.' -f1` -minversion=`$PYTHON -V 2>&1 | awk '{print $2}' | cut -d'.' -f2` -numversion=$(( 10 * $majversion + $minversion)) -if (( $numversion < 26 )); then - echo "Need python version > 2.6" - exit 1 -fi - STORM_BIN_DIR=`dirname ${PRG}` export STORM_BASE_DIR=`cd ${STORM_BIN_DIR}/..;pwd` @@ -73,4 +55,6 @@ if [ -f "${STORM_CONF_DIR}/storm-env.sh" ]; then . "${STORM_CONF_DIR}/storm-env.sh" fi -exec "$PYTHON" "${STORM_BIN_DIR}/storm.py" "$@" +JAR_DIR="${STORM_BASE_DIR}/lib" +# exec "${JAVA_HOME}/bin/java" -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005 -cp "${JAR_DIR}/*" backtype.storm.utils.StormCommandExecutor "$@" +exec "${JAVA_HOME}/bin/java" -cp "${JAR_DIR}/*" backtype.storm.utils.StormCommandExecutor "$@" diff --git a/bin/storm.py b/bin/storm.py deleted file mode 100755 index 34157b7421f..00000000000 --- a/bin/storm.py +++ /dev/null @@ -1,576 +0,0 @@ -#!/usr/bin/python - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys -import random -import subprocess as sub -import re -import shlex -try: - # python 3 - from urllib.parse import quote_plus -except ImportError: - # python 2 - from urllib import quote_plus -try: - # python 3 - import configparser -except ImportError: - # python 2 - import ConfigParser as configparser - -def is_windows(): - return sys.platform.startswith('win') - -def identity(x): - return x - -def cygpath(x): - command = ["cygpath", "-wp", x] - p = sub.Popen(command,stdout=sub.PIPE) - output, errors = p.communicate() - lines = output.split(os.linesep) - return lines[0] - -def init_storm_env(): - global CLUSTER_CONF_DIR - ini_file = os.path.join(CLUSTER_CONF_DIR, 'storm_env.ini') - if not os.path.isfile(ini_file): - return - config = configparser.ConfigParser() - config.optionxform = str - config.read(ini_file) - options = config.options('environment') - for option in options: - value = config.get('environment', option) - os.environ[option] = value - -normclasspath = cygpath if sys.platform == 'cygwin' else identity -STORM_DIR = os.sep.join(os.path.realpath( __file__ ).split(os.sep)[:-2]) -USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm") -STORM_CONF_DIR = os.getenv('STORM_CONF_DIR', None) - -if STORM_CONF_DIR == None: - CLUSTER_CONF_DIR = os.path.join(STORM_DIR, "conf") -else: - CLUSTER_CONF_DIR = STORM_CONF_DIR - -if (not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml"))): - USER_CONF_DIR = CLUSTER_CONF_DIR - -STORM_LIB_DIR = os.path.join(STORM_DIR, "lib") -STORM_BIN_DIR = os.path.join(STORM_DIR, "bin") -STORM_LOG4J_CONF_DIR = os.path.join(STORM_DIR, "log4j2") - -init_storm_env() - -CONFIG_OPTS = [] -CONFFILE = "" -JAR_JVM_OPTS = shlex.split(os.getenv('STORM_JAR_JVM_OPTS', '')) -JAVA_HOME = os.getenv('JAVA_HOME', None) -JAVA_CMD = 'java' if not JAVA_HOME else os.path.join(JAVA_HOME, 'bin', 'java') -if JAVA_HOME and not os.path.exists(JAVA_CMD): - print "ERROR: JAVA_HOME is invalid. Could not find bin/java at %s." % JAVA_HOME - sys.exit(1) -STORM_EXT_CLASSPATH = os.getenv('STORM_EXT_CLASSPATH', None) -STORM_EXT_CLASSPATH_DAEMON = os.getenv('STORM_EXT_CLASSPATH_DAEMON', None) - -def get_config_opts(): - global CONFIG_OPTS - return "-Dstorm.options=" + ','.join(map(quote_plus,CONFIG_OPTS)) - -if not os.path.exists(STORM_LIB_DIR): - print("******************************************") - print("The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code.") - print("\nYou can download a Storm release at http://storm-project.net/downloads.html") - print("******************************************") - sys.exit(1) - -def get_jars_full(adir): - files = os.listdir(adir) - ret = [] - for f in files: - if f.endswith(".jar"): - ret.append(os.path.join(adir, f)) - return ret - -def get_classpath(extrajars, daemon=True): - ret = get_jars_full(STORM_DIR) - ret.extend(get_jars_full(STORM_DIR + "/lib")) - ret.extend(get_jars_full(STORM_DIR + "/extlib")) - if daemon: - ret.extend(get_jars_full(STORM_DIR + "/extlib-daemon")) - if STORM_EXT_CLASSPATH != None: - ret.extend(STORM_EXT_CLASSPATH) - if daemon and STORM_EXT_CLASSPATH_DAEMON != None: - ret.extend(STORM_EXT_CLASSPATH_DAEMON) - ret.extend(extrajars) - return normclasspath(os.pathsep.join(ret)) - -def confvalue(name, extrapaths, daemon=True): - global CONFFILE - command = [ - JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE, - "-cp", get_classpath(extrapaths, daemon), "backtype.storm.command.config_value", name - ] - p = sub.Popen(command, stdout=sub.PIPE) - output, errors = p.communicate() - # python 3 - if not isinstance(output, str): - output = output.decode('utf-8') - lines = output.split(os.linesep) - for line in lines: - tokens = line.split(" ") - if tokens[0] == "VALUE:": - return " ".join(tokens[1:]) - return "" - -def print_localconfvalue(name): - """Syntax: [storm localconfvalue conf-name] - - Prints out the value for conf-name in the local Storm configs. - The local Storm configs are the ones in ~/.storm/storm.yaml merged - in with the configs in defaults.yaml. - """ - print(name + ": " + confvalue(name, [USER_CONF_DIR])) - -def print_remoteconfvalue(name): - """Syntax: [storm remoteconfvalue conf-name] - - Prints out the value for conf-name in the cluster's Storm configs. - The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml - merged in with the configs in defaults.yaml. - - This command must be run on a cluster machine. - """ - print(name + ": " + confvalue(name, [CLUSTER_CONF_DIR])) - -def parse_args(string): - r"""Takes a string of whitespace-separated tokens and parses it into a list. - Whitespace inside tokens may be quoted with single quotes, double quotes or - backslash (similar to command-line arguments in bash). - - >>> parse_args(r'''"a a" 'b b' c\ c "d'd" 'e"e' 'f\'f' "g\"g" "i""i" 'j''j' k" "k l' l' mm n\\n''') - ['a a', 'b b', 'c c', "d'd", 'e"e', "f'f", 'g"g', 'ii', 'jj', 'k k', 'l l', 'mm', r'n\n'] - """ - re_split = re.compile(r'''((?: - [^\s"'\\] | - "(?: [^"\\] | \\.)*" | - '(?: [^'\\] | \\.)*' | - \\. - )+)''', re.VERBOSE) - args = re_split.split(string)[1::2] - args = [re.compile(r'"((?:[^"\\]|\\.)*)"').sub('\\1', x) for x in args] - args = [re.compile(r"'((?:[^'\\]|\\.)*)'").sub('\\1', x) for x in args] - return [re.compile(r'\\(.)').sub('\\1', x) for x in args] - -def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, daemonName=""): - global CONFFILE - storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR]) - if(storm_log_dir == None or storm_log_dir == "nil"): - storm_log_dir = os.path.join(STORM_DIR, "logs") - all_args = [ - JAVA_CMD, jvmtype, - "-Ddaemon.name=" + daemonName, - get_config_opts(), - "-Dstorm.home=" + STORM_DIR, - "-Dstorm.log.dir=" + storm_log_dir, - "-Djava.library.path=" + confvalue("java.library.path", extrajars, daemon), - "-Dstorm.conf.file=" + CONFFILE, - "-cp", get_classpath(extrajars, daemon), - ] + jvmopts + [klass] + list(args) - print("Running: " + " ".join(all_args)) - if fork: - os.spawnvp(os.P_WAIT, JAVA_CMD, all_args) - elif is_windows(): - # handling whitespaces in JAVA_CMD - sub.call(all_args) - else: - os.execvp(JAVA_CMD, all_args) - -def jar(jarfile, klass, *args): - """Syntax: [storm jar topology-jar-path class ...] - - Runs the main method of class with the specified arguments. - The storm jars and configs in ~/.storm are put on the classpath. - The process is configured so that StormSubmitter - (http://storm.incubator.apache.org/apidocs/backtype/storm/StormSubmitter.html) - will upload the jar at topology-jar-path when the topology is submitted. - """ - exec_storm_class( - klass, - jvmtype="-client", - extrajars=[jarfile, USER_CONF_DIR, STORM_BIN_DIR], - args=args, - daemon=False, - jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile]) - -def kill(*args): - """Syntax: [storm kill topology-name [-w wait-time-secs]] - - Kills the topology with the name topology-name. Storm will - first deactivate the topology's spouts for the duration of - the topology's message timeout to allow all messages currently - being processed to finish processing. Storm will then shutdown - the workers and clean up their state. You can override the length - of time Storm waits between deactivation and shutdown with the -w flag. - """ - exec_storm_class( - "backtype.storm.command.kill_topology", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - - -def upload_credentials(*args): - """Syntax: [storm upload_credentials topology-name [credkey credvalue]*] - - Uploads a new set of credentials to a running topology - """ - exec_storm_class( - "backtype.storm.command.upload_credentials", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - -def activate(*args): - """Syntax: [storm activate topology-name] - - Activates the specified topology's spouts. - """ - exec_storm_class( - "backtype.storm.command.activate", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - -def listtopos(*args): - """Syntax: [storm list] - - List the running topologies and their statuses. - """ - exec_storm_class( - "backtype.storm.command.list", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - -def deactivate(*args): - """Syntax: [storm deactivate topology-name] - - Deactivates the specified topology's spouts. - """ - exec_storm_class( - "backtype.storm.command.deactivate", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - -def rebalance(*args): - """Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*] - - Sometimes you may wish to spread out where the workers for a topology - are running. For example, let's say you have a 10 node cluster running - 4 workers per node, and then let's say you add another 10 nodes to - the cluster. You may wish to have Storm spread out the workers for the - running topology so that each node runs 2 workers. One way to do this - is to kill the topology and resubmit it, but Storm provides a "rebalance" - command that provides an easier way to do this. - - Rebalance will first deactivate the topology for the duration of the - message timeout (overridable with the -w flag) and then redistribute - the workers evenly around the cluster. The topology will then return to - its previous state of activation (so a deactivated topology will still - be deactivated and an activated topology will go back to being activated). - - The rebalance command can also be used to change the parallelism of a running topology. - Use the -n and -e switches to change the number of workers or number of executors of a component - respectively. - """ - exec_storm_class( - "backtype.storm.command.rebalance", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - -def get_errors(*args): - """Syntax: [storm get-errors topology-name] - - Get the latest error from the running topology. The returned result contains - the key value pairs for component-name and component-error for the components in error. - The result is returned in json format. - """ - exec_storm_class( - "backtype.storm.command.get_errors", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")]) - -def shell(resourcesdir, command, *args): - tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar" - os.system("jar cf %s %s" % (tmpjarpath, resourcesdir)) - runnerargs = [tmpjarpath, command] - runnerargs.extend(args) - exec_storm_class( - "backtype.storm.command.shell_submission", - args=runnerargs, - jvmtype="-client", - extrajars=[USER_CONF_DIR], - fork=True) - os.system("rm " + tmpjarpath) - -def repl(): - """Syntax: [storm repl] - - Opens up a Clojure REPL with the storm jars and configuration - on the classpath. Useful for debugging. - """ - cppaths = [CLUSTER_CONF_DIR] - exec_storm_class("clojure.main", jvmtype="-client", extrajars=cppaths) - -def get_log4j_conf_dir(): - cppaths = [CLUSTER_CONF_DIR] - storm_log4j_conf_dir = confvalue("storm.logback.conf.dir", cppaths) - if(storm_log4j_conf_dir == None or storm_log4j_conf_dir == "nil"): - storm_log4j_conf_dir = STORM_LOG4J_CONF_DIR - return storm_log4j_conf_dir - -def nimbus(klass="backtype.storm.daemon.nimbus"): - """Syntax: [storm nimbus] - - Launches the nimbus daemon. This command should be run under - supervision with a tool like daemontools or monit. - - See Setting up a Storm cluster for more information. - (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) - """ - cppaths = [CLUSTER_CONF_DIR] - jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [ - "-Dlogfile.name=nimbus.log", - "-Dlog4j.configurationFile=" + os.path.join(get_log4j_conf_dir(), "cluster.xml"), - ] - exec_storm_class( - klass, - jvmtype="-server", - daemonName="nimbus", - extrajars=cppaths, - jvmopts=jvmopts) - -def supervisor(klass="backtype.storm.daemon.supervisor"): - """Syntax: [storm supervisor] - - Launches the supervisor daemon. This command should be run - under supervision with a tool like daemontools or monit. - - See Setting up a Storm cluster for more information. - (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) - """ - cppaths = [CLUSTER_CONF_DIR] - jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [ - "-Dlogfile.name=supervisor.log", - "-Dlog4j.configurationFile=" + os.path.join(get_log4j_conf_dir(), "cluster.xml"), - ] - exec_storm_class( - klass, - jvmtype="-server", - daemonName="supervisor", - extrajars=cppaths, - jvmopts=jvmopts) - -def ui(): - """Syntax: [storm ui] - - Launches the UI daemon. The UI provides a web interface for a Storm - cluster and shows detailed stats about running topologies. This command - should be run under supervision with a tool like daemontools or monit. - - See Setting up a Storm cluster for more information. - (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) - """ - cppaths = [CLUSTER_CONF_DIR] - jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [ - "-Dlogfile.name=ui.log", - "-Dlog4j.configurationFile=" + os.path.join(get_log4j_conf_dir(), "cluster.xml") - ] - exec_storm_class( - "backtype.storm.ui.core", - jvmtype="-server", - daemonName="ui", - jvmopts=jvmopts, - extrajars=[STORM_DIR, CLUSTER_CONF_DIR]) - -def logviewer(): - """Syntax: [storm logviewer] - - Launches the log viewer daemon. It provides a web interface for viewing - storm log files. This command should be run under supervision with a - tool like daemontools or monit. - - See Setting up a Storm cluster for more information. - (http://storm.incubator.apache.org/documentation/Setting-up-a-Storm-cluster) - """ - cppaths = [CLUSTER_CONF_DIR] - jvmopts = parse_args(confvalue("logviewer.childopts", cppaths)) + [ - "-Dlogfile.name=logviewer.log", - "-Dlog4j.configurationFile=" + os.path.join(get_log4j_conf_dir(), "cluster.xml") - ] - exec_storm_class( - "backtype.storm.daemon.logviewer", - jvmtype="-server", - daemonName="logviewer", - jvmopts=jvmopts, - extrajars=[STORM_DIR, CLUSTER_CONF_DIR]) - -def drpc(): - """Syntax: [storm drpc] - - Launches a DRPC daemon. This command should be run under supervision - with a tool like daemontools or monit. - - See Distributed RPC for more information. - (http://storm.incubator.apache.org/documentation/Distributed-RPC) - """ - cppaths = [CLUSTER_CONF_DIR] - jvmopts = parse_args(confvalue("drpc.childopts", cppaths)) + [ - "-Dlogfile.name=drpc.log", - "-Dlog4j.configurationFile=" + os.path.join(get_log4j_conf_dir(), "cluster.xml") - ] - exec_storm_class( - "backtype.storm.daemon.drpc", - jvmtype="-server", - daemonName="drpc", - jvmopts=jvmopts, - extrajars=[CLUSTER_CONF_DIR]) - -def dev_zookeeper(): - """Syntax: [storm dev-zookeeper] - - Launches a fresh Zookeeper server using "dev.zookeeper.path" as its local dir and - "storm.zookeeper.port" as its port. This is only intended for development/testing, the - Zookeeper instance launched is not configured to be used in production. - """ - cppaths = [CLUSTER_CONF_DIR] - exec_storm_class( - "backtype.storm.command.dev_zookeeper", - jvmtype="-server", - extrajars=[CLUSTER_CONF_DIR]) - -def version(): - """Syntax: [storm version] - - Prints the version number of this Storm release. - """ - cppaths = [CLUSTER_CONF_DIR] - exec_storm_class( - "backtype.storm.utils.VersionInfo", - jvmtype="-client", - extrajars=[CLUSTER_CONF_DIR]) - -def print_classpath(): - """Syntax: [storm classpath] - - Prints the classpath used by the storm client when running commands. - """ - print(get_classpath([])) - -def monitor(*args): - """Syntax: [storm monitor topology-name [-i interval-secs] [-m component-id] [-s stream-id] [-w [emitted | transferred]]] - - Monitor given topology's throughput interactively. - One can specify poll-interval, component-id, stream-id, watch-item[emitted | transferred] - By default, - poll-interval is 4 seconds; - all component-ids will be list; - stream-id is 'default'; - watch-item is 'emitted'; - """ - exec_storm_class( - "backtype.storm.command.monitor", - args=args, - jvmtype="-client", - extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) - - -def print_commands(): - """Print all client commands and link to documentation""" - print("Commands:\n\t" + "\n\t".join(sorted(COMMANDS.keys()))) - print("\nHelp: \n\thelp \n\thelp ") - print("\nDocumentation for the storm client can be found at http://storm.incubator.apache.org/documentation/Command-line-client.html\n") - print("Configs can be overridden using one or more -c flags, e.g. \"storm list -c nimbus.host=nimbus.mycompany.com\"\n") - -def print_usage(command=None): - """Print one help message or list of available commands""" - if command != None: - if command in COMMANDS: - print(COMMANDS[command].__doc__ or - "No documentation provided for <%s>" % command) - else: - print("<%s> is not a valid command" % command) - else: - print_commands() - -def unknown_command(*args): - print("Unknown command: [storm %s]" % ' '.join(sys.argv[1:])) - print_usage() - sys.exit(254) - -COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "logviewer": logviewer, - "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue, - "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath, - "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage, - "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor, - "upload-credentials": upload_credentials, "get-errors": get_errors } - -def parse_config(config_list): - global CONFIG_OPTS - if len(config_list) > 0: - for config in config_list: - CONFIG_OPTS.append(config) - -def parse_config_opts(args): - curr = args[:] - curr.reverse() - config_list = [] - args_list = [] - - while len(curr) > 0: - token = curr.pop() - if token == "-c": - config_list.append(curr.pop()) - elif token == "--config": - global CONFFILE - CONFFILE = curr.pop() - else: - args_list.append(token) - - return config_list, args_list - -def main(): - if len(sys.argv) <= 1: - print_usage() - sys.exit(-1) - global CONFIG_OPTS - config_list, args = parse_config_opts(sys.argv[1:]) - parse_config(config_list) - COMMAND = args[0] - ARGS = args[1:] - (COMMANDS.get(COMMAND, unknown_command))(*ARGS) - -if __name__ == "__main__": - main() diff --git a/storm-core/src/jvm/backtype/storm/utils/StormCommandExecutor.java b/storm-core/src/jvm/backtype/storm/utils/StormCommandExecutor.java new file mode 100644 index 00000000000..d263d8fde7d --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/utils/StormCommandExecutor.java @@ -0,0 +1,868 @@ +package backtype.storm.utils; + +import java.io.BufferedReader; +import java.io.File; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.util.*; + +import clojure.lang.IFn; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.SystemUtils; + +/** + * Created by pshah on 7/17/15. + */ +abstract class StormCommandExecutor { + final String NIMBUS_CLASS = "backtype.storm.daemon.nimbus"; + final String SUPERVISOR_CLASS = "backtype.storm.daemon.supervisor"; + final String UI_CLASS = "backtype.storm.ui.core"; + final String LOGVIEWER_CLASS = "backtype.storm.daemon.logviewer"; + final String DRPC_CLASS = "backtype.storm.daemon.drpc"; + final String REPL_CLASS = "clojure.main"; + final String ACTIVATE_CLASS = "backtype.storm.command.activate"; + final String DEACTIVATE_CLASS = "backtype.storm.command.deactivate"; + final String REBALANCE_CLASS = "backtype.storm.command.rebalance"; + final String LIST_CLASS = "backtype.storm.command.list"; + final String DEVZOOKEEPER_CLASS = "backtype.storm.command.dev_zookeeper"; + final String VERSION_CLASS = "backtype.storm.utils.VersionInfo"; + final String MONITOR_CLASS = "backtype.storm.command.monitor"; + final String UPLOADCREDENTIALS_CLASS = "backtype.storm.command" + + ".upload_credentials"; + final String GETERRORS_CLASS = "backtype.storm.command.get_errors"; + final String SHELL_CLASS = "backtype.storm.command.shell_submission"; + String stormHomeDirectory; + String userConfDirectory; + String stormConfDirectory; + String clusterConfDirectory; + String stormLibDirectory; + String stormBinDirectory; + String stormLog4jConfDirectory; + String configFile = ""; + String javaCommand; + List configOptions = new ArrayList(); + String stormExternalClasspath; + String stormExternalClasspathDaemon; + String fileSeparator; + final List COMMANDS = Arrays.asList("jar", "kill", "shell", + "nimbus", "ui", "logviewer", "drpc", "supervisor", + "localconfvalue", "remoteconfvalue", "repl", "classpath", + "activate", "deactivate", "rebalance", "help", "list", + "dev-zookeeper", "version", "monitor", "upload-credentials", + "get-errors"); + + public static void main (String[] args) { + for (String arg : args) { + System.out.println("Argument ++ is " + arg); + } + StormCommandExecutor stormCommandExecutor; + if (System.getProperty("os.name").startsWith("Windows")) { + stormCommandExecutor = new WindowsStormCommandExecutor(); + } else { + stormCommandExecutor = new UnixStormCommandExecutor(); + } + stormCommandExecutor.initialize(); + stormCommandExecutor.execute(args); + } + + StormCommandExecutor () { + + } + + abstract void initialize (); + + abstract void execute (String[] args); + + void callMethod (String command, List args) { + Class implementation = this.getClass(); + String methodName = command.replace("-", "") + "Command"; + try { + Method method = implementation.getDeclaredMethod(methodName, List + .class); + method.invoke(this, args); + } catch (NoSuchMethodException ex) { + System.out.println("No such method exception occured while trying" + + " to run storm method " + command); + } catch (IllegalAccessException ex) { + System.out.println("Illegal access exception occured while trying" + + " to run storm method " + command); + } catch (IllegalArgumentException ex) { + System.out.println("Illegal argument exception occured while " + + "trying" + " to run storm method " + command); + } catch (InvocationTargetException ex) { + System.out.println("Invocation target exception occured while " + + "trying" + " to run storm method " + command); + } + } +} + +class UnixStormCommandExecutor extends StormCommandExecutor { + + UnixStormCommandExecutor () { + + } + + void initialize () { + Collections.sort(this.COMMANDS); + this.fileSeparator = System .getProperty ("file.separator"); + this.stormHomeDirectory = System.getenv("STORM_BASE_DIR"); + this.userConfDirectory = System.getProperty("user.home") + + this.fileSeparator + "" + + ".storm"; + this.stormConfDirectory = System.getenv("STORM_CONF_DIR"); + this.clusterConfDirectory = this.stormConfDirectory == null ? (this + .stormHomeDirectory + this.fileSeparator + "conf") : this + .stormConfDirectory; + File f = new File(this.userConfDirectory + this.fileSeparator + + "storm.yaml"); + if (!f.isFile()) { + this.userConfDirectory = this.clusterConfDirectory; + } + this.stormLibDirectory = this.stormHomeDirectory + this.fileSeparator + + "lib"; + this.stormBinDirectory = this.stormHomeDirectory + this.fileSeparator + + "bin"; + this.stormLog4jConfDirectory = this.stormHomeDirectory + + this.fileSeparator + "log4j2"; + if (System.getenv("JAVA_HOME") != null) { + this.javaCommand = System.getenv("JAVA_HOME") + this.fileSeparator + + "bin" + this.fileSeparator + "java"; + if (!(new File(this.javaCommand).exists())) { + System.out.println("ERROR: JAVA_HOME is invalid. Could not " + + "find " + this.javaCommand); + System.exit(1); + } + } else { + this.javaCommand = "java"; + } + this.stormExternalClasspath = System.getenv("STORM_EXT_CLASSPATH"); + this.stormExternalClasspathDaemon = System.getenv + ("STORM_EXT_CLASSPATH_DAEMON"); + if (!(new File(this.stormLibDirectory).exists())) { + System.out.println("******************************************"); + System.out.println("The storm client can only be run from within " + + "a release. " + "You appear to be trying to run the client" + + " from a checkout of Storm's source code."); + System.out.println("You can download a Storm release at " + + "http://storm-project.net/downloads.html"); + System.out.println("******************************************"); + System.exit(1); + } + //System.getProperties().list(System.out); + } + + void execute (String[] args) { + if (args.length == 0) { + this.printUsage(); + System.exit(-1); + } + List commandArgs = new ArrayList(); + for (int i = 0; i < args.length; ++i) { + if (args[i] == "-c") { + this.configOptions.add(args[++i]); + } else if (args[i] == "--config") { + this.configFile = args[++i]; + } else { + commandArgs.add(args[i]); + } + } + if ((commandArgs.size() == 0) || (!this.COMMANDS.contains + (commandArgs.get(0)))) { + System.out.println("Unknown command: [storm " + StringUtils.join + (args, " ") + "]"); + this.printUsage(); + System.exit(254); + + } + this.callMethod(commandArgs.get(0), commandArgs.subList(1, + commandArgs.size())); + + } + + String getConfigOptions() { + String configOptions = "-Dstorm.options="; + //TODO - do urlencode here. python does quote_plus to each configoption + return configOptions + StringUtils.join(this.configOptions, ','); + + } + + List getJarsFull (String directory) { + List fullJarFiles = new ArrayList(); + File file = new File(directory); + File[] files = file.listFiles(); + if (files != null) { + for (File f : files) { + if (f.getName().endsWith(".jar")) { + fullJarFiles.add(f.getPath()); + } + } + } + return fullJarFiles; + } + + String getClassPath (List extraJars, boolean daemon) { + List classPaths = this.getJarsFull(this.stormHomeDirectory); + classPaths.addAll(this.getJarsFull(this.stormLibDirectory)); + classPaths.addAll(this.getJarsFull(this.stormHomeDirectory + this + .fileSeparator + "extlib")); + if (daemon == true) { + classPaths.addAll(this.getJarsFull(this.stormHomeDirectory + this + .fileSeparator + "extlib-daemon")); + } + if (this.stormExternalClasspath != null) { + classPaths.add(this.stormExternalClasspath); + } + if (this.stormExternalClasspathDaemon != null) { + classPaths.add(this.stormExternalClasspathDaemon); + } + classPaths.addAll(extraJars); + return StringUtils.join(classPaths, System.getProperty("path" + + ".separator")); + } + + String confValue (String name, List extraPaths, boolean daemon) { + // The original code from python started a process that started a jvm + // with backtype.storm.command.config_value main method that would + // read the conf value and print it out to an output stream. python + // tapped on to the output stream of that subprocess and returned the + // confvalue for the name. Because the pythong code has been shipped + // to java now it should not spawn a new process which is a jvm since + // we are already in jvm. Instead it should just be doing as the code + // commeneted below. + // However looking at the pythong code it was + // starting a jvm with -cp argument that had classpaths which might + // not be available to this java process. Hence there is a chance + // that the below code might break existing scripts. As a result I + // have decided to still spawn a new process from java just like + // python with similar classpaths being constructed for the jvm + // execution + /*IFn fn = Utils.loadClojureFn("backtype.storm.config", + "read-storm-config"); + Object o = fn.invoke(); + return ((Map) o).get(name).toString();*/ + String confValue = ""; + ProcessBuilder processBuilder = new ProcessBuilder(this.javaCommand, + "-client", this.getConfigOptions(), "-Dstorm.conf.file=" + + this.configFile, "-cp", this.getClassPath(extraPaths, daemon), + "backtype.storm.command.config_value", name); + BufferedReader br; + try { + Process process = processBuilder.start(); + br = new BufferedReader(new InputStreamReader(process + .getInputStream(), StandardCharsets.UTF_8)); + process.waitFor(); + String line; + while ((line = br.readLine()) != null) { + String[] tokens = line.split(" "); + if ("VALUE:".equals(tokens[0])) { + confValue = StringUtils.join(Arrays.copyOfRange(tokens, 1, + tokens.length), " "); + break; + } + } + br.close(); + } catch (Exception ex) { + System.out.println("Exception occured while starting process via " + + "processbuilder " + ex.getMessage()); + } + return confValue; + } + + void executeStormClass (String className, String jvmType, List + jvmOptions, List extraJars, List args, boolean + fork, boolean daemon, String daemonName) { + List extraPaths = new ArrayList<>(); + extraPaths.add(this.clusterConfDirectory); + String stormLogDirectory = this.confValue("storm.log.dir", + extraPaths, daemon); + if ((stormLogDirectory == null) || ("".equals(stormLogDirectory)) || + ("nil".equals(stormLogDirectory))) { + stormLogDirectory = this.stormHomeDirectory + this.fileSeparator + + "logs"; + } + List commandList = new ArrayList(); + commandList.add(this.javaCommand); + commandList.add(jvmType); + commandList.add("-Ddaemon.name=" + daemonName); + commandList.add(this.getConfigOptions()); + commandList.add("-Dstorm.home=" + this.stormHomeDirectory); + commandList.add("-Dstorm.log.dir=" + stormLogDirectory); + commandList.add("-Djava.library.path=" + this + .confValue("java.library.path", extraJars, daemon)); + commandList.add("-Dstorm.conf.file=" + this.configFile); + commandList.add("-cp"); + commandList.add(this.getClassPath(extraJars, daemon)); + commandList.addAll(jvmOptions); + commandList.add(className); + commandList.addAll(args); + ProcessBuilder processBuilder = new ProcessBuilder(commandList); + processBuilder.inheritIO(); + try { + Process process = processBuilder.start(); + System.out.println("Executing the command: "); + String commandLine = StringUtils.join(commandList, " "); + System.out.println(commandLine); + if (daemon == true) { + Runtime.getRuntime().addShutdownHook(new ShutdownHookThread + (process, commandLine)); + } + System.out.println("Waiting for subprocess to finish"); + process.waitFor(); + System.out.println("subprocess finished"); + System.out.println("Exit value from subprocess is :" + process + .exitValue()); + } catch (Exception ex) { + System.out.println("Exception occured while starting process via " + + "processbuilder " + ex.getMessage()); + } + } + + void jarCommand (List args) { + System.out.println("Called jarCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + if ((args == null) || (args.size() < 2)) { + System.out.println("Not enough arguments for storm jar command"); + System.out.println("Please pass a jar file location and the " + + "topology class for jar command"); + //TODO print usage for jar command here + System.exit(-1); + } + String jarJvmOptions = System.getenv("STORM_JAR_JVM_OPTS"); + List jvmOptions = new ArrayList(); + if (jarJvmOptions != null) { + //TODO the python code to parse STORM_JAR_JVM_OPTIONS uses shlex + // .split to get the different jvm options for the jar. For now + // keeping it simple and splitting on space. Need to be in synch + // with python. Not sure though if we really need to use a + // lexical parser + jvmOptions.addAll(Arrays.asList(jarJvmOptions.split(" "))); + } + jvmOptions.add("-Dstorm.jar=" + args.get(0)); + List extraPaths = new ArrayList(); + extraPaths.add(args.get(0)); + extraPaths.add(this.userConfDirectory); + extraPaths.add(this.stormBinDirectory); + this.executeStormClass(args.get(1), "-client", jvmOptions, + extraPaths, args.subList(2, args.size()), false, false, ""); + return; + } + + void killCommand (List args) { + System.out.println("Called killCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + if ((args == null) || (args.size() < 1)) { + System.out.println("Not enough arguments for storm kill command"); + //TODO print usage for kill command here + System.exit(2); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.userConfDirectory); + extraPaths.add(this.stormBinDirectory); + this.executeStormClass("backtype.storm.command.kill_topology", + "-client", new ArrayList(), extraPaths, args, false, + false, ""); + return; + } + + //TODO implement shell command after understanding more about it + void shellCommand (List args) { + System.out.println("Called shellCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + if ((args == null) || (args.size() < 2)) { + System.out.println("Not enough arguments for storm shell command"); + System.out.println("Please pass the resources directory and the " + + "command to be packaged as jar"); + System.exit(2); + } + int random = new Random().nextInt(10000001); + String tmpJarPath = "stormshell" + String.valueOf(random) + ".jar"; + ProcessBuilder processBuilder = new ProcessBuilder("jar", "cf", + tmpJarPath, args.get(0)); + processBuilder.inheritIO(); + try { + Process process = processBuilder.start(); + process.waitFor(); + System.out.println("jar cf subprocess finished"); + System.out.println("Exit value from subprocess is :" + process + .exitValue()); + List runnerArgs = new ArrayList(); + runnerArgs.add(tmpJarPath); + runnerArgs.add(args.get(1)); + runnerArgs.addAll(args.subList(2, args.size())); + List extraPaths = new ArrayList(); + extraPaths.add(this.userConfDirectory); + this.executeStormClass(this.SHELL_CLASS, "-client", new + ArrayList(), extraPaths, runnerArgs, true, false, ""); + List commands = new ArrayList(); + commands.add("rm"); + commands.add(tmpJarPath); + processBuilder.command(commands); + process = processBuilder.start(); + process.waitFor(); + System.out.println("rm subprocess finished"); + System.out.println("Exit value from subprocess is :" + process + .exitValue()); + } catch (Exception ex) { + System.out.println("Exception occured while starting process via " + + "processbuilder " + ex.getMessage()); + } + return; + } + + void nimbusCommand (List args) { + System.out.println("Called nimbusCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + List jvmOptions = new ArrayList(); + List extraPaths = new ArrayList(); + extraPaths.add(this.clusterConfDirectory); + String nimbusOptions = this.confValue("nimbus.childopts", extraPaths, + true); + // below line is different from original python script storm.py where + // it called parse_args method on nimbusOptions. Now we just call a + // split with a space. Hence this will have different behavior and + // a buggy one if the nimbusOptions string in the config file has a + // space. TODO need to fix this + jvmOptions.addAll(Arrays.asList(nimbusOptions.split(" "))); + jvmOptions.add("-Dlogfile.name=nimbus.log"); + jvmOptions.add("-Dlog4j.configurationFile=" + this + .getLog4jConfigDirectory() + this.fileSeparator + "cluster" + + ".xml"); + this.executeStormClass(this.NIMBUS_CLASS, "-server", jvmOptions, + extraPaths, new ArrayList(), false, true, "nimbus"); + return; + } + + void uiCommand (List args) { + System.out.println("Called uiCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + List jvmOptions = new ArrayList(); + List extraPaths = new ArrayList(); + extraPaths.add(this.clusterConfDirectory); + String uiOptions = this.confValue("ui.childopts", extraPaths, + true); + // below line is different from original python script storm.py where + // it called parse_args method on nimbusOptions. Now we just call a + // split with a space. Hence this will have different behavior and + // a buggy one if the nimbusOptions string in the config file has a + // space. TODO need to fix this + jvmOptions.addAll(Arrays.asList(uiOptions.split(" "))); + jvmOptions.add("-Dlogfile.name=ui.log"); + jvmOptions.add("-Dlog4j.configurationFile=" + this + .getLog4jConfigDirectory() + this.fileSeparator + "cluster" + + ".xml"); + extraPaths.add(0, this.stormHomeDirectory); + this.executeStormClass(this.UI_CLASS, "-server", jvmOptions, + extraPaths, new ArrayList(), false, true, "ui"); + return; + } + + void logviewerCommand (List args) { + System.out.println("Called logviewerCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + List jvmOptions = new ArrayList(); + List extraPaths = new ArrayList(); + extraPaths.add(this.clusterConfDirectory); + String logviewerOptions = this.confValue("logviewer.childopts", + extraPaths, true); + // below line is different from original python script storm.py where + // it called parse_args method on nimbusOptions. Now we just call a + // split with a space. Hence this will have different behavior and + // a buggy one if the nimbusOptions string in the config file has a + // space. TODO need to fix this + jvmOptions.addAll(Arrays.asList(logviewerOptions.split(" "))); + jvmOptions.add("-Dlogfile.name=logviewer.log"); + jvmOptions.add("-Dlog4j.configurationFile=" + this + .getLog4jConfigDirectory() + this.fileSeparator + "cluster" + + ".xml"); + extraPaths.add(0, this.stormHomeDirectory); + this.executeStormClass(this.LOGVIEWER_CLASS, "-server", jvmOptions, + extraPaths, new ArrayList(), false, true, "logviewer"); + return; + } + + void drpcCommand (List args) { + System.out.println("Called drpcCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + List jvmOptions = new ArrayList(); + List extraPaths = new ArrayList(); + extraPaths.add(this.clusterConfDirectory); + String drpcOptions = this.confValue("drpc.childopts", extraPaths, + true); + // below line is different from original python script storm.py where + // it called parse_args method on nimbusOptions. Now we just call a + // split with a space. Hence this will have different behavior and + // a buggy one if the nimbusOptions string in the config file has a + // space. TODO need to fix this + jvmOptions.addAll(Arrays.asList(drpcOptions.split(" "))); + jvmOptions.add("-Dlogfile.name=drpc.log"); + jvmOptions.add("-Dlog4j.configurationFile=" + this + .getLog4jConfigDirectory() + this.fileSeparator + "cluster" + + ".xml"); + this.executeStormClass(this.DRPC_CLASS, "-server", jvmOptions, + extraPaths, new ArrayList(), false, true, "drpc"); + return; + } + + void supervisorCommand (List args) { + System.out.println("Called supervisorCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + List jvmOptions = new ArrayList(); + List extraPaths = new ArrayList(); + extraPaths.add(this.clusterConfDirectory); + String supervisorOptions = this.confValue("supervisor.childopts", + extraPaths, + true); + // below line is different from original python script storm.py where + // it called parse_args method on nimbusOptions. Now we just call a + // split with a space. Hence this will have different behavior and + // a buggy one if the nimbusOptions string in the config file has a + // space. TODO need to fix this + jvmOptions.addAll(Arrays.asList(supervisorOptions.split(" "))); + jvmOptions.add("-Dlogfile.name=supervisor.log"); + jvmOptions.add("-Dlog4j.configurationFile=" + this + .getLog4jConfigDirectory() + this.fileSeparator + "cluster" + + ".xml"); + this.executeStormClass(this.SUPERVISOR_CLASS, "-server", jvmOptions, + extraPaths, new ArrayList(), false, true, "supervisor"); + return; + } + + void localconfvalueCommand (List args) { + System.out.println("Called localconfvalueCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + if ((args == null) || (args.size() == 0)) { + System.out.println("Not enough arguments for localconfvalue " + + "command"); + System.out.println("Please pass the name of the config value you " + + "want to be printed"); + //TODO print command help for localconfvalue command + System.exit(2); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.userConfDirectory); + System.out.println(args.get(0) + ": " + this.confValue(args.get(0), + extraPaths, + true)); + return; + } + + void remoteconfvalueCommand (List args) { + System.out.println("Called remoteconfvalueCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + if ((args == null) || (args.size() == 0)) { + System.out.println("Not enough arguments for remoteconfvalue " + + "command"); + System.out.println("Please pass the name of the config value you " + + "want to be printed"); + //TODO print command help for remoteconfvalue command + System.exit(2); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.clusterConfDirectory); + System.out.println(args.get(0) + ": " + this.confValue(args.get(0), + extraPaths, + true)); + return; + } + + void replCommand (List args) { + System.out.println("Called replCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.clusterConfDirectory); + this.executeStormClass(this.REPL_CLASS, "-client", new + ArrayList(), extraPaths, new ArrayList(), + false, true, ""); + return; + } + + void classpathCommand (List args) { + System.out.println("Called classpathCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + System.out.println(this.getClassPath(new ArrayList(), true)); + return; + } + + void activateCommand (List args) { + System.out.println("Called activateCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + if ((args == null) || (args.size() < 1)) { + System.out.println("Not enough arguments for activate command"); + System.out.println("Please pass the topology name to activate"); + //TODO print usage for activate command here + System.exit(2); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.userConfDirectory); + extraPaths.add(this.stormBinDirectory); + this.executeStormClass(this.ACTIVATE_CLASS, "-client", new + ArrayList(), extraPaths, args, false, false, ""); + return; + } + + void deactivateCommand (List args) { + System.out.println("Called deactivateCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + if ((args == null) || (args.size() < 1)) { + System.out.println("Not enough arguments for deactivate command"); + System.out.println("Please pass the topology name to deactivate"); + //TODO print usage for deactivate command here + System.exit(2); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.userConfDirectory); + extraPaths.add(this.stormBinDirectory); + this.executeStormClass(this.DEACTIVATE_CLASS, "-client", new + ArrayList(), extraPaths, args, false, false, ""); + return; + } + + void rebalanceCommand (List args) { + System.out.println("Called rebalanceCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + if ((args == null) || (args.size() < 1)) { + System.out.println("Not enough arguments for rebalance command"); + System.out.println("Please pass the topology name to rebalance"); + //TODO print usage for rebalance command here + System.exit(2); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.userConfDirectory); + extraPaths.add(this.stormBinDirectory); + this.executeStormClass(this.REBALANCE_CLASS, "-client", new + ArrayList(), extraPaths, args, false, false, ""); + return; + } + + void helpCommand (List args) { + System.out.println("Called helpCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + if ((args == null) || (args.size() == 0)) { + this.printUsage(); + } else { + if ((!this.COMMANDS.contains(args.get(0)))) { + System.out.println(args.get(0) + " is not a valid command"); + } else { + //TODO print indivudual commands help here + System.out.println("Print command specific help here"); + } + } + return; + } + + void listCommand (List args) { + System.out.println("Called listCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.userConfDirectory); + extraPaths.add(this.stormBinDirectory); + this.executeStormClass(this.LIST_CLASS, "-client", new + ArrayList(), extraPaths, args, false, false, ""); + return; + } + + void devzookeeperCommand (List args) { + System.out.println("Called devzookeeperCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.clusterConfDirectory); + this.executeStormClass(this.DEVZOOKEEPER_CLASS, "-server", new + ArrayList(), extraPaths, args, false, true, ""); + return; + } + + void versionCommand (List args) { + System.out.println("Called versionCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.clusterConfDirectory); + this.executeStormClass(this.VERSION_CLASS, "-client", new + ArrayList(), extraPaths, args, false, false, ""); + return; + } + + void monitorCommand (List args) { + System.out.println("Called monitorCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.userConfDirectory); + extraPaths.add(this.stormBinDirectory); + this.executeStormClass(this.MONITOR_CLASS, "-client", new + ArrayList(), extraPaths, args, false, true, ""); + return; + } + + void uploadcredentialsCommand (List args) { + System.out.println("Called uploadcredentialsCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + if ((args == null) || (args.size() < 1)) { + System.out.println("Not enough arguments for " + + "upload-credentials command"); + System.out.println("Please pass the topology name to update"); + //TODO print usage for upload-credentials command here + System.exit(2); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.userConfDirectory); + extraPaths.add(this.stormBinDirectory); + this.executeStormClass(this.UPLOADCREDENTIALS_CLASS, "-client", new + ArrayList(), extraPaths, args, false, false, ""); + return; + } + + void geterrorsCommand (List args) { + System.out.println("Called geterrorsCommand using reflection"); + System.out.println("Arguments are : "); + for (String s: args) { + System.out.println(s); + } + if ((args == null) || (args.size() == 0)) { + System.out.println("Not enough arguments for " + + "get-errors command"); + System.out.println("Please pass the topology name to fetch errors"); + //TODO print usage for get-errors command here + System.exit(2); + } + List extraPaths = new ArrayList(); + extraPaths.add(this.userConfDirectory); + extraPaths.add(this.stormBinDirectory); + this.executeStormClass(this.GETERRORS_CLASS, "-client", new + ArrayList(), extraPaths, args, false, false, ""); + + return; + } + + String getLog4jConfigDirectory () { + List extraPaths = new ArrayList(); + extraPaths.add(this.clusterConfDirectory); + String log4jDirectory = this.confValue("storm.logback.conf.dir", + extraPaths, true); + if ((log4jDirectory == null) || ("".equals(log4jDirectory)) || + ("nil".equals(log4jDirectory))) { + log4jDirectory = this.stormLog4jConfDirectory; + } + return log4jDirectory; + } + + private void printUsage () { + String commands = StringUtils.join(this.COMMANDS, "\n\t"); + System.out.println("Commands:\n\t" + commands); + System.out.println("\nHelp: \n\thelp \n\thelp \n"); + System.out.println("Documentation for the storm client can be found" + + " at " + + "http://storm.incubator.apache" + + ".org/documentation/Command-line-client.html\n"); + System.out.println("Configs can be overridden using one or more -c " + + "flags, e.g. " + + "\"storm list -c nimbus.host=nimbus.mycompany.com\"\n"); + } + + private void executeHelpCommand () { + System.out.println("Print storm help here"); + } + +} + +class WindowsStormCommandExecutor extends StormCommandExecutor { + + WindowsStormCommandExecutor () { + + } + + void initialize () { + return; + } + + void execute (String[] args) { + return; + } + +} + +class ShutdownHookThread extends Thread { + private Process process; + String commandLine; + ShutdownHookThread (Process process, String commandLine) { + this.process = process; + this.commandLine = commandLine; + } + + public void run () { + System.out.println("Executing the shutdown hook for " + + "StormCommandExecutor subprocess"); + if (this.process != null) { + System.out.println("Killing the sub-process for command: " + + (this.commandLine != null ? this.commandLine : "Empty " + + "commandLine found")); + this.process.destroy(); + } else{ + System.out.println("Null process object found. No process to kill"); + } + } +}