diff --git a/metaflow/plugins/aws/step_functions/event_bridge_client.py b/metaflow/plugins/aws/step_functions/event_bridge_client.py
index 5620b1de039..7f2273ed559 100644
--- a/metaflow/plugins/aws/step_functions/event_bridge_client.py
+++ b/metaflow/plugins/aws/step_functions/event_bridge_client.py
@@ -60,6 +60,26 @@ def _set(self):
             ],
         )
 
+    def delete(self):
+        """
+        Remove this rule's target and then delete the rule itself.
+
+        Returns the response of the `delete_rule` call, or None when the
+        rule does not exist. Raises RuntimeError if a target could not be
+        removed.
+        """
+        try:
+            response = self._client.remove_targets(
+                Rule=self.name,
+                Ids=[self.name],
+            )
+            if response.get("FailedEntryCount", 0) > 0:
+                raise RuntimeError("Failed to remove targets from rule %s" % self.name)
+            return self._client.delete_rule(Name=self.name)
+        except self._client.exceptions.ResourceNotFoundException:
+            # Ignore if the rule does not exist.
+            return None
+
 
 def format(name):
     # AWS Event Bridge has a limit of 64 chars for rule names.
diff --git a/metaflow/plugins/aws/step_functions/step_functions.py b/metaflow/plugins/aws/step_functions/step_functions.py
index 319afa4192c..cefb41ee171 100644
--- a/metaflow/plugins/aws/step_functions/step_functions.py
+++ b/metaflow/plugins/aws/step_functions/step_functions.py
@@ -158,6 +158,27 @@ def schedule(self):
         except Exception as e:
             raise StepFunctionsSchedulingException(repr(e))
 
+    @classmethod
+    def delete(cls, name):
+        """
+        Delete the Amazon EventBridge rule and the state machine for `name`.
+
+        Returns a tuple (schedule_deleted, sfn_deleted) holding the results
+        of the two delete calls. Raises StepFunctionsException when the
+        state machine delete reports that the workflow does not exist.
+        """
+        # Always attempt to delete the event bridge rule.
+        schedule_deleted = EventBridgeClient(name).delete()
+
+        sfn_deleted = StepFunctionsClient().delete(name)
+
+        if sfn_deleted is None:
+            raise StepFunctionsException(
+                "The workflow *%s* doesn't exist on AWS Step Functions." % name
+            )
+
+        return schedule_deleted, sfn_deleted
+
     @classmethod
     def trigger(cls, name, parameters):
         try:
diff --git a/metaflow/plugins/aws/step_functions/step_functions_cli.py b/metaflow/plugins/aws/step_functions/step_functions_cli.py
index 0ac26e1159b..4a0eac859a7 100644
--- a/metaflow/plugins/aws/step_functions/step_functions_cli.py
+++ b/metaflow/plugins/aws/step_functions/step_functions_cli.py
@@ -545,3 +545,112 @@ def list_runs(
                 "No executions for *%s* found on AWS Step Functions."
                 % (obj.state_machine_name)
             )
+
+
+@step_functions.command(help="Delete a workflow")
+@click.option(
+    "--authorize",
+    default=None,
+    type=str,
+    help="Authorize the deletion with a production token",
+)
+@click.pass_obj
+def delete(obj, authorize=None):
+    # Deletes the state machine named obj.state_machine_name (and its
+    # EventBridge rule, if any) after checking the production token.
+    def _token_instructions(flow_name, prev_user):
+        obj.echo(
+            "There is an existing version of *%s* on AWS Step "
+            "Functions which was deployed by the user "
+            "*%s*." % (flow_name, prev_user)
+        )
+        obj.echo(
+            "To delete this flow, you need to use the same production token that they used."
+        )
+        obj.echo(
+            "Please reach out to them to get the token. Once you "
+            "have it, call this command:"
+        )
+        obj.echo(" step-functions delete --authorize MY_TOKEN", fg="green")
+        obj.echo(
+            'See "Organizing Results" at docs.metaflow.org for more '
+            "information about production tokens."
+        )
+
+    # Raises IncorrectProductionToken when the caller is not authorized.
+    validate_token(
+        obj.state_machine_name, obj.token_prefix, authorize, _token_instructions
+    )
+
+    obj.echo(
+        "Deleting AWS Step Functions state machine *{name}*...".format(
+            name=obj.state_machine_name
+        ),
+        bold=True,
+    )
+    schedule_deleted, sfn_deleted = StepFunctions.delete(obj.state_machine_name)
+
+    if schedule_deleted:
+        obj.echo(
+            "Deleting Amazon EventBridge rule *{name}* as well...".format(
+                name=obj.state_machine_name
+            ),
+            bold=True,
+        )
+    if sfn_deleted:
+        obj.echo(
+            "Deleting the AWS Step Functions state machine may take a while. "
+            "Deploying the flow again to AWS Step Functions while the delete is in-flight will fail."
+        )
+        obj.echo(
+            "In-flight executions will not be affected. "
+            "If necessary, terminate them manually."
+        )
+
+
+def validate_token(name, token_prefix, authorize, instruction_fn=None):
+    """
+    Validate that the production token matches that of the deployed flow.
+
+    In case both the user and token do not match, raises
+    IncorrectProductionToken. Optionally outputs instructions on token usage
+    via the provided instruction_fn(flow_name, prev_user) before raising.
+
+    Returns True when the validation passes. The token of the existing
+    deployment, if there is one, is passed to store_token for future use.
+    """
+    # TODO: Unify this with the existing resolve_token implementation.
+
+    # 1) retrieve the previous deployment, if one exists
+    workflow = StepFunctions.get_existing_deployment(name)
+    if workflow is None:
+        prev_token = None
+    else:
+        prev_user, prev_token = workflow
+
+    # 2) authorize this deployment
+    if prev_token is not None:
+        token_scheme = "production:"
+        if authorize is None:
+            authorize = load_token(token_prefix)
+        elif authorize.startswith(token_scheme):
+            # strip the scheme so that only the bare token is compared
+            authorize = authorize[len(token_scheme) :]
+
+        # we allow the user who deployed the previous version to re-deploy,
+        # even if they don't have the token
+        # NOTE: The username is visible in multiple sources, and can be set by the user.
+        # Should we consider being stricter here?
+        if prev_user != get_username() and authorize != prev_token:
+            if instruction_fn:
+                instruction_fn(flow_name=name, prev_user=prev_user)
+            raise IncorrectProductionToken(
+                "Try again with the correct production token."
+            )
+
+        # 3) all validations passed, store the previous token for future use.
+        # Without a previous deployment there is no token to store, so
+        # store_token is not called in that case.
+        store_token(token_prefix, prev_token)
+
+    return True
diff --git a/metaflow/plugins/aws/step_functions/step_functions_client.py b/metaflow/plugins/aws/step_functions/step_functions_client.py
index c42bb0a8907..f7418f15427 100644
--- a/metaflow/plugins/aws/step_functions/step_functions_client.py
+++ b/metaflow/plugins/aws/step_functions/step_functions_client.py
@@ -117,3 +117,17 @@ def get_state_machine_arn(self, name):
             if state_machine:
                 return state_machine["stateMachineArn"]
             return None
+
+    def delete(self, name):
+        """
+        Delete the state machine called `name`.
+
+        Returns the response of the `delete_state_machine` call, or None
+        when no state machine ARN can be resolved for `name`.
+        """
+        state_machine_arn = self.get_state_machine_arn(name)
+        if state_machine_arn is None:
+            return None
+        return self._client.delete_state_machine(
+            stateMachineArn=state_machine_arn,
+        )