Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: make ignore reason human readable in text output #1031

Merged
merged 5 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions ggshield/core/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
from pathlib import Path
from typing import Any, Dict, List

from pygitguardian.models import PolicyBreak

from ggshield.core import ui
from ggshield.core.constants import CACHE_PATH
from ggshield.core.errors import UnexpectedError
from ggshield.core.filter import get_ignore_sha
from ggshield.core.types import IgnoredMatch


Expand Down Expand Up @@ -74,18 +71,18 @@ def save(self) -> None:
def purge(self) -> None:
self.last_found_secrets = []

def add_found_policy_break(self, policy_break: PolicyBreak, filename: str) -> None:
if policy_break.is_secret:
ignore_sha = get_ignore_sha(policy_break)
if not any(
last_found.match == ignore_sha for last_found in self.last_found_secrets
):
self.last_found_secrets.append(
IgnoredMatch(
name=f"{policy_break.break_type} - {filename}",
match=get_ignore_sha(policy_break),
)
def add_found_policy_break(
self, break_type: str, ignore_sha: str, filename: str
) -> None:
if not any(
last_found.match == ignore_sha for last_found in self.last_found_secrets
):
self.last_found_secrets.append(
IgnoredMatch(
name=f"{break_type} - {filename}",
match=ignore_sha,
)
)


class ReadOnlyCache(Cache):
Expand Down
16 changes: 1 addition & 15 deletions ggshield/core/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import math
import operator
import re
from typing import Dict, Iterable, List, Pattern, Set
from typing import Iterable, Pattern, Set

from click import UsageError
from pygitguardian.models import Match, PolicyBreak
Expand Down Expand Up @@ -60,20 +60,6 @@ def get_ignore_sha(policy_break: PolicyBreak) -> str:
return hashlib.sha256(hashable.encode("UTF-8")).hexdigest()


def group_policy_breaks_by_ignore_sha(
policy_breaks: List[PolicyBreak],
) -> Dict[str, List[PolicyBreak]]:
"""
Group policy breaks by their ignore sha.
"""
sha_dict: Dict[str, List[PolicyBreak]] = {}
for policy_break in policy_breaks:
ignore_sha = get_ignore_sha(policy_break)
sha_dict.setdefault(ignore_sha, []).append(policy_break)

return sha_dict


def translate_user_pattern(pattern: str) -> str:
"""
Translate the user pattern into a regex. This function assumes that the given
Expand Down
2 changes: 1 addition & 1 deletion ggshield/core/text_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"warning": {"fg": "yellow"},
"heading": {"fg": "green"},
"incident_validity": {"fg": "bright_yellow", "bold": True},
"policy_break_type": {"fg": "bright_yellow", "bold": True},
"secret_type": {"fg": "bright_yellow", "bold": True},
"occurrence_count": {"fg": "bright_yellow", "bold": True},
"ignore_sha": {"fg": "bright_yellow", "bold": True},
"iac_vulnerability_critical": {"fg": "red", "bold": True},
Expand Down
2 changes: 1 addition & 1 deletion ggshield/verticals/secret/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ def docker_scan_archive(
ui.display_heading(f"Scanning layer {info.diff_id}")
with ui.create_scanner_ui(file_count) as scanner_ui:
layer_results = scanner.scan(files, scanner_ui=scanner_ui)
if not layer_results.has_policy_breaks:
if not layer_results.has_secrets:
layer_id_cache.add(layer_id)
results.extend(layer_results)

Expand Down
14 changes: 14 additions & 0 deletions ggshield/verticals/secret/extended_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,17 @@
f"post_line_end:{self.post_line_end}",
]
)

def __eq__(self, other: Any) -> bool:
if not isinstance(other, ExtendedMatch):
return False

Check warning on line 146 in ggshield/verticals/secret/extended_match.py

View check run for this annotation

Codecov / codecov/patch

ggshield/verticals/secret/extended_match.py#L146

Added line #L146 was not covered by tests
return (
self.span == other.span
and self.lines_before_secret == other.lines_before_secret
and self.lines_with_secret == other.lines_with_secret
and self.lines_after_secret == other.lines_after_secret
and self.pre_line_start == other.pre_line_start
and self.pre_line_end == other.pre_line_end
and self.post_line_start == other.post_line_start
and self.post_line_end == other.post_line_end
)
8 changes: 7 additions & 1 deletion ggshield/verticals/secret/output/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,22 @@
from ggshield.verticals.secret.extended_match import ExtendedMatchSchema


class IgnoreReasonSchema(BaseSchema):
kind = fields.String(required=True)
detail = fields.String()


class FlattenedPolicyBreak(BaseSchema):
policy = fields.String(required=True)
occurrences = fields.List(fields.Nested(ExtendedMatchSchema), required=True)
break_type = fields.String(data_key="type", required=True)
detector = fields.String(data_key="type", required=True)
validity = fields.String(required=False, allow_none=True)
ignore_sha = fields.String(required=True)
total_occurrences = fields.Integer(required=True)
incident_url = fields.String(required=True, dump_default="")
incident_details = fields.Nested(SecretIncidentSchema)
known_secret = fields.Bool(required=True, dump_default=False)
ignore_reason = fields.Nested(IgnoreReasonSchema, dump_default=None)


class JSONResultSchema(BaseSchema):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
from pygitguardian.models import PolicyBreak

from ggshield.core.filter import censor_match
from ggshield.core.text_utils import pluralize, translate_validity

from ..secret_scan_collection import SecretScanCollection
from ..secret_scan_collection import Secret, SecretScanCollection
from .secret_output_handler import SecretOutputHandler


def format_policy_break(policy_break: PolicyBreak) -> str:
def format_secret(secret: Secret) -> str:
"""Returns a string with the policy name, validity and a comma-separated,
double-quoted, censored version of all `policy_break` matches.
double-quoted, censored version of all `secret` matches.

Looks like this:

PayPal OAuth2 Keys (Validity: Valid, id="aa*******bb", secret="cc******dd")
"""
match_str = ", ".join(
f'{x.match_type}: "{censor_match(x)}"' for x in policy_break.matches
f'{x.match_type}: "{censor_match(x)}"' for x in secret.matches
)
validity = translate_validity(policy_break.validity)
return f"{policy_break.break_type} (Validity: {validity}, {match_str})"
validity = translate_validity(secret.validity)
return f"{secret.detector} (Validity: {validity}, {match_str})"


class SecretGitLabWebUIOutputHandler(SecretOutputHandler):
Expand All @@ -35,21 +33,17 @@ class SecretGitLabWebUIOutputHandler(SecretOutputHandler):
def _process_scan_impl(self, scan: SecretScanCollection) -> str:
results = scan.get_all_results()

policy_breaks_to_report = [
policy_break for result in results for policy_break in result.policy_breaks
]
secrets_to_report = [secret for result in results for secret in result.secrets]

# If no secrets or no new secrets were found
if len(policy_breaks_to_report) == 0:
if len(secrets_to_report) == 0:
return ""

# Use a set to ensure we do not report duplicate incidents.
# (can happen when the secret is present in both the old and the new version of
# the document)
formatted_policy_breaks = {
format_policy_break(x) for x in policy_breaks_to_report
}
break_count = len(formatted_policy_breaks)
formatted_secrets = {format_secret(x) for x in secrets_to_report}
break_count = len(formatted_secrets)

if self.ignore_known_secrets:
summary_str = f"{break_count} new {pluralize('incident', break_count)}"
Expand All @@ -60,7 +54,7 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str:
# do this because of a bug in GitLab Web IDE which causes newline characters to
# be shown as "<br>"
# https://gitlab.com/gitlab-org/gitlab/-/issues/350349
breaks_str = ", ".join(formatted_policy_breaks)
breaks_str = ", ".join(formatted_secrets)

return (
f"GL-HOOK-ERR: ggshield found {summary_str} in these changes: {breaks_str}."
Expand Down
61 changes: 35 additions & 26 deletions ggshield/verticals/secret/output/secret_json_output_handler.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import dataclasses
from typing import Any, Dict, List, cast

from pygitguardian.client import VERSIONS
from pygitguardian.models import PolicyBreak, SecretIncident
from pygitguardian.models import SecretIncident

from ggshield.core.filter import group_policy_breaks_by_ignore_sha
from ggshield.verticals.secret.extended_match import ExtendedMatch

from ..secret_scan_collection import Error, Result, SecretScanCollection
from ..secret_scan_collection import (
Error,
Result,
Secret,
SecretScanCollection,
group_secrets_by_ignore_sha,
)
from .schemas import JSONScanCollectionSchema
from .secret_output_handler import SecretOutputHandler

Expand Down Expand Up @@ -74,15 +80,15 @@ def process_result(
"total_occurrences": 0,
"total_incidents": 0,
}
sha_dict = group_policy_breaks_by_ignore_sha(result.policy_breaks)
sha_dict = group_secrets_by_ignore_sha(result.secrets)
result_dict["total_incidents"] = len(sha_dict)

if not self.show_secrets:
result.censor()

for ignore_sha, policy_breaks in sha_dict.items():
flattened_dict = self.serialized_policy_break(
ignore_sha, policy_breaks, incident_details
for ignore_sha, secrets in sha_dict.items():
flattened_dict = self.serialized_secret(
ignore_sha, secrets, incident_details
)
result_dict["incidents"].append(flattened_dict)
result_dict["total_occurrences"] += flattened_dict["total_occurrences"]
Expand All @@ -102,49 +108,52 @@ def process_error(error: Error) -> Dict[str, Any]:
}
return error_dict

def serialized_policy_break(
def serialized_secret(
self,
ignore_sha: str,
policy_breaks: List[PolicyBreak],
secrets: List[Secret],
incident_details: Dict[str, SecretIncident],
) -> Dict[str, Any]:
flattened_dict: Dict[str, Any] = {
"occurrences": [],
"ignore_sha": ignore_sha,
"policy": policy_breaks[0].policy,
"break_type": policy_breaks[0].break_type,
"total_occurrences": len(policy_breaks),
"policy": secrets[0].policy,
"detector": secrets[0].detector,
"total_occurrences": len(secrets),
}

if policy_breaks[0].validity:
flattened_dict["validity"] = policy_breaks[0].validity
if secrets[0].validity:
flattened_dict["validity"] = secrets[0].validity

if policy_breaks[0].known_secret:
flattened_dict["known_secret"] = policy_breaks[0].known_secret
flattened_dict["incident_url"] = policy_breaks[0].incident_url
assert policy_breaks[0].incident_url
details = incident_details.get(policy_breaks[0].incident_url)
if secrets[0].known_secret:
flattened_dict["known_secret"] = secrets[0].known_secret
flattened_dict["incident_url"] = secrets[0].incident_url
assert secrets[0].incident_url
details = incident_details.get(secrets[0].incident_url)
if details is not None:
flattened_dict["incident_details"] = details

for policy_break in policy_breaks:
flattened_dict["occurrences"].extend(
self.serialize_policy_break_matches(policy_break)
if secrets[0].ignore_reason is not None:
flattened_dict["ignore_reason"] = dataclasses.asdict(
secrets[0].ignore_reason
)

for secret in secrets:
flattened_dict["occurrences"].extend(self.serialize_secret_matches(secret))

return flattened_dict

def serialize_policy_break_matches(
def serialize_secret_matches(
self,
policy_break: PolicyBreak,
secret: Secret,
) -> List[Dict[str, Any]]:
"""
Serialize policy_break matches. The method uses MatchSpan to get the start and
Serialize secret matches. The method uses MatchSpan to get the start and
end index of the match.
Returns a list of matches.
"""
matches_list: List[Dict[str, Any]] = []
for match in policy_break.matches:
for match in secret.matches:
assert isinstance(match, ExtendedMatch)

match_dict: Dict[str, Any] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str:
raise NotImplementedError()

def _get_exit_code(self, scan: SecretScanCollection) -> ExitCode:
if scan.total_policy_breaks_count > 0:
if scan.total_secrets_count > 0:
return ExitCode.SCAN_FOUND_PROBLEMS
return ExitCode.SUCCESS
33 changes: 15 additions & 18 deletions ggshield/verticals/secret/output/secret_sarif_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
from typing import Any, Dict, Iterable, List, cast

from pygitguardian.client import VERSIONS
from pygitguardian.models import PolicyBreak, SecretIncident
from pygitguardian.models import SecretIncident

from ggshield import __version__ as ggshield_version
from ggshield.core.filter import get_ignore_sha
from ggshield.core.match_span import MatchSpan

from ..extended_match import ExtendedMatch
from ..secret_scan_collection import Result, SecretScanCollection
from ..secret_scan_collection import Result, Secret, SecretScanCollection
from .secret_output_handler import SecretOutputHandler


Expand Down Expand Up @@ -60,31 +59,29 @@ def _create_sarif_results(
per policy break.
"""
for result in results:
for policy_break in result.policy_breaks:
yield _create_sarif_result_dict(result.url, policy_break, incident_details)
for secret in result.secrets:
yield _create_sarif_result_dict(result.url, secret, incident_details)


def _create_sarif_result_dict(
url: str,
policy_break: PolicyBreak,
secret: Secret,
incident_details: Dict[str, SecretIncident],
) -> Dict[str, Any]:
# Prepare message with links to the related location for each match
matches_str = ", ".join(
f"[{m.match_type}]({id})" for id, m in enumerate(policy_break.matches)
f"[{m.match_type}]({id})" for id, m in enumerate(secret.matches)
)
matches_li = "\n".join(
f"- [{m.match_type}]({id})" for id, m in enumerate(policy_break.matches)
)
extended_matches = cast(List[ExtendedMatch], policy_break.matches)
message = f"Secret detected: {policy_break.break_type}.\nMatches: {matches_str}"
markdown_message = (
f"Secret detected: {policy_break.break_type}\nMatches:\n{matches_li}"
f"- [{m.match_type}]({id})" for id, m in enumerate(secret.matches)
)
extended_matches = cast(List[ExtendedMatch], secret.matches)
message = f"Secret detected: {secret.detector}.\nMatches: {matches_str}"
markdown_message = f"Secret detected: {secret.detector}\nMatches:\n{matches_li}"

# Create dict
dct = {
"ruleId": policy_break.break_type,
"ruleId": secret.detector,
"level": "error",
"message": {
"text": message,
Expand All @@ -98,12 +95,12 @@ def _create_sarif_result_dict(
for id, m in enumerate(extended_matches)
],
"partialFingerprints": {
"secret/v1": get_ignore_sha(policy_break),
"secret/v1": secret.get_ignore_sha(),
},
}
if policy_break.incident_url:
dct["hostedViewerUri"] = policy_break.incident_url
details = incident_details.get(policy_break.incident_url)
if secret.incident_url:
dct["hostedViewerUri"] = secret.incident_url
details = incident_details.get(secret.incident_url)
if details is not None:
dct["properties"] = {"incidentDetails": details.to_dict()}
return dct
Expand Down
Loading
Loading