-
Notifications
You must be signed in to change notification settings - Fork 781
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: add terminate for argo workflows #1309
feature: add terminate for argo workflows #1309
Conversation
# TODO: perform check that the found workflow is able to be terminated. We do not want to patch failed / completed workflows. | ||
try: | ||
body = {"spec": workflow["spec"]} | ||
body["spec"]["shutdown"] = "Terminate" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you confirm that shutdown
is a required part of the spec?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for doing this!
|
||
terminated = ArgoWorkflows.terminate(obj.workflow_name, run_id) | ||
if terminated: | ||
obj.echo(" Terminated.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit - Run terminated. No new tasks will execute after the currently running tasks finish.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is clearly a great feature as is. What about tasks that already started? I'd think that is the UX users actually want - though probably much more involved to implement. Imagine a wide foreach that is inflight...
Wish-list for future (--abort-running-tasks
)!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note that after issuing the terminate command, even currently running tasks will not be able to finish, as they are part of the workflow to be terminated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some minor nits.
One major question - Can you verify that terminate will only ensure that no new tasks are launched, but existing tasks still continue? We may want to introduce a kill
command if it is feasible to kill all the in-flight tasks.
) | ||
|
||
try: | ||
body = {"spec": workflow["spec"]} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor thing: should we skip the patching if we see that shutdown is already set to Terminate as we wish? This code path probably will get exercised - when users rerun stuff (expected delays / inflight tasks etc).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
|
||
terminated = ArgoWorkflows.terminate(obj.workflow_name, run_id) | ||
if terminated: | ||
obj.echo(" Terminated.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is clearly a great feature as is. What about tasks that already started? I'd think that is the UX users actually want - though probably much more involved to implement. Imagine a wide foreach that is inflight...
Wish-list for future (--abort-running-tasks
)!
the behavior from the Task perspective is more or less the same whether we use |
adedb05
to
9a0a4c1
Compare
rebased off the |
|
||
terminated = ArgoWorkflows.terminate(obj.flow.name, run_id) | ||
if terminated: | ||
obj.echo(" Run terminated.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rather than indent and print, we could print after a new line with no indent
type=str, | ||
help="Authorize the termination with a production token", | ||
) | ||
@click.option( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can take an arg run id instead of an option run id here
# Verify that user is trying to terminate an Argo workflow | ||
if not run_id.startswith("argo-"): | ||
raise ArgoWorkflowsException( | ||
"No active workflow for {flow_name}/{run_id} found in Argo Workflows.".format( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No execution ... to keep in sync with L:184?
@@ -602,3 +602,30 @@ def echo_token_instructions(obj, name, prev_user, cmd_name, cmd_description=None | |||
'See "Organizing Results" at docs.metaflow.org for more information ' | |||
"about production tokens." | |||
) | |||
|
|||
|
|||
@argo_workflows.command(help="Terminate the Argo workflow") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor not - Terminate flow execution on Argo Workflows.
Also, in L:516 above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would the L:516 then read Delete the flow on Argo Workflows
as its not about a single execution?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm! minor changes after which we can merge and release
* introduce terminate command for argo-workflows * rename parameter, add check to terminate that workflow is not already finished. fix error output * changing error messages for argo workflow termination * change run_id from option to argument. refine terminate process output * Check for termination before issuing terminate on a workflow
* Add production token validation. add more output to delete process * add separate output for removing the schedule of a workflow, when applicable. * rename schedule deletion. use staticmethod instead of classmethod. error message cleanup. Document function behavior * add comments to delete behavior in argo cli. formatting fix * fix token instructions output * feature: add terminate for argo workflows (#1309) * introduce terminate command for argo-workflows * rename parameter, add check to terminate that workflow is not already finished. fix error output * changing error messages for argo workflow termination * change run_id from option to argument. refine terminate process output * Check for termination before issuing terminate on a workflow * move schedule deletion logic and comments into main delete method. * feat: refactor sensor deletion (#1343) * add sensor deletion as part of argo workflow deletion * rework token instructions output * add docstring and note about refactoring to validate_token * use validate_run_id for terminate
* Add production token validation. add more output to delete process * add separate output for removing the schedule of a workflow, when applicable. * rename schedule deletion. use staticmethod instead of classmethod. error message cleanup. Document function behavior * add comments to delete behavior in argo cli. formatting fix * fix token instructions output * feature: add terminate for argo workflows (#1309) * introduce terminate command for argo-workflows * rename parameter, add check to terminate that workflow is not already finished. fix error output * changing error messages for argo workflow termination * change run_id from option to argument. refine terminate process output * Check for termination before issuing terminate on a workflow * move schedule deletion logic and comments into main delete method. * feat: refactor sensor deletion (#1343) * add sensor deletion as part of argo workflow deletion * rework token instructions output * add docstring and note about refactoring to validate_token * use validate_run_id for terminate
argo-workflows terminate
as a command, requires a run_id for an argo workflowrelates to #1303
TODO: