Skip to content

Commit

Permalink
feature: add delete and terminate for argo workflows (#1307)
Browse files Browse the repository at this point in the history
* Add production token validation. add more output to delete process

* add separate output for removing the schedule of a workflow, when applicable.

* rename schedule deletion. use staticmethod instead of classmethod. error message cleanup. Document function behavior

* add comments to delete behavior in argo cli. formatting fix

* fix token instructions output

* feature: add terminate for argo workflows (#1309)

* introduce terminate command for argo-workflows

* rename parameter, add check to terminate that workflow is not already finished. fix error output

* changing error messages for argo workflow termination

* change run_id from option to argument. refine terminate process output

* Check for termination before issuing terminate on a workflow

* move schedule deletion logic and comments into main delete method.

* feat: refactor sensor deletion (#1343)

* add sensor deletion as part of argo workflow deletion

* rework token instructions output

* add docstring and note about refactoring to validate_token

* use validate_run_id for terminate
  • Loading branch information
saikonen authored and wangchy27 committed Jul 13, 2023
1 parent 447fe5e commit 05873f3
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 21 deletions.
125 changes: 109 additions & 16 deletions metaflow/plugins/argo/argo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,92 @@ def register_workflow_template(self, name, workflow_template):
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def delete_cronworkflow(self, name):
    """
    Delete the cronworkflow called `name` through the Kubernetes custom objects API.

    Returns the API response on success, or None when the API reports that the
    cronworkflow does not exist (HTTP 404). Any other API failure is re-raised
    as an ArgoClientException carrying the server's message.
    """
    client = self._client.get()

    try:
        response = client.CustomObjectsApi().delete_namespaced_custom_object(
            group=self._group,
            version=self._version,
            namespace=self._namespace,
            plural="cronworkflows",
            name=name,
        )
    except client.rest.ApiException as e:
        # A missing cronworkflow is an expected outcome rather than a failure.
        if e.status == 404:
            return None
        # Prefer the message from the response body; fall back to the HTTP reason.
        error = e.reason if e.body is None else json.loads(e.body)["message"]
        raise ArgoClientException(error)
    return response

def delete_workflow_template(self, name):
    """
    Issues an API call for deleting a workflow template
    Returns either the successful API response, or None in case the resource was not found.

    Any API failure other than "not found" (HTTP 404) is raised as an
    ArgoClientException carrying the server's message.
    """
    client = self._client.get()

    try:
        return client.CustomObjectsApi().delete_namespaced_custom_object(
            group=self._group,
            version=self._version,
            namespace=self._namespace,
            plural="workflowtemplates",
            name=name,
        )
    except client.rest.ApiException as e:
        # A missing workflow template is reported to the caller as None so that
        # it can decide whether that is an error.
        if e.status == 404:
            return None
        raise ArgoClientException(
            json.loads(e.body)["message"] if e.body is not None else e.reason
        )

def terminate_workflow(self, run_id):
    """
    Request termination of the workflow execution named `run_id`.

    The workflow is fetched first so that an execution which has already
    finished, or which already carries a terminate request, is rejected
    instead of being patched again.

    Returns the API response of the patch call.

    Raises ArgoClientException if the workflow cannot be fetched or patched,
    if it has already finished, or if termination was already requested.
    """
    client = self._client.get()
    try:
        workflow = client.CustomObjectsApi().get_namespaced_custom_object(
            group=self._group,
            version=self._version,
            namespace=self._namespace,
            plural="workflows",
            name=run_id,
        )
    except client.rest.ApiException as e:
        raise ArgoClientException(
            json.loads(e.body)["message"] if e.body is not None else e.reason
        )

    # The fetched object is not guaranteed to carry `status` or `finishedAt`;
    # a missing value is treated the same as "not finished yet" instead of
    # failing with a KeyError.
    status = workflow.get("status") or {}
    if status.get("finishedAt") is not None:
        raise ArgoClientException(
            "Cannot terminate an execution that has already finished."
        )

    spec = workflow["spec"]
    if spec.get("shutdown") == "Terminate":
        raise ArgoClientException("Execution has already been terminated.")

    # Send the existing spec with `shutdown` set to "Terminate". The payload is
    # built on a copy so the fetched workflow object is left untouched.
    body = {"spec": dict(spec, shutdown="Terminate")}
    try:
        return client.CustomObjectsApi().patch_namespaced_custom_object(
            group=self._group,
            version=self._version,
            namespace=self._namespace,
            plural="workflows",
            name=run_id,
            body=body,
        )
    except client.rest.ApiException as e:
        raise ArgoClientException(
            json.loads(e.body)["message"] if e.body is not None else e.reason
        )

def suspend_workflow(self, name):
workflow = self.get_workflow(name)
if workflow is None:
Expand Down Expand Up @@ -273,8 +359,6 @@ def register_sensor(self, name, sensor=None):
except client.rest.ApiException as e:
# Sensor does not exist and we want to add one
if e.status == 404:
if sensor.get("kind") is None:
return
try:
return client.CustomObjectsApi().create_namespaced_custom_object(
group=self._group,
Expand All @@ -293,20 +377,6 @@ def register_sensor(self, name, sensor=None):
raise ArgoClientException(
json.loads(e.body)["message"] if e.body is not None else e.reason
)
# Since sensors occupy real resources, delete existing sensor if needed
if sensor.get("kind") is None:
try:
return client.CustomObjectsApi().delete_namespaced_custom_object(
group=self._group,
version=self._version,
namespace=self._namespace,
plural="sensors",
name=name,
)
except client.rest.ApiException as e:
raise ArgoClientException(
json.loads(e.body)["message"] if e.body is not None else e.reason
)
try:
return client.CustomObjectsApi().replace_namespaced_custom_object(
group=self._group,
Expand All @@ -320,3 +390,26 @@ def register_sensor(self, name, sensor=None):
raise ArgoClientException(
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def delete_sensor(self, name):
    """
    Delete the sensor called `name` through the Kubernetes custom objects API.

    Returns the API response on success, or None when the API reports that the
    sensor does not exist (HTTP 404). Any other API failure is re-raised as an
    ArgoClientException carrying the server's message.
    """
    client = self._client.get()

    try:
        response = client.CustomObjectsApi().delete_namespaced_custom_object(
            group=self._group,
            version=self._version,
            namespace=self._namespace,
            plural="sensors",
            name=name,
        )
    except client.rest.ApiException as e:
        if e.status != 404:
            # Prefer the message from the response body; fall back to the
            # HTTP reason when no body was returned.
            error = e.reason if e.body is None else json.loads(e.body)["message"]
            raise ArgoClientException(error)
        # The sensor was not there to begin with.
        return None
    return response
51 changes: 46 additions & 5 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,43 @@ def _sanitize(name):
return name.replace("_", "-")

@staticmethod
def delete(name):
    """
    Delete the workflow template `name` together with its schedule and sensor.

    Returns a tuple `(schedule_deleted, sensor_deleted, workflow_deleted)`
    holding the API response of each deletion; the first two are None when
    the corresponding resource did not exist.

    Raises ArgoWorkflowsException if the workflow template does not exist.
    """
    client = ArgoClient(namespace=KUBERNETES_NAMESPACE)

    # Always try to delete the schedule. A missing schedule should not
    # be treated as an error, due to any of the following reasons
    # - there might not have been a schedule, or it was deleted by some other means
    # - retaining these resources should have no consequences as long as the workflow deletion succeeds.
    # - regarding cost and compute, the significant resources are part of the workflow teardown, not the schedule.
    schedule_deleted = client.delete_cronworkflow(name)

    # The workflow might have sensors attached to it, which consume actual resources.
    # Try to delete these as well. Sensors are registered under the workflow name
    # with dots replaced by dashes (see `schedule`), so the same transformation
    # is applied here; otherwise sensors of dotted workflow names are never found.
    sensor_deleted = client.delete_sensor(name.replace(".", "-"))

    # After cleaning up related resources, delete the workflow in question.
    # Failure in deleting is treated as critical and will be made visible to the user
    # for further action.
    workflow_deleted = client.delete_workflow_template(name)
    if workflow_deleted is None:
        raise ArgoWorkflowsException(
            "The workflow *%s* doesn't exist on Argo Workflows." % name
        )

    return schedule_deleted, sensor_deleted, workflow_deleted

@staticmethod
def terminate(flow_name, name):
    """
    Terminate the Argo Workflows execution `name` belonging to `flow_name`.

    Returns True once the terminate request has been accepted, so callers
    can report success.

    Raises ArgoWorkflowsException if the client returns no response for the
    execution. Errors raised by the client itself propagate unchanged.
    """
    client = ArgoClient(namespace=KUBERNETES_NAMESPACE)

    response = client.terminate_workflow(name)
    # Defensive guard: a client that signals "not found" by returning None
    # is reported to the user as a missing execution.
    if response is None:
        raise ArgoWorkflowsException(
            "No execution found for {flow_name}/{run_id} in Argo Workflows.".format(
                flow_name=flow_name, run_id=name
            )
        )
    # Previously nothing was returned, so callers testing the result for
    # truthiness could never observe a successful termination.
    return True

def suspend(name):
client = ArgoClient(namespace=KUBERNETES_NAMESPACE)

Expand Down Expand Up @@ -247,16 +284,20 @@ def _get_schedule(self):

def schedule(self):
    """
    Schedule the workflow template and reconcile its sensor.

    Registers the sensor when this deployment defines one; otherwise deletes
    any sensor left under the same name.

    Raises ArgoWorkflowsSchedulingException wrapping any error raised while
    scheduling or while registering/deleting the sensor.
    """
    try:
        # One client instance is shared by the scheduling and sensor calls.
        argo_client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
        argo_client.schedule_workflow_template(
            self.name, self._schedule, self._timezone
        )
        # Register sensor. Unfortunately, Argo Events Sensor names don't allow for
        # dots (sensors run into an error) which rules out self.name :(
        # Metaflow will overwrite any existing sensor.
        sensor_name = self.name.replace(".", "-")
        if self._sensor:
            argo_client.register_sensor(sensor_name, self._sensor.to_json())
        else:
            # Since sensors occupy real resources, delete existing sensor if needed
            # Deregister sensors that might have existed before this deployment
            argo_client.delete_sensor(sensor_name)
    except Exception as e:
        raise ArgoWorkflowsSchedulingException(str(e))

Expand Down
103 changes: 103 additions & 0 deletions metaflow/plugins/argo/argo_workflows_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,63 @@ def _convert_value(param):
)


@argo_workflows.command(help="Delete the flow on Argo Workflows.")
@click.option(
    "--authorize",
    default=None,
    type=str,
    help="Authorize the deletion with a production token",
)
@click.pass_obj
def delete(obj, authorize=None):
    """
    CLI entry point: validate the production token, delete the workflow and
    report which related resources (schedule, sensor) were removed with it.
    """

    def _explain_token_requirement(flow_name, prev_user):
        # Handed to validate_token as its instructions callback; prints how to
        # obtain and pass the production token required for deletion.
        deployed_by = (
            "There is an existing version of *%s* on Argo Workflows which was "
            "deployed by the user *%s*." % (flow_name, prev_user)
        )
        obj.echo(deployed_by)
        obj.echo(
            "To delete this flow, you need to use the same production token that they used."
        )
        obj.echo(
            "Please reach out to them to get the token. Once you have it, call "
            "this command:"
        )
        obj.echo("    argo-workflows delete --authorize MY_TOKEN", fg="green")
        obj.echo(
            'See "Organizing Results" at docs.metaflow.org for more information '
            "about production tokens."
        )

    validate_token(
        obj.workflow_name, obj.token_prefix, authorize, _explain_token_requirement
    )
    banner = "Deleting workflow *{name}*...".format(name=obj.workflow_name)
    obj.echo(banner, bold=True)

    outcome = ArgoWorkflows.delete(obj.workflow_name)
    schedule_deleted, sensor_deleted, workflow_deleted = outcome

    # Related resources are only mentioned when a deletion response came back.
    if schedule_deleted:
        schedule_note = "Deleting cronworkflow *{name}*...".format(
            name=obj.workflow_name
        )
        obj.echo(schedule_note, bold=True)

    if sensor_deleted:
        sensor_note = "Deleting sensor *{name}*...".format(name=obj.workflow_name)
        obj.echo(sensor_note, bold=True)

    if not workflow_deleted:
        return

    obj.echo(
        "Deleting Kubernetes resources may take a while. "
        "Deploying the flow again to Argo Workflows while the delete is in-flight will fail."
    )
    obj.echo(
        "In-flight executions will not be affected. "
        "If necessary, terminate them manually."
    )


@argo_workflows.command(help="Suspend flow execution on Argo Workflows.")
@click.option(
"--authorize",
Expand Down Expand Up @@ -717,6 +774,52 @@ def validate_token(name, token_prefix, authorize, instructions_fn=None):
return True


@argo_workflows.command(help="Terminate flow execution on Argo Workflows.")
@click.option(
    "--authorize",
    default=None,
    type=str,
    help="Authorize the termination with a production token",
)
@click.argument("run-id", required=True, type=str)
@click.pass_obj
def terminate(obj, run_id, authorize=None):
    """
    CLI entry point: validate the run id and production token, then ask
    Argo Workflows to terminate the given run.
    """

    def _explain_token_requirement(flow_name, prev_user):
        # Handed to validate_run_id as its instructions callback; prints how to
        # obtain and pass the production token required for termination.
        deployed_by = (
            "There is an existing version of *%s* on Argo Workflows which was "
            "deployed by the user *%s*." % (flow_name, prev_user)
        )
        obj.echo(deployed_by)
        obj.echo(
            "To terminate this flow, you need to use the same production token that they used."
        )
        obj.echo(
            "Please reach out to them to get the token. Once you have it, call "
            "this command:"
        )
        obj.echo("    argo-workflows terminate --authorize MY_TOKEN RUN_ID", fg="green")
        obj.echo(
            'See "Organizing Results" at docs.metaflow.org for more information '
            "about production tokens."
        )

    validate_run_id(
        obj.workflow_name,
        obj.token_prefix,
        authorize,
        run_id,
        _explain_token_requirement,
    )

    # Trim prefix from run_id
    # NOTE(review): assumes the prefix is exactly 5 characters long; confirm
    # against validate_run_id.
    name = run_id[5:]
    progress = "Terminating run *{run_id}* for {flow_name} ...".format(
        run_id=run_id, flow_name=obj.flow.name
    )
    obj.echo(progress, bold=True)

    terminated = ArgoWorkflows.terminate(obj.flow.name, name)
    if not terminated:
        return
    obj.echo("\nRun terminated.")


def validate_run_id(
workflow_name, token_prefix, authorize, run_id, instructions_fn=None
):
Expand Down

0 comments on commit 05873f3

Please sign in to comment.