Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Change executor to not assume contiguous container id set (#1387)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bill Graham authored Sep 14, 2016
1 parent 932b694 commit ab177cb
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 33 deletions.
48 changes: 25 additions & 23 deletions heron/executor/src/python/heron_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,26 @@ def print_usage():
" <cluster> <role> <environ> <instance_classpath> <metrics_sinks_config_file>"
" <scheduler_classpath> <scheduler_port> <python_instance_binary>")

def id_list(prefix, start, count):
ids = []
for i in range(start, count + 1):
ids.append(prefix + str(i))
def id_map(prefix, container_plans, add_zero_id=False):
ids = {}
if add_zero_id:
ids[0] = "%s-0" % prefix

for container_plan in container_plans:
ids[container_plan.id] = "%s-%d" % (prefix, container_plan.id)
return ids

def stmgr_list(count):
return id_list("stmgr-", 1, count)
def stmgr_map(container_plans):
return id_map("stmgr", container_plans)

def metricsmgr_list(count):
return id_list("metricsmgr-", 0, count)
def metricsmgr_map(container_plans):
return id_map("metricsmgr", container_plans, True)

def heron_shell_list(count):
return id_list("heron-shell-", 0, count)
def heron_shell_map(container_plans):
return id_map("heron-shell", container_plans, True)

def get_heron_executor_process_name(shard_id):
return 'heron-executor-' + str(shard_id)
return 'heron-executor-%d' % shard_id

def get_process_pid_filename(process_name):
return '%s.pid' % process_name
Expand Down Expand Up @@ -178,9 +181,9 @@ def __init__(self, args, shell_env):

# these get set when we call update_packing_plan
self.packing_plan = None
self.stmgr_ids = []
self.metricsmgr_ids = []
self.heron_shell_ids = []
self.stmgr_ids = {}
self.metricsmgr_ids = {}
self.heron_shell_ids = {}

# processes_to_monitor gets set once processes are launched. we need to synchronize rw to this
# dict since is used by multiple threads
Expand Down Expand Up @@ -212,10 +215,9 @@ def initialize(self):

def update_packing_plan(self, new_packing_plan):
self.packing_plan = new_packing_plan
num_containers = len(self.packing_plan.container_plans)
self.stmgr_ids = stmgr_list(num_containers)
self.metricsmgr_ids = metricsmgr_list(num_containers)
self.heron_shell_ids = heron_shell_list(num_containers)
self.stmgr_ids = stmgr_map(self.packing_plan.container_plans)
self.metricsmgr_ids = metricsmgr_map(self.packing_plan.container_plans)
self.heron_shell_ids = heron_shell_map(self.packing_plan.container_plans)

# pylint: disable=no-self-use
def _load_logging_dir(self, heron_internals_config_file):
Expand Down Expand Up @@ -269,7 +271,7 @@ def _get_tmaster_processes(self):
self.topology_id,
self.zknode,
self.zkroot,
','.join(self.stmgr_ids),
','.join(self.stmgr_ids.values()),
self.heron_internals_config_file,
self.metrics_sinks_config_file,
self.metricsmgr_port]
Expand Down Expand Up @@ -333,7 +335,7 @@ def _get_java_instance_cmd(self, instance_info):
component_name,
str(global_task_id),
str(component_index),
self.stmgr_ids[self.shard - 1],
self.stmgr_ids[self.shard],
self.master_port,
self.metricsmgr_port,
self.heron_internals_config_file])
Expand All @@ -354,7 +356,7 @@ def _get_python_instance_cmd(self, instance_info):
component_name,
str(global_task_id),
str(component_index),
self.stmgr_ids[self.shard - 1],
self.stmgr_ids[self.shard],
self.master_port,
self.metricsmgr_port,
self.heron_internals_config_file,
Expand Down Expand Up @@ -388,13 +390,13 @@ def _get_streaming_processes(self):
self.topology_defn_file,
self.zknode,
self.zkroot,
self.stmgr_ids[self.shard - 1],
self.stmgr_ids[self.shard],
','.join(map(lambda x: x[0], instance_info)),
self.master_port,
self.metricsmgr_port,
self.shell_port,
self.heron_internals_config_file]
retval[self.stmgr_ids[self.shard - 1]] = stmgr_cmd
retval[self.stmgr_ids[self.shard]] = stmgr_cmd

# metricsmgr_metrics_sink_config_file = 'metrics_sinks.yaml'

Expand Down
43 changes: 33 additions & 10 deletions heron/executor/tests/python/heron_executor_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def get_expected_metricsmgr_command(container_id):
"metricsmgr_port topname topid %s " \
"metrics_sinks_config_file" % (container_id, INTERNAL_CONF_PATH)

def get_expected_instance_command(component_name, instance_id, container_id=1):
def get_expected_instance_command(component_name, instance_id, container_id):
instance_name = "container_%d_%s_%d" % (container_id, component_name, instance_id)
return "heron_java_home/bin/java -Xmx320M -Xms320M -Xmn160M -XX:MaxPermSize=128M " \
"-XX:PermSize=128M -XX:ReservedCodeCacheSize=64M -XX:+CMSScavengeBeforeRemark " \
Expand All @@ -113,7 +113,7 @@ def get_expected_instance_command(component_name, instance_id, container_id=1):
ProcessInfo(MockPOpen(), 'heron-tmaster',
'tmaster_binary master_port '
'tmaster_controller_port tmaster_stats_port '
'topname topid zknode zkroot stmgr-1 '
'topname topid zknode zkroot stmgr-1,stmgr-7 '
'%s metrics_sinks_config_file metricsmgr_port' % INTERNAL_CONF_PATH),
]

Expand All @@ -123,22 +123,38 @@ def get_expected_instance_command(component_name, instance_id, container_id=1):
'stmgr_binary topname topid topdefnfile zknode zkroot stmgr-1 '
'container_1_word_3,container_1_exclaim1_2,container_1_exclaim1_1 master_port '
'metricsmgr_port shell-port %s' % INTERNAL_CONF_PATH),
ProcessInfo(MockPOpen(), 'container_1_word_3', get_expected_instance_command('word', 3)),
ProcessInfo(MockPOpen(), 'container_1_word_3', get_expected_instance_command('word', 3, 1)),
ProcessInfo(MockPOpen(), 'container_1_exclaim1_1',
get_expected_instance_command('exclaim1', 1)),
get_expected_instance_command('exclaim1', 1, 1)),
ProcessInfo(MockPOpen(), 'container_1_exclaim1_2',
get_expected_instance_command('exclaim1', 2)),
get_expected_instance_command('exclaim1', 2, 1)),
ProcessInfo(MockPOpen(), 'heron-shell-1', shell_command_expected),
ProcessInfo(MockPOpen(), 'metricsmgr-1', get_expected_metricsmgr_command(1)),
]

MockPOpen.set_next_pid(37)
expected_processes_container_7 = [
ProcessInfo(MockPOpen(), 'container_7_word_11', get_expected_instance_command('word', 11, 7)),
ProcessInfo(MockPOpen(), 'container_7_exclaim1_210',
get_expected_instance_command('exclaim1', 210, 7)),
ProcessInfo(MockPOpen(), 'stmgr-7',
'stmgr_binary topname topid topdefnfile zknode zkroot stmgr-7 '
'container_7_word_11,container_7_exclaim1_210 master_port '
'metricsmgr_port shell-port %s' % INTERNAL_CONF_PATH),
ProcessInfo(MockPOpen(), 'metricsmgr-7', get_expected_metricsmgr_command(7)),
ProcessInfo(MockPOpen(), 'heron-shell-7', shell_command_expected),
]

def setUp(self):
MockPOpen.set_next_pid(37)
self.maxDiff = None
self.executor_0 = MockExecutor(self.get_args(0))
self.executor_1 = MockExecutor(self.get_args(1))
self.packing_plan_expected = self.build_packing_plan(
{1:[('word', '3', '0'), ('exclaim1', '2', '0'), ('exclaim1', '1', '0')]})
self.executor_7 = MockExecutor(self.get_args(7))
self.packing_plan_expected = self.build_packing_plan({
1:[('word', '3', '0'), ('exclaim1', '2', '0'), ('exclaim1', '1', '0')],
7:[('word', '11', '0'), ('exclaim1', '210', '0')],
})

# ./heron-executor <shardid> <topname> <topid> <topdefnfile>
# <zknode> <zkroot> <tmaster_binary> <stmgr_binary>
Expand All @@ -165,16 +181,21 @@ def test_update_packing_plan(self):
self.executor_0.update_packing_plan(self.packing_plan_expected)

self.assertEquals(self.packing_plan_expected, self.executor_0.packing_plan)
self.assertEquals(["stmgr-1"], self.executor_0.stmgr_ids)
self.assertEquals(["metricsmgr-0", "metricsmgr-1"], self.executor_0.metricsmgr_ids)
self.assertEquals(["heron-shell-0", "heron-shell-1"], self.executor_0.heron_shell_ids)
self.assertEquals({1: "stmgr-1", 7: "stmgr-7"}, self.executor_0.stmgr_ids)
self.assertEquals(
{0: "metricsmgr-0", 1: "metricsmgr-1", 7: "metricsmgr-7"}, self.executor_0.metricsmgr_ids)
self.assertEquals(
{0: "heron-shell-0", 1: "heron-shell-1", 7: "heron-shell-7"}, self.executor_0.heron_shell_ids)

def test_launch_container_0(self):
self.do_test_launch(self.executor_0, self.expected_processes_container_0)

def test_launch_container_1(self):
self.do_test_launch(self.executor_1, self.expected_processes_container_1)

def test_launch_container_7(self):
self.do_test_launch(self.executor_7, self.expected_processes_container_7)

def do_test_launch(self, executor, expected_processes):
executor.update_packing_plan(self.packing_plan_expected)
executor.launch()
Expand All @@ -187,6 +208,8 @@ def do_test_launch(self, executor, expected_processes):
found_monitored = map(lambda (pid, process_info):
(pid, process_info.name, process_info.command_str),
monitored_processes.items())
found_processes.sort(key=lambda tuple: tuple[0])
found_monitored.sort(key=lambda tuple: tuple[0])
print "do_test_commands - found_processes: %s found_monitored: %s" \
% (found_processes, found_monitored)
self.assertEquals(found_processes, found_monitored)
Expand Down

0 comments on commit ab177cb

Please sign in to comment.