Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: argo namelength issue fixes for remaining commands #2123

Open
wants to merge 8 commits into
base: name-length-issues
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ def terminate(cls, flow_name, name):
flow_name=flow_name, run_id=name
)
)
return True

@staticmethod
def get_workflow_status(flow_name, name):
Expand Down
170 changes: 121 additions & 49 deletions metaflow/plugins/argo/argo_workflows_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,22 @@ def _convert_value(param):
if kwargs.get(param.name.replace("-", "_").lower()) is not None
}

response = ArgoWorkflows.trigger(obj.workflow_name, params)
workflow_name_to_deploy = obj.workflow_name
# For users who upgraded the client but did not redeploy their flow,
# we fall back to the old workflow name if a deployment with that name still exists.
if obj.workflow_name != obj._v1_workflow_name:
# use the old name only if there exists a deployment.
if ArgoWorkflows.get_existing_deployment(obj._v1_workflow_name):
obj.echo("Warning! ", bold=True, nl=False)
obj.echo(
"Found a deployment of this flow with an old style name, defaulted to triggering *%s*. \nDue to new naming restrictions on Argo Workflows, "
"this flow will have a shorter name with newer\nversions of Metaflow (>=2.13) "
"which will allow it to be triggered through Argo UI as well. "
% obj._v1_workflow_name
)
obj.echo("re-deploy your flow in order to get rid of this message.")
workflow_name_to_deploy = obj._v1_workflow_name
response = ArgoWorkflows.trigger(workflow_name_to_deploy, params)
run_id = "argo-" + response["metadata"]["name"]

if run_id_file:
Expand All @@ -780,7 +795,7 @@ def _convert_value(param):
with open(deployer_attribute_file, "w") as f:
json.dump(
{
"name": obj.workflow_name,
"name": workflow_name_to_deploy,
"metadata": get_metadata(),
"pathspec": "/".join((obj.flow.name, run_id)),
},
Expand All @@ -789,7 +804,7 @@ def _convert_value(param):

obj.echo(
"Workflow *{name}* triggered on Argo Workflows "
"(run-id *{run_id}*).".format(name=obj.workflow_name, run_id=run_id),
"(run-id *{run_id}*).".format(name=workflow_name_to_deploy, run_id=run_id),
bold=True,
)

Expand Down Expand Up @@ -831,26 +846,56 @@ def _token_instructions(flow_name, prev_user):
"about production tokens."
)

validate_token(obj.workflow_name, obj.token_prefix, authorize, _token_instructions)
obj.echo("Deleting workflow *{name}*...".format(name=obj.workflow_name), bold=True)
# Cases and expected behaviours:
# old name exists, new name does not exist -> delete old and do not fail on missing new
# old name exists, new name exists -> delete both
# old name does not exist, new name exists -> only try to delete new
# old name does not exist, new name does not exist -> keep previous behaviour where missing deployment raises error for the new name.
def _delete(workflow_name):
validate_token(workflow_name, obj.token_prefix, authorize, _token_instructions)
obj.echo("Deleting workflow *{name}*...".format(name=workflow_name), bold=True)

schedule_deleted, sensor_deleted, workflow_deleted = ArgoWorkflows.delete(
workflow_name
)

schedule_deleted, sensor_deleted, workflow_deleted = ArgoWorkflows.delete(
obj.workflow_name
)
if schedule_deleted:
obj.echo(
"Deleting cronworkflow *{name}*...".format(name=workflow_name),
bold=True,
)

if schedule_deleted:
obj.echo(
"Deleting cronworkflow *{name}*...".format(name=obj.workflow_name),
bold=True,
)
if sensor_deleted:
obj.echo(
"Deleting sensor *{name}*...".format(name=workflow_name),
bold=True,
)
return workflow_deleted

workflows_deleted = False
cleanup_old_name = False
if obj.workflow_name != obj._v1_workflow_name:
# Only delete the old name if a deployment with that name exists.
# This is due to the way validate_token is tied to an existing deployment.
if ArgoWorkflows.get_existing_deployment(obj._v1_workflow_name) is not None:
cleanup_old_name = True
obj.echo(
"This flow has been deployed with another name in the past due to a limitation with Argo Workflows.\n"
"Will also delete the older deployment."
)
_delete(obj._v1_workflow_name)
workflows_deleted = True

if sensor_deleted:
obj.echo(
"Deleting sensor *{name}*...".format(name=obj.workflow_name),
bold=True,
)
# Always try to delete the current name.
# Do not raise exception if we deleted old name before this.
try:
_delete(obj.workflow_name)
workflows_deleted = True
except ArgoWorkflowsException:
if not cleanup_old_name:
raise

if workflow_deleted:
if workflows_deleted:
obj.echo(
"Deleting Kubernetes resources may take a while. "
"Deploying the flow again to Argo Workflows while the delete is in-flight will fail."
Expand Down Expand Up @@ -889,17 +934,26 @@ def _token_instructions(flow_name, prev_user):
"about production tokens."
)

validate_run_id(
obj.workflow_name, obj.token_prefix, authorize, run_id, _token_instructions
)
workflows = [obj.workflow_name]
if obj.workflow_name != obj._v1_workflow_name:
# Only add the old name if a deployment with that name exists.
# This is due to the way validate_token is tied to an existing deployment.
if ArgoWorkflows.get_existing_deployment(obj._v1_workflow_name) is not None:
workflows.append(obj._v1_workflow_name)

# Trim prefix from run_id
name = run_id[5:]
for workflow_name in workflows:
validate_run_id(
workflow_name, obj.token_prefix, authorize, run_id, _token_instructions
)

# Trim prefix from run_id
name = run_id[5:]

workflow_suspended = ArgoWorkflows.suspend(name)
workflow_suspended = ArgoWorkflows.suspend(name)

if workflow_suspended:
obj.echo("Suspended execution of *%s*" % run_id)
if workflow_suspended:
obj.echo("Suspended execution of *%s*" % run_id)
break # no need to try out all workflow_names if we found the running one.


@argo_workflows.command(help="Unsuspend flow execution on Argo Workflows.")
Expand Down Expand Up @@ -933,17 +987,26 @@ def _token_instructions(flow_name, prev_user):
"about production tokens."
)

validate_run_id(
obj.workflow_name, obj.token_prefix, authorize, run_id, _token_instructions
)
workflows = [obj.workflow_name]
if obj.workflow_name != obj._v1_workflow_name:
# Only add the old name if there exists a deployment with such name.
# This is due to the way validate_token is tied to an existing deployment.
if ArgoWorkflows.get_existing_deployment(obj._v1_workflow_name) is not None:
workflows.append(obj._v1_workflow_name)

# Trim prefix from run_id
name = run_id[5:]
for workflow_name in workflows:
validate_run_id(
workflow_name, obj.token_prefix, authorize, run_id, _token_instructions
)

workflow_suspended = ArgoWorkflows.unsuspend(name)
# Trim prefix from run_id
name = run_id[5:]

if workflow_suspended:
obj.echo("Unsuspended execution of *%s*" % run_id)
workflow_suspended = ArgoWorkflows.unsuspend(name)

if workflow_suspended:
obj.echo("Unsuspended execution of *%s*" % run_id)
break # no need to try all workflow_names if we found one.


def validate_token(name, token_prefix, authorize, instructions_fn=None):
Expand Down Expand Up @@ -1051,22 +1114,31 @@ def _token_instructions(flow_name, prev_user):
"about production tokens."
)

validate_run_id(
obj.workflow_name, obj.token_prefix, authorize, run_id, _token_instructions
)
workflows = [obj.workflow_name]
if obj.workflow_name != obj._v1_workflow_name:
# Only add the old name if there exists a deployment with such name.
# This is due to the way validate_token is tied to an existing deployment.
if ArgoWorkflows.get_existing_deployment(obj._v1_workflow_name) is not None:
workflows.append(obj._v1_workflow_name)

# Trim prefix from run_id
name = run_id[5:]
obj.echo(
"Terminating run *{run_id}* for {flow_name} ...".format(
run_id=run_id, flow_name=obj.flow.name
),
bold=True,
)
for workflow_name in workflows:
validate_run_id(
workflow_name, obj.token_prefix, authorize, run_id, _token_instructions
)

# Trim prefix from run_id
name = run_id[5:]
obj.echo(
"Terminating run *{run_id}* for {flow_name} ...".format(
run_id=run_id, flow_name=obj.flow.name
),
bold=True,
)

terminated = ArgoWorkflows.terminate(obj.flow.name, name)
if terminated:
obj.echo("\nRun terminated.")
terminated = ArgoWorkflows.terminate(obj.flow.name, name)
if terminated:
obj.echo("\nRun terminated.")
break # no need to try all workflow_names if we found the running one.


@argo_workflows.command(help="List Argo Workflow templates for the flow.")
Expand Down