diff --git a/bin/storm.py b/bin/storm.py index 6bb5a1b4e91..4b4b2f2f257 100755 --- a/bin/storm.py +++ b/bin/storm.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -40,21 +40,27 @@ # python 2 import ConfigParser as configparser + def is_windows(): return sys.platform.startswith('win') + def identity(x): return x + def cygpath(x): command = ["cygpath", "-wp", x] - p = sub.Popen(command,stdout=sub.PIPE) + p = sub.Popen(command, stdout=sub.PIPE) output, errors = p.communicate() lines = output.split(os.linesep) return lines[0] + +normclasspath = cygpath if sys.platform == 'cygwin' else identity + + def init_storm_env(): - global CLUSTER_CONF_DIR ini_file = os.path.join(CLUSTER_CONF_DIR, 'storm_env.ini') if not os.path.isfile(ini_file): return @@ -66,17 +72,16 @@ def init_storm_env(): value = config.get('environment', option) os.environ[option] = value -normclasspath = cygpath if sys.platform == 'cygwin' else identity -STORM_DIR = os.sep.join(os.path.realpath( __file__ ).split(os.sep)[:-2]) +STORM_DIR = os.sep.join(os.path.realpath(__file__).split(os.sep)[:-2]) USER_CONF_DIR = os.path.expanduser("~" + os.sep + ".storm") STORM_CONF_DIR = os.getenv('STORM_CONF_DIR', None) -if STORM_CONF_DIR == None: +if STORM_CONF_DIR is None: CLUSTER_CONF_DIR = os.path.join(STORM_DIR, "conf") else: CLUSTER_CONF_DIR = STORM_CONF_DIR -if (not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml"))): +if not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml")): USER_CONF_DIR = CLUSTER_CONF_DIR STORM_LIB_DIR = os.path.join(STORM_DIR, "lib") @@ -97,17 +102,20 @@ def init_storm_env(): STORM_EXT_CLASSPATH = os.getenv('STORM_EXT_CLASSPATH', None) STORM_EXT_CLASSPATH_DAEMON = os.getenv('STORM_EXT_CLASSPATH_DAEMON', None) + def get_config_opts(): - global CONFIG_OPTS - return "-Dstorm.options=" + ','.join(map(quote_plus,CONFIG_OPTS)) + return "-Dstorm.options=" + ','.join(map(quote_plus, CONFIG_OPTS)) + if not os.path.exists(STORM_LIB_DIR): print("******************************************") - print("The storm client can only be run from within a release. You appear to be trying to run the client from a checkout of Storm's source code.") + print("The storm client can only be run from within a release. " + "You appear to be trying to run the client from a checkout of Storm's source code.") print("\nYou can download a Storm release at http://storm.apache.org/downloads.html") print("******************************************") sys.exit(1) + def get_jars_full(adir): files = [] if os.path.isdir(adir): @@ -121,23 +129,24 @@ def get_jars_full(adir): ret.append(os.path.join(adir, f)) return ret + def get_classpath(extrajars, daemon=True): ret = get_jars_full(STORM_DIR) ret.extend(get_jars_full(STORM_DIR + "/lib")) ret.extend(get_jars_full(STORM_DIR + "/extlib")) if daemon: ret.extend(get_jars_full(STORM_DIR + "/extlib-daemon")) - if STORM_EXT_CLASSPATH != None: + if STORM_EXT_CLASSPATH is not None: for path in STORM_EXT_CLASSPATH.split(os.pathsep): ret.extend(get_jars_full(path)) - if daemon and STORM_EXT_CLASSPATH_DAEMON != None: + if daemon and STORM_EXT_CLASSPATH_DAEMON is not None: for path in STORM_EXT_CLASSPATH_DAEMON.split(os.pathsep): ret.extend(get_jars_full(path)) ret.extend(extrajars) return normclasspath(os.pathsep.join(ret)) + def confvalue(name, extrapaths, daemon=True): - global CONFFILE command = [ JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE, "-cp", get_classpath(extrapaths, daemon), "org.apache.storm.command.config_value", name @@ -154,6 +163,7 @@ def confvalue(name, extrapaths, daemon=True): return " ".join(tokens[1:]) return "" + def print_localconfvalue(name): """Syntax: [storm localconfvalue conf-name] @@ -163,6 +173,7 @@ def print_localconfvalue(name): """ print(name + ": " + confvalue(name, [USER_CONF_DIR])) + def print_remoteconfvalue(name): """Syntax: [storm remoteconfvalue conf-name] @@ -174,6 +185,7 @@ def print_remoteconfvalue(name): """ print(name + ": " + confvalue(name, [CLUSTER_CONF_DIR])) + def parse_args(string): """Takes a string of whitespace-separated tokens and parses it into a list. Whitespace inside tokens may be quoted with single quotes, double quotes or @@ -193,21 +205,26 @@ def parse_args(string): args = [re.compile(r"'((?:[^'\\]|\\.)*)'").sub('\\1', x) for x in args] return [re.compile(r'\\(.)').sub('\\1', x) for x in args] -def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[], fork=False, daemon=True, daemonName=""): - global CONFFILE - storm_log_dir = confvalue("storm.log.dir",[CLUSTER_CONF_DIR]) - if(storm_log_dir == None or storm_log_dir == "nil"): + +def exec_storm_class(klass, jvmtype="-server", jvmopts=None, extrajars=None, args=None, fork=False, daemon=True, + daemon_name=""): + jvmopts = jvmopts if jvmopts is not None else [] + extrajars = extrajars if extrajars is not None else [] + args = args if args is not None else [] + + storm_log_dir = confvalue("storm.log.dir", [CLUSTER_CONF_DIR]) + if storm_log_dir is None or storm_log_dir == "nil": storm_log_dir = os.path.join(STORM_DIR, "logs") all_args = [ - JAVA_CMD, jvmtype, - "-Ddaemon.name=" + daemonName, - get_config_opts(), - "-Dstorm.home=" + STORM_DIR, - "-Dstorm.log.dir=" + storm_log_dir, - "-Djava.library.path=" + confvalue("java.library.path", extrajars, daemon), - "-Dstorm.conf.file=" + CONFFILE, - "-cp", get_classpath(extrajars, daemon), - ] + jvmopts + [klass] + list(args) + JAVA_CMD, jvmtype, + "-Ddaemon.name=" + daemon_name, + get_config_opts(), + "-Dstorm.home=" + STORM_DIR, + "-Dstorm.log.dir=" + storm_log_dir, + "-Djava.library.path=" + confvalue("java.library.path", extrajars, daemon), + "-Dstorm.conf.file=" + CONFFILE, + "-cp", get_classpath(extrajars, daemon), + ] + jvmopts + [klass] + list(args) print("Running: " + " ".join(all_args)) sys.stdout.flush() exit_code = 0 @@ -224,6 +241,7 @@ def exec_storm_class(klass, jvmtype="-server", jvmopts=[], extrajars=[], args=[] os.execvp(JAVA_CMD, all_args) return exit_code + def jar(jarfile, klass, *args): """Syntax: [storm jar topology-jar-path class ...] @@ -234,17 +252,18 @@ def jar(jarfile, klass, *args): will upload the jar at topology-jar-path when the topology is submitted. """ transform_class = confvalue("client.jartransformer.class", [CLUSTER_CONF_DIR]) - if (transform_class != None and transform_class != "nil"): - tmpjar = os.path.join(tempfile.gettempdir(), uuid.uuid1().hex+".jar") - exec_storm_class("org.apache.storm.daemon.ClientJarTransformerRunner", args=[transform_class, jarfile, tmpjar], fork=True, daemon=False) + if transform_class is not None and transform_class != "nil": + tmpjar = os.path.join(tempfile.gettempdir(), uuid.uuid1().hex + ".jar") + exec_storm_class("org.apache.storm.daemon.ClientJarTransformerRunner", args=[transform_class, jarfile, tmpjar], + fork=True, daemon=False) topology_runner_exit_code = exec_storm_class( - klass, - jvmtype="-client", - extrajars=[tmpjar, USER_CONF_DIR, STORM_BIN_DIR], - args=args, - daemon=False, - fork=True, - jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + tmpjar]) + klass, + jvmtype="-client", + extrajars=[tmpjar, USER_CONF_DIR, STORM_BIN_DIR], + args=args, + daemon=False, + fork=True, + jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + tmpjar]) os.remove(tmpjar) sys.exit(topology_runner_exit_code) else: @@ -256,12 +275,13 @@ def jar(jarfile, klass, *args): daemon=False, jvmopts=JAR_JVM_OPTS + ["-Dstorm.jar=" + jarfile]) + def sql(sql_file, topology_name): """Syntax: [storm sql sql-file topology-name] Compiles the SQL statements into a Trident topology and submits it to Storm. """ - extrajars=[USER_CONF_DIR, STORM_BIN_DIR] + extrajars = [USER_CONF_DIR, STORM_BIN_DIR] extrajars.extend(get_jars_full(STORM_DIR + "/external/sql/storm-sql-core")) extrajars.extend(get_jars_full(STORM_DIR + "/external/sql/storm-sql-runtime")) exec_storm_class( @@ -271,6 +291,7 @@ def sql(sql_file, topology_name): args=[sql_file, topology_name], daemon=False) + def kill(*args): """Syntax: [storm kill topology-name [-w wait-time-secs]] @@ -305,6 +326,7 @@ def upload_credentials(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + def blobstore(*args): """Syntax: [storm blobstore cmd] @@ -331,6 +353,7 @@ def blobstore(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + def heartbeats(*args): """Syntax: [storm heartbeats [cmd]] @@ -343,6 +366,7 @@ def heartbeats(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + def activate(*args): """Syntax: [storm activate topology-name] @@ -357,6 +381,7 @@ def activate(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + def set_log_level(*args): """ Dynamically change topology log levels @@ -390,6 +415,7 @@ def set_log_level(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + def listtopos(*args): """Syntax: [storm list] @@ -401,6 +427,7 @@ def listtopos(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + def deactivate(*args): """Syntax: [storm deactivate topology-name] @@ -415,6 +442,7 @@ def deactivate(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + def rebalance(*args): """Syntax: [storm rebalance topology-name [-w wait-time-secs] [-n new-num-workers] [-e component=parallelism]*] @@ -445,6 +473,7 @@ def rebalance(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, STORM_BIN_DIR]) + def get_errors(*args): """Syntax: [storm get-errors topology-name] @@ -461,6 +490,7 @@ def get_errors(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")]) + def healthcheck(*args): """Syntax: [storm node-health-check] @@ -472,6 +502,7 @@ def healthcheck(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")]) + def kill_workers(*args): """Syntax: [storm kill_workers] @@ -485,6 +516,7 @@ def kill_workers(*args): jvmtype="-client", extrajars=[USER_CONF_DIR, os.path.join(STORM_DIR, "bin")]) + def shell(resourcesdir, command, *args): tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar" os.system("jar cf %s %s" % (tmpjarpath, resourcesdir)) @@ -498,6 +530,7 @@ def shell(resourcesdir, command, *args): fork=True) os.system("rm " + tmpjarpath) + def repl(): """Syntax: [storm repl] @@ -507,15 +540,17 @@ def repl(): cppaths = [CLUSTER_CONF_DIR] exec_storm_class("clojure.main", jvmtype="-client", extrajars=cppaths) + def get_log4j2_conf_dir(): cppaths = [CLUSTER_CONF_DIR] storm_log4j2_conf_dir = confvalue("storm.log4j2.conf.dir", cppaths) - if(storm_log4j2_conf_dir == None or storm_log4j2_conf_dir == "nil"): + if storm_log4j2_conf_dir is None or storm_log4j2_conf_dir == "nil": storm_log4j2_conf_dir = STORM_LOG4J2_CONF_DIR - elif(not os.path.isabs(storm_log4j2_conf_dir)): + elif not os.path.isabs(storm_log4j2_conf_dir): storm_log4j2_conf_dir = os.path.join(STORM_DIR, storm_log4j2_conf_dir) return storm_log4j2_conf_dir + def nimbus(klass="org.apache.storm.daemon.nimbus"): """Syntax: [storm nimbus] @@ -534,10 +569,11 @@ def nimbus(klass="org.apache.storm.daemon.nimbus"): exec_storm_class( klass, jvmtype="-server", - daemonName="nimbus", + daemon_name="nimbus", extrajars=cppaths, jvmopts=jvmopts) + def pacemaker(klass="org.apache.storm.pacemaker.pacemaker"): """Syntax: [storm pacemaker] @@ -555,10 +591,11 @@ def pacemaker(klass="org.apache.storm.pacemaker.pacemaker"): exec_storm_class( klass, jvmtype="-server", - daemonName="pacemaker", + daemon_name="pacemaker", extrajars=cppaths, jvmopts=jvmopts) + def supervisor(klass="org.apache.storm.daemon.supervisor.Supervisor"): """Syntax: [storm supervisor] @@ -577,10 +614,11 @@ def supervisor(klass="org.apache.storm.daemon.supervisor.Supervisor"): exec_storm_class( klass, jvmtype="-server", - daemonName="supervisor", + daemon_name="supervisor", extrajars=cppaths, jvmopts=jvmopts) + def ui(): """Syntax: [storm ui] @@ -600,10 +638,11 @@ def ui(): exec_storm_class( "org.apache.storm.ui.core", jvmtype="-server", - daemonName="ui", + daemon_name="ui", jvmopts=jvmopts, extrajars=[STORM_DIR, CLUSTER_CONF_DIR]) + def logviewer(): """Syntax: [storm logviewer] @@ -623,10 +662,11 @@ def logviewer(): exec_storm_class( "org.apache.storm.daemon.logviewer", jvmtype="-server", - daemonName="logviewer", + daemon_name="logviewer", jvmopts=jvmopts, extrajars=[STORM_DIR, CLUSTER_CONF_DIR]) + def drpc(): """Syntax: [storm drpc] @@ -645,10 +685,11 @@ def drpc(): exec_storm_class( "org.apache.storm.daemon.drpc", jvmtype="-server", - daemonName="drpc", + daemon_name="drpc", jvmopts=jvmopts, extrajars=[CLUSTER_CONF_DIR]) + def dev_zookeeper(): """Syntax: [storm dev-zookeeper] @@ -656,22 +697,22 @@ def dev_zookeeper(): "storm.zookeeper.port" as its port. This is only intended for development/testing, the Zookeeper instance launched is not configured to be used in production. """ - cppaths = [CLUSTER_CONF_DIR] exec_storm_class( "org.apache.storm.command.dev_zookeeper", jvmtype="-server", extrajars=[CLUSTER_CONF_DIR]) + def version(): - """Syntax: [storm version] + """Syntax: [storm version] + + Prints the version number of this Storm release. + """ + exec_storm_class( + "org.apache.storm.utils.VersionInfo", + jvmtype="-client", + extrajars=[CLUSTER_CONF_DIR]) - Prints the version number of this Storm release. - """ - cppaths = [CLUSTER_CONF_DIR] - exec_storm_class( - "org.apache.storm.utils.VersionInfo", - jvmtype="-client", - extrajars=[CLUSTER_CONF_DIR]) def print_classpath(): """Syntax: [storm classpath] @@ -680,8 +721,10 @@ def print_classpath(): """ print(get_classpath([])) + def monitor(*args): - """Syntax: [storm monitor topology-name [-i interval-secs] [-m component-id] [-s stream-id] [-w [emitted | transferred]]] + """Syntax: [storm monitor topology-name [-i interval-secs] [-m component-id] [-s stream-id] + [-w [emitted | transferred]]] Monitor given topology's throughput interactively. One can specify poll-interval, component-id, stream-id, watch-item[emitted | transferred] @@ -700,10 +743,13 @@ def monitor(*args): def print_commands(): """Print all client commands and link to documentation""" - print("Commands:\n\t" + "\n\t".join(sorted(COMMANDS.keys()))) + print("Commands:\n\t" + "\n\t".join(sorted(COMMANDS.keys()))) print("\nHelp: \n\thelp \n\thelp ") - print("\nDocumentation for the storm client can be found at http://storm.apache.org/documentation/Command-line-client.html\n") - print("Configs can be overridden using one or more -c flags, e.g. \"storm list -c nimbus.host=nimbus.mycompany.com\"\n") + print("\nDocumentation for the storm client can be found at " + "http://storm.apache.org/documentation/Command-line-client.html\n") + print("Configs can be overridden using one or more -c flags, e.g. " + "\"storm list -c nimbus.host=nimbus.mycompany.com\"\n") + def print_usage(command=None): """Print one help message or list of available commands""" @@ -712,58 +758,63 @@ def print_usage(command=None): print(COMMANDS[command].__doc__ or "No documentation provided for <%s>" % command) else: - print("<%s> is not a valid command" % command) + print("<%s> is not a valid command" % command) else: print_commands() + def unknown_command(*args): print("Unknown command: [storm %s]" % ' '.join(sys.argv[1:])) print_usage() sys.exit(254) + COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "logviewer": logviewer, "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue, "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath, "activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage, "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version, "monitor": monitor, - "upload-credentials": upload_credentials, "pacemaker": pacemaker, "heartbeats": heartbeats, "blobstore": blobstore, - "get-errors": get_errors, "set_log_level": set_log_level, "kill_workers": kill_workers, - "node-health-check": healthcheck, "sql": sql} + "upload-credentials": upload_credentials, "pacemaker": pacemaker, "heartbeats": heartbeats, + "blobstore": blobstore, "get-errors": get_errors, "set_log_level": set_log_level, + "kill_workers": kill_workers, "node-health-check": healthcheck, "sql": sql} + def parse_config(config_list): global CONFIG_OPTS - if len(config_list) > 0: + if len(config_list): for config in config_list: CONFIG_OPTS.append(config) + def parse_config_opts(args): - curr = args[:] - curr.reverse() - config_list = [] - args_list = [] - - while len(curr) > 0: - token = curr.pop() - if token == "-c": - config_list.append(curr.pop()) - elif token == "--config": - global CONFFILE - CONFFILE = curr.pop() - else: - args_list.append(token) + curr = args[:] + curr.reverse() + config_list = [] + args_list = [] + + while len(curr) > 0: + token = curr.pop() + if token == "-c": + config_list.append(curr.pop()) + elif token == "--config": + global CONFFILE + CONFFILE = curr.pop() + else: + args_list.append(token) + + return config_list, args_list - return config_list, args_list def main(): if len(sys.argv) <= 1: print_usage() sys.exit(-1) - global CONFIG_OPTS config_list, args = parse_config_opts(sys.argv[1:]) parse_config(config_list) COMMAND = args[0] ARGS = args[1:] (COMMANDS.get(COMMAND, unknown_command))(*ARGS) + if __name__ == "__main__": main()