diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 68f5c1a00..cb676817b 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -10,7 +10,7 @@ Next release - We re-enabled support for the istio vulnerabilities advisories importer. - We re-enabled support for the kbmsr2019 vulnerabilities advisories importer. - We re-enabled support for the suse score advisories importer. - +- We re-enabled support for the elixir security advisories importer. Version v31.1.1 --------------- diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index 1095739bd..13b6e12c7 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -12,6 +12,7 @@ from vulnerabilities.importers import archlinux from vulnerabilities.importers import debian from vulnerabilities.importers import debian_oval +from vulnerabilities.importers import elixir_security from vulnerabilities.importers import gentoo from vulnerabilities.importers import github from vulnerabilities.importers import gitlab @@ -53,6 +54,7 @@ istio.IstioImporter, project_kb_msr2019.ProjectKBMSRImporter, suse_scores.SUSESeverityScoreImporter, + elixir_security.ElixirSecurityImporter, ] IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY} diff --git a/vulnerabilities/importers/elixir_security.py b/vulnerabilities/importers/elixir_security.py index d920d9cef..a1d0a33cf 100644 --- a/vulnerabilities/importers/elixir_security.py +++ b/vulnerabilities/importers/elixir_security.py @@ -6,119 +6,97 @@ # See https://github.com/nexB/vulnerablecode for support or download. # See https://aboutcode.org for more information about nexB OSS projects. # -import asyncio +from pathlib import Path from typing import Set from packageurl import PackageURL -from univers.version_range import VersionRange -from univers.versions import SemverVersion +from univers.version_constraint import VersionConstraint +from univers.version_range import HexVersionRange from vulnerabilities.importer import AdvisoryData -from vulnerabilities.importer import GitImporter +from vulnerabilities.importer import AffectedPackage +from vulnerabilities.importer import Importer from vulnerabilities.importer import Reference -from vulnerabilities.package_managers import HexVersionAPI +from vulnerabilities.utils import is_cve from vulnerabilities.utils import load_yaml -from vulnerabilities.utils import nearest_patched_package -class ElixirSecurityImporter(GitImporter): - def __enter__(self): - super(ElixirSecurityImporter, self).__enter__() +class ElixirSecurityImporter(Importer): - if not getattr(self, "_added_files", None): - self._added_files, self._updated_files = self.file_changes( - recursive=True, file_ext="yml", subdir="./packages" - ) - self.pkg_manager_api = HexVersionAPI() - self.set_api(self.collect_packages()) - - def set_api(self, packages): - asyncio.run(self.pkg_manager_api.load_api(packages)) - - def updated_advisories(self) -> Set[AdvisoryData]: - files = self._updated_files.union(self._added_files) - advisories = [] - for f in files: - processed_data = self.process_file(f) - if processed_data: - advisories.append(processed_data) - return self.batch_advisories(advisories) - - def collect_packages(self): - packages = set() - files = self._updated_files.union(self._added_files) - for f in files: - data = load_yaml(f) - if data.get("package"): - packages.add(data["package"]) - - return packages - - def get_versions_for_pkg_from_range_list(self, version_range_list, pkg_name): - # Takes a list of version ranges(pathced and unaffected) of a package - # as parameter and returns a tuple of safe package versions and - # vulnerable package versions - - safe_pkg_versions = [] - vuln_pkg_versions = [] - all_version_list = self.pkg_manager_api.get(pkg_name).valid_versions - if not version_range_list: - return [], all_version_list - version_ranges = [ - VersionRange.from_scheme_version_spec_string("semver", r) for r in version_range_list - ] - for version in all_version_list: - version_obj = SemverVersion(version) - if any([version_obj in v for v in version_ranges]): - safe_pkg_versions.append(version) - - vuln_pkg_versions = set(all_version_list) - set(safe_pkg_versions) - return safe_pkg_versions, vuln_pkg_versions + repo_url = "git+https://github.com/dependabot/elixir-security-advisories" + license_url = "https://github.com/dependabot/elixir-security-advisories/blob/master/LICENSE.txt" + spdx_license_expression = "CC0-1.0" + + def advisory_data(self) -> Set[AdvisoryData]: + try: + self.clone(self.repo_url) + path = Path(self.vcs_response.dest_dir) + vuln = path / "packages" + for file in vuln.glob("**/*.yml"): + yield from self.process_file(file) + finally: + if self.vcs_response: + self.vcs_response.delete() def process_file(self, path): + path = str(path) yaml_file = load_yaml(path) - pkg_name = yaml_file["package"] - safe_pkg_versions = [] - vuln_pkg_versions = [] - if not yaml_file.get("patched_versions"): - yaml_file["patched_versions"] = [] - - if not yaml_file.get("unaffected_versions"): - yaml_file["unaffected_versions"] = [] - - safe_pkg_versions, vuln_pkg_versions = self.get_versions_for_pkg_from_range_list( - yaml_file["patched_versions"] + yaml_file["unaffected_versions"], - pkg_name, - ) + cve_id = "" + summary = yaml_file.get("description") or "" + pkg_name = yaml_file.get("package") or "" + + cve = yaml_file.get("cve") or "" + + if cve and not cve.startswith("CVE-"): + cve_id = f"CVE-{cve}" + + if not cve_id: + return [] + + if not is_cve(cve_id): + return [] + + references = [] + link = yaml_file.get("link") or "" + if link: + references.append( + Reference( + url=link, + ) + ) + + affected_packages = [] + + unaffected_versions = yaml_file.get("unaffected_versions") or [] + patched_versions = yaml_file.get("patched_versions") or [] + + constraints = [] + vrc = HexVersionRange.version_class + + for version in unaffected_versions: + constraints.append(VersionConstraint.from_string(version_class=vrc, string=version)) + + for version in patched_versions: + if version.startswith("~>"): + version = version[2:] + constraints.append( + VersionConstraint.from_string(version_class=vrc, string=version).invert() + ) + + if pkg_name: + affected_packages.append( + AffectedPackage( + package=PackageURL( + type="hex", + name=pkg_name, + ), + affected_version_range=HexVersionRange(constraints=constraints), + ) + ) - if yaml_file.get("cve"): - cve_id = "CVE-" + yaml_file["cve"] - else: - cve_id = "" - - safe_purls = [] - vuln_purls = [] - - safe_purls = [ - PackageURL(name=pkg_name, type="hex", version=version) for version in safe_pkg_versions - ] - - vuln_purls = [ - PackageURL(name=pkg_name, type="hex", version=version) for version in vuln_pkg_versions - ] - - references = [ - Reference( - reference_id=yaml_file["id"], - ), - Reference( - url=yaml_file["link"], - ), - ] - - return AdvisoryData( - summary=yaml_file["description"], - affected_packages=nearest_patched_package(vuln_purls, safe_purls), - vulnerability_id=cve_id, + yield AdvisoryData( + aliases=[cve_id], + summary=summary, references=references, + affected_packages=affected_packages, ) diff --git a/vulnerabilities/tests/conftest.py b/vulnerabilities/tests/conftest.py index 9d9764694..8ee0affda 100644 --- a/vulnerabilities/tests/conftest.py +++ b/vulnerabilities/tests/conftest.py @@ -28,7 +28,6 @@ def no_rmtree(monkeypatch): "test_apache_kafka.py", "test_apache_tomcat.py", "test_api.py", - "test_elixir_security.py", "test_models.py", "test_package_managers.py", "test_ruby.py", diff --git a/vulnerabilities/tests/test_data/elixir_security/elixir-expected.json b/vulnerabilities/tests/test_data/elixir_security/elixir-expected.json new file mode 100644 index 000000000..38695360d --- /dev/null +++ b/vulnerabilities/tests/test_data/elixir_security/elixir-expected.json @@ -0,0 +1,30 @@ +[ + { + "aliases": [ + "CVE-2018-20301" + ], + "summary": "The Coherence library has \"Mass Assignment\"-like vulnerabilities.\n", + "affected_packages": [ + { + "package": { + "type": "hex", + "namespace": null, + "name": "coherence", + "version": null, + "qualifiers": null, + "subpath": null + }, + "affected_version_range": "vers:hex/<0.5.2", + "fixed_version": null + } + ], + "references": [ + { + "reference_id": "", + "url": "https://github.com/smpallen99/coherence/issues/270", + "severities": [] + } + ], + "date_published": null + } +] \ No newline at end of file diff --git a/vulnerabilities/tests/test_elixir_security.py b/vulnerabilities/tests/test_elixir_security.py index 3f751ed8c..631f44450 100644 --- a/vulnerabilities/tests/test_elixir_security.py +++ b/vulnerabilities/tests/test_elixir_security.py @@ -8,242 +8,16 @@ # import os -from collections import OrderedDict -from unittest import TestCase -from packageurl import PackageURL - -from vulnerabilities.importer import AdvisoryData -from vulnerabilities.importer import Reference from vulnerabilities.importers.elixir_security import ElixirSecurityImporter -from vulnerabilities.package_managers import HexVersionAPI -from vulnerabilities.package_managers import Version -from vulnerabilities.utils import AffectedPackage +from vulnerabilities.tests import util_tests BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +TEST_DIR = os.path.join(BASE_DIR, "test_data/elixir_security/") -class TestElixirSecurityImporter(TestCase): - @classmethod - def setUpClass(cls): - data_source_cfg = { - "repository_url": "https://github.com/dependabot/elixir-security-advisories", - } - cls.data_src = ElixirSecurityImporter(1, config=data_source_cfg) - cls.data_src.pkg_manager_api = HexVersionAPI( - { - "coherence": [ - Version("0.5.2"), - Version("0.5.1"), - Version("0.5.0"), - Version("0.4.0"), - Version("0.3.1"), - Version("0.3.0"), - Version("0.2.0"), - Version("0.1.3"), - Version("0.1.2"), - Version("0.1.1"), - Version("0.1.0"), - ] - } - ) - - def test_process_file(self): - - path = os.path.join(BASE_DIR, "test_data/elixir_security/test_file.yml") - expected_advisory = Advisory( - summary=('The Coherence library has "Mass Assignment"-like vulnerabilities.\n'), - affected_packages=[ - AffectedPackage( - vulnerable_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.1.0", - qualifiers={}, - subpath=None, - ), - patched_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.2", - qualifiers={}, - subpath=None, - ), - ), - AffectedPackage( - vulnerable_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.1.1", - qualifiers={}, - subpath=None, - ), - patched_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.2", - qualifiers={}, - subpath=None, - ), - ), - AffectedPackage( - vulnerable_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.1.2", - qualifiers={}, - subpath=None, - ), - patched_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.2", - qualifiers={}, - subpath=None, - ), - ), - AffectedPackage( - vulnerable_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.1.3", - qualifiers={}, - subpath=None, - ), - patched_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.2", - qualifiers={}, - subpath=None, - ), - ), - AffectedPackage( - vulnerable_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.2.0", - qualifiers={}, - subpath=None, - ), - patched_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.2", - qualifiers={}, - subpath=None, - ), - ), - AffectedPackage( - vulnerable_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.3.0", - qualifiers={}, - subpath=None, - ), - patched_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.2", - qualifiers={}, - subpath=None, - ), - ), - AffectedPackage( - vulnerable_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.3.1", - qualifiers={}, - subpath=None, - ), - patched_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.2", - qualifiers={}, - subpath=None, - ), - ), - AffectedPackage( - vulnerable_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.4.0", - qualifiers={}, - subpath=None, - ), - patched_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.2", - qualifiers={}, - subpath=None, - ), - ), - AffectedPackage( - vulnerable_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.0", - qualifiers={}, - subpath=None, - ), - patched_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.2", - qualifiers={}, - subpath=None, - ), - ), - AffectedPackage( - vulnerable_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.1", - qualifiers={}, - subpath=None, - ), - patched_package=PackageURL( - type="hex", - namespace=None, - name="coherence", - version="0.5.2", - qualifiers={}, - subpath=None, - ), - ), - ], - references=[ - Reference( - reference_id="2aae6e3a-24a3-4d5f-86ff-b964eaf7c6d1", - ), - Reference(url="https://github.com/smpallen99/coherence/issues/270"), - ], - vulnerability_id="CVE-2018-20301", - ) - - found_advisory = self.data_src.process_file(path) - - assert expected_advisory.normalized() == found_advisory.normalized() +def test_elixir_process_file(): + path = os.path.join(TEST_DIR, "test_file.yml") + expected_file = os.path.join(TEST_DIR, f"elixir-expected.json") + result = [data.to_dict() for data in list(ElixirSecurityImporter().process_file(path))] + util_tests.check_results_against_json(result, expected_file)