diff --git a/heron/executor/src/python/BUILD b/heron/executor/src/python/BUILD index 432a5295c97..392326cc986 100644 --- a/heron/executor/src/python/BUILD +++ b/heron/executor/src/python/BUILD @@ -3,7 +3,7 @@ package(default_visibility = ["//visibility:public"]) pex_library( name = "executor-py", srcs = ["heron_executor.py"], - reqs = ["PyYAML==3.13"], + reqs = ["PyYAML==3.13", "click==7.1.2"], deps = [ "//heron/common/src/python:common-py", "//heron/statemgrs/src/python:statemgr-py", diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py index 68c094d8169..491cbdc7ff0 100755 --- a/heron/executor/src/python/heron_executor.py +++ b/heron/executor/src/python/heron_executor.py @@ -17,9 +17,11 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +""" +This CLI manages the execution of a topology binary. + +""" -""" The Heron executor is a process that runs on a container and is responsible for starting and -monitoring the processes of the topology and it's support services.""" import argparse import atexit import base64 @@ -38,7 +40,6 @@ import socket import traceback import itertools -import yaml from heron.common.src.python.utils import log from heron.common.src.python.utils import proc @@ -48,42 +49,83 @@ from heron.statemgrs.src.python import configloader from heron.statemgrs.src.python.config import Config as StateMgrConfig +import click +import yaml + Log = log.Log # pylint: disable=too-many-lines -def print_usage(): - print( - "Usage: ./heron-executor --shard= --topology-name=" - " --topology-id= --topology-defn-file=" - " --state-manager-connection=" - " --state-manager-root=" - " --state-manager-config-file=" - " --tmaster-binary=" - " --stmgr-binary= --metrics-manager-classpath=" - " --instance-jvm-opts= --classpath=" - " --master-port= --tmaster-controller-port=" - " --tmaster-stats-port=" - " --heron-internals-config-file=" - " --override-config-file= --component-ram-map=" - " --component-jvm-opts= --pkg-type=" - " --topology-binary-file= --heron-java-home=" - " --shell-port= --heron-shell-binary=" - " --metrics-manager-port=" - " --cluster= --role= --environment=" - " --instance-classpath=" - " --metrics-sinks-config-file=" - " --scheduler-classpath= --scheduler-port=" - " --python-instance-binary=" - " --metricscache-manager-classpath=" - " --metricscache-manager-master-port=" - " --metricscache-manager-stats-port=" - " --is-stateful= --checkpoint-manager-classpath=" - " --checkpoint-manager-port= --checkpoint-manager-ram=" - " --stateful-config-file=" - " --health-manager-mode= --health-manager-classpath=" - " --cpp-instance-binary=" - " --jvm-remote-debugger-ports=") +@click.command() +@click.option("--cluster", required=True) +@click.option("--role", required=True) +@click.option("--environment", required=True) +@click.option("--checkpoint-manager-classpath", required=True) +@click.option("--checkpoint-manager-port", required=True) +@click.option("--checkpoint-manager-ram", type=int, required=True) +@click.option("--classpath", required=True) +@click.option("--component-jvm-opts", required=True) +@click.option("--component-ram-map", required=True) +@click.option("--cpp-instance-binary", required=True) +@click.option("--health-manager-classpath", required=True) +@click.option("--health-manager-mode", required=True) +@click.option("--heron-internals-config-file", required=True) +@click.option("--heron-java-home", required=True) +@click.option("--heron-shell-binary", required=True) +@click.option("--instance-classpath", required=True) +@click.option("--instance-jvm-opts", required=True) +@click.option("--is-stateful", required=True) +@click.option("--master-port", required=True) +@click.option("--metrics-manager-classpath", required=True) +@click.option("--metrics-manager-port", required=True) +@click.option("--metrics-sinks-config-file", required=True) +@click.option("--metricscache-manager-classpath", required=True) +@click.option("--metricscache-manager-master-port", required=True) +@click.option("--metricscache-manager-mode", required=False) +@click.option("--metricscache-manager-stats-port", required=True) +@click.option("--override-config-file", required=True) +@click.option("--pkg-type", required=True) +@click.option("--python-instance-binary", required=True) +@click.option("--scheduler-classpath", required=True) +@click.option("--scheduler-port", required=True) +@click.option("--shard", type=int, required=True) +@click.option("--shell-port", required=True) +@click.option("--state-manager-config-file", required=True) +@click.option("--state-manager-connection", required=True) +@click.option("--state-manager-root", required=True) +@click.option("--stateful-config-file", required=True) +@click.option("--stmgr-binary", required=True) +@click.option("--tmaster-binary", required=True) +@click.option("--tmaster-controller-port", required=True) +@click.option("--tmaster-stats-port", required=True) +@click.option("--topology-binary-file", required=True) +@click.option("--topology-defn-file", required=True) +@click.option("--topology-id", required=True) +@click.option("--topology-name", required=True) +@click.option("--jvm-remote-debugger-ports", + help="comma separated list of ports to be used" + " by a remote debugger for JVM instances") +def cli( + **kwargs: dict, +) -> None: + """ + The Heron executor is a process that runs on a container and is responsible for + starting and monitoring the processes of the topology and it's support services. + + """ + # Since Heron on YARN runs as headless users, pex compiled + # binaries should be exploded into the container working + # directory. In order to do this, we need to set the + # PEX_ROOT shell environment before forking the processes + shell_env = os.environ.copy() + shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex") + + parsed_args = argparse.Namespace(**kwargs) + # Instantiate the executor, bind it to signal handlers and launch it + executor = HeronExecutor(parsed_args, shell_env) + executor.initialize() + + start(executor) def id_map(prefix, container_plans, add_zero_id=False): ids = {} @@ -304,8 +346,7 @@ def init_from_parsed_args(self, parsed_args): parsed_args.jvm_remote_debugger_ports.split(",") \ if parsed_args.jvm_remote_debugger_ports else None - def __init__(self, args, shell_env): - parsed_args = self.parse_args(args) + def __init__(self, parsed_args, shell_env): self.init_from_parsed_args(parsed_args) self.shell_env = shell_env @@ -330,69 +371,6 @@ def __init__(self, args, shell_env): self.state_managers = [] self.jvm_version = None - @staticmethod - def parse_args(args): - """Uses python argparse to collect positional args""" - Log.info("Input args: %r" % args) - - parser = argparse.ArgumentParser() - - parser.add_argument("--shard", type=int, required=True) - parser.add_argument("--topology-name", required=True) - parser.add_argument("--topology-id", required=True) - parser.add_argument("--topology-defn-file", required=True) - parser.add_argument("--state-manager-connection", required=True) - parser.add_argument("--state-manager-root", required=True) - parser.add_argument("--state-manager-config-file", required=True) - parser.add_argument("--tmaster-binary", required=True) - parser.add_argument("--stmgr-binary", required=True) - parser.add_argument("--metrics-manager-classpath", required=True) - parser.add_argument("--instance-jvm-opts", required=True) - parser.add_argument("--classpath", required=True) - parser.add_argument("--master-port", required=True) - parser.add_argument("--tmaster-controller-port", required=True) - parser.add_argument("--tmaster-stats-port", required=True) - parser.add_argument("--heron-internals-config-file", required=True) - parser.add_argument("--override-config-file", required=True) - parser.add_argument("--component-ram-map", required=True) - parser.add_argument("--component-jvm-opts", required=True) - parser.add_argument("--pkg-type", required=True) - parser.add_argument("--topology-binary-file", required=True) - parser.add_argument("--heron-java-home", required=True) - parser.add_argument("--shell-port", required=True) - parser.add_argument("--heron-shell-binary", required=True) - parser.add_argument("--metrics-manager-port", required=True) - parser.add_argument("--cluster", required=True) - parser.add_argument("--role", required=True) - parser.add_argument("--environment", required=True) - parser.add_argument("--instance-classpath", required=True) - parser.add_argument("--metrics-sinks-config-file", required=True) - parser.add_argument("--scheduler-classpath", required=True) - parser.add_argument("--scheduler-port", required=True) - parser.add_argument("--python-instance-binary", required=True) - parser.add_argument("--cpp-instance-binary", required=True) - parser.add_argument("--metricscache-manager-classpath", required=True) - parser.add_argument("--metricscache-manager-master-port", required=True) - parser.add_argument("--metricscache-manager-stats-port", required=True) - parser.add_argument("--metricscache-manager-mode", required=False) - parser.add_argument("--is-stateful", required=True) - parser.add_argument("--checkpoint-manager-classpath", required=True) - parser.add_argument("--checkpoint-manager-port", required=True) - parser.add_argument("--checkpoint-manager-ram", type=int, required=True) - parser.add_argument("--stateful-config-file", required=True) - parser.add_argument("--health-manager-mode", required=True) - parser.add_argument("--health-manager-classpath", required=True) - parser.add_argument("--jvm-remote-debugger-ports", required=False, - help="ports to be used by a remote debugger for JVM instances") - - parsed_args, unknown_args = parser.parse_known_args(args[1:]) - - if unknown_args: - Log.warn('Unknown arguments found!!! They are: %s' % unknown_args) - Log.warn(parser.format_help()) - - return parsed_args - def run_command_or_exit(self, command): if self._run_blocking_process(command, True) != 0: Log.error("Failed to run command: %s. Exiting" % command) @@ -982,7 +960,7 @@ def start_process_monitor(self): # Now wait for any child to die Log.info("Start process monitor") while True: - if len(self.processes_to_monitor) > 0: + if self.processes_to_monitor: (pid, status) = os.wait() with self.process_lock: @@ -1100,7 +1078,7 @@ def start_state_manager_watches(self): with open(self.override_config_file, 'r') as stream: overrides = yaml.load(stream) if overrides is None: - overrides = dict() + overrides = {} overrides["heron.statemgr.connection.string"] = self.state_manager_connection statemgr_config = StateMgrConfig() @@ -1201,20 +1179,5 @@ def start(executor): # they are dead. This is the main loop of executor executor.start_process_monitor() -def main(): - """Register exit handlers, initialize the executor and run it.""" - # Since Heron on YARN runs as headless users, pex compiled - # binaries should be exploded into the container working - # directory. In order to do this, we need to set the - # PEX_ROOT shell environment before forking the processes - shell_env = os.environ.copy() - shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex") - - # Instantiate the executor, bind it to signal handlers and launch it - executor = HeronExecutor(sys.argv, shell_env) - executor.initialize() - - start(executor) - if __name__ == "__main__": - main() + cli() diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py index f09799907c7..84481a1d9fc 100644 --- a/heron/executor/tests/python/heron_executor_unittest.py +++ b/heron/executor/tests/python/heron_executor_unittest.py @@ -19,6 +19,7 @@ # under the License. '''heron executor unittest''' +import argparse import os import socket import unittest @@ -26,8 +27,7 @@ from pprint import pprint -from heron.executor.src.python.heron_executor import ProcessInfo -from heron.executor.src.python.heron_executor import HeronExecutor +from heron.executor.src.python.heron_executor import cli, HeronExecutor, ProcessInfo from heron.proto.packing_plan_pb2 import PackingPlan # pylint: disable=unused-argument @@ -299,9 +299,9 @@ def get_args(shard_id): ("--metricscache-manager-mode", "cluster") ] - args = ("%s=%s" % (arg[0], (str(arg[1]))) for arg in executor_args) - command = "./heron-executor %s" % (" ".join(args)) - return command.split() + args = [f"{k}={v}" for k, v in executor_args] + ctx = cli.make_context('heron-executor', args) + return argparse.Namespace(**ctx.params) def test_update_packing_plan(self): self.executor_0.update_packing_plan(self.packing_plan_expected)