feat: sfn delete workflow (with prod token validation and messaging) #1379

Merged (6 commits) on Jun 20, 2023
Changes from 3 commits
13 changes: 13 additions & 0 deletions metaflow/plugins/aws/step_functions/event_bridge_client.py
@@ -60,6 +60,19 @@ def _set(self):
            ],
        )

    def delete(self):
        try:
            response = self._client.remove_targets(
                Rule=self.name,
                Ids=[self.name],
            )
            if response.get("FailedEntryCount", 0) > 0:
                raise RuntimeError("Failed to remove targets from rule %s" % self.name)
            return self._client.delete_rule(Name=self.name)
        except self._client.exceptions.ResourceNotFoundException:
            # Ignore if the rule does not exist.
            return None

Collaborator: Any specific reason to use delete vs. disable?

saikonen (Author): The main reason is cleanup, so that deleted flows do not leave any residual resources in AWS. A disabled rule would not incur any costs, so the cost impact is the same as with delete. What would be the case for preferring disable over delete? My expectation for how the step-functions delete command works is that resources are removed, not simply disabled.
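For reference, a minimal sketch of the disable alternative discussed in the thread above, assuming a plain boto3 EventBridge ("events") client; it is illustrative only and not part of this PR:

import boto3

def disable_schedule_rule(name):
    # Hypothetical alternative to delete(): keep the rule and its targets in
    # place but stop it from firing. A disabled rule incurs no cost.
    client = boto3.client("events")
    try:
        return client.disable_rule(Name=name)
    except client.exceptions.ResourceNotFoundException:
        # Nothing to disable if the rule does not exist.
        return None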


def format(name):
# AWS Event Bridge has a limit of 64 chars for rule names.
14 changes: 14 additions & 0 deletions metaflow/plugins/aws/step_functions/step_functions.py
@@ -158,6 +158,20 @@ def schedule(self):
        except Exception as e:
            raise StepFunctionsSchedulingException(repr(e))

    @classmethod
    def delete(cls, name):
        # Always attempt to delete the event bridge rule.
        schedule_deleted = EventBridgeClient(name).delete()

        sfn_deleted = StepFunctionsClient().delete(name)

        if sfn_deleted is None:
            raise StepFunctionsException(
                "The workflow *%s* doesn't exist on AWS Step Functions." % name
            )

        return schedule_deleted, sfn_deleted
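As a rough illustration of the failure path above, a hedged test sketch; the mock patch targets and the StepFunctionsException import are assumptions about this module's imports, not part of the PR:

from unittest import mock

from metaflow.plugins.aws.step_functions.step_functions import (
    StepFunctions,
    StepFunctionsException,
)

SFN_MODULE = "metaflow.plugins.aws.step_functions.step_functions"

def check_delete_raises_for_missing_workflow():
    # Patch out the AWS clients so no real calls are made (patch targets are
    # assumed based on this module's imports).
    with mock.patch("%s.EventBridgeClient" % SFN_MODULE) as ebc, mock.patch(
        "%s.StepFunctionsClient" % SFN_MODULE
    ) as sfn:
        ebc.return_value.delete.return_value = None  # no schedule rule to remove
        sfn.return_value.delete.return_value = None  # state machine not found
        try:
            StepFunctions.delete("MissingFlow")
        except StepFunctionsException:
            return True  # expected: the workflow does not exist
        return False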

    @classmethod
    def trigger(cls, name, parameters):
        try:
111 changes: 94 additions & 17 deletions metaflow/plugins/aws/step_functions/step_functions_cli.py
@@ -333,23 +333,12 @@ def resolve_token(
         # we allow the user who deployed the previous version to re-deploy,
         # even if they don't have the token
         if prev_user != get_username() and authorize != prev_token:
-            obj.echo(
-                "There is an existing version of *%s* on AWS Step "
-                "Functions which was deployed by the user "
-                "*%s*." % (name, prev_user)
-            )
-            obj.echo(
-                "To deploy a new version of this flow, you need to use "
-                "the same production token that they used. "
-            )
-            obj.echo(
-                "Please reach out to them to get the token. Once you "
-                "have it, call this command:"
-            )
-            obj.echo(" step-functions create --authorize MY_TOKEN", fg="green")
-            obj.echo(
-                'See "Organizing Results" at docs.metaflow.org for more '
-                "information about production tokens."
+            echo_token_instructions(
+                obj,
+                name,
+                prev_user,
+                cmd_name="create",
+                cmd_description="deploy a new version of",
             )
             raise IncorrectProductionToken(
                 "Try again with the correct " "production token."
@@ -545,3 +534,91 @@ def list_runs(
"No executions for *%s* found on AWS Step Functions."
% (obj.state_machine_name)
)


@step_functions.command(help="Delete a workflow")
@click.option(
    "--authorize",
    default=None,
    type=str,
    help="Authorize the deletion with a production token",
)
@click.pass_obj
def delete(obj, authorize=None):
    validate_token(obj.state_machine_name, obj.token_prefix, obj, authorize, "delete")

    obj.echo(
        "Deleting Step Function *{name}*...".format(name=obj.state_machine_name),
        bold=True,
    )
    schedule_deleted, sfn_deleted = StepFunctions.delete(obj.state_machine_name)

    if schedule_deleted:
        obj.echo(
            "Deleting EventBridge rule *{name}* as well...".format(
                name=obj.state_machine_name
            ),
            bold=True,
        )
    if sfn_deleted:
        obj.echo(
            "Deleting the Step Function may take a while. "
            "Deploying the flow again to Step Functions while the delete is in-flight will fail."
        )
        obj.echo(
            "In-flight executions will not be affected. "
            "If necessary, terminate them manually."
        )


def validate_token(name, token_prefix, obj, authorize, cmd_name):
    # 1) retrieve the previous deployment, if one exists
    workflow = StepFunctions.get_existing_deployment(name)
    if workflow is None:
        prev_token = None
    else:
        prev_user, prev_token = workflow

    # 2) authorize this deployment
    if prev_token is not None:
        if authorize is None:
            authorize = load_token(token_prefix)
        elif authorize.startswith("production:"):
            authorize = authorize[11:]

        # we allow the user who deployed the previous version to re-deploy,
        # even if they don't have the token
        # NOTE: the username is visible in multiple sources and can be set by
        # the user. Should we consider being stricter here?
        if prev_user != get_username() and authorize != prev_token:
            echo_token_instructions(obj, name, prev_user, cmd_name)
            raise IncorrectProductionToken(
                "Try again with the correct production token."
            )

    # 3) all validations passed, store the previous token for future use
    token = prev_token

    store_token(token_prefix, token)
    return True


def echo_token_instructions(obj, name, prev_user, cmd_name, cmd_description=None):
    obj.echo(
        "There is an existing version of *%s* on AWS Step "
        "Functions which was deployed by the user "
        "*%s*." % (name, prev_user)
    )
    obj.echo(
        "To deploy a new version of this flow, you need to use "
        "the same production token that they used. "
    )
    obj.echo(
        "Please reach out to them to get the token. Once you "
        "have it, call this command:"
    )
    obj.echo(" step-functions %s --authorize MY_TOKEN" % cmd_name, fg="green")
    obj.echo(
        'See "Organizing Results" at docs.metaflow.org for more '
        "information about production tokens."
    )

Collaborator (on the hard-coded "To deploy a new version of this flow" line): This wouldn't work for deletes. Also, it is likely that we will have more commands that wouldn't allow us to combine the token instructions into one. Maybe it is indeed okay to have some repetition.

saikonen (Author), Apr 28, 2023: It seems I simply forgot to make this line dynamic; the same output already works in the argo-workflows delete/terminate feature. Refactored this so the token usage instructions can be passed to the validator as a function instead, keeping validate_token() reusable for future commands as well (such as terminate, if we end up implementing those). It should be a somewhat cleaner approach now; let me know what you think. :)
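A rough sketch of the refactor described in the reply above, i.e. passing the token instructions to the validator as a function; the instructions_fn parameter name is illustrative and this is not the actual follow-up commit:

# Illustrative sketch only -- not the actual follow-up commit.
def validate_token(name, token_prefix, obj, authorize, instructions_fn):
    # 1) retrieve the previous deployment, if one exists
    workflow = StepFunctions.get_existing_deployment(name)
    prev_user, prev_token = (None, None) if workflow is None else workflow

    # 2) authorize this command against the previous deployment's token
    if prev_token is not None:
        if authorize is None:
            authorize = load_token(token_prefix)
        elif authorize.startswith("production:"):
            authorize = authorize[len("production:"):]

        if prev_user != get_username() and authorize != prev_token:
            # The caller supplies command-specific wording (create, delete, ...)
            instructions_fn(obj, name, prev_user)
            raise IncorrectProductionToken("Try again with the correct production token.")

    # 3) all validations passed, store the token for future use
    store_token(token_prefix, prev_token)
    return True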
8 changes: 8 additions & 0 deletions metaflow/plugins/aws/step_functions/step_functions_client.py
@@ -117,3 +117,11 @@ def get_state_machine_arn(self, name):
        if state_machine:
            return state_machine["stateMachineArn"]
        return None

    def delete(self, name):
        state_machine_arn = self.get_state_machine_arn(name)
        if state_machine_arn is None:
            return None
        return self._client.delete_state_machine(
            stateMachineArn=state_machine_arn,
        )
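Since the CLI warns that deletion may take a while, here is a minimal sketch, assuming a plain boto3 Step Functions client, of how a caller might wait for the state machine to actually disappear; this is illustrative and not part of this PR:

import time

import boto3

def wait_for_deletion(state_machine_arn, poll_seconds=10):
    # delete_state_machine() is asynchronous: the state machine stays in
    # DELETING status until AWS finishes cleaning it up.
    client = boto3.client("stepfunctions")
    while True:
        try:
            client.describe_state_machine(stateMachineArn=state_machine_arn)
        except client.exceptions.StateMachineDoesNotExist:
            return  # deletion has completed
        time.sleep(poll_seconds)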