diff --git a/samcli/commands/delete/delete_context.py b/samcli/commands/delete/delete_context.py
index 37014b74bd..2ce33336a7 100644
--- a/samcli/commands/delete/delete_context.py
+++ b/samcli/commands/delete/delete_context.py
@@ -35,6 +35,7 @@ class DeleteContext:
+    # TODO: Separate this context into 2 separate contexts guided and non-guided, just like deploy.
     def __init__(self, stack_name: str, region: str, profile: str, config_file: str, config_env: str, no_prompts: bool):
         self.stack_name = stack_name
         self.region = region
@@ -57,9 +58,15 @@ def __enter__(self):
         self.parse_config_file()
         if not self.stack_name:
             LOG.debug("No stack-name input found")
-            self.stack_name = prompt(
-                click.style("\tEnter stack name you want to delete:", bold=True), type=click.STRING
-            )
+            if not self.no_prompts:
+                self.stack_name = prompt(
+                    click.style("\tEnter stack name you want to delete", bold=True), type=click.STRING
+                )
+            else:
+                raise click.BadOptionUsage(
+                    option_name="--stack-name",
+                    message="Missing option '--stack-name', provide a stack name that needs to be deleted.",
+                )
 
         self.init_clients()
         return self
@@ -94,9 +101,15 @@ def init_clients(self):
         Initialize all the clients being used by sam delete.
         """
         if not self.region:
-            session = boto3.Session()
-            region = session.region_name
-            self.region = region if region else "us-east-1"
+            if not self.no_prompts:
+                session = boto3.Session()
+                region = session.region_name
+                self.region = region if region else "us-east-1"
+            else:
+                raise click.BadOptionUsage(
+                    option_name="--region",
+                    message="Missing option '--region', region is required to run the non guided delete command.",
+                )
 
         if self.profile:
             Context.get_current_context().profile = self.profile
@@ -218,7 +231,6 @@ def delete_ecr_companion_stack(self):
             )
             retain_repos = self.ecr_repos_prompts(ecr_companion_stack_template)
-
             # Delete the repos created by ECR companion stack if not retained
             ecr_companion_stack_template.delete(retain_resources=retain_repos)
@@ -229,9 +241,11 @@
             self.cf_utils.delete_stack(stack_name=self.companion_stack_name)
             self.cf_utils.wait_for_delete(stack_name=self.companion_stack_name)
             LOG.debug("Deleted ECR Companion Stack: %s", self.companion_stack_name)
+
         except CfDeleteFailedStatusError:
             LOG.debug("delete_stack resulted failed and so re-try with retain_resources")
             self.cf_utils.delete_stack(stack_name=self.companion_stack_name, retain_resources=retain_repos)
+            self.cf_utils.wait_for_delete(stack_name=self.companion_stack_name)
 
     def delete(self):
         """
@@ -296,9 +310,11 @@ def delete(self):
             self.cf_utils.delete_stack(stack_name=self.stack_name)
             self.cf_utils.wait_for_delete(self.stack_name)
             LOG.debug("Deleted Cloudformation stack: %s", self.stack_name)
+
         except CfDeleteFailedStatusError:
             LOG.debug("delete_stack resulted failed and so re-try with retain_resources")
             self.cf_utils.delete_stack(stack_name=self.stack_name, retain_resources=retain_resources)
+            self.cf_utils.wait_for_delete(self.stack_name)
 
         # If s3_bucket information is not available, warn the user
         if not self.s3_bucket:
diff --git a/samcli/lib/delete/cf_utils.py b/samcli/lib/delete/cf_utils.py
index d418306e00..b2f6acae7b 100644
--- a/samcli/lib/delete/cf_utils.py
+++ b/samcli/lib/delete/cf_utils.py
@@ -30,6 +30,10 @@ def has_stack(self, stack_name: str) -> bool:
                 return False
 
             stack = resp["Stacks"][0]
+            if stack["EnableTerminationProtection"]:
+                message = "Stack cannot be deleted while TerminationProtection is enabled."
+                raise DeleteFailedError(stack_name=stack_name, msg=message)
+
             # Note: Stacks with REVIEW_IN_PROGRESS can be deleted
             # using delete_stack but get_template does not return
             # the template_str for this stack restricting deletion of
@@ -53,11 +57,6 @@ def has_stack(self, stack_name: str) -> bool:
             LOG.error("Botocore Exception : %s", str(e))
             raise DeleteFailedError(stack_name=stack_name, msg=str(e)) from e
 
-        except Exception as e:
-            # We don't know anything about this exception. Don't handle
-            LOG.error("Unable to get stack details.", exc_info=e)
-            raise e
-
     def get_stack_template(self, stack_name: str, stage: str) -> Dict:
         """
         Return the Cloudformation template of the given stack_name
diff --git a/samcli/lib/package/ecr_uploader.py b/samcli/lib/package/ecr_uploader.py
index c9546aa93e..4d5d8c18e6 100644
--- a/samcli/lib/package/ecr_uploader.py
+++ b/samcli/lib/package/ecr_uploader.py
@@ -133,8 +133,10 @@ def delete_artifact(self, image_uri: str, resource_id: str, property_name: str):
         except botocore.exceptions.ClientError as ex:
             # Handle Client errors such as RepositoryNotFoundException or InvalidParameterException
-            LOG.error("DeleteArtifactFailedError Exception : %s", str(ex))
-            raise DeleteArtifactFailedError(resource_id=resource_id, property_name=property_name, ex=ex) from ex
+            if "RepositoryNotFoundException" not in str(ex):
+                LOG.debug("DeleteArtifactFailedError Exception : %s", str(ex))
+                raise DeleteArtifactFailedError(resource_id=resource_id, property_name=property_name, ex=ex) from ex
+            LOG.debug("RepositoryNotFoundException : %s", str(ex))
 
     def delete_ecr_repository(self, physical_id: str):
         """
diff --git a/samcli/lib/package/packageable_resources.py b/samcli/lib/package/packageable_resources.py
index 808468a670..140aeaedf2 100644
--- a/samcli/lib/package/packageable_resources.py
+++ b/samcli/lib/package/packageable_resources.py
@@ -178,7 +178,11 @@ def get_property_value(self, resource_dict):
             return {"Bucket": None, "Key": None}
 
         resource_path = jmespath.search(self.PROPERTY_NAME, resource_dict)
-        if resource_path:
+        # In the case where resource_path is pointing to an intrinsic
+        # ref function, sam delete will delete the stack but skip the deletion of this
+        # artifact, as deletion of intrinsic ref function artifacts is not supported yet.
+        # TODO: Allow deletion of S3 artifacts with intrinsic ref functions.
+        if resource_path and isinstance(resource_path, str):
             return self.uploader.parse_s3_url(resource_path)
         return {"Bucket": None, "Key": None}
@@ -233,12 +237,14 @@ def delete(self, resource_id, resource_dict):
             return
 
         remote_path = resource_dict.get(self.PROPERTY_NAME, {}).get(self.EXPORT_PROPERTY_CODE_KEY)
-        if is_ecr_url(remote_path):
+        # In the case where remote_path is pointing to an intrinsic
+        # ref function, sam delete will delete the stack but skip the deletion of this
+        # artifact, as deletion of intrinsic ref function artifacts is not supported yet.
+        # TODO: Allow deletion of ECR artifacts with intrinsic ref functions.
+        if isinstance(remote_path, str) and is_ecr_url(remote_path):
             self.uploader.delete_artifact(
                 image_uri=remote_path, resource_id=resource_id, property_name=self.PROPERTY_NAME
             )
-        else:
-            raise ValueError("URL given to the parse method is not a valid ECR url {0}".format(remote_path))
 
 
 class ResourceImage(Resource):
@@ -288,13 +294,15 @@ def delete(self, resource_id, resource_dict):
         if resource_dict is None:
             return
 
-        remote_path = resource_dict[self.PROPERTY_NAME]
-        if is_ecr_url(remote_path):
+        remote_path = resource_dict.get(self.PROPERTY_NAME)
+        # In the case where remote_path is pointing to an intrinsic
+        # ref function, sam delete will delete the stack but skip the deletion of this
+        # artifact, as deletion of intrinsic ref function artifacts is not supported yet.
+        # TODO: Allow deletion of ECR artifacts with intrinsic ref functions.
+        if isinstance(remote_path, str) and is_ecr_url(remote_path):
             self.uploader.delete_artifact(
                 image_uri=remote_path, resource_id=resource_id, property_name=self.PROPERTY_NAME
             )
-        else:
-            raise ValueError("URL given to the parse method is not a valid ECR url {0}".format(remote_path))
 
 
 class ResourceWithS3UrlDict(ResourceZip):
@@ -350,7 +358,13 @@ def get_property_value(self, resource_dict):
         s3_bucket = resource_path.get(self.BUCKET_NAME_PROPERTY, None)
         key = resource_path.get(self.OBJECT_KEY_PROPERTY, None)
-        return {"Bucket": s3_bucket, "Key": key}
+        # In the case where resource_path is pointing to an intrinsic
+        # ref function, sam delete will delete the stack but skip the deletion of this
+        # artifact, as deletion of intrinsic ref function artifacts is not supported yet.
+        # TODO: Allow deletion of S3 artifacts with intrinsic ref functions.
+        if isinstance(s3_bucket, str) and isinstance(key, str):
+            return {"Bucket": s3_bucket, "Key": key}
+        return {"Bucket": None, "Key": None}
 
 
 class ServerlessFunctionResource(ResourceZip):
@@ -535,7 +549,8 @@ def delete(self, resource_id, resource_dict):
             return
 
         repository_name = self.get_property_value(resource_dict)
-        if repository_name:
+        # TODO: Allow deletion of ECR Repositories with intrinsic ref functions.
+        if repository_name and isinstance(repository_name, str):
             self.uploader.delete_ecr_repository(physical_id=repository_name)
 
     def get_property_value(self, resource_dict):
diff --git a/samcli/lib/package/s3_uploader.py b/samcli/lib/package/s3_uploader.py
index a7f1a9a8b9..0e0932f546 100644
--- a/samcli/lib/package/s3_uploader.py
+++ b/samcli/lib/package/s3_uploader.py
@@ -189,7 +189,9 @@ def delete_prefix_artifacts(self):
             LOG.error("Bucket not specified")
             raise BucketNotSpecifiedError()
         if self.prefix:
-            response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.prefix)
+            # Note: list_objects_v2 api uses prefix to fetch the keys that begin with the prefix
+            # To restrict fetching files with exact prefix self.prefix, "/" is used below.
+            response = self.s3.list_objects_v2(Bucket=self.bucket_name, Prefix=self.prefix + "/")
             prefix_files = response.get("Contents", [])
             for obj in prefix_files:
                 self.delete_artifact(obj["Key"], True)
diff --git a/tests/integration/delete/delete_integ_base.py b/tests/integration/delete/delete_integ_base.py
index 1eb70ef174..5eb15810b3 100644
--- a/tests/integration/delete/delete_integ_base.py
+++ b/tests/integration/delete/delete_integ_base.py
@@ -1,11 +1,12 @@
 import os
+from pathlib import Path
 from unittest import TestCase
 
 
 class DeleteIntegBase(TestCase):
     @classmethod
     def setUpClass(cls):
-        pass
+        cls.delete_test_data_path = Path(__file__).resolve().parents[1].joinpath("testdata", "delete")
 
     def setUp(self):
         super().setUp()
diff --git a/tests/integration/delete/test_delete_command.py b/tests/integration/delete/test_delete_command.py
index 639152e746..9261245d55 100644
--- a/tests/integration/delete/test_delete_command.py
+++ b/tests/integration/delete/test_delete_command.py
@@ -5,7 +5,6 @@
 import uuid
 from pathlib import Path
 from unittest import skipIf
-import logging
 import boto3
 import docker
 from botocore.config import Config
@@ -26,7 +25,6 @@
 CFN_SLEEP = 3
 TIMEOUT = 300
 CFN_PYTHON_VERSION_SUFFIX = os.environ.get("PYTHON_VERSION", "0.0.0").replace(".", "-")
-LOG = logging.getLogger(__name__)
 
 
 @skipIf(SKIP_DELETE_TESTS, "Skip delete tests in CI/CD only")
@@ -55,7 +53,7 @@ def test_delete_command_no_stack_deployed(self):
         stack_name = self._method_to_stack_name(self.id())
 
-        delete_command_list = self.get_delete_command_list(stack_name=stack_name, no_prompts=True)
+        delete_command_list = self.get_delete_command_list(stack_name=stack_name, region="us-east-1", no_prompts=True)
         delete_process_execute = run_command(delete_command_list)
 
         self.assertEqual(delete_process_execute.process.returncode, 0)
@@ -82,7 +80,7 @@ def test_delete_command_no_stack_deployed(self):
             "aws-stepfunctions-statemachine.yaml",
         ]
     )
-    def test_delete_with_s3_prefix_present_zip(self, template_file):
+    def test_delete_no_prompts_with_s3_prefix_present_zip(self, template_file):
         template_path = self.test_data_path.joinpath(template_file)
 
         stack_name = self._method_to_stack_name(self.id())
@@ -98,7 +96,7 @@ def test_delete_with_s3_prefix_present_zip(self, template_file):
         config_file_path = self.test_data_path.joinpath(config_file_name)
 
         delete_command_list = self.get_delete_command_list(
-            stack_name=stack_name, config_file=config_file_path, no_prompts=True
+            stack_name=stack_name, config_file=config_file_path, region="us-east-1", no_prompts=True
         )
 
         delete_process_execute = run_command(delete_command_list)
@@ -118,14 +116,14 @@ def test_delete_with_s3_prefix_present_zip(self, template_file):
             "aws-serverless-function-image.yaml",
         ]
     )
-    def test_delete_with_s3_prefix_present_image(self, template_file):
+    def test_delete_no_prompts_with_s3_prefix_present_image(self, template_file):
         template_path = self.test_data_path.joinpath(template_file)
         stack_name = self._method_to_stack_name(self.id())
 
         config_file_name = stack_name + ".toml"
 
         deploy_command_list = self.get_deploy_command_list(
-            template_file=template_path, guided=True, config_file=config_file_name
+            template_file=template_path, guided=True, config_file=config_file_name, image_repository=self.ecr_repo_name
         )
 
         deploy_process_execute = run_command_with_input(
@@ -134,7 +132,7 @@ def test_delete_with_s3_prefix_present_image(self, template_file):
         config_file_path = self.test_data_path.joinpath(config_file_name)
 
         delete_command_list = self.get_delete_command_list(
-            stack_name=stack_name, config_file=config_file_path, no_prompts=True
+            stack_name=stack_name, config_file=config_file_path, region="us-east-1", no_prompts=True
         )
 
         delete_process_execute = run_command(delete_command_list)
@@ -154,7 +152,7 @@ def test_delete_with_s3_prefix_present_image(self, template_file):
             "aws-serverless-function.yaml",
         ]
     )
-    def test_delete_guided_prompts(self, template_file):
+    def test_delete_guided_config_file_present(self, template_file):
         template_path = self.test_data_path.joinpath(template_file)
 
         stack_name = self._method_to_stack_name(self.id())
@@ -171,7 +169,6 @@ def test_delete_guided_prompts(self, template_file):
         config_file_path = self.test_data_path.joinpath(config_file_name)
 
         delete_command_list = self.get_delete_command_list(stack_name=stack_name, config_file=config_file_path)
-        LOG.info(delete_command_list)
 
         delete_process_execute = run_command_with_input(delete_command_list, "y\nn\ny\n".encode())
         self.assertEqual(delete_process_execute.process.returncode, 0)
@@ -216,7 +213,7 @@ def test_delete_no_config_file_zip(self, template_file):
             "aws-serverless-function.yaml",
         ]
     )
-    def test_delete_no_s3_prefix_zip(self, template_file):
+    def test_delete_no_prompts_no_s3_prefix_zip(self, template_file):
         template_path = self.test_data_path.joinpath(template_file)
 
         stack_name = self._method_to_stack_name(self.id())
@@ -254,7 +251,7 @@ def test_delete_no_s3_prefix_zip(self, template_file):
             "aws-serverless-function-image.yaml",
         ]
     )
-    def test_delete_no_s3_prefix_image(self, template_file):
+    def test_delete_no_prompts_no_s3_prefix_image(self, template_file):
         template_path = self.test_data_path.joinpath(template_file)
 
         stack_name = self._method_to_stack_name(self.id())
@@ -328,6 +325,210 @@ def test_delete_nested_stacks(self, template_file):
         except ClientError as ex:
             self.assertIn(f"Stack with id {stack_name} does not exist", str(ex))
 
+    def test_delete_stack_termination_protection_enabled(self):
+        template_str = """
+        AWSTemplateFormatVersion: '2010-09-09'
+        Description: Stack for testing termination protection enabled stacks.
+        Resources:
+          MyRepository:
+            Type: AWS::ECR::Repository
+            Properties:
+              RepositoryName: "test-termination-protection-repository"
+        """
+
+        stack_name = self._method_to_stack_name(self.id())
+
+        self.cf_client.create_stack(StackName=stack_name, TemplateBody=template_str, EnableTerminationProtection=True)
+
+        delete_command_list = self.get_delete_command_list(stack_name=stack_name, region="us-east-1", no_prompts=True)
+
+        delete_process_execute = run_command(delete_command_list)
+
+        self.assertEqual(delete_process_execute.process.returncode, 1)
+        self.assertIn(
+            bytes(
+                "TerminationProtection is enabled",
+                encoding="utf-8",
+            ),
+            delete_process_execute.stderr,
+        )
+
+        self.cf_client.update_termination_protection(StackName=stack_name, EnableTerminationProtection=False)
+
+        delete_process_execute = run_command(delete_command_list)
+        self.assertEqual(delete_process_execute.process.returncode, 0)
+
+        try:
+            resp = self.cf_client.describe_stacks(StackName=stack_name)
+        except ClientError as ex:
+            self.assertIn(f"Stack with id {stack_name} does not exist", str(ex))
+
+    def test_no_prompts_no_stack_name(self):
+
+        delete_command_list = self.get_delete_command_list(no_prompts=True)
+        delete_process_execute = run_command(delete_command_list)
+        self.assertEqual(delete_process_execute.process.returncode, 2)
+
+    def test_no_prompts_no_region(self):
+        stack_name = self._method_to_stack_name(self.id())
+
+        delete_command_list = self.get_delete_command_list(stack_name=stack_name, no_prompts=True)
+        delete_process_execute = run_command(delete_command_list)
+        self.assertEqual(delete_process_execute.process.returncode, 2)
+
+    @parameterized.expand(
+        [
+            "aws-serverless-function.yaml",
+        ]
+    )
+    def test_delete_guided_no_stack_name_no_region(self, template_file):
+        template_path = self.test_data_path.joinpath(template_file)
+
+        stack_name = self._method_to_stack_name(self.id())
+
+        deploy_command_list = self.get_deploy_command_list(
+            template_file=template_path,
+            stack_name=stack_name,
+            capabilities="CAPABILITY_IAM",
+            s3_bucket=self.bucket_name,
+            force_upload=True,
+            notification_arns=self.sns_arn,
+            parameter_overrides="Parameter=Clarity",
+            kms_key_id=self.kms_key,
+            no_execute_changeset=False,
+            tags="integ=true clarity=yes foo_bar=baz",
+            confirm_changeset=False,
+            region="us-east-1",
+        )
+        deploy_process_execute = run_command(deploy_command_list)
+
+        delete_command_list = self.get_delete_command_list()
+        delete_process_execute = run_command_with_input(delete_command_list, "{}\ny\ny\n".format(stack_name).encode())
+
+        self.assertEqual(delete_process_execute.process.returncode, 0)
+
+        try:
+            resp = self.cf_client.describe_stacks(StackName=stack_name)
+        except ClientError as ex:
+            self.assertIn(f"Stack with id {stack_name} does not exist", str(ex))
+
+    @parameterized.expand(
+        [
+            "aws-ecr-repository.yaml",
+        ]
+    )
+    def test_delete_guided_ecr_repository_present(self, template_file):
+        template_path = self.delete_test_data_path.joinpath(template_file)
+        stack_name = self._method_to_stack_name(self.id())
+
+        deploy_command_list = self.get_deploy_command_list(
+            template_file=template_path,
+            stack_name=stack_name,
+            capabilities="CAPABILITY_IAM",
+            s3_bucket=self.bucket_name,
+            force_upload=True,
+            notification_arns=self.sns_arn,
+            parameter_overrides="Parameter=Clarity",
+            kms_key_id=self.kms_key,
+            no_execute_changeset=False,
+            tags="integ=true clarity=yes foo_bar=baz",
+            confirm_changeset=False,
+            region="us-east-1",
+        )
+        deploy_process_execute = run_command(deploy_command_list)
+        delete_command_list = self.get_delete_command_list(stack_name=stack_name, region="us-east-1")
+        delete_process_execute = run_command_with_input(delete_command_list, "y\ny\ny\n".encode())
+
+        self.assertEqual(delete_process_execute.process.returncode, 0)
+
+        try:
+            resp = self.cf_client.describe_stacks(StackName=stack_name)
+        except ClientError as ex:
+            self.assertIn(f"Stack with id {stack_name} does not exist", str(ex))
+
+    @parameterized.expand(
+        [
+            "aws-serverless-function-image.yaml",
+        ]
+    )
+    def test_delete_guided_no_s3_prefix_image(self, template_file):
+        template_path = self.test_data_path.joinpath(template_file)
+
+        stack_name = self._method_to_stack_name(self.id())
+
+        # Try to deploy to another region.
+        deploy_command_list = self.get_deploy_command_list(
+            template_file=template_path,
+            stack_name=stack_name,
+            capabilities="CAPABILITY_IAM",
+            image_repository=self.ecr_repo_name,
+            s3_bucket=self.bucket_name,
+            force_upload=True,
+            notification_arns=self.sns_arn,
+            parameter_overrides="Parameter=Clarity",
+            kms_key_id=self.kms_key,
+            no_execute_changeset=False,
+            tags="integ=true clarity=yes foo_bar=baz",
+            confirm_changeset=False,
+            region="us-east-1",
+        )
+
+        deploy_process_execute = run_command(deploy_command_list)
+
+        delete_command_list = self.get_delete_command_list(stack_name=stack_name, region="us-east-1")
+
+        delete_process_execute = run_command_with_input(delete_command_list, "y\n".encode())
+
+        self.assertEqual(delete_process_execute.process.returncode, 0)
+
+        try:
+            resp = self.cf_client.describe_stacks(StackName=stack_name)
+        except ClientError as ex:
+            self.assertIn(f"Stack with id {stack_name} does not exist", str(ex))
+
+    @parameterized.expand(
+        [
+            "aws-serverless-function-retain.yaml",
+        ]
+    )
+    def test_delete_guided_retain_s3_artifact(self, template_file):
+        template_path = self.delete_test_data_path.joinpath(template_file)
+        stack_name = self._method_to_stack_name(self.id())
+
+        deploy_command_list = self.get_deploy_command_list(
+            template_file=template_path,
+            stack_name=stack_name,
+            capabilities="CAPABILITY_IAM",
+            s3_bucket=self.bucket_name,
+            force_upload=True,
+            notification_arns=self.sns_arn,
+            parameter_overrides="Parameter=Clarity",
+            kms_key_id=self.kms_key,
+            no_execute_changeset=False,
+            tags="integ=true clarity=yes foo_bar=baz",
+            confirm_changeset=False,
+            region="us-east-1",
+        )
+        deploy_process_execute = run_command(deploy_command_list)
+
+        delete_command_list = self.get_delete_command_list(stack_name=stack_name, region="us-east-1")
+        delete_process_execute = run_command_with_input(delete_command_list, "y\nn\nn\n".encode())
+
+        self.assertEqual(delete_process_execute.process.returncode, 0)
+
+        try:
+            resp = self.cf_client.describe_stacks(StackName=stack_name)
+        except ClientError as ex:
+            self.assertIn(f"Stack with id {stack_name} does not exist", str(ex))
+
+    # TODO: Add 3 more tests after Auto ECR is merged to develop
+    # 1. Create a stack using guided deploy of type image and delete
+    # 2. Delete the ECR Companion Stack as input stack.
+    # 3. Retain ECR Repository that contains at least 1 image.
+    #    - Create a stack using guided deploy of type image
+    #    - Select no for deleting ECR repository and this will retain the non-empty repository
+
     def _method_to_stack_name(self, method_name):
         """Method expects method name which can be a full path.
         Eg: test.integration.test_deploy_command.method_name"""
diff --git a/tests/integration/testdata/delete/aws-ecr-repository.yaml b/tests/integration/testdata/delete/aws-ecr-repository.yaml
new file mode 100644
index 0000000000..af6d63c336
--- /dev/null
+++ b/tests/integration/testdata/delete/aws-ecr-repository.yaml
@@ -0,0 +1,7 @@
+AWSTemplateFormatVersion: '2010-09-09'
+Description: Stack for creating ECR repository for testing
+Resources:
+  MyRepository:
+    Type: AWS::ECR::Repository
+    Properties:
+      RepositoryName: "test-stack-with-ecr-repository"
\ No newline at end of file
diff --git a/tests/integration/testdata/delete/aws-serverless-function-retain.yaml b/tests/integration/testdata/delete/aws-serverless-function-retain.yaml
new file mode 100644
index 0000000000..ae857aa196
--- /dev/null
+++ b/tests/integration/testdata/delete/aws-serverless-function-retain.yaml
@@ -0,0 +1,19 @@
+AWSTemplateFormatVersion : '2010-09-09'
+Transform: AWS::Serverless-2016-10-31
+Description: A hello world application.
+
+Parameters:
+  Parameter:
+    Type: String
+    Default: Sample
+    Description: A custom parameter
+
+Resources:
+  HelloWorldFunction:
+    Type: AWS::Serverless::Function
+    Properties:
+      Handler: main.handler
+      Runtime: python3.7
+      CodeUri: .
+      Timeout: 600
+    DeletionPolicy: Retain
diff --git a/tests/unit/commands/delete/test_delete_context.py b/tests/unit/commands/delete/test_delete_context.py
index 087f552bc4..25c98810c5 100644
--- a/tests/unit/commands/delete/test_delete_context.py
+++ b/tests/unit/commands/delete/test_delete_context.py
@@ -10,6 +10,8 @@
 from samcli.lib.package.s3_uploader import S3Uploader
 from samcli.lib.package.ecr_uploader import ECRUploader
 
+from samcli.commands.delete.exceptions import CfDeleteFailedStatusError
+
 
 class TestDeleteContext(TestCase):
     @patch("samcli.commands.delete.delete_context.click.echo")
@@ -78,9 +80,10 @@ def test_delete_context_parse_config_file(self, patched_click_get_current_contex
         self.assertEqual(delete_context.s3_prefix, "s3-prefix")
 
     @patch("samcli.commands.delete.delete_context.prompt")
+    @patch("samcli.commands.delete.delete_context.confirm")
     @patch("samcli.commands.delete.delete_context.click.get_current_context")
     @patch.object(CfUtils, "has_stack", MagicMock(return_value=(False)))
-    def test_delete_no_user_input(self, patched_click_get_current_context, patched_prompt):
+    def test_delete_no_user_input(self, patched_click_get_current_context, patched_confirm, patched_prompt):
         patched_click_get_current_context = MagicMock()
         with DeleteContext(
             stack_name=None,
@@ -88,14 +91,15 @@ def test_delete_no_user_input(self, patched_click_get_current_context, patched_p
             config_file=None,
             config_env=None,
             profile=None,
-            no_prompts=True,
+            no_prompts=None,
         ) as delete_context:
             delete_context.run()
 
             patched_prompt.side_effect = ["sam-app"]
+            patched_confirm.side_effect = [True]
 
             expected_prompt_calls = [
-                call(click.style("\tEnter stack name you want to delete:", bold=True), type=click.STRING),
+                call(click.style("\tEnter stack name you want to delete", bold=True), type=click.STRING),
             ]
 
             self.assertEqual(expected_prompt_calls, patched_prompt.call_args_list)
@@ -388,3 +392,43 @@ def test_no_prompts_input_is_ecr_companion_stack_present_execute_run(
             call("\nDeleted successfully"),
         ]
         self.assertEqual(expected_click_echo_calls, patched_click_echo.call_args_list)
+
+    @patch("samcli.commands.delete.delete_context.get_cf_template_name")
+    @patch("samcli.commands.delete.delete_context.click.get_current_context")
+    @patch.object(CfUtils, "has_stack", MagicMock(side_effect=(True, True)))
+    @patch.object(CfUtils, "get_stack_template", MagicMock(return_value=({"TemplateBody": "Hello World"})))
+    @patch.object(CfUtils, "delete_stack", MagicMock())
+    @patch.object(
+        CfUtils,
+        "wait_for_delete",
+        MagicMock(
+            side_effect=(
+                CfDeleteFailedStatusError("test-098f6bcd-CompanionStack", "Mock WaitError"),
+                {},
+                CfDeleteFailedStatusError("test", "Mock WaitError"),
+                {},
+            )
+        ),
+    )
+    @patch.object(S3Uploader, "delete_prefix_artifacts", MagicMock())
+    @patch.object(ECRUploader, "delete_ecr_repository", MagicMock())
+    @patch.object(Template, "get_ecr_repos", MagicMock(side_effect=({}, {"logical_id": {"Repository": "test_id"}})))
+    def test_retain_resources_delete_stack(self, patched_click_get_current_context, patched_get_cf_template_name):
+        patched_get_cf_template_name.return_value = "hello.template"
+        with DeleteContext(
+            stack_name="test",
+            region="us-east-1",
+            config_file="samconfig.toml",
+            config_env="default",
+            profile="test",
+            no_prompts=True,
+        ) as delete_context:
+            delete_context.s3_bucket = "s3_bucket"
+            delete_context.s3_prefix = "s3_prefix"
+
+            delete_context.run()
+
+            self.assertEqual(CfUtils.has_stack.call_count, 2)
+            self.assertEqual(CfUtils.get_stack_template.call_count, 2)
+            self.assertEqual(CfUtils.delete_stack.call_count, 4)
+            self.assertEqual(CfUtils.wait_for_delete.call_count, 4)
diff --git a/tests/unit/lib/delete/test_cf_utils.py b/tests/unit/lib/delete/test_cf_utils.py
index 61c1c186e1..8a18c782ce 100644
--- a/tests/unit/lib/delete/test_cf_utils.py
+++ b/tests/unit/lib/delete/test_cf_utils.py
@@ -51,14 +51,16 @@ def test_cf_utils_has_stack_exception_client_error(self):
         with self.assertRaises(DeleteFailedError):
             self.cf_utils.has_stack("test")
 
-    def test_cf_utils_has_stack_exception(self):
-        self.cf_utils._client.describe_stacks = MagicMock(side_effect=Exception())
-        with self.assertRaises(Exception):
+    def test_cf_utils_has_stack_termination_protection_enabled(self):
+        self.cf_utils._client.describe_stacks = MagicMock(
+            return_value={"Stacks": [{"StackStatus": "CREATE_COMPLETE", "EnableTerminationProtection": True}]}
+        )
+        with self.assertRaises(DeleteFailedError):
             self.cf_utils.has_stack("test")
 
     def test_cf_utils_has_stack_in_review(self):
         self.cf_utils._client.describe_stacks = MagicMock(
-            return_value={"Stacks": [{"StackStatus": "REVIEW_IN_PROGRESS"}]}
+            return_value={"Stacks": [{"StackStatus": "REVIEW_IN_PROGRESS", "EnableTerminationProtection": False}]}
        )
         self.assertEqual(self.cf_utils.has_stack("test"), False)