From eb69a014f299c557b626e3b971d802437e6a780f Mon Sep 17 00:00:00 2001 From: "John M. Horan" Date: Mon, 14 Aug 2023 15:04:38 -0700 Subject: [PATCH] Add univers version, revise sort and related code, update and add new tests #1228 Reference: https://github.com/nexB/vulnerablecode/issues/1228 Signed-off-by: John M. Horan --- vulnerabilities/models.py | 117 +++++------ vulnerabilities/tests/test_models.py | 292 +++++++++++++++++++++++++-- 2 files changed, 326 insertions(+), 83 deletions(-) diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index 494b045f9..d0824610b 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -34,6 +34,7 @@ from packageurl.contrib.django.models import without_empty_values from rest_framework.authtoken.models import Token from univers import versions +from univers.version_range import RANGE_CLASS_BY_SCHEMES from vulnerabilities.severity_systems import SCORING_SYSTEMS from vulnerabilities.utils import build_vcid @@ -671,60 +672,16 @@ def get_sibling_packages(self, package): subpath=package.subpath, ).distinct() - def assign_univers_version(self, fixed_pkg): - """ - Identify which univers version applies to a package and return that version for use, e.g., - in sorting a group of sibling packages (same type etc.). - """ - # TODO: Many more to be added. - match_type_to_univers_version = { - "conan": versions.ConanVersion, - "deb": versions.DebianVersion, - # The following throws an error: AttributeError: module 'univers.versions' has no attribute 'GemVersion' - # "gem": versions.GemVersion, - "maven": versions.MavenVersion, - "nginx": versions.NginxVersion, - # "npm": - "openssl": versions.OpensslVersion, - "pypi": versions.PypiVersion, - # from https://github.com/nexB/univers/blob/205da7ecbf7b0f195662373ea710b2b84a877eb0/tests/test_version_comparison.py - # versions.SemverVersion, - # versions.GolangVersion, - # versions.PypiVersion, - # versions.GenericVersion, - # versions.ComposerVersion, - # versions.NginxVersion, - # versions.ArchLinuxVersion, - # versions.DebianVersion, - # versions.RpmVersion, - # versions.MavenVersion, - # versions.NugetVersion, - # versions.GentooVersion, - # versions.OpensslVersion, - # versions.LegacyOpensslVersion, - # versions.AlpineLinuxVersion, - } - - command_name = "" - matched_type_to_version = match_type_to_univers_version.get(fixed_pkg.type) - if matched_type_to_version: - command_name = matched_type_to_version - else: - command_name = versions.SemverVersion - - return command_name - - def sort_by_version(self, later_matching_fixed_packages): + def sort_by_version(self, packages_to_sort): # Incoming is a list of - # We'll use assign_univers_version() above to get the univers version as a command_name. - command_name = self.assign_univers_version(later_matching_fixed_packages[0]) - test_sort_by_version = [] - test_sort_by_version = sorted( - later_matching_fixed_packages, - key=lambda x: command_name(x.version), + univers_version = RANGE_CLASS_BY_SCHEMES[packages_to_sort[0].type].version_class + sorted_by_version = [] + sorted_by_version = sorted( + packages_to_sort, + key=lambda x: univers_version(x.version), ) - return test_sort_by_version + return sorted_by_version @property def fixed_package_details(self): @@ -746,11 +703,10 @@ def fixed_package_details(self): non_vuln_sibs.append(sib) # Add just the greater-than versions to a new list - command_name = self.assign_univers_version(self) - + univers_version = RANGE_CLASS_BY_SCHEMES[self.type].version_class later_non_vuln_sibs = [] for non_vuln_sib in non_vuln_sibs: - if command_name(non_vuln_sib.version) > command_name(self.version): + if univers_version(non_vuln_sib.version) > univers_version(self.version): later_non_vuln_sibs.append(non_vuln_sib) # Take the list of vulns affecting the current package, retrieve a list of the fixed packages for each vuln, and assign the result to a custom attribute, 'matching_fixed_packages'. Ex: qs[0].matching_fixed_packages gives us the fixed package(s) for the 1st vuln for this affected package (i.e., self). @@ -766,20 +722,24 @@ def fixed_package_details(self): purl_dict["purl"] = self.purl purl_dict.update({"vulnerabilities": []}) - # HOT: In constructing the purl_dict, I need to include packages that have no vulnerabilities -- we still want various dict fields to be populated to reflect the relevant structure and content all such packages. atm only a few fields are included in the dict for such a package. "closest_non_vulnerable_fix", "closest_non_vulnerable_fix_url", "most_recent_non_vulnerable_fix" and "most_recent_non_vulnerable_fix_url" are currently omitted. + purl_dict["closest_non_vulnerable_fix"] = "" + purl_dict["closest_non_vulnerable_fix_url"] = "" + purl_dict["most_recent_non_vulnerable_fix"] = "" + purl_dict["most_recent_non_vulnerable_fix_url"] = "" for vuln in qs: later_matching_fixed_packages = [] purl_dict["vulnerabilities"].append({"vulnerability": vuln.vulnerability_id}) vuln_matching_fixed_packages = vuln.matching_fixed_packages - command_name = self.assign_univers_version(self) closest_fixed_package = "" + # Need to define here to avoid "" vs. [] for some closest_fixed_by_vulnerabilities values? + closest_fixed_package_vulns_dict = [] if len(vuln_matching_fixed_packages) > 0: for fixed_pkg in vuln_matching_fixed_packages: - if fixed_pkg in matching_fixed_packages and command_name( + if fixed_pkg in matching_fixed_packages and univers_version( fixed_pkg.version - ) > command_name(self.version): + ) > univers_version(self.version): later_matching_fixed_packages.append(fixed_pkg) sort_fixed_by_packages_by_version = self.sort_by_version( @@ -788,7 +748,7 @@ def fixed_package_details(self): closest_fixed_package = sort_fixed_by_packages_by_version[0] closest_fixed_package_vulns = closest_fixed_package.affected_by - + # 2023-08-13 Sunday 16:25:41. Not sure but I think I need to define this initially above just after closest_fixed_package = "" closest_fixed_package_vulns_dict = [ { "vuln_id": fixed_pkg_vuln.vulnerability_id, @@ -803,6 +763,7 @@ def fixed_package_details(self): for dict_vuln in purl_dict["vulnerabilities"]: closest_non_vulnerable_fix = "" + # TODO: 2023-08-13 Sunday 16:01:57. Refactor here and elsewhere! if len(later_non_vuln_sibs) > 0: closest_non_vulnerable_fix = self.sort_by_version(later_non_vuln_sibs)[0] @@ -810,27 +771,45 @@ def fixed_package_details(self): if len(later_non_vuln_sibs) > 0: most_recent_non_vulnerable_fix = self.sort_by_version(later_non_vuln_sibs)[-1] else: - most_recent_non_vulnerable_fix = None + most_recent_non_vulnerable_fix = "" if dict_vuln["vulnerability"] == str(vuln): dict_vuln["closest_fixed_by_purl"] = str(closest_fixed_package) - dict_vuln["closest_fixed_by_url"] = closest_fixed_package.get_absolute_url() - dict_vuln["closest_fixed_by_vulnerabilities"] = closest_fixed_package_vulns_dict + if len(vuln_matching_fixed_packages) > 0: + dict_vuln["closest_fixed_by_url"] = closest_fixed_package.get_absolute_url() + closest_fixed_package_vulns_dict = [ + { + "vuln_id": fixed_pkg_vuln.vulnerability_id, + "vuln_get_absolute_url": fixed_pkg_vuln.get_absolute_url(), + } + for fixed_pkg_vuln in closest_fixed_package_vulns + ] + dict_vuln[ + "closest_fixed_by_vulnerabilities" + ] = closest_fixed_package_vulns_dict + else: + dict_vuln["closest_fixed_by_url"] = "" + dict_vuln["closest_fixed_by_vulnerabilities"] = [] purl_dict["closest_non_vulnerable_fix"] = str(closest_non_vulnerable_fix) - purl_dict[ - "closest_non_vulnerable_fix_url" - ] = closest_non_vulnerable_fix.get_absolute_url() + if len(vuln_matching_fixed_packages) > 0: + purl_dict[ + "closest_non_vulnerable_fix_url" + ] = closest_non_vulnerable_fix.get_absolute_url() + purl_dict[ + "most_recent_non_vulnerable_fix_url" + ] = most_recent_non_vulnerable_fix.get_absolute_url() + else: + purl_dict["closest_non_vulnerable_fix_url"] = "" + purl_dict["most_recent_non_vulnerable_fix_url"] = "" + purl_dict["most_recent_non_vulnerable_fix"] = str( most_recent_non_vulnerable_fix ) - purl_dict[ - "most_recent_non_vulnerable_fix_url" - ] = most_recent_non_vulnerable_fix.get_absolute_url() # Temporary print output during dev/testing: # print("\npurl_dict = {}\n".format(purl_dict)) - # print(json.dumps(purl_dict, indent=4, sort_keys=False)) + print(json.dumps(purl_dict, indent=4, sort_keys=False)) # TODO: Consider whether we want to provide the user with an option to output the dictionary to a file. # pretty_purl_dict = json.dumps(purl_dict, indent=4, sort_keys=False) diff --git a/vulnerabilities/tests/test_models.py b/vulnerabilities/tests/test_models.py index 24bf0e166..8b3115a19 100644 --- a/vulnerabilities/tests/test_models.py +++ b/vulnerabilities/tests/test_models.py @@ -10,13 +10,19 @@ import urllib.parse from datetime import datetime from unittest import TestCase +from unittest import mock import pytest +from django.db import transaction +from django.db.models.query import QuerySet from django.db.utils import IntegrityError from freezegun import freeze_time +from packageurl import PackageURL from univers import versions +from univers.version_range import RANGE_CLASS_BY_SCHEMES from vulnerabilities import models +from vulnerabilities.models import Package class TestVulnerabilityModel(TestCase): @@ -100,10 +106,156 @@ def test_cwe_not_present_in_weaknesses_db(self): @pytest.mark.django_db class TestPackageModel(TestCase): + def setUp(self): + self.vuln1 = models.Vulnerability.objects.create( + summary="test-vuln1", + vulnerability_id="VCID-123", + ) + self.vuln2 = models.Vulnerability.objects.create( + summary="test-vuln2", + vulnerability_id="VCID-456", + ) + + # Create a vuln of its own for the fixed_by_package + self.vuln3 = models.Vulnerability.objects.create( + summary="test-vuln-not-used-anywhere", + vulnerability_id="VCID-000", + ) + + self.vulnerablecode_package = models.Package.objects.create( + type="maven", + namespace="com.fasterxml.jackson.core", + name="jackson-databind", + version="2.13.1", + qualifiers={}, + subpath="", + ) + + self.fixed_by_package = models.Package.objects.create( + type="maven", + namespace="com.fasterxml.jackson.core", + name="jackson-databind", + version="2.13.2", + qualifiers={}, + subpath="", + ) + + self.backport_fixed_by_package = models.Package.objects.create( + type="maven", + namespace="com.fasterxml.jackson.core", + name="jackson-databind", + version="2.12.6.1", + qualifiers={}, + subpath="", + ) + + self.non_vulnerable_package = models.Package.objects.create( + type="maven", + namespace="com.fasterxml.jackson.core", + name="jackson-databind", + version="2.14.0-rc1", + qualifiers={}, + subpath="", + ) + + models.PackageRelatedVulnerability.objects.create( + package=self.vulnerablecode_package, + vulnerability=self.vuln1, + fix=False, + ) + + models.PackageRelatedVulnerability.objects.create( + package=self.vulnerablecode_package, + vulnerability=self.vuln2, + fix=False, + ) + + # Create a fixed_by package for vuln1 + models.PackageRelatedVulnerability.objects.create( + package=self.fixed_by_package, + vulnerability=self.vuln1, + fix=True, + ) + + # Add backport_fixed_by_package as a fixed_by for vuln1 -- but this should be excluded because its version is less than the affected package's version. + models.PackageRelatedVulnerability.objects.create( + package=self.backport_fixed_by_package, + vulnerability=self.vuln1, + fix=True, + ) + + # Create a vuln of its own for the fixed_by_packagefixed_by package for vuln1 + models.PackageRelatedVulnerability.objects.create( + package=self.fixed_by_package, + vulnerability=self.vuln3, + fix=False, + ) + + def test_get_vulnerable_packages(self): + vuln_packages = Package.objects.vulnerable() + assert vuln_packages.count() == 3 + assert vuln_packages.distinct().count() == 2 + + # matching_fixed_packages = vulnerablecode_package.get_fixed_packages(vulnerablecode_package) + # assert vuln_packages.distinct()[0] + + # matching_fixed_packages = vuln_packages.distinct()[0].get_fixed_packages( + # vuln_packages.distinct()[0] + # ) + + first_vulnerable_package = vuln_packages.distinct()[0] + matching_fixed_packages = first_vulnerable_package.get_fixed_packages( + first_vulnerable_package + ) + first_fixed_by_package = matching_fixed_packages[0] + + assert ( + first_vulnerable_package.purl + == "pkg:maven/com.fasterxml.jackson.core/jackson-databind@2.13.1" + ) + assert len(matching_fixed_packages) == 2 + assert ( + first_fixed_by_package.purl + == "pkg:maven/com.fasterxml.jackson.core/jackson-databind@2.12.6.1" + ) + + purl_dict = { + "purl": "pkg:maven/com.fasterxml.jackson.core/jackson-databind@2.13.1", + "vulnerabilities": [ + { + "vulnerability": "VCID-123", + "closest_fixed_by_purl": "pkg:maven/com.fasterxml.jackson.core/jackson-databind@2.13.2", + "closest_fixed_by_url": "/packages/pkg:maven/com.fasterxml.jackson.core/jackson-databind@2.13.2", + "closest_fixed_by_vulnerabilities": [ + { + "vuln_id": "VCID-000", + "vuln_get_absolute_url": "/vulnerabilities/VCID-000", + } + ], + }, + { + "vulnerability": "VCID-456", + "closest_fixed_by_purl": "There are no reported fixed packages.", + "closest_fixed_by_url": "", + "closest_fixed_by_vulnerabilities": [], + }, + ], + "closest_non_vulnerable_fix": "pkg:maven/com.fasterxml.jackson.core/jackson-databind@2.14.0-rc1", + "closest_non_vulnerable_fix_url": "", + "most_recent_non_vulnerable_fix": "pkg:maven/com.fasterxml.jackson.core/jackson-databind@2.14.0-rc1", + "most_recent_non_vulnerable_fix_url": "", + } + + assert vuln_packages.distinct()[0].fixed_package_details == purl_dict + def test_univers_version_comparisons(self): assert versions.PypiVersion("1.2.3") < versions.PypiVersion("1.2.4") assert versions.PypiVersion("0.9") < versions.PypiVersion("0.10") + deb01 = models.Package.objects.create(type="deb", name="git", version="2.30.1") + deb02 = models.Package.objects.create(type="deb", name="git", version="2.31.1") + assert versions.DebianVersion(deb01.version) < versions.DebianVersion(deb02.version) + # pkg:deb/debian/jackson-databind@2.12.1-1%2Bdeb11u1 is a real PURL in the DB # But we need to replace/delete the "%". Test the error: with pytest.raises(versions.InvalidVersion): @@ -124,21 +276,133 @@ def test_univers_version_comparisons(self): # Use SemverVersion instead as a default fallback version for comparisons. assert versions.SemverVersion("0.9") < versions.SemverVersion("0.10") - deb01 = models.Package.objects.create(type="deb", name="git", version="2.30.1") - deb02 = models.Package.objects.create(type="deb", name="git", version="2.31.1") - assert versions.DebianVersion(deb01.version) < versions.DebianVersion(deb02.version) + def test_univers_version_class(self): + gem_version = RANGE_CLASS_BY_SCHEMES["gem"].version_class + assert gem_version == versions.RubygemsVersion + + gem_package = models.Package.objects.create(type="gem", name="sidekiq", version="0.9") + gem_package_version = RANGE_CLASS_BY_SCHEMES[gem_package.type].version_class + assert gem_package_version == versions.RubygemsVersion + + deb_version = RANGE_CLASS_BY_SCHEMES["deb"].version_class + assert deb_version == versions.DebianVersion + + deb_package = models.Package.objects.create(type="deb", name="git", version="2.31.1") + deb_package_version = RANGE_CLASS_BY_SCHEMES[deb_package.type].version_class + assert deb_package_version == versions.DebianVersion + + pypi_version = RANGE_CLASS_BY_SCHEMES["pypi"].version_class + assert pypi_version == versions.PypiVersion + + pypi_package = models.Package.objects.create(type="pypi", name="pyopenssl", version="0.9") + pypi_package_version = RANGE_CLASS_BY_SCHEMES[pypi_package.type].version_class + assert pypi_package_version == versions.PypiVersion + + def test_sort_by_version(self): + list_to_sort = [ + "pkg:npm/sequelize@3.13.1", + "pkg:npm/sequelize@3.10.1", + "pkg:npm/sequelize@3.40.1", + "pkg:npm/sequelize@3.9.1", + ] + + # Convert list of strings ^ to a list of vulnerablecode Package objects. + vuln_pkg_list = [] + for package in list_to_sort: + purl = PackageURL.from_string(package) + attrs = {k: v for k, v in purl.to_dict().items() if v} + vulnerablecode_package = models.Package.objects.create(**attrs) + vuln_pkg_list.append(vulnerablecode_package) - def test_assign_univers_version(self): - requesting_package = models.Package.objects.create(type="deb", name="git", version="2.30.1") + requesting_package = models.Package.objects.create( + type="npm", + name="sequelize", + version="3.0.0", + ) + + sorted_pkgs = requesting_package.sort_by_version(vuln_pkg_list) + first_sorted_item = sorted_pkgs[0] + + assert sorted_pkgs[0].purl == "pkg:npm/sequelize@3.9.1" + assert sorted_pkgs[-1].purl == "pkg:npm/sequelize@3.40.1" + + def test_string_to_purl_to_dict_to_package(self): + # Convert a PURL string to a PURL to a dictionary to a VulnerableCode Package, i.e., + # a . + + # Convert a PURL string to a PURL. + purl_string = "pkg:maven/org.apache.tomcat.embed/tomcat-embed-core@9.0.31" + purl = PackageURL.from_string(purl_string) + + assert type(purl) == PackageURL + assert purl.type == "maven" + assert purl.qualifiers == {} + assert purl.subpath == None + + # Convert the PURL to a dictionary. + # It appears that this step is where the unwanted None values are created for qualifiers and + # subpath when the PURL does not already contain values for those attributes. + purl_to_dict = purl.to_dict() + + assert purl_to_dict == { + "type": "maven", + "namespace": "org.apache.tomcat.embed", + "name": "tomcat-embed-core", + "version": "9.0.31", + "qualifiers": None, + "subpath": None, + } + assert purl_to_dict.get("qualifiers") == None + assert purl_to_dict.get("subpath") == None - deb01 = models.Package.objects.create(type="deb", name="git", version="2.31.1") - command_name_deb01 = requesting_package.assign_univers_version(deb01) - assert command_name_deb01 == versions.DebianVersion + # Convert the dictionary to a VulnerableCode Package, i.e., + # a - pypi01 = models.Package.objects.create(type="pypi", name="pyopenssl", version="0.9") - command_name_pypi01 = requesting_package.assign_univers_version(pypi01) - assert command_name_pypi01 == versions.PypiVersion + # If subpath is None we get error: django.db.utils.IntegrityError: null value in column + # "subpath" violates not-null constraint -- need to convert value from None to empty string. + # Similar issue with qualifiers, which must be converted from None to {}. - gem01 = models.Package.objects.create(type="gem", name="sidekiq", version="0.9") - command_name_gem01 = requesting_package.assign_univers_version(gem01) - assert command_name_gem01 == versions.SemverVersion + # I've structured the following in this way because trying instead to use + # "with pytest.raises(IntegrityError):" will throw the error + # django.db.transaction.TransactionManagementError: An error occurred in the current + # transaction. You can't execute queries until the end of the 'atomic' block. + + try: + with transaction.atomic(): + vulnerablecode_package = models.Package.objects.create( + type=purl_to_dict.get("type"), + namespace=purl_to_dict.get("namespace"), + name=purl_to_dict.get("name"), + version=purl_to_dict.get("version"), + qualifiers=purl_to_dict.get("qualifiers"), + subpath=purl_to_dict.get("subpath"), + ) + except IntegrityError: + print("\nAs expected, an IntegrityError has occurred.\n") + + # This will avoid the IntegrityError: + if purl_to_dict.get("qualifiers") is None: + purl_to_dict["qualifiers"] = {} + if purl_to_dict.get("subpath") is None: + purl_to_dict["subpath"] = "" + + # Check the qualifiers and subpath values again. + assert purl_to_dict.get("qualifiers") == {} + assert purl_to_dict.get("subpath") == "" + + vulnerablecode_package = models.Package.objects.create( + type=purl_to_dict.get("type"), + namespace=purl_to_dict.get("namespace"), + name=purl_to_dict.get("name"), + version=purl_to_dict.get("version"), + qualifiers=purl_to_dict.get("qualifiers"), + subpath=purl_to_dict.get("subpath"), + ) + + assert type(vulnerablecode_package) == models.Package + assert ( + vulnerablecode_package.purl + == "pkg:maven/org.apache.tomcat.embed/tomcat-embed-core@9.0.31" + ) + assert vulnerablecode_package.qualifiers == {} + assert vulnerablecode_package.subpath == ""