diff --git a/src/nvd_api/client.py b/src/nvd_api/client.py index 7ae046a..4c6d1d7 100644 --- a/src/nvd_api/client.py +++ b/src/nvd_api/client.py @@ -339,6 +339,251 @@ def get_cpe_match(self, return ret + def get_all_cves(self, + cpe_name: str = None, + cve_id: str = None, + cvss_v2_metrics: str = None, + cvss_v2_severity: CVSS_V2_SEVERITY = None, + cvss_v3_metrics: str = None, + cvss_v3_severity: CVSS_V3_SEVERITY = None, + cwe_id: str = None, + has_cert_alerts: bool = False, + has_cert_notes: bool = False, + has_kev: bool = False, + has_oval: bool = False, + is_vulnerable: bool = False, + keyword_exact_match: bool = False, + keyword_search: str = None, + last_mod_start_date: datetime = None, + last_mod_end_date: datetime = None, + no_rejected: bool = False, + pub_start_date: datetime = None, + pub_end_date: datetime = None, + source_identifier: str = None, + version_end: str = None, + version_end_type: VERSION_TYPE = None, + version_start: str = None, + version_start_type: VERSION_TYPE = None, + virtual_match_string: str = None) -> CveOas: + """All CVE API # noqa: E501 + + Args: + cpe_name (str, optional): CPE Name. Defaults to None. + cve_id (str, optional): CVE ID. Defaults to None. + cvss_v2_metrics (str, optional): CVSSv2 vector string. Defaults to None. + cvss_v2_severity (CVSS_V2_SEVERITY, optional): CVSSv2 qualitative severity rating. Defaults to None. + cvss_v3_metrics (str, optional): CVSSv3 vector string. Defaults to None. + cvss_v3_severity (CVSS_V3_SEVERITY, optional): CVSSv3 qualitative severity rating. Defaults to None. + cwe_id (str, optional): CWE ID. Defaults to None. + has_cert_alerts (bool, optional): contain a Technical Alert from US-CERT. Defaults to False. + has_cert_notes (bool, optional): contain a Vulnerability Note from CERT/CC. Defaults to False. + has_kev (bool, optional): appear in CISA's Known Exploited Vulnerabilities (KEV) Catalog. Defaults to False. + has_oval (bool, optional): contain information from MITRE's Open Vulnerability and Assessment Language (OVAL). Defaults to False. + is_vulnerable (bool, optional): returns only CVE associated with a specific CPE. Defaults to False. + keyword_exact_match (bool, optional): returns any CVE where a word or phrase. Defaults to False. + keyword_search (str, optional): a word or phrase is found in the current description. Defaults to None. + last_mod_start_date (datetime, optional): search by modified date. Defaults to None. + last_mod_end_date (datetime, optional): search by modified date. Defaults to None. + no_rejected (bool, optional): return the CVE API includes CVE records with the REJECT or Rejected status. Defaults to False. + pub_start_date (datetime, optional): search by published date. Defaults to None. + pub_end_date (datetime, optional): search by published date. Defaults to None. + source_identifier (str, optional): returns CVE where the exact value of sourceIdentifier appears. Defaults to None. + version_end (str, optional): return only the CVEs associated with CPEs in specific version ranges. Defaults to None. + version_end_type (VERSION_TYPE, optional): return only the CVEs associated with CPEs in specific version ranges. Defaults to None. + version_start (str, optional): return only the CVEs associated with CPEs in specific version ranges. Defaults to None. + version_start_type (VERSION_TYPE, optional): return only the CVEs associated with CPEs in specific version ranges. Defaults to None. + virtual_match_string (str, optional): CVE more broadly than cpeName. Defaults to None. + + Returns: + AsyncResult: API Result + """ + limit = self.MAX_PAGE_LIMIT_CVE_API + response = self.get_cves(start_index=0, + results_per_page=limit, + cpe_name=cpe_name, + cve_id=cve_id, + cvss_v2_metrics=cvss_v2_metrics, + cvss_v2_severity=cvss_v2_severity, + cvss_v3_metrics=cvss_v3_metrics, + cvss_v3_severity=cvss_v3_severity, + cwe_id=cwe_id, + has_cert_alerts=has_cert_alerts, + has_cert_notes=has_cert_notes, + has_kev=has_kev, + has_oval=has_oval, + is_vulnerable=is_vulnerable, + keyword_exact_match=keyword_exact_match, + keyword_search=keyword_search, + last_mod_start_date=last_mod_start_date, + last_mod_end_date=last_mod_end_date, + no_rejected=no_rejected, + pub_start_date=pub_start_date, + pub_end_date=pub_end_date, + source_identifier=source_identifier, + version_end=version_end, + version_end_type=version_end_type, + version_start=version_start, + version_start_type=version_start_type, + virtual_match_string=virtual_match_string) + + if response.total_results >= limit: + count = response.total_results // limit + 1 + for start_index in range(1, count): + r = self.get_cves( + start_index=start_index, + results_per_page=limit, + cpe_name=cpe_name, + cve_id=cve_id, + cvss_v2_metrics=cvss_v2_metrics, + cvss_v2_severity=cvss_v2_severity, + cvss_v3_metrics=cvss_v3_metrics, + cvss_v3_severity=cvss_v3_severity, + cwe_id=cwe_id, + has_cert_alerts=has_cert_alerts, + has_cert_notes=has_cert_notes, + has_kev=has_kev, + has_oval=has_oval, + is_vulnerable=is_vulnerable, + keyword_exact_match=keyword_exact_match, + keyword_search=keyword_search, + last_mod_start_date=last_mod_start_date, + last_mod_end_date=last_mod_end_date, + no_rejected=no_rejected, + pub_start_date=pub_start_date, + pub_end_date=pub_end_date, + source_identifier=source_identifier, + version_end=version_end, + version_end_type=version_end_type, + version_start=version_start, + version_start_type=version_start_type, + virtual_match_string=virtual_match_string) + response.vulnerabilities.extend(r.vulnerabilities) + + return response + + def get_all_cve_history(self, + change_start_date: datetime = None, + change_end_date: datetime = None, + cve_id: str = None, + event_name: EVENT_NAME = None,) -> CveHistoryOas: + """All CVE Change History API # noqa: E501 + + Args: + change_start_date (datetime, optional): search by changed date. Defaults to None. + change_end_date (datetime, optional): search by changed date. Defaults to None. + cve_id (str, optional): CVE ID. Defaults to None. + event_name (EVENT_NAME, optional): returns all CVE associated with a specific type of change event. Defaults to None. + + Returns: + CveHistoryOas: API Result + """ + limit = self.MAX_PAGE_LIMIT_CVE_HISTORY_API + response = self.get_cve_history(start_index=0, + results_per_page=limit, + change_start_date=change_start_date, + change_end_date=change_end_date, + cve_id=cve_id, + event_name=event_name) + + if response.total_results >= limit: + count = response.total_results // limit + 1 + for start_index in range(1, count): + r = self.get_cve_history(start_index=start_index, + results_per_page=limit, + change_start_date=change_start_date, + change_end_date=change_end_date, + cve_id=cve_id, + event_name=event_name) + response.cve_changes.extend(r.cve_changes) + + return response + + def get_all_cpes(self, + cpe_name_id: str = None, + cpe_match_string: str = None, + keyword_exact_match: bool = False, + keyword_search: str = None, + last_mod_start_date: datetime = None, + last_mod_end_date: datetime = None, + match_criteria_id: str = None) -> CpeOas: + """All CPE API # noqa: E501 + + Args: + cpe_name_id (str, optional): specific CPE record UUID. Defaults to None. + cpe_match_string (str, optional): CPE Name. Defaults to None. + keyword_exact_match (bool, optional): if CPE exactly match or not. Defaults to None. Defaults to False. + keyword_search (str, optional): a word or phrase is found in the metadata title or reference links. Defaults to None. + last_mod_start_date (datetime, optional): search CPE by modified date. Defaults to None. + last_mod_end_date (datetime, optional): search CPE by modified date. Defaults to None. + match_criteria_id (str, optional): search CPE by uuid. Defaults to None. + + Returns: + CpeOas: API Result + """ + limit = self.MAX_PAGE_LIMIT_CPE_API + response = self.get_cpes(start_index=0, + results_per_page=limit, + cpe_name_id=cpe_name_id, + cpe_match_string=cpe_match_string, + keyword_exact_match=keyword_exact_match, + keyword_search=keyword_search, + last_mod_start_date=last_mod_start_date, + last_mod_end_date=last_mod_end_date, + match_criteria_id=match_criteria_id) + + if response.total_results >= limit: + count = response.total_results // limit + 1 + for start_index in range(1, count): + r = self.get_cpes(start_index=start_index, + results_per_page=limit, + cpe_name_id=cpe_name_id, + cpe_match_string=cpe_match_string, + keyword_exact_match=keyword_exact_match, + keyword_search=keyword_search, + last_mod_start_date=last_mod_start_date, + last_mod_end_date=last_mod_end_date, + match_criteria_id=match_criteria_id) + response.products.extend(r.products) + + return response + + def get_all_cpe_match(self, + cve_id: str = None, + last_mod_start_date: datetime = None, + last_mod_end_date: datetime = None, + match_criteria_id: str = None) -> CpeMatchOas: + """All Match Criteria API # noqa: E501 + + Args: + cve_id (str, optional): CVE ID. Defaults to None. + last_mod_start_date (datetime, optional): search by modified date. Defaults to None. + last_mod_end_date (datetime, optional): search by modified date. Defaults to None. + match_criteria_id (str, optional): specific by UUID. Defaults to None. + + Returns: + CpeMatchOas: API Result + """ + limit = self.MAX_PAGE_LIMIT_CPE_MATCH_API + response = self.get_cpe_match(start_index=0, + results_per_page=limit, + cve_id=cve_id, + last_mod_start_date=last_mod_start_date, + last_mod_end_date=last_mod_end_date, + match_criteria_id=match_criteria_id) + + if response.total_results >= limit: + count = response.total_results // limit + 1 + for start_index in range(1, count): + r = self.get_cpe_match(start_index=start_index, + results_per_page=limit, + cve_id=cve_id, + last_mod_start_date=last_mod_start_date, + last_mod_end_date=last_mod_end_date, + match_criteria_id=match_criteria_id) + response.match_strings.extend(r.match_strings) + + return response + def _convert_datetime(self, dt) -> datetime: if type(dt) == str: try: diff --git a/tests/test_get_cpe_match.py b/tests/test_get_cpe_match.py index ad0e855..c42aa71 100644 --- a/tests/test_get_cpe_match.py +++ b/tests/test_get_cpe_match.py @@ -62,6 +62,13 @@ def test_get_by_cve_id(self): assert cpe_match.matches[2].cpe_name == "cpe:2.3:o:microsoft:windows:-:*:*:*:*:*:x86:*" # noqa assert cpe_match.matches[2].cpe_name_id == "892CAEC7-9569-4385-8335-239B83D58837" # noqa + def test_get_all_cpe_match(self): + response = self.client.get_all_cpe_match( + last_mod_start_date="2021-01-01T00:00:00.000", + last_mod_end_date="2021-03-01T00:00:00.000" + ) + assert (len(response.match_strings) > 5000) + def test_get_by_date(self): response = self.client.get_cpe_match( last_mod_start_date="2021-08-04T13:00:00.000", diff --git a/tests/test_get_cpes.py b/tests/test_get_cpes.py index 65cb70f..15b904e 100644 --- a/tests/test_get_cpes.py +++ b/tests/test_get_cpes.py @@ -58,6 +58,13 @@ def test_get_by_cpe_name_id(self): assert cpe.titles[1].title == "マイクロソフト Access" assert cpe.titles[1].lang == "ja" + def test_get_all_cpes(self): + response = self.client.get_all_cpes( + last_mod_start_date="2021-01-01T00:00:00.000", + last_mod_end_date="2021-03-01T00:00:00.000" + ) + assert (len(response.products) > 10000) + def test_get_by_cpe_match_string(self): response = self.client.get_cpes( cpe_match_string="cpe:2.3:*:Microsoft", diff --git a/tests/test_get_cve_history.py b/tests/test_get_cve_history.py index d49ed32..7d82e31 100644 --- a/tests/test_get_cve_history.py +++ b/tests/test_get_cve_history.py @@ -39,6 +39,14 @@ def test_get_by_change_date(self): pprint(response) assert (len(response.cve_changes) > 0) + def test_get_all_cve_history(self): + response = self.client.get_all_cve_history( + change_start_date="2021-01-01T00:00:00.000", + change_end_date="2021-02-15T00:00:00.000", + ) + pprint(response) + assert (len(response.cve_changes) > 5000) + def test_get_by_cve(self): response = self.client.get_cve_history( cve_id="CVE-2019-1010218", diff --git a/tests/test_get_cves.py b/tests/test_get_cves.py index 506cfe8..0ddaf12 100644 --- a/tests/test_get_cves.py +++ b/tests/test_get_cves.py @@ -46,6 +46,14 @@ def test_get_by_cve(self): assert response.timestamp is not None assert (len(response.vulnerabilities) > 0) + def test_get_all_cve(self): + response = self.client.get_all_cves( + pub_start_date="2021-01-01T00:00:00.000", + pub_end_date="2021-02-15T00:00:00.000", + ) + pprint(response) + assert (len(response.vulnerabilities) > 2000) + def test_get_by_cvss_v2(self): response = self.client.get_cves( cpe_name="cpe:2.3:o:debian:debian_linux:3.0:*:*:*:*:*:*:*",