Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Python topology submission/execution support #1238

Merged
merged 4 commits into from
Aug 10, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions heron/cli/src/python/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,30 @@ def heron_tar(class_name, topology_tar, arguments, tmpdir_root, java_defines):
# Now execute the class
heron_class(class_name, lib_jars, extra_jars, arguments, java_defines)

def heron_pex(topology_pex, topology_class_name, tmp_dir):
def heron_pex(topology_pex, topology_class_name):
Log.debug("Importing %s from %s" % (topology_class_name, topology_pex))
try:
pex_loader.load_pex(topology_pex)
topology_class = pex_loader.import_and_get_class(topology_pex, topology_class_name)
topology_class.write(tmp_dir)
except Exception:
traceback.print_exc()
err_str = "Topology pex failed to be loaded. Bailing out..."
raise RuntimeError(err_str)
if topology_class_name == '-':
# loading topology by running its main method (if __name__ == "__main__")
heron_env = os.environ.copy()
heron_env['HERON_OPTIONS'] = opts.get_heron_config()

args = [topology_pex]
Log.debug('$> %s' % ' '.join(args))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

joining a list of one element?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, better rename it to cmd. args does not make sense to me.

Log.debug('Heron options: %s' % str(heron_env['HERON_OPTIONS']))

# invoke the command with subprocess and print error message, if any
status = subprocess.call(args, env=heron_env)
if status != 0:
err_str = "User main failed with status %d. Bailing out..." % status
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary because main execution failed. Not really being a troll, but a user can mistakenly submit a PEX even without if __name__ = "__main__". It's also possible that this PEX has some module importing problem. In this case, user still will see error message about main...

Also, the if topology_class_name == '-' looks good to me for now, but I feel somehow this hack could be shaky under some circumstances.

raise RuntimeError(err_str)
else:
try:
# loading topology from Topology's subclass (no main method)
os.environ["HERON_OPTIONS"] = opts.get_heron_config()
pex_loader.load_pex(topology_pex)
topology_class = pex_loader.import_and_get_class(topology_pex, topology_class_name)
topology_class.write()
except Exception:
Log.debug(traceback.print_exc())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be traceback.format_exc()). Fix all the same errors if you think there are more.

err_str = "Topology pex failed to be loaded dynamically. Bailing out..."
raise RuntimeError(err_str)
4 changes: 2 additions & 2 deletions heron/cli/src/python/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,9 @@ def submit_pex(cl_args, unknown_args, tmp_dir):
topology_file = cl_args['topology-file-name']
topology_class_name = cl_args['topology-class-name']
try:
execute.heron_pex(topology_file, topology_class_name, tmp_dir)
execute.heron_pex(topology_file, topology_class_name)
except Exception as ex:
Log.error("Unable to execute topology main class: " + ex.message)
Log.error("Error when loading a topology: " + ex.message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about Log.error("Error when loading a topology: %s", str(ex))

return False

try:
Expand Down