Skip to content

Commit

Permalink
feat: Support NVD 2.0 API (Fixes intel#1872) (intel#2330)
Browse files Browse the repository at this point in the history
* Fixes intel#1872
  • Loading branch information
anthonyharrison authored Nov 9, 2022
1 parent 654706e commit 8b22d3b
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 29 deletions.
1 change: 1 addition & 0 deletions .github/actions/spelling/expect.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ Interoperability
csvjsonconsolehtml
cyclonedx
jsonapi
jsonapiapi
lowmediumhighcritical
nowdailyneverlatest
rdf
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ Usage:
CVE Data Download:
Arguments related to data sources and Cache Configuration

-n {api,json}, --nvd {api,json}
-n {api,api2,json}, --nvd {api,api2,json}
choose method for getting CVE lists from NVD
-u {now,daily,never,latest}, --update {now,daily,never,latest}
update schedule for data sources and exploits database (default: daily)
Expand Down
2 changes: 1 addition & 1 deletion cve_bin_tool/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def main(argv=None):
"-n",
"--nvd",
action="store",
choices=["api", "json"],
choices=["api", "api2", "json"],
help="choose method for getting CVE lists from NVD",
default="api",
)
Expand Down
127 changes: 122 additions & 5 deletions cve_bin_tool/data_sources/nvd_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ async def get_cve_data(self):

if self.nvd_type == "api":
return self.format_data(self.all_cve_entries), self.source_name
elif self.nvd_type == "api2":
return self.format_data_api2(self.all_cve_entries), self.source_name
else:
severity_data = []
affected_data = []
Expand Down Expand Up @@ -193,6 +195,116 @@ def parse_node(self, node: dict[str, Any]) -> list[dict[str, str]]:
affects_list.append(affects)
return affects_list

def format_data_api2(self, all_cve_entries):
    """Format CVE records fetched through the NVD 2.0 API for CVEDB.

    Each entry may be either a bare CVE record or the ``{"cve": {...}}``
    wrapper the NVD 2.0 API places around every record.  Both the
    list-valued layout documented for the 2.0 API (``metrics`` entries
    and ``configurations`` are lists) and the dict-valued layout this
    method originally expected are accepted.

    Args:
        all_cve_entries: iterable of CVE records (dicts).

    Returns:
        A ``(cve_data, affects_data)`` tuple.  ``cve_data`` is a list of
        dicts with the keys ``ID``, ``description``, ``severity``,
        ``score``, ``CVSS_version``, ``CVSS_vector`` and
        ``last_modified``.  ``affects_data`` is the concatenation of the
        dicts produced by ``parse_node_api2`` for every configuration
        node (and its children), each tagged with a ``cve_id`` key.

    Records whose description starts with ``** REJECT **`` are skipped.
    """
    # (metrics key, CVSS major version) in order of preference:
    # CVSS v3.1 first, then v3.0, then v2.
    metric_preference = (
        ("cvssMetricV31", 3),
        ("cvssMetricV30", 3),
        ("cvssMetricV2", 2),
    )

    cve_data = []
    affects_data = []

    for entry in all_cve_entries:
        # the information we want:
        # CVE ID, Severity, Score ->
        # affected {Vendor(s), Product(s), Version(s)}

        # Unwrap the {"cve": {...}} envelope when present.
        cve_item = entry.get("cve", entry)
        # The 2.0 API spells the identifier "id"; keep accepting "ID".
        cve_id = cve_item["id"] if "id" in cve_item else cve_item["ID"]
        descriptions = cve_item.get("descriptions") or [{}]

        cve = {
            "ID": cve_id,
            "description": descriptions[0].get("value", "unknown"),
            "severity": "unknown",
            "score": "unknown",
            "CVSS_version": "unknown",
            "CVSS_vector": "unknown",
            "last_modified": cve_item.get("lastModified") or cve_item["published"],
        }
        if cve["description"].startswith("** REJECT **"):
            # Skip this CVE if it's marked as 'REJECT'
            continue

        # Get CVSSv3 or CVSSv2 score for output.
        metrics = cve_item.get("metrics") or {}
        for metric_key, cvss_version in metric_preference:
            metric = metrics.get(metric_key)
            if isinstance(metric, list):
                # One entry per scoring source; use the first one.
                metric = metric[0] if metric else None
            if not metric:
                continue
            cvss = metric["cvssData"]
            # CVSS v2 carries baseSeverity next to cvssData, not inside it.
            cve["severity"] = cvss.get(
                "baseSeverity", metric.get("baseSeverity", "unknown")
            )
            cve["score"] = cvss["baseScore"]
            cve["CVSS_vector"] = cvss["vectorString"]
            cve["CVSS_version"] = cvss_version
            break
        else:
            LOGGER.info(f"Unknown metrics field {cve_id}")

        cve_data.append(cve)

        # walk the nodes with version data
        # return list of versions
        configurations = cve_item.get("configurations") or []
        if isinstance(configurations, dict):
            # Single {"nodes": [...]} object instead of a list of them.
            configurations = [configurations]

        affects_list = []
        for configuration in configurations:
            for node in configuration["nodes"]:
                affects_list.extend(self.parse_node_api2(node))
                for child in node.get("children", []):
                    affects_list.extend(self.parse_node_api2(child))

        for affects in affects_list:
            affects["cve_id"] = cve_id

        affects_data.extend(affects_list)

    return cve_data, affects_data

def parse_node_api2(self, node: dict[str, Any]) -> list[dict[str, str]]:
    """Extract vendor/product/version details from one configuration node.

    Every entry under the node's ``cpeMatch`` key yields one dict holding
    the vendor, product and version fields taken from positions 3, 4 and
    5 of the colon-separated ``criteria`` string, followed by the four
    version-range keys.  A range key missing from the entry is filled
    with ``self.RANGE_UNSET``.  A node without ``cpeMatch`` yields an
    empty list.
    """
    range_keys = (
        "versionStartIncluding",
        "versionStartExcluding",
        "versionEndIncluding",
        "versionEndExcluding",
    )

    parsed = []
    for match in node.get("cpeMatch", []):
        parts = match["criteria"].split(":")
        entry = {
            "vendor": parts[3],
            "product": parts[4],
            "version": parts[5],
        }
        # if we have a range (e.g. version is *) fill it out, and put blanks where needed
        for key in range_keys:
            entry[key] = match[key] if key in match else self.RANGE_UNSET
        parsed.append(entry)
    return parsed

async def fetch_cves(self):
if not self.session:
connector = aiohttp.TCPConnector(limit_per_host=19)
Expand All @@ -201,15 +313,12 @@ async def fetch_cves(self):
)

tasks = []
if self.nvd_type == "api":
LOGGER.info("[Using NVD API]")
LOGGER.info("Getting NVD CVE data...")
LOGGER.info("Getting NVD CVE data...")
if self.nvd_type in ["api", "api2"]:
self.all_cve_entries = await asyncio.create_task(
self.nist_fetch_using_api(),
)

else:
LOGGER.info("Getting NVD CVE data...")
nvd_metadata = await asyncio.create_task(
self.nist_scrape(self.session),
)
Expand Down Expand Up @@ -245,11 +354,19 @@ async def nist_fetch_using_api(self) -> list:

db = cvedb.CVEDB()

if self.nvd_type == "api2":
LOGGER.info("[Using NVD API 2.0]")
api_version = "2.0"
else:
LOGGER.info("[Using NVD API]")
api_version = "1.0"

nvd_api = NVD_API(
logger=self.LOGGER,
error_mode=self.error_mode,
incremental_update=self.incremental_update,
api_key=self.nvd_api_key,
api_version=api_version,
)
if self.incremental_update:
await nvd_api.get_nvd_params(
Expand Down
84 changes: 66 additions & 18 deletions cve_bin_tool/nvd_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from cve_bin_tool.error_handler import ErrorMode, NVDKeyError, NVDServiceError
from cve_bin_tool.log import LOGGER

FEED = "https://services.nvd.nist.gov/rest/json/cves/1.0"
FEED = "https://services.nvd.nist.gov/rest/json/cves/"
NVD_CVE_STATUS = "https://nvd.nist.gov/rest/public/dashboard/statistics"

PAGESIZE = 2000
Expand All @@ -43,6 +43,7 @@ def __init__(
error_mode: ErrorMode = ErrorMode.TruncTrace,
incremental_update=False,
api_key: str = "",
api_version: str = "1.0",
):
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
self.feed = feed
Expand All @@ -57,7 +58,10 @@ def __init__(
self.failed_count = 0
self.all_cve_entries: List = []
self.invalid_api = False
if api_key:
self.api_version = api_version
self.feed = f"{feed}{self.api_version}"
# Version 2.0 API doesn't work with API key
if api_key and self.api_version == 1.0:
self.params["apiKey"] = api_key

@staticmethod
Expand Down Expand Up @@ -96,6 +100,22 @@ def get_reject_count(fetched_data: Dict) -> int:
reject_count += 1
return reject_count

@staticmethod
def convert_date_to_nvd_date_api2(date: datetime) -> str:
    """Return *date* as a UTC timestamp in the format the NVD 2.0 API expects.

    The 2.0 API takes extended ISO-8601 timestamps, so the fractional
    seconds are separated by ``.`` (the 1.0 API used ``:``) and the
    string ends with the ``Z`` UTC designator,
    e.g. ``2022-11-09T12:30:45.123Z``.

    Args:
        date: the moment to convert; it is shifted to UTC first.

    Returns:
        The timestamp string with millisecond precision.
    """
    utc_date = date.astimezone(timezone.utc)
    # %f renders microseconds; dropping the last three digits leaves milliseconds.
    return f"{utc_date.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3]}Z"

@staticmethod
def get_reject_count_api2(fetched_data: Dict) -> int:
    """Return the number of rejected CVEs in one NVD 2.0 API response.

    The NVD 2.0 API returns ``vulnerabilities`` as a list of
    ``{"cve": {...}}`` wrappers.  The ``{"cve": [...]}`` mapping this
    method originally expected is still accepted.  A CVE counts as
    rejected when its first description starts with ``** REJECT **``;
    records without any description are not counted.

    Args:
        fetched_data: decoded JSON body of an API response.

    Returns:
        The count of rejected CVE records.
    """
    vulnerabilities = fetched_data["vulnerabilities"]
    if isinstance(vulnerabilities, dict):
        # Mapping layout: the records sit in a list under "cve".
        vulnerabilities = vulnerabilities["cve"]

    reject_count = 0
    for entry in vulnerabilities:
        # Unwrap the {"cve": {...}} envelope when present.
        cve_item = entry.get("cve", entry)
        descriptions = cve_item.get("descriptions") or []
        if descriptions and descriptions[0]["value"].startswith("** REJECT **"):
            reject_count += 1
    return reject_count

async def get_nvd_params(
self,
time_of_last_update: Union[datetime, None] = None,
Expand All @@ -112,11 +132,10 @@ async def get_nvd_params(
aiohttp.ClientSession(connector=connector, trust_env=True)
)

self.logger.debug("Fetching metadata from NVD...")
self.logger.info("Fetching metadata from NVD...")
cve_count = await self.nvd_count_metadata(self.session)

if "apiKey" in self.params:
await self.validate_nvd_api()
await self.validate_nvd_api()

if self.invalid_api:
self.logger.warning(
Expand All @@ -125,16 +144,29 @@ async def get_nvd_params(
else:
if time_of_last_update:
# Fetch all the updated CVE entries from the modified date. Subtracting 2-minute offset for updating cve entries
self.params["modStartDate"] = self.convert_date_to_nvd_date(
time_of_last_update - timedelta(minutes=2)
)
self.params["modEndDate"] = self.convert_date_to_nvd_date(
datetime.now()
)
self.logger.info(
f'Fetching updated CVE entries after {self.params["modStartDate"]}'
)
self.params["includeMatchStringChange"] = json.dumps(True)
if self.api_version == "1.0":
self.params["modStartDate"] = self.convert_date_to_nvd_date(
time_of_last_update - timedelta(minutes=2)
)
self.params["modEndDate"] = self.convert_date_to_nvd_date(
datetime.now()
)
self.params["includeMatchStringChange"] = json.dumps(True)
self.logger.info(
f'Fetching updated CVE entries after {self.params["modStartDate"]}'
)
else:
self.params[
"lastModStartDate"
] = self.convert_date_to_nvd_date_api2(
time_of_last_update - timedelta(minutes=2)
)
self.params["lastModEndDate"] = self.convert_date_to_nvd_date_api2(
datetime.now()
)
self.logger.info(
f'Fetching updated CVE entries after {self.params["lastModStartDate"]}'
)
# Check modified strings inside CVEs as well
with Progress() as progress:
task = progress.add_task(
Expand Down Expand Up @@ -184,24 +216,40 @@ async def load_nvd_request(self, start_index):
fetched_data = None
while fetched_data is None:
try:
self.logger.debug(f"Send request {self.feed} {param_dict}")
async with await self.session.get(
self.feed,
params=param_dict,
raise_for_status=True,
) as response:
self.logger.debug(f"Response received {response.status}")
if response.status == 200:
fetched_data = await response.json()

if start_index == 0:
# Update total results in case there is discrepancy between NVD dashboard and API
reject_count = self.get_reject_count(fetched_data)
reject_count = (
self.get_reject_count(fetched_data)
if self.api_version == "1.0"
else self.get_reject_count_api2(fetched_data)
)
self.total_results = (
fetched_data["totalResults"] - reject_count
)
self.all_cve_entries.extend(fetched_data["result"]["CVE_Items"])
if self.api_version == "1.0":
self.all_cve_entries.extend(
fetched_data["result"]["CVE_Items"]
)
else:
self.all_cve_entries.extend(
fetched_data["vulnerabilities"]["cve"]
)

elif response.status == 503:
raise NVDServiceError(self.params["modStartDate"])
if self.api_version == "1.0":
raise NVDServiceError(self.params["modStartDate"])
else:
raise NVDServiceError(self.params["lastMmodStartDate"])
else:
self.logger.info(f"Response code: {response.status}")
self.failed_count += 1
Expand Down
8 changes: 4 additions & 4 deletions doc/MANUAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
- [--disable-validation-check](#--disable-validation-check)
- [CVE Data Download Arguments](#cve-data-download-arguments)
- [-u {now,daily,never,latest}, --update {now,daily,never,latest}](#-u-nowdailyneverlatest---update-nowdailyneverlatest)
- [-n {json,api}, --nvd {json,api}](#-n-jsonapi---nvd-jsonapi)
- [-n {json,api,api2}, --nvd {json,api,api2}](#-n-jsonapiapi2---nvd-jsonapiapi2)
- [--nvd-api-key NVD_API_KEY](#--nvd-api-key-nvd_api_key)
- [-d {NVD,OSV,GAD} [{NVD,OSV,GAD} ...], --disable-data-source {NVD,OSV,GAD} [{NVD,OSV,GAD} ...]](#-d-nvdosvgad-nvdosvgad----disable-data-source-nvdosvgad-nvdosvgad-)
- [Checkers Arguments](#checkers-arguments)
Expand Down Expand Up @@ -81,7 +81,7 @@ which is useful if you're trying the latest code from
CVE Data Download:
Arguments related to data sources and Cache Configuration

-n {api,json}, --nvd {api,json}
-n {api,api2,json}, --nvd {api,api2,json}
choose method for getting CVE lists from NVD
-u {now,daily,never,latest}, --update {now,daily,never,latest}
update schedule for data sources and exploits database (default: daily)
Expand Down Expand Up @@ -318,9 +318,9 @@ This option skips validating XML files (e.g. within an SBOM) against a schema.

This option controls the frequency of updates for the CVE data from the National Vulnerability Database. By default, the tool checks the staleness of the data with every run, and if the data is more than one day old, it gets an update from NVD. You may also choose to update the data `now` (in which case all cached data is deleted and a full new download is done) or `never` in which case the staleness check is not done and no update is requested. The `now` and `never` modes can be combined to produce alternative update schedules if daily is not the desired one.

### -n {json,api}, --nvd {json,api}
### -n {json,api,api2}, --nvd {json,api,api2}

This option selects how CVE data is downloaded from the National Vulnerability Database. The default `api` option uses the NVD CVE Retrieval API. The results from this API are updated as quickly as the NVD website.
This option selects how CVE data is downloaded from the National Vulnerability Database. The default `api` option uses version 1.0 of the NVD CVE Retrieval API, and the `api2` option uses the newer version 2.0. The results from this API are updated as quickly as the NVD website.
A major benefit of using the NVD API is incremental updates: you do not have to download the complete feed again when you want the latest CVE entries from NVD. See the detailed guide on [incremental updates](how_to_guides/use_incremental_updates.md) for more details.

You may also choose to update the data using `json` option which uses the JSON feeds available on [this page](https://nvd.nist.gov/vuln/data-feeds). These per-year feeds are updated once per day. This mode was the default for CVE Binary Tool prior to the 3.0 release.
Expand Down

0 comments on commit 8b22d3b

Please sign in to comment.