Skip to content

Commit

Permalink
feat(evaluator): skip old kafka evaluation messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomáš Sasák authored and jdobes committed Nov 24, 2023
1 parent eb4dc1b commit 33f8e98
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 8 deletions.
2 changes: 1 addition & 1 deletion evaluator/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
# cve coupled with its package_name and cpe, from vmaas
CveUnpatched = namedtuple("Cve", ["cve", "package_name", "cpe"])
# system_platform row taken from DB
SystemPlatform = namedtuple("SystemPlatform", ["id", "inventory_id", "rh_account_id", "vmaas_json", "rule_results"])
SystemPlatform = namedtuple("SystemPlatform", ["id", "inventory_id", "rh_account_id", "vmaas_json", "rule_results", "last_evaluation"])
# single vulnerability stored inside db
VulnerabilityDB = namedtuple(
"VulnerabilityDB",
Expand Down
4 changes: 3 additions & 1 deletion evaluator/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import signal

from aiokafka import ConsumerRecord
from dateutil import parser
from psycopg_pool.pool_async import AsyncConnectionPool

from .common import CFG
Expand Down Expand Up @@ -62,14 +63,15 @@ async def _consume_message(self, msg: ConsumerRecord):
inventory_id = msg_dict["host"]["id"]
org_id = msg_dict["host"]["org_id"]
request_id = msg_dict.get("platform_metadata", {}).get("request_id")
timestamp = parser.parse(ts) if (ts := msg_dict.get("timestamp")) else None

try:
msg_type = EvaluatorMessageType(msg_dict.get("type"))
except ValueError:
LOGGER.error("received unknown message type: %s", msg_type)
return

await self.processor.evaluate_system(inventory_id, org_id, request_id)
await self.processor.evaluate_system(inventory_id, org_id, request_id, timestamp)

async def consume_message(self, msg: ConsumerRecord):
"""Consume message for evaluation, wrapper for semaphore"""
Expand Down
17 changes: 12 additions & 5 deletions evaluator/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
Processing logic of grouped evaluator message for single system
"""
import asyncio
from datetime import datetime
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

from psycopg import AsyncConnection
Expand Down Expand Up @@ -62,7 +64,7 @@ async def _lock_system(self, inventory_id: str, conn: AsyncConnection) -> System
async with conn.cursor(row_factory=dict_row) as cur:
await cur.execute(
"""
SELECT id, rh_account_id, vmaas_json, rule_results
SELECT id, rh_account_id, vmaas_json, rule_results, last_evaluation
FROM system_platform
WHERE inventory_id = %s
AND when_deleted IS NULL
Expand All @@ -73,7 +75,9 @@ async def _lock_system(self, inventory_id: str, conn: AsyncConnection) -> System
)
row = await cur.fetchone()
if row:
return SystemPlatform(row["id"], inventory_id, row["rh_account_id"], row["vmaas_json"], row["rule_results"])
return SystemPlatform(
row["id"], inventory_id, row["rh_account_id"], row["vmaas_json"], row["rule_results"], row["last_evaluation"]
)
return None

async def _load_db_system_vulnerabilities(self, system_platform: SystemPlatform, conn: AsyncConnection) -> Dict[str, VulnerabilityDB]:
Expand Down Expand Up @@ -216,7 +220,7 @@ async def _compare_sys_vulns(

return to_insert, to_update, to_delete

async def _evaluate_system(self, inventory_id: str, org_id: str):
async def _evaluate_system(self, inventory_id: str, org_id: str, request_timestamp: Optional[datetime]):
"""Evaluate vulnerabilities for single system, and update DB"""
async with self.db_pool.connection() as conn:
async with conn.transaction():
Expand All @@ -230,6 +234,9 @@ async def _evaluate_system(self, inventory_id: str, org_id: str):
"skipping evaluation, due to empty vmaas_json and rule_results, system: %s, org_id: %s", inventory_id, org_id
)
return
if request_timestamp and system_platform.last_evaluation and request_timestamp < system_platform.last_evaluation:
LOGGER.info("skipping evaluation, kafka message is older than system was lastly evaluated")
return

# start both task asynchronously to speed up
sys_vuln_rows_db, sys_vuln_rows = await asyncio.gather(
Expand Down Expand Up @@ -259,14 +266,14 @@ async def _evaluate_system(self, inventory_id: str, org_id: str):
send_remediations_update(self.remediations_results, inventory_id, fixable_sys_vuln_rows)
send_notifications(self.evaluator_results, new_system_vulns, [], [], system_platform.rh_account_id, org_id)

async def evaluate_system(self, inventory_id: str, org_id: str, request_id: str):
async def evaluate_system(self, inventory_id: str, org_id: str, request_id: str, request_timestamp: datetime):
"""Evaluate single system"""
EVAL_COUNT.inc()
msg = {"platform_metadata": {"request_id": request_id}, "host": {"org_id": org_id, "id": inventory_id}}
try:
with EVAL_TIME.time():
LOGGER.info("evaluating system: %s, org_id: %s", inventory_id, org_id)
await self._evaluate_system(inventory_id, org_id)
await self._evaluate_system(inventory_id, org_id, request_timestamp)
except EvaluatorException as ex:
LOGGER.error(str(ex))
send_msg_to_payload_tracker(self.payload_tracker, msg, "error", status_msg="evaluation failed", loop=self.loop)
Expand Down
3 changes: 3 additions & 0 deletions grouper/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
message to evaluator
"""
import asyncio
from datetime import datetime
from datetime import timezone
from typing import Dict

from .common import ADVISOR_QUEUE_SIZE
Expand Down Expand Up @@ -151,6 +153,7 @@ async def _send_for_evaluation(self, item: QueueItem, org_id: str, inventory_id:
"org_id": org_id,
},
"platform_metadata": {"request_id": item.request_id},
"timestamp": str(datetime.now(timezone.utc)),
}
if (not item.inventory_changed and not item.advisor_changed) and not CFG.disable_optimisation:
UNCHANGED_SYSTEM.inc()
Expand Down
11 changes: 10 additions & 1 deletion vmaas_sync/vmaas_sync.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""VMaaS sync module."""
import asyncio
import datetime as dt
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import Dict
from typing import Tuple
Expand Down Expand Up @@ -300,7 +302,14 @@ def re_evaluate_systems():
if not rows:
BATCH_SEMAPHORE.release()
break
msgs = [{"type": "re-evaluate_system", "host": {"id": inventory_id, "org_id": org_id}} for inventory_id, _, org_id in rows]
msgs = [
{
"type": "re-evaluate_system",
"host": {"id": inventory_id, "org_id": org_id},
"timestamp": str(datetime.now(timezone.utc)),
}
for inventory_id, _, org_id in rows
]
total_scheduled += len(msgs)
future = EVALUATOR_QUEUE.send_list(msgs, loop=loop)
future.add_done_callback(lambda x: BATCH_SEMAPHORE.release())
Expand Down

0 comments on commit 33f8e98

Please sign in to comment.