Skip to content

Commit

Permalink
Refactor how we handle options in at topology, environment, and CLI l…
Browse files Browse the repository at this point in the history
…evels. (#289)

-  Remove --par because it would have made this really really hard.
-  Make --ackers, --debug, and --workers use _StoreDictAction so they just end
   up in the args.options dict in the end.
-  Remove all the code that was passing ackers, debug, and workers around.
-  Add a resolve_options function to streamparse.cli.common that takes the env
   config, the CLI options, and the topology class and spits out a unified
   options dict.
-  Stop using TopologyType.propogate_config to propagate topology-level
   config options to each component, because this wasn't actually the right way
   to do it. With the previous approach the options were not actually being set at
   the topology level, so they didn't show up in Storm UI at the topology level.
   You had to look at each component separately to see them.
  • Loading branch information
dan-blanchard authored Jul 28, 2016
1 parent 04efa3c commit a8861e4
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 117 deletions.
105 changes: 79 additions & 26 deletions streamparse/cli/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import copy

from ruamel import yaml

from six import integer_types, string_types

class _StoreDictAction(argparse.Action):
"""Action for storing key=val option strings as a single dict."""
Expand Down Expand Up @@ -37,19 +37,33 @@ def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, items)


def option_alias(option):
    """Returns a function that will create option=val for _StoreDictAction."""
    # Close over the option name; argparse calls the returned function as a
    # ``type`` converter, so each raw CLI value comes back as 'option=value'.
    return lambda val: '{}={}'.format(option, val)


def add_ackers(parser):
    """ Add --ackers option to parser """
    # The value is funneled into args.options as 'topology.acker.executors'
    # rather than a dedicated args.ackers attribute, so all Storm options end
    # up in one dict. (This span previously mixed the old and new argument
    # definitions, which is invalid; only the dict-based form is kept.)
    parser.add_argument('-a', '--ackers',
                        help='Set number of acker executors for your topology. '
                             'Defaults to the number of worker nodes in your '
                             'Storm environment.',
                        type=option_alias('topology.acker.executors'),
                        action=_StoreDictAction,
                        dest='options')


def add_debug(parser):
    """ Add --debug option to parser """
    # Stored as 'topology.debug' in args.options. With nargs='?' a bare -d
    # uses const='true' (argparse still runs string consts through ``type``,
    # so it becomes 'topology.debug=true'). (This span previously mixed the
    # old store_true form with the new one; only the dict-based form is kept.)
    parser.add_argument('-d', '--debug',
                        help='Set topology.debug and produce debugging output.',
                        type=option_alias('topology.debug'),
                        action=_StoreDictAction,
                        dest='options',
                        const='true',
                        nargs='?')


def add_environment(parser):
Expand Down Expand Up @@ -92,15 +106,6 @@ def add_override_name(parser):
'duplicate the topology file.')


def add_par(parser):
    """ Add --par option to parser """
    # Single convenience knob that downstream code copies into both the
    # worker count and the acker count when they are not set explicitly.
    par_help = ('Parallelism of topology; conveniently sets '
                'number of Storm workers and acker bolts at once '
                'to passed value. Defaults to the number of worker'
                ' nodes in your Storm environment.')
    parser.add_argument('-p', '--par', type=int, help=par_help)

def add_pattern(parser):
""" Add --pattern option to parser """
parser.add_argument('--pattern',
Expand Down Expand Up @@ -131,14 +136,62 @@ def add_wait(parser):
def add_workers(parser):
    """ Add --workers option to parser """
    # The value is funneled into args.options as 'topology.workers' rather
    # than a dedicated args.workers attribute. (This span previously mixed
    # the old int-typed form, a leftover resolve_ackers_workers helper, and
    # the new definition; only the dict-based form is kept.)
    parser.add_argument('-w', '--workers',
                        help='Set number of Storm workers for your topology. '
                             'Defaults to the number of worker nodes in your '
                             'Storm environment.',
                        type=option_alias('topology.workers'),
                        action=_StoreDictAction,
                        dest='options')


def resolve_options(cli_options, env_config, topology_class, topology_name):
    """Resolve potentially conflicting Storm options from three sources:
    CLI options > Topology options > config.json options
    """
    # Lowest precedence layer: options from the environment's config entry.
    resolved = dict(env_config.get('options', {}))

    # Record the remote virtualenv's interpreter path. This setting is for
    # information purposes only, and is not actually read by any pystorm or
    # streamparse code.
    if env_config.get('use_virtualenv', True):
        resolved['topology.python.path'] = '/'.join(
            [env_config["virtualenv_root"], topology_name, "bin", "python"])

    # Logging settings; the flat "log_path"/"log_file" keys act as legacy
    # fallbacks for entries under the "log" dict.
    log_config = env_config.get("log", {})
    path_setting = log_config.get("path") or env_config.get("log_path")
    file_setting = log_config.get("file") or env_config.get("log_file")
    if path_setting:
        resolved['pystorm.log.path'] = path_setting
    if file_setting:
        resolved['pystorm.log.file'] = file_setting
    if isinstance(log_config.get("max_bytes"), integer_types):
        resolved['pystorm.log.max_bytes'] = log_config["max_bytes"]
    if isinstance(log_config.get("backup_count"), integer_types):
        resolved['pystorm.log.backup_count'] = log_config["backup_count"]
    if isinstance(log_config.get("level"), string_types):
        resolved['pystorm.log.level'] = log_config["level"].lower()

    # Middle precedence layer: options declared on the Topology class.
    resolved.update(topology_class.config)

    # Highest precedence layer: options given on the command line.
    resolved.update(cli_options or {})

    # topology.debug implies debug-level Python logging as well.
    if resolved.get('topology.debug', False):
        resolved['pystorm.log.level'] = 'debug'

    # Fall back to one worker/acker executor per Storm worker node when
    # nothing above set them.
    node_count = len(env_config["workers"])
    if resolved.get('topology.acker.executors') is None:
        resolved['topology.acker.executors'] = node_count
    if resolved.get('topology.workers') is None:
        resolved['topology.workers'] = node_count

    return resolved
43 changes: 21 additions & 22 deletions streamparse/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,39 @@

from __future__ import absolute_import, print_function

import os
import sys
from tempfile import NamedTemporaryFile

from fabric.api import local
from ruamel import yaml

from ..util import (get_topology_definition, get_topology_from_file,
local_storm_version, storm_lib_version)
from ..util import (get_env_config, get_topology_definition,
get_topology_from_file, local_storm_version,
storm_lib_version)
from .common import (add_ackers, add_debug, add_environment, add_name,
add_options, add_par, add_workers, resolve_ackers_workers)
add_options, add_workers, resolve_options)
from .jar import jar_for_deploy


def run_local_topology(name=None, time=0, workers=None, ackers=None,
options=None, debug=False):
def run_local_topology(name=None, env_name=None, time=0, options=None):
"""Run a topology locally using Flux and `storm jar`."""
if workers is None:
workers = 1
if ackers is None:
ackers = 1
storm_options = {'topology.workers': workers,
'topology.acker.executors': ackers,
'topology.debug': debug}
if debug:
storm_options['pystorm.log.level'] = 'debug'
name, topology_file = get_topology_definition(name)
env_name, env_config = get_env_config(env_name)
topology_class = get_topology_from_file(topology_file)

storm_options = resolve_options(options, env_config, topology_class, name)
if storm_options['topology.acker.executors'] != 0:
storm_options['topology.acker.executors'] = 1
storm_options['topology.workers'] = 1

if not os.path.isdir("logs"):
os.makedirs("logs")
log_path = os.path.join(os.getcwd(), "logs")
storm_options['pystorm.log.path'] = log_path
print("Routing Python logging to {}.".format(log_path))
sys.stdout.flush()

# Check Storm version is the same
local_version = local_storm_version()
project_version = storm_lib_version()
Expand All @@ -42,9 +48,6 @@ def run_local_topology(name=None, time=0, workers=None, ackers=None,
# Prepare a JAR that has Storm dependencies packaged
topology_jar = jar_for_deploy(simple_jar=False)

if options is not None:
storm_options.update(options)

if time <= 0:
time = 9223372036854775807 # Max long value in Java

Expand All @@ -71,7 +74,6 @@ def subparser_hook(subparsers):
add_environment(subparser)
add_name(subparser)
add_options(subparser)
add_par(subparser)
subparser.add_argument('-t', '--time',
default=0,
type=int,
Expand All @@ -83,7 +85,4 @@ def subparser_hook(subparsers):

def main(args):
    """ Run the local topology with the given arguments """
    # Forward the CLI environment selection too: the subparser registers
    # add_environment, and run_local_topology accepts env_name, but the
    # original call dropped args.environment so the default environment was
    # always used.
    run_local_topology(name=args.name, env_name=args.environment,
                       time=args.time, options=args.options)
67 changes: 14 additions & 53 deletions streamparse/cli/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@

import simplejson as json
from fabric.api import env
from six import itervalues, string_types
from six import itervalues

from ..dsl.component import JavaComponentSpec
from ..thrift import storm_thrift
from ..util import (activate_env, get_config, get_env_config, get_nimbus_client,
get_topology_definition, get_topology_from_file, ssh_tunnel)
from .common import (add_ackers, add_debug, add_environment, add_name,
add_options, add_override_name, add_par, add_wait,
add_workers, resolve_ackers_workers)
add_options, add_override_name, add_wait, add_workers,
resolve_options)
from .jar import jar_for_deploy
from .kill import _kill_topology
from .list import _list_topologies
Expand Down Expand Up @@ -68,36 +68,9 @@ def _kill_existing_topology(topology_name, force, wait, nimbus_client):


def _submit_topology(topology_name, topology_class, uploaded_jar, config,
env_config, workers, ackers, nimbus_client, options=None,
debug=False):
storm_options = {'topology.workers': workers,
'topology.acker.executors': ackers,
'topology.debug': debug}

if env_config.get('use_virtualenv', True):
python_path = '/'.join([env_config["virtualenv_root"],
topology_name, "bin", "python"])
storm_options['topology.python.path'] = python_path

# Python logging settings
log_config = env_config.get("log", {})
log_path = log_config.get("path") or env_config.get("log_path")
log_file = log_config.get("file") or env_config.get("log_file")
print("Routing Python logging to {}.".format(log_path))
env_config, nimbus_client, options=None):
print("Routing Python logging to {}.".format(options['pystorm.log.path']))
sys.stdout.flush()
if log_path:
storm_options['pystorm.log.path'] = log_path
if log_file:
storm_options['pystorm.log.file'] = log_file
if isinstance(log_config.get("max_bytes"), int):
storm_options['pystorm.log.max_bytes'] = log_config["max_bytes"]
if isinstance(log_config.get("backup_count"), int):
storm_options['pystorm.log.backup_count'] = log_config["backup_count"]
if isinstance(log_config.get("level"), string_types):
storm_options['pystorm.log.level'] = log_config["level"].lower()

if options is not None:
storm_options.update(options)

serializer = env_config.get('serializer', config.get('serializer', None))
if serializer is not None:
Expand All @@ -118,7 +91,7 @@ def _submit_topology(topology_name, topology_class, uploaded_jar, config,
sys.stdout.flush()
nimbus_client.submitTopology(name=topology_name,
uploadedJarLocation=uploaded_jar,
jsonConf=json.dumps(storm_options),
jsonConf=json.dumps(options),
topology=topology_class.thrift_topology)
print('done')

Expand Down Expand Up @@ -167,9 +140,8 @@ def _upload_jar(nimbus_client, local_path):
return upload_location


def submit_topology(name=None, env_name="prod", workers=None, ackers=None,
options=None, force=False, debug=False, wait=None,
simple_jar=True, override_name=None):
def submit_topology(name=None, env_name=None, options=None, force=False,
wait=None, simple_jar=True, override_name=None):
"""Submit a topology to a remote Storm cluster."""
config = get_config()
name, topology_file = get_topology_definition(name)
Expand Down Expand Up @@ -209,16 +181,9 @@ def submit_topology(name=None, env_name="prod", workers=None, ackers=None,
if 'streamparse_run' in inner_shell.execution_command:
inner_shell.execution_command = streamparse_run_path

# Additional options
additional_options = env_config.get('options', {})
if options is not None:
additional_options.update(options)
options = additional_options

if workers is None:
workers = env_config.get('worker_count', len(env.storm_workers))
if ackers is None:
ackers = env_config.get('acker_count', len(env.storm_workers))
# Handle option conflicts
options = resolve_options(options, env_config, topology_class,
override_name)

# Check topology for JVM stuff to see if we need to create uber-jar
if simple_jar:
Expand All @@ -240,8 +205,7 @@ def submit_topology(name=None, env_name="prod", workers=None, ackers=None,
uploaded_jar = _upload_jar(nimbus_client, topology_jar)
_kill_existing_topology(override_name, force, wait, nimbus_client)
_submit_topology(override_name, topology_class, uploaded_jar, config,
env_config, workers, ackers, nimbus_client,
options=options, debug=debug)
env_config, nimbus_client, options=options)
_post_submit_hooks(name, env_name, env_config)


Expand All @@ -262,7 +226,6 @@ def subparser_hook(subparsers):
add_name(subparser)
add_options(subparser)
add_override_name(subparser)
add_par(subparser)
subparser.add_argument('-u', '--uber_jar',
help='Build an Uber-JAR even if you have no Java '
'components in your topology. Useful if you '
Expand All @@ -274,9 +237,7 @@ def subparser_hook(subparsers):

def main(args):
    """ Submit a Storm topology to Nimbus. """
    # workers/ackers/debug are no longer separate kwargs; they travel inside
    # args.options and are reconciled by resolve_options downstream. (This
    # span previously mixed the old and new call forms; only the new one is
    # kept.)
    submit_topology(name=args.name, env_name=args.environment,
                    options=args.options, force=args.force, wait=args.wait,
                    simple_jar=args.simple_jar,
                    override_name=args.override_name)
22 changes: 6 additions & 16 deletions streamparse/dsl/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ def __new__(mcs, classname, bases, class_dict):
if classname != 'Topology' and not spout_specs:
raise ValueError('A Topology requires at least one Spout')
if 'config' in class_dict:
TopologyType.propogate_config(class_dict['config'], specs)
config_dict = class_dict['config']
if not isinstance(config_dict, dict):
raise TypeError('Topology config must be a dictionary. Given: '
'{!r}'.format(config_dict))
else:
class_dict['config'] = {}
class_dict['thrift_bolts'] = bolt_specs
class_dict['thrift_spouts'] = spout_specs
class_dict['specs'] = list(specs.values())
Expand Down Expand Up @@ -119,21 +124,6 @@ def clean_spec_inputs(mcs, spec, specs):
stream_comp.name,
stream_id.streamId))

@classmethod
def propogate_config(mcs, config_dict, specs):
    """Propagate the settings in config to all specs.

    This is necessary because there is no way to set topology-level
    config options via Thrift.
    """
    if not isinstance(config_dict, dict):
        raise TypeError('Topology config must be a dictionary. Given: {!r}'
                        .format(config_dict))
    # Merge the topology-level settings beneath each component's own
    # config, letting per-component values win, then re-serialize.
    for component_spec in itervalues(specs):
        merged = deepcopy(config_dict)
        merged.update(json.loads(component_spec.config))
        component_spec.config = json.dumps(merged)

def __repr__(cls):
""":returns: A string representation of the topology"""
return repr(getattr(cls, '_topology', None))
Expand Down

0 comments on commit a8861e4

Please sign in to comment.