Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat cpe configurations #300

Merged
merged 29 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
31ce060
feat: Implementation of cpe configs, not tested yet
GeorgeFI Dec 15, 2022
67ffd74
fix: Fixed critical bug in recursion, fixed tests
GeorgeFI Dec 15, 2022
b379c0e
feat: Added method for filtering cves with cpe configs
GeorgeFI Dec 15, 2022
4f806a8
feat: Representation of cpes in cpe configs as set
GeorgeFI Dec 15, 2022
929ca52
fix: fixed tests
GeorgeFI Dec 16, 2022
d9bf915
docs: Added documentation to the major methods
GeorgeFI Dec 16, 2022
b5b0e9d
tests: Written tests for cpe configs
GeorgeFI Dec 16, 2022
9087cf1
refactor: Refactoring from code review
GeorgeFI Dec 25, 2022
33cc8b4
refactor: Refactoring from notes of code review
GeorgeFI Dec 25, 2022
40cbd05
chore: Formating, fixed tests
GeorgeFI Dec 25, 2022
864a725
tests: Prepared fixture setup for cpe config match test
GeorgeFI Dec 27, 2022
5e2f4ae
test: Added test for cc, not passing yet
GeorgeFI Jan 29, 2023
65a3b18
test: Added test for CPE configurations
GeorgeFI Jan 30, 2023
4a638e1
tests: Fixing not passing tests
GeorgeFI Feb 9, 2023
b785501
format: formatting test file with black
GeorgeFI Feb 9, 2023
f0d27cb
format: manual fixes, black is complaining, but wont fix it
GeorgeFI Feb 9, 2023
0448df8
test: Added tests for FIPS
GeorgeFI Feb 9, 2023
25122b1
test: Added tests for fips, raw implementation
GeorgeFI Feb 18, 2023
5997346
test: Refactored the test
GeorgeFI Feb 18, 2023
21ee2d7
merge: merged main into feature branch
GeorgeFI Feb 20, 2023
81f78ac
refactor: Refactored test for CC
GeorgeFI Feb 25, 2023
00558be
refactor: Refactored match function
GeorgeFI Feb 25, 2023
3e822b5
fix: Fixes in jupyter notebook and pandas utils
GeorgeFI Feb 25, 2023
a50e533
test: Added my own dummy vulnerable cert
GeorgeFI Feb 25, 2023
fe2e1de
fix: Fixed the test for matching cpe
GeorgeFI Feb 25, 2023
67cfddb
finalize cpe matching for on/with configurations
adamjanovsky Mar 10, 2023
4fa6672
codecov to informational
adamjanovsky Mar 10, 2023
ad1330a
fix typo codecov.yml
adamjanovsky Mar 10, 2023
21b88b0
codecov.yml to information also on patch
adamjanovsky Mar 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
329 changes: 308 additions & 21 deletions notebooks/cc/vulnerabilities.ipynb

Large diffs are not rendered by default.

49 changes: 41 additions & 8 deletions src/sec_certs/dataset/cve.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import collections
import datetime
import glob
import itertools
Expand Down Expand Up @@ -34,6 +35,7 @@ def __init__(self, cves: dict[str, CVE], json_path: str | Path = constants.DUMMY
self.cves = cves
self.json_path = Path(json_path)
self.cpe_to_cve_ids_lookup: dict[str, set[str]] = {}
self.cves_with_vulnerable_configurations: list[CVE] = []

@property
def serialized_attributes(self) -> list[str]:
Expand All @@ -54,9 +56,15 @@ def __len__(self) -> int:
def __eq__(self, other: object):
    """Datasets compare equal when `other` is a CVEDataset and both `cves` mappings compare equal."""
    if not isinstance(other, CVEDataset):
        return False
    return self.cves == other.cves

def _filter_cves_with_cpe_configurations(self) -> None:
    """
    Collect every CVE of this dataset whose `vulnerable_cpe_configurations` is truthy
    and store the resulting list in `self.cves_with_vulnerable_configurations`.
    """
    with_configurations = []
    for candidate in self:
        if candidate.vulnerable_cpe_configurations:
            with_configurations.append(candidate)
    self.cves_with_vulnerable_configurations = with_configurations
adamjanovsky marked this conversation as resolved.
Show resolved Hide resolved

def build_lookup_dict(self, use_nist_mapping: bool = True, nist_matching_filepath: Path | None = None):
"""
Builds look-up dictionary CPE -> Set[CVE]
Builds look-up dictionary CPE -> Set[CVE] and filters the CVEs which contain CPE configurations.
Developer's note: There are 3 CPEs that are present in the cpe matching feed, but are badly processed by CVE
feed, in which case they won't be found as a key in the dictionary. We intentionally ignore those. Feel free
to add corner cases and manual fixes. According to our investigation, the affected CPEs are:
Expand Down Expand Up @@ -87,6 +95,8 @@ def build_lookup_dict(self, use_nist_mapping: bool = True, nist_matching_filepat
else:
self.cpe_to_cve_ids_lookup[cpe.uri].add(cve.cve_id)

self._filter_cves_with_cpe_configurations()

@classmethod
def download_cves(cls, output_path_str: str, start_year: int, end_year: int):
output_path = Path(output_path_str)
Expand Down Expand Up @@ -124,21 +134,38 @@ def from_web(
cls.download_cves(tmp_dir, start_year, end_year)
json_files = glob.glob(tmp_dir + "/*.json")

all_cves = {}
logger.info("Downloaded required resources. Building CVEDataset from jsons.")
results = process_parallel(
cls.from_nist_json,
json_files,
use_threading=False,
progress_bar_desc="Building CVEDataset from jsons",
)
for r in results:
all_cves.update(r.cves)
return cls(dict(collections.ChainMap(*(x.cves for x in results))), json_path)

def _get_cve_ids_for_cpe_uri(self, cpe_uri: str) -> set[str]:
    """Return the CVE ids stored in the lookup for `cpe_uri`; an unknown URI yields an empty set."""
    lookup = self.cpe_to_cve_ids_lookup
    if cpe_uri in lookup:
        return lookup[cpe_uri]
    return set()

return cls(all_cves, json_path)
def _get_cves_from_exactly_matched_cpes(self, cpe_uris: set[str]) -> set[str]:
    """Union the CVE ids that `_get_cve_ids_for_cpe_uri` gives for each URI in `cpe_uris`."""
    matched: set[str] = set()
    for uri in cpe_uris:
        matched.update(self._get_cve_ids_for_cpe_uri(uri))
    return matched

def get_cve_ids_for_cpe_uri(self, cpe_uri: str) -> set[str] | None:
return self.cpe_to_cve_ids_lookup.get(cpe_uri, None)
def _get_cves_from_cpe_configurations(self, cpe_uris: set[str]) -> set[str]:
    """
    Return the ids of the CVEs in `self.cves_with_vulnerable_configurations` for which
    at least one vulnerable CPE configuration reports a match against `cpe_uris`.
    """
    matched_ids = set()
    for cve in self.cves_with_vulnerable_configurations:
        for configuration in cve.vulnerable_cpe_configurations:
            if configuration.matches(cpe_uris):
                matched_ids.add(cve.cve_id)
                # One matching configuration is enough for this CVE.
                break
    return matched_ids

def get_cves_from_matched_cpes(self, cpe_uris: set[str]) -> set[str]:
    """
    Return the set of CVE ids matched by the given set of CPE URIs.

    Two look-ups are combined: first the ids returned by `_get_cves_from_exactly_matched_cpes`
    (CPE -> CVE lookup dictionary), then the ids returned by `_get_cves_from_cpe_configurations`
    (CPE configurations that contain a platform).
    """
    exact_matches = self._get_cves_from_exactly_matched_cpes(cpe_uris)
    configuration_matches = self._get_cves_from_cpe_configurations(cpe_uris)
    combined = set(exact_matches)
    combined.update(configuration_matches)
    return combined

def filter_related_cpes(self, relevant_cpes: set[CPE]):
"""
Expand All @@ -151,7 +178,13 @@ def filter_related_cpes(self, relevant_cpes: set[CPE]):
cve_ids_to_delete = []
for cve in self:
n_cpes_orig = len(cve.vulnerable_cpes)
cve.vulnerable_cpes = list(filter(lambda x: x in relevant_cpes, cve.vulnerable_cpes))
cve.vulnerable_cpes = [x for x in cve.vulnerable_cpes if x in relevant_cpes]
cve.vulnerable_cpe_configurations = [
x
for x in cve.vulnerable_cpe_configurations
if x.platform.uri in relevant_cpes and any(y.uri in relevant_cpes for y in x.cpes)
]

total_deleted_cpes += n_cpes_orig - len(cve.vulnerable_cpes)
if not cve.vulnerable_cpes:
cve_ids_to_delete.append(cve.cve_id)
Expand Down
19 changes: 5 additions & 14 deletions src/sec_certs/dataset/dataset.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from __future__ import annotations

import itertools
import json
import logging
import re
Expand Down Expand Up @@ -527,22 +526,14 @@ def compute_related_cves(
)
return

relevant_cpes = set(itertools.chain.from_iterable(x.heuristics.cpe_matches for x in cpe_rich_certs))
self.auxiliary_datasets.cve_dset.filter_related_cpes(relevant_cpes)
# The following lines don't bring any speed-up. They may potentially save memory if the rest of the CVEs is cleaned up explicitly.
# relevant_cpes = set(itertools.chain.from_iterable(x.heuristics.cpe_matches for x in cpe_rich_certs))
# self.auxiliary_datasets.cve_dset.filter_related_cpes(relevant_cpes)

cert: Certificate
for cert in tqdm(cpe_rich_certs, desc="Computing related CVES"):
if cert.heuristics.cpe_matches:
related_cves = [
self.auxiliary_datasets.cve_dset.get_cve_ids_for_cpe_uri(x) for x in cert.heuristics.cpe_matches
]
related_cves = list(filter(lambda x: x is not None, related_cves))
if related_cves:
cert.heuristics.related_cves = set(
itertools.chain.from_iterable(x for x in related_cves if x is not None)
)
else:
cert.heuristics.related_cves = None
related_cves = self.auxiliary_datasets.cve_dset.get_cves_from_matched_cpes(cert.heuristics.cpe_matches)
cert.heuristics.related_cves = related_cves if related_cves else None

n_vulnerable = len([x for x in cpe_rich_certs if x.heuristics.related_cves])
n_vulnerabilities = sum([len(x.heuristics.related_cves) for x in cpe_rich_certs if x.heuristics.related_cves])
Expand Down
3 changes: 2 additions & 1 deletion src/sec_certs/sample/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sec_certs.sample.cc import CCCertificate
from sec_certs.sample.cc_certificate_id import CertificateId
from sec_certs.sample.cc_maintenance_update import CCMaintenanceUpdate
from sec_certs.sample.cpe import CPE, cached_cpe
from sec_certs.sample.cpe import CPE, CPEConfiguration, cached_cpe
from sec_certs.sample.cve import CVE
from sec_certs.sample.fips import FIPSCertificate
from sec_certs.sample.fips_algorithm import FIPSAlgorithm
Expand All @@ -19,6 +19,7 @@
"CCMaintenanceUpdate",
"CCCertificate",
"CPE",
"CPEConfiguration",
"cached_cpe",
"CVE",
"FIPSCertificate",
Expand Down
30 changes: 29 additions & 1 deletion src/sec_certs/sample/cpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,33 @@
from sec_certs.utils import helpers


@dataclass(init=False)
@dataclass
class CPEConfiguration(ComplexSerializableType):
    """
    A CPE configuration: one platform CPE together with a list of CPEs.

    Equality and hashing ignore both the order and the multiplicity of `cpes`;
    ordering (`<`) considers the platform only.
    """

    __slots__ = ["platform", "cpes"]

    platform: CPE
    cpes: list[CPE]

    def __hash__(self) -> int:
        # __eq__ compares `cpes` as a set, so the hash must be derived from the same set.
        # Summing hashes over the list would give different hashes to configurations that
        # compare equal but repeat a CPE in the list, which breaks the hash/eq contract.
        return hash((self.platform, frozenset(self.cpes)))

    def __lt__(self, other: CPEConfiguration) -> bool:
        return self.platform < other.platform

    def __eq__(self, other: Any) -> bool:
        return (
            isinstance(other, self.__class__) and self.platform == other.platform and set(self.cpes) == set(other.cpes)
        )

    def matches(self, other_cpe_uris: set[str]) -> bool:
        """
        Tell whether this configuration is matched by the given set of CPE URIs.

        :param other_cpe_uris: set of CPE URIs to match against
        :return: True if the platform URI and the URI of at least one CPE from `cpes`
                 are both present in `other_cpe_uris`, False otherwise
        """
        return self.platform.uri in other_cpe_uris and any(x.uri in other_cpe_uris for x in self.cpes)


@dataclass
class CPE(PandasSerializableType, ComplexSerializableType):
uri: str
version: str
Expand Down Expand Up @@ -88,6 +114,8 @@ def target_hw(self) -> str:
def pandas_tuple(self) -> tuple:
return self.uri, self.vendor, self.item_name, self.version, self.title

# frozen=True is not an option here: prior to Python 3.10 dataclasses it does not work with __slots__.
# __hash__ and __eq__ are therefore provided by hand, even though immutability is not guaranteed.
def __hash__(self) -> int:
    identity = (self.uri, self.start_version, self.end_version)
    return hash(identity)

Expand Down
Loading