Skip to content

Commit

Permalink
Work on alias/CVE loop #1079
Browse files Browse the repository at this point in the history
Reference: #1079

Signed-off-by: John M. Horan <johnmhoran@gmail.com>
  • Loading branch information
johnmhoran committed Jan 27, 2023
1 parent 69e5115 commit 44aeb6a
Show file tree
Hide file tree
Showing 3 changed files with 11,778 additions and 582 deletions.
173 changes: 127 additions & 46 deletions vulnerabilities/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,83 @@ def advisory_data(self) -> List[AdvisoryData]:
)
continue

# def get_data_from_xml_doc(
# self, xml_doc: ET.ElementTree, pkg_metadata={}
# ) -> Iterable[AdvisoryData]:
# """
# The orchestration method of the OvalDataSource. This method breaks an
# OVAL xml ElementTree into a list of `Advisory`.

# Note: pkg_metadata is a mapping of Package URL data that MUST INCLUDE
# "type" key.

# Example value of pkg_metadata:
# {"type":"deb","qualifiers":{"distro":"buster"} }
# """
# oval_parsed_data = OvalParser(self.translations, xml_doc)
# raw_data = oval_parsed_data.get_data()
# oval_doc = oval_parsed_data.oval_document
# timestamp = oval_doc.getGenerator().getTimestamp()

# print("\noval_parsed_data = {}\n".format(oval_parsed_data))
# print("\nraw_data = {}\n".format(raw_data))

# # convert definition_data to Advisory objects
# for definition_data in raw_data:
# print("\ndefinition_data = {}\n".format(definition_data))
# # These fields are definition level, i.e common for all elements
# # connected/linked to an OvalDefinition

# # TODO: 2023-01-24 Tuesday 22:34:20. Is this where we'd loop through the list of CVEs/aliases?

# vuln_id = definition_data["vuln_id"]
# description = definition_data["description"]

# severities = []
# severity = definition_data.get("severity")
# if severity:
# severities.append(
# VulnerabilitySeverity(system=severity_systems.GENERIC, value=severity)
# )
# references = [
# Reference(url=url, severities=severities)
# for url in definition_data["reference_urls"]
# ]
# affected_packages = []
# print('\ndefinition_data["test_data"] = {}\n'.format(definition_data["test_data"]))
# for test_data in definition_data["test_data"]:
# print("\ntest_data['package_list'] = {}\n".format(test_data["package_list"]))
# for package_name in test_data["package_list"]:
# affected_version_range = test_data["version_ranges"]
# vrc = RANGE_CLASS_BY_SCHEMES[pkg_metadata["type"]]
# if affected_version_range:
# try:
# affected_version_range = vrc.from_native(affected_version_range)
# except Exception as e:
# logger.error(
# f"Failed to parse version range {affected_version_range!r} "
# f"for package {package_name!r}:\n{e}"
# )
# continue
# if package_name:
# affected_packages.append(
# AffectedPackage(
# package=self.create_purl(package_name, pkg_metadata),
# affected_version_range=affected_version_range,
# )
# )
# print("affected_packages = {}".format(affected_packages))
# date_published = dateparser.parse(timestamp)
# if not date_published.tzinfo:
# date_published = date_published.replace(tzinfo=pytz.UTC)
# yield AdvisoryData(
# aliases=[vuln_id],
# summary=description,
# affected_packages=sorted(affected_packages),
# references=sorted(references),
# date_published=date_published,
# )

def get_data_from_xml_doc(
self, xml_doc: ET.ElementTree, pkg_metadata={}
) -> Iterable[AdvisoryData]:
Expand Down Expand Up @@ -450,50 +527,54 @@ def get_data_from_xml_doc(

# TODO: 2023-01-24 Tuesday 22:34:20. Is this where we'd loop through the list of CVEs/aliases?

vuln_id = definition_data["vuln_id"]
description = definition_data["description"]

severities = []
severity = definition_data.get("severity")
if severity:
severities.append(
VulnerabilitySeverity(system=severity_systems.GENERIC, value=severity)
)
references = [
Reference(url=url, severities=severities)
for url in definition_data["reference_urls"]
]
affected_packages = []
print('\ndefinition_data["test_data"] = {}\n'.format(definition_data["test_data"]))
for test_data in definition_data["test_data"]:
print("\ntest_data['package_list'] = {}\n".format(test_data["package_list"]))
for package_name in test_data["package_list"]:
affected_version_range = test_data["version_ranges"]
vrc = RANGE_CLASS_BY_SCHEMES[pkg_metadata["type"]]
if affected_version_range:
try:
affected_version_range = vrc.from_native(affected_version_range)
except Exception as e:
logger.error(
f"Failed to parse version range {affected_version_range!r} "
f"for package {package_name!r}:\n{e}"
vuln_id_list = definition_data["vuln_id"]

for vuln_id_item in vuln_id_list:
# vuln_id = definition_data["vuln_id"]
vuln_id = vuln_id_item
description = definition_data["description"]

severities = []
severity = definition_data.get("severity")
if severity:
severities.append(
VulnerabilitySeverity(system=severity_systems.GENERIC, value=severity)
)
references = [
Reference(url=url, severities=severities)
for url in definition_data["reference_urls"]
]
affected_packages = []
print('\ndefinition_data["test_data"] = {}\n'.format(definition_data["test_data"]))
for test_data in definition_data["test_data"]:
print("\ntest_data['package_list'] = {}\n".format(test_data["package_list"]))
for package_name in test_data["package_list"]:
affected_version_range = test_data["version_ranges"]
vrc = RANGE_CLASS_BY_SCHEMES[pkg_metadata["type"]]
if affected_version_range:
try:
affected_version_range = vrc.from_native(affected_version_range)
except Exception as e:
logger.error(
f"Failed to parse version range {affected_version_range!r} "
f"for package {package_name!r}:\n{e}"
)
continue
if package_name:
affected_packages.append(
AffectedPackage(
package=self.create_purl(package_name, pkg_metadata),
affected_version_range=affected_version_range,
)
)
continue
if package_name:
affected_packages.append(
AffectedPackage(
package=self.create_purl(package_name, pkg_metadata),
affected_version_range=affected_version_range,
)
)
print("affected_packages = {}".format(affected_packages))
date_published = dateparser.parse(timestamp)
if not date_published.tzinfo:
date_published = date_published.replace(tzinfo=pytz.UTC)
yield AdvisoryData(
aliases=[vuln_id],
summary=description,
affected_packages=sorted(affected_packages),
references=sorted(references),
date_published=date_published,
)
print("affected_packages = {}".format(affected_packages))
date_published = dateparser.parse(timestamp)
if not date_published.tzinfo:
date_published = date_published.replace(tzinfo=pytz.UTC)
yield AdvisoryData(
aliases=[vuln_id],
summary=description,
affected_packages=sorted(affected_packages),
references=sorted(references),
date_published=date_published,
)
41 changes: 30 additions & 11 deletions vulnerabilities/oval_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def get_data(self) -> List[Dict]:
Return a list of OvalDefinition mappings.
"""
oval_data = []
# print(len(self.all_definitions))
print("\nlen(self.all_definitions) = {}\n".format(len(self.all_definitions)))
for definition in self.all_definitions:
# print(definition)
Expand All @@ -54,20 +53,28 @@ def get_data(self) -> List[Dict]:

definition_data["severity"] = self.get_severity_from_definition(definition)
print("\nlen(matching_tests) = {}\n".format(len(matching_tests)))
print("\nmatching_tests = {}\n".format(matching_tests))
for test in matching_tests:
print("\ntest = {}\n".format(test))
test_obj, test_state = self.get_object_state_of_test(test)
if not test_obj or not test_state:
continue
test_data = {"package_list": []}
print(test_obj)
print("\ntest_obj = {}\n".format(test_obj))
test_data["package_list"].extend(self.get_pkgs_from_obj(test_obj))
print(self.get_pkgs_from_obj(test_obj))
print(
"\nself.get_pkgs_from_obj(test_obj) = {}\n".format(
self.get_pkgs_from_obj(test_obj)
)
)
version_ranges = self.get_version_range_from_state(test_state)
test_data["version_ranges"] = version_ranges
definition_data["test_data"].append(test_data)

oval_data.append(definition_data)

# print('\ntest_data["package_list"] = {}\n'.format(test_data["package_list"]))

return oval_data

def get_tests_of_definition(self, definition: OvalDefinition) -> List[OvalTest]:
Expand Down Expand Up @@ -185,15 +192,27 @@ def get_severity_from_definition(definition: OvalDefinition) -> Set[str]:

@staticmethod
def get_vuln_id_from_definition(definition):
# SUSE and Ubuntu OVAL files will get cves via this loop
# # SUSE and Ubuntu OVAL files will get cves via this loop
# for child in definition.element.iter():
# # if child.get("ref_id"):
# # return child.get("ref_id")
# # Must also check whether 'source' field exists and value is 'CVE'
# # TODO: what if there are multiple elements that satisfy the condition?
# # Add to list and report as separate AdvisoryData() objects?
# if child.get("ref_id") and child.get("source"):
# if child.get("source") == "CVE":
# return child.get("ref_id")
# # Debian OVAL files will get cves via this
# return definition.getMetadata().getTitle()
# ========================================================
cve_list = []
for child in definition.element.iter():
# if child.get("ref_id"):
# return child.get("ref_id")
# Must also check whether 'source' field exists and value is 'CVE'
# TODO: what if there are multiple elements that satisfy the condition?
# Add to list and report as separate AdvisoryData() objects?
if child.get("ref_id") and child.get("source"):
if child.get("source") == "CVE":
return child.get("ref_id")
cve_list.append(child.get("ref_id"))

# Debian OVAL files will get cves via this
return definition.getMetadata().getTitle()
if len(cve_list) == 0:
cve_list.append(definition.getMetadata().getTitle())

return cve_list
Loading

0 comments on commit 44aeb6a

Please sign in to comment.