Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support query using CVE in VulnTotal #1160

Merged
merged 6 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions vulntotal/datasources/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from urllib.parse import quote

import requests
from packageurl import PackageURL

from vulntotal.validator import DataSource
from vulntotal.validator import VendorData
Expand Down Expand Up @@ -41,7 +42,7 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]:
fetched_advisory = self.fetch_json_response(advisory_payload)
self._raw_dump.append(fetched_advisory)
if fetched_advisory:
return parse_advisory(fetched_advisory)
return parse_advisory(fetched_advisory, purl)

@classmethod
def supported_ecosystem(cls):
Expand All @@ -56,11 +57,12 @@ def supported_ecosystem(cls):
}


def parse_advisory(advisory) -> Iterable[VendorData]:
def parse_advisory(advisory, purl) -> Iterable[VendorData]:
package = advisory["packages"][0]
affected_versions = [event["version"] for event in package["versionsAffected"]]
fixed_versions = [event["version"] for event in package["versionsUnaffected"]]
yield VendorData(
purl=PackageURL(purl.type, purl.namespace, purl.name),
aliases=sorted(set(advisory["aliases"])),
affected_versions=sorted(set(affected_versions)),
fixed_versions=sorted(set(fixed_versions)),
Expand Down
242 changes: 215 additions & 27 deletions vulntotal/datasources/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
from typing import Iterable

from dotenv import load_dotenv
from packageurl import PackageURL

from vulnerabilities import utils
from vulntotal.validator import DataSource
from vulntotal.validator import InvalidCVEError
from vulntotal.validator import VendorData
from vulntotal.vulntotal_utils import get_item
from vulntotal.vulntotal_utils import github_constraints_satisfied
Expand All @@ -27,7 +29,7 @@ class GithubDataSource(DataSource):

def fetch_github(self, graphql_query):
"""
Requires GitHub API key in .env file
Requires GitHub API key in .env file.
For example::

GH_TOKEN="your-github-token"
Expand All @@ -39,15 +41,36 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]:
end_cursor = ""
interesting_edges = []
while True:
queryset = generate_graphql_payload(purl, end_cursor)
queryset = generate_graphql_payload_from_purl(purl, end_cursor)
response = self.fetch_github(queryset)
self._raw_dump.append(response)
security_advisories = get_item(response, "data", "securityVulnerabilities")
interesting_edges.extend(extract_interesting_edge(security_advisories["edges"], purl))
end_cursor = get_item(security_advisories, "pageInfo", "endCursor")
if not security_advisories["pageInfo"]["hasNextPage"]:
break
return parse_advisory(interesting_edges)
return parse_advisory(interesting_edges, purl)

def datasource_advisory_from_cve(self, cve: str) -> Iterable[VendorData]:
    """
    Yield one VendorData per package that GitHub reports as affected by ``cve``.

    Parameters:
        cve (str): CVE identifier such as "CVE-2022-2922". The identifier is
            upper-cased before it is used, so "cve-2022-2922" is accepted too.

    Yields:
        VendorData: purl of the affected package (built from the GitHub
        ecosystem and package name), the sorted unique advisory identifiers,
        the sorted unique vulnerable version ranges as ``affected_versions``
        and the sorted unique first patched versions as ``fixed_versions``.

    Raises:
        InvalidCVEError: If ``cve`` does not start with "CVE-".
    """
    # Normalise once so the prefix check, the query and the identifier filter
    # in group_advisory_by_package all see the same spelling.
    cve = cve.upper()
    if not cve.startswith("CVE-"):
        raise InvalidCVEError

    queryset = generate_graphql_payload_from_cve(cve)
    response = self.fetch_github(queryset)
    # The raw dump is replaced (not appended to) for a CVE lookup.
    self._raw_dump = [response]
    grouped_advisory = group_advisory_by_package(response, cve)

    for advisory in grouped_advisory:
        ecosystem = get_item(advisory, "package", "ecosystem")
        # NOTE(review): get_purl_type returns None for an ecosystem that is not in
        # supported_ecosystem(); that None is interpolated into the purl string
        # below as-is. Consider skipping such advisories.
        ecosystem = get_purl_type(ecosystem)
        package_name = get_item(advisory, "package", "name")
        purl = PackageURL.from_string(f"pkg:{ecosystem}/{package_name}")

        identifiers = advisory.get("identifiers") or []
        vulnerable_ranges = advisory.get("vulnerableVersionRange") or []
        # Drop empty entries: an unpatched vulnerability has no fixed version.
        patched_versions = [
            version for version in advisory.get("firstPatchedVersion") or [] if version
        ]

        yield VendorData(
            purl=purl,
            aliases=sorted(set(identifiers)),
            # Vulnerable ranges are the affected versions; first patched
            # versions are the fixed ones (these were previously swapped).
            affected_versions=sorted(set(vulnerable_ranges)),
            fixed_versions=sorted(set(patched_versions)),
        )

@classmethod
def supported_ecosystem(cls):
Expand All @@ -61,17 +84,29 @@ def supported_ecosystem(cls):
"cargo": "RUST",
"npm": "NPM",
"hex": "ERLANG",
"pub": "PUB",
keshav-space marked this conversation as resolved.
Show resolved Hide resolved
}


def parse_advisory(interesting_edges) -> Iterable[VendorData]:
def parse_advisory(interesting_edges, purl) -> Iterable[VendorData]:
"""
Parse the GraphQL response and yield VendorData instances.

Parameters:
interesting_edges (list): List of edges containing security advisory.
purl (PackageURL): PURL to be included in VendorData.

Yields:
VendorData instance containing purl, aliases, affected_versions and fixed_versions.
"""
for edge in interesting_edges:
node = edge["node"]
aliases = [aliase["value"] for aliase in get_item(node, "advisory", "identifiers")]
affected_versions = node["vulnerableVersionRange"].strip().replace(" ", "").split(",")
parsed_fixed_versions = get_item(node, "firstPatchedVersion", "identifier")
fixed_versions = [parsed_fixed_versions] if parsed_fixed_versions else []
yield VendorData(
purl=PackageURL(purl.type, purl.namespace, purl.name),
aliases=sorted(list(set(aliases))),
affected_versions=sorted(list(set(affected_versions))),
fixed_versions=sorted(list(set(fixed_versions))),
Expand All @@ -86,39 +121,49 @@ def extract_interesting_edge(edges, purl):
return interesting_edges


def generate_graphql_payload(purl, end_cursor):
def generate_graphql_payload_from_purl(purl, end_cursor=""):
"""
Generate a GraphQL payload for querying security vulnerabilities related to a PURL.

Parameters:
purl (PackageURL): The PURL to search for vulnerabilities.
end_cursor (str): An optional end cursor to use for pagination.

Returns:
dict: A dictionary containing the GraphQL query string with ecosystem and package.
"""
GRAPHQL_QUERY_TEMPLATE = """
query{
securityVulnerabilities(first: 100, ecosystem: %s, package: "%s", %s){
edges {
node {
advisory {
identifiers {
type
value
node {
advisory {
identifiers {
type
value
}
summary
references {
url
}
severity
publishedAt
}
summary
references {
url
firstPatchedVersion{
identifier
}
severity
publishedAt
}
firstPatchedVersion{
identifier
}
package {
name
package {
name
}
vulnerableVersionRange
}
vulnerableVersionRange
}
}
pageInfo {
hasNextPage
endCursor
pageInfo {
hasNextPage
endCursor
}
}
}
}
"""

supported_ecosystem = GithubDataSource.supported_ecosystem()
Expand Down Expand Up @@ -149,3 +194,146 @@ def generate_graphql_payload(purl, end_cursor):
package_name = f"{purl.namespace}/{purl.name}"

return {"query": GRAPHQL_QUERY_TEMPLATE % (ecosystem, package_name, end_cursor_exp)}


def generate_graphql_payload_from_cve(cve: str):
    """
    Generate a GraphQL payload for querying security advisories related to a CVE.

    The CVE value is embedded as an escaped, double-quoted string literal so that
    quotes or backslashes in the input cannot terminate the literal and change
    the structure of the query.

    Parameters:
    - cve (str): CVE identifier string to search for.

    Returns:
    - dict: Dictionary with a single "query" key holding the GraphQL query string
      with the CVE identifier substituted in.
    """
    import json

    GRAPHQL_QUERY_TEMPLATE = """
    query {
        securityAdvisories(first: 100, identifier: { type: CVE, value: %s }) {
            nodes {
                vulnerabilities(first: 100) {
                    nodes {
                        package {
                            ecosystem
                            name
                        }
                        advisory {
                            identifiers {
                                type
                                value
                            }
                        }
                        firstPatchedVersion {
                            identifier
                        }
                        vulnerableVersionRange
                    }
                }
            }
        }
    }
    """
    # json.dumps emits the surrounding double quotes and escapes `"`, `\` and
    # control characters; for a plain CVE id the result is just "<cve>".
    quoted_cve = json.dumps(cve)
    return {"query": GRAPHQL_QUERY_TEMPLATE % (quoted_cve,)}


def get_purl_type(github_ecosystem):
    """
    Map a GitHub ecosystem name to its purl type.

    The lookup goes through ``GithubDataSource.supported_ecosystem()``, whose
    keys are purl types and whose values are GitHub ecosystem names. The input
    is upper-cased before it is compared with those values.

    Parameters:
        github_ecosystem (str): The GitHub ecosystem string.

    Returns:
        str or None: The lower-cased purl type of the first matching entry,
        or None when no supported ecosystem matches.
    """
    purl_type_by_ecosystem = GithubDataSource.supported_ecosystem()
    matching_purl_types = (
        purl_type.lower()
        for purl_type, ecosystem in purl_type_by_ecosystem.items()
        if ecosystem == github_ecosystem.upper()
    )
    # First match wins; fall back to None for an unsupported ecosystem.
    return next(matching_purl_types, None)


def group_advisory_by_package(advisories_dict, cve):
    """
    Extract security advisory information from a dictionary and group it by package.

    Parameters:
        advisories_dict (dict): Dictionary containing security advisory. The dictionary
            should have the following structure:
            {
                "data":{
                    "securityAdvisories":{
                        "nodes":[
                            {
                                "vulnerabilities":{
                                    "nodes":[
                                        {
                                            "package": {
                                                "ecosystem": str,
                                                "name": str
                                            },
                                            "advisory":{
                                                "identifiers":[
                                                    { "value": str },
                                                    ...
                                                ]
                                            },
                                            "firstPatchedVersion":{
                                                "identifier": str
                                            } or None,
                                            "vulnerableVersionRange": str
                                        },
                                        ...
                                    ]
                                }
                            },
                            ...
                        ]
                    }
                }
            }

        cve (str): Used for filtering out advisories not matching the CVE. The
            comparison with the advisory identifiers ignores case.

    Returns:
        list: List of dict containing advisory for package, in order of first
            appearance. Each dict in the list represents advisory for a package
            and has the following keys:

            package (dict): Dict containing ecosystem and package name.
            identifiers (list of str): List of identifiers CVE and GHSA.
            firstPatchedVersion (list of str): List of first patched versions;
                vulnerabilities without a patched version contribute nothing.
            vulnerableVersionRange (list of str): List of vulnerable version ranges.
    """
    advisories = advisories_dict["data"]["securityAdvisories"]["nodes"]
    wanted_cve = cve.upper()
    # Keyed by (ecosystem, name) so each vulnerability finds its group directly;
    # dicts keep insertion order, so the output order is first-seen order.
    grouped = {}

    for advisory in advisories:
        for vulnerability in advisory["vulnerabilities"]["nodes"]:
            package = vulnerability["package"]
            advisory_ids = [
                identifier["value"] for identifier in vulnerability["advisory"]["identifiers"]
            ]

            # Skip advisory if required CVE is not present in advisory.
            # GraphQL query for `CVE-2022-2922` may also include advisory for `CVE-2022-29221`
            # `CVE-2022-29222` and `CVE-2022-29229`
            if not any(advisory_id.upper() == wanted_cve for advisory_id in advisory_ids):
                continue

            # `firstPatchedVersion` is null when no fixed release exists yet.
            first_patched = vulnerability.get("firstPatchedVersion") or {}
            first_patched_version = first_patched.get("identifier")
            vulnerable_version_range = vulnerability["vulnerableVersionRange"]

            package_key = (package.get("ecosystem"), package.get("name"))
            entry = grouped.get(package_key)
            if entry is None:
                entry = {
                    "package": package,
                    "identifiers": [],
                    "firstPatchedVersion": [],
                    "vulnerableVersionRange": [],
                }
                grouped[package_key] = entry

            entry["identifiers"].extend(advisory_ids)
            if first_patched_version:
                entry["firstPatchedVersion"].append(first_patched_version)
            entry["vulnerableVersionRange"].append(vulnerable_version_range)

    return list(grouped.values())
7 changes: 5 additions & 2 deletions vulntotal/datasources/gitlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import requests
import saneyaml
from fetchcode import fetch
from packageurl import PackageURL

from vulntotal.validator import DataSource
from vulntotal.validator import VendorData
Expand All @@ -39,7 +40,7 @@ def datasource_advisory(self, purl) -> Iterable[VendorData]:
location = download_subtree(casesensitive_package_slug)
if location:
interesting_advisories = parse_interesting_advisories(
location, purl.version, delete_download=True
location, purl, delete_download=True
)
return interesting_advisories
clear_download(location)
Expand Down Expand Up @@ -151,7 +152,8 @@ def get_casesensitive_slug(path, package_slug):
hasnext = paginated_tree["pageInfo"]["hasNextPage"]


def parse_interesting_advisories(location, version, delete_download=False) -> Iterable[VendorData]:
def parse_interesting_advisories(location, purl, delete_download=False) -> Iterable[VendorData]:
version = purl.version
path = Path(location)
glob = "**/*.yml"
files = (p for p in path.glob(glob) if p.is_file())
Expand All @@ -161,6 +163,7 @@ def parse_interesting_advisories(location, version, delete_download=False) -> It
affected_range = gitlab_advisory["affected_range"]
if gitlab_constraints_satisfied(affected_range, version):
yield VendorData(
purl=PackageURL(purl.type, purl.namespace, purl.name),
aliases=gitlab_advisory["identifiers"],
affected_versions=[affected_range],
fixed_versions=gitlab_advisory["fixed_versions"],
Expand Down
Loading