Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Correct heron-executor CLI usage #3587

Merged
merged 1 commit into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion heron/executor/src/python/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package(default_visibility = ["//visibility:public"])
pex_library(
name = "executor-py",
srcs = ["heron_executor.py"],
reqs = ["PyYAML==3.13"],
reqs = ["PyYAML==3.13", "click==7.1.2"],
deps = [
"//heron/common/src/python:common-py",
"//heron/statemgrs/src/python:statemgr-py",
Expand Down
199 changes: 81 additions & 118 deletions heron/executor/src/python/heron_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This CLI manages the execution of a topology binary.

"""

""" The Heron executor is a process that runs on a container and is responsible for starting and
monitoring the processes of the topology and its support services."""
import argparse
import atexit
import base64
Expand All @@ -38,7 +40,6 @@
import socket
import traceback
import itertools
import yaml

from heron.common.src.python.utils import log
from heron.common.src.python.utils import proc
Expand All @@ -48,42 +49,83 @@
from heron.statemgrs.src.python import configloader
from heron.statemgrs.src.python.config import Config as StateMgrConfig

import click
import yaml

Log = log.Log

# pylint: disable=too-many-lines

def print_usage():
    """Print the single-line heron-executor usage string to standard output.

    Every option is shown as ``--name=<placeholder>``. The list mirrors the
    options registered with the command-line parser, including
    ``--metricscache-manager-mode``, which the parser accepts but the usage
    text previously left out.
    """
    # One entry per option so that adding or removing an option is a
    # one-line change. The entries are joined with single spaces below.
    # --metricscache-manager-mode and --jvm-remote-debugger-ports are the
    # only options the parser does not mark as required.
    options = (
        "--shard=<shardid>",
        "--topology-name=<topname>",
        "--topology-id=<topid>",
        "--topology-defn-file=<topdefnfile>",
        "--state-manager-connection=<state_manager_connection>",
        "--state-manager-root=<state_manager_root>",
        "--state-manager-config-file=<state_manager_config_file>",
        "--tmaster-binary=<tmaster_binary>",
        "--stmgr-binary=<stmgr_binary>",
        "--metrics-manager-classpath=<metricsmgr_classpath>",
        "--instance-jvm-opts=<instance_jvm_opts_in_base64>",
        "--classpath=<classpath>",
        "--master-port=<master_port>",
        "--tmaster-controller-port=<tmaster_controller_port>",
        "--tmaster-stats-port=<tmaster_stats_port>",
        "--heron-internals-config-file=<heron_internals_config_file>",
        "--override-config-file=<override_config_file>",
        "--component-ram-map=<component_ram_map>",
        "--component-jvm-opts=<component_jvm_opts_in_base64>",
        "--pkg-type=<pkg_type>",
        "--topology-binary-file=<topology_bin_file>",
        "--heron-java-home=<heron_java_home>",
        "--shell-port=<shell-port>",
        "--heron-shell-binary=<heron_shell_binary>",
        "--metrics-manager-port=<metricsmgr_port>",
        "--cluster=<cluster>",
        "--role=<role>",
        "--environment=<environ>",
        "--instance-classpath=<instance_classpath>",
        "--metrics-sinks-config-file=<metrics_sinks_config_file>",
        "--scheduler-classpath=<scheduler_classpath>",
        "--scheduler-port=<scheduler_port>",
        "--python-instance-binary=<python_instance_binary>",
        "--metricscache-manager-classpath=<metricscachemgr_classpath>",
        "--metricscache-manager-master-port=<metricscachemgr_masterport>",
        "--metricscache-manager-stats-port=<metricscachemgr_statsport>",
        "--metricscache-manager-mode=<metricscachemgr_mode>",
        "--is-stateful=<is_stateful>",
        "--checkpoint-manager-classpath=<ckptmgr_classpath>",
        "--checkpoint-manager-port=<ckptmgr_port>",
        "--checkpoint-manager-ram=<checkpoint_manager_ram>",
        "--stateful-config-file=<stateful_config_file>",
        "--health-manager-mode=<healthmgr_mode>",
        "--health-manager-classpath=<healthmgr_classpath>",
        "--cpp-instance-binary=<cpp_instance_binary>",
        "--jvm-remote-debugger-ports=<comma_separated_port_list>",
    )
    print("Usage: ./heron-executor " + " ".join(options))
@click.command()
@click.option("--cluster", required=True)
@click.option("--role", required=True)
@click.option("--environment", required=True)
@click.option("--checkpoint-manager-classpath", required=True)
@click.option("--checkpoint-manager-port", required=True)
@click.option("--checkpoint-manager-ram", type=int, required=True)
@click.option("--classpath", required=True)
@click.option("--component-jvm-opts", required=True)
@click.option("--component-ram-map", required=True)
@click.option("--cpp-instance-binary", required=True)
@click.option("--health-manager-classpath", required=True)
@click.option("--health-manager-mode", required=True)
@click.option("--heron-internals-config-file", required=True)
@click.option("--heron-java-home", required=True)
@click.option("--heron-shell-binary", required=True)
@click.option("--instance-classpath", required=True)
@click.option("--instance-jvm-opts", required=True)
@click.option("--is-stateful", required=True)
@click.option("--master-port", required=True)
@click.option("--metrics-manager-classpath", required=True)
@click.option("--metrics-manager-port", required=True)
@click.option("--metrics-sinks-config-file", required=True)
@click.option("--metricscache-manager-classpath", required=True)
@click.option("--metricscache-manager-master-port", required=True)
@click.option("--metricscache-manager-mode", required=False)
@click.option("--metricscache-manager-stats-port", required=True)
@click.option("--override-config-file", required=True)
@click.option("--pkg-type", required=True)
@click.option("--python-instance-binary", required=True)
@click.option("--scheduler-classpath", required=True)
@click.option("--scheduler-port", required=True)
@click.option("--shard", type=int, required=True)
@click.option("--shell-port", required=True)
@click.option("--state-manager-config-file", required=True)
@click.option("--state-manager-connection", required=True)
@click.option("--state-manager-root", required=True)
@click.option("--stateful-config-file", required=True)
@click.option("--stmgr-binary", required=True)
@click.option("--tmaster-binary", required=True)
@click.option("--tmaster-controller-port", required=True)
@click.option("--tmaster-stats-port", required=True)
@click.option("--topology-binary-file", required=True)
@click.option("--topology-defn-file", required=True)
@click.option("--topology-id", required=True)
@click.option("--topology-name", required=True)
@click.option("--jvm-remote-debugger-ports",
              help="comma separated list of ports to be used"
                   " by a remote debugger for JVM instances")
def cli(
    **kwargs: object,
) -> None:
    """
    The Heron executor is a process that runs on a container and is responsible for
    starting and monitoring the processes of the topology and its support services.

    """
    # The docstring above is what click prints for --help, so it is kept to
    # the user-facing description only.
    #
    # kwargs holds one entry per option declared above, keyed by the option
    # name with dashes turned into underscores. Values are the parsed option
    # values: int for --shard and --checkpoint-manager-ram, otherwise str,
    # or None for an omitted optional option. Hence the annotation is the
    # per-value type, not ``dict``.
    #
    # NOTE(review): click rejects unknown options by default, whereas an
    # argparse parse_known_args() based parser only warns about them.
    # Confirm that no launcher passes extra flags.

    # Since Heron on YARN runs as headless users, pex compiled
    # binaries should be exploded into the container working
    # directory. In order to do this, we need to set the
    # PEX_ROOT shell environment before forking the processes
    shell_env = os.environ.copy()
    shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex")

    # HeronExecutor reads its settings as attributes, so expose the click
    # parameters through a namespace object.
    parsed_args = argparse.Namespace(**kwargs)

    # Instantiate the executor, initialize it and launch it
    executor = HeronExecutor(parsed_args, shell_env)
    executor.initialize()

    start(executor)

def id_map(prefix, container_plans, add_zero_id=False):
ids = {}
Expand Down Expand Up @@ -304,8 +346,7 @@ def init_from_parsed_args(self, parsed_args):
parsed_args.jvm_remote_debugger_ports.split(",") \
if parsed_args.jvm_remote_debugger_ports else None

def __init__(self, args, shell_env):
parsed_args = self.parse_args(args)
def __init__(self, parsed_args, shell_env):
self.init_from_parsed_args(parsed_args)

self.shell_env = shell_env
Expand All @@ -330,69 +371,6 @@ def __init__(self, args, shell_env):
self.state_managers = []
self.jvm_version = None

@staticmethod
def parse_args(args):
    """Parse the executor's command-line options with argparse.

    ``args`` is an argv-style sequence; its first element is skipped and the
    remainder is parsed. All arguments are named ``--options``. Arguments the
    parser does not recognise do not abort parsing: they are logged as a
    warning together with the parser's help text.

    Returns the namespace of recognised options.
    """
    Log.info("Input args: %r" % args)

    # Shared keyword sets for add_argument(); the table below keeps the
    # registration order, which is also the order shown in the help text.
    needed = {"required": True}
    needed_int = {"type": int, "required": True}
    not_needed = {"required": False}

    option_table = (
        ("--shard", needed_int),
        ("--topology-name", needed),
        ("--topology-id", needed),
        ("--topology-defn-file", needed),
        ("--state-manager-connection", needed),
        ("--state-manager-root", needed),
        ("--state-manager-config-file", needed),
        ("--tmaster-binary", needed),
        ("--stmgr-binary", needed),
        ("--metrics-manager-classpath", needed),
        ("--instance-jvm-opts", needed),
        ("--classpath", needed),
        ("--master-port", needed),
        ("--tmaster-controller-port", needed),
        ("--tmaster-stats-port", needed),
        ("--heron-internals-config-file", needed),
        ("--override-config-file", needed),
        ("--component-ram-map", needed),
        ("--component-jvm-opts", needed),
        ("--pkg-type", needed),
        ("--topology-binary-file", needed),
        ("--heron-java-home", needed),
        ("--shell-port", needed),
        ("--heron-shell-binary", needed),
        ("--metrics-manager-port", needed),
        ("--cluster", needed),
        ("--role", needed),
        ("--environment", needed),
        ("--instance-classpath", needed),
        ("--metrics-sinks-config-file", needed),
        ("--scheduler-classpath", needed),
        ("--scheduler-port", needed),
        ("--python-instance-binary", needed),
        ("--cpp-instance-binary", needed),
        ("--metricscache-manager-classpath", needed),
        ("--metricscache-manager-master-port", needed),
        ("--metricscache-manager-stats-port", needed),
        ("--metricscache-manager-mode", not_needed),
        ("--is-stateful", needed),
        ("--checkpoint-manager-classpath", needed),
        ("--checkpoint-manager-port", needed),
        ("--checkpoint-manager-ram", needed_int),
        ("--stateful-config-file", needed),
        ("--health-manager-mode", needed),
        ("--health-manager-classpath", needed),
        ("--jvm-remote-debugger-ports",
         {"required": False,
          "help": "ports to be used by a remote debugger for JVM instances"}),
    )

    cli_parser = argparse.ArgumentParser()
    for flag, settings in option_table:
        cli_parser.add_argument(flag, **settings)

    # args[0] is the program name, so only the tail is handed to argparse.
    recognised, leftovers = cli_parser.parse_known_args(args[1:])

    if leftovers:
        Log.warn('Unknown arguments found!!! They are: %s' % leftovers)
        Log.warn(cli_parser.format_help())

    return recognised

def run_command_or_exit(self, command):
if self._run_blocking_process(command, True) != 0:
Log.error("Failed to run command: %s. Exiting" % command)
Expand Down Expand Up @@ -982,7 +960,7 @@ def start_process_monitor(self):
# Now wait for any child to die
Log.info("Start process monitor")
while True:
if len(self.processes_to_monitor) > 0:
if self.processes_to_monitor:
(pid, status) = os.wait()

with self.process_lock:
Expand Down Expand Up @@ -1100,7 +1078,7 @@ def start_state_manager_watches(self):
with open(self.override_config_file, 'r') as stream:
overrides = yaml.load(stream)
if overrides is None:
overrides = dict()
overrides = {}
overrides["heron.statemgr.connection.string"] = self.state_manager_connection

statemgr_config = StateMgrConfig()
Expand Down Expand Up @@ -1201,20 +1179,5 @@ def start(executor):
# they are dead. This is the main loop of executor
executor.start_process_monitor()

def main():
    """Build the child-process environment, then create, initialize and start the executor.

    The executor is constructed from ``sys.argv`` and a copy of the current
    environment in which ``PEX_ROOT`` points at ``.pex`` under the current
    working directory.
    """
    # Heron on YARN runs as headless users, so pex-compiled binaries have to
    # be exploded into the container working directory. That is done by
    # setting PEX_ROOT in the environment before any process is forked.
    pex_root = os.path.join(os.path.abspath('.'), ".pex")
    child_env = dict(os.environ)
    child_env["PEX_ROOT"] = pex_root

    # Create the executor, initialize it and hand it to the start routine.
    heron_executor = HeronExecutor(sys.argv, child_env)
    heron_executor.initialize()

    start(heron_executor)

if __name__ == "__main__":
main()
cli()
10 changes: 5 additions & 5 deletions heron/executor/tests/python/heron_executor_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
# under the License.

'''heron executor unittest'''
import argparse
import os
import socket
import unittest
import json

from pprint import pprint

from heron.executor.src.python.heron_executor import ProcessInfo
from heron.executor.src.python.heron_executor import HeronExecutor
from heron.executor.src.python.heron_executor import cli, HeronExecutor, ProcessInfo
from heron.proto.packing_plan_pb2 import PackingPlan

# pylint: disable=unused-argument
Expand Down Expand Up @@ -299,9 +299,9 @@ def get_args(shard_id):
("--metricscache-manager-mode", "cluster")
]

args = ("%s=%s" % (arg[0], (str(arg[1]))) for arg in executor_args)
command = "./heron-executor %s" % (" ".join(args))
return command.split()
args = [f"{k}={v}" for k, v in executor_args]
ctx = cli.make_context('heron-executor', args)
return argparse.Namespace(**ctx.params)

def test_update_packing_plan(self):
self.executor_0.update_packing_plan(self.packing_plan_expected)
Expand Down