From 3d9269742866145a99f5adf0ef72884913cc3120 Mon Sep 17 00:00:00 2001 From: Aaron Loo Date: Sat, 15 Jun 2019 00:13:55 -0700 Subject: [PATCH 1/5] adding scaffolding for verifiable bearer tokens --- detect_secrets/core/constants.py | 9 +++ detect_secrets/core/potential_secret.py | 15 ++++ detect_secrets/plugins/base.py | 56 +++++++++++++-- detect_secrets/plugins/slack.py | 14 ++++ tests/core/potential_secret_test.py | 7 +- tests/core/secrets_collection_test.py | 3 + tests/plugins/base_test.py | 91 ++++++++++++++++++++++++- tests/plugins/slack_test.py | 2 +- tests/pre_commit_hook_test.py | 1 + 9 files changed, 188 insertions(+), 10 deletions(-) diff --git a/detect_secrets/core/constants.py b/detect_secrets/core/constants.py index 4e80dab49..c57b046fc 100644 --- a/detect_secrets/core/constants.py +++ b/detect_secrets/core/constants.py @@ -1,3 +1,6 @@ +from enum import Enum + + # We don't scan files with these extensions. # NOTE: We might be able to do this better with # `subprocess.check_output(['file', filename])` @@ -26,3 +29,9 @@ 'webp', 'zip', } + + +class VerifiedResult(Enum): + UNVERIFIED = 1 + VERIFIED_FALSE = 2 + VERIFIED_TRUE = 3 diff --git a/detect_secrets/core/potential_secret.py b/detect_secrets/core/potential_secret.py index a48d33ea6..56919c178 100644 --- a/detect_secrets/core/potential_secret.py +++ b/detect_secrets/core/potential_secret.py @@ -40,12 +40,26 @@ def __init__( :type is_secret: bool|None :param is_secret: whether or not the secret is a true- or false- positive + + :type is_verified: bool + :param is_verified: whether the secret has been externally verified """ self.type = typ self.filename = filename self.lineno = lineno self.secret_hash = self.hash_secret(secret) self.is_secret = is_secret + self.is_verified = False + + # NOTE: Originally, we never wanted to keep the secret value in memory, + # after finding it in the codebase. However, to support verifiable + # secrets (and avoid the pain of re-scanning again), we need to + # keep the plaintext in memory as such. + # + # This value should never appear in the baseline though, seeing that + # we don't want to create a file that contains all plaintext secrets + # in the repository. + self.secret_value = secret # If two PotentialSecrets have the same values for these fields, # they are considered equal. Note that line numbers aren't included @@ -69,6 +83,7 @@ def json(self): 'filename': self.filename, 'line_number': self.lineno, 'hashed_secret': self.secret_hash, + 'is_verified': self.is_verified, } if self.is_secret is not None: diff --git a/detect_secrets/plugins/base.py b/detect_secrets/plugins/base.py index 6acecf677..5bd3e429c 100644 --- a/detect_secrets/plugins/base.py +++ b/detect_secrets/plugins/base.py @@ -3,6 +3,7 @@ from abc import abstractmethod from abc import abstractproperty +from detect_secrets.core.constants import VerifiedResult from detect_secrets.core.potential_secret import PotentialSecret from detect_secrets.plugins.common.constants import ALLOWLIST_REGEXES @@ -14,7 +15,7 @@ class BasePlugin(object): secret_type = None - def __init__(self, exclude_lines_regex=None, **kwargs): + def __init__(self, exclude_lines_regex=None, should_verify=True, **kwargs): """ :type exclude_lines_regex: str|None :param exclude_lines_regex: optional regex for ignored lines. @@ -24,9 +25,9 @@ def __init__(self, exclude_lines_regex=None, **kwargs): self.exclude_lines_regex = None if exclude_lines_regex: - self.exclude_lines_regex = re.compile( - exclude_lines_regex, - ) + self.exclude_lines_regex = re.compile(exclude_lines_regex) + + self.should_verify = should_verify def analyze(self, file, filename): """ @@ -39,8 +40,21 @@ def analyze(self, file, filename): """ potential_secrets = {} for line_num, line in enumerate(file.readlines(), start=1): - secrets = self.analyze_string(line, line_num, filename) - potential_secrets.update(secrets) + results = self.analyze_string(line, line_num, filename) + if not self.should_verify: + potential_secrets.update(results) + continue + + filtered_results = {} + for result in results: + is_verified = self.verify(result.secret_value) + if is_verified != VerifiedResult.UNVERIFIED: + result.is_verified = True + + if is_verified != VerifiedResult.VERIFIED_FALSE: + filtered_results[result] = result + + potential_secrets.update(filtered_results) return potential_secrets @@ -117,9 +131,37 @@ def adhoc_scan(self, string): results = self.analyze_string(string, 0, 'does_not_matter') if not results: return 'False' - else: + + if not self.should_verify: return 'True' + verified_result = VerifiedResult.UNVERIFIED + for result in results: + is_verified = self.verify(result.secret_value) + if is_verified != VerifiedResult.UNVERIFIED: + verified_result = is_verified + break + + output = { + VerifiedResult.VERIFIED_FALSE: 'False (verified)', + VerifiedResult.VERIFIED_TRUE: 'True (verified)', + VerifiedResult.UNVERIFIED: 'True (unverified)', + } + + return output[verified_result] + + def verify(self, token): + """ + To increase accuracy and reduce false positives, plugins can also + optionally declare a method to verify their status. + + :type token: str + :param token: secret found by current plugin + + :rtype: VerifiedResult + """ + return VerifiedResult.UNVERIFIED + @property def __dict__(self): return { diff --git a/detect_secrets/plugins/slack.py b/detect_secrets/plugins/slack.py index a1fbb5799..a3f713286 100644 --- a/detect_secrets/plugins/slack.py +++ b/detect_secrets/plugins/slack.py @@ -5,7 +5,10 @@ import re +import requests + from .base import RegexBasedDetector +from detect_secrets.core.constants import VerifiedResult class SlackDetector(RegexBasedDetector): @@ -15,3 +18,14 @@ class SlackDetector(RegexBasedDetector): denylist = ( re.compile(r'xox(?:a|b|p|o|s|r)-(?:\d+-)+[a-z0-9]+', flags=re.IGNORECASE), ) + + def verify(self, token, **kwargs): + response = requests.post( + 'https://slack.com/api/auth.test', + data={ + 'token': token, + }, + ).json() + + return VerifiedResult.VERIFIED_TRUE if response['ok'] \ + else VerifiedResult.VERIFIED_FALSE diff --git a/tests/core/potential_secret_test.py b/tests/core/potential_secret_test.py index 75a86a134..7f4dff4be 100644 --- a/tests/core/potential_secret_test.py +++ b/tests/core/potential_secret_test.py @@ -35,5 +35,10 @@ def test_equality(self, a, b, is_equal): assert (a != b) is not is_equal def test_secret_storage(self): - secret = potential_secret_factory() + secret = potential_secret_factory(secret='secret') assert secret.secret_hash != 'secret' + + def test_json(self): + secret = potential_secret_factory(secret='blah') + for value in secret.json().values(): + assert value != 'blah' diff --git a/tests/core/secrets_collection_test.py b/tests/core/secrets_collection_test.py index ad8789d12..3cf5cdd5f 100644 --- a/tests/core/secrets_collection_test.py +++ b/tests/core/secrets_collection_test.py @@ -372,11 +372,13 @@ def _get_baseline_dict(self, gmtime): # Line numbers should be sorted, for better readability { 'type': 'B', + 'is_verified': False, 'line_number': 2, 'hashed_secret': secret_hash, }, { 'type': 'A', + 'is_verified': False, 'line_number': 3, 'hashed_secret': secret_hash, }, @@ -384,6 +386,7 @@ def _get_baseline_dict(self, gmtime): 'fileB': [ { 'type': 'C', + 'is_verified': False, 'line_number': 1, 'hashed_secret': secret_hash, }, diff --git a/tests/plugins/base_test.py b/tests/plugins/base_test.py index e9537137c..0cdf5613b 100644 --- a/tests/plugins/base_test.py +++ b/tests/plugins/base_test.py @@ -2,7 +2,10 @@ import pytest +from detect_secrets.core.constants import VerifiedResult from detect_secrets.plugins.base import BasePlugin +from testing.factories import potential_secret_factory +from testing.mocks import mock_file_object def test_fails_if_no_secret_type_defined(): @@ -14,5 +17,91 @@ def analyze_string_content(self, *args, **kwargs): def secret_generator(self, *args, **kwargs): pass - with pytest.raises(ValueError): + with pytest.raises(ValueError) as excinfo: MockPlugin() + + assert 'Plugins need to declare a secret_type.' == str(excinfo.value) + + +class TestVerify: + @pytest.mark.parametrize( + 'result, output', + ( + ( + VerifiedResult.UNVERIFIED, + 'True (unverified)', + ), + ( + VerifiedResult.VERIFIED_FALSE, + 'False (verified)', + ), + ( + VerifiedResult.VERIFIED_TRUE, + 'True (verified)', + ), + ), + ) + def test_adhoc_scan_values(self, result, output): + plugin = self.create_test_plugin(result) + assert plugin.adhoc_scan('test value') == output + + def test_adhoc_scan_should_abide_by_no_verify_flag(self): + plugin = self.create_test_plugin(VerifiedResult.VERIFIED_TRUE) + plugin.should_verify = False + + assert plugin.adhoc_scan('test value') == 'True' + + def test_analyze_verified_false_ignores_value(self): + plugin = self.create_test_plugin(VerifiedResult.VERIFIED_FALSE) + + file = mock_file_object('does not matter') + result = plugin.analyze(file, 'does not matter') + + assert len(result) == 0 + + def test_analyze_verified_true_adds_intel(self): + plugin = self.create_test_plugin(VerifiedResult.VERIFIED_TRUE) + + file = mock_file_object('does not matter') + result = plugin.analyze(file, 'does not matter') + + assert list(result.keys())[0].is_verified + + def test_analyze_unverified_stays_the_same(self): + plugin = self.create_test_plugin(VerifiedResult.UNVERIFIED) + + file = mock_file_object('does not matter') + result = plugin.analyze(file, 'does not matter') + + assert not list(result.keys())[0].is_verified + + def test_analyze_should_abide_by_no_verify_flag(self): + plugin = self.create_test_plugin(VerifiedResult.VERIFIED_FALSE) + plugin.should_verify = False + + file = mock_file_object('does not matter') + result = plugin.analyze(file, 'does not matter') + + # If it is verified, this value should be 0. + assert len(result) == 1 + + def create_test_plugin(self, result): + """ + :type result: VerifiedResult + """ + class MockPlugin(BasePlugin): # pragma: no cover + secret_type = 'test_verify' + + def analyze_string_content(self, *args, **kwargs): + secret = potential_secret_factory() + return { + secret: secret, + } + + def secret_generator(self, *args, **kwargs): + pass + + def verify(self, *args, **kwargs): + return result + + return MockPlugin() diff --git a/tests/plugins/slack_test.py b/tests/plugins/slack_test.py index ffeb89b33..444b5df1e 100644 --- a/tests/plugins/slack_test.py +++ b/tests/plugins/slack_test.py @@ -36,7 +36,7 @@ class TestSlackDetector(object): ], ) def test_analyze(self, file_content): - logic = SlackDetector() + logic = SlackDetector(should_verify=False) f = mock_file_object(file_content) output = logic.analyze(f, 'mock_filename') diff --git a/tests/pre_commit_hook_test.py b/tests/pre_commit_hook_test.py index 554e78150..b4f482d0f 100644 --- a/tests/pre_commit_hook_test.py +++ b/tests/pre_commit_hook_test.py @@ -308,6 +308,7 @@ def _create_baseline_template(has_result, use_private_key_scan): { 'type': 'Base64 High Entropy String', 'is_secret': True, + 'is_verified': False, 'line_number': 3, 'hashed_secret': PotentialSecret.hash_secret(base64_secret), }, From 2b6b385a1b5d855040d0c888653fc9b667063f80 Mon Sep 17 00:00:00 2001 From: Aaron Loo Date: Sat, 15 Jun 2019 09:31:37 -0700 Subject: [PATCH 2/5] supporting verifiable multi-factor tokens --- detect_secrets/plugins/aws.py | 166 +++++++++++++++++++++++++++++++++ detect_secrets/plugins/base.py | 24 ++++- test_data/each_secret.py | 3 + tests/plugins/base_test.py | 52 ++++++----- 4 files changed, 222 insertions(+), 23 deletions(-) diff --git a/detect_secrets/plugins/aws.py b/detect_secrets/plugins/aws.py index 865943861..bdf48e364 100644 --- a/detect_secrets/plugins/aws.py +++ b/detect_secrets/plugins/aws.py @@ -3,9 +3,17 @@ """ from __future__ import absolute_import +import hashlib +import hmac import re +import string +import textwrap +from datetime import datetime + +import requests from .base import RegexBasedDetector +from detect_secrets.core.constants import VerifiedResult class AWSKeyDetector(RegexBasedDetector): @@ -15,3 +23,161 @@ class AWSKeyDetector(RegexBasedDetector): denylist = ( re.compile(r'AKIA[0-9A-Z]{16}'), ) + + def verify(self, token, content): + secret_access_key = get_secret_access_key(content) + if not secret_access_key: + return VerifiedResult.UNVERIFIED + + for candidate in secret_access_key: + if verify_aws_secret_access_key(token, candidate): + return VerifiedResult.VERIFIED_TRUE + + return VerifiedResult.VERIFIED_FALSE + + +def get_secret_access_key(content): + # AWS secret access keys are 40 characters long. + regex = re.compile( + r'= *([\'"]?)([%s]{40})(\1)$' % ( + string.ascii_letters + string.digits + '+/=' + ), + ) + + return [ + match[1] + for line in content.splitlines() + for match in regex.findall(line) + ] + + +def verify_aws_secret_access_key(key, secret): + """ + Using requests, because we don't want to require boto3 for this one + optional verification step. + + Loosely based off: + https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html + + :type key: str + :type secret: str + """ + now = datetime.utcnow() + amazon_datetime = now.strftime('%Y%m%dT%H%M%SZ') + + headers = { + # This is a required header for the signing process + 'Host': 'sts.amazonaws.com', + 'X-Amz-Date': amazon_datetime, + } + body = { + 'Action': 'GetCallerIdentity', + 'Version': '2011-06-15', + } + + # Step #1: Canonical Request + signed_headers = ';'.join( + map( + lambda x: x.lower(), + headers.keys(), + ), + ) + canonical_request = textwrap.dedent(""" + POST + / + + {headers} + {signed_headers} + {hashed_payload} + """)[1:-1].format( + + headers='\n'.join([ + '{}:{}'.format(key.lower(), value) + for key, value in headers.items() + ]), + signed_headers=signed_headers, + + # Poor man's method, but works for this use case. + hashed_payload=hashlib.sha256( + '&'.join([ + '{}={}'.format(key, value) + for key, value in body.items() + ]).encode('utf-8'), + ).hexdigest(), + ) + + # Step #2: String to Sign + region = 'us-east-1' + scope = '{request_date}/{region}/sts/aws4_request'.format( + request_date=now.strftime('%Y%m%d'), + + # STS is a global service; this is just for latency control. + region=region, + ) + + string_to_sign = textwrap.dedent(""" + AWS4-HMAC-SHA256 + {request_datetime} + {scope} + {hashed_canonical_request} + """)[1:-1].format( + request_datetime=amazon_datetime, + scope=scope, + hashed_canonical_request=hashlib.sha256( + canonical_request.encode('utf-8'), + ).hexdigest(), + ) + + # Step #3: Calculate signature + signing_key = _sign( + _sign( + _sign( + _sign( + 'AWS4{}'.format(secret).encode('utf-8'), + now.strftime('%Y%m%d'), + ), + region, + ), + 'sts', + ), + 'aws4_request', + ) + + signature = _sign( + signing_key, + string_to_sign, + hex=True, + ) + + # Step #4: Add to request headers + headers['Authorization'] = ( + 'AWS4-HMAC-SHA256 ' + 'Credential={access_key}/{scope}, ' + 'SignedHeaders={signed_headers}, ' + 'Signature={signature}' + ).format( + access_key=key, + scope=scope, + signed_headers=signed_headers, + signature=signature, + ) + + # Step #5: Finally send the request + response = requests.post( + 'https://sts.amazonaws.com', + headers=headers, + data=body, + ) + + if response.status_code == 403: + return False + + return True + + +def _sign(key, message, hex=False): + value = hmac.new(key, message.encode('utf-8'), hashlib.sha256) + if not hex: + return value.digest() + + return value.hexdigest() diff --git a/detect_secrets/plugins/base.py b/detect_secrets/plugins/base.py index 5bd3e429c..9ce645e1c 100644 --- a/detect_secrets/plugins/base.py +++ b/detect_secrets/plugins/base.py @@ -3,11 +3,22 @@ from abc import abstractmethod from abc import abstractproperty +from detect_secrets.core.code_snippet import CodeSnippetHighlighter from detect_secrets.core.constants import VerifiedResult from detect_secrets.core.potential_secret import PotentialSecret from detect_secrets.plugins.common.constants import ALLOWLIST_REGEXES +# NOTE: In this whitepaper (Section V-D), it suggests that there's an +# 80% chance of finding a multi-factor secret (e.g. username + +# password) within five lines of context, before and after a secret. +# +# This number can be tweaked if desired, at the cost of performance. +# +# https://www.ndss-symposium.org/wp-content/uploads/2019/02/ndss2019_04B-3_Meli_paper.pdf +LINES_OF_CONTEXT = 5 + + class BasePlugin(object): """This is an abstract class to define Plugins API""" @@ -47,7 +58,13 @@ def analyze(self, file, filename): filtered_results = {} for result in results: - is_verified = self.verify(result.secret_value) + snippet = CodeSnippetHighlighter().get_code_snippet( + filename, + result.lineno, + lines_of_context=LINES_OF_CONTEXT, + ) + + is_verified = self.verify(result.secret_value, content=str(snippet)) if is_verified != VerifiedResult.UNVERIFIED: result.is_verified = True @@ -150,7 +167,7 @@ def adhoc_scan(self, string): return output[verified_result] - def verify(self, token): + def verify(self, token, content=''): """ To increase accuracy and reduce false positives, plugins can also optionally declare a method to verify their status. @@ -158,6 +175,9 @@ def verify(self, token): :type token: str :param token: secret found by current plugin + :type context: str + :param context: lines of context around identified secret + :rtype: VerifiedResult """ return VerifiedResult.UNVERIFIED diff --git a/test_data/each_secret.py b/test_data/each_secret.py index 8faa5d0db..f5f09b8a6 100644 --- a/test_data/each_secret.py +++ b/test_data/each_secret.py @@ -3,3 +3,6 @@ base64_secret = 'c2VjcmV0IG1lc3NhZ2Ugc28geW91J2xsIG5ldmVyIGd1ZXNzIG15IHBhc3N3b3Jk' hex_secret = '8b1118b376c313ed420e5133ba91307817ed52c2' basic_auth = 'http://username:whywouldyouusehttpforpasswords@example.com' + +aws_access_key = 'AKIAIOSFODNN7EXAMPLE' +aws_secret_access_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' diff --git a/tests/plugins/base_test.py b/tests/plugins/base_test.py index 0cdf5613b..08a0f17da 100644 --- a/tests/plugins/base_test.py +++ b/tests/plugins/base_test.py @@ -1,5 +1,8 @@ from __future__ import absolute_import +from contextlib import contextmanager + +import mock import pytest from detect_secrets.core.constants import VerifiedResult @@ -42,49 +45,47 @@ class TestVerify: ), ) def test_adhoc_scan_values(self, result, output): - plugin = self.create_test_plugin(result) - assert plugin.adhoc_scan('test value') == output + with self.create_test_plugin(result) as plugin: + assert plugin.adhoc_scan('test value') == output def test_adhoc_scan_should_abide_by_no_verify_flag(self): - plugin = self.create_test_plugin(VerifiedResult.VERIFIED_TRUE) - plugin.should_verify = False + with self.create_test_plugin(VerifiedResult.VERIFIED_TRUE) as plugin: + plugin.should_verify = False assert plugin.adhoc_scan('test value') == 'True' def test_analyze_verified_false_ignores_value(self): - plugin = self.create_test_plugin(VerifiedResult.VERIFIED_FALSE) - - file = mock_file_object('does not matter') - result = plugin.analyze(file, 'does not matter') + with self.create_test_plugin(VerifiedResult.VERIFIED_FALSE) as plugin: + file = mock_file_object('does not matter') + result = plugin.analyze(file, 'does not matter') assert len(result) == 0 def test_analyze_verified_true_adds_intel(self): - plugin = self.create_test_plugin(VerifiedResult.VERIFIED_TRUE) - - file = mock_file_object('does not matter') - result = plugin.analyze(file, 'does not matter') + with self.create_test_plugin(VerifiedResult.VERIFIED_TRUE) as plugin: + file = mock_file_object('does not matter') + result = plugin.analyze(file, 'does not matter') assert list(result.keys())[0].is_verified def test_analyze_unverified_stays_the_same(self): - plugin = self.create_test_plugin(VerifiedResult.UNVERIFIED) - - file = mock_file_object('does not matter') - result = plugin.analyze(file, 'does not matter') + with self.create_test_plugin(VerifiedResult.UNVERIFIED) as plugin: + file = mock_file_object('does not matter') + result = plugin.analyze(file, 'does not matter') assert not list(result.keys())[0].is_verified def test_analyze_should_abide_by_no_verify_flag(self): - plugin = self.create_test_plugin(VerifiedResult.VERIFIED_FALSE) - plugin.should_verify = False + with self.create_test_plugin(VerifiedResult.VERIFIED_FALSE) as plugin: + plugin.should_verify = False - file = mock_file_object('does not matter') - result = plugin.analyze(file, 'does not matter') + file = mock_file_object('does not matter') + result = plugin.analyze(file, 'does not matter') # If it is verified, this value should be 0. assert len(result) == 1 + @contextmanager def create_test_plugin(self, result): """ :type result: VerifiedResult @@ -104,4 +105,13 @@ def secret_generator(self, *args, **kwargs): def verify(self, *args, **kwargs): return result - return MockPlugin() + with mock.patch( + 'detect_secrets.plugins.base.CodeSnippetHighlighter', + autospec=True, + ) as mock_snippet: + plugin = MockPlugin() + plugin.should_verify = True + + mock_snippet().get_code_snippet.return_value = '' + + yield plugin From 047e3fa272bcbaf01457dc585ee80bd4350b2dc8 Mon Sep 17 00:00:00 2001 From: Aaron Loo Date: Sat, 15 Jun 2019 11:05:58 -0700 Subject: [PATCH 3/5] adding --no-verify flag --- detect_secrets/core/baseline.py | 6 +-- detect_secrets/core/secrets_collection.py | 13 ++++-- detect_secrets/core/usage.py | 19 +++++++- detect_secrets/main.py | 4 +- detect_secrets/plugins/base.py | 4 +- detect_secrets/plugins/common/initialize.py | 52 +++++++++++++++------ detect_secrets/pre_commit_hook.py | 1 + testing/factories.py | 4 ++ tests/core/baseline_test.py | 7 ++- tests/main_test.py | 8 ++-- 10 files changed, 86 insertions(+), 32 deletions(-) diff --git a/detect_secrets/core/baseline.py b/detect_secrets/core/baseline.py index caea0aac5..0af22b197 100644 --- a/detect_secrets/core/baseline.py +++ b/detect_secrets/core/baseline.py @@ -16,7 +16,7 @@ def initialize( plugins, exclude_files_regex=None, exclude_lines_regex=None, - scan_all_files=False, + should_scan_all_files=False, ): """Scans the entire codebase for secrets, and returns a SecretsCollection object. @@ -27,7 +27,7 @@ def initialize( :type exclude_files_regex: str|None :type exclude_lines_regex: str|None :type path: list - :type scan_all_files: bool + :type should_scan_all_files: bool :rtype: SecretsCollection """ @@ -40,7 +40,7 @@ def initialize( files_to_scan = list() for element in path: if os.path.isdir(element): - if scan_all_files: + if should_scan_all_files: files_to_scan.extend(_get_files_recursively(element)) else: files_to_scan.extend(_get_git_tracked_files(element)) diff --git a/detect_secrets/core/secrets_collection.py b/detect_secrets/core/secrets_collection.py index 99972aed6..fefc73e52 100644 --- a/detect_secrets/core/secrets_collection.py +++ b/detect_secrets/core/secrets_collection.py @@ -92,11 +92,14 @@ def load_baseline_from_dict(cls, data): plugins = [] for plugin in data['plugins_used']: plugin_classname = plugin.pop('name') - plugins.append(initialize.from_plugin_classname( - plugin_classname, - exclude_lines_regex=result.exclude_lines, - **plugin - )) + plugins.append( + initialize.from_plugin_classname( + plugin_classname, + exclude_lines_regex=result.exclude_lines, + should_verify_secrets=False, + **plugin + ), + ) result.plugins = tuple(plugins) for filename in data['results']: diff --git a/detect_secrets/core/usage.py b/detect_secrets/core/usage.py index d541a6907..a9ee590a0 100644 --- a/detect_secrets/core/usage.py +++ b/detect_secrets/core/usage.py @@ -22,6 +22,15 @@ def add_use_all_plugins_argument(parser): ) +def add_no_verify_flag(parser): + parser.add_argument( + '-n', + '--no-verify', + action='store_true', + help='Disables additional verification of secrets via network call.', + ) + + class ParserBuilder(object): def __init__(self): @@ -37,7 +46,8 @@ def add_pre_commit_arguments(self): self._add_filenames_argument()\ ._add_set_baseline_argument()\ ._add_exclude_lines_argument()\ - ._add_use_all_plugins_argument() + ._add_use_all_plugins_argument()\ + ._add_no_verify_flag() PluginOptions(self.parser).add_arguments() @@ -100,6 +110,11 @@ def _add_exclude_lines_argument(self): def _add_use_all_plugins_argument(self): add_use_all_plugins_argument(self.parser) + return self + + def _add_no_verify_flag(self): + add_no_verify_flag(self.parser) + return self class ScanOptions(object): @@ -161,6 +176,8 @@ def _add_initialize_baseline_argument(self): help='Scan all files recursively (as compared to only scanning git tracked files).', ) + add_no_verify_flag(self.parser) + return self def _add_adhoc_scanning_argument(self): diff --git a/detect_secrets/main.py b/detect_secrets/main.py index e608c2261..79597149c 100644 --- a/detect_secrets/main.py +++ b/detect_secrets/main.py @@ -34,6 +34,7 @@ def main(argv=None): plugins = initialize.from_parser_builder( args.plugins, exclude_lines_regex=args.exclude_lines, + should_verify_secrets=not args.no_verify, ) if args.string: line = args.string @@ -115,7 +116,6 @@ def _perform_scan(args, plugins): :rtype: dict """ - old_baseline = _get_existing_baseline(args.import_filename) if old_baseline: plugins = initialize.merge_plugin_from_baseline( @@ -144,7 +144,7 @@ def _perform_scan(args, plugins): exclude_files_regex=args.exclude_files, exclude_lines_regex=args.exclude_lines, path=args.path, - scan_all_files=args.all_files, + should_scan_all_files=args.all_files, ).format_for_baseline_output() if old_baseline: diff --git a/detect_secrets/plugins/base.py b/detect_secrets/plugins/base.py index 9ce645e1c..3ff926c0d 100644 --- a/detect_secrets/plugins/base.py +++ b/detect_secrets/plugins/base.py @@ -26,10 +26,12 @@ class BasePlugin(object): secret_type = None - def __init__(self, exclude_lines_regex=None, should_verify=True, **kwargs): + def __init__(self, exclude_lines_regex=None, should_verify=False, **kwargs): """ :type exclude_lines_regex: str|None :param exclude_lines_regex: optional regex for ignored lines. + + :type should_verify: bool """ if not self.secret_type: raise ValueError('Plugins need to declare a secret_type.') diff --git a/detect_secrets/plugins/common/initialize.py b/detect_secrets/plugins/common/initialize.py index 85af4972b..d55f8d7cf 100644 --- a/detect_secrets/plugins/common/initialize.py +++ b/detect_secrets/plugins/common/initialize.py @@ -18,7 +18,11 @@ from detect_secrets.core.usage import PluginOptions -def from_parser_builder(plugins_dict, exclude_lines_regex=None): +def from_parser_builder( + plugins_dict, + exclude_lines_regex=None, + should_verify_secrets=False, +): """ :param plugins_dict: plugins dictionary received from ParserBuilder. See example in tests.core.usage_test. @@ -26,15 +30,20 @@ def from_parser_builder(plugins_dict, exclude_lines_regex=None): :type exclude_lines_regex: str|None :param exclude_lines_regex: optional regex for ignored lines. + :type should_verify_secrets: bool + :returns: tuple of initialized plugins """ output = [] for plugin_name in plugins_dict: - output.append(from_plugin_classname( - plugin_name, - exclude_lines_regex=exclude_lines_regex, - **plugins_dict[plugin_name] - )) + output.append( + from_plugin_classname( + plugin_name, + exclude_lines_regex=exclude_lines_regex, + should_verify_secrets=should_verify_secrets, + **plugins_dict[plugin_name] + ), + ) return tuple(output) @@ -64,11 +73,11 @@ def merge_plugin_from_baseline(baseline_plugins, args): :param baseline_plugins: BasePlugin instances from baseline file :type args: dict - :param args: diction of arguments parsed from usage + :param args: dictionary of arguments parsed from usage param priority: input param > baseline param > default - :Returns tuple of initialized plugins + :returns: tuple of initialized plugins """ def _remove_key(d, key): r = dict(d) @@ -95,13 +104,14 @@ def _remove_key(d, key): plugins_dict[plugin_name][param_name] = param_value except KeyError: log.warning( - 'Baseline contain plugin %s which is not in all plugins! Ignoring...' - % (plugin_name), + 'Baseline contain plugin {} which is not in all plugins! Ignoring...', + plugin_name, ) return from_parser_builder( plugins_dict, exclude_lines_regex=args.exclude_lines, + should_verify_secrets=not args.no_verify, ) # Use baseline plugin as starting point @@ -123,8 +133,9 @@ def _remove_key(d, key): plugins_dict[plugin_name][param_name] = param_value except KeyError: log.warning( - '%s specified, but %s not configured! Ignoring...' - % ("".join(["--", param_name.replace("_", "-")]), plugin_name), + '{} specified, but {} not configured! Ignoring...', + "".join(["--", param_name.replace("_", "-")]), + plugin_name, ) return from_parser_builder( @@ -133,7 +144,12 @@ def _remove_key(d, key): ) -def from_plugin_classname(plugin_classname, exclude_lines_regex=None, **kwargs): +def from_plugin_classname( + plugin_classname, + exclude_lines_regex=None, + should_verify_secrets=False, + **kwargs +): """Initializes a plugin class, given a classname and kwargs. :type plugin_classname: str @@ -149,7 +165,11 @@ def from_plugin_classname(plugin_classname, exclude_lines_regex=None, **kwargs): raise TypeError try: - instance = klass(exclude_lines_regex=exclude_lines_regex, **kwargs) + instance = klass( + exclude_lines_regex=exclude_lines_regex, + should_verify=should_verify_secrets, + **kwargs + ) except TypeError: log.warning( 'Unable to initialize plugin!', @@ -188,6 +208,10 @@ def from_secret_type(secret_type, settings): return from_plugin_classname( classname, + + # `audit` doesn't need verification + should_verify_secrets=False, + **plugin_init_vars ) diff --git a/detect_secrets/pre_commit_hook.py b/detect_secrets/pre_commit_hook.py index daf7a8563..509604649 100644 --- a/detect_secrets/pre_commit_hook.py +++ b/detect_secrets/pre_commit_hook.py @@ -39,6 +39,7 @@ def main(argv=None): plugins = initialize.from_parser_builder( args.plugins, exclude_lines_regex=args.exclude_lines, + should_verify_secrets=not args.no_verify, ) # Merge plugin from baseline diff --git a/testing/factories.py b/testing/factories.py index cc91676ab..6c3ea0467 100644 --- a/testing/factories.py +++ b/testing/factories.py @@ -28,6 +28,10 @@ def secrets_collection_factory(secrets=None, plugins=(), exclude_files_regex=Non ) if plugins: + for plugin in plugins: + # We don't want to incur network calls during test cases + plugin.should_verify = False + collection.plugins = plugins # Handle secrets diff --git a/tests/core/baseline_test.py b/tests/core/baseline_test.py index 276306d6f..b1c4e8bb9 100644 --- a/tests/core/baseline_test.py +++ b/tests/core/baseline_test.py @@ -39,7 +39,7 @@ def get_results( path, self.plugins, exclude_files_regex=exclude_files_regex, - scan_all_files=scan_all_files, + should_scan_all_files=scan_all_files, ).json() @pytest.mark.parametrize( @@ -126,7 +126,10 @@ def test_single_non_tracked_git_file_should_work(self): assert len(results['will_be_mocked']) == 1 def test_scan_all_files(self): - results = self.get_results(path=['test_data/files'], scan_all_files=True) + results = self.get_results( + path=['test_data/files'], + scan_all_files=True, + ) assert len(results.keys()) == 2 diff --git a/tests/main_test.py b/tests/main_test.py index 00beb56dd..7d44c413d 100644 --- a/tests/main_test.py +++ b/tests/main_test.py @@ -30,7 +30,7 @@ def test_scan_basic(self, mock_baseline_initialize): exclude_files_regex=None, exclude_lines_regex=None, path='.', - scan_all_files=False, + should_scan_all_files=False, ) def test_scan_with_rootdir(self, mock_baseline_initialize): @@ -42,7 +42,7 @@ def test_scan_with_rootdir(self, mock_baseline_initialize): exclude_files_regex=None, exclude_lines_regex=None, path=['test_data'], - scan_all_files=False, + should_scan_all_files=False, ) def test_scan_with_exclude_args(self, mock_baseline_initialize): @@ -56,7 +56,7 @@ def test_scan_with_exclude_args(self, mock_baseline_initialize): exclude_files_regex='some_pattern_here', exclude_lines_regex='other_patt', path='.', - scan_all_files=False, + should_scan_all_files=False, ) @pytest.mark.parametrize( @@ -132,7 +132,7 @@ def test_scan_with_all_files_flag(self, mock_baseline_initialize): exclude_files_regex=None, exclude_lines_regex=None, path='.', - scan_all_files=True, + should_scan_all_files=True, ) def test_reads_from_stdin(self, mock_merge_baseline): From 6cb9f789a443403cbea2b71f78c70bc8c8366f3f Mon Sep 17 00:00:00 2001 From: Aaron Loo Date: Sat, 15 Jun 2019 11:42:39 -0700 Subject: [PATCH 4/5] fixing coverage --- detect_secrets/plugins/aws.py | 12 ++-- detect_secrets/plugins/slack.py | 2 +- detect_secrets/util.py | 2 +- setup.py | 1 + tests/plugins/aws_key_test.py | 109 ++++++++++++++++++++++++++++++++ tests/plugins/base_test.py | 1 + 6 files changed, 119 insertions(+), 8 deletions(-) diff --git a/detect_secrets/plugins/aws.py b/detect_secrets/plugins/aws.py index bdf48e364..7c1a44814 100644 --- a/detect_secrets/plugins/aws.py +++ b/detect_secrets/plugins/aws.py @@ -51,7 +51,7 @@ def get_secret_access_key(content): ] -def verify_aws_secret_access_key(key, secret): +def verify_aws_secret_access_key(key, secret): # pragma: no cover """ Using requests, because we don't want to require boto3 for this one optional verification step. @@ -92,16 +92,16 @@ def verify_aws_secret_access_key(key, secret): """)[1:-1].format( headers='\n'.join([ - '{}:{}'.format(key.lower(), value) - for key, value in headers.items() + '{}:{}'.format(header.lower(), value) + for header, value in headers.items() ]), signed_headers=signed_headers, # Poor man's method, but works for this use case. hashed_payload=hashlib.sha256( '&'.join([ - '{}={}'.format(key, value) - for key, value in body.items() + '{}={}'.format(header, value) + for header, value in body.items() ]).encode('utf-8'), ).hexdigest(), ) @@ -175,7 +175,7 @@ def verify_aws_secret_access_key(key, secret): return True -def _sign(key, message, hex=False): +def _sign(key, message, hex=False): # pragma: no cover value = hmac.new(key, message.encode('utf-8'), hashlib.sha256) if not hex: return value.digest() diff --git a/detect_secrets/plugins/slack.py b/detect_secrets/plugins/slack.py index a3f713286..b2b301d59 100644 --- a/detect_secrets/plugins/slack.py +++ b/detect_secrets/plugins/slack.py @@ -19,7 +19,7 @@ class SlackDetector(RegexBasedDetector): re.compile(r'xox(?:a|b|p|o|s|r)-(?:\d+-)+[a-z0-9]+', flags=re.IGNORECASE), ) - def verify(self, token, **kwargs): + def verify(self, token, **kwargs): # pragma: no cover response = requests.post( 'https://slack.com/api/auth.test', data={ diff --git a/detect_secrets/util.py b/detect_secrets/util.py index 7a135a42e..df078b91a 100644 --- a/detect_secrets/util.py +++ b/detect_secrets/util.py @@ -1,7 +1,7 @@ import os -def get_root_directory(): +def get_root_directory(): # pragma: no cover return os.path.realpath( os.path.join( os.path.dirname(__file__), diff --git a/setup.py b/setup.py index 2693f592e..f0c548757 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ keywords=['secret-management', 'pre-commit', 'security', 'entropy-checks'], install_requires=[ 'pyyaml', + 'requests', ], extras_require={ ':python_version=="2.7"': [ diff --git a/tests/plugins/aws_key_test.py b/tests/plugins/aws_key_test.py index 9fad8792d..1f0e3eea6 100644 --- a/tests/plugins/aws_key_test.py +++ b/tests/plugins/aws_key_test.py @@ -1,14 +1,25 @@ from __future__ import absolute_import from __future__ import unicode_literals +import textwrap + +import mock import pytest +from detect_secrets.core.constants import VerifiedResult from detect_secrets.plugins.aws import AWSKeyDetector +from detect_secrets.plugins.aws import get_secret_access_key from testing.mocks import mock_file_object +EXAMPLE_SECRET = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' + + class TestAWSKeyDetector(object): + def setup(self): + self.example_key = 'AKIAZZZZZZZZZZZZZZZZ' + @pytest.mark.parametrize( 'file_content,should_flag', [ @@ -34,3 +45,101 @@ def test_analyze(self, file_content, should_flag): assert len(output) == (1 if should_flag else 0) for potential_secret in output: assert 'mock_filename' == potential_secret.filename + + def test_verify_no_secret(self): + logic = AWSKeyDetector() + + assert logic.verify(self.example_key, '') == VerifiedResult.UNVERIFIED + + def test_verify_valid_secret(self): + with mock.patch( + 'detect_secrets.plugins.aws.verify_aws_secret_access_key', + return_value=True, + ): + assert AWSKeyDetector().verify( + self.example_key, + '={}'.format(EXAMPLE_SECRET), + ) == VerifiedResult.VERIFIED_TRUE + + def test_verify_invalid_secret(self): + with mock.patch( + 'detect_secrets.plugins.aws.verify_aws_secret_access_key', + return_value=False, + ): + assert AWSKeyDetector().verify( + self.example_key, + '={}'.format(EXAMPLE_SECRET), + ) == VerifiedResult.VERIFIED_FALSE + + def test_verify_keep_trying_until_found_something(self): + data = {'count': 0} + + def counter(*args, **kwargs): + output = data['count'] + data['count'] += 1 + + return bool(output) + + with mock.patch( + 'detect_secrets.plugins.aws.verify_aws_secret_access_key', + counter, + ): + assert AWSKeyDetector().verify( + self.example_key, + textwrap.dedent(""" + false_secret = {} + real_secret = {} + """)[1:-1].format( + 'TEST' * 10, + EXAMPLE_SECRET, + ), + ) == VerifiedResult.VERIFIED_TRUE + + +@pytest.mark.parametrize( + 'content, expected_output', + ( + # No quotes + ( + textwrap.dedent(""" + aws_secret_access_key = {} + """)[1:-1].format( + EXAMPLE_SECRET, + ), + [EXAMPLE_SECRET], + ), + + # With quotes + ( + textwrap.dedent(""" + secret_key = "{}" + """)[1:-1].format( + EXAMPLE_SECRET, + ), + [EXAMPLE_SECRET], + ), + + # multiple candidates + ( + textwrap.dedent(""" + base64_keyA = '{}' + aws_secret = '{}' + base64_keyB = '{}' + """)[1:-1].format( + 'TEST' * 10, + + EXAMPLE_SECRET, + + # This should not be a candidate, because it's not exactly + # 40 chars long. + 'EXAMPLE' * 7, + ), + [ + 'TEST' * 10, + EXAMPLE_SECRET, + ], + ), + ), +) +def test_get_secret_access_key(content, expected_output): + assert get_secret_access_key(content) == expected_output diff --git a/tests/plugins/base_test.py b/tests/plugins/base_test.py index 08a0f17da..18c09f2fe 100644 --- a/tests/plugins/base_test.py +++ b/tests/plugins/base_test.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from __future__ import unicode_literals from contextlib import contextmanager From bb98b8281873fc50b6a73c68baab6fc366769125 Mon Sep 17 00:00:00 2001 From: Aaron Loo Date: Mon, 17 Jun 2019 17:08:37 -0700 Subject: [PATCH 5/5] bug fix: accurate signature for AWS STS service. It looks like Amazon needs an EXTRA trailing newline after declaring all the headers that you want to sign. --- detect_secrets/plugins/aws.py | 1 + 1 file changed, 1 insertion(+) diff --git a/detect_secrets/plugins/aws.py b/detect_secrets/plugins/aws.py index 7c1a44814..84c940516 100644 --- a/detect_secrets/plugins/aws.py +++ b/detect_secrets/plugins/aws.py @@ -87,6 +87,7 @@ def verify_aws_secret_access_key(key, secret): # pragma: no cover / {headers} + {signed_headers} {hashed_payload} """)[1:-1].format(