Skip to content

Commit

Permalink
Simplify mflog (#979)
Browse files Browse the repository at this point in the history
  • Loading branch information
savingoyal authored Mar 16, 2022
1 parent 1748175 commit 7e210a2
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 93 deletions.
18 changes: 9 additions & 9 deletions metaflow/mflog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,18 @@
BASH_SAVE_LOGS = " ".join(BASH_SAVE_LOGS_ARGS)

# this function returns a bash expression that redirects stdout
# and stderr of the given command to mflog
def capture_output_to_mflog(command_and_args, var_transform=None):
# and stderr of the given bash expression to mflog.tee
def bash_capture_logs(bash_expr, var_transform=None):
if var_transform is None:
var_transform = lambda s: "$%s" % s

return "python -m metaflow.mflog.redirect_streams %s %s %s %s" % (
TASK_LOG_SOURCE,
var_transform("MFLOG_STDOUT"),
var_transform("MFLOG_STDERR"),
command_and_args,
cmd = "python -m metaflow.mflog.tee %s %s"
parts = (
bash_expr,
cmd % (TASK_LOG_SOURCE, var_transform("MFLOG_STDOUT")),
cmd % (TASK_LOG_SOURCE, var_transform("MFLOG_STDERR")),
)
return "(%s) 1>> >(%s) 2>> >(%s >&2)" % parts


# update_delay determines how often logs should be uploaded to S3
Expand All @@ -76,8 +77,7 @@ def update_delay(secs_since_start):


# this function is used to generate a Bash 'export' expression that
# sets environment variables that are used by 'redirect_streams' and
# 'save_logs'.
# sets environment variables that are used by 'tee' and 'save_logs'.
# Note that we can't set the env vars statically, as some of them
# may need to be evaluated during runtime
def export_mflog_env_vars(
Expand Down
54 changes: 0 additions & 54 deletions metaflow/mflog/redirect_streams.py

This file was deleted.

12 changes: 4 additions & 8 deletions metaflow/plugins/aws/batch/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from metaflow.mflog.mflog import refine, set_should_persist
from metaflow.mflog import (
export_mflog_env_vars,
capture_output_to_mflog,
bash_capture_logs,
tail_logs,
BASH_SAVE_LOGS,
)
Expand Down Expand Up @@ -61,12 +61,8 @@ def _command(self, environment, code_package_url, step_name, step_cmds, task_spe
)
init_cmds = environment.get_package_commands(code_package_url)
init_expr = " && ".join(init_cmds)
step_expr = " && ".join(
[
capture_output_to_mflog(a)
for a in (environment.bootstrap_commands(step_name))
]
+ step_cmds
step_expr = bash_capture_logs(
" && ".join(environment.bootstrap_commands(step_name) + step_cmds)
)

# construct an entry point that
Expand Down Expand Up @@ -297,7 +293,7 @@ def launch_job(
)
job = self.create_job(
step_name,
capture_output_to_mflog(step_cli),
step_cli,
task_spec,
code_package_sha,
code_package_url,
Expand Down
2 changes: 1 addition & 1 deletion metaflow/plugins/aws/batch/batch_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from metaflow.exception import CommandException, METAFLOW_EXIT_DISALLOW_RETRY
from metaflow.metadata.util import sync_local_metadata_from_datastore
from metaflow.metaflow_config import DATASTORE_LOCAL_DIR
from metaflow.mflog import TASK_LOG_SOURCE, capture_output_to_mflog
from metaflow.mflog import TASK_LOG_SOURCE

from .batch import Batch, BatchKilledException

Expand Down
13 changes: 5 additions & 8 deletions metaflow/plugins/aws/eks/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
from metaflow.mflog import (
export_mflog_env_vars,
capture_output_to_mflog,
bash_capture_logs,
tail_logs,
BASH_SAVE_LOGS,
)
Expand Down Expand Up @@ -136,13 +136,10 @@ def _command(
)
init_cmds = self._environment.get_package_commands(code_package_url)
init_expr = " && ".join(init_cmds)
step_expr = " && ".join(
[
capture_output_to_mflog(a)
for a in (
self._environment.bootstrap_commands(self._step_name) + step_cmds
)
]
step_expr = bash_capture_logs(
" && ".join(
self._environment.bootstrap_commands(self._step_name) + step_cmds
)
)

# Construct an entry point that
Expand Down
20 changes: 7 additions & 13 deletions metaflow/plugins/aws/step_functions/step_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
from ..batch.batch import Batch
from ..aws_utils import compute_resource_attributes

from metaflow.mflog import capture_output_to_mflog


class StepFunctionsException(MetaflowException):
headline = "AWS Step Functions error"
Expand Down Expand Up @@ -738,14 +736,10 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
param_file = "".join(
random.choice(string.ascii_lowercase) for _ in range(10)
)
export_params = " && ".join(
[
capture_output_to_mflog(
"python -m metaflow.plugins.aws.step_functions.set_batch_environment parameters %s"
% param_file
),
". `pwd`/%s" % param_file,
]
export_params = (
"python -m "
"metaflow.plugins.aws.step_functions.set_batch_environment "
"parameters %s && . `pwd`/%s" % (param_file, param_file)
)
params = (
entrypoint
Expand All @@ -771,7 +765,7 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
cmd = "if ! %s >/dev/null 2>/dev/null; then %s && %s; fi" % (
" ".join(exists),
export_params,
capture_output_to_mflog(" ".join(params)),
" ".join(params),
)
cmds.append(cmd)
paths = "sfn-${METAFLOW_RUN_ID}/_parameters/%s" % (task_id_params)
Expand All @@ -780,7 +774,7 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
parent_tasks_file = "".join(
random.choice(string.ascii_lowercase) for _ in range(10)
)
export_parent_tasks = capture_output_to_mflog(
export_parent_tasks = (
"python -m "
"metaflow.plugins.aws.step_functions.set_batch_environment "
"parent_tasks %s && . `pwd`/%s" % (parent_tasks_file, parent_tasks_file)
Expand All @@ -806,7 +800,7 @@ def _step_cli(self, node, paths, code_package_url, user_code_retries):
step.extend("--tag %s" % tag for tag in self.tags)
if self.namespace is not None:
step.append("--namespace=%s" % self.namespace)
cmds.append(capture_output_to_mflog(" ".join(entrypoint + top_level + step)))
cmds.append(" ".join(entrypoint + top_level + step))
return " && ".join(cmds)


Expand Down

0 comments on commit 7e210a2

Please sign in to comment.