Skip to content

Commit

Permalink
Refactor according to first review
Browse files Browse the repository at this point in the history
Use batch_advisories for now. It has it's own problems and there's aboutcode-org#338 for that.
The generator thing won't do much, since we are importing like 10-20 MBs of data.

The codebase already has overuse of methods starting with _ , I'd say avoid them.
They don't help with readability nor are they trivial in this case

_parse_md and _parse_yml:
The name is misleading. The function is parsing + enriching the data.
converted to get_advisories_from_yml and get_advisories_from_md

Remove `"branch": None` in importer_yielder

Group imports

Signed-off-by: Hritik Vijay <hritikxx8@gmail.com>
  • Loading branch information
Hritik14 committed Apr 2, 2021
1 parent d1f8315 commit 7aad267
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 75 deletions.
1 change: 0 additions & 1 deletion vulnerabilities/importer_yielder.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@
"last_run": None,
"data_source": "MozillaDataSource",
"data_source_cfg": {
"branch": None,
"repository_url": "https://github.com/mozilla/foundation-security-advisories",
},
},
Expand Down
100 changes: 26 additions & 74 deletions vulnerabilities/importers/mozilla.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
from typing import Set, List, Generator

import re
import asyncio
from bs4 import BeautifulSoup
from packageurl import PackageURL
import requests
from typing import List
from typing import Set

import yaml
from bs4 import BeautifulSoup
from markdown import markdown
from packageurl import PackageURL

from vulnerabilities.data_source import Advisory
from vulnerabilities.data_source import GitDataSource
from vulnerabilities.data_source import Reference
from vulnerabilities.data_source import VulnerabilitySeverity
from vulnerabilities.severity_systems import scoring_systems
from vulnerabilities.package_managers import GitHubTagsAPI
from vulnerabilities.helpers import is_cve
from vulnerabilities.helpers import split_markdown_front_matter
from vulnerabilities.severity_systems import scoring_systems


REPOSITORY = "mozilla/foundation-security-advisories"
Expand All @@ -31,61 +29,40 @@ def __enter__(self):
recursive=True, subdir="announce"
)

# Do we need this ?
# self.version_api = GitHubTagsAPI()
# self.set_api()

def set_api(self):
repository = "/".join(self.config.repository_url.split("/")[-2:])
asyncio.run(self.version_api.load_api([repository]))

def added_advisories(self) -> Set[Advisory]:
return self._load_advisories(self._added_files)

def updated_advisories(self) -> Set[Advisory]:
return self._load_advisories(self._updated_files)

def _load_advisories(self, files) -> Set[Advisory]:
"""
Yields list of advisories of batch size
"""
files = self._updated_files.union(self._added_files)
files = [
f for f in files if f.endswith(".md") or f.endswith(".yml")
] # skip irrelevant files

advisories = []
for path in files:
for advisory in self._to_advisories(path):
advisories.append(advisory)
if len(advisories) >= self.batch_size:
yield advisories
advisories = []
advisories.extend(self.to_advisories(path))

# In case batch size is too high
yield advisories
return self.batch_advisories(advisories)

def _to_advisories(self, path: str) -> List[Advisory]:
def to_advisories(self, path: str) -> List[Advisory]:
"""
Convert a file to corresponding advisories.
This calls proper method to handle yml/md files.
"""
mfsa_id = self._mfsa_id_from_filename(path)
mfsa_id = self.mfsa_id_from_filename(path)

with open(path) as lines:
if path.endswith(".md"):
return self._parse_md(mfsa_id, lines)
elif path.endswith(".yml"):
return self._parse_yml(mfsa_id, lines)
return self.get_advisories_from_md(mfsa_id, lines)
if path.endswith(".yml"):
return self.get_advisories_from_yml(mfsa_id, lines)

return []

def _parse_yml(self, mfsa_id, lines) -> List[Advisory]:
def get_advisories_from_yml(self, mfsa_id, lines) -> List[Advisory]:
advisories = []
data = yaml.safe_load(lines)
data["mfsa_id"] = mfsa_id

fixed_package_urls = self._get_package_urls(data.get("fixed_in"))
references = self._get_references(data)
fixed_package_urls = self.get_package_urls(data.get("fixed_in"))
references = self.get_references(data)

if not data.get("advisories"):
return []
Expand All @@ -103,16 +80,15 @@ def _parse_yml(self, mfsa_id, lines) -> List[Advisory]:

return advisories

def _parse_md(self, mfsa_id, lines) -> List[Advisory]:
yamltext, mdtext = self._parse_md_front_matter(lines)

def get_advisories_from_md(self, mfsa_id, lines) -> List[Advisory]:
yamltext, mdtext = split_markdown_front_matter(lines)
data = yaml.safe_load(yamltext)
data["mfsa_id"] = mfsa_id

fixed_package_urls = self._get_package_urls(data.get("fixed_in"))
references = self._get_references(data)
fixed_package_urls = self.get_package_urls(data.get("fixed_in"))
references = self.get_references(data)

description = self._html_get_p_under_h3(markdown(mdtext), "description")
description = self.html_get_p_under_h3(markdown(mdtext), "description")

# FIXME: add references from md ? They lack a proper reference id and are mostly bug reports

Expand All @@ -126,7 +102,7 @@ def _parse_md(self, mfsa_id, lines) -> List[Advisory]:
)
]

def _html_get_p_under_h3(self, html, h3: str):
def html_get_p_under_h3(self, html, h3: str):
soup = BeautifulSoup(html, features="lxml")
h3tag = soup.find("h3", text=lambda txt: txt.lower() == h3)
p = ""
Expand All @@ -138,38 +114,14 @@ def _html_get_p_under_h3(self, html, h3: str):
p += tag.get_text()
return p

def _parse_md_front_matter(self, lines):
"""
Return the YAML and MD sections.
:param: lines iterator
:return: str YAML, str Markdown
"""
# fm_count: 0: init, 1: in YAML, 2: in Markdown
fm_count = 0
yaml_lines = []
md_lines = []
for line in lines:
# first line we care about is FM start
if fm_count < 2 and line.strip() == "---":
fm_count += 1
continue

if fm_count == 1:
yaml_lines.append(line)

if fm_count == 2:
md_lines.append(line)

return "".join(yaml_lines), "".join(md_lines)

def _mfsa_id_from_filename(self, filename):
def mfsa_id_from_filename(self, filename):
match = MFSA_FILENAME_RE.search(filename)
if match:
return "mfsa" + match.group(1)

return None

def _get_package_urls(self, pkgs: List[str]) -> List[PackageURL]:
def get_package_urls(self, pkgs: List[str]) -> List[PackageURL]:
package_urls = [
PackageURL(
type="mozilla",
Expand All @@ -182,7 +134,7 @@ def _get_package_urls(self, pkgs: List[str]) -> List[PackageURL]:
]
return package_urls

def _get_references(self, data: any) -> List[Reference]:
def get_references(self, data: any) -> List[Reference]:
"""
Returns a list of references
Currently only considers the given mfsa as a reference
Expand Down

0 comments on commit 7aad267

Please sign in to comment.