Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Correct heron-executor CLI usage (#3587)
Browse files Browse the repository at this point in the history
  • Loading branch information
Code0x58 authored Jul 27, 2020
1 parent c42c211 commit 26e3f63
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 124 deletions.
2 changes: 1 addition & 1 deletion heron/executor/src/python/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package(default_visibility = ["//visibility:public"])
pex_library(
name = "executor-py",
srcs = ["heron_executor.py"],
reqs = ["PyYAML==3.13"],
reqs = ["PyYAML==3.13", "click==7.1.2"],
deps = [
"//heron/common/src/python:common-py",
"//heron/statemgrs/src/python:statemgr-py",
Expand Down
199 changes: 81 additions & 118 deletions heron/executor/src/python/heron_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This CLI manages the execution of a topology binary.
"""

""" The Heron executor is a process that runs on a container and is responsible for starting and
monitoring the processes of the topology and it's support services."""
import argparse
import atexit
import base64
Expand All @@ -38,7 +40,6 @@
import socket
import traceback
import itertools
import yaml

from heron.common.src.python.utils import log
from heron.common.src.python.utils import proc
Expand All @@ -48,42 +49,83 @@
from heron.statemgrs.src.python import configloader
from heron.statemgrs.src.python.config import Config as StateMgrConfig

import click
import yaml

Log = log.Log

# pylint: disable=too-many-lines

def print_usage():
print(
"Usage: ./heron-executor --shard=<shardid> --topology-name=<topname>"
" --topology-id=<topid> --topology-defn-file=<topdefnfile>"
" --state-manager-connection=<state_manager_connection>"
" --state-manager-root=<state_manager_root>"
" --state-manager-config-file=<state_manager_config_file>"
" --tmaster-binary=<tmaster_binary>"
" --stmgr-binary=<stmgr_binary> --metrics-manager-classpath=<metricsmgr_classpath>"
" --instance-jvm-opts=<instance_jvm_opts_in_base64> --classpath=<classpath>"
" --master-port=<master_port> --tmaster-controller-port=<tmaster_controller_port>"
" --tmaster-stats-port=<tmaster_stats_port>"
" --heron-internals-config-file=<heron_internals_config_file>"
" --override-config-file=<override_config_file> --component-ram-map=<component_ram_map>"
" --component-jvm-opts=<component_jvm_opts_in_base64> --pkg-type=<pkg_type>"
" --topology-binary-file=<topology_bin_file> --heron-java-home=<heron_java_home>"
" --shell-port=<shell-port> --heron-shell-binary=<heron_shell_binary>"
" --metrics-manager-port=<metricsmgr_port>"
" --cluster=<cluster> --role=<role> --environment=<environ>"
" --instance-classpath=<instance_classpath>"
" --metrics-sinks-config-file=<metrics_sinks_config_file>"
" --scheduler-classpath=<scheduler_classpath> --scheduler-port=<scheduler_port>"
" --python-instance-binary=<python_instance_binary>"
" --metricscache-manager-classpath=<metricscachemgr_classpath>"
" --metricscache-manager-master-port=<metricscachemgr_masterport>"
" --metricscache-manager-stats-port=<metricscachemgr_statsport>"
" --is-stateful=<is_stateful> --checkpoint-manager-classpath=<ckptmgr_classpath>"
" --checkpoint-manager-port=<ckptmgr_port> --checkpoint-manager-ram=<checkpoint_manager_ram>"
" --stateful-config-file=<stateful_config_file>"
" --health-manager-mode=<healthmgr_mode> --health-manager-classpath=<healthmgr_classpath>"
" --cpp-instance-binary=<cpp_instance_binary>"
" --jvm-remote-debugger-ports=<comma_seperated_port_list>")
@click.command()
@click.option("--cluster", required=True)
@click.option("--role", required=True)
@click.option("--environment", required=True)
@click.option("--checkpoint-manager-classpath", required=True)
@click.option("--checkpoint-manager-port", required=True)
@click.option("--checkpoint-manager-ram", type=int, required=True)
@click.option("--classpath", required=True)
@click.option("--component-jvm-opts", required=True)
@click.option("--component-ram-map", required=True)
@click.option("--cpp-instance-binary", required=True)
@click.option("--health-manager-classpath", required=True)
@click.option("--health-manager-mode", required=True)
@click.option("--heron-internals-config-file", required=True)
@click.option("--heron-java-home", required=True)
@click.option("--heron-shell-binary", required=True)
@click.option("--instance-classpath", required=True)
@click.option("--instance-jvm-opts", required=True)
@click.option("--is-stateful", required=True)
@click.option("--master-port", required=True)
@click.option("--metrics-manager-classpath", required=True)
@click.option("--metrics-manager-port", required=True)
@click.option("--metrics-sinks-config-file", required=True)
@click.option("--metricscache-manager-classpath", required=True)
@click.option("--metricscache-manager-master-port", required=True)
@click.option("--metricscache-manager-mode", required=False)
@click.option("--metricscache-manager-stats-port", required=True)
@click.option("--override-config-file", required=True)
@click.option("--pkg-type", required=True)
@click.option("--python-instance-binary", required=True)
@click.option("--scheduler-classpath", required=True)
@click.option("--scheduler-port", required=True)
@click.option("--shard", type=int, required=True)
@click.option("--shell-port", required=True)
@click.option("--state-manager-config-file", required=True)
@click.option("--state-manager-connection", required=True)
@click.option("--state-manager-root", required=True)
@click.option("--stateful-config-file", required=True)
@click.option("--stmgr-binary", required=True)
@click.option("--tmaster-binary", required=True)
@click.option("--tmaster-controller-port", required=True)
@click.option("--tmaster-stats-port", required=True)
@click.option("--topology-binary-file", required=True)
@click.option("--topology-defn-file", required=True)
@click.option("--topology-id", required=True)
@click.option("--topology-name", required=True)
@click.option("--jvm-remote-debugger-ports",
help="comma separated list of ports to be used"
" by a remote debugger for JVM instances")
def cli(
**kwargs: dict,
) -> None:
"""
The Heron executor is a process that runs on a container and is responsible for
starting and monitoring the processes of the topology and it's support services.
"""
# Since Heron on YARN runs as headless users, pex compiled
# binaries should be exploded into the container working
# directory. In order to do this, we need to set the
# PEX_ROOT shell environment before forking the processes
shell_env = os.environ.copy()
shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex")

parsed_args = argparse.Namespace(**kwargs)
# Instantiate the executor, bind it to signal handlers and launch it
executor = HeronExecutor(parsed_args, shell_env)
executor.initialize()

start(executor)

def id_map(prefix, container_plans, add_zero_id=False):
ids = {}
Expand Down Expand Up @@ -304,8 +346,7 @@ def init_from_parsed_args(self, parsed_args):
parsed_args.jvm_remote_debugger_ports.split(",") \
if parsed_args.jvm_remote_debugger_ports else None

def __init__(self, args, shell_env):
parsed_args = self.parse_args(args)
def __init__(self, parsed_args, shell_env):
self.init_from_parsed_args(parsed_args)

self.shell_env = shell_env
Expand All @@ -330,69 +371,6 @@ def __init__(self, args, shell_env):
self.state_managers = []
self.jvm_version = None

@staticmethod
def parse_args(args):
"""Uses python argparse to collect positional args"""
Log.info("Input args: %r" % args)

parser = argparse.ArgumentParser()

parser.add_argument("--shard", type=int, required=True)
parser.add_argument("--topology-name", required=True)
parser.add_argument("--topology-id", required=True)
parser.add_argument("--topology-defn-file", required=True)
parser.add_argument("--state-manager-connection", required=True)
parser.add_argument("--state-manager-root", required=True)
parser.add_argument("--state-manager-config-file", required=True)
parser.add_argument("--tmaster-binary", required=True)
parser.add_argument("--stmgr-binary", required=True)
parser.add_argument("--metrics-manager-classpath", required=True)
parser.add_argument("--instance-jvm-opts", required=True)
parser.add_argument("--classpath", required=True)
parser.add_argument("--master-port", required=True)
parser.add_argument("--tmaster-controller-port", required=True)
parser.add_argument("--tmaster-stats-port", required=True)
parser.add_argument("--heron-internals-config-file", required=True)
parser.add_argument("--override-config-file", required=True)
parser.add_argument("--component-ram-map", required=True)
parser.add_argument("--component-jvm-opts", required=True)
parser.add_argument("--pkg-type", required=True)
parser.add_argument("--topology-binary-file", required=True)
parser.add_argument("--heron-java-home", required=True)
parser.add_argument("--shell-port", required=True)
parser.add_argument("--heron-shell-binary", required=True)
parser.add_argument("--metrics-manager-port", required=True)
parser.add_argument("--cluster", required=True)
parser.add_argument("--role", required=True)
parser.add_argument("--environment", required=True)
parser.add_argument("--instance-classpath", required=True)
parser.add_argument("--metrics-sinks-config-file", required=True)
parser.add_argument("--scheduler-classpath", required=True)
parser.add_argument("--scheduler-port", required=True)
parser.add_argument("--python-instance-binary", required=True)
parser.add_argument("--cpp-instance-binary", required=True)
parser.add_argument("--metricscache-manager-classpath", required=True)
parser.add_argument("--metricscache-manager-master-port", required=True)
parser.add_argument("--metricscache-manager-stats-port", required=True)
parser.add_argument("--metricscache-manager-mode", required=False)
parser.add_argument("--is-stateful", required=True)
parser.add_argument("--checkpoint-manager-classpath", required=True)
parser.add_argument("--checkpoint-manager-port", required=True)
parser.add_argument("--checkpoint-manager-ram", type=int, required=True)
parser.add_argument("--stateful-config-file", required=True)
parser.add_argument("--health-manager-mode", required=True)
parser.add_argument("--health-manager-classpath", required=True)
parser.add_argument("--jvm-remote-debugger-ports", required=False,
help="ports to be used by a remote debugger for JVM instances")

parsed_args, unknown_args = parser.parse_known_args(args[1:])

if unknown_args:
Log.warn('Unknown arguments found!!! They are: %s' % unknown_args)
Log.warn(parser.format_help())

return parsed_args

def run_command_or_exit(self, command):
if self._run_blocking_process(command, True) != 0:
Log.error("Failed to run command: %s. Exiting" % command)
Expand Down Expand Up @@ -982,7 +960,7 @@ def start_process_monitor(self):
# Now wait for any child to die
Log.info("Start process monitor")
while True:
if len(self.processes_to_monitor) > 0:
if self.processes_to_monitor:
(pid, status) = os.wait()

with self.process_lock:
Expand Down Expand Up @@ -1100,7 +1078,7 @@ def start_state_manager_watches(self):
with open(self.override_config_file, 'r') as stream:
overrides = yaml.load(stream)
if overrides is None:
overrides = dict()
overrides = {}
overrides["heron.statemgr.connection.string"] = self.state_manager_connection

statemgr_config = StateMgrConfig()
Expand Down Expand Up @@ -1201,20 +1179,5 @@ def start(executor):
# they are dead. This is the main loop of executor
executor.start_process_monitor()

def main():
"""Register exit handlers, initialize the executor and run it."""
# Since Heron on YARN runs as headless users, pex compiled
# binaries should be exploded into the container working
# directory. In order to do this, we need to set the
# PEX_ROOT shell environment before forking the processes
shell_env = os.environ.copy()
shell_env["PEX_ROOT"] = os.path.join(os.path.abspath('.'), ".pex")

# Instantiate the executor, bind it to signal handlers and launch it
executor = HeronExecutor(sys.argv, shell_env)
executor.initialize()

start(executor)

if __name__ == "__main__":
main()
cli()
10 changes: 5 additions & 5 deletions heron/executor/tests/python/heron_executor_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@
# under the License.

'''heron executor unittest'''
import argparse
import os
import socket
import unittest
import json

from pprint import pprint

from heron.executor.src.python.heron_executor import ProcessInfo
from heron.executor.src.python.heron_executor import HeronExecutor
from heron.executor.src.python.heron_executor import cli, HeronExecutor, ProcessInfo
from heron.proto.packing_plan_pb2 import PackingPlan

# pylint: disable=unused-argument
Expand Down Expand Up @@ -299,9 +299,9 @@ def get_args(shard_id):
("--metricscache-manager-mode", "cluster")
]

args = ("%s=%s" % (arg[0], (str(arg[1]))) for arg in executor_args)
command = "./heron-executor %s" % (" ".join(args))
return command.split()
args = [f"{k}={v}" for k, v in executor_args]
ctx = cli.make_context('heron-executor', args)
return argparse.Namespace(**ctx.params)

def test_update_packing_plan(self):
self.executor_0.update_packing_plan(self.packing_plan_expected)
Expand Down

0 comments on commit 26e3f63

Please sign in to comment.