Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions vulnerabilities/migrations/0104_ssvc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# Generated by Django 4.2.25 on 2025-11-26 13:31

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0103_codecommit_impactedpackage_affecting_commits_and_more"),
]

operations = [
migrations.CreateModel(
name="SSVC",
fields=[
(
"id",
models.AutoField(
auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
),
),
(
"vector",
models.CharField(
help_text="The vector string representing the SSVC.", max_length=255
),
),
(
"options",
models.JSONField(help_text="A JSON object containing the SSVC options."),
),
(
"decision",
models.CharField(help_text="The decision string for the SSVC.", max_length=255),
),
(
"advisory",
models.ForeignKey(
help_text="The advisory associated with this SSVC.",
on_delete=django.db.models.deletion.CASCADE,
related_name="ssvc_entries",
to="vulnerabilities.advisoryv2",
),
),
],
options={
"unique_together": {("vector", "advisory", "decision")},
},
),
]
18 changes: 18 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3414,3 +3414,21 @@ class CodeCommit(models.Model):

class Meta:
unique_together = ("commit_hash", "vcs_url")


class SSVC(models.Model):
vector = models.CharField(max_length=255, help_text="The vector string representing the SSVC.")
options = models.JSONField(help_text="A JSON object containing the SSVC options.")
advisory = models.ForeignKey(
AdvisoryV2,
on_delete=models.CASCADE,
related_name="ssvc_entries",
help_text="The advisory associated with this SSVC.",
)
decision = models.CharField(max_length=255, help_text="The decision string for the SSVC.")

def __str__(self):
return f"SSVC for Advisory {self.advisory.advisory_id}: {self.decision}"

class Meta:
unique_together = ("vector", "advisory", "decision")
115 changes: 1 addition & 114 deletions vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from vulnerabilities.utils import get_advisory_url
from vulnerabilities.utils import get_cwe_id
from vulnerabilities.utils import get_reference_id
from vulnerabilities.utils import ssvc_calculator

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -210,117 +211,3 @@ def clean_downloads(self):

def on_failure(self):
self.clean_downloads()


def ssvc_calculator(ssvc_data):
"""
Return the ssvc vector and the decision value
"""
options = ssvc_data.get("options", [])
timestamp = ssvc_data.get("timestamp")

# Extract the options into a dictionary
options_dict = {k: v.lower() for option in options for k, v in option.items()}

# We copied the table value from this link.
# https://www.cisa.gov/sites/default/files/publications/cisa-ssvc-guide%20508c.pdf

# Determining Mission and Well-Being Impact Value
mission_well_being_table = {
# (Mission Prevalence, Public Well-being Impact) : "Mission & Well-being"
("minimal", "minimal"): "low",
("minimal", "material"): "medium",
("minimal", "irreversible"): "high",
("support", "minimal"): "medium",
("support", "material"): "medium",
("support", "irreversible"): "high",
("essential", "minimal"): "high",
("essential", "material"): "high",
("essential", "irreversible"): "high",
}

if "Mission Prevalence" not in options_dict:
options_dict["Mission Prevalence"] = "minimal"

if "Public Well-being Impact" not in options_dict:
options_dict["Public Well-being Impact"] = "material"

options_dict["Mission & Well-being"] = mission_well_being_table[
(options_dict["Mission Prevalence"], options_dict["Public Well-being Impact"])
]

decision_key = (
options_dict.get("Exploitation"),
options_dict.get("Automatable"),
options_dict.get("Technical Impact"),
options_dict.get("Mission & Well-being"),
)

decision_points = {
"Exploitation": {"E": {"none": "N", "poc": "P", "active": "A"}},
"Automatable": {"A": {"no": "N", "yes": "Y"}},
"Technical Impact": {"T": {"partial": "P", "total": "T"}},
"Public Well-being Impact": {"B": {"minimal": "M", "material": "A", "irreversible": "I"}},
"Mission Prevalence": {"P": {"minimal": "M", "support": "S", "essential": "E"}},
"Mission & Well-being": {"M": {"low": "L", "medium": "M", "high": "H"}},
}

# Create the SSVC vector
ssvc_vector = "SSVCv2/"
for key, value_map in options_dict.items():
options_key = decision_points.get(key)
for lhs, rhs_map in options_key.items():
ssvc_vector += f"{lhs}:{rhs_map.get(value_map)}/"

# "Decision": {"D": {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}},
decision_values = {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}

decision_lookup = {
("none", "no", "partial", "low"): "Track",
("none", "no", "partial", "medium"): "Track",
("none", "no", "partial", "high"): "Track",
("none", "no", "total", "low"): "Track",
("none", "no", "total", "medium"): "Track",
("none", "no", "total", "high"): "Track*",
("none", "yes", "partial", "low"): "Track",
("none", "yes", "partial", "medium"): "Track",
("none", "yes", "partial", "high"): "Attend",
("none", "yes", "total", "low"): "Track",
("none", "yes", "total", "medium"): "Track",
("none", "yes", "total", "high"): "Attend",
("poc", "no", "partial", "low"): "Track",
("poc", "no", "partial", "medium"): "Track",
("poc", "no", "partial", "high"): "Track*",
("poc", "no", "total", "low"): "Track",
("poc", "no", "total", "medium"): "Track*",
("poc", "no", "total", "high"): "Attend",
("poc", "yes", "partial", "low"): "Track",
("poc", "yes", "partial", "medium"): "Track",
("poc", "yes", "partial", "high"): "Attend",
("poc", "yes", "total", "low"): "Track",
("poc", "yes", "total", "medium"): "Track*",
("poc", "yes", "total", "high"): "Attend",
("active", "no", "partial", "low"): "Track",
("active", "no", "partial", "medium"): "Track",
("active", "no", "partial", "high"): "Attend",
("active", "no", "total", "low"): "Track",
("active", "no", "total", "medium"): "Attend",
("active", "no", "total", "high"): "Act",
("active", "yes", "partial", "low"): "Attend",
("active", "yes", "partial", "medium"): "Attend",
("active", "yes", "partial", "high"): "Act",
("active", "yes", "total", "low"): "Attend",
("active", "yes", "total", "medium"): "Act",
("active", "yes", "total", "high"): "Act",
}

decision = decision_lookup.get(decision_key, "")

if decision:
ssvc_vector += f"D:{decision_values.get(decision)}/"

if timestamp:
timestamp_formatted = dateparser.parse(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ")

ssvc_vector += f"{timestamp_formatted}/"
return ssvc_vector, decision
112 changes: 112 additions & 0 deletions vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import json
import logging
from pathlib import Path
from typing import Iterable, List

from fetchcode.vcs import fetch_via_vcs

from vulnerabilities.importer import AdvisoryData
from vulnerabilities.models import SSVC, AdvisoryV2
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.severity_systems import SCORING_SYSTEMS
from vulnerabilities.utils import ssvc_calculator

logger = logging.getLogger(__name__)


class CollectSSVCPipeline(VulnerableCodePipeline):
"""
Collect SSVC Pipeline

This pipeline collects SSVC from Vulnrichment project and associates them with existing advisories.
"""

pipeline_id = "collect_ssvc"
spdx_license_expression = "CC0-1.0"
license_url = "https://github.com/cisagov/vulnrichment/blob/develop/LICENSE"
repo_url = "git+https://github.com/cisagov/vulnrichment.git"

@classmethod
def steps(cls):
return (
cls.clone,
cls.collect_ssvc_data,
cls.clean_downloads,
)

def clone(self):
self.log(f"Cloning `{self.repo_url}`")
self.vcs_response = fetch_via_vcs(self.repo_url)

def collect_ssvc_data(self):
self.log(self.vcs_response.dest_dir)
base_path = Path(self.vcs_response.dest_dir)
for file_path in base_path.glob("**/**/*.json"):
self.log(f"Processing file: {file_path}")
if not file_path.name.startswith("CVE-"):
continue
with open(file_path) as f:
raw_data = json.load(f)
file_name = file_path.name
# strip .json from file name
cve_id = file_name[:-5]
advisories = list(AdvisoryV2.objects.filter(advisory_id=cve_id))
if not advisories:
self.log(f"No advisories found for CVE ID: {cve_id}")
continue
self.parse_cve_advisory(raw_data, advisories)

def parse_cve_advisory(self, raw_data, advisories: List[AdvisoryV2]):
self.log(f"Processing CVE data")
cve_metadata = raw_data.get("cveMetadata", {})
cve_id = cve_metadata.get("cveId")

containers = raw_data.get("containers", {})
adp_data = containers.get("adp", {})
self.log(f"Processing ADP")

metrics = [
adp_metrics for data in adp_data for adp_metrics in data.get("metrics", [])
]

vulnrichment_scoring_system = {
"other": {
"ssvc": SCORING_SYSTEMS["ssvc"],
}, # ignore kev
}

for metric in metrics:
self.log(metric)
self.log(f"Processing metric")
for metric_type, metric_value in metric.items():
if metric_type not in vulnrichment_scoring_system:
continue

if metric_type == "other":
other_types = metric_value.get("type")
self.log(f"Processing SSVC")
if other_types == "ssvc":
content = metric_value.get("content", {})
options = content.get("options", {})
vector_string, decision = ssvc_calculator(content)
advisories = list(AdvisoryV2.objects.filter(advisory_id=cve_id))
if not advisories:
continue
ssvc_trees = []
for advisory in advisories:
obj = SSVC(
advisory=advisory,
options=options,
decision=decision,
vector=vector_string,
)
ssvc_trees.append(obj)
SSVC.objects.bulk_create(ssvc_trees, ignore_conflicts=True, batch_size=1000)

def clean_downloads(self):
if self.vcs_response:
self.log("Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()
Loading
Loading