diff --git a/ggshield/core/cache.py b/ggshield/core/cache.py index 7bfa9ecfdd..e8de355e0e 100644 --- a/ggshield/core/cache.py +++ b/ggshield/core/cache.py @@ -2,12 +2,9 @@ from pathlib import Path from typing import Any, Dict, List -from pygitguardian.models import PolicyBreak - from ggshield.core import ui from ggshield.core.constants import CACHE_PATH from ggshield.core.errors import UnexpectedError -from ggshield.core.filter import get_ignore_sha from ggshield.core.types import IgnoredMatch @@ -74,18 +71,18 @@ def save(self) -> None: def purge(self) -> None: self.last_found_secrets = [] - def add_found_policy_break(self, policy_break: PolicyBreak, filename: str) -> None: - if policy_break.is_secret: - ignore_sha = get_ignore_sha(policy_break) - if not any( - last_found.match == ignore_sha for last_found in self.last_found_secrets - ): - self.last_found_secrets.append( - IgnoredMatch( - name=f"{policy_break.break_type} - {filename}", - match=get_ignore_sha(policy_break), - ) + def add_found_policy_break( + self, break_type: str, ignore_sha: str, filename: str + ) -> None: + if not any( + last_found.match == ignore_sha for last_found in self.last_found_secrets + ): + self.last_found_secrets.append( + IgnoredMatch( + name=f"{break_type} - {filename}", + match=ignore_sha, ) + ) class ReadOnlyCache(Cache): diff --git a/ggshield/core/filter.py b/ggshield/core/filter.py index 109d5ce347..91521e9456 100644 --- a/ggshield/core/filter.py +++ b/ggshield/core/filter.py @@ -2,7 +2,7 @@ import math import operator import re -from typing import Dict, Iterable, List, Pattern, Set +from typing import Iterable, Pattern, Set from click import UsageError from pygitguardian.models import Match, PolicyBreak @@ -60,20 +60,6 @@ def get_ignore_sha(policy_break: PolicyBreak) -> str: return hashlib.sha256(hashable.encode("UTF-8")).hexdigest() -def group_policy_breaks_by_ignore_sha( - policy_breaks: List[PolicyBreak], -) -> Dict[str, List[PolicyBreak]]: - """ - Group policy breaks by their ignore sha. - """ - sha_dict: Dict[str, List[PolicyBreak]] = {} - for policy_break in policy_breaks: - ignore_sha = get_ignore_sha(policy_break) - sha_dict.setdefault(ignore_sha, []).append(policy_break) - - return sha_dict - - def translate_user_pattern(pattern: str) -> str: """ Translate the user pattern into a regex. This function assumes that the given diff --git a/ggshield/core/text_utils.py b/ggshield/core/text_utils.py index d3994a0a2d..4523198280 100644 --- a/ggshield/core/text_utils.py +++ b/ggshield/core/text_utils.py @@ -26,7 +26,7 @@ "warning": {"fg": "yellow"}, "heading": {"fg": "green"}, "incident_validity": {"fg": "bright_yellow", "bold": True}, - "policy_break_type": {"fg": "bright_yellow", "bold": True}, + "secret_type": {"fg": "bright_yellow", "bold": True}, "occurrence_count": {"fg": "bright_yellow", "bold": True}, "ignore_sha": {"fg": "bright_yellow", "bold": True}, "iac_vulnerability_critical": {"fg": "red", "bold": True}, diff --git a/ggshield/verticals/secret/docker.py b/ggshield/verticals/secret/docker.py index 28fad3217d..85deae6fad 100644 --- a/ggshield/verticals/secret/docker.py +++ b/ggshield/verticals/secret/docker.py @@ -358,7 +358,7 @@ def docker_scan_archive( ui.display_heading(f"Scanning layer {info.diff_id}") with ui.create_scanner_ui(file_count) as scanner_ui: layer_results = scanner.scan(files, scanner_ui=scanner_ui) - if not layer_results.has_policy_breaks: + if not layer_results.has_secrets: layer_id_cache.add(layer_id) results.extend(layer_results) diff --git a/ggshield/verticals/secret/extended_match.py b/ggshield/verticals/secret/extended_match.py index e0aa3c3e12..be008e87a4 100644 --- a/ggshield/verticals/secret/extended_match.py +++ b/ggshield/verticals/secret/extended_match.py @@ -140,3 +140,17 @@ def __repr__(self) -> str: f"post_line_end:{self.post_line_end}", ] ) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, ExtendedMatch): + return False + return ( + self.span == other.span + and self.lines_before_secret == other.lines_before_secret + and self.lines_with_secret == other.lines_with_secret + and self.lines_after_secret == other.lines_after_secret + and self.pre_line_start == other.pre_line_start + and self.pre_line_end == other.pre_line_end + and self.post_line_start == other.post_line_start + and self.post_line_end == other.post_line_end + ) diff --git a/ggshield/verticals/secret/output/schemas.py b/ggshield/verticals/secret/output/schemas.py index 78c5dbf332..926b384402 100644 --- a/ggshield/verticals/secret/output/schemas.py +++ b/ggshield/verticals/secret/output/schemas.py @@ -4,16 +4,22 @@ from ggshield.verticals.secret.extended_match import ExtendedMatchSchema +class IgnoreReasonSchema(BaseSchema): + kind = fields.String(required=True) + detail = fields.String() + + class FlattenedPolicyBreak(BaseSchema): policy = fields.String(required=True) occurrences = fields.List(fields.Nested(ExtendedMatchSchema), required=True) - break_type = fields.String(data_key="type", required=True) + detector = fields.String(data_key="type", required=True) validity = fields.String(required=False, allow_none=True) ignore_sha = fields.String(required=True) total_occurrences = fields.Integer(required=True) incident_url = fields.String(required=True, dump_default="") incident_details = fields.Nested(SecretIncidentSchema) known_secret = fields.Bool(required=True, dump_default=False) + ignore_reason = fields.Nested(IgnoreReasonSchema, dump_default=None) class JSONResultSchema(BaseSchema): diff --git a/ggshield/verticals/secret/output/secret_gitlab_webui_output_handler.py b/ggshield/verticals/secret/output/secret_gitlab_webui_output_handler.py index a200aafb65..190c431257 100644 --- a/ggshield/verticals/secret/output/secret_gitlab_webui_output_handler.py +++ b/ggshield/verticals/secret/output/secret_gitlab_webui_output_handler.py @@ -1,25 +1,23 @@ -from pygitguardian.models import PolicyBreak - from ggshield.core.filter import censor_match from ggshield.core.text_utils import pluralize, translate_validity -from ..secret_scan_collection import SecretScanCollection +from ..secret_scan_collection import Secret, SecretScanCollection from .secret_output_handler import SecretOutputHandler -def format_policy_break(policy_break: PolicyBreak) -> str: +def format_secret(secret: Secret) -> str: """Returns a string with the policy name, validity and a comma-separated, - double-quoted, censored version of all `policy_break` matches. + double-quoted, censored version of all `secret` matches. Looks like this: PayPal OAuth2 Keys (Validity: Valid, id="aa*******bb", secret="cc******dd") """ match_str = ", ".join( - f'{x.match_type}: "{censor_match(x)}"' for x in policy_break.matches + f'{x.match_type}: "{censor_match(x)}"' for x in secret.matches ) - validity = translate_validity(policy_break.validity) - return f"{policy_break.break_type} (Validity: {validity}, {match_str})" + validity = translate_validity(secret.validity) + return f"{secret.detector} (Validity: {validity}, {match_str})" class SecretGitLabWebUIOutputHandler(SecretOutputHandler): @@ -35,21 +33,17 @@ class SecretGitLabWebUIOutputHandler(SecretOutputHandler): def _process_scan_impl(self, scan: SecretScanCollection) -> str: results = scan.get_all_results() - policy_breaks_to_report = [ - policy_break for result in results for policy_break in result.policy_breaks - ] + secrets_to_report = [secret for result in results for secret in result.secrets] # If no secrets or no new secrets were found - if len(policy_breaks_to_report) == 0: + if len(secrets_to_report) == 0: return "" # Use a set to ensure we do not report duplicate incidents. # (can happen when the secret is present in both the old and the new version of # the document) - formatted_policy_breaks = { - format_policy_break(x) for x in policy_breaks_to_report - } - break_count = len(formatted_policy_breaks) + formatted_secrets = {format_secret(x) for x in secrets_to_report} + break_count = len(formatted_secrets) if self.ignore_known_secrets: summary_str = f"{break_count} new {pluralize('incident', break_count)}" @@ -60,7 +54,7 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str: # do this because of a bug in GitLab Web IDE which causes newline characters to # be shown as "
" # https://gitlab.com/gitlab-org/gitlab/-/issues/350349 - breaks_str = ", ".join(formatted_policy_breaks) + breaks_str = ", ".join(formatted_secrets) return ( f"GL-HOOK-ERR: ggshield found {summary_str} in these changes: {breaks_str}." diff --git a/ggshield/verticals/secret/output/secret_json_output_handler.py b/ggshield/verticals/secret/output/secret_json_output_handler.py index fe78ce67c2..71d0d1850c 100644 --- a/ggshield/verticals/secret/output/secret_json_output_handler.py +++ b/ggshield/verticals/secret/output/secret_json_output_handler.py @@ -1,12 +1,18 @@ +import dataclasses from typing import Any, Dict, List, cast from pygitguardian.client import VERSIONS -from pygitguardian.models import PolicyBreak, SecretIncident +from pygitguardian.models import SecretIncident -from ggshield.core.filter import group_policy_breaks_by_ignore_sha from ggshield.verticals.secret.extended_match import ExtendedMatch -from ..secret_scan_collection import Error, Result, SecretScanCollection +from ..secret_scan_collection import ( + Error, + Result, + Secret, + SecretScanCollection, + group_secrets_by_ignore_sha, +) from .schemas import JSONScanCollectionSchema from .secret_output_handler import SecretOutputHandler @@ -74,15 +80,15 @@ def process_result( "total_occurrences": 0, "total_incidents": 0, } - sha_dict = group_policy_breaks_by_ignore_sha(result.policy_breaks) + sha_dict = group_secrets_by_ignore_sha(result.secrets) result_dict["total_incidents"] = len(sha_dict) if not self.show_secrets: result.censor() - for ignore_sha, policy_breaks in sha_dict.items(): - flattened_dict = self.serialized_policy_break( - ignore_sha, policy_breaks, incident_details + for ignore_sha, secrets in sha_dict.items(): + flattened_dict = self.serialized_secret( + ignore_sha, secrets, incident_details ) result_dict["incidents"].append(flattened_dict) result_dict["total_occurrences"] += flattened_dict["total_occurrences"] @@ -102,49 +108,52 @@ def process_error(error: Error) -> Dict[str, Any]: } return error_dict - def serialized_policy_break( + def serialized_secret( self, ignore_sha: str, - policy_breaks: List[PolicyBreak], + secrets: List[Secret], incident_details: Dict[str, SecretIncident], ) -> Dict[str, Any]: flattened_dict: Dict[str, Any] = { "occurrences": [], "ignore_sha": ignore_sha, - "policy": policy_breaks[0].policy, - "break_type": policy_breaks[0].break_type, - "total_occurrences": len(policy_breaks), + "policy": secrets[0].policy, + "detector": secrets[0].detector, + "total_occurrences": len(secrets), } - if policy_breaks[0].validity: - flattened_dict["validity"] = policy_breaks[0].validity + if secrets[0].validity: + flattened_dict["validity"] = secrets[0].validity - if policy_breaks[0].known_secret: - flattened_dict["known_secret"] = policy_breaks[0].known_secret - flattened_dict["incident_url"] = policy_breaks[0].incident_url - assert policy_breaks[0].incident_url - details = incident_details.get(policy_breaks[0].incident_url) + if secrets[0].known_secret: + flattened_dict["known_secret"] = secrets[0].known_secret + flattened_dict["incident_url"] = secrets[0].incident_url + assert secrets[0].incident_url + details = incident_details.get(secrets[0].incident_url) if details is not None: flattened_dict["incident_details"] = details - for policy_break in policy_breaks: - flattened_dict["occurrences"].extend( - self.serialize_policy_break_matches(policy_break) + if secrets[0].ignore_reason is not None: + flattened_dict["ignore_reason"] = dataclasses.asdict( + secrets[0].ignore_reason ) + for secret in secrets: + flattened_dict["occurrences"].extend(self.serialize_secret_matches(secret)) + return flattened_dict - def serialize_policy_break_matches( + def serialize_secret_matches( self, - policy_break: PolicyBreak, + secret: Secret, ) -> List[Dict[str, Any]]: """ - Serialize policy_break matches. The method uses MatchSpan to get the start and + Serialize secret matches. The method uses MatchSpan to get the start and end index of the match. Returns a list of matches. """ matches_list: List[Dict[str, Any]] = [] - for match in policy_break.matches: + for match in secret.matches: assert isinstance(match, ExtendedMatch) match_dict: Dict[str, Any] = { diff --git a/ggshield/verticals/secret/output/secret_output_handler.py b/ggshield/verticals/secret/output/secret_output_handler.py index 8a1d009ea9..ca76ec9734 100644 --- a/ggshield/verticals/secret/output/secret_output_handler.py +++ b/ggshield/verticals/secret/output/secret_output_handler.py @@ -56,6 +56,6 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str: raise NotImplementedError() def _get_exit_code(self, scan: SecretScanCollection) -> ExitCode: - if scan.total_policy_breaks_count > 0: + if scan.total_secrets_count > 0: return ExitCode.SCAN_FOUND_PROBLEMS return ExitCode.SUCCESS diff --git a/ggshield/verticals/secret/output/secret_sarif_output_handler.py b/ggshield/verticals/secret/output/secret_sarif_output_handler.py index b5ef79faba..3db7e43d4d 100644 --- a/ggshield/verticals/secret/output/secret_sarif_output_handler.py +++ b/ggshield/verticals/secret/output/secret_sarif_output_handler.py @@ -2,14 +2,13 @@ from typing import Any, Dict, Iterable, List, cast from pygitguardian.client import VERSIONS -from pygitguardian.models import PolicyBreak, SecretIncident +from pygitguardian.models import SecretIncident from ggshield import __version__ as ggshield_version -from ggshield.core.filter import get_ignore_sha from ggshield.core.match_span import MatchSpan from ..extended_match import ExtendedMatch -from ..secret_scan_collection import Result, SecretScanCollection +from ..secret_scan_collection import Result, Secret, SecretScanCollection from .secret_output_handler import SecretOutputHandler @@ -60,31 +59,29 @@ def _create_sarif_results( per policy break. """ for result in results: - for policy_break in result.policy_breaks: - yield _create_sarif_result_dict(result.url, policy_break, incident_details) + for secret in result.secrets: + yield _create_sarif_result_dict(result.url, secret, incident_details) def _create_sarif_result_dict( url: str, - policy_break: PolicyBreak, + secret: Secret, incident_details: Dict[str, SecretIncident], ) -> Dict[str, Any]: # Prepare message with links to the related location for each match matches_str = ", ".join( - f"[{m.match_type}]({id})" for id, m in enumerate(policy_break.matches) + f"[{m.match_type}]({id})" for id, m in enumerate(secret.matches) ) matches_li = "\n".join( - f"- [{m.match_type}]({id})" for id, m in enumerate(policy_break.matches) - ) - extended_matches = cast(List[ExtendedMatch], policy_break.matches) - message = f"Secret detected: {policy_break.break_type}.\nMatches: {matches_str}" - markdown_message = ( - f"Secret detected: {policy_break.break_type}\nMatches:\n{matches_li}" + f"- [{m.match_type}]({id})" for id, m in enumerate(secret.matches) ) + extended_matches = cast(List[ExtendedMatch], secret.matches) + message = f"Secret detected: {secret.detector}.\nMatches: {matches_str}" + markdown_message = f"Secret detected: {secret.detector}\nMatches:\n{matches_li}" # Create dict dct = { - "ruleId": policy_break.break_type, + "ruleId": secret.detector, "level": "error", "message": { "text": message, @@ -98,12 +95,12 @@ def _create_sarif_result_dict( for id, m in enumerate(extended_matches) ], "partialFingerprints": { - "secret/v1": get_ignore_sha(policy_break), + "secret/v1": secret.get_ignore_sha(), }, } - if policy_break.incident_url: - dct["hostedViewerUri"] = policy_break.incident_url - details = incident_details.get(policy_break.incident_url) + if secret.incident_url: + dct["hostedViewerUri"] = secret.incident_url + details = incident_details.get(secret.incident_url) if details is not None: dct["properties"] = {"incidentDetails": details.to_dict()} return dct diff --git a/ggshield/verticals/secret/output/secret_text_output_handler.py b/ggshield/verticals/secret/output/secret_text_output_handler.py index b628df4024..54bf851675 100644 --- a/ggshield/verticals/secret/output/secret_text_output_handler.py +++ b/ggshield/verticals/secret/output/secret_text_output_handler.py @@ -3,10 +3,8 @@ from typing import Dict, List, Optional, Tuple from pygitguardian.client import VERSIONS -from pygitguardian.models import PolicyBreak from ggshield.core.constants import IncidentStatus -from ggshield.core.filter import group_policy_breaks_by_ignore_sha from ggshield.core.lines import Line, get_offset, get_padding from ggshield.core.text_utils import ( STYLE, @@ -17,7 +15,13 @@ ) from ..extended_match import ExtendedMatch -from ..secret_scan_collection import IgnoreReason, Result, SecretScanCollection +from ..secret_scan_collection import ( + IgnoreKind, + Result, + Secret, + SecretScanCollection, + group_secrets_by_ignore_sha, +) from .secret_output_handler import SecretOutputHandler @@ -71,9 +75,7 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str: ) known_secrets_count = sum( - result.ignored_policy_breaks_count_by_reason.get( - IgnoreReason.KNOWN_SECRET, 0 - ) + result.ignored_secrets_count_by_kind.get(IgnoreKind.KNOWN_SECRET, 0) for result in scan.get_all_results() ) if self.ignore_known_secrets and known_secrets_count > 0: @@ -118,27 +120,23 @@ def process_result(self, result: Result) -> str: """ result_buf = StringIO() - sha_dict = group_policy_breaks_by_ignore_sha(result.policy_breaks) + sha_dict = group_secrets_by_ignore_sha(result.secrets) if not self.show_secrets: result.censor() number_of_displayed_secrets = 0 - number_of_hidden_secrets = sum( - result.ignored_policy_breaks_count_by_reason.values() - ) - for ignore_sha, policy_breaks in sha_dict.items(): + number_of_hidden_secrets = sum(result.ignored_secrets_count_by_kind.values()) + for ignore_sha, secrets in sha_dict.items(): number_of_displayed_secrets += 1 result_buf.write( - policy_break_header( - policy_breaks, ignore_sha, policy_breaks[0].known_secret - ) + secret_header(secrets, ignore_sha, secrets[0].known_secret) ) result_buf.write( leak_message_located( - flatten_policy_breaks_by_line(policy_breaks), + flatten_secrets_by_line(secrets), result.is_on_patch, clip_long_lines=not self.verbose, ) @@ -254,17 +252,17 @@ def leak_message_located( return leak_msg.getvalue() -def flatten_policy_breaks_by_line( - policy_breaks: List[PolicyBreak], +def flatten_secrets_by_line( + secrets: List[Secret], ) -> List[Tuple[Line, List[ExtendedMatch]]]: """ - flatten_policy_breaks_by_line turns a list of policy breaks into a list of + flatten_secrets_by_line turns a list of policy breaks into a list of tuples of (line, list of matches) mapping a line to a list of matches starting at that line. The list is sorted by line number. """ flat_match_dict: Dict[Line, List[ExtendedMatch]] = dict() - for policy_break in policy_breaks: - for match in policy_break.matches: + for secret in secrets: + for match in secret.matches: assert isinstance(match, ExtendedMatch) for line in match.lines_before_secret + match.lines_after_secret: if line not in flat_match_dict: @@ -282,36 +280,36 @@ def flatten_policy_breaks_by_line( return ordered_flat_match -def policy_break_header( - policy_breaks: List[PolicyBreak], +def secret_header( + secrets: List[Secret], ignore_sha: str, known_secret: bool = False, ) -> str: """ Build a header for the policy break. """ - policy_break = policy_breaks[0] + secret = secrets[0] indent = " " validity_msg = ( - f"\n{indent}Validity: {format_text(translate_validity(policy_break.validity), STYLE['incident_validity'])}" - if policy_break.validity + f"\n{indent}Validity: {format_text(translate_validity(secret.validity), STYLE['incident_validity'])}" + if secret.validity else "" ) start_line = format_text(">>", STYLE["detector_line_start"]) - policy_break_type = format_text(policy_break.break_type, STYLE["policy_break_type"]) - number_occurrences = format_text(str(len(policy_breaks)), STYLE["occurrence_count"]) + secret_type = format_text(secret.detector, STYLE["secret_type"]) + number_occurrences = format_text(str(len(secrets)), STYLE["occurrence_count"]) ignore_sha = format_text(ignore_sha, STYLE["ignore_sha"]) message = f""" -{start_line} Secret detected: {policy_break_type}{validity_msg} +{start_line} Secret detected: {secret_type}{validity_msg} {indent}Occurrences: {number_occurrences} {indent}Known by GitGuardian dashboard: {"YES" if known_secret else "NO"} -{indent}Incident URL: {policy_breaks[0].incident_url if known_secret and policy_break.incident_url else "N/A"} +{indent}Incident URL: {secrets[0].incident_url if known_secret and secret.incident_url else "N/A"} {indent}Secret SHA: {ignore_sha} """ - if policy_break.is_excluded: - message += f"{indent}Ignored: {policy_break.exclude_reason}\n" + if secret.ignore_reason is not None: + message += f"{indent}Ignored: {secret.ignore_reason.to_human_readable()}\n" return message + "\n" diff --git a/ggshield/verticals/secret/secret_scan_collection.py b/ggshield/verticals/secret/secret_scan_collection.py index 2d8825f828..4137e50c7d 100644 --- a/ggshield/verticals/secret/secret_scan_collection.py +++ b/ggshield/verticals/secret/secret_scan_collection.py @@ -1,3 +1,5 @@ +import hashlib +import operator from dataclasses import dataclass, field from enum import Enum from pathlib import Path @@ -17,48 +19,110 @@ from pygitguardian.models import ( Detail, DiffKind, - Match, PolicyBreak, ScanResult, SecretIncident, ) from ggshield.core.config.user_config import SecretConfig -from ggshield.core.errors import UnexpectedError, handle_api_error +from ggshield.core.errors import handle_api_error from ggshield.core.filter import is_in_ignored_matches -from ggshield.core.lines import Line, get_lines_from_content +from ggshield.core.lines import get_lines_from_content from ggshield.core.scan import Scannable from ggshield.utils.git_shell import Filemode from ggshield.verticals.secret.extended_match import ExtendedMatch -class IgnoreReason(str, Enum): - IGNORED_MATCH = "ignored_match" - IGNORED_DETECTOR = "ignored_detector" - KNOWN_SECRET = "known_secret" - NOT_INTRODUCED = "not_introduced" - BACKEND_EXCLUDED = "backend_excluded" +class IgnoreKind(str, Enum): + IGNORED_MATCH = "Match ignored via local .gitguardian yaml" + IGNORED_DETECTOR = "Detector ignored via local .gitguardian yaml" + KNOWN_SECRET = "Secret is known in dashboard and --ignore-known-secrets is used" + NOT_INTRODUCED = "Secret was not in added in commit" + BACKEND_EXCLUDED = "Excluded by dashboard" + + def __str__(self): + return self.name.lower() + + +@dataclass(frozen=True) +class IgnoreReason: + kind: IgnoreKind + detail: Optional[str] = None + + def to_human_readable(self): + res = f"{self.kind.value}" + if self.detail: + res += f" ({self.detail})" + return res def compute_ignore_reason( policy_break: PolicyBreak, secret_config: SecretConfig -) -> Optional[str]: +) -> Optional[IgnoreReason]: """Computes the possible ignore reason associated with a PolicyBreak""" ignore_reason = None if policy_break.diff_kind in {DiffKind.DELETION, DiffKind.CONTEXT}: - ignore_reason = IgnoreReason.NOT_INTRODUCED + ignore_reason = IgnoreReason(IgnoreKind.NOT_INTRODUCED) elif policy_break.is_excluded: - ignore_reason = f"Excluded from backend ({policy_break.exclude_reason})" + ignore_reason = IgnoreReason( + IgnoreKind.BACKEND_EXCLUDED, policy_break.exclude_reason + ) elif is_in_ignored_matches(policy_break, secret_config.ignored_matches or []): - ignore_reason = IgnoreReason.IGNORED_MATCH + ignore_reason = IgnoreReason(IgnoreKind.IGNORED_MATCH) elif policy_break.break_type in secret_config.ignored_detectors: - ignore_reason = IgnoreReason.IGNORED_DETECTOR + ignore_reason = IgnoreReason(IgnoreKind.IGNORED_DETECTOR) elif secret_config.ignore_known_secrets and policy_break.known_secret: - ignore_reason = IgnoreReason.KNOWN_SECRET + ignore_reason = IgnoreReason(IgnoreKind.KNOWN_SECRET) return ignore_reason +@dataclass +class Secret: + """GGShield specific model to handle policy-breaks. + Named Secret since we are dropping other kind of policy breaks. + """ + + detector: str + validity: str + known_secret: bool + incident_url: Optional[str] + matches: List[ExtendedMatch] + ignore_reason: Optional[IgnoreReason] + diff_kind: Optional[DiffKind] + + @property + def policy(self) -> str: + return "Secrets detection" + + @property + def is_ignored(self) -> bool: + return self.ignore_reason is not None + + def get_ignore_sha(self) -> str: + hashable = "".join( + [ + f"{match.match},{match.match_type}" + for match in sorted(self.matches, key=operator.attrgetter("match_type")) + ] + ) + + return hashlib.sha256(hashable.encode("UTF-8")).hexdigest() + + +def group_secrets_by_ignore_sha( + secrets: List[Secret], +) -> Dict[str, List[Secret]]: + """ + Group policy breaks by their ignore sha. + """ + sha_dict: Dict[str, List[Secret]] = {} + for secret in secrets: + sha_dict.setdefault(secret.get_ignore_sha(), []).append(secret) + + return sha_dict + + @dataclass class Result: """ @@ -70,33 +134,21 @@ class Result: filemode: Filemode path: Path url: str - policy_breaks: List[PolicyBreak] - ignored_policy_breaks_count_by_reason: Counter[str] + secrets: List[Secret] + ignored_secrets_count_by_kind: Counter[IgnoreKind] @property def is_on_patch(self) -> bool: return self.filemode != Filemode.FILE - def enrich_matches(self, lines: List[Line]) -> None: - if len(lines) == 0: - raise UnexpectedError("Parsing of scan result failed.") - for policy_break in self.policy_breaks: - policy_break.matches = cast( - List[Match], - [ - ExtendedMatch.from_match(match, lines, self.is_on_patch) - for match in policy_break.matches - ], - ) - def censor(self) -> None: - for policy_break in self.policy_breaks: - for extended_match in policy_break.matches: + for secret in self.secrets: + for extended_match in secret.matches: cast(ExtendedMatch, extended_match).censor() @property - def has_policy_breaks(self) -> bool: - return len(self.policy_breaks) > 0 + def has_secrets(self) -> bool: + return len(self.secrets) > 0 @classmethod def from_scan_result( @@ -107,31 +159,45 @@ def from_scan_result( - replace matches by ExtendedMatches """ - to_keep = [] - ignored_policy_breaks_count_by_reason = Counter() + to_keep: List[Tuple[PolicyBreak, Optional[IgnoreReason]]] = [] + ignored_secrets_count_by_kind = Counter() for policy_break in scan_result.policy_breaks: ignore_reason = compute_ignore_reason(policy_break, secret_config) if ignore_reason is not None: if secret_config.all_secrets: - policy_break.exclude_reason = ignore_reason - policy_break.is_excluded = True - to_keep.append(policy_break) + to_keep.append((policy_break, ignore_reason)) else: - ignored_policy_breaks_count_by_reason[ignore_reason] += 1 + ignored_secrets_count_by_kind[ignore_reason.kind] += 1 else: - to_keep.append(policy_break) + to_keep.append((policy_break, None)) result = Result( filename=file.filename, filemode=file.filemode, path=file.path, url=file.url, - policy_breaks=to_keep, - ignored_policy_breaks_count_by_reason=ignored_policy_breaks_count_by_reason, + secrets=[], + ignored_secrets_count_by_kind=ignored_secrets_count_by_kind, ) lines = get_lines_from_content(file.content, file.filemode) - result.enrich_matches(lines) + secrets = [ + Secret( + validity=policy_break.validity, + known_secret=policy_break.known_secret, + incident_url=policy_break.incident_url, + detector=policy_break.break_type, + matches=[ + ExtendedMatch.from_match(match, lines, result.is_on_patch) + for match in policy_break.matches + ], + ignore_reason=ignore_reason, + diff_kind=policy_break.diff_kind, + ) + for policy_break, ignore_reason in to_keep + ] + + result.secrets = secrets return result @@ -167,8 +233,8 @@ def extend(self, others: "Results") -> None: self.errors.extend(others.errors) @property - def has_policy_breaks(self) -> bool: - return any(x.has_policy_breaks for x in self.results) + def has_secrets(self) -> bool: + return any(x.has_secrets for x in self.results) class SecretScanCollection: @@ -195,8 +261,8 @@ def __init__( self.optional_header = optional_header self.extra_info = extra_info - self.total_policy_breaks_count = sum( - len(result.policy_breaks) for result in self.get_all_results() + self.total_secrets_count = sum( + len(result.secrets) for result in self.get_all_results() ) @property @@ -217,8 +283,8 @@ def get_all_results(self) -> Iterable[Result]: def get_incident_details(self, client: GGClient) -> Dict[str, SecretIncident]: incident_details: dict[str, SecretIncident] = {} for result in self.get_all_results(): - for policy_break in result.policy_breaks: - url = policy_break.incident_url + for secret in result.secrets: + url = secret.incident_url if url and url not in incident_details: incident_id = int(url.split("/")[-1]) resp = client.retrieve_secret_incident( diff --git a/ggshield/verticals/secret/secret_scanner.py b/ggshield/verticals/secret/secret_scanner.py index dff5eac3ec..b56bce2800 100644 --- a/ggshield/verticals/secret/secret_scanner.py +++ b/ggshield/verticals/secret/secret_scanner.py @@ -214,8 +214,12 @@ def _collect_results( assert isinstance(scan, MultiScanResult) for file, scan_result in zip(chunk, scan.scan_results): result = Result.from_scan_result(file, scan_result, self.secret_config) - for policy_break in result.policy_breaks: - self.cache.add_found_policy_break(policy_break, file.filename) + for secret in result.secrets: + self.cache.add_found_policy_break( + secret.detector, + secret.get_ignore_sha(), + file.filename, + ) results.append(result) self.cache.save() diff --git a/tests/factories.py b/tests/factories.py index e1f4d100aa..64f49dcde5 100644 --- a/tests/factories.py +++ b/tests/factories.py @@ -6,6 +6,7 @@ from ggshield.core.scan.scannable import StringScannable from ggshield.utils.git_shell import Filemode +from ggshield.verticals.secret.secret_scan_collection import Secret from tests.factory_constants import DETECTOR_NAMES, MATCH_NAMES @@ -87,3 +88,16 @@ class Meta: policy_breaks = [] policies = ["Secrets detection"] is_diff = False + + +class SecretFactory(factory.Factory): + class Meta: + model = Secret + + detector = factory.lazy_attribute(lambda obj: random.choice(DETECTOR_NAMES)) + validity = "valid" + known_secret = True + incident_url = None + matches = [] + ignore_reason = None + diff_kind = None diff --git a/tests/unit/cmd/test_ignore.py b/tests/unit/cmd/test_ignore.py index 9c9c97c98c..409f8d1b9f 100644 --- a/tests/unit/cmd/test_ignore.py +++ b/tests/unit/cmd/test_ignore.py @@ -9,6 +9,7 @@ from ggshield.core.cache import Cache from ggshield.core.config import Config from ggshield.core.errors import ExitCode +from ggshield.core.filter import get_ignore_sha from ggshield.core.scan import Commit, ScanContext, ScanMode from ggshield.core.types import IgnoredMatch from ggshield.verticals.secret import SecretScanner @@ -148,7 +149,7 @@ def test_cache_catches_nothing(client, isolated_fs): ) results = scanner.scan(commit.get_files(), scanner_ui=Mock()) - assert sum(len(result.policy_breaks) for result in results.results) == 0 + assert sum(len(result.secrets) for result in results.results) == 0 assert config.user_config.secret.ignored_matches == FOUND_SECRETS assert cache.last_found_secrets == [] @@ -212,26 +213,14 @@ def test_do_not_duplicate_last_found_secrets(client, isolated_fs): ) cache = Cache() - cache.add_found_policy_break(policy_break, "a") - cache.add_found_policy_break(policy_break, "b") - - assert len(cache.last_found_secrets) == 1 - - -def test_do_not_add_policy_breaks_to_last_found(client, isolated_fs): - """ - GIVEN 1 policy breaks on different files with the same ignore sha - WHEN add_found_policy_break is called - THEN only one element should be added - """ - policy_break = PolicyBreak( - "a", "gitignore", None, [Match("apikey", "apikey", 0, 0, 0, 0)] + cache.add_found_policy_break( + policy_break.break_type, get_ignore_sha(policy_break), "a" + ) + cache.add_found_policy_break( + policy_break.break_type, get_ignore_sha(policy_break), "b" ) - cache = Cache() - - cache.add_found_policy_break(policy_break, "a") - assert len(cache.last_found_secrets) == 0 + assert len(cache.last_found_secrets) == 1 def test_ignore_last_found_preserve_previous_config(client, isolated_fs): diff --git a/tests/unit/verticals/secret/output/test_gitlab_webui_output.py b/tests/unit/verticals/secret/output/test_gitlab_webui_output.py index bbf745a450..d3b0bd436a 100644 --- a/tests/unit/verticals/secret/output/test_gitlab_webui_output.py +++ b/tests/unit/verticals/secret/output/test_gitlab_webui_output.py @@ -1,28 +1,26 @@ -from pygitguardian.models import Match, PolicyBreak +from pygitguardian.models import Match from ggshield.core.config.user_config import SecretConfig from ggshield.verticals.secret import Results, SecretScanCollection from ggshield.verticals.secret.output.secret_gitlab_webui_output_handler import ( SecretGitLabWebUIOutputHandler, - format_policy_break, + format_secret, ) +from tests.factories import SecretFactory -def test_format_policy_break(): - policy = PolicyBreak( - "PayPal", - "Secrets detection", - "valid", - [ +def test_format_secret(): + secret = SecretFactory( + matches=[ Match("AZERTYUIOP", "client_id", line_start=123), Match("abcdefghijk", "client_secret", line_start=456), ], ) - out = format_policy_break(policy) + out = format_secret(secret) - assert policy.break_type in out + assert secret.detector in out assert "Validity: Valid" in out - for match in policy.matches: + for match in secret.matches: assert match.match_type in out # match value itself must be obfuscated assert match.match not in out diff --git a/tests/unit/verticals/secret/output/test_json_output.py b/tests/unit/verticals/secret/output/test_json_output.py index 9536c165dd..bcd9cbd350 100644 --- a/tests/unit/verticals/secret/output/test_json_output.py +++ b/tests/unit/verticals/secret/output/test_json_output.py @@ -13,7 +13,6 @@ from voluptuous import Required, validators from ggshield.core.config.user_config import SecretConfig -from ggshield.core.filter import group_policy_breaks_by_ignore_sha from ggshield.core.scan import Commit, ScanContext, ScanMode, StringScannable from ggshield.core.scan.file import File from ggshield.utils.git_shell import Filemode @@ -27,6 +26,12 @@ SecretJSONOutputHandler, SecretOutputHandler, ) +from ggshield.verticals.secret.secret_scan_collection import ( + IgnoreKind, + IgnoreReason, + group_secrets_by_ignore_sha, +) +from tests.factories import PolicyBreakFactory, ScannableFactory, ScanResultFactory from tests.unit.conftest import ( _MULTILINE_SECRET_FILE, _MULTIPLE_SECRETS_PATCH, @@ -94,7 +99,7 @@ "incidents": validators.All( [ { - "break_type": str, + "detector": str, "policy": str, "total_occurrences": validators.All(int, min=1), Required("incident_url"): validators.Match( @@ -425,7 +430,7 @@ def test_json_output_for_patch( assert all( ignore_sha in json_flat_results for result in results.results - for ignore_sha in group_policy_breaks_by_ignore_sha(result.policy_breaks) + for ignore_sha in group_secrets_by_ignore_sha(result.secrets) ) @@ -458,24 +463,24 @@ def test_ignore_known_secrets(verbose, ignore_known_secrets, secrets_types): secret_config=SecretConfig(), # 2 policy breaks ) - all_policy_breaks = result.policy_breaks + all_secrets = result.secrets - known_policy_breaks = [] - new_policy_breaks = all_policy_breaks + known_secrets = [] + new_secrets = all_secrets # add known_secret for the secrets that are known, when the option is, the known_secret field is not returned if ignore_known_secrets: if secrets_types == "only_known_secrets": - known_policy_breaks = all_policy_breaks - new_policy_breaks = [] + known_secrets = all_secrets + new_secrets = [] elif secrets_types == "mixed_secrets": # set only first policy break as known - known_policy_breaks = all_policy_breaks[:1] - new_policy_breaks = all_policy_breaks[1:] + known_secrets = all_secrets[:1] + new_secrets = all_secrets[1:] - for index, policy_break in enumerate(known_policy_breaks): - policy_break.known_secret = True - policy_break.incident_url = ( + for index, secret in enumerate(known_secrets): + secret.known_secret = True + secret.incident_url = ( f"https://dashboard.gitguardian.com/workspace/1/incidents/{index}" ) @@ -501,23 +506,17 @@ def test_ignore_known_secrets(verbose, ignore_known_secrets, secrets_types): ] # We can rely on the policy break type, since in this test there are 2 policy breaks, # and they are of different types - incident_for_policy_break_type = { - incident["type"]: incident for incident in incidents - } + incident_for_secret_type = {incident["type"]: incident for incident in incidents} - for policy_break in known_policy_breaks: - assert incident_for_policy_break_type[policy_break.break_type]["known_secret"] - assert incident_for_policy_break_type[policy_break.break_type][ - "incident_url" - ].startswith("https://dashboard.gitguardian.com/workspace/1/incidents/") + for secret in known_secrets: + assert incident_for_secret_type[secret.detector]["known_secret"] + assert incident_for_secret_type[secret.detector]["incident_url"].startswith( + "https://dashboard.gitguardian.com/workspace/1/incidents/" + ) - for policy_break in new_policy_breaks: - assert not incident_for_policy_break_type[policy_break.break_type][ - "known_secret" - ] - assert not incident_for_policy_break_type[policy_break.break_type][ - "incident_url" - ] + for secret in new_secrets: + assert not incident_for_secret_type[secret.detector]["known_secret"] + assert not incident_for_secret_type[secret.detector]["incident_url"] @pytest.mark.parametrize("with_incident_details", [True, False]) @@ -552,20 +551,20 @@ def test_with_incident_details( secret_config=SecretConfig(), # 2 policy breaks ) - all_policy_breaks = result.policy_breaks + all_secrets = result.secrets - known_policy_breaks = [] + known_secrets = [] if with_incident_details: if secrets_types == "only_known_secrets": - known_policy_breaks = all_policy_breaks + known_secrets = all_secrets elif secrets_types == "mixed_secrets": # set only first policy break as known - known_policy_breaks = all_policy_breaks[:1] + known_secrets = all_secrets[:1] - for index, policy_break in enumerate(known_policy_breaks): - policy_break.known_secret = True - policy_break.incident_url = ( + for index, secret in enumerate(known_secrets): + secret.known_secret = True + secret.incident_url = ( f"https://dashboard.gitguardian.com/workspace/1/incidents/{index}" ) @@ -591,9 +590,7 @@ def test_with_incident_details( ] if with_incident_details: - assert client_mock.retrieve_secret_incident.call_count == len( - known_policy_breaks - ) + assert client_mock.retrieve_secret_incident.call_count == len(known_secrets) for incident in incidents: if incident["known_secret"]: assert incident["incident_details"] @@ -602,3 +599,52 @@ def test_with_incident_details( assert "incident_details" not in incident else: assert client_mock.retrieve_secret_incident.call_count == 0 + + +@pytest.mark.parametrize( + ("ignore_reason", "expected_output"), + ( + (None, None), + ( + IgnoreReason(kind=IgnoreKind.IGNORED_MATCH), + { + "kind": IgnoreKind.IGNORED_MATCH.name.lower(), + "detail": None, + }, + ), + ( + IgnoreReason(kind=IgnoreKind.BACKEND_EXCLUDED, detail="some detail"), + { + "kind": IgnoreKind.BACKEND_EXCLUDED.name.lower(), + "detail": "some detail", + }, + ), + ), +) +def test_ignore_reason(ignore_reason, expected_output): + """ + GIVEN an result + WHEN it is passed to the json output handler + THEN the ignore_reason field is as expected + """ + + secret_config = SecretConfig() + scannable = ScannableFactory() + policy_break = PolicyBreakFactory(content=scannable.content) + result = Result.from_scan_result( + scannable, ScanResultFactory(policy_breaks=[policy_break]), secret_config + ) + result.secrets[0].ignore_reason = ignore_reason + + output_handler = SecretJSONOutputHandler(secret_config=secret_config, verbose=False) + + output = output_handler._process_scan_impl( + SecretScanCollection( + id="scan", + type="scan", + results=Results(results=[result], errors=[]), + ) + ) + + parsed_incidents = json.loads(output)["entities_with_incidents"][0]["incidents"] + assert parsed_incidents[0]["ignore_reason"] == expected_output diff --git a/tests/unit/verticals/secret/output/test_sarif_output.py b/tests/unit/verticals/secret/output/test_sarif_output.py index e1dc8ea2b5..216b92a52e 100644 --- a/tests/unit/verticals/secret/output/test_sarif_output.py +++ b/tests/unit/verticals/secret/output/test_sarif_output.py @@ -5,7 +5,7 @@ import pytest from pygitguardian import GGClient -from pygitguardian.models import PolicyBreak, ScanResult, SecretIncident +from pygitguardian.models import ScanResult, SecretIncident from pytest_voluptuous import S from voluptuous import Optional as VOptional from voluptuous import validators @@ -15,6 +15,7 @@ from ggshield.verticals.secret import Result, Results, SecretScanCollection from ggshield.verticals.secret.output import SecretSARIFOutputHandler from ggshield.verticals.secret.output.secret_sarif_output_handler import SCHEMA_URL +from ggshield.verticals.secret.secret_scan_collection import Secret from tests.unit.conftest import ( _MULTI_SECRET_ONE_LINE_FULL_PATCH, _MULTI_SECRET_ONE_LINE_PATCH_SCAN_RESULT, @@ -206,11 +207,11 @@ def test_sarif_output_for_flat_scan_with_secrets( sarif_results = json_dict["runs"][0]["results"] # Check each found secret is correctly represented - for sarif_result, policy_break in zip(sarif_results, scan_result.policy_breaks): + for sarif_result, secret in zip(sarif_results, result.secrets): check_sarif_result( sarif_result, scannable.content, - policy_break, + secret, with_incident_details and known_incidents, ) @@ -264,36 +265,33 @@ def test_sarif_output_for_nested_scan(init_secrets_engine_version): assert SCHEMA_WITH_INCIDENTS == json_dict - # Create a flat list of policy breaks - policy_breaks = sum((s.results.results[0].policy_breaks for s in scan.scans), []) + # Create a flat list of secrets + secrets = sum((s.results.results[0].secrets for s in scan.scans), []) # Check each found secret is correctly represented sarif_results = json_dict["runs"][0]["results"] - for content, sarif_result, policy_break in zip( - contents, sarif_results, policy_breaks - ): - check_sarif_result(sarif_result, content, policy_break) + for content, sarif_result, secret in zip(contents, sarif_results, secrets): + check_sarif_result(sarif_result, content, secret) - assert len(sarif_results) == len(policy_breaks) + assert len(sarif_results) == len(secrets) def check_sarif_result( sarif_result: Dict[str, Any], content: str, - policy_break: PolicyBreak, + secret: Secret, contains_incident_details: bool = False, ): """Check sarif_result contains a representation of policy_break, applied to content""" # Check the secret name secret_name = sarif_result["ruleId"] - assert secret_name == policy_break.break_type + assert secret_name == secret.detector # Check the matches point to the right part of the content. `expected_matches` # and `actual matches` are dicts of match_name => matched_text. expected_matches = { - m.match_type: content[m.index_start : m.index_end + 1] - for m in policy_break.matches + m.match_type: content[m.index_start : m.index_end + 1] for m in secret.matches } actual_matches = {} diff --git a/tests/unit/verticals/secret/output/test_text_output.py b/tests/unit/verticals/secret/output/test_text_output.py index bbf9d1825f..095c191769 100644 --- a/tests/unit/verticals/secret/output/test_text_output.py +++ b/tests/unit/verticals/secret/output/test_text_output.py @@ -5,7 +5,6 @@ import pytest from ggshield.core.config.user_config import SecretConfig -from ggshield.core.filter import group_policy_breaks_by_ignore_sha from ggshield.core.scan import StringScannable from ggshield.utils.git_shell import Filemode from ggshield.verticals.secret import Result, Results, SecretScanCollection @@ -13,6 +12,12 @@ from ggshield.verticals.secret.output.secret_text_output_handler import ( format_line_count_break, ) +from ggshield.verticals.secret.secret_scan_collection import ( + IgnoreKind, + IgnoreReason, + group_secrets_by_ignore_sha, +) +from tests.factories import PolicyBreakFactory, ScannableFactory, ScanResultFactory from tests.unit.conftest import ( _MULTI_SECRET_ONE_LINE_PATCH, _MULTI_SECRET_ONE_LINE_PATCH_OVERLAY, @@ -148,16 +153,16 @@ def test_leak_message(result_input, snapshot, show_secrets, verbose): # all ignore sha should be in the output assert all( ignore_sha in output - for ignore_sha in group_policy_breaks_by_ignore_sha(result_input.policy_breaks) + for ignore_sha in group_secrets_by_ignore_sha(result_input.secrets) ) -def assert_policies_displayed(output, verbose, ignore_known_secrets, policy_breaks): - for policy_break in policy_breaks: +def assert_policies_displayed(output, verbose, ignore_known_secrets, secrets): + for secret in secrets: if not ignore_known_secrets or verbose: # All secrets are displayed no matter if they're known or not - assert f"Secret detected: {policy_break.break_type}" in output - if policy_break.known_secret: + assert f"Secret detected: {secret.detector}" in output + if secret.known_secret: assert "Known by GitGuardian dashboard: YES" in output assert ( "https://dashboard.gitguardian.com/workspace/1/incidents/" in output @@ -166,13 +171,13 @@ def assert_policies_displayed(output, verbose, ignore_known_secrets, policy_brea assert "Known by GitGuardian dashboard: NO" in output assert "Incident URL: N/A" in output else: - if policy_break.known_secret: - assert f"Secret detected: {policy_break.break_type}" not in output + if secret.known_secret: + assert f"Secret detected: {secret.detector}" not in output if ignore_known_secrets: - secrets_number = sum(1 for x in policy_breaks if not x.known_secret) + secrets_number = sum(1 for x in secrets if not x.known_secret) else: - secrets_number = len(policy_breaks) + secrets_number = len(secrets) if secrets_number: assert ( @@ -219,3 +224,43 @@ def assert_no_leak_message_is_diplayed( def test_format_line_count_break(): assert format_line_count_break(5) == "\x1b[36m\x1b[22m\x1b[22m ...\n\x1b[0m" + + +@pytest.mark.parametrize( + ("ignore_reason"), + ( + None, + IgnoreReason(kind=IgnoreKind.IGNORED_MATCH), + IgnoreReason(kind=IgnoreKind.BACKEND_EXCLUDED, detail="some detail"), + ), +) +def test_ignore_reason(ignore_reason): + """ + GIVEN an result + WHEN it is passed to the json output handler + THEN the ignore_reason field is as expected + """ + + secret_config = SecretConfig() + scannable = ScannableFactory() + policy_break = PolicyBreakFactory(content=scannable.content) + result = Result.from_scan_result( + scannable, ScanResultFactory(policy_breaks=[policy_break]), secret_config + ) + result.secrets[0].ignore_reason = ignore_reason + + output_handler = SecretTextOutputHandler(secret_config=secret_config, verbose=False) + + output = output_handler._process_scan_impl( + SecretScanCollection( + id="scan", + type="scan", + results=Results(results=[result], errors=[]), + ) + ) + + if ignore_reason is None: + assert "Ignored:" not in output + else: + assert "Ignored:" in output + assert ignore_reason.to_human_readable() in output diff --git a/tests/unit/verticals/secret/test_scan_repo.py b/tests/unit/verticals/secret/test_scan_repo.py index 862d37653d..d880c58ff9 100644 --- a/tests/unit/verticals/secret/test_scan_repo.py +++ b/tests/unit/verticals/secret/test_scan_repo.py @@ -147,10 +147,10 @@ def test_scan_2_commits_same_content(secret_scanner_mock): assert len(scan_collection.scans) == 2 - all_policy_breaks_count = sum( - len(result.policy_breaks) for result in scan_collection.get_all_results() + all_secrets_count = sum( + len(result.secrets) for result in scan_collection.get_all_results() ) - assert all_policy_breaks_count == 4 + assert all_secrets_count == 4 @patch("ggshield.verticals.secret.repo.SecretScanner") diff --git a/tests/unit/verticals/secret/test_secret_scan_collection.py b/tests/unit/verticals/secret/test_secret_scan_collection.py index e172e32558..2d0fb5f464 100644 --- a/tests/unit/verticals/secret/test_secret_scan_collection.py +++ b/tests/unit/verticals/secret/test_secret_scan_collection.py @@ -9,6 +9,7 @@ from ggshield.core.types import IgnoredMatch from ggshield.verticals.secret import Results from ggshield.verticals.secret.secret_scan_collection import ( + IgnoreKind, IgnoreReason, Result, compute_ignore_reason, @@ -95,7 +96,7 @@ def test_create_result_removes_ignored_matches( ignored_matches=[IgnoredMatch(name="", match=x) for x in ignores] ), ) - assert len(result.policy_breaks) == final_len + assert len(result.secrets) == final_len @pytest.mark.parametrize("all_secrets", (True, False)) @@ -124,16 +125,13 @@ def test_create_result_removes_ignored_matches_bis(all_secrets): scannable, ScanResultFactory(policy_breaks=policy_breaks), config ) if all_secrets: - assert len(result.policy_breaks) == 2 - assert result.policy_breaks[0].is_excluded is True - assert result.policy_breaks[1].is_excluded is False + assert len(result.secrets) == 2 + assert result.secrets[0].is_ignored is True + assert result.secrets[1].is_ignored is False else: - assert len(result.policy_breaks) == 1 - assert result.policy_breaks[0].is_excluded is False - assert ( - result.ignored_policy_breaks_count_by_reason[IgnoreReason.IGNORED_MATCH] - == 1 - ) + assert len(result.secrets) == 1 + assert result.secrets[0].is_ignored is False + assert result.ignored_secrets_count_by_kind[IgnoreKind.IGNORED_MATCH] == 1 class TestComputeIgnoreReason: @@ -146,7 +144,9 @@ def test_ignore_excluded(self): policy_break = PolicyBreakFactory( is_excluded=True, exclude_reason="BACKEND_REASON" ) - assert "BACKEND_REASON" in compute_ignore_reason(policy_break, SecretConfig()) + assert compute_ignore_reason(policy_break, SecretConfig()) == IgnoreReason( + IgnoreKind.BACKEND_EXCLUDED, "BACKEND_REASON" + ) def test_ignore_ignored_match(self): """ diff --git a/tests/unit/verticals/secret/test_secret_scanner.py b/tests/unit/verticals/secret/test_secret_scanner.py index 4aa8304e66..8dd83f2637 100644 --- a/tests/unit/verticals/secret/test_secret_scanner.py +++ b/tests/unit/verticals/secret/test_secret_scanner.py @@ -115,14 +115,12 @@ def test_scan_patch(client, cache, name: str, input_patch: str, expected: Expect ) results = scanner.scan(commit.get_files(), scanner_ui=Mock()) for result in results.results: - if result.policy_breaks: - assert len(result.policy_breaks[0].matches) == expected.matches + if result.secrets: + assert len(result.secrets[0].matches) == expected.matches if expected.first_match: - assert ( - result.policy_breaks[0].matches[0].match == expected.first_match - ) + assert result.secrets[0].matches[0].match == expected.first_match else: - assert result.policy_breaks == [] + assert result.secrets == [] if expected.want: assert result.filename == expected.want["filename"] @@ -253,10 +251,10 @@ def test_scan_merge_commit(client, cache): secret_config=SecretConfig(), ) results = scanner.scan(commit.get_files(), scanner_ui=Mock()) - policy_breaks = results.results[0].policy_breaks - assert len(policy_breaks) == 1 + secrets = results.results[0].secrets + assert len(secrets) == 1 - matches = {m.match_type: m.match for m in policy_breaks[0].matches} + matches = {m.match_type: m.match for m in secrets[0].matches} assert matches["username"] == "owly" assert matches["password"] == _SIMPLE_SECRET_TOKEN @@ -349,7 +347,7 @@ def test_scan_ignore_known_secrets(scan_mock: Mock, client, ignore_known_secrets """ scannable = StringScannable(url="localhost", content="known\nunknown") known_secret = PolicyBreak( - break_type="a", + break_type="known", policy="Secrets detection", validity="valid", known_secret=True, @@ -365,7 +363,7 @@ def test_scan_ignore_known_secrets(scan_mock: Mock, client, ignore_known_secrets ], ) unknown_secret = PolicyBreak( - break_type="a", + break_type="unknown", policy="Secrets detection", validity="valid", known_secret=False, @@ -401,9 +399,12 @@ def test_scan_ignore_known_secrets(scan_mock: Mock, client, ignore_known_secrets results = scanner.scan([scannable], scanner_ui=Mock()) if ignore_known_secrets: - assert results.results[0].policy_breaks == [unknown_secret] + assert [pbreak.detector for pbreak in results.results[0].secrets] == ["unknown"] else: - assert results.results[0].policy_breaks == [known_secret, unknown_secret] + assert [pbreak.detector for pbreak in results.results[0].secrets] == [ + "known", + "unknown", + ] @patch("pygitguardian.GGClient.multi_content_scan") @@ -482,4 +483,6 @@ def test_all_secrets_is_used(scan_mock: Mock, client): secret_config=SecretConfig(), ) results = scanner.scan([scannable], scanner_ui=Mock()) - assert results.results[0].policy_breaks == [secret] + assert [pbreak.detector for pbreak in results.results[0].secrets] == [ + "not-excluded" + ]