Skip to content

Commit

Permalink
Add ability to override topology.name on submit. Fixes #207. (#286)
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-blanchard authored Jul 27, 2016
1 parent 7b81f8c commit c1ce4de
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 39 deletions.
10 changes: 10 additions & 0 deletions streamparse/cli/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ def add_options(parser):
' options.')


def add_override_name(parser):
""" Add --override_name option to parser """
parser.add_argument('-N', '--override_name',
help='For operations such as creating virtualenvs and '
'killing/submitting topologies, use this value '
'instead of NAME. This is useful if you want to '
'submit the same topology twice without having to '
'duplicate the topology file.')


def add_par(parser):
""" Add --par option to parser """
parser.add_argument('-p', '--par',
Expand Down
11 changes: 7 additions & 4 deletions streamparse/cli/kill.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@
from ..thrift import storm_thrift
from ..util import (get_topology_definition, get_env_config, get_nimbus_client,
ssh_tunnel)
from .common import add_environment, add_name, add_wait
from .common import add_environment, add_name, add_override_name, add_wait


def _kill_topology(topology_name, nimbus_client, wait=None):
kill_opts = storm_thrift.KillOptions(wait_secs=wait)
nimbus_client.killTopologyWithOpts(name=topology_name, options=kill_opts)


def kill_topology(topology_name=None, env_name=None, wait=None):
def kill_topology(topology_name=None, env_name=None, wait=None,
override_name=None):
topology_name = get_topology_definition(topology_name)[0]
env_name, env_config = get_env_config(env_name)
# Use ssh tunnel with Nimbus if use_ssh_for_nimbus is unspecified or True
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port)
return _kill_topology(topology_name, nimbus_client, wait=wait)
return _kill_topology(override_name or topology_name, nimbus_client,
wait=wait)


def subparser_hook(subparsers):
Expand All @@ -32,10 +34,11 @@ def subparser_hook(subparsers):
subparser.set_defaults(func=main)
add_environment(subparser)
add_name(subparser)
add_override_name(subparser)
add_wait(subparser)


def main(args):
""" Kill the specified Storm topology """
kill_topology(topology_name=args.name, env_name=args.environment,
wait=args.wait)
wait=args.wait, override_name=None)
42 changes: 23 additions & 19 deletions streamparse/cli/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from ..util import (activate_env, get_config, get_env_config, get_nimbus_client,
get_topology_definition, get_topology_from_file, ssh_tunnel)
from .common import (add_ackers, add_debug, add_environment, add_name,
add_options, add_par, add_wait, add_workers,
resolve_ackers_workers)
add_options, add_override_name, add_par, add_wait,
add_workers, resolve_ackers_workers)
from .jar import jar_for_deploy
from .kill import _kill_topology
from .list import _list_topologies
Expand Down Expand Up @@ -169,35 +169,33 @@ def _upload_jar(nimbus_client, local_path):

def submit_topology(name=None, env_name="prod", workers=None, ackers=None,
options=None, force=False, debug=False, wait=None,
simple_jar=True):
simple_jar=True, override_name=None):
"""Submit a topology to a remote Storm cluster."""
config = get_config()
name, topology_file = get_topology_definition(name)
env_name, env_config = get_env_config(env_name)
topology_class = get_topology_from_file(topology_file)
if override_name is None:
override_name = name

# Check if we need to maintain virtualenv during the process
use_venv = env_config.get('use_virtualenv', True)

# Check if user wants to install virtualenv during the process
install_venv = env_config.get('install_virtualenv', use_venv)

# Setup the fabric env dictionary
activate_env(env_name)
# Run pre_submit actions provided by project
_pre_submit_hooks(name, env_name, env_config)

# If using virtualenv, set it up, and make sure paths are correct in specs
if use_venv:
config["virtualenv_specs"] = config["virtualenv_specs"].rstrip("/")

if install_venv:
create_or_update_virtualenvs(
env_name,
name,
"{}/{}.txt".format(config["virtualenv_specs"], name))
streamparse_run_path = '/'.join([env.virtualenv_root, name, 'bin',
'streamparse_run'])
create_or_update_virtualenvs(env_name, name,
override_name=override_name)
streamparse_run_path = '/'.join([env.virtualenv_root, override_name,
'bin', 'streamparse_run'])
# Update python paths in bolts
for thrift_bolt in itervalues(topology_class.thrift_bolts):
inner_shell = thrift_bolt.bolt_object.shell
Expand Down Expand Up @@ -230,16 +228,20 @@ def submit_topology(name=None, env_name="prod", workers=None, ackers=None,
# Prepare a JAR that doesn't have Storm dependencies packaged
topology_jar = jar_for_deploy(simple_jar=simple_jar)

print('Deploying "{}" topology...'.format(name))
if name != override_name:
print('Deploying "{}" topology with name "{}"...'.format(name,
override_name))
else:
print('Deploying "{}" topology...'.format(name))
sys.stdout.flush()
# Use ssh tunnel with Nimbus if use_ssh_for_nimbus is unspecified or True
with ssh_tunnel(env_config) as (host, port):
nimbus_client = get_nimbus_client(env_config, host=host, port=port)
uploaded_jar = _upload_jar(nimbus_client, topology_jar)
_kill_existing_topology(name, force, wait, nimbus_client)
_submit_topology(name, topology_class, uploaded_jar, config, env_config,
workers, ackers, nimbus_client, options=options,
debug=debug)
_kill_existing_topology(override_name, force, wait, nimbus_client)
_submit_topology(override_name, topology_class, uploaded_jar, config,
env_config, workers, ackers, nimbus_client,
options=options, debug=debug)
_post_submit_hooks(name, env_name, env_config)


Expand All @@ -259,6 +261,7 @@ def subparser_hook(subparsers):
'name.')
add_name(subparser)
add_options(subparser)
add_override_name(subparser)
add_par(subparser)
subparser.add_argument('-u', '--uber_jar',
help='Build an Uber-JAR even if you have no Java '
Expand All @@ -275,4 +278,5 @@ def main(args):
submit_topology(name=args.name, env_name=args.environment,
workers=args.workers, ackers=args.ackers,
options=args.options, force=args.force, debug=args.debug,
wait=args.wait, simple_jar=args.simple_jar)
wait=args.wait, simple_jar=args.simple_jar,
override_name=args.override_name)
36 changes: 20 additions & 16 deletions streamparse/cli/update_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@
import os
from io import open

import fabric
from fabric.api import env, execute, parallel, prefix, put, puts, run, show
from fabric.contrib.files import exists

from .common import add_environment, add_name
from .common import add_environment, add_name, add_override_name
from ..util import (activate_env, die, get_config, get_env_config,
get_topology_definition)

Expand All @@ -24,7 +23,7 @@ def _create_or_update_virtualenv(virtualenv_root,
requirements_file,
virtualenv_flags=None):
with show('output'):
virtualenv_path = os.path.join(virtualenv_root, virtualenv_name)
virtualenv_path = '/'.join((virtualenv_root, virtualenv_name))
if not exists(virtualenv_path):
if virtualenv_flags is None:
virtualenv_flags = ''
Expand All @@ -43,23 +42,31 @@ def _create_or_update_virtualenv(virtualenv_root,
run("rm {}".format(tmpfile))


def create_or_update_virtualenvs(env_name, topology_name, requirements_file):
def create_or_update_virtualenvs(env_name, topology_name, override_name=None):
"""Create or update virtualenvs on remote servers.
Assumes that virtualenv is on the path of the remote server(s).
:param env_name: the name of the environment in config.json.
:param topology_name: the name of the topology (and virtualenv).
:param requirements_file: path to the requirements.txt file to use
to update/install this virtualenv.
:param override_name: the name that we should use for the virtualenv, even
though the topology file has a different name.
"""
config = get_config()
topology_name = get_topology_definition(topology_name)[0]
env_name, env_config = get_env_config(env_name)
if override_name is None:
override_name = topology_name

config["virtualenv_specs"] = config["virtualenv_specs"].rstrip("/")

requirements_path = os.path.join(config["virtualenv_specs"],
'{}.txt'.format(topology_name))

# Setup the fabric env dictionary
activate_env(env_name)
# Check to ensure streamparse is in requirements
with open(requirements_file, "r") as fp:
with open(requirements_path, "r") as fp:
found_streamparse = False
for line in fp:
if "streamparse" in line:
Expand All @@ -69,10 +76,10 @@ def create_or_update_virtualenvs(env_name, topology_name, requirements_file):
if not found_streamparse:
die("Could not find streamparse in your requirements file ({}). "
"streamparse is required for all topologies."
.format(requirements_file))
.format(requirements_path))

execute(_create_or_update_virtualenv, env.virtualenv_root, topology_name,
requirements_file,
execute(_create_or_update_virtualenv, env.virtualenv_root, override_name,
requirements_path,
virtualenv_flags=env_config.get('virtualenv_flags'),
hosts=env.storm_workers)

Expand All @@ -85,13 +92,10 @@ def subparser_hook(subparsers):
subparser.set_defaults(func=main)
add_environment(subparser)
add_name(subparser)
add_override_name(subparser)


def main(args):
""" Create or update a virtualenv on Storm workers. """
topology_name = get_topology_definition(args.name)[0]
config = get_config()
config["virtualenv_specs"] = config["virtualenv_specs"].rstrip("/")
create_or_update_virtualenvs(args.environment, topology_name,
"{}/{}.txt".format(config["virtualenv_specs"],
topology_name))
create_or_update_virtualenvs(args.environment, args.name,
override_name=args.override_name)

0 comments on commit c1ce4de

Please sign in to comment.