Skip to content

Commit

Permalink
tests using lower level chaining API
Browse files Browse the repository at this point in the history
  • Loading branch information
madhur-ob committed May 23, 2024
1 parent d7266e2 commit 0a90638
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ jobs:
python3 -m pip install --upgrade pip setuptools
python3 -m pip install tox numpy
- name: Execute Python tests
- name: Execute Python tests with different executors
run: tox

R:
Expand Down
5 changes: 1 addition & 4 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,7 @@
[console_scripts]
metaflow=metaflow.cmd.main_cli:start
""",
install_requires=[
"requests",
"boto3",
],
install_requires=["requests", "boto3"],
extras_require={
"stubs": ["metaflow-stubs==%s" % version],
},
Expand Down
66 changes: 36 additions & 30 deletions test/core/contexts.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@
"--quiet"
],
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": [ "python3-cli", "python3-metadata"],
"disabled_tests": [
"LargeArtifactTest",
"S3FailureTest",
"CardComponentRefreshTest",
"CardWithRefreshTest"
]
],
"executors": ["cli", "api"]
},
{
"name": "python3-all-local-cards-realtime",
Expand All @@ -51,16 +52,17 @@
"--quiet"
],
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": [ "python3-cli", "python3-metadata"],
"enabled_tests": [
"CardComponentRefreshTest",
"CardWithRefreshTest"
]
],
"executors": ["cli", "api"]
},
{
"name": "python3-all-local-azure-storage",
Expand All @@ -81,16 +83,17 @@
"--quiet"
],
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": [ "python3-cli", "python3-metadata"],
"disabled_tests": [
"LargeArtifactTest",
"S3FailureTest"
]
],
"executors": ["cli", "api"]
},
{
"name": "dev-local",
Expand All @@ -111,15 +114,16 @@
"--quiet"
],
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": ["python3-cli", "python3-metadata"],
"disabled_tests": [
"S3FailureTest"
]
],
"executors": ["cli", "api"]
},
{
"name": "python3-batch",
Expand All @@ -140,10 +144,10 @@
"METAFLOW_DEFAULT_METADATA": "service"
},
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": ["python3-cli", "python3-metadata"],
"disabled_tests": [
Expand All @@ -156,7 +160,8 @@
"TimeoutDecoratorTest",
"CardExtensionsImportTest",
"RunIdFileTest"
]
],
"executors": ["cli", "api"]
},
{
"name": "python3-k8s",
Expand All @@ -177,10 +182,10 @@
"METAFLOW_DEFAULT_METADATA": "service"
},
"run_options": [
"--max-workers", "50",
"--max-num-splits", "10000",
"--tag", "\u523a\u8eab means sashimi",
"--tag", "multiple tags should be ok"
"--max-workers=50",
"--max-num-splits=10000",
"--tag=\u523a\u8eab means sashimi",
"--tag=multiple tags should be ok"
],
"checks": ["python3-cli", "python3-metadata"],
"disabled_tests": [
Expand All @@ -193,7 +198,8 @@
"TimeoutDecoratorTest",
"CardExtensionsImportTest",
"RunIdFileTest"
]
],
"executors": ["cli", "api"]
}
],
"checks": {
Expand Down
154 changes: 131 additions & 23 deletions test/core/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,22 @@
import subprocess
from multiprocessing import Pool

from metaflow.cli import start, run
from metaflow._vendor import click

skip_api_executor = False

try:
from metaflow.click_api import (
MetaflowAPI,
extract_all_params,
click_to_python_types,
)
except RuntimeError:
skip_api_executor = True

skip_api_executor = False

try:
from metaflow.click_api import (
MetaflowAPI,
Expand Down Expand Up @@ -73,14 +85,72 @@ def log(msg, formatter=None, context=None, real_bad=False, real_good=False):
click.echo("[pid %s] %s" % (pid, line))


def run_test(formatter, context, debug, checks, env_base):
def run_test(formatter, context, debug, checks, env_base, executor):
def run_cmd(mode):
cmd = [context["python"], "-B", "test_flow.py"]
cmd.extend(context["top_options"])
cmd.extend((mode, "--run-id-file", "run-id"))
cmd.extend(context["run_options"])
return cmd

def construct_arg_dict(params_opts, cli_options):
result_dict = {}
has_value = False
secondary_supplied = False

for arg in cli_options:
if "=" in arg:
given_opt, val = arg.split("=", 1)
has_value = True
else:
given_opt = arg

for key, each_param in params_opts.items():
py_type = click_to_python_types[type(each_param.type)]
if given_opt in each_param.opts:
secondary_supplied = False
elif given_opt in each_param.secondary_opts:
secondary_supplied = True
else:
continue

if has_value:
value = val
else:
if secondary_supplied:
value = False
else:
value = True

if each_param.multiple:
if key not in result_dict:
result_dict[key] = [py_type(value)]
else:
result_dict[key].append(py_type(value))
else:
result_dict[key] = py_type(value)

has_value = False
secondary_supplied = False

return result_dict

def construct_cmd_from_click_api(mode):
api = MetaflowAPI.from_cli("test_flow.py", start)
_, _, param_opts, _, _ = extract_all_params(start)
top_level_options = context["top_options"]
top_level_dict = construct_arg_dict(param_opts, top_level_options)

_, _, param_opts, _, _ = extract_all_params(run)
run_level_options = context["run_options"]
run_level_dict = construct_arg_dict(param_opts, run_level_options)
run_level_dict["run_id_file"] = "run-id"

cmd = getattr(api(**top_level_dict), mode)(**run_level_dict)
command = [context["python"], "-B"]
command.extend(cmd)
return command

cwd = os.getcwd()
tempdir = tempfile.mkdtemp("_metaflow_test")
package = os.path.dirname(os.path.abspath(__file__))
Expand Down Expand Up @@ -135,13 +205,22 @@ def run_cmd(mode):
return pre_ret, path

# run flow
flow_ret = subprocess.call(run_cmd("run"), env=env)
if executor == "cli":
flow_ret = subprocess.call(run_cmd("run"), env=env)
elif executor == "api":
flow_ret = subprocess.call(construct_cmd_from_click_api("run"), env=env)

if flow_ret:
if formatter.should_fail:
log("Flow failed as expected.")
elif formatter.should_resume:
log("Resuming flow", formatter, context)
flow_ret = subprocess.call(run_cmd("resume"), env=env)
if executor == "cli":
flow_ret = subprocess.call(run_cmd("resume"), env=env)
elif executor == "api":
flow_ret = subprocess.call(
construct_cmd_from_click_api("resume"), env=env
)
else:
log("flow failed", formatter, context)
return flow_ret, path
Expand Down Expand Up @@ -179,7 +258,14 @@ def run_cmd(mode):
shutil.rmtree(tempdir)


def run_all(ok_tests, ok_contexts, ok_graphs, debug, num_parallel, inherit_env):
def run_all(
ok_tests,
ok_contexts,
ok_graphs,
debug,
num_parallel,
inherit_env,
):

tests = [
test
Expand Down Expand Up @@ -237,23 +323,45 @@ def run_test_cases(args):
if enabled_tests and (test_name not in map(str, enabled_tests)):
continue

log("running", formatter, context)
ret, path = run_test(
formatter,
context,
debug,
contexts["checks"],
base_env,
)

if ret:
tstid = "%s in context %s" % (formatter, context["name"])
failed.append((tstid, path))
log("failed", formatter, context, real_bad=True)
if debug:
return failed
else:
log("success", formatter, context, real_good=True)
for executor in context["executors"]:
if executor == "api" and skip_api_executor is True:
continue
log(
"running [using %s executor]" % executor,
formatter,
context,
)
ret, path = run_test(
formatter,
context,
debug,
contexts["checks"],
base_env,
executor,
)

if ret:
tstid = "%s in context %s [using %s executor]" % (
formatter,
context["name"],
executor,
)
failed.append((tstid, path))
log(
"failed [using %s executor]" % executor,
formatter,
context,
real_bad=True,
)
if debug:
return failed
else:
log(
"success [using %s executor]" % executor,
formatter,
context,
real_good=True,
)
else:
log("not a valid combination. Skipped.", formatter)
return failed
Expand All @@ -270,13 +378,13 @@ def run_test_cases(args):
"--tests",
default="",
type=str,
help="A comma-separate list of graphs to include (default: all).",
help="A comma-separated list of tests to include (default: all).",
)
@click.option(
"--graphs",
default="",
type=str,
help="A comma-separate list of graphs to include (default: all).",
help="A comma-separated list of graphs to include (default: all).",
)
@click.option(
"--debug",
Expand Down

0 comments on commit 0a90638

Please sign in to comment.