Skip to content

Commit

Permalink
feat: sfn delete workflow (with prod token validation and messaging) (#…
Browse files Browse the repository at this point in the history
…1379)

* feat(sfn): delete a workflow

* feat(sfn): delete eventbridge rule

* add output on deletion progress, clean up error handling and add production token validation

* refactor token validation instruction echoing

* address review comments

* add todo for future refactoring

---------

Co-authored-by: Steven Hoelscher <shoelscher@artica.com>
  • Loading branch information
saikonen and Steven Hoelscher authored Jun 20, 2023
1 parent af5ef39 commit 9b86a2f
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 0 deletions.
13 changes: 13 additions & 0 deletions metaflow/plugins/aws/step_functions/event_bridge_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ def _set(self):
],
)

def delete(self):
try:
response = self._client.remove_targets(
Rule=self.name,
Ids=[self.name],
)
if response.get("FailedEntryCount", 0) > 0:
raise RuntimeError("Failed to remove targets from rule %s" % self.name)
return self._client.delete_rule(Name=self.name)
except self._client.exceptions.ResourceNotFoundException:
# Ignore if the rule does not exist.
return None


def format(name):
# AWS Event Bridge has a limit of 64 chars for rule names.
Expand Down
14 changes: 14 additions & 0 deletions metaflow/plugins/aws/step_functions/step_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,20 @@ def schedule(self):
except Exception as e:
raise StepFunctionsSchedulingException(repr(e))

@classmethod
def delete(cls, name):
# Always attempt to delete the event bridge rule.
schedule_deleted = EventBridgeClient(name).delete()

sfn_deleted = StepFunctionsClient().delete(name)

if sfn_deleted is None:
raise StepFunctionsException(
"The workflow *%s* doesn't exist on AWS Step Functions." % name
)

return schedule_deleted, sfn_deleted

@classmethod
def trigger(cls, name, parameters):
try:
Expand Down
99 changes: 99 additions & 0 deletions metaflow/plugins/aws/step_functions/step_functions_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,3 +555,102 @@ def list_runs(
"No executions for *%s* found on AWS Step Functions."
% (obj.state_machine_name)
)


@step_functions.command(help="Delete a workflow")
@click.option(
"--authorize",
default=None,
type=str,
help="Authorize the deletion with a production token",
)
@click.pass_obj
def delete(obj, authorize=None):
def _token_instructions(flow_name, prev_user):
obj.echo(
"There is an existing version of *%s* on AWS Step "
"Functions which was deployed by the user "
"*%s*." % (flow_name, prev_user)
)
obj.echo(
"To delete this flow, you need to use the same production token that they used."
)
obj.echo(
"Please reach out to them to get the token. Once you "
"have it, call this command:"
)
obj.echo(" step-functions delete --authorize MY_TOKEN", fg="green")
obj.echo(
'See "Organizing Results" at docs.metaflow.org for more '
"information about production tokens."
)

validate_token(
obj.state_machine_name, obj.token_prefix, authorize, _token_instructions
)

obj.echo(
"Deleting AWS Step Functions state machine *{name}*...".format(
name=obj.state_machine_name
),
bold=True,
)
schedule_deleted, sfn_deleted = StepFunctions.delete(obj.state_machine_name)

if schedule_deleted:
obj.echo(
"Deleting Amazon EventBridge rule *{name}* as well...".format(
name=obj.state_machine_name
),
bold=True,
)
if sfn_deleted:
obj.echo(
"Deleting the AWS Step Functions state machine may take a while. "
"Deploying the flow again to AWS Step Functions while the delete is in-flight will fail."
)
obj.echo(
"In-flight executions will not be affected. "
"If necessary, terminate them manually."
)


def validate_token(name, token_prefix, authorize, instruction_fn=None):
"""
Validate that the production token matches that of the deployed flow.
In case both the user and token do not match, raises an error.
Optionally outputs instructions on token usage via the provided instruction_fn(flow_name, prev_user)
"""
# TODO: Unify this with the existing resolve_token implementation.

# 1) retrieve the previous deployment, if one exists
workflow = StepFunctions.get_existing_deployment(name)
if workflow is None:
prev_token = None
else:
prev_user, prev_token = workflow

# 2) authorize this deployment
if prev_token is not None:
if authorize is None:
authorize = load_token(token_prefix)
elif authorize.startswith("production:"):
authorize = authorize[11:]

# we allow the user who deployed the previous version to re-deploy,
# even if they don't have the token
# NOTE: The username is visible in multiple sources, and can be set by the user.
# Should we consider being stricter here?
if prev_user != get_username() and authorize != prev_token:
if instruction_fn:
instruction_fn(flow_name=name, prev_user=prev_user)
raise IncorrectProductionToken(
"Try again with the correct production token."
)

# 3) all validations passed, store the previous token for future use
token = prev_token

store_token(token_prefix, token)
return True
8 changes: 8 additions & 0 deletions metaflow/plugins/aws/step_functions/step_functions_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,11 @@ def get_state_machine_arn(self, name):
if state_machine:
return state_machine["stateMachineArn"]
return None

def delete(self, name):
state_machine_arn = self.get_state_machine_arn(name)
if state_machine_arn is None:
return None
return self._client.delete_state_machine(
stateMachineArn=state_machine_arn,
)

0 comments on commit 9b86a2f

Please sign in to comment.