AWS Step Functions Integration (#211)
Integrates Metaflow with [AWS Step Functions](https://aws.amazon.com/step-functions/).

Introduces a new command, `step-functions`:
- `python my_flow.py step-functions create` compiles the user-defined Metaflow flow and exports it as an AWS Step Functions state machine, allowing Metaflow users to move their flows into production on AWS seamlessly.
    - An additional flow-level decorator, `@schedule`, lets users optionally schedule executions of their flows through an integration with Amazon EventBridge.
    - All current functionality of Metaflow is available within AWS Step Functions through this integration: containerized job execution on AWS Batch via `@batch`, dependency management via `@conda`, retry mechanisms via `@retry`, `@catch`, and `@timeout`, as well as parameters, branches, and foreaches.
    - Introduces the notion of a production token to safeguard flow deployments to AWS Step Functions against unintended production deployments.
- `python my_flow.py step-functions trigger` triggers an execution of the deployed workflow on AWS Step Functions. A minimal usage sketch follows this list.
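
A minimal sketch of the workflow (the flow and file name below are illustrative, not part of this commit):

```python
from metaflow import FlowSpec, schedule, step

@schedule(daily=True)  # executed on a daily schedule via Amazon EventBridge once deployed
class HelloFlow(FlowSpec):

    @step
    def start(self):
        self.message = 'hello from AWS Step Functions'
        self.next(self.end)

    @step
    def end(self):
        print(self.message)

if __name__ == '__main__':
    HelloFlow()
```

Deploying and running it then amounts to `python hello_flow.py step-functions create` followed by `python hello_flow.py step-functions trigger`.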


For more details, see the [User docs](https://docs.metaflow.org/going-to-production-with-metaflow/scheduling-metaflow-flows) and the [Admin docs](https://admin-docs.metaflow.org/metaflow-on-aws/operations-guide/metaflow-service-migration-guide#metaflow-2-1).
savingoyal authored Jul 29, 2020
1 parent 32679a2 commit 621089d
Showing 32 changed files with 2,712 additions and 308 deletions.
45 changes: 34 additions & 11 deletions metaflow/cli.py
```diff
@@ -7,6 +7,7 @@
 
 import click
 
+from . import current
 from . import lint
 from . import plugins
 from . import parameters
@@ -375,6 +376,12 @@ def logs(obj, input_path, stdout=None, stderr=None, both=None):
               default=None,
               help="Run id of the origin flow, if this task is part of a flow "
                    "being resumed.")
+@click.option('--with',
+              'decospecs',
+              multiple=True,
+              help="Add a decorator to this task. You can specify this "
+                   "option multiple times to attach multiple decorators "
+                   "to this task.")
 @click.pass_obj
 def step(obj,
          step_name,
@@ -387,7 +394,8 @@ def step(obj,
          retry_count=None,
          max_user_code_retries=None,
          clone_only=None,
-         clone_run_id=None):
+         clone_run_id=None,
+         decospecs=None):
     if user_namespace is not None:
         namespace(user_namespace or None)
 
@@ -402,9 +410,13 @@ def step(obj,
          fg='magenta',
          bold=False)
 
+    if decospecs:
+        decorators._attach_decorators_to_step(func, decospecs)
+
     obj.datastore.datastore_root = obj.datastore_root
     if obj.datastore.datastore_root is None:
-        obj.datastore.datastore_root = obj.datastore.get_datastore_root_from_config(obj.echo)
+        obj.datastore.datastore_root = \
+            obj.datastore.get_datastore_root_from_config(obj.echo)
 
     obj.metadata.add_sticky_tags(tags=tags)
     paths = decompress_list(input_paths) if input_paths else []
@@ -433,7 +445,7 @@ def step(obj,
 
     echo('Success', fg='green', bold=True, indent=True)
 
-@parameters.add_custom_parameters
+@parameters.add_custom_parameters(deploy_mode=False)
 @cli.command(help="Internal command to initialize a run.")
 @click.option('--run-id',
               default=None,
@@ -454,7 +466,8 @@ def init(obj, run_id=None, task_id=None, **kwargs):
     # variables.
 
     if obj.datastore.datastore_root is None:
-        obj.datastore.datastore_root = obj.datastore.get_datastore_root_from_config(obj.echo)
+        obj.datastore.datastore_root = \
+            obj.datastore.get_datastore_root_from_config(obj.echo)
 
     runtime = NativeRuntime(obj.flow,
                             obj.graph,
@@ -563,7 +576,7 @@ def resume(obj,
         write_run_id(run_id_file, runtime.run_id)
 
 
-@parameters.add_custom_parameters
+@parameters.add_custom_parameters(deploy_mode=True)
 @cli.command(help='Run the workflow locally.')
 @common_run_options
 @click.option('--namespace',
@@ -634,7 +647,8 @@ def before_run(obj, tags, decospecs):
     #obj.environment.init_environment(obj.logger)
 
     if obj.datastore.datastore_root is None:
-        obj.datastore.datastore_root = obj.datastore.get_datastore_root_from_config(obj.echo)
+        obj.datastore.datastore_root = \
+            obj.datastore.get_datastore_root_from_config(obj.echo)
 
     decorators._init_decorators(
         obj.flow, obj.graph, obj.environment, obj.datastore, obj.logger)
@@ -643,7 +657,6 @@ def before_run(obj, tags, decospecs):
     # Package working directory only once per run.
     # We explicitly avoid doing this in `start` since it is invoked for every
     # step in the run.
-    # TODO(crk): Capture time taken to package and log to keystone.
     obj.package = MetaflowPackage(obj.flow, obj.environment, obj.logger, obj.package_suffixes)
 
 
@@ -767,9 +780,21 @@ def start(ctx,
                                 ctx.obj.event_logger,
                                 ctx.obj.monitor)
     ctx.obj.datastore = DATASTORES[datastore]
-    ctx.obj.datastore_root = datastore_root
+
+    if datastore_root is None:
+        datastore_root = \
+            ctx.obj.datastore.get_datastore_root_from_config(ctx.obj.echo)
+    ctx.obj.datastore_root = ctx.obj.datastore.datastore_root = datastore_root
 
     if decospecs:
         decorators._attach_decorators(ctx.obj.flow, decospecs)
 
+    # initialize current and parameter context for deploy-time parameters
+    current._set_env(flow_name=ctx.obj.flow.name, is_running=False)
+    parameters.set_parameter_context(ctx.obj.flow.name,
+                                     ctx.obj.logger,
+                                     ctx.obj.datastore)
+
     if ctx.invoked_subcommand not in ('run', 'resume'):
         # run/resume are special cases because they can add more decorators with --with,
         # so they have to take care of themselves.
@@ -850,8 +875,6 @@ def main(flow, args=None, handle_exceptions=True, entrypoint=None):
     state = CliState(flow)
     state.entrypoint = entrypoint
 
-    parameters.set_parameter_context(flow.name)
-
     try:
         if args is None:
            start(auto_envvar_prefix='METAFLOW', obj=state)
@@ -873,4 +896,4 @@ def main(flow, args=None, handle_exceptions=True, entrypoint=None):
             print_unknown_exception(x)
             sys.exit(1)
         else:
-            raise
\ No newline at end of file
+            raise
```
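
The `deploy_mode` split and the `current`/parameter-context initialization in `start` above exist to support deploy-time parameters, i.e. parameter defaults that are resolved when `step-functions create` runs rather than when the flow executes. A hedged sketch, with an illustrative flow and helper (the exact `context` attributes are an assumption based on Metaflow's deploy-time parameter docs):

```python
from datetime import datetime

from metaflow import FlowSpec, Parameter, step

def deploy_time_label(context):
    # Evaluated at deployment time, not at run time; `context` carries
    # deploy-time information such as the flow name (assumed attribute).
    return '%s-%s' % (context.flow_name, datetime.utcnow().strftime('%Y%m%d'))

class DeployTimeFlow(FlowSpec):
    label = Parameter('label', default=deploy_time_label)

    @step
    def start(self):
        print('deployment label:', self.label)
        self.next(self.end)

    @step
    def end(self):
        pass

if __name__ == '__main__':
    DeployTimeFlow()
```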
16 changes: 5 additions & 11 deletions metaflow/datastore/datastore.py
```diff
@@ -13,7 +13,6 @@
 import pickle
 
 from types import MethodType, FunctionType
-from ..includefile import InternalFile
 from ..parameters import Parameter
 from ..exception import MetaflowException, MetaflowInternalError
 from ..metadata import DataArtifact
@@ -377,11 +376,11 @@ def __init__(self,
 
         # We have to make MAX_ATTEMPTS HEAD requests, which is
         # very unfortunate performance-wise (TODO: parallelize this).
-        # On Meson it is possible that some attempts are missing, so
-        # we have to check all possible attempt files to find the
-        # latest one. Compared to doing a LIST operation, these checks
-        # are guaranteed to be consistent as long as the task to be
-        # looked up has already finished.
+        # On AWS Step Functions it is possible that some attempts are
+        # missing, so we have to check all possible attempt files to
+        # find the latest one. Compared to doing a LIST operation,
+        # these checks are guaranteed to be consistent as long as the
+        # task to be looked up has already finished.
         self.attempt = None # backwards-compatibility for pre-attempts.
         for i in range(0, metaflow_config.MAX_ATTEMPTS):
             if self.has_metadata('%d.attempt' % i, with_attempt=False):
@@ -486,11 +485,6 @@ def serializable_attributes():
                     isinstance(getattr(flow.__class__, var), property):
                 continue
             val = getattr(flow, var)
-            if isinstance(val, InternalFile):
-                # We will force protocol 4 for serialization for anything
-                # bigger than 1 GB
-                yield var, TransformableObject(val()), val.size() > 1024 * 1024 * 1024
-                continue
            if not (isinstance(val, MethodType) or
                    isinstance(val, FunctionType) or
                    isinstance(val, Parameter)):
```
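
The reworded comment above describes the attempt-resolution strategy that the Step Functions integration relies on. In simplified form (a sketch of the loop visible in the diff; `task_datastore` and `metaflow_config` are assumed to be in scope):

```python
# Probe every possible attempt file with a HEAD request and keep the
# highest attempt that exists. Unlike a LIST operation, these reads are
# guaranteed to be consistent once the task has finished.
latest_attempt = None
for i in range(metaflow_config.MAX_ATTEMPTS):
    if task_datastore.has_metadata('%d.attempt' % i, with_attempt=False):
        latest_attempt = i
```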
4 changes: 4 additions & 0 deletions metaflow/datatools/s3.py
```diff
@@ -166,6 +166,10 @@ def __repr__(self):
 
 class S3(object):
 
+    @classmethod
+    def get_root_from_config(cls, echo, create_on_absent=True):
+        return DATATOOLS_S3ROOT
+
     def __init__(self,
                  tmproot='.',
                  bucket=None,
```
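
The new classmethod gives the S3 datatool the same root-resolution hook that the datastores expose, so deployed flows can resolve `DATATOOLS_S3ROOT` uniformly. A hypothetical usage, with `print` standing in for the CLI's echo function:

```python
from metaflow import S3

# Resolve the configured datatools root and use it as the default
# prefix for keys stored through the client.
root = S3.get_root_from_config(echo=print)
with S3(s3root=root) as s3:
    url = s3.put('greeting', 'hello world')  # stored under the resolved root
```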
29 changes: 19 additions & 10 deletions metaflow/decorators.py
```diff
@@ -348,24 +348,33 @@ def _attach_decorators(flow, decospecs):
     effect as if you defined the decorators statically in the source for
     every step. Used by --with command line parameter.
     """
+    # Attach the decorator to all steps that don't have this decorator
+    # already. This means that statically defined decorators are always
+    # preferred over runtime decorators.
+    #
+    # Note that each step gets its own instance of the decorator class,
+    # so decorator can maintain step-specific state.
+    for step in flow:
+        _attach_decorators_to_step(step, decospecs)
+
+def _attach_decorators_to_step(step, decospecs):
+    """
+    Attach decorators to a step during runtime. This has the same
+    effect as if you defined the decorators statically in the source for
+    the step.
+    """
     from .plugins import STEP_DECORATORS
     decos = {decotype.name: decotype for decotype in STEP_DECORATORS}
     for decospec in decospecs:
         deconame = decospec.split(':')[0]
         if deconame not in decos:
             raise UnknownStepDecoratorException(deconame)
 
-        # Attach the decorator to all steps that don't have this decorator
+        # Attach the decorator to step if it doesn't have the decorator
         # already. This means that statically defined decorators are always
         # preferred over runtime decorators.
         #
         # Note that each step gets its own instance of the decorator class,
         # so decorator can maintain step-specific state.
-        for step in flow:
-            if deconame not in [deco.name for deco in step.decorators]:
-                deco = decos[deconame]._parse_decorator_spec(decospec)
-                step.decorators.append(deco)
+        if deconame not in [deco.name for deco in step.decorators]:
+            deco = decos[deconame]._parse_decorator_spec(decospec)
+            step.decorators.append(deco)
 
 def _init_decorators(flow, graph, environment, datastore, logger):
     for deco in flow._flow_decorators.values():
```
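
The refactor splits per-step attachment out of `_attach_decorators` so that a single remote task, such as one launched by AWS Batch under Step Functions, can receive runtime decorators through the new `--with` option on the `step` command. Decospecs keep the `name:attr=value` format. A hedged sketch of the two entry points, assuming `flow` is a `FlowSpec` instance and `one_step` one of its steps:

```python
from metaflow import decorators

# Equivalent to running: python my_flow.py run --with retry:times=4
# Every step that does not already carry @retry gets its own instance.
decorators._attach_decorators(flow, ['retry:times=4'])

# New in this commit: attach to a single step only, as the remote `step`
# command now does when invoked with --with.
decorators._attach_decorators_to_step(one_step, ['retry:times=4'])
```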
4 changes: 2 additions & 2 deletions metaflow/environment.py
```diff
@@ -79,8 +79,8 @@ def get_client_info(cls, flow_name, metadata):
     def get_package_commands(self, code_package_url):
         cmds = ["set -e",
                 "echo \'Setting up task environment.\'",
-                "%s -m pip install awscli click requests boto3 \
-                    --user -qqq" % self._python(),
+                "%s -m pip install awscli click requests boto3 --user -qqq"
+                % self._python(),
                 "mkdir metaflow",
                 "cd metaflow",
                 "mkdir .metaflow", # mute local datastore creation log
```
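
These bootstrap commands run inside the remote container before the user's step code. A hedged sketch of how a caller might assemble them (the `&&` join and the constructor signature are assumptions, not shown in this diff):

```python
from metaflow.environment import MetaflowEnvironment

env = MetaflowEnvironment(flow)  # `flow` assumed to be a FlowSpec instance
cmds = env.get_package_commands(code_package_url)  # S3 URL of the code package
shell_command = ' && '.join(cmds)  # "set -e" makes the chain fail fast
```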
2 changes: 2 additions & 0 deletions metaflow/graph.py
```diff
@@ -126,6 +126,7 @@ def __str__(self):
         return\
 """*[{0.name} {0.type} (line {0.func_lineno})]*
     in_funcs={in_funcs}
+    out_funcs={out_funcs}
     split_parents={parents}
     matching_join={matching_join}
     is_inside_foreach={is_inside_foreach}
@@ -139,6 +140,7 @@ def __str__(self):
         .format(self,
                 matching_join=self.matching_join and '[%s]' % self.matching_join,
                 is_inside_foreach=self.is_inside_foreach,
+                out_funcs=', '.join('[%s]' % x for x in self.out_funcs),
                 in_funcs=', '.join('[%s]' % x for x in self.in_funcs),
                 parents=', '.join('[%s]' % x for x in self.split_parents),
                 decos=' | '.join(map(str, self.decorators)),
```
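
With `out_funcs` included, a node's string form reports outgoing edges next to incoming ones, which helps when debugging how a flow graph compiles to a state machine. A hypothetical peek, reusing the `HelloFlow` sketch from the top of this commit message:

```python
from metaflow.graph import FlowGraph

graph = FlowGraph(HelloFlow)  # FlowGraph takes the flow class itself
for node in graph:
    # Each node now prints, e.g., in_funcs=[start] and out_funcs=[end].
    print(node)
```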