Skip to content

Commit

Permalink
refactor(listener): run black
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomáš Sasák committed Apr 6, 2023
1 parent e893ccc commit f157c81
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 83 deletions.
52 changes: 27 additions & 25 deletions listener/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,37 @@
PROMETHEUS_PORT = CFG.prometheus_port or str(CFG.listener_prometheus_port)

# prometheus metrics
NEW_SYSTEM = Counter('ve_listener_upl_new_system', '# of new systems inserted')
UPDATE_SYSTEM = Counter('ve_listener_upl_update_system', '# of systems updated')
UNCHANGED_SYSTEM = Counter('ve_listener_upl_unchanged_system', '# of system-updates with same vmaas info')
DELETED_SYSTEM = Counter('ve_listener_deleted_system', '# of systems deleted')
DELETED_SYSTEM_NOT_FOUND = Counter('ve_listener_deleted_system_nf', '# of systems to delete but not found')
UPDATED_SYSTEM = Counter('ve_listener_updated_system', '# of systems updated')
UPDATED_SYSTEM_NOT_FOUND = Counter('ve_listener_updated_system_nf', '# of systems to update but not found')
DELETED_UPLOADED = Counter('ve_listener_deleted_uploaded', '# of systems uploaded after being deleted')
PROCESS_MESSAGES = Counter('ve_listener_messages_processed', '# of messages processed')
SKIPPED_MESSAGES = Counter('ve_listener_messages_skipped', '# of messages skipped')
UNKNOWN_EVENT_TYPE = Counter('ve_listener_unknown_event_type', '# of msgs with unknown event type')
UNKNOWN_TOPIC = Counter('ve_listener_unknown_topic', '# of msgs received on unsupported topic')
DATABASE_ERROR = Counter('ve_listener_database_error', '# of database errors')
UPLOAD_NO_RPMDB = Counter('ve_listener_upl_no_rpmdb', '# of systems ignored due to missing rpmdb')
MESSAGE_PARSE_ERROR = Counter('ve_listener_message_parse_error', '# of message parse errors')
NEW_REPO = Counter('ve_listener_upl_new_repo', '# of new repos inserted')
NEW_RH_ACCOUNT = Counter('ve_listener_upl_new_rh_account', '# of new rh accounts inserted')
NEW_SYSTEM_REPO = Counter('ve_listener_upl_new_system_repo', '# of new system_repo pairs inserted')
DELETED_SYSTEM_REPO = Counter('ve_listener_upl_system_repo_deleted', '# deleted system_repo pairs')
INVALID_IDENTITY = Counter('ve_listener_upl_invalid_identity',
'# of skipped uploads because of invalid identity')
MISSING_INSIGHTS_ENTITLEMENT = Counter('ve_listener_upl_non_insights_entitlement',
'# of skipped uploads because of entitlement check')

RHUI_PATH_PART = '/rhui/'
NEW_SYSTEM = Counter("ve_listener_upl_new_system", "# of new systems inserted")
UPDATE_SYSTEM = Counter("ve_listener_upl_update_system", "# of systems updated")
UNCHANGED_SYSTEM = Counter("ve_listener_upl_unchanged_system", "# of system-updates with same vmaas info")
DELETED_SYSTEM = Counter("ve_listener_deleted_system", "# of systems deleted")
DELETED_SYSTEM_NOT_FOUND = Counter("ve_listener_deleted_system_nf", "# of systems to delete but not found")
UPDATED_SYSTEM = Counter("ve_listener_updated_system", "# of systems updated")
UPDATED_SYSTEM_NOT_FOUND = Counter("ve_listener_updated_system_nf", "# of systems to update but not found")
DELETED_UPLOADED = Counter("ve_listener_deleted_uploaded", "# of systems uploaded after being deleted")
PROCESS_MESSAGES = Counter("ve_listener_messages_processed", "# of messages processed")
SKIPPED_MESSAGES = Counter("ve_listener_messages_skipped", "# of messages skipped")
UNKNOWN_EVENT_TYPE = Counter("ve_listener_unknown_event_type", "# of msgs with unknown event type")
UNKNOWN_TOPIC = Counter("ve_listener_unknown_topic", "# of msgs received on unsupported topic")
DATABASE_ERROR = Counter("ve_listener_database_error", "# of database errors")
UPLOAD_NO_RPMDB = Counter("ve_listener_upl_no_rpmdb", "# of systems ignored due to missing rpmdb")
MESSAGE_PARSE_ERROR = Counter("ve_listener_message_parse_error", "# of message parse errors")
NEW_REPO = Counter("ve_listener_upl_new_repo", "# of new repos inserted")
NEW_RH_ACCOUNT = Counter("ve_listener_upl_new_rh_account", "# of new rh accounts inserted")
NEW_SYSTEM_REPO = Counter("ve_listener_upl_new_system_repo", "# of new system_repo pairs inserted")
DELETED_SYSTEM_REPO = Counter("ve_listener_upl_system_repo_deleted", "# deleted system_repo pairs")
INVALID_IDENTITY = Counter("ve_listener_upl_invalid_identity", "# of skipped uploads because of invalid identity")
MISSING_INSIGHTS_ENTITLEMENT = Counter("ve_listener_upl_non_insights_entitlement", "# of skipped uploads because of entitlement check")

RHUI_PATH_PART = "/rhui/"
REPO_PATH_PATTERN = re.compile("(/content/.*)")
REPO_BASEARCH_PLACEHOLDER = "$basearch"
REPO_RELEASEVER_PLACEHOLDER = "$releasever"


class ImportStatus(flags.Flags):
"""Import to database status."""

INSERTED = 1
CHANGED = 2
UPDATED = 4
Expand All @@ -55,6 +54,7 @@ class ImportStatus(flags.Flags):

class InventoryMsgType(Enum):
"""Inventory message type enum"""

UNKNOWN = 0
CREATED = 1
UPDATED = 2
Expand All @@ -76,13 +76,15 @@ def __init__(self, inventory_msg: dict, inventory_msg_type: InventoryMsgType, ad
def create_task_and_log(coro, logger, loop):
# pylint: disable=broad-except
"""Create asyncio task and log its result if its exception"""

def _log(task):
try:
task.result()
except asyncio.CancelledError:
pass
except Exception as exc:
logger.exception(exc)

task = loop.create_task(coro)
task.add_done_callback(_log)
return task
18 changes: 12 additions & 6 deletions listener/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,16 @@
from common.mqueue import MQReader, MQWriter
from common.identity import get_identity, is_entitled_insights
from common.database_handler import setup_db_pool
from .common import (CFG, PROMETHEUS_PORT, SKIPPED_MESSAGES, INVALID_IDENTITY, MISSING_INSIGHTS_ENTITLEMENT,
MESSAGE_PARSE_ERROR, InventoryMsgType, create_task_and_log)
from .common import (
CFG,
PROMETHEUS_PORT,
SKIPPED_MESSAGES,
INVALID_IDENTITY,
MISSING_INSIGHTS_ENTITLEMENT,
MESSAGE_PARSE_ERROR,
InventoryMsgType,
create_task_and_log,
)
from .queue import ListenerQueue

LOGGER = get_logger(__name__)
Expand Down Expand Up @@ -77,7 +85,7 @@ def _validate_deleted(self, msg: dict):

def validate_message(self, msg: dict) -> InventoryMsgType:
"""Validate inventory message, returns its type. Increments
prometheus counters"""
prometheus counters"""
if msg.get("type") == "created":
self._validate_created_updated(msg)
return InventoryMsgType.CREATED
Expand All @@ -93,9 +101,7 @@ def validate_message(self, msg: dict) -> InventoryMsgType:
class AdvisorMsgValidator:
"""Advisor message validator"""

_required_msg_fields = {
"input": [{"host": [("insights_id", False)]}]
}
_required_msg_fields = {"input": [{"host": [("insights_id", False)]}]}

def __init__(self, payload_tracker):
"""Constructor"""
Expand Down
Loading

0 comments on commit f157c81

Please sign in to comment.