Skip to content

Commit

Permalink
Begin work on parse_advisory() method, add initial tests #597
Browse files Browse the repository at this point in the history
Reference: #597

Signed-off-by: John M. Horan <johnmhoran@gmail.com>
  • Loading branch information
johnmhoran committed Oct 3, 2022
1 parent 4e6c4f0 commit e762561
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 17 deletions.
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from vulnerabilities.importers import pypa
from vulnerabilities.importers import pysec
from vulnerabilities.importers import redhat
from vulnerabilities.importers import archlinux

IMPORTERS_REGISTRY = [
nginx.NginxImporter,
Expand All @@ -29,6 +30,7 @@
debian.DebianImporter,
gitlab.GitLabAPIImporter,
pypa.PyPaImporter,
archlinux.ArchlinuxImporter,
]

IMPORTERS_REGISTRY = {x.qualified_name: x for x in IMPORTERS_REGISTRY}
75 changes: 58 additions & 17 deletions vulnerabilities/importers/archlinux.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,67 @@
from urllib.request import urlopen

from packageurl import PackageURL
import pprint

from vulnerabilities import severity_systems
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import Importer
from vulnerabilities.importer import Reference
from vulnerabilities.importer import VulnerabilitySeverity

# 9/28/2022 Wednesday 12:58:46 PM. Do we need the next import? It's used at the bottom!
from vulnerabilities.models import Advisory

# 9/28/2022 Wednesday 1:04:27 PM. From /home/jmh/dev/nexb/vulnerablecode/vulnerabilities/importers/alpine_linux.py
# copy fetch_response function to vulnerabilities.utils then import here and use below
from vulnerabilities.utils import fetch_response
from vulnerabilities.utils import nearest_patched_package


# Take a URL -> Grab the data from the URL -> Map it according to AdvisoryData


class ArchlinuxImporter(Importer):
def __enter__(self):
self._api_response = self._fetch()
# def __enter__(self):
# self._api_response = self._fetch()

def updated_advisories(self) -> Set[AdvisoryData]:
advisories = []
# def updated_advisories(self) -> Set[AdvisoryData]:
# advisories = []

for record in self._api_response:
advisories.extend(self._parse(record))
# for record in self._api_response:
# advisories.extend(self._parse(record))

return self.batch_advisories(advisories)
# return self.batch_advisories(advisories)

def _fetch(self) -> Iterable[Mapping]:
with urlopen(self.config.archlinux_tracker_url) as response:
return json.load(response)
# def _fetch(self) -> Iterable[Mapping]:
# with urlopen(self.config.archlinux_tracker_url) as response:
# return json.load(response)

def _parse(self, record) -> List[AdvisoryData]:
advisories = []
url = "https://security.archlinux.org/json"
spdx_license_expression = "unknown"

def fetch(self) -> Iterable[Mapping]:
response = fetch_response(self.url)
return response.json()

def advisory_data(self) -> Iterable[AdvisoryData]:
for record in self.fetch():
yield self.parse_advisory(record)

# def _parse(self, record) -> List[AdvisoryData]:
def parse_advisory(self, record) -> List[AdvisoryData]:
advisories = []
aliases = record["issues"]
for cve_id in record["issues"]:
affected_packages = []
for name in record["packages"]:
impacted_purls, resolved_purls = [], []
impacted_purls.append(
PackageURL(
name=name,
type="pacman",
# type="pacman",
# type="alpm",
type="archlinux",
namespace="archlinux",
version=record["affected"],
)
Expand All @@ -60,7 +85,9 @@ def _parse(self, record) -> List[AdvisoryData]:
resolved_purls.append(
PackageURL(
name=name,
type="pacman",
# type="pacman",
# type="alpm",
type="archlinux",
namespace="archlinux",
version=record["fixed"],
)
Expand Down Expand Up @@ -89,12 +116,26 @@ def _parse(self, record) -> List[AdvisoryData]:
)

advisories.append(
Advisory(
vulnerability_id=cve_id,
summary="",
# Advisory(
AdvisoryData(
# deprecated
# vulnerability_id=cve_id,
aliases=[cve_id],
# summary="",
affected_packages=affected_packages,
references=references,
)
)

print("\rHello World!\r")

print("\radvisories = {}\r".format(advisories))

for apple in advisories:
# pprint.pprint(apple.to_dict())
print(apple.affected_packages)
print(f"aliases: {apple.aliases}")
print(f"summary: {apple.summary}")
print(f"affected_packages: {apple.affected_packages}")

return advisories
76 changes: 76 additions & 0 deletions vulnerabilities/tests/test_archlinux.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from vulnerabilities import models
from vulnerabilities.import_runner import ImportRunner
from vulnerabilities.importers import archlinux

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEST_DATA = os.path.join(BASE_DIR, "test_data/")
Expand Down Expand Up @@ -100,3 +101,78 @@ def assert_for_package(self, name, version, cve_ids=None):

if cve_ids:
assert cve_ids == {v.vulnerability_id for v in qs[0].vulnerabilities.all()}


# 9/28/2022 Wednesday 12:45:59 PM. https://security.archlinux.org/json -- view in Firefox, copy, paste, change null to None
def test_parse_advisory():
record = {
"name": "AVG-2781",
"packages": ["python-pyjwt"],
"status": "Unknown",
"severity": "Unknown",
"type": "unknown",
"affected": "2.3.0-1",
"fixed": "2.4.0-1",
"ticket": None,
"issues": ["CVE-2022-29217"],
"advisories": [],
}

assert archlinux.ArchlinuxImporter().parse_advisory(record)


# 9/29/2022 Thursday 9:08:38 PM.
def test_parse_advisory_multi():
# record = {
# "name": "AVG-2781",
# "packages": ["python-pyjwt"],
# "status": "Unknown",
# "severity": "Unknown",
# "type": "unknown",
# "affected": "2.3.0-1",
# "fixed": "2.4.0-1",
# "ticket": None,
# "issues": ["CVE-2022-29217"],
# "advisories": [],
# }

record_list = [
{
"name": "AVG-2781",
"packages": ["python-pyjwt"],
"status": "Unknown",
"severity": "Unknown",
"type": "unknown",
"affected": "2.3.0-1",
"fixed": "2.4.0-1",
"ticket": None,
"issues": ["CVE-2022-29217"],
"advisories": [],
},
{
"name": "AVG-2780",
"packages": ["wpewebkit"],
"status": "Unknown",
"severity": "Unknown",
"type": "unknown",
"affected": "2.36.3-1",
"fixed": "2.36.4-1",
"ticket": None,
"issues": ["CVE-2022-26710", "CVE-2022-22677", "CVE-2022-22662"],
"advisories": [],
},
{
"name": "AVG-4",
"packages": ["bzip2"],
"status": "Fixed",
"severity": "Low",
"type": "denial of service",
"affected": "1.0.6-5",
"fixed": "1.0.6-6",
"ticket": None,
"issues": ["CVE-2016-3189"],
"advisories": ["ASA-201702-19"],
},
]

assert archlinux.ArchlinuxImporter().parse_advisory(record_list)
12 changes: 12 additions & 0 deletions vulnerabilities/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,15 @@ def base32_custom(btes):
+ _base32_table[c & 0x3FF] # bits 21 - 30 # bits 31 - 40
)
return bytes(encoded)


# 9/28/2022 Wednesday 1:07:41 PM.Copy from /home/jmh/dev/nexb/vulnerablecode/vulnerabilities/importers/alpine_linux.py
# for use in /home/jmh/dev/nexb/vulnerablecode/vulnerabilities/importers/archlinux.py
def fetch_response(url):
"""
Fetch and return `response` from the `url`
"""
response = requests.get(url)
if response.status_code == 200:
return response
raise Exception(f"Failed to fetch data from {url!r} with status code: {response.status_code!r}")

0 comments on commit e762561

Please sign in to comment.