Skip to content

Commit

Permalink
Update parse_advisory() and tests #597
Browse files Browse the repository at this point in the history
Reference: #597

Signed-off-by: John M. Horan <johnmhoran@gmail.com>
  • Loading branch information
johnmhoran committed Oct 6, 2022
1 parent 664af89 commit 8d5ca76
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 67 deletions.
88 changes: 38 additions & 50 deletions vulnerabilities/importers/archlinux.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@
# See https://github.com/nexB/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import dataclasses
import json
import pprint

from typing import Iterable
from typing import List
from typing import Mapping
from typing import Set
from urllib.request import urlopen

from packageurl import PackageURL
Expand All @@ -22,34 +19,11 @@
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity

# 9/28/2022 Wednesday 12:58:46 PM. Do we need the next import? It's used at the bottom!
from vulnerabilities.models import Advisory

# 9/28/2022 Wednesday 1:04:27 PM. From /home/jmh/dev/nexb/vulnerablecode/vulnerabilities/importers/alpine_linux.py
# copy fetch_response function to vulnerabilities.utils then import here and use below
from vulnerabilities.utils import fetch_response
from vulnerabilities.utils import nearest_patched_package

# Take a URL -> Grab the data from the URL -> Map it according to AdvisoryData


class ArchlinuxImporter(Importer):
# def __enter__(self):
# self._api_response = self._fetch()

# def updated_advisories(self) -> Set[AdvisoryData]:
# advisories = []

# for record in self._api_response:
# advisories.extend(self._parse(record))

# return self.batch_advisories(advisories)

# def _fetch(self) -> Iterable[Mapping]:
# with urlopen(self.config.archlinux_tracker_url) as response:
# return json.load(response)

url = "https://security.archlinux.org/json"
spdx_license_expression = "unknown"

Expand All @@ -61,20 +35,21 @@ def advisory_data(self) -> Iterable[AdvisoryData]:
for record in self.fetch():
yield self.parse_advisory(record)

# def _parse(self, record) -> List[AdvisoryData]:
# The JSON includes 'status' and 'type' fields do we want to incorporate them into the AdvisoryData objects?
# Although not directly reflected in the JSON, the web page for at least some references include an additional reference,
# see, e.g., https://security.archlinux.org/AVG-2781 (one of our test inputs, which lists this ref: https://github.com/jpadilla/pyjwt/security/advisories/GHSA-ffqj-6fqr-9h24)
# Do we want to incorporate them into the AdvisoryData objects?
def parse_advisory(self, record) -> List[AdvisoryData]:
advisories = []
aliases = record["issues"]
for cve_id in record["issues"]:
for alias in record["issues"]:
affected_packages = []
for name in record["packages"]:
impacted_purls, resolved_purls = [], []
impacted_purls.append(
PackageURL(
name=name,
# type="pacman",
# type="alpm",
type="archlinux",
type="alpm",
namespace="archlinux",
version=record["affected"],
)
Expand All @@ -84,9 +59,7 @@ def parse_advisory(self, record) -> List[AdvisoryData]:
resolved_purls.append(
PackageURL(
name=name,
# type="pacman",
# type="alpm",
type="archlinux",
type="alpm",
namespace="archlinux",
version=record["fixed"],
)
Expand Down Expand Up @@ -115,26 +88,41 @@ def parse_advisory(self, record) -> List[AdvisoryData]:
)

advisories.append(
# Advisory(
AdvisoryData(
# deprecated
# vulnerability_id=cve_id,
aliases=[cve_id],
# summary="",
# Do we want/need to keep this inside a list? "aliases" is plural but I understand we want to break out each alias individually.
# However, it looks like alpine_linux.py and nginx.py, for example, return a list of aliases.
aliases=[alias],
# aliases=alias,
summary="",
affected_packages=affected_packages,
references=references,
)
)

print("\rHello World!\r")

print("\radvisories = {}\r".format(advisories))

for apple in advisories:
# pprint.pprint(apple.to_dict())
print(apple.affected_packages)
print(f"aliases: {apple.aliases}")
print(f"summary: {apple.summary}")
print(f"affected_packages: {apple.affected_packages}")
# The print statements below will print the structure of each test advisory when either of these tests is run:
# pytest -vvs -k test_parse_advisory_single vulnerabilities/tests/test_archlinux.py
# pytest -vvs -k test_parse_advisory_multi vulnerabilities/tests/test_archlinux.py

print("\n\r=================================\n\r")

for advisory in advisories:
print(f"1. aliases: {advisory.aliases}\r")
print("")
print(f"2. summary: {advisory.summary}\r")
print("")
print(f"3. affected_packages: {advisory.affected_packages}\r")
for pkg in advisory.affected_packages:
print("")
print("vulnerable_package: {}\r".format(pkg.vulnerable_package))
print("")
print("patched_package: {}\r".format(pkg.patched_package))
print("")
print(f"4. references: {advisory.references}\r")
for ref in advisory.references:
print("")
print("ref: {}\r".format(ref))
print("")
print(f"5. date_published: {advisory.date_published}\r")
print("\n\r=================================\n\r")

return advisories
20 changes: 3 additions & 17 deletions vulnerabilities/tests/test_archlinux.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ def assert_for_package(self, name, version, cve_ids=None):
assert cve_ids == {v.vulnerability_id for v in qs[0].vulnerabilities.all()}


# 9/28/2022 Wednesday 12:45:59 PM. https://security.archlinux.org/json -- view in Firefox, copy, paste, change null to None
def test_parse_advisory():
def test_parse_advisory_single():
record = {
"name": "AVG-2781",
"packages": ["python-pyjwt"],
Expand All @@ -121,21 +120,7 @@ def test_parse_advisory():
assert archlinux.ArchlinuxImporter().parse_advisory(record)


# 9/29/2022 Thursday 9:08:38 PM.
def test_parse_advisory_multi():
# record = {
# "name": "AVG-2781",
# "packages": ["python-pyjwt"],
# "status": "Unknown",
# "severity": "Unknown",
# "type": "unknown",
# "affected": "2.3.0-1",
# "fixed": "2.4.0-1",
# "ticket": None,
# "issues": ["CVE-2022-29217"],
# "advisories": [],
# }

record_list = [
{
"name": "AVG-2781",
Expand Down Expand Up @@ -175,4 +160,5 @@ def test_parse_advisory_multi():
},
]

assert archlinux.ArchlinuxImporter().parse_advisory(record_list)
for record in record_list:
assert archlinux.ArchlinuxImporter().parse_advisory(record)

0 comments on commit 8d5ca76

Please sign in to comment.