Skip to content

Commit

Permalink
nits: sort imports, doc typos, minor cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
ryan-williams committed Oct 20, 2021
1 parent 1f30607 commit 070d25e
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 66 deletions.
100 changes: 51 additions & 49 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from datetime import datetime
from functools import wraps
import inspect
import os
import sys
import traceback
from datetime import datetime
from functools import wraps

import click

Expand Down Expand Up @@ -789,13 +789,15 @@ def start(ctx,
else:
echo = echo_always

ctx.obj.version = metaflow_version.get_version()
version = ctx.obj.version
obj = ctx.obj
obj.version = metaflow_version.get_version()
flow = obj.flow
version = obj.version
if use_r():
version = metaflow_r_version()

echo('Metaflow %s' % version, fg='magenta', bold=True, nl=False)
echo(" executing *%s*" % ctx.obj.flow.name, fg='magenta', nl=False)
echo(" executing *%s*" % flow.name, fg='magenta', nl=False)
echo(" for *%s*" % resolve_identity(), fg='magenta')

if coverage:
Expand All @@ -808,80 +810,80 @@ def start(ctx,
cov.start()

cli_args._set_top_kwargs(ctx.params)
ctx.obj.echo = echo
ctx.obj.echo_always = echo_always
ctx.obj.graph = FlowGraph(ctx.obj.flow.__class__)
ctx.obj.logger = logger
ctx.obj.check = _check
ctx.obj.pylint = pylint
ctx.obj.top_cli = cli
ctx.obj.package_suffixes = package_suffixes.split(',')
ctx.obj.reconstruct_cli = _reconstruct_cli
obj.echo = echo
obj.echo_always = echo_always
obj.graph = FlowGraph(flow.__class__)
obj.logger = logger
obj.check = _check
obj.pylint = pylint
obj.top_cli = cli
obj.package_suffixes = package_suffixes.split(',')
obj.reconstruct_cli = _reconstruct_cli

ctx.obj.event_logger = EventLogger(event_logger)
obj.event_logger = EventLogger(event_logger)

ctx.obj.environment = [e for e in ENVIRONMENTS + [MetaflowEnvironment]
if e.TYPE == environment][0](ctx.obj.flow)
ctx.obj.environment.validate_environment(echo)
obj.environment = [e for e in ENVIRONMENTS + [MetaflowEnvironment]
if e.TYPE == environment][0](flow)
obj.environment.validate_environment(echo)

ctx.obj.monitor = Monitor(monitor, ctx.obj.environment, ctx.obj.flow.name)
ctx.obj.monitor.start()
obj.monitor = Monitor(monitor, obj.environment, flow.name)
obj.monitor.start()

ctx.obj.metadata = [m for m in METADATA_PROVIDERS
if m.TYPE == metadata][0](ctx.obj.environment,
ctx.obj.flow,
ctx.obj.event_logger,
ctx.obj.monitor)
obj.metadata = [m for m in METADATA_PROVIDERS
if m.TYPE == metadata][0](obj.environment,
flow,
obj.event_logger,
obj.monitor)

ctx.obj.datastore_impl = DATASTORES[datastore]
obj.datastore_impl = DATASTORES[datastore]

if datastore_root is None:
datastore_root = \
ctx.obj.datastore_impl.get_datastore_root_from_config(ctx.obj.echo)
obj.datastore_impl.get_datastore_root_from_config(obj.echo)
if datastore_root is None:
raise CommandException(
"Could not find the location of the datastore -- did you correctly set the "
"METAFLOW_DATASTORE_SYSROOT_%s environment variable?" % datastore.upper())

ctx.obj.datastore_impl.datastore_root = datastore_root
obj.datastore_impl.datastore_root = datastore_root

FlowDataStore.default_storage_impl = ctx.obj.datastore_impl
ctx.obj.flow_datastore = FlowDataStore(
ctx.obj.flow.name,
ctx.obj.environment,
ctx.obj.metadata,
ctx.obj.event_logger,
ctx.obj.monitor)
FlowDataStore.default_storage_impl = obj.datastore_impl
obj.flow_datastore = FlowDataStore(
flow.name,
obj.environment,
obj.metadata,
obj.event_logger,
obj.monitor)

# It is important to initialize flow decorators early as some of the
# things they provide may be used by some of the objects initialized after.
decorators._init_flow_decorators(ctx.obj.flow,
ctx.obj.graph,
ctx.obj.environment,
ctx.obj.flow_datastore,
ctx.obj.metadata,
ctx.obj.logger,
decorators._init_flow_decorators(flow,
obj.graph,
obj.environment,
obj.flow_datastore,
obj.metadata,
obj.logger,
echo,
deco_options)

if decospecs:
decorators._attach_decorators(ctx.obj.flow, decospecs)
decorators._attach_decorators(flow, decospecs)

# initialize current and parameter context for deploy-time parameters
current._set_env(flow_name=ctx.obj.flow.name, is_running=False)
parameters.set_parameter_context(ctx.obj.flow.name,
ctx.obj.echo,
ctx.obj.flow_datastore)
current._set_env(flow_name=flow.name, is_running=False)
parameters.set_parameter_context(flow.name,
obj.echo,
obj.flow_datastore)

if ctx.invoked_subcommand not in ('run', 'resume'):
# run/resume are special cases because they can add more decorators with --with,
# so they have to take care of themselves.
decorators._attach_decorators(
ctx.obj.flow, ctx.obj.environment.decospecs())
flow, obj.environment.decospecs())
decorators._init_step_decorators(
ctx.obj.flow, ctx.obj.graph, ctx.obj.environment, ctx.obj.flow_datastore, ctx.obj.logger)
flow, obj.graph, obj.environment, obj.flow_datastore, obj.logger)
#TODO (savin): Enable lazy instantiation of package
ctx.obj.package = None
obj.package = None
if ctx.invoked_subcommand is None:
ctx.invoke(check)

Expand Down
2 changes: 1 addition & 1 deletion metaflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ def __repr__(self):

class MetaflowCode(object):
"""
Describes the code that is occasionally stored with a run.
Describes the code that is sometimes stored with a run.
A code package will contain the version of Metaflow that was used (all the files comprising
the Metaflow library) as well as selected files from the directory containing the Python
Expand Down
9 changes: 7 additions & 2 deletions metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,13 @@ def _parse_decorator_spec(cls, deco_spec):
return cls()
else:
name, attrspec = top
attrs = dict(map(lambda x: x.strip(), a.split('='))
for a in re.split(''',(?=[\s\w]+=)''', attrspec.strip('"\'')))
quote_stripped = attrspec.strip('"\'')
kv_regex = r',(?=[\s\w]+=)'
kvs = re.split(kv_regex, quote_stripped)
attrs = dict(
[ x.strip() for x in kv.split('=', 1) ]
for kv in kvs
)
return cls(attributes=attrs)

def make_decorator_spec(self):
Expand Down
28 changes: 18 additions & 10 deletions metaflow/flowspec.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
from itertools import islice
import os
import sys
import inspect
import traceback

from . import cmd_with_io
from .parameters import Parameter
from .exception import MetaflowException, MetaflowInternalError, \
MissingInMergeArtifactsException, UnhandledInMergeArtifactsException
from .exception import MetaflowException, MissingInMergeArtifactsException, UnhandledInMergeArtifactsException
from .graph import FlowGraph
from .unbounded_foreach import UnboundedForeachInput


# For Python 3 compatibility
try:
basestring
Expand Down Expand Up @@ -40,7 +39,7 @@ class FlowSpec(object):
"""

# Attributes that are not saved in the datastore when checkpointing.
# Name starting with '__', methods, functions and Parameters do not need
# Names starting with '__', methods, functions and Parameters do not need
# to be listed.
_EPHEMERAL = {'_EPHEMERAL',
'_NON_PARAMETERS',
Expand Down Expand Up @@ -99,12 +98,13 @@ def script_name(self):
fname = fname[:-1]
return os.path.basename(fname)

def _get_parameters(self):
for var in dir(self):
if var[0] == '_' or var in self._NON_PARAMETERS:
@classmethod
def _get_parameters(cls):
for var in dir(cls):
if var[0] == '_' or var in cls._NON_PARAMETERS:
continue
try:
val = getattr(self, var)
val = getattr(cls, var)
except:
continue
if isinstance(val, Parameter):
Expand Down Expand Up @@ -135,7 +135,11 @@ def __getattr__(self, name):
raise AttributeError("Flow %s has no attribute '%s'" %
(self.name, name))

def cmd(self, cmdline, input={}, output=[]):
def cmd(self, cmdline, input=None, output=None):
if input is None:
input = {}
if output is None:
output = []
return cmd_with_io.cmd(cmdline,
input=input,
output=output)
Expand Down Expand Up @@ -260,7 +264,7 @@ def _find_input(self, stack_index=None):
frame.index + 1))
return self._cached_input[stack_index]

def merge_artifacts(self, inputs, exclude=[], include=[]):
def merge_artifacts(self, inputs, exclude=None, include=None):
"""
Merge the artifacts coming from each merge branch (from inputs)
Expand Down Expand Up @@ -312,6 +316,10 @@ def merge_artifacts(self, inputs, exclude=[], include=[]):
be found
"""
node = self._graph[self._current_step]
if include is None:
include = []
if exclude is None:
exclude = []
if node.type != 'join':
msg = "merge_artifacts can only be called in a join and step *{step}* "\
"is not a join".format(step=self._current_step)
Expand Down
4 changes: 3 additions & 1 deletion metaflow/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def deindent_docstring(doc):
else:
return ''


class DAGNode(object):
def __init__(self, func_ast, decos, doc):
self.name = func_ast.name
Expand Down Expand Up @@ -77,8 +78,9 @@ def _parse(self, func_ast):
if self.name == 'end':
# TYPE: end
self.type = 'end'
return

# ensure that the tail an expression
# ensure that the tail is an expression
if not isinstance(tail, ast.Expr):
return

Expand Down
2 changes: 1 addition & 1 deletion metaflow/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def check_reserved_words(graph):
@linter.ensure_fundamentals
@linter.check
def check_basic_steps(graph):
msg ="Add %s *%s* step in your flow."
msg = "Add %s *%s* step in your flow."
for prefix, node in (('a', 'start'), ('an', 'end')):
if node not in graph:
raise LintWarn(msg % (prefix, node))
Expand Down
8 changes: 6 additions & 2 deletions metaflow/pylint_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

from .exception import MetaflowException


class PyLintWarn(MetaflowException):
headline="Pylint is not happy"


class PyLint(object):

def __init__(self, fname):
Expand All @@ -23,7 +25,9 @@ def __init__(self, fname):
def has_pylint(self):
return self._run is not None

def run(self, logger=None, warnings=False, pylint_config=[]):
def run(self, logger=None, warnings=False, pylint_config=None):
if pylint_config is None:
pylint_config = []
args = [self._fname]
if not warnings:
args.append('--errors-only')
Expand All @@ -36,7 +40,7 @@ def run(self, logger=None, warnings=False, pylint_config=[]):
try:
pylint_is_happy = True
pylint_exception_msg = ""
run = self._run(args, None, False)
self._run(args, None, False)
except Exception as e:
pylint_is_happy = False
pylint_exception_msg = repr(e)
Expand Down

0 comments on commit 070d25e

Please sign in to comment.