Skip to content

Commit

Permalink
Merge pull request #1285 from jertel/jertel/lock
Browse files Browse the repository at this point in the history
Improve aggregated alert logic to avoid missing alerts
  • Loading branch information
nsano-rururu committed Oct 5, 2023
2 parents 6bcdd1e + 277cee7 commit 2008db1
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 130 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- [Helm] Fix commonLabels indentation for the deployment template - [#1250](https://github.com/jertel/elastalert2/pull/1250) - @dan-duffy
- Add support for Kibana 8.10 for Kibana Discover - [#1277](https://github.com/jertel/elastalert2/pull/1277) - @nsano-rururu
- Upgrade pylint 2.17.4 to 2.17.5, pytest 7.3.1 to 7.4.2, sphinx 6.2.1 to 7.2.6, sphinx_rtd_theme 1.2.2 to 1.3.0 - [#1278](https://github.com/jertel/elastalert2/pull/1278) - @nsano-rururu
- Fix issue with aggregated alerts not being sent - [#1285](https://github.com/jertel/elastalert2/pull/1285) - @jertel

# 2.13.2

Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3-slim as builder
FROM python:3.11-slim as builder

LABEL description="ElastAlert 2 Official Image"
LABEL maintainer="Jason Ertel"
Expand All @@ -10,7 +10,7 @@ RUN mkdir -p /opt/elastalert && \
pip install setuptools wheel && \
python setup.py sdist bdist_wheel

FROM python:3-slim
FROM python:3.11-slim

ARG GID=1000
ARG UID=1000
Expand Down
262 changes: 137 additions & 125 deletions elastalert/elastalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ def __init__(self, args):
if self.args.silence:
self.silence()

self.alert_lock = threading.Lock()

@staticmethod
def get_index(rule, starttime=None, endtime=None):
""" Gets the index for a rule. If strftime is set and starttime and endtime
Expand Down Expand Up @@ -1486,72 +1488,73 @@ def find_recent_pending_alerts(self, time_limit):
return []

def send_pending_alerts(self):
pending_alerts = self.find_recent_pending_alerts(self.alert_time_limit)
for alert in pending_alerts:
_id = alert['_id']
alert = alert['_source']
try:
rule_name = alert.pop('rule_name')
alert_time = alert.pop('alert_time')
match_body = alert.pop('match_body')
except KeyError:
# Malformed alert, drop it
continue
with self.alert_lock:
pending_alerts = self.find_recent_pending_alerts(self.alert_time_limit)
for alert in pending_alerts:
_id = alert['_id']
alert = alert['_source']
try:
rule_name = alert.pop('rule_name')
alert_time = alert.pop('alert_time')
match_body = alert.pop('match_body')
except KeyError:
# Malformed alert, drop it
continue

# Find original rule
for rule in self.rules:
if rule['name'] == rule_name:
break
else:
# Original rule is missing, keep alert for later if rule reappears
continue
# Find original rule
for rule in self.rules:
if rule['name'] == rule_name:
break
else:
# Original rule is missing, keep alert for later if rule reappears
continue

# Set current_es for top_count_keys query
self.thread_data.current_es = elasticsearch_client(rule)
# Set current_es for top_count_keys query
self.thread_data.current_es = elasticsearch_client(rule)

# Send the alert unless it's a future alert
if ts_now() > ts_to_dt(alert_time):
aggregated_matches = self.get_aggregated_matches(_id)
if aggregated_matches:
matches = [match_body] + [agg_match['match_body'] for agg_match in aggregated_matches]
self.alert(matches, rule, alert_time=alert_time)
else:
# If this rule isn't using aggregation, this must be a retry of a failed alert
retried = False
if not rule.get('aggregation'):
retried = True
self.alert([match_body], rule, alert_time=alert_time, retried=retried)

if rule['current_aggregate_id']:
for qk, agg_id in rule['current_aggregate_id'].items():
if agg_id == _id:
rule['current_aggregate_id'].pop(qk)
break

# Send the alert unless it's a future alert
if ts_now() > ts_to_dt(alert_time):
aggregated_matches = self.get_aggregated_matches(_id)
if aggregated_matches:
matches = [match_body] + [agg_match['match_body'] for agg_match in aggregated_matches]
self.alert(matches, rule, alert_time=alert_time)
else:
# If this rule isn't using aggregation, this must be a retry of a failed alert
retried = False
if not rule.get('aggregation'):
retried = True
self.alert([match_body], rule, alert_time=alert_time, retried=retried)

if rule['current_aggregate_id']:
for qk, agg_id in rule['current_aggregate_id'].items():
if agg_id == _id:
rule['current_aggregate_id'].pop(qk)
break

# Delete it from the index
try:
self.writeback_es.delete(index=self.writeback_index, id=_id)
except ElasticsearchException: # TODO: Give this a more relevant exception, try:except: is evil.
self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time))
# Delete it from the index
try:
self.writeback_es.delete(index=self.writeback_index, id=_id)
except ElasticsearchException: # TODO: Give this a more relevant exception, try:except: is evil.
self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time))

# Send in memory aggregated alerts
for rule in self.rules:
if rule['agg_matches']:
for aggregation_key_value, aggregate_alert_time in rule['aggregate_alert_time'].items():
if ts_now() > aggregate_alert_time:
alertable_matches = [
agg_match
for agg_match
in rule['agg_matches']
if self.get_aggregation_key_value(rule, agg_match) == aggregation_key_value
]
self.alert(alertable_matches, rule)
rule['agg_matches'] = [
agg_match
for agg_match
in rule['agg_matches']
if self.get_aggregation_key_value(rule, agg_match) != aggregation_key_value
]
# Send in memory aggregated alerts
for rule in self.rules:
if rule['agg_matches']:
for aggregation_key_value, aggregate_alert_time in rule['aggregate_alert_time'].items():
if ts_now() > aggregate_alert_time:
alertable_matches = [
agg_match
for agg_match
in rule['agg_matches']
if self.get_aggregation_key_value(rule, agg_match) == aggregation_key_value
]
self.alert(alertable_matches, rule)
rule['agg_matches'] = [
agg_match
for agg_match
in rule['agg_matches']
if self.get_aggregation_key_value(rule, agg_match) != aggregation_key_value
]

def get_aggregated_matches(self, _id):
""" Removes and returns all matches from writeback_es that have aggregate_id == _id """
Expand Down Expand Up @@ -1591,20 +1594,66 @@ def find_pending_aggregate_alert(self, rule, aggregation_key_value=None):
def add_aggregated_alert(self, match, rule):
""" Save a match as a pending aggregate alert to Elasticsearch. """

# Optionally include the 'aggregation_key' as a dimension for aggregations
aggregation_key_value = self.get_aggregation_key_value(rule, match)

if (not rule['current_aggregate_id'].get(aggregation_key_value) or
('aggregate_alert_time' in rule and aggregation_key_value in rule['aggregate_alert_time'] and rule[
'aggregate_alert_time'].get(aggregation_key_value) < ts_to_dt(lookup_es_key(match, rule['timestamp_field'])))):
with self.alert_lock:
# Optionally include the 'aggregation_key' as a dimension for aggregations
aggregation_key_value = self.get_aggregation_key_value(rule, match)

# ElastAlert may have restarted while pending alerts exist
pending_alert = self.find_pending_aggregate_alert(rule, aggregation_key_value)
if pending_alert:
alert_time = ts_to_dt(pending_alert['_source']['alert_time'])
rule['aggregate_alert_time'][aggregation_key_value] = alert_time
agg_id = pending_alert['_id']
rule['current_aggregate_id'] = {aggregation_key_value: agg_id}
# This is a fallback option in case this change to using ts_now() interferes with the behavior current
# users are accustomed to. It is not documented because it likely won't be needed. If no one reports
# a problem we can remove this fallback option in a future release.
if rule.get('aggregation_alert_time_compared_with_timestamp_field', False):
compare_dt = lookup_es_key(match, rule['timestamp_field'])
else:
compare_dt = ts_now()

if (not rule['current_aggregate_id'].get(aggregation_key_value) or
('aggregate_alert_time' in rule and aggregation_key_value in rule['aggregate_alert_time'] and rule[
'aggregate_alert_time'].get(aggregation_key_value) < ts_to_dt(compare_dt))):

# ElastAlert may have restarted while pending alerts exist
pending_alert = self.find_pending_aggregate_alert(rule, aggregation_key_value)
if pending_alert:
alert_time = ts_to_dt(pending_alert['_source']['alert_time'])
rule['aggregate_alert_time'][aggregation_key_value] = alert_time
agg_id = pending_alert['_id']
rule['current_aggregate_id'] = {aggregation_key_value: agg_id}
elastalert_logger.info(
'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
rule['name'],
agg_id,
aggregation_key_value,
alert_time
)
)
else:
# First match, set alert_time
alert_time = ''
if isinstance(rule['aggregation'], dict) and rule['aggregation'].get('schedule'):
croniter._datetime_to_timestamp = cronite_datetime_to_timestamp # For Python 2.6 compatibility
try:
iter = croniter(rule['aggregation']['schedule'], ts_now())
alert_time = unix_to_dt(iter.get_next())
except Exception as e:
self.handle_error("Error parsing aggregate send time Cron format %s" % (e), rule['aggregation']['schedule'])
else:
try:
if rule.get('aggregate_by_match_time', False):
match_time = ts_to_dt(lookup_es_key(match, rule['timestamp_field']))
alert_time = match_time + rule['aggregation']
else:
alert_time = ts_now() + rule['aggregation']
except Exception as e:
self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation'])

rule['aggregate_alert_time'][aggregation_key_value] = alert_time
agg_id = None
elastalert_logger.info(
'New aggregation for %s, aggregation_key: %s. next alert at %s.' % (rule['name'], aggregation_key_value, alert_time)
)
else:
# Already pending aggregation, use existing alert_time
alert_time = rule['aggregate_alert_time'].get(aggregation_key_value)
agg_id = rule['current_aggregate_id'].get(aggregation_key_value)
elastalert_logger.info(
'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
rule['name'],
Expand All @@ -1613,60 +1662,23 @@ def add_aggregated_alert(self, match, rule):
alert_time
)
)
else:
# First match, set alert_time
alert_time = ''
if isinstance(rule['aggregation'], dict) and rule['aggregation'].get('schedule'):
croniter._datetime_to_timestamp = cronite_datetime_to_timestamp # For Python 2.6 compatibility
try:
iter = croniter(rule['aggregation']['schedule'], ts_now())
alert_time = unix_to_dt(iter.get_next())
except Exception as e:
self.handle_error("Error parsing aggregate send time Cron format %s" % (e), rule['aggregation']['schedule'])
else:
try:
if rule.get('aggregate_by_match_time', False):
match_time = ts_to_dt(lookup_es_key(match, rule['timestamp_field']))
alert_time = match_time + rule['aggregation']
else:
alert_time = ts_now() + rule['aggregation']
except Exception as e:
self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation'])

rule['aggregate_alert_time'][aggregation_key_value] = alert_time
agg_id = None
elastalert_logger.info(
'New aggregation for %s, aggregation_key: %s. next alert at %s.' % (rule['name'], aggregation_key_value, alert_time)
)
else:
# Already pending aggregation, use existing alert_time
alert_time = rule['aggregate_alert_time'].get(aggregation_key_value)
agg_id = rule['current_aggregate_id'].get(aggregation_key_value)
elastalert_logger.info(
'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
rule['name'],
agg_id,
aggregation_key_value,
alert_time
)
)

alert_body = self.get_alert_body(match, rule, False, alert_time)
if agg_id:
alert_body['aggregate_id'] = agg_id
if aggregation_key_value:
alert_body['aggregation_key'] = aggregation_key_value
res = self.writeback('elastalert', alert_body, rule)
alert_body = self.get_alert_body(match, rule, False, alert_time)
if agg_id:
alert_body['aggregate_id'] = agg_id
if aggregation_key_value:
alert_body['aggregation_key'] = aggregation_key_value
res = self.writeback('elastalert', alert_body, rule)

# If new aggregation, save _id
if res and not agg_id:
rule['current_aggregate_id'][aggregation_key_value] = res['_id']
# If new aggregation, save _id
if res and not agg_id:
rule['current_aggregate_id'][aggregation_key_value] = res['_id']

# Couldn't write the match to ES, save it in memory for now
if not res:
rule['agg_matches'].append(match)
# Couldn't write the match to ES, save it in memory for now
if not res:
rule['agg_matches'].append(match)

return res
return res

def silence(self, silence_cache_key=None):
""" Silence an alert for a period of time. --silence and --rule must be passed as args. """
Expand Down
1 change: 1 addition & 0 deletions elastalert/schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ properties:
type: string
- type: string
aggregation: *timeframe
aggregation_alert_time_compared_with_timestamp_field: {type: boolean}
realert: *timeframe
realert_key: {type: string}
exponential_realert: *timeframe
Expand Down
2 changes: 1 addition & 1 deletion tests/Dockerfile-test
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3-slim
FROM python:3.11-slim

RUN apt update && apt upgrade -y
RUN apt install -y gcc libffi-dev
Expand Down
Loading

0 comments on commit 2008db1

Please sign in to comment.