Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add all results get api #84

Merged
merged 1 commit into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions src/nvd_api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,251 @@ def get_cpe_match(self,

return ret

def get_all_cves(self,
cpe_name: str = None,
cve_id: str = None,
cvss_v2_metrics: str = None,
cvss_v2_severity: CVSS_V2_SEVERITY = None,
cvss_v3_metrics: str = None,
cvss_v3_severity: CVSS_V3_SEVERITY = None,
cwe_id: str = None,
has_cert_alerts: bool = False,
has_cert_notes: bool = False,
has_kev: bool = False,
has_oval: bool = False,
is_vulnerable: bool = False,
keyword_exact_match: bool = False,
keyword_search: str = None,
last_mod_start_date: datetime = None,
last_mod_end_date: datetime = None,
no_rejected: bool = False,
pub_start_date: datetime = None,
pub_end_date: datetime = None,
source_identifier: str = None,
version_end: str = None,
version_end_type: VERSION_TYPE = None,
version_start: str = None,
version_start_type: VERSION_TYPE = None,
virtual_match_string: str = None) -> CveOas:
"""All CVE API # noqa: E501

Args:
cpe_name (str, optional): CPE Name. Defaults to None.
cve_id (str, optional): CVE ID. Defaults to None.
cvss_v2_metrics (str, optional): CVSSv2 vector string. Defaults to None.
cvss_v2_severity (CVSS_V2_SEVERITY, optional): CVSSv2 qualitative severity rating. Defaults to None.
cvss_v3_metrics (str, optional): CVSSv3 vector string. Defaults to None.
cvss_v3_severity (CVSS_V3_SEVERITY, optional): CVSSv3 qualitative severity rating. Defaults to None.
cwe_id (str, optional): CWE ID. Defaults to None.
has_cert_alerts (bool, optional): contain a Technical Alert from US-CERT. Defaults to False.
has_cert_notes (bool, optional): contain a Vulnerability Note from CERT/CC. Defaults to False.
has_kev (bool, optional): appear in CISA's Known Exploited Vulnerabilities (KEV) Catalog. Defaults to False.
has_oval (bool, optional): contain information from MITRE's Open Vulnerability and Assessment Language (OVAL). Defaults to False.
is_vulnerable (bool, optional): returns only CVE associated with a specific CPE. Defaults to False.
keyword_exact_match (bool, optional): returns any CVE where a word or phrase. Defaults to False.
keyword_search (str, optional): a word or phrase is found in the current description. Defaults to None.
last_mod_start_date (datetime, optional): search by modified date. Defaults to None.
last_mod_end_date (datetime, optional): search by modified date. Defaults to None.
no_rejected (bool, optional): return the CVE API includes CVE records with the REJECT or Rejected status. Defaults to False.
pub_start_date (datetime, optional): search by published date. Defaults to None.
pub_end_date (datetime, optional): search by published date. Defaults to None.
source_identifier (str, optional): returns CVE where the exact value of sourceIdentifier appears. Defaults to None.
version_end (str, optional): return only the CVEs associated with CPEs in specific version ranges. Defaults to None.
version_end_type (VERSION_TYPE, optional): return only the CVEs associated with CPEs in specific version ranges. Defaults to None.
version_start (str, optional): return only the CVEs associated with CPEs in specific version ranges. Defaults to None.
version_start_type (VERSION_TYPE, optional): return only the CVEs associated with CPEs in specific version ranges. Defaults to None.
virtual_match_string (str, optional): CVE more broadly than cpeName. Defaults to None.

Returns:
AsyncResult: API Result
"""
limit = self.MAX_PAGE_LIMIT_CVE_API
response = self.get_cves(start_index=0,
results_per_page=limit,
cpe_name=cpe_name,
cve_id=cve_id,
cvss_v2_metrics=cvss_v2_metrics,
cvss_v2_severity=cvss_v2_severity,
cvss_v3_metrics=cvss_v3_metrics,
cvss_v3_severity=cvss_v3_severity,
cwe_id=cwe_id,
has_cert_alerts=has_cert_alerts,
has_cert_notes=has_cert_notes,
has_kev=has_kev,
has_oval=has_oval,
is_vulnerable=is_vulnerable,
keyword_exact_match=keyword_exact_match,
keyword_search=keyword_search,
last_mod_start_date=last_mod_start_date,
last_mod_end_date=last_mod_end_date,
no_rejected=no_rejected,
pub_start_date=pub_start_date,
pub_end_date=pub_end_date,
source_identifier=source_identifier,
version_end=version_end,
version_end_type=version_end_type,
version_start=version_start,
version_start_type=version_start_type,
virtual_match_string=virtual_match_string)

if response.total_results >= limit:
count = response.total_results // limit + 1
for start_index in range(1, count):
r = self.get_cves(
start_index=start_index,
results_per_page=limit,
cpe_name=cpe_name,
cve_id=cve_id,
cvss_v2_metrics=cvss_v2_metrics,
cvss_v2_severity=cvss_v2_severity,
cvss_v3_metrics=cvss_v3_metrics,
cvss_v3_severity=cvss_v3_severity,
cwe_id=cwe_id,
has_cert_alerts=has_cert_alerts,
has_cert_notes=has_cert_notes,
has_kev=has_kev,
has_oval=has_oval,
is_vulnerable=is_vulnerable,
keyword_exact_match=keyword_exact_match,
keyword_search=keyword_search,
last_mod_start_date=last_mod_start_date,
last_mod_end_date=last_mod_end_date,
no_rejected=no_rejected,
pub_start_date=pub_start_date,
pub_end_date=pub_end_date,
source_identifier=source_identifier,
version_end=version_end,
version_end_type=version_end_type,
version_start=version_start,
version_start_type=version_start_type,
virtual_match_string=virtual_match_string)
response.vulnerabilities.extend(r.vulnerabilities)

return response

def get_all_cve_history(self,
change_start_date: datetime = None,
change_end_date: datetime = None,
cve_id: str = None,
event_name: EVENT_NAME = None,) -> CveHistoryOas:
"""All CVE Change History API # noqa: E501

Args:
change_start_date (datetime, optional): search by changed date. Defaults to None.
change_end_date (datetime, optional): search by changed date. Defaults to None.
cve_id (str, optional): CVE ID. Defaults to None.
event_name (EVENT_NAME, optional): returns all CVE associated with a specific type of change event. Defaults to None.

Returns:
CveHistoryOas: API Result
"""
limit = self.MAX_PAGE_LIMIT_CVE_HISTORY_API
response = self.get_cve_history(start_index=0,
results_per_page=limit,
change_start_date=change_start_date,
change_end_date=change_end_date,
cve_id=cve_id,
event_name=event_name)

if response.total_results >= limit:
count = response.total_results // limit + 1
for start_index in range(1, count):
r = self.get_cve_history(start_index=start_index,
results_per_page=limit,
change_start_date=change_start_date,
change_end_date=change_end_date,
cve_id=cve_id,
event_name=event_name)
response.cve_changes.extend(r.cve_changes)

return response

def get_all_cpes(self,
cpe_name_id: str = None,
cpe_match_string: str = None,
keyword_exact_match: bool = False,
keyword_search: str = None,
last_mod_start_date: datetime = None,
last_mod_end_date: datetime = None,
match_criteria_id: str = None) -> CpeOas:
"""All CPE API # noqa: E501

Args:
cpe_name_id (str, optional): specific CPE record UUID. Defaults to None.
cpe_match_string (str, optional): CPE Name. Defaults to None.
keyword_exact_match (bool, optional): if CPE exactly match or not. Defaults to None. Defaults to False.
keyword_search (str, optional): a word or phrase is found in the metadata title or reference links. Defaults to None.
last_mod_start_date (datetime, optional): search CPE by modified date. Defaults to None.
last_mod_end_date (datetime, optional): search CPE by modified date. Defaults to None.
match_criteria_id (str, optional): search CPE by uuid. Defaults to None.

Returns:
CpeOas: API Result
"""
limit = self.MAX_PAGE_LIMIT_CPE_API
response = self.get_cpes(start_index=0,
results_per_page=limit,
cpe_name_id=cpe_name_id,
cpe_match_string=cpe_match_string,
keyword_exact_match=keyword_exact_match,
keyword_search=keyword_search,
last_mod_start_date=last_mod_start_date,
last_mod_end_date=last_mod_end_date,
match_criteria_id=match_criteria_id)

if response.total_results >= limit:
count = response.total_results // limit + 1
for start_index in range(1, count):
r = self.get_cpes(start_index=start_index,
results_per_page=limit,
cpe_name_id=cpe_name_id,
cpe_match_string=cpe_match_string,
keyword_exact_match=keyword_exact_match,
keyword_search=keyword_search,
last_mod_start_date=last_mod_start_date,
last_mod_end_date=last_mod_end_date,
match_criteria_id=match_criteria_id)
response.products.extend(r.products)

return response

def get_all_cpe_match(self,
cve_id: str = None,
last_mod_start_date: datetime = None,
last_mod_end_date: datetime = None,
match_criteria_id: str = None) -> CpeMatchOas:
"""All Match Criteria API # noqa: E501

Args:
cve_id (str, optional): CVE ID. Defaults to None.
last_mod_start_date (datetime, optional): search by modified date. Defaults to None.
last_mod_end_date (datetime, optional): search by modified date. Defaults to None.
match_criteria_id (str, optional): specific by UUID. Defaults to None.

Returns:
CpeMatchOas: API Result
"""
limit = self.MAX_PAGE_LIMIT_CPE_MATCH_API
response = self.get_cpe_match(start_index=0,
results_per_page=limit,
cve_id=cve_id,
last_mod_start_date=last_mod_start_date,
last_mod_end_date=last_mod_end_date,
match_criteria_id=match_criteria_id)

if response.total_results >= limit:
count = response.total_results // limit + 1
for start_index in range(1, count):
r = self.get_cpe_match(start_index=start_index,
results_per_page=limit,
cve_id=cve_id,
last_mod_start_date=last_mod_start_date,
last_mod_end_date=last_mod_end_date,
match_criteria_id=match_criteria_id)
response.match_strings.extend(r.match_strings)

return response

def _convert_datetime(self, dt) -> datetime:
if type(dt) == str:
try:
Expand Down
7 changes: 7 additions & 0 deletions tests/test_get_cpe_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ def test_get_by_cve_id(self):
assert cpe_match.matches[2].cpe_name == "cpe:2.3:o:microsoft:windows:-:*:*:*:*:*:x86:*" # noqa
assert cpe_match.matches[2].cpe_name_id == "892CAEC7-9569-4385-8335-239B83D58837" # noqa

def test_get_all_cpe_match(self):
response = self.client.get_all_cpe_match(
last_mod_start_date="2021-01-01T00:00:00.000",
last_mod_end_date="2021-03-01T00:00:00.000"
)
assert (len(response.match_strings) > 5000)

def test_get_by_date(self):
response = self.client.get_cpe_match(
last_mod_start_date="2021-08-04T13:00:00.000",
Expand Down
7 changes: 7 additions & 0 deletions tests/test_get_cpes.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ def test_get_by_cpe_name_id(self):
assert cpe.titles[1].title == "マイクロソフト Access"
assert cpe.titles[1].lang == "ja"

def test_get_all_cpes(self):
response = self.client.get_all_cpes(
last_mod_start_date="2021-01-01T00:00:00.000",
last_mod_end_date="2021-03-01T00:00:00.000"
)
assert (len(response.products) > 10000)

def test_get_by_cpe_match_string(self):
response = self.client.get_cpes(
cpe_match_string="cpe:2.3:*:Microsoft",
Expand Down
8 changes: 8 additions & 0 deletions tests/test_get_cve_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ def test_get_by_change_date(self):
pprint(response)
assert (len(response.cve_changes) > 0)

def test_get_all_cve_history(self):
response = self.client.get_all_cve_history(
change_start_date="2021-01-01T00:00:00.000",
change_end_date="2021-02-15T00:00:00.000",
)
pprint(response)
assert (len(response.cve_changes) > 5000)

def test_get_by_cve(self):
response = self.client.get_cve_history(
cve_id="CVE-2019-1010218",
Expand Down
8 changes: 8 additions & 0 deletions tests/test_get_cves.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ def test_get_by_cve(self):
assert response.timestamp is not None
assert (len(response.vulnerabilities) > 0)

def test_get_all_cve(self):
response = self.client.get_all_cves(
pub_start_date="2021-01-01T00:00:00.000",
pub_end_date="2021-02-15T00:00:00.000",
)
pprint(response)
assert (len(response.vulnerabilities) > 2000)

def test_get_by_cvss_v2(self):
response = self.client.get_cves(
cpe_name="cpe:2.3:o:debian:debian_linux:3.0:*:*:*:*:*:*:*",
Expand Down