-
Notifications
You must be signed in to change notification settings - Fork 201
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add initial fixed-affected-matching work #1228 #1249
Changes from 61 commits
1e4079d
624f047
945b811
6497e90
b6dba78
814cd06
eee1d79
f920ded
40c1758
f370671
9f73ea4
9ccaed7
1a26613
b626762
005c094
ab80c46
55f9678
cc45e2a
f904f09
b750d65
fe23fec
4cdb629
89aab8f
04824e8
e2e4e13
c76a43d
7a8f83b
2fdc594
1bb4f20
dff950f
627d117
65d82a7
3b34e46
eb69a01
9a9401a
5c6838c
3ef542b
1045b36
57007b9
1ffccbe
dc701ca
f0bdd3b
4314820
df7f404
e475e5d
6a72c58
f5e267c
ce2c6cf
7c412ed
eecd504
9978841
6cd41d3
e111dbe
5eaa9ce
4ce5d51
f1a0530
9ec2a6a
91c1817
3dac483
2abcba0
0ec7d6c
7a512c2
26c0239
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
from django.core.validators import MinValueValidator | ||
from django.db import models | ||
from django.db.models import Count | ||
from django.db.models import Prefetch | ||
from django.db.models import Q | ||
from django.db.models.functions import Length | ||
from django.db.models.functions import Trim | ||
|
@@ -32,6 +33,8 @@ | |
from packageurl.contrib.django.models import PackageURLQuerySet | ||
from packageurl.contrib.django.models import without_empty_values | ||
from rest_framework.authtoken.models import Token | ||
from univers import versions | ||
from univers.version_range import RANGE_CLASS_BY_SCHEMES | ||
|
||
from vulnerabilities.severity_systems import SCORING_SYSTEMS | ||
from vulnerabilities.utils import build_vcid | ||
|
@@ -67,6 +70,12 @@ def paginated(self, per_page=5000): | |
|
||
|
||
class VulnerabilityQuerySet(BaseQuerySet): | ||
def affecting_vulnerabilities(self): | ||
johnmhoran marked this conversation as resolved.
Show resolved
Hide resolved
|
||
""" | ||
Return a queryset of Vulnerability that affect a package. | ||
""" | ||
return self.filter(packagerelatedvulnerability__fix=False) | ||
|
||
def with_cpes(self): | ||
""" | ||
Return a queryset of Vulnerability that have one or more NVD CPE references. | ||
|
@@ -404,6 +413,24 @@ def purl_to_dict(purl: PackageURL): | |
|
||
|
||
class PackageQuerySet(BaseQuerySet, PackageURLQuerySet): | ||
def get_fixed_by_package_versions(self, purl: PackageURL, fix=True): | ||
""" | ||
Return a queryset of all the package versions of this `package` that fix any vulnerability. | ||
If `fix` is False, return all package versions whether or not they fix a vulnerability. | ||
""" | ||
filter_dict = { | ||
"name": purl.name, | ||
"namespace": purl.namespace, | ||
"type": purl.type, | ||
"qualifiers": purl.qualifiers, | ||
"subpath": purl.subpath, | ||
} | ||
|
||
if fix: | ||
filter_dict["packagerelatedvulnerability__fix"] = True | ||
|
||
return Package.objects.filter(**filter_dict).distinct() | ||
|
||
def get_or_create_from_purl(self, purl: PackageURL): | ||
""" | ||
Return an existing or new Package (created if neeed) given a | ||
|
@@ -601,7 +628,6 @@ def __str__(self): | |
return self.package_url | ||
|
||
@property | ||
# TODO: consider renaming to "affected_by" | ||
def affected_by(self): | ||
""" | ||
Return a queryset of vulnerabilities affecting this package. | ||
|
@@ -642,6 +668,144 @@ def get_absolute_url(self): | |
""" | ||
return reverse("package_details", args=[self.purl]) | ||
|
||
def sort_by_version(self, packages): | ||
""" | ||
Return a list of `packages` sorted by version. | ||
""" | ||
if not packages: | ||
return [] | ||
|
||
return sorted( | ||
packages, | ||
key=lambda x: self.version_class(x.version), | ||
) | ||
|
||
@property | ||
def version_class(self): | ||
return RANGE_CLASS_BY_SCHEMES[self.type].version_class | ||
|
||
@property | ||
def current_version(self): | ||
return self.version_class(self.version) | ||
|
||
@property | ||
def fixed_package_details(self): | ||
""" | ||
Return a mapping of vulnerabilities that affect this package and the next and | ||
latest non-vulnerable versions. | ||
""" | ||
package_details = {} | ||
package_details["purl"] = PackageURL.from_string(self.purl) | ||
|
||
next_non_vulnerable, latest_non_vulnerable = self.get_non_vulnerable_versions() | ||
package_details["next_non_vulnerable"] = next_non_vulnerable | ||
package_details["latest_non_vulnerable"] = latest_non_vulnerable | ||
|
||
package_details["vulnerabilities"] = self.get_affecting_vulnerabilities() | ||
|
||
return package_details | ||
|
||
def get_non_vulnerable_versions(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing unit test There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Test added yesterday. |
||
""" | ||
Return a tuple of the next and latest non-vulnerable versions as PackageURLs. Return a tuple of | ||
(None, None) if there is no non-vulnerable version. | ||
""" | ||
package_versions = Package.objects.get_fixed_by_package_versions(self, fix=False) | ||
|
||
non_vulnerable_versions = [] | ||
for version in package_versions: | ||
if not version.is_vulnerable: | ||
non_vulnerable_versions.append(version) | ||
|
||
later_non_vulnerable_versions = [] | ||
for non_vuln_ver in non_vulnerable_versions: | ||
if self.version_class(non_vuln_ver.version) > self.current_version: | ||
later_non_vulnerable_versions.append(non_vuln_ver) | ||
|
||
if later_non_vulnerable_versions: | ||
sorted_versions = self.sort_by_version(later_non_vulnerable_versions) | ||
next_non_vulnerable_version = sorted_versions[0] | ||
latest_non_vulnerable_version = sorted_versions[-1] | ||
|
||
next_non_vulnerable = PackageURL.from_string(next_non_vulnerable_version.purl) | ||
latest_non_vulnerable = PackageURL.from_string(latest_non_vulnerable_version.purl) | ||
|
||
return next_non_vulnerable, latest_non_vulnerable | ||
|
||
return None, None | ||
|
||
def get_affecting_vulnerabilities(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing unit test There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Test added yesterday. |
||
""" | ||
Return a list of vulnerabilities that affect this package together with information regarding | ||
the versions that fix the vulnerabilities. | ||
""" | ||
package_details_vulns = [] | ||
|
||
fixed_by_packages = Package.objects.get_fixed_by_package_versions(self, fix=True) | ||
|
||
package_vulnerabilities = self.vulnerabilities.affecting_vulnerabilities().prefetch_related( | ||
Prefetch( | ||
"packages", | ||
queryset=fixed_by_packages, | ||
to_attr="fixed_packages", | ||
) | ||
) | ||
|
||
for vuln in package_vulnerabilities: | ||
package_details_vulns.append({"vulnerability": vuln}) | ||
later_fixed_packages = [] | ||
|
||
for fixed_pkg in vuln.fixed_packages: | ||
if fixed_pkg not in fixed_by_packages: | ||
continue | ||
fixed_version = self.version_class(fixed_pkg.version) | ||
if fixed_version > self.current_version: | ||
later_fixed_packages.append(fixed_pkg) | ||
|
||
next_fixed_package = None | ||
next_fixed_package_vulns = [] | ||
|
||
sort_fixed_by_packages_by_version = [] | ||
if later_fixed_packages: | ||
sort_fixed_by_packages_by_version = self.sort_by_version(later_fixed_packages) | ||
|
||
fixed_by_pkgs = [] | ||
|
||
for vuln_details in package_details_vulns: | ||
if vuln_details["vulnerability"] != vuln: | ||
continue | ||
vuln_details["fixed_by_purl"] = [] | ||
vuln_details["fixed_by_purl_vulnerabilities"] = [] | ||
|
||
for fixed_by_pkg in sort_fixed_by_packages_by_version: | ||
fixed_by_package_details = {} | ||
fixed_by_purl = PackageURL.from_string(fixed_by_pkg.purl) | ||
next_fixed_package_vulns = list(fixed_by_pkg.affected_by) | ||
|
||
fixed_by_package_details["fixed_by_purl"] = fixed_by_purl | ||
fixed_by_package_details[ | ||
"fixed_by_purl_vulnerabilities" | ||
] = next_fixed_package_vulns | ||
fixed_by_pkgs.append(fixed_by_package_details) | ||
|
||
vuln_details["fixed_by_package_details"] = fixed_by_pkgs | ||
|
||
return package_details_vulns | ||
|
||
@property | ||
def fixing_vulnerabilities(self): | ||
""" | ||
Return only packages fixing a vulnerability . | ||
""" | ||
return self.vulnerabilities.all().filter(packagerelatedvulnerability__fix=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deleted |
||
|
||
@property | ||
def affecting_vulnerabilities(self): | ||
""" | ||
Return only packages fixing a vulnerability . | ||
""" | ||
return self.vulnerabilities.all().filter(packagerelatedvulnerability__fix=False) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deleted |
||
|
||
|
||
class PackageRelatedVulnerability(models.Model): | ||
""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good case to use a list comprehension:
Also, what does
vuln
stand for? I'm guessingvulnerability
and in that case,get_vulnerability(vulnerability)
is not great.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this @tdruez . Re renaming, when you see terms or functions with names you don't like, if possible please suggest an alternative name that's acceptable so we don't repeat the process of me choosing a different name that you also don't like. ;-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tdruez I can't think of a better naming convention so leaving as is. It's clearly labelled and I don't see a problem. Specific renaming suggestions welcome from anyone and everyone.