Skip to content

Commit

Permalink
Merge pull request #86 from TeskaLabs/Enhancement/Remove-exits
Browse files Browse the repository at this point in the history
Remove exits from Kafka handler
  • Loading branch information
mithunbharadwaj authored Feb 21, 2025
2 parents 11533b4 + 68ba7fd commit 4f4edaf
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 143 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@

- Email supports TXT templates

#### 21.02.2025

- Make Kafka handler optional

#### 21.02.2025

- Exit strategy is obsolete


### Bugfix

Expand Down
22 changes: 20 additions & 2 deletions asabiris/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,34 @@ def __init__(self, args=None):
self.EmailOutputService = EmailOutputService(self)

if 'slack' in asab.Config.sections():
# Initialize the SlackOutputService
self.SlackOutputService = SlackOutputService(self)
self.SendSlackOrchestrator = SendSlackOrchestrator(self)

# Only initialize SendSlackOrchestrator if the SlackOutputService client is valid
if self.SlackOutputService.Client is None:
# If client is None, disable Slack orchestrator as well
self.SendSlackOrchestrator = None
else:
# If the client is valid, initialize the orchestrator
self.SendSlackOrchestrator = SendSlackOrchestrator(self)

else:
# If the slack section is not present in the config, set both services to None
self.SlackOutputService = None
self.SendSlackOrchestrator = None

if 'msteams' in asab.Config.sections():
# Initialize the MSTeamsOutputService
self.MSTeamsOutputService = MSTeamsOutputService(self)
self.SendMSTeamsOrchestrator = SendMSTeamsOrchestrator(self)
if self.MSTeamsOutputService.TeamsWebhookUrl is None:
# If client is None, disable MSTeams orchestrator as well
self.SendMSTeamsOrchestrator = None
else:
# If the TeamsWebhookUrl is valid, initialize the orchestrator
self.SendMSTeamsOrchestrator = SendMSTeamsOrchestrator(self)
else:
self.SendMSTeamsOrchestrator = None
self.MSTeamsOutputService = None

if 'sms' in asab.Config.sections():
self.SMSOutputService = SMSOutputService(self)
Expand Down
72 changes: 37 additions & 35 deletions asabiris/handlers/kafkahandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,9 @@ def check_config(config, section, parameter):
try:
value = config.get(section, parameter)
return value
except (configparser.NoOptionError, configparser.NoSectionError):
L.error(
"Configuration parameter '{}' is missing in section '{}'.".format(
parameter, section
)
)
exit()
except (configparser.NoOptionError, configparser.NoSectionError) as e:
L.error("Configuration parameter '{}' is missing in section '{}': {}".format(parameter, section, e))
return None


class KafkaHandler(asab.Service):
Expand All @@ -43,25 +39,37 @@ def __init__(self, app, service_name="KafkaHandler"):
super().__init__(app, service_name)
self.Task = None
self.JinjaService = app.get_service("JinjaService")
# Output services
self.Consumer = None # Ensure Consumer is always initialized

try:
topic = check_config(asab.Config, "kafka", "topic")
group_id = check_config(asab.Config, "kafka", "group_id")
bootstrap_servers = check_config(asab.Config, "kafka", "bootstrap_servers").split(",")
except configparser.NoOptionError:
L.error("Configuration missing. Required parameters: Kafka/group_id/bootstrap_servers")
exit()

self.Consumer = AIOKafkaConsumer(
topic,
group_id=group_id,
bootstrap_servers=bootstrap_servers,
loop=self.App.Loop,
retry_backoff_ms=10000,
auto_offset_reset="earliest",
)
bootstrap_servers = check_config(asab.Config, "kafka", "bootstrap_servers")

if not topic or not group_id or not bootstrap_servers:
L.warning("Kafka configuration is missing. Skipping Kafka initialization.")
return

bootstrap_servers = bootstrap_servers.split(",")

self.Consumer = AIOKafkaConsumer(
topic,
group_id=group_id,
bootstrap_servers=bootstrap_servers,
loop=self.App.Loop,
retry_backoff_ms=10000,
auto_offset_reset="earliest",
)

except Exception as e:
L.error("KafkaHandler initialization failed: {}".format(e), exc_info=True)
self.Consumer = None

async def initialize(self, app):
if self.Consumer is None:
L.warning("Kafka consumer is not initialized. Skipping Kafka initialization.")
return

max_retries = 5
initial_delay = 5 # Initial delay in seconds
max_delay = 300 # Maximum delay in seconds (5 minutes)
Expand All @@ -75,30 +83,29 @@ async def initialize(self, app):
L.warning("No connection to Kafka established. Attempt {} of {}. Retrying in {} seconds... {}".format(
attempt + 1, max_retries, delay, e))
await asyncio.sleep(delay)
# Apply exponential backoff with a cap on the delay
delay = min(delay * 2, max_delay)
else:
L.error("Failed to connect to Kafka after {} attempts. Stopping the app.".format(max_retries))
exit()
L.error("Failed to connect to Kafka after {} attempts.".format(max_retries))
return

self.Task = asyncio.ensure_future(self.consume(), loop=self.App.Loop)

async def finalize(self, app):
await self.Consumer.stop()
if self.Task.exception() is not None:
if self.Consumer is not None:
await self.Consumer.stop()
if self.Task and self.Task.exception():
L.warning("Exception occurred during alert notifications: {}".format(self.Task.exception()))

async def consume(self):
if self.Consumer is None:
return
async for msg in self.Consumer:
try:
msg = msg.value.decode("utf-8")
msg = json.loads(msg)
except (UnicodeDecodeError, json.JSONDecodeError) as e:
L.warning("Failed to decode or parse message: {}".format(e))
continue
except Exception as e:
L.warning("Invalid message format: '{}'".format(e))
continue # Skip to the next message
try:
await self.dispatch(msg)
except Exception as e:
Expand All @@ -113,29 +120,24 @@ async def dispatch(self, msg):

if msg_type == "email":
await self.handle_email(msg)

elif msg_type == "slack":
if self.App.SendSlackOrchestrator is None:
L.warning("Slack service is not configured. Discarding notification.")
return
await self.handle_slack(msg)

elif msg_type == "msteams":
if self.App.SendMSTeamsOrchestrator is None:
L.warning("MS Teams service is not configured. Discarding notification.")
return
await self.handle_msteams(msg)

elif msg_type == "sms":
if self.App.SendSMSOrchestrator is None:
L.warning("SMS service is not configured. Discarding notification.")
return
await self.handle_sms(msg)

else:
L.warning(
"Notification sending failed: Unsupported message type '{}'. "
"Supported types are 'email', 'slack', 'msteams', and 'sms'.".format(msg_type)
"Notification sending failed: Unsupported message type '{}'. Supported types are 'email', 'slack', 'msteams', and 'sms'.".format(msg_type)
)

async def handle_email(self, msg):
Expand Down
Loading

0 comments on commit 4f4edaf

Please sign in to comment.