Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: updating schema #3106

Merged
merged 2 commits into from
Jun 29, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 68 additions & 5 deletions cve_bin_tool/cvedb.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,19 @@ def get_cvelist_if_stale(self) -> None:
self.LOGGER.info(
"Using cached CVE data (<24h old). Use -u now to update immediately."
)
severity_schema, range_schema, exploit_schema = self.table_schemas()
(
severity_schema,
range_schema,
exploit_schema,
cve_metrics_schema,
metrics_schema,
) = self.table_schemas()
if (
not self.latest_schema("cve_severity", severity_schema)
or not self.latest_schema("cve_range", range_schema)
or not self.latest_schema("cve_exploited", exploit_schema)
# or not self.latest_schema("cve_metrics",cve_metrics_schema)
# or not self.latest_schema("metrics",metrics_schema)
):
self.refresh_cache_and_update_db()
self.time_of_last_update = datetime.datetime.today()
Expand All @@ -173,7 +181,7 @@ def latest_schema(
"""Check database is using latest schema"""
if table_name == "":
# If no table specified, check cve_range (the last one changed)
_, range_schema, __ = self.table_schemas()
_, range_schema, __, _, _ = self.table_schemas()
return self.latest_schema("cve_range", range_schema)

self.LOGGER.debug("Check database is using latest schema")
Expand Down Expand Up @@ -272,8 +280,31 @@ def table_schemas(self):
PRIMARY KEY(cve_number)
)
"""
cve_metrics_table = """
CREATE TABLE IF NOT EXISTS cve_metrics (
cve_number TEXT,
metric_id INTEGER,
metric_score REAL,
metric_field TEXT,
FOREIGN KEY(cve_number) REFERENCES cve_severity(cve_number),
FOREIGN KEY(metric_id) REFERENCES metrics(metric_id)
)
"""
metrics_table = """
CREATE TABLE IF NOT EXISTS metrics (
metrics_id INTEGER,
metrics_name TEXT,
PRIMARY KEY(metrics_id)
)
"""

return cve_data_create, version_range_create, exploit_table_create
return (
cve_data_create,
version_range_create,
exploit_table_create,
cve_metrics_table,
metrics_table,
)

def insert_queries(self):
cve_severity = """
Expand Down Expand Up @@ -321,14 +352,24 @@ def init_database(self) -> None:
cve_data_create,
version_range_create,
exploit_table_create,
cve_metrics_table_create,
metrics_table_create,
) = self.table_schemas()
index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)"
cursor.execute(cve_data_create)
cursor.execute(version_range_create)
cursor.execute(exploit_table_create)
cursor.execute(cve_metrics_table_create)
cursor.execute(metrics_table_create)
cursor.execute(index_range)

severity_schema, range_schema, exploit_schema = self.table_schemas()
(
severity_schema,
range_schema,
exploit_schema,
cve_metrics_schema,
metrics_schema,
) = self.table_schemas()
# Check schema on cve_severity
if not self.latest_schema("cve_severity", severity_schema, cursor):
# Recreate table using latest schema
Expand Down Expand Up @@ -357,6 +398,24 @@ def init_database(self) -> None:
cursor.execute("DROP TABLE cve_exploited")
cursor.execute(exploit_table_create)

# Check schema on cve_metrics
if not self.latest_schema("cve_metrics", cve_metrics_schema, cursor):
self.LOGGER.info("Upgrading cve_metrics data. This may take some time.")
self.LOGGER.info(
"If this step hangs, try using `-u now` to get a fresh db."
)
cursor.execute("DROP TABLE cve_metrics")
cursor.execute(cve_metrics_table_create)

# Check schema on metrics
if not self.latest_schema("metrics", metrics_schema, cursor):
self.LOGGER.info("Upgrading metrics data. This may take some time.")
self.LOGGER.info(
"If this step hangs, try using `-u now` to get a fresh db."
)
cursor.execute("DROP TABLE metrics")
cursor.execute(metrics_table_create)

if self.connection is not None:
self.connection.commit()

Expand Down Expand Up @@ -646,7 +705,7 @@ def get_exploits_count(self) -> int:

def create_exploit_db(self):
cursor = self.db_open_and_get_cursor()
(_, _, create_exploit_table) = self.table_schemas()
(_, _, create_exploit_table, _, _) = self.table_schemas()
cursor = self.db_open_and_get_cursor()
cursor.execute(create_exploit_table)
self.connection.commit()
Expand Down Expand Up @@ -834,11 +893,15 @@ def json_to_db_wrapper(self, path, pubkey, ignore_signature, log_signature_error
cve_data_create,
version_range_create,
exploit_table_create,
cve_metrics_create,
metrics_create,
) = self.table_schemas()
index_range = "CREATE INDEX IF NOT EXISTS product_index ON cve_range (cve_number, vendor, product)"
cursor.execute(cve_data_create)
cursor.execute(version_range_create)
cursor.execute(exploit_table_create)
cursor.execute(cve_metrics_create)
cursor.execute(metrics_create)
cursor.execute(index_range)
metadata_fd = open(path / "metadata.json")
metadata = json.loads(metadata_fd.read())
Expand Down