Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
make python code compatible with both python2 and python3 (#3412)
Browse files Browse the repository at this point in the history
* make code compatible with both python2 and 3
  • Loading branch information
nlu90 authored Dec 13, 2019
1 parent 27179ac commit 250c148
Show file tree
Hide file tree
Showing 84 changed files with 331 additions and 307 deletions.
2 changes: 1 addition & 1 deletion examples/src/python/bolt/stateful_count_bolt.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def init_state(self, stateful_state):
self.logger.info("Checkpoint Snapshot recovered : %s" % str(self.recovered_state))

def pre_save(self, checkpoint_id):
for (k, v) in self.counter.items():
for (k, v) in list(self.counter.items()):
self.recovered_state.put(k, v)
self.logger.info("Checkpoint Snapshot %s : %s" % (checkpoint_id, str(self.recovered_state)))

Expand Down
2 changes: 1 addition & 1 deletion examples/src/python/spout/stateful_word_spout.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def init_state(self, stateful_state):

def pre_save(self, checkpoint_id):
# Purely for debugging purposes
for (k, v) in self.counter.items():
for (k, v) in list(self.counter.items()):
self.recovered_state.put(k, v)
self.logger.info("Checkpoint Snapshot %s : %s" % (checkpoint_id, str(self.recovered_state)))

Expand Down
39 changes: 19 additions & 20 deletions heron/executor/src/python/heron_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,10 @@ def init_from_parsed_args(self, parsed_args):
self.tmaster_stats_port = parsed_args.tmaster_stats_port
self.heron_internals_config_file = parsed_args.heron_internals_config_file
self.override_config_file = parsed_args.override_config_file
self.component_ram_map =\
map(lambda x: {x.split(':')[0]:
int(x.split(':')[1])}, parsed_args.component_ram_map.split(','))
self.component_ram_map =\
functools.reduce(lambda x, y: dict(x.items() + y.items()), self.component_ram_map)
self.component_ram_map = [{x.split(':')[0]:int(x.split(':')[1])}
for x in parsed_args.component_ram_map.split(',')]
self.component_ram_map = functools.reduce(lambda x, y: dict(list(x.items()) + list(y.items())),
self.component_ram_map)

# component_jvm_opts_in_base64 itself is a base64-encoding-json-map, which is appended with
# " at the start and end. It also escapes "=" to "&equals" due to aurora limitation
Expand All @@ -256,7 +255,7 @@ def init_from_parsed_args(self, parsed_args):
base64.b64decode(parsed_args.component_jvm_opts.
lstrip('"').rstrip('"').replace('(61)', '=').replace('=', '='))
if component_jvm_opts_in_json != "":
for (k, v) in json.loads(component_jvm_opts_in_json).items():
for (k, v) in list(json.loads(component_jvm_opts_in_json).items()):
# In json, the component name and JVM options are still in base64 encoding
self.component_jvm_opts[base64.b64decode(k)] = base64.b64decode(v)

Expand Down Expand Up @@ -366,7 +365,7 @@ def parse_args(args):
parser.add_argument("--is-stateful", required=True)
parser.add_argument("--checkpoint-manager-classpath", required=True)
parser.add_argument("--checkpoint-manager-port", required=True)
parser.add_argument("--checkpoint-manager-ram", type=long, required=True)
parser.add_argument("--checkpoint-manager-ram", type=int, required=True)
parser.add_argument("--stateful-config-file", required=True)
parser.add_argument("--health-manager-mode", required=True)
parser.add_argument("--health-manager-classpath", required=True)
Expand Down Expand Up @@ -793,7 +792,7 @@ def _get_streaming_processes(self):
'--zkhostportlist=%s' % self.state_manager_connection,
'--zkroot=%s' % self.state_manager_root,
'--stmgr_id=%s' % self.stmgr_ids[self.shard],
'--instance_ids=%s' % ','.join(map(lambda x: x[0], instance_info)),
'--instance_ids=%s' % ','.join([x[0] for x in instance_info]),
'--myhost=%s' % self.master_host,
'--data_port=%s' % str(self.master_port),
'--local_data_port=%s' % str(self.tmaster_controller_port),
Expand Down Expand Up @@ -958,8 +957,8 @@ def _run_blocking_process(self, cmd, is_shell=False):
def _kill_processes(self, commands):
# remove the command from processes_to_monitor and kill the process
with self.process_lock:
for command_name, command in commands.items():
for process_info in self.processes_to_monitor.values():
for command_name, command in list(commands.items()):
for process_info in list(self.processes_to_monitor.values()):
if process_info.name == command_name:
del self.processes_to_monitor[process_info.pid]
Log.info("Killing %s process with pid %d: %s" %
Expand All @@ -978,7 +977,7 @@ def _start_processes(self, commands):
Log.info("Start processes")
processes_to_monitor = {}
# First start all the processes
for (name, command) in commands.items():
for (name, command) in list(commands.items()):
p = self._run_process(name, command)
processes_to_monitor[p.pid] = ProcessInfo(p, name, command)

Expand All @@ -999,7 +998,7 @@ def start_process_monitor(self):
(pid, status) = os.wait()

with self.process_lock:
if pid in self.processes_to_monitor.keys():
if pid in list(self.processes_to_monitor.keys()):
old_process_info = self.processes_to_monitor[pid]
name = old_process_info.name
command = old_process_info.command
Expand Down Expand Up @@ -1061,19 +1060,19 @@ def get_command_changes(self, current_commands, updated_commands):

# if the current command has a matching command in the updated commands we keep it
# otherwise we kill it
for current_name, current_command in current_commands.items():
for current_name, current_command in list(current_commands.items()):
# We don't restart tmaster since it watches the packing plan and updates itself. The stream
# manager is restarted just to reset state, but we could update it to do so without a restart
if current_name in updated_commands.keys() and \
if current_name in list(updated_commands.keys()) and \
current_command == updated_commands[current_name] and \
not current_name.startswith('stmgr-'):
commands_to_keep[current_name] = current_command
else:
commands_to_kill[current_name] = current_command

# updated commands not in the keep list need to be started
for updated_name, updated_command in updated_commands.items():
if updated_name not in commands_to_keep.keys():
for updated_name, updated_command in list(updated_commands.items()):
if updated_name not in list(commands_to_keep.keys()):
commands_to_start[updated_name] = updated_command

return commands_to_kill, commands_to_keep, commands_to_start
Expand All @@ -1083,8 +1082,8 @@ def launch(self):
Then starts new ones required and kills old ones no longer required.
'''
with self.process_lock:
current_commands = dict(map((lambda process: (process.name, process.command)),
self.processes_to_monitor.values()))
current_commands = dict(list(map((lambda process: (process.name, process.command)),
list(self.processes_to_monitor.values()))))
updated_commands = self.get_commands_to_run()

# get the commands to kill, keep and start
Expand Down Expand Up @@ -1176,7 +1175,7 @@ def cleanup():
Log.info('Executor terminated; exiting all process in executor.')

# Kill child processes first and wait for log collection to finish
for pid in executor.processes_to_monitor.keys():
for pid in list(executor.processes_to_monitor.keys()):
os.kill(pid, signal.SIGTERM)
time.sleep(5)

Expand All @@ -1192,7 +1191,7 @@ def cleanup():
sid = os.getsid(pid)

# POSIX prohibits the change of the process group ID of a session leader
if pid <> sid:
if pid != sid:
Log.info('Set up process group; executor becomes leader')
os.setpgrp() # create new process group, become its leader

Expand Down
44 changes: 20 additions & 24 deletions heron/executor/tests/python/heron_executor_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def get_expected_shell_command(container_id):

def build_packing_plan(self, instance_distribution):
packing_plan = PackingPlan()
for container_id in instance_distribution.keys():
for container_id in list(instance_distribution.keys()):
container_plan = packing_plan.container_plans.add()
container_plan.id = int(container_id)
for (component_name, global_task_id, component_index) in instance_distribution[container_id]:
Expand Down Expand Up @@ -293,11 +293,11 @@ def get_args(shard_id):
def test_update_packing_plan(self):
self.executor_0.update_packing_plan(self.packing_plan_expected)

self.assertEquals(self.packing_plan_expected, self.executor_0.packing_plan)
self.assertEquals({1: "stmgr-1", 7: "stmgr-7"}, self.executor_0.stmgr_ids)
self.assertEquals(
self.assertEqual(self.packing_plan_expected, self.executor_0.packing_plan)
self.assertEqual({1: "stmgr-1", 7: "stmgr-7"}, self.executor_0.stmgr_ids)
self.assertEqual(
{0: "metricsmgr-0", 1: "metricsmgr-1", 7: "metricsmgr-7"}, self.executor_0.metricsmgr_ids)
self.assertEquals(
self.assertEqual(
{0: "heron-shell-0", 1: "heron-shell-1", 7: "heron-shell-7"}, self.executor_0.heron_shell_ids)

def test_launch_container_0(self):
Expand All @@ -315,17 +315,13 @@ def do_test_launch(self, executor, expected_processes):
monitored_processes = executor.processes_to_monitor

# convert to (pid, name, command)
found_processes = list(map(lambda process_info:
(process_info.pid, process_info.name, process_info.command_str),
executor.processes))
found_monitored = list(map(lambda pinfo:
(pinfo[0], pinfo[1].name, pinfo[1].command_str),
monitored_processes.items()))
found_processes = list([(process_info.pid, process_info.name, process_info.command_str) for process_info in executor.processes])
found_monitored = list([(pinfo[0], pinfo[1].name, pinfo[1].command_str) for pinfo in list(monitored_processes.items())])
found_processes.sort(key=lambda tuple: tuple[0])
found_monitored.sort(key=lambda tuple: tuple[0])
print("do_test_commands - found_processes: %s found_monitored: %s" \
% (found_processes, found_monitored))
self.assertEquals(found_processes, found_monitored)
self.assertEqual(found_processes, found_monitored)

print("do_test_commands - expected_processes: %s monitored_processes: %s" \
% (expected_processes, monitored_processes))
Expand All @@ -337,18 +333,18 @@ def test_change_instance_dist_container_1(self):
current_commands = self.executor_1.get_commands_to_run()

temp_dict = dict(
map((lambda process_info: (process_info.name, process_info.command.split(' '))),
self.expected_processes_container_1))
list(map((lambda process_info: (process_info.name, process_info.command.split(' '))),
self.expected_processes_container_1)))

current_json = json.dumps(current_commands, sort_keys=True, cls=CommandEncoder).split(' ')
temp_json = json.dumps(temp_dict, sort_keys=True).split(' ')

print ("current_json: %s" % current_json)
print ("temp_json: %s" % temp_json)
print("current_json: %s" % current_json)
print("temp_json: %s" % temp_json)

# better test error report
for (s1, s2) in zip(current_json, temp_json):
self.assertEquals(s1, s2)
self.assertEqual(s1, s2)

# update instance distribution
new_packing_plan = self.build_packing_plan(
Expand All @@ -360,20 +356,20 @@ def test_change_instance_dist_container_1(self):
commands_to_kill, commands_to_keep, commands_to_start = \
self.executor_1.get_command_changes(current_commands, updated_commands)

self.assertEquals(['container_1_exclaim1_2', 'stmgr-1'], sorted(commands_to_kill.keys()))
self.assertEquals(
self.assertEqual(['container_1_exclaim1_2', 'stmgr-1'], sorted(commands_to_kill.keys()))
self.assertEqual(
['container_1_exclaim1_1', 'container_1_word_3', 'heron-shell-1', 'metricsmgr-1'],
sorted(commands_to_keep.keys()))
self.assertEquals(['container_1_word_2', 'stmgr-1'], sorted(commands_to_start.keys()))
self.assertEqual(['container_1_word_2', 'stmgr-1'], sorted(commands_to_start.keys()))

def assert_processes(self, expected_processes, found_processes):
self.assertEquals(len(expected_processes), len(found_processes))
self.assertEqual(len(expected_processes), len(found_processes))
for expected_process in expected_processes:
self.assert_process(expected_process, found_processes)

def assert_process(self, expected_process, found_processes):
pid = expected_process.pid
self.assertTrue(found_processes[pid])
self.assertEquals(expected_process.name, found_processes[pid].name)
self.assertEquals(expected_process.command, found_processes[pid].command_str)
self.assertEquals(1, found_processes[pid].attempts)
self.assertEqual(expected_process.name, found_processes[pid].name)
self.assertEqual(expected_process.command, found_processes[pid].command_str)
self.assertEqual(1, found_processes[pid].attempts)
1 change: 1 addition & 0 deletions heron/instance/src/python/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pex_binary(
deps = [":instance-py"],
reqs = [
'colorlog==2.6.1',
'future==0.18.2',
'PyYAML==3.13'
]
)
4 changes: 2 additions & 2 deletions heron/instance/src/python/basics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
'''module for basic python heron component'''
__all__ = ['bolt_instance.py', 'spout_instance.py', 'base_instance']

from bolt_instance import BoltInstance
from spout_instance import SpoutInstance
from .bolt_instance import BoltInstance
from .spout_instance import SpoutInstance
4 changes: 2 additions & 2 deletions heron/instance/src/python/basics/bolt_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
'''bolt_instance.py: module for base bolt for python topology'''

import time
import Queue
import queue

import heronpy.api.api_constants as api_constants
from heronpy.api.state.stateful_component import StatefulComponent
Expand Down Expand Up @@ -181,7 +181,7 @@ def _read_tuples_and_execute(self):
while not self.in_stream.is_empty():
try:
tuples = self.in_stream.poll()
except Queue.Empty:
except queue.Empty:
break

if isinstance(tuples, tuple_pb2.HeronTupleSet):
Expand Down
4 changes: 2 additions & 2 deletions heron/instance/src/python/basics/spout_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

'''spout_instance.py: module for base spout for python topology'''

import Queue
import queue
import time
import collections

Expand Down Expand Up @@ -185,7 +185,7 @@ def _read_tuples(self):
while not self.in_stream.is_empty():
try:
tuples = self.in_stream.poll()
except Queue.Empty:
except queue.Empty:
break

if isinstance(tuples, tuple_pb2.HeronTupleSet):
Expand Down
4 changes: 2 additions & 2 deletions heron/instance/src/python/network/gateway_looper.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import time
import select

from event_looper import EventLooper
from .event_looper import EventLooper
from heron.common.src.python.utils.log import Log

class GatewayLooper(EventLooper):
Expand Down Expand Up @@ -83,7 +83,7 @@ def poll(self, timeout=0.0):
error_lst = []

if self.sock_map is not None:
for fd, obj in self.sock_map.items():
for fd, obj in list(self.sock_map.items()):
is_r = obj.readable()
is_w = obj.writable()
if is_r:
Expand Down
4 changes: 2 additions & 2 deletions heron/instance/src/python/utils/metrics/metrics_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(self, metrics):

def register_metrics(self, metrics_collector, interval):
"""Registers its metrics to a given metrics collector with a given interval"""
for field, metrics in self.metrics.items():
for field, metrics in list(self.metrics.items()):
metrics_collector.register_metric(field, metrics, interval)

def update_count(self, name, incr_by=1, key=None):
Expand Down Expand Up @@ -379,7 +379,7 @@ def _gather_one_metric(self, name, message):
if metric_value is None:
return
elif isinstance(metric_value, dict):
for key, value in metric_value.items():
for key, value in list(metric_value.items()):
if key is not None and value is not None:
self._add_data_to_message(message, name + "/" + str(key), value)
self._add_data_to_message(message, "%s/%s" % (name, str(key)), value)
Expand Down
12 changes: 6 additions & 6 deletions heron/instance/src/python/utils/misc/communicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

'''communicator.py: module responsible for communication between Python heron modules'''
import sys
import Queue
from queue import Queue, Full, Empty

from heron.common.src.python.utils.log import Log

Expand All @@ -40,7 +40,7 @@ def __init__(self, producer_cb=None, consumer_cb=None):
"""
self._producer_callback = producer_cb
self._consumer_callback = consumer_cb
self._buffer = Queue.Queue()
self._buffer = Queue()
self.capacity = sys.maxsize

def register_capacity(self, capacity):
Expand Down Expand Up @@ -72,9 +72,9 @@ def poll(self):
if self._producer_callback is not None:
self._producer_callback()
return ret
except Queue.Empty:
except Empty:
Log.debug("%s: Empty in poll()" % str(self))
raise Queue.Empty
raise Empty

def offer(self, item):
"""Offer to the buffer
Expand All @@ -87,9 +87,9 @@ def offer(self, item):
if self._consumer_callback is not None:
self._consumer_callback()
return True
except Queue.Full:
except Full:
Log.debug("%s: Full in offer()" % str(self))
raise Queue.Full
raise Full

def clear(self):
"""Clear the buffer"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def add(self, stream_id, task_ids, grouping, source_comp_name):

def prepare(self, context):
"""Prepares the custom grouping for this component"""
for stream_id, targets in self.targets.items():
for stream_id, targets in list(self.targets.items()):
for target in targets:
target.prepare(context, stream_id)

Expand Down
2 changes: 1 addition & 1 deletion heron/instance/src/python/utils/system_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

def merge(default, override):
if isinstance(default, dict) and isinstance(override, dict):
for k, v in override.items():
for k, v in list(override.items()):
Log.info("Add overriding configuration '%s'", k)
if k not in default:
default[k] = v
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def get_this_sources(self):
def get_component_tasks(self, component_id):
"""Returns the task ids allocated for the given component id"""
ret = []
for task_id, comp_id in self.task_to_component_map.items():
for task_id, comp_id in list(self.task_to_component_map.items()):
if comp_id == component_id:
ret.append(task_id)
return ret
Expand Down
Loading

0 comments on commit 250c148

Please sign in to comment.