Skip to content

Commit

Permalink
Fix #109
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-blanchard committed Apr 1, 2015
1 parent ab4c9df commit e297159
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions streamparse/ext/invoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,13 @@ def kill_topology(topology_name=None, env_name="prod", wait=None):
@task
def jar_for_deploy():
print("Cleaning from prior builds...")
sys.stdout.flush()
res = run("lein clean", hide="stdout")
if not res.ok:
raise Exception("Unable to run 'lein clean'!\nSTDOUT:\n{}"
"\nSTDERR:\n{}".format(res.stdout, res.stderr))
print("Creating topology uberjar...")
sys.stdout.flush()
res = run("lein uberjar", hide="stdout")
if not res.ok:
raise Exception("Unable to run 'lein uberjar'!\nSTDOUT:\n{}"
Expand All @@ -157,6 +159,7 @@ def jar_for_deploy():
if l.endswith("standalone.jar")]
uberjar = lines[0]
print("Uberjar created: {}".format(uberjar))
sys.stdout.flush()
return uberjar


Expand All @@ -168,6 +171,7 @@ def run_local_topology(name=None, time=5, workers=2, ackers=2, options=None,

name, topology_file = get_topology_definition(name)
print("Running {} topology...".format(name))
sys.stdout.flush()
cmd = ["lein",
"run -m streamparse.commands.run/-main",
topology_file]
Expand All @@ -182,6 +186,7 @@ def run_local_topology(name=None, time=5, workers=2, ackers=2, options=None,
os.makedirs("logs")
log_path = os.path.join(os.getcwd(), "logs")
print("Routing Python logging to {}.".format(log_path))
sys.stdout.flush()
cmd.append("--option 'streamparse.log.path=\"{}\"'"
.format(log_path))
cmd.append("--option 'streamparse.log.level=\"debug\"'")
Expand All @@ -193,6 +198,7 @@ def run_local_topology(name=None, time=5, workers=2, ackers=2, options=None,
full_cmd = " ".join(cmd)
print("Running lein command to run local cluster:")
print(full_cmd)
sys.stdout.flush()
run(full_cmd)


Expand Down Expand Up @@ -224,11 +230,12 @@ def submit_topology(name=None, env_name="prod", workers=2, ackers=2,
topology_jar = jar_for_deploy()

print('Deploying "{}" topology...'.format(name))
sys.stdout.flush()
# Use ssh tunnel with Nimbus or use host/port for Thrift connection
if is_ssh_for_nimbus(env_config):
with ssh_tunnel(env_config["user"], host, 6627, port):
print("ssh tunnel to Nimbus {}:{} established."
.format(host, port))
print("ssh tunnel to Nimbus {}:{} established.".format(host, port))
sys.stdout.flush()
_kill_existing_topology(name, force, wait)
_submit_topology(name, topology_file, topology_jar,
env_config, workers, ackers, options, debug)
Expand All @@ -245,13 +252,15 @@ def submit_topology(name=None, env_name="prod", workers=2, ackers=2,
def _kill_existing_topology(topology_name, force, wait, host=None, port=None):
if force and not is_safe_to_submit(topology_name, host=host, port=port):
print("Killing current \"{}\" topology.".format(topology_name))

sys.stdout.flush()
_kill_topology(topology_name, run_kwargs={"hide": "both"},
wait=wait, host=host, port=port)
while not is_safe_to_submit(topology_name, host=host, port=port):
print("Waiting for topology {} to quit...".format(topology_name))
sys.stdout.flush()
time.sleep(0.5)
print("Killed.")
sys.stdout.flush()


def _submit_topology(topology_name, topology_file, topology_jar,
Expand Down Expand Up @@ -288,6 +297,7 @@ def _submit_topology(topology_name, topology_file, topology_jar,
log_config = env_config.get("log", {})
log_path = log_config.get("path") or env_config.get("log_path")
print("Routing Python logging to {}.".format(log_path))
sys.stdout.flush()
if log_path:
cmd.append("--option 'streamparse.log.path=\"{}\"'"
.format(log_path))
Expand All @@ -314,6 +324,7 @@ def _submit_topology(topology_name, topology_file, topology_jar,
full_cmd = " ".join(cmd)
print("Running lein command to submit topology to nimbus:")
print(full_cmd)
sys.stdout.flush()
run(full_cmd)


Expand Down Expand Up @@ -352,6 +363,7 @@ def tail_topology(topology_name=None, env_name=None, pattern=None):
def visualize_topology(name=None, flip=False):
name, topology_file = get_topology_definition(name)
print("Visualizing {} topology...".format(name))
sys.stdout.flush()
cmd = ["lein",
"run -m streamparse.commands.visualize/-main",
topology_file]
Expand All @@ -360,4 +372,5 @@ def visualize_topology(name=None, flip=False):
full_cmd = " ".join(cmd)
print("Running lein command to visualize topology:")
print(full_cmd)
sys.stdout.flush()
run(full_cmd)

0 comments on commit e297159

Please sign in to comment.