Skip to content

Commit

Permalink
Merge pull request #300 from crocs-muni/feat-cpe-configurations
Browse files Browse the repository at this point in the history
Feat cpe configurations
  • Loading branch information
adamjanovsky authored Mar 10, 2023
2 parents af3890c + 21b88b0 commit 983bc3c
Show file tree
Hide file tree
Showing 14 changed files with 1,257 additions and 158 deletions.
8 changes: 7 additions & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ coverage:
range: 50..100
round: up
precision: 2

status:
project:
default:
informational: true
patch:
default:
informational: true
ignore:
- "test/**/*.py"
329 changes: 308 additions & 21 deletions notebooks/cc/vulnerabilities.ipynb

Large diffs are not rendered by default.

49 changes: 41 additions & 8 deletions src/sec_certs/dataset/cve.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import collections
import datetime
import glob
import itertools
Expand Down Expand Up @@ -34,6 +35,7 @@ def __init__(self, cves: dict[str, CVE], json_path: str | Path = constants.DUMMY
self.cves = cves
self.json_path = Path(json_path)
self.cpe_to_cve_ids_lookup: dict[str, set[str]] = {}
self.cves_with_vulnerable_configurations: list[CVE] = []

@property
def serialized_attributes(self) -> list[str]:
Expand All @@ -54,9 +56,15 @@ def __len__(self) -> int:
def __eq__(self, other: object):
return isinstance(other, CVEDataset) and self.cves == other.cves

def _filter_cves_with_cpe_configurations(self) -> None:
    """
    Collect the CVEs of this dataset that have at least one vulnerable CPE configuration.

    The result is stored in `self.cves_with_vulnerable_configurations`, replacing whatever
    was stored there before.
    """
    configured: list[CVE] = []
    for cve in self:
        # An empty configuration list is falsy, so such CVEs are skipped.
        if cve.vulnerable_cpe_configurations:
            configured.append(cve)
    self.cves_with_vulnerable_configurations = configured

def build_lookup_dict(self, use_nist_mapping: bool = True, nist_matching_filepath: Path | None = None):
"""
Builds look-up dictionary CPE -> Set[CVE]
Builds look-up dictionary CPE -> Set[CVE] and filters the CVEs that contain CPE configurations.
Developer's note: There are 3 CPEs that are present in the cpe matching feed, but are badly processed by CVE
feed, in which case they won't be found as a key in the dictionary. We intentionally ignore those. Feel free
to add corner cases and manual fixes. According to our investigation, the affected CPEs are:
Expand Down Expand Up @@ -87,6 +95,8 @@ def build_lookup_dict(self, use_nist_mapping: bool = True, nist_matching_filepat
else:
self.cpe_to_cve_ids_lookup[cpe.uri].add(cve.cve_id)

self._filter_cves_with_cpe_configurations()

@classmethod
def download_cves(cls, output_path_str: str, start_year: int, end_year: int):
output_path = Path(output_path_str)
Expand Down Expand Up @@ -124,21 +134,38 @@ def from_web(
cls.download_cves(tmp_dir, start_year, end_year)
json_files = glob.glob(tmp_dir + "/*.json")

all_cves = {}
logger.info("Downloaded required resources. Building CVEDataset from jsons.")
results = process_parallel(
cls.from_nist_json,
json_files,
use_threading=False,
progress_bar_desc="Building CVEDataset from jsons",
)
for r in results:
all_cves.update(r.cves)
return cls(dict(collections.ChainMap(*(x.cves for x in results))), json_path)

def _get_cve_ids_for_cpe_uri(self, cpe_uri: str) -> set[str]:
    """
    Return the CVE ids stored in the lookup dictionary under `cpe_uri`.

    :param str cpe_uri: CPE URI used as the key into `self.cpe_to_cve_ids_lookup`
    :return set[str]: the stored set of CVE ids, or a new empty set when the URI is not a key
    """
    lookup = self.cpe_to_cve_ids_lookup
    return lookup[cpe_uri] if cpe_uri in lookup else set()

return cls(all_cves, json_path)
def _get_cves_from_exactly_matched_cpes(self, cpe_uris: set[str]) -> set[str]:
    """
    Return the union of CVE ids that the lookup dictionary holds for the given CPE URIs.

    :param set[str] cpe_uris: CPE URIs to look up one by one
    :return set[str]: a new set with every CVE id found for any of the URIs
    """
    collected: set[str] = set()
    for uri in cpe_uris:
        collected.update(self._get_cve_ids_for_cpe_uri(uri))
    return collected

def get_cve_ids_for_cpe_uri(self, cpe_uri: str) -> set[str] | None:
return self.cpe_to_cve_ids_lookup.get(cpe_uri, None)
def _get_cves_from_cpe_configurations(self, cpe_uris: set[str]) -> set[str]:
    """
    Return ids of CVEs that have at least one CPE configuration matched by `cpe_uris`.

    Only the CVEs in `self.cves_with_vulnerable_configurations` are inspected.

    :param set[str] cpe_uris: CPE URIs passed to `matches()` of each configuration
    :return set[str]: ids of the CVEs with a matching configuration
    """
    matched_ids: set[str] = set()
    for cve in self.cves_with_vulnerable_configurations:
        for configuration in cve.vulnerable_cpe_configurations:
            if configuration.matches(cpe_uris):
                matched_ids.add(cve.cve_id)
                # One matching configuration is enough; skip the remaining ones.
                break
    return matched_ids

def get_cves_from_matched_cpes(self, cpe_uris: set[str]) -> set[str]:
    """
    Return the ids of all CVEs matched by the given set of CPE URIs.

    Two sources are combined: CVEs found for the individual URIs via the lookup dictionary,
    and CVEs whose CPE configurations are matched by the set of URIs.

    :param set[str] cpe_uris: CPE URIs to match against the dataset
    :return set[str]: union of the CVE ids from both sources
    """
    exactly_matched = self._get_cves_from_exactly_matched_cpes(cpe_uris)
    configuration_matched = self._get_cves_from_cpe_configurations(cpe_uris)
    return set(exactly_matched).union(configuration_matched)

def filter_related_cpes(self, relevant_cpes: set[CPE]):
"""
Expand All @@ -151,7 +178,13 @@ def filter_related_cpes(self, relevant_cpes: set[CPE]):
cve_ids_to_delete = []
for cve in self:
n_cpes_orig = len(cve.vulnerable_cpes)
cve.vulnerable_cpes = list(filter(lambda x: x in relevant_cpes, cve.vulnerable_cpes))
cve.vulnerable_cpes = [x for x in cve.vulnerable_cpes if x in relevant_cpes]
cve.vulnerable_cpe_configurations = [
x
for x in cve.vulnerable_cpe_configurations
if x.platform.uri in relevant_cpes and any(y.uri in relevant_cpes for y in x.cpes)
]

total_deleted_cpes += n_cpes_orig - len(cve.vulnerable_cpes)
if not cve.vulnerable_cpes:
cve_ids_to_delete.append(cve.cve_id)
Expand Down
19 changes: 5 additions & 14 deletions src/sec_certs/dataset/dataset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import itertools
import json
import logging
import re
Expand Down Expand Up @@ -527,22 +526,14 @@ def compute_related_cves(
)
return

relevant_cpes = set(itertools.chain.from_iterable(x.heuristics.cpe_matches for x in cpe_rich_certs))
self.auxiliary_datasets.cve_dset.filter_related_cpes(relevant_cpes)
# The following lines don't bring any speed-up. They may save memory if the rest of the CVEs are cleaned up explicitly.
# relevant_cpes = set(itertools.chain.from_iterable(x.heuristics.cpe_matches for x in cpe_rich_certs))
# self.auxiliary_datasets.cve_dset.filter_related_cpes(relevant_cpes)

cert: Certificate
for cert in tqdm(cpe_rich_certs, desc="Computing related CVES"):
if cert.heuristics.cpe_matches:
related_cves = [
self.auxiliary_datasets.cve_dset.get_cve_ids_for_cpe_uri(x) for x in cert.heuristics.cpe_matches
]
related_cves = list(filter(lambda x: x is not None, related_cves))
if related_cves:
cert.heuristics.related_cves = set(
itertools.chain.from_iterable(x for x in related_cves if x is not None)
)
else:
cert.heuristics.related_cves = None
related_cves = self.auxiliary_datasets.cve_dset.get_cves_from_matched_cpes(cert.heuristics.cpe_matches)
cert.heuristics.related_cves = related_cves if related_cves else None

n_vulnerable = len([x for x in cpe_rich_certs if x.heuristics.related_cves])
n_vulnerabilities = sum([len(x.heuristics.related_cves) for x in cpe_rich_certs if x.heuristics.related_cves])
Expand Down
3 changes: 2 additions & 1 deletion src/sec_certs/sample/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sec_certs.sample.cc import CCCertificate
from sec_certs.sample.cc_certificate_id import CertificateId
from sec_certs.sample.cc_maintenance_update import CCMaintenanceUpdate
from sec_certs.sample.cpe import CPE, cached_cpe
from sec_certs.sample.cpe import CPE, CPEConfiguration, cached_cpe
from sec_certs.sample.cve import CVE
from sec_certs.sample.fips import FIPSCertificate
from sec_certs.sample.fips_algorithm import FIPSAlgorithm
Expand All @@ -19,6 +19,7 @@
"CCMaintenanceUpdate",
"CCCertificate",
"CPE",
"CPEConfiguration",
"cached_cpe",
"CVE",
"FIPSCertificate",
Expand Down
30 changes: 29 additions & 1 deletion src/sec_certs/sample/cpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,33 @@
from sec_certs.utils import helpers


@dataclass(init=False)
@dataclass
class CPEConfiguration(ComplexSerializableType):
    """
    A platform CPE together with a list of CPEs.

    The configuration is matched by a set of CPE URIs when the platform URI and the URI of
    at least one of the listed CPEs are both present in that set (see `matches`).
    Equality and hashing treat `cpes` as an unordered collection without duplicates.
    """

    __slots__ = ["platform", "cpes"]

    platform: CPE
    cpes: list[CPE]

    def __hash__(self) -> int:
        # Must agree with __eq__, which compares `cpes` as a set: hashing a frozenset keeps the
        # hash independent of element order and of duplicated entries in the list.
        return hash((self.platform, frozenset(self.cpes)))

    def __lt__(self, other: CPEConfiguration) -> bool:
        # Ordering is defined by the platform only.
        return self.platform < other.platform

    def __eq__(self, other: Any) -> bool:
        return (
            isinstance(other, self.__class__) and self.platform == other.platform and set(self.cpes) == set(other.cpes)
        )

    def matches(self, other_cpe_uris: set[str]) -> bool:
        """
        Tell whether the given set of CPE URIs matches this configuration.

        :param set[str] other_cpe_uris: CPE URIs to test against the configuration
        :return bool: True if the platform URI and the URI of at least one CPE from `cpes`
                      are contained in `other_cpe_uris`, False otherwise
        """
        return self.platform.uri in other_cpe_uris and any(x.uri in other_cpe_uris for x in self.cpes)


@dataclass
class CPE(PandasSerializableType, ComplexSerializableType):
uri: str
version: str
Expand Down Expand Up @@ -88,6 +114,8 @@ def target_hw(self) -> str:
def pandas_tuple(self) -> tuple:
return self.uri, self.vendor, self.item_name, self.version, self.title

# We cannot use frozen=True. It does not work with __slots__ prior to Python 3.10 dataclasses
# Hence we manually provide __hash__ and __eq__ despite not guaranteeing immutability
def __hash__(self) -> int:
return hash((self.uri, self.start_version, self.end_version))

Expand Down
Loading

0 comments on commit 983bc3c

Please sign in to comment.