Skip to content

Commit

Permalink
refactor(listener): run black
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomáš Sasák committed Jun 26, 2023
1 parent 043fe3f commit 72bc843
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 168 deletions.
2 changes: 1 addition & 1 deletion listener/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Message type from inventory can be:
* Deleted system (deleted): System got deleted from inventory.

Listener processes one of this message.
On new system, it creates a row with system info into vulnerability DB (`system_platform`, `repo`, `system_repo` table).
On new system, it creates a row with system info into vulnerability DB (`system_platform`, `repo`, `system_repo` table).
On update it updates already existing system fields only if the package profile changed from the last created/updated event.
On both of these operations, listener prepares a request for VMaaS with provided list of packages from the message and stores it into the db (`vmaas_json` field).
On delete, it marks system as deleted (system gets deleted later, optimization, `when_deleted` field).
Expand Down
14 changes: 6 additions & 8 deletions listener/advisor_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
from common.logging import get_logger
from common.utils import send_msg_to_payload_tracker, reporter_allowed
from common.mqueue import MQWriter, Partitioners
from .common import (CFG, DATABASE_ERROR, UPDATE_SYSTEM_ADVISOR, NEW_SYSTEM_ADVISOR,
DELETED_UPLOADED_ADVISOR, ImportStatus, AdvisorMsg)
from .common import CFG, DATABASE_ERROR, UPDATE_SYSTEM_ADVISOR, NEW_SYSTEM_ADVISOR, DELETED_UPLOADED_ADVISOR, ImportStatus, AdvisorMsg
from .processor import BaseProcessor

LOGGER = get_logger(__name__)
Expand Down Expand Up @@ -102,7 +101,7 @@ def _parse_hits(self, reports: dict, rule_results: dict):
rule_results[cve] = {
"rule_id": rule,
"details": json.dumps(report["details"]),
"mitigation_reason": report["details"]["cves"][cve]
"mitigation_reason": report["details"]["cves"][cve],
}

def _parse_passes(self, passes: dict, rule_results: dict):
Expand All @@ -114,12 +113,12 @@ def _parse_passes(self, passes: dict, rule_results: dict):
rule_results[cve] = {
"rule_id": rule_only,
"details": json.dumps(pass_["details"]),
"mitigation_reason": pass_["details"]["cves"][cve]
"mitigation_reason": pass_["details"]["cves"][cve],
}

def _parse_db_fields(self, msg: AdvisorMsg) -> dict:
"""Prepare DB fields for system_platform table insertion query
from advisor message"""
from advisor message"""
fields = {}
host = msg.msg["input"]["host"]

Expand All @@ -134,8 +133,7 @@ def _parse_db_fields(self, msg: AdvisorMsg) -> dict:
hits = msg.msg["results"].get("reports", [])
self._parse_hits(hits, rule_hits)

results_raw = json.dumps({"rule_hits": rule_hits,
"rule_passes": rule_passes}, sort_keys=True)
results_raw = json.dumps({"rule_hits": rule_hits, "rule_passes": rule_passes}, sort_keys=True)

fields["advisor_checksum"] = hashlib.sha256(results_raw.encode("utf-8")).hexdigest()
fields["rule_results"] = results_raw
Expand Down Expand Up @@ -165,7 +163,7 @@ def _send_for_evaluation(self, org_id: str, inventory_id: str, request_id: str,
"platform_metadata": {
"request_id": request_id,
},
"timestamp": timestamp
"timestamp": timestamp,
}
self.grouper.send(msg, loop=self.loop, key=org_id)

Expand Down
67 changes: 36 additions & 31 deletions listener/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,49 @@
CFG = Config()
PROMETHEUS_PORT = CFG.prometheus_port or str(CFG.listener_prometheus_port)


# prometheus metrics only used in old listener
NEW_SYSTEM = Counter('ve_listener_upl_new_system', '# of new systems inserted')
UPDATE_SYSTEM = Counter('ve_listener_upl_update_system', '# of systems updated')
UNCHANGED_SYSTEM = Counter('ve_listener_upl_unchanged_system', '# of system-updates with same vmaas info')
DELETED_UPLOADED = Counter('ve_listener_deleted_uploaded', '# of systems uploaded after being deleted')
UNKNOWN_EVENT_TYPE = Counter('ve_listener_unknown_event_type', '# of msgs with unknown event type')
UNKNOWN_TOPIC = Counter('ve_listener_unknown_topic', '# of msgs received on unsupported topic')
NEW_SYSTEM = Counter("ve_listener_upl_new_system", "# of new systems inserted")
UPDATE_SYSTEM = Counter("ve_listener_upl_update_system", "# of systems updated")
UNCHANGED_SYSTEM = Counter("ve_listener_upl_unchanged_system", "# of system-updates with same vmaas info")
DELETED_UPLOADED = Counter("ve_listener_deleted_uploaded", "# of systems uploaded after being deleted")
UNKNOWN_EVENT_TYPE = Counter("ve_listener_unknown_event_type", "# of msgs with unknown event type")
UNKNOWN_TOPIC = Counter("ve_listener_unknown_topic", "# of msgs received on unsupported topic")

# prometheus metrics
NEW_SYSTEM_INVENTORY = Counter('ve_listener_upl_new_system_inv', '# of new systems inserted from inventory')
NEW_SYSTEM_ADVISOR = Counter('ve_listener_upl_new_system_adv', '# of new systems inserted from advisor')
UPDATE_SYSTEM_INVENTORY = Counter('ve_listener_upl_update_system_inv', '# of systems updated from inventory')
UPDATE_SYSTEM_ADVISOR = Counter('ve_listener_upl_update_system_adv', '# of systems updated from advisor')
DELETED_UPLOADED_INVENTORY = Counter('ve_listener_deleted_uploaded_inv', '# of systems uploaded from inventory after being deleted')
DELETED_UPLOADED_ADVISOR = Counter('ve_listener_deleted_uploaded_adv', '# of systems uploaded from advisor after being deleted')

DELETED_SYSTEM = Counter('ve_listener_deleted_system', '# of systems deleted')
DELETED_SYSTEM_NOT_FOUND = Counter('ve_listener_deleted_system_nf', '# of systems to delete but not found')
UPDATED_SYSTEM = Counter('ve_listener_updated_system', '# of systems updated')
UPDATED_SYSTEM_NOT_FOUND = Counter('ve_listener_updated_system_nf', '# of systems to update but not found')

PROCESS_MESSAGES = Counter('ve_listener_messages_processed', '# of messages processed')
SKIPPED_MESSAGES = Counter('ve_listener_messages_skipped', '# of messages skipped')
MESSAGE_PARSE_ERROR = Counter('ve_listener_message_parse_error', '# of message parse errors')

DATABASE_ERROR = Counter('ve_listener_database_error', '# of database errors')
UPLOAD_NO_RPMDB = Counter('ve_listener_upl_no_rpmdb', '# of systems ignored due to missing rpmdb')

NEW_REPO = Counter('ve_listener_upl_new_repo', '# of new repos inserted')
NEW_RH_ACCOUNT = Counter('ve_listener_upl_new_rh_account', '# of new rh accounts inserted')
NEW_SYSTEM_REPO = Counter('ve_listener_upl_new_system_repo', '# of new system_repo pairs inserted')
DELETED_SYSTEM_REPO = Counter('ve_listener_upl_system_repo_deleted', '# deleted system_repo pairs')

RHUI_PATH_PART = '/rhui/'
NEW_SYSTEM_INVENTORY = Counter("ve_listener_upl_new_system_inv", "# of new systems inserted from inventory")
NEW_SYSTEM_ADVISOR = Counter("ve_listener_upl_new_system_adv", "# of new systems inserted from advisor")
UPDATE_SYSTEM_INVENTORY = Counter("ve_listener_upl_update_system_inv", "# of systems updated from inventory")
UPDATE_SYSTEM_ADVISOR = Counter("ve_listener_upl_update_system_adv", "# of systems updated from advisor")
DELETED_UPLOADED_INVENTORY = Counter("ve_listener_deleted_uploaded_inv", "# of systems uploaded from inventory after being deleted")
DELETED_UPLOADED_ADVISOR = Counter("ve_listener_deleted_uploaded_adv", "# of systems uploaded from advisor after being deleted")

DELETED_SYSTEM = Counter("ve_listener_deleted_system", "# of systems deleted")
DELETED_SYSTEM_NOT_FOUND = Counter("ve_listener_deleted_system_nf", "# of systems to delete but not found")
UPDATED_SYSTEM = Counter("ve_listener_updated_system", "# of systems updated")
UPDATED_SYSTEM_NOT_FOUND = Counter("ve_listener_updated_system_nf", "# of systems to update but not found")

PROCESS_MESSAGES = Counter("ve_listener_messages_processed", "# of messages processed")
SKIPPED_MESSAGES = Counter("ve_listener_messages_skipped", "# of messages skipped")
MESSAGE_PARSE_ERROR = Counter("ve_listener_message_parse_error", "# of message parse errors")

DATABASE_ERROR = Counter("ve_listener_database_error", "# of database errors")
UPLOAD_NO_RPMDB = Counter("ve_listener_upl_no_rpmdb", "# of systems ignored due to missing rpmdb")

NEW_REPO = Counter("ve_listener_upl_new_repo", "# of new repos inserted")
NEW_RH_ACCOUNT = Counter("ve_listener_upl_new_rh_account", "# of new rh accounts inserted")
NEW_SYSTEM_REPO = Counter("ve_listener_upl_new_system_repo", "# of new system_repo pairs inserted")
DELETED_SYSTEM_REPO = Counter("ve_listener_upl_system_repo_deleted", "# deleted system_repo pairs")

RHUI_PATH_PART = "/rhui/"
REPO_PATH_PATTERN = re.compile("(/content/.*)")
REPO_BASEARCH_PLACEHOLDER = "$basearch"
REPO_RELEASEVER_PLACEHOLDER = "$releasever"


class ImportStatus(flags.Flags):
"""Import to database status."""

INSERTED = 1
CHANGED = 2
UPDATED = 4
Expand All @@ -62,6 +64,7 @@ class ImportStatus(flags.Flags):

class InventoryMsgType(Enum):
"""Inventory message type enum"""

UNKNOWN = 0
CREATED = 1
UPDATED = 2
Expand All @@ -71,11 +74,13 @@ class InventoryMsgType(Enum):
@dataclass
class InventoryMsg:
"""Represents single inventory message"""

msg_type: InventoryMsgType
msg: dict


@dataclass
class AdvisorMsg:
"""Represents single advisor message"""

msg: dict
Loading

0 comments on commit 72bc843

Please sign in to comment.