Skip to content

Commit

Permalink
Support CertificateManager service
Browse files Browse the repository at this point in the history
Only the quota "Number of ACM certificates" is checked.

Added unit tests.
  • Loading branch information
TagadaPoe authored and antonincms committed Jul 22, 2021
1 parent a829ce2 commit f6c7ff2
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 0 deletions.
2 changes: 2 additions & 0 deletions awslimitchecker/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
from awslimitchecker.services.base import _AwsService
from awslimitchecker.services.apigateway import _ApigatewayService
from awslimitchecker.services.autoscaling import _AutoscalingService
from awslimitchecker.services.certificatemanager import \
_CertificatemanagerService
from awslimitchecker.services.cloudformation import _CloudformationService
from awslimitchecker.services.cloudtrail import _CloudTrailService
from awslimitchecker.services.directoryservice import _DirectoryserviceService
Expand Down
117 changes: 117 additions & 0 deletions awslimitchecker/services/certificatemanager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
awslimitchecker/services/certificatemanager.py
The latest version of this package is available at:
<https://github.com/jantman/awslimitchecker>
################################################################################
Copyright 2015-2018 Jason Antman <jason@jasonantman.com>
This file is part of awslimitchecker, also known as awslimitchecker.
awslimitchecker is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
awslimitchecker is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with awslimitchecker. If not, see <http://www.gnu.org/licenses/>.
The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/awslimitchecker> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
################################################################################
AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
################################################################################
"""

import abc # noqa
import logging

from .base import _AwsService
from ..limit import AwsLimit
from ..utils import paginate_dict

logger = logging.getLogger(__name__)


class _CertificatemanagerService(_AwsService):

service_name = 'CertificateManager'
api_name = 'acm' # AWS API name to connect to (boto3.client)
quotas_service_code = 'acm'

def find_usage(self):
"""
List CloudFront distributions by calling AWS list_certificates, and
update usage in self.limits for the limit 'ACM certificates'
"""
logger.debug("Checking usage for service %s", self.service_name)
self.connect()
for lim in self.limits.values():
lim._reset_usage()

self._find_usage_certificates()

self._have_usage = True
logger.debug("Done checking usage.")

def _find_usage_certificates(self):
"""find usage for ACM certificates"""
res = paginate_dict(
self.conn.list_certificates,
alc_marker_path=['NextToken'],
alc_data_path=['CertificateSummaryList'],
alc_marker_param='NextToken'
)
if 'CertificateSummaryList' not in res:
nb_certificates = 0
else:
nb_certificates = len(res['CertificateSummaryList'])
self.limits['ACM certificates']._add_current_usage(nb_certificates)

def get_limits(self):
"""
Return all known limits for this service, as a dict of their names
to :py:class:`~.AwsLimit` objects.
:returns: dict of limit names to :py:class:`~.AwsLimit` objects
:rtype: dict
"""
if self.limits != {}:
return self.limits
limits = {}
limits['ACM certificates'] = AwsLimit(
'ACM certificates',
self,
1000,
self.warning_threshold,
self.critical_threshold
)
self.limits = limits
return limits

def required_iam_permissions(self):
"""
Return a list of IAM Actions required for this Service to function
properly. All Actions will be shown with an Effect of "Allow"
and a Resource of "*".
:returns: list of IAM Action strings
:rtype: list
"""
return [
"acm:ListCertificates",
]
23 changes: 23 additions & 0 deletions awslimitchecker/tests/services/result_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -4784,3 +4784,26 @@ class EKS(object):
}
}
]


class CertificateManager(object):
test_find_usage_certificates_empty = {
}

test_find_usage_certificates = {
'NextToken': 'string',
'CertificateSummaryList': [
{
'CertificateArn': 'string1',
'DomainName': 'string1'
},
{
'CertificateArn': 'string2',
'DomainName': 'string2'
},
{
'CertificateArn': 'string3',
'DomainName': 'string3'
},
]
}
173 changes: 173 additions & 0 deletions awslimitchecker/tests/services/test_certificatemanager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""
awslimitchecker/tests/services/test_certificatemanager.py
The latest version of this package is available at:
<https://github.com/jantman/awslimitchecker>
################################################################################
Copyright 2015-2018 Jason Antman <jason@jasonantman.com>
This file is part of awslimitchecker, also known as awslimitchecker.
awslimitchecker is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
awslimitchecker is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with awslimitchecker. If not, see <http://www.gnu.org/licenses/>.
The Copyright and Authors attributions contained herein may not be removed or
otherwise altered, except to add the Author attribution of a contributor to
this work. (Additional Terms pursuant to Section 7b of the AGPL v3)
################################################################################
While not legally required, I sincerely request that anyone who finds
bugs please submit them at <https://github.com/jantman/awslimitchecker> or
to me via email, and that you send any contributions or improvements
either as a pull request on GitHub, or to me via email.
################################################################################
AUTHORS:
Jason Antman <jason@jasonantman.com> <http://www.jasonantman.com>
################################################################################
"""

import sys
from awslimitchecker.tests.services import result_fixtures
from awslimitchecker.services.certificatemanager import \
_CertificatemanagerService

# https://code.google.com/p/mock/issues/detail?id=249
# py>=3.4 should use unittest.mock not the mock package on pypi
if (
sys.version_info[0] < 3 or
sys.version_info[0] == 3 and sys.version_info[1] < 4
):
from mock import patch, call, Mock, DEFAULT
else:
from unittest.mock import patch, call, Mock, DEFAULT


pbm = 'awslimitchecker.services.certificatemanager' # module patch base
pb = '%s._CertificatemanagerService' % pbm # class patch pase


class Test_CertificatemanagerService(object):

def test_init(self):
"""test __init__()"""
cls = _CertificatemanagerService(21, 43, {}, None)
assert cls.service_name == 'CertificateManager'
assert cls.api_name == 'acm'
assert cls.conn is None
assert cls.warning_threshold == 21
assert cls.critical_threshold == 43

def test_get_limits(self):
cls = _CertificatemanagerService(21, 43, {}, None)
cls.limits = {}
res = cls.get_limits()
assert sorted(res.keys()) == sorted([
'ACM certificates',
])
for name, limit in res.items():
assert limit.service == cls
assert limit.def_warning_threshold == 21
assert limit.def_critical_threshold == 43

def test_get_limits_again(self):
"""test that existing limits dict is returned on subsequent calls"""
mock_limits = Mock()
cls = _CertificatemanagerService(21, 43, {}, None)
cls.limits = mock_limits
res = cls.get_limits()
assert res == mock_limits

def test_find_usage(self):
"""
Test overall find_usage method
Check that find_usage() method calls the other methods.
"""
with patch.multiple(
pb,
connect=DEFAULT,
_find_usage_certificates=DEFAULT,
autospec=True
) as mocks:
cls = _CertificatemanagerService(21, 43, {}, None)
assert cls._have_usage is False
cls.find_usage()

assert cls._have_usage is True
assert len(mocks) == 2
# the other methods should have been called
for x in [
"_find_usage_certificates"
]:
assert mocks[x].mock_calls == [call(cls)]

def test_find_usage_certificates_empty(self):
"""
Verify the correctness of usage (when there are no certificates)
This test mocks the AWS list_certificates response (after pagination).
"""
# Setup the mock and call the tested function
resp = result_fixtures.CertificateManager\
.test_find_usage_certificates_empty
mock_conn = Mock()
with patch("%s.paginate_dict" % pbm) as mock_paginate:
cls = _CertificatemanagerService(21, 43, {}, None)
cls.conn = mock_conn
mock_paginate.return_value = resp
cls._find_usage_certificates()

# Check that usage values are correctly set
assert len(
cls.limits["ACM certificates"].get_current_usage()
) == 1
assert (
cls.limits["ACM certificates"].get_current_usage()[0]
.get_value() == 0
)
assert (
cls.limits["ACM certificates"].get_current_usage()[0]
.resource_id is None
)

def test_find_usage_certificates(self):
"""
Verify the correctness of usage
This test mocks the AWS list_certificates response (after pagination).
"""
# Setup the mock and call the tested function
resp = result_fixtures.CertificateManager.test_find_usage_certificates
mock_conn = Mock()
with patch("%s.paginate_dict" % pbm) as mock_paginate:
cls = _CertificatemanagerService(21, 43, {}, None)
cls.conn = mock_conn
mock_paginate.return_value = resp
cls._find_usage_certificates()

# Check that usage values are correctly set
assert len(
cls.limits["ACM certificates"].get_current_usage()
) == 1
assert (
cls.limits["ACM certificates"].get_current_usage()[0]
.get_value() == 3
)
assert (
cls.limits["ACM certificates"].get_current_usage()[0]
.resource_id is None
)

def test_required_iam_permissions(self):
cls = _CertificatemanagerService(21, 43, {}, None)
assert cls.required_iam_permissions() == [
"acm:ListCertificates"
]
8 changes: 8 additions & 0 deletions docs/source/awslimitchecker.services.certificatemanager.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
awslimitchecker.services.certificatemanager module
==================================================

.. automodule:: awslimitchecker.services.certificatemanager
:members:
:undoc-members:
:show-inheritance:
:private-members:
1 change: 1 addition & 0 deletions docs/source/awslimitchecker.services.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Submodules
awslimitchecker.services.apigateway
awslimitchecker.services.autoscaling
awslimitchecker.services.base
awslimitchecker.services.certificatemanager
awslimitchecker.services.cloudformation
awslimitchecker.services.cloudtrail
awslimitchecker.services.directoryservice
Expand Down
1 change: 1 addition & 0 deletions docs/source/iam_policy.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ services that do not affect the results of this program.
"Statement": [
{
"Action": [
"acm:ListCertificates",
"apigateway:GET",
"apigateway:HEAD",
"apigateway:OPTIONS",
Expand Down

0 comments on commit f6c7ff2

Please sign in to comment.