Skip to content

Commit

Permalink
feat(cacheman): count edge systems in caches
Browse files Browse the repository at this point in the history
Count all host types, including edge systems, in the CVE and rule caches
when the `vulnerability.edge_parity` feature flag is enabled.

RHINENG-1699
  • Loading branch information
vkrizan committed Sep 14, 2023
1 parent 6e704cb commit a10a1a9
Showing 1 changed file with 34 additions and 19 deletions.
53 changes: 34 additions & 19 deletions taskomatic/jobs/cacheman.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
from common.config import Config
from common.database_handler import DatabasePool
from common.database_handler import DatabasePoolConnection
from common.feature_flags import EDGE_PARITY_FEATURE
from common.feature_flags import initialize_unleash
from common.feature_flags import UNLEASH
from common.logging import get_logger
from common.logging import init_logging
from common.utils import validate_cve_cache_keepalive
Expand All @@ -22,7 +24,15 @@
CACHE_WORKERS = 4


def _select_count_affected(account_id):
def _host_type_condition(all_host_types):
    """Return the SQL fragment limiting which host types are counted.

    When ``all_host_types`` is truthy the fragment is empty, so no host type
    is filtered out. Otherwise the fragment is an additional ``AND`` condition
    that excludes rows whose ``sp.host_type`` equals ``'edge'``. The fragment
    begins with a space and ``AND`` so it can be appended directly after an
    existing condition on the ``sp`` alias.
    """
    fragment = "" if all_host_types else " AND coalesce(sp.host_type, 'null') != 'edge'"
    return SQL(fragment)


def _select_count_affected(account_id, all_host_types=False):
host_type_cond = _host_type_condition(all_host_types)

return (
SQL(
"""
Expand All @@ -37,8 +47,7 @@ def _select_count_affected(account_id):
sp.rh_account_id = %s AND
sp.opt_out = false AND
sp.stale = false AND
sp.when_deleted IS NULL AND
coalesce(sp.host_type, 'null') != 'edge') INNER JOIN
sp.when_deleted IS NULL{host_type_cond}) INNER JOIN
inventory.hosts ih ON sp.inventory_id = ih.id LEFT JOIN
cve_account_data cad ON (sv.cve_id = cad.cve_id AND
cad.rh_account_id = %s)
Expand All @@ -48,12 +57,14 @@ def _select_count_affected(account_id):
rule_only = false)))
GROUP BY sv.cve_id
"""
),
).format(host_type_cond=host_type_cond),
[account_id, account_id, account_id],
)


def _select_count_unpatched(account_id):
def _select_count_unpatched(account_id, all_host_types=False):
host_type_cond = _host_type_condition(all_host_types)

return (
SQL(
"""
Expand All @@ -68,23 +79,22 @@ def _select_count_unpatched(account_id):
sp.rh_account_id = %s AND
sp.opt_out = false AND
sp.stale = false AND
sp.when_deleted IS NULL AND
coalesce(sp.host_type, 'null') != 'edge') INNER JOIN
sp.when_deleted IS NULL{host_type_cond}) INNER JOIN
inventory.hosts ih ON sp.inventory_id = ih.id INNER JOIN
vulnerable_package_cve vpc ON svp.vulnerable_package_id = vpc.vulnerable_package_id
WHERE svp.rh_account_id = %s
GROUP BY vpc.cve_id
"""
),
).format(host_type_cond=host_type_cond),
[account_id, account_id],
)


def _materialize_cve_cache(cur, account_id, account_name, current_cache):
def _materialize_cve_cache(cur, account_id, account_name, current_cache, all_host_types=False):
LOGGER.debug("Materializing cache for account '%s' started.", account_name)

count_affected_sq, affected_params = _select_count_affected(account_id)
count_unpatched_sq, unpatched_params = _select_count_unpatched(account_id)
count_affected_sq, affected_params = _select_count_affected(account_id, all_host_types)
count_unpatched_sq, unpatched_params = _select_count_unpatched(account_id, all_host_types)

main_sq = """
SELECT cve_id,
Expand Down Expand Up @@ -192,18 +202,19 @@ def _materialize_cve_cache(cur, account_id, account_name, current_cache):
LOGGER.info("Materializing cache for account '%s' finished.", account_name)


def _materialize_rule_cache(cur, account_id, account_name, current_cache):
def _materialize_rule_cache(cur, account_id, account_name, current_cache, all_host_types=False):
host_type_cond = _host_type_condition(all_host_types)

LOGGER.debug("Materializing rule cache for account '%s' started.", account_name)
cur.execute(
"""
f"""
SELECT sv.rule_id, COUNT(*)
FROM system_vulnerabilities_active AS sv
JOIN system_platform AS sp ON (sv.system_id = sp.id AND
sp.rh_account_id = %s AND
sp.opt_out = FALSE AND
sp.stale = FALSE AND
sp.when_deleted IS NULL AND
COALESCE(sp.host_type, 'NULL') != 'edge')
sp.when_deleted IS NULL{host_type_cond})
INNER JOIN inventory.hosts ih ON sp.inventory_id = ih.id
WHERE sv.rh_account_id = %s AND
sv.rule_id IN (SELECT ir.id FROM insights_rule AS ir WHERE ir.active = TRUE AND NOT ir.rule_only) AND
Expand Down Expand Up @@ -257,11 +268,11 @@ def _materialize_rule_cache(cur, account_id, account_name, current_cache):
LOGGER.debug("Materializing rule cache for account '%s' finished.", account_name)


def _materialize_caches(account_id, account_name, current_cache):
def _materialize_caches(account_id, account_name, current_cache, all_host_types=False):
with DatabasePoolConnection() as conn:
with conn.cursor() as cur:
_materialize_cve_cache(cur, account_id, account_name, current_cache)
_materialize_rule_cache(cur, account_id, account_name, current_cache)
_materialize_cve_cache(cur, account_id, account_name, current_cache, all_host_types)
_materialize_rule_cache(cur, account_id, account_name, current_cache, all_host_types)

cur.execute("""UPDATE rh_account SET cve_cache_from = now() WHERE id = %s""", (account_id,))
conn.commit()
Expand Down Expand Up @@ -328,12 +339,16 @@ def run():
accounts_to_refresh = [account for account in accounts if account[3] and (not account[2] or account[3] > account[2])]
LOGGER.info("Accounts requiring cache refresh: %s", len(accounts_to_refresh))

all_host_types = UNLEASH.is_enabled(EDGE_PARITY_FEATURE)
if all_host_types:
LOGGER.info("Including all host types (Edge Parity feature)")

# Process accounts in parallel
with DatabasePool(CACHE_WORKERS):
executor = BoundedExecutor(CACHE_WORKERS, max_workers=CACHE_WORKERS)
futures = []
for account_id, account_name, _, _, _ in accounts_to_refresh:
futures.append(executor.submit(_materialize_caches, account_id, account_name, current_cache))
futures.append(executor.submit(_materialize_caches, account_id, account_name, current_cache, all_host_types))
for future in futures:
future.result()
executor.shutdown()
Expand Down

0 comments on commit a10a1a9

Please sign in to comment.