Improve aggregated alert logic to avoid missing alerts #1285

Merged 5 commits on Oct 5, 2023
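At a high level, the diff below does two things: it serializes the code paths that create and flush pending aggregate alerts behind a single shared threading.Lock (self.alert_lock), and it decides whether an aggregation window is still open by comparing its alert_time against ts_now() rather than against the match's own timestamp, with a new undocumented boolean rule option, aggregation_alert_time_compared_with_timestamp_field, as a fallback to the old comparison. The sketch below is an illustrative simplification of that idea, not the ElastAlert 2 implementation: the AggregationBuffer class, the ts_now() helper, and the send callback are invented for the example, and the real code persists pending aggregates to the Elasticsearch writeback index rather than an in-memory dict.

```python
# Illustrative sketch only (assumed names, not the ElastAlert 2 code): one lock
# shared by the thread that buffers new matches and the thread that flushes due
# aggregates, plus an expiry check against "now" instead of the match timestamp.
import threading
from datetime import datetime, timedelta, timezone


def ts_now() -> datetime:
    return datetime.now(timezone.utc)


class AggregationBuffer:
    def __init__(self) -> None:
        self.alert_lock = threading.Lock()   # mirrors self.alert_lock in the PR
        self.windows = {}                    # aggregation key -> (alert_time, matches)

    def add_aggregated_alert(self, key, match, aggregation: timedelta, send) -> None:
        with self.alert_lock:
            window = self.windows.get(key)
            if window is not None and ts_now() < window[0]:
                # Window still open: attach the match to the pending aggregate.
                window[1].append(match)
                return
            if window is not None:
                # Window already due but not flushed yet: send it instead of
                # silently extending it (simplification of the writeback logic).
                send(window[1])
            # Open a new window relative to ts_now(), not the match timestamp.
            self.windows[key] = (ts_now() + aggregation, [match])

    def send_pending_alerts(self, send) -> None:
        with self.alert_lock:
            for key, (alert_time, matches) in list(self.windows.items()):
                if ts_now() > alert_time:
                    send(matches)
                    del self.windows[key]
```

Without a shared lock, the flush path can delete a pending aggregate while the add path is still attaching matches to it, which appears to be the kind of race behind the missing alerts the PR title refers to.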
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
 - [Helm] Fix commonLabels indentation for the deployment template - [#1250](https://github.com/jertel/elastalert2/pull/1250) - @dan-duffy
 - Add support for Kibana 8.10 for Kibana Discover - [#1277](https://github.com/jertel/elastalert2/pull/1277) - @nsano-rururu
 - Upgrade pylint 2.17.4 to 2.17.5, pytest 7.3.1 to 7.4.2, sphinx 6.2.1 to 7.2.6, sphinx_rtd_theme 1.2.2 to 1.3.0 - [#1278](https://github.com/jertel/elastalert2/pull/1278) - @nsano-rururu
+- Fix issue with aggregated alerts not being sent - [#1285](https://github.com/jertel/elastalert2/pull/1285) - @jertel
 
 # 2.13.2
 
4 changes: 2 additions & 2 deletions Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3-slim as builder
+FROM python:3.11-slim as builder
 
 LABEL description="ElastAlert 2 Official Image"
 LABEL maintainer="Jason Ertel"
@@ -10,7 +10,7 @@ RUN mkdir -p /opt/elastalert && \
     pip install setuptools wheel && \
     python setup.py sdist bdist_wheel
 
-FROM python:3-slim
+FROM python:3.11-slim
 
 ARG GID=1000
 ARG UID=1000
262 changes: 137 additions & 125 deletions elastalert/elastalert.py
@@ -183,6 +183,8 @@ def __init__(self, args):
         if self.args.silence:
             self.silence()
 
+        self.alert_lock = threading.Lock()
+
     @staticmethod
     def get_index(rule, starttime=None, endtime=None):
         """ Gets the index for a rule. If strftime is set and starttime and endtime
@@ -1486,72 +1488,73 @@ def find_recent_pending_alerts(self, time_limit):
         return []
 
     def send_pending_alerts(self):
-        pending_alerts = self.find_recent_pending_alerts(self.alert_time_limit)
-        for alert in pending_alerts:
-            _id = alert['_id']
-            alert = alert['_source']
-            try:
-                rule_name = alert.pop('rule_name')
-                alert_time = alert.pop('alert_time')
-                match_body = alert.pop('match_body')
-            except KeyError:
-                # Malformed alert, drop it
-                continue
+        with self.alert_lock:
+            pending_alerts = self.find_recent_pending_alerts(self.alert_time_limit)
+            for alert in pending_alerts:
+                _id = alert['_id']
+                alert = alert['_source']
+                try:
+                    rule_name = alert.pop('rule_name')
+                    alert_time = alert.pop('alert_time')
+                    match_body = alert.pop('match_body')
+                except KeyError:
+                    # Malformed alert, drop it
+                    continue
 
-            # Find original rule
-            for rule in self.rules:
-                if rule['name'] == rule_name:
-                    break
-            else:
-                # Original rule is missing, keep alert for later if rule reappears
-                continue
+                # Find original rule
+                for rule in self.rules:
+                    if rule['name'] == rule_name:
+                        break
+                else:
+                    # Original rule is missing, keep alert for later if rule reappears
+                    continue
 
-            # Set current_es for top_count_keys query
-            self.thread_data.current_es = elasticsearch_client(rule)
+                # Set current_es for top_count_keys query
+                self.thread_data.current_es = elasticsearch_client(rule)
 
-            # Send the alert unless it's a future alert
-            if ts_now() > ts_to_dt(alert_time):
-                aggregated_matches = self.get_aggregated_matches(_id)
-                if aggregated_matches:
-                    matches = [match_body] + [agg_match['match_body'] for agg_match in aggregated_matches]
-                    self.alert(matches, rule, alert_time=alert_time)
-                else:
-                    # If this rule isn't using aggregation, this must be a retry of a failed alert
-                    retried = False
-                    if not rule.get('aggregation'):
-                        retried = True
-                    self.alert([match_body], rule, alert_time=alert_time, retried=retried)
-
-                if rule['current_aggregate_id']:
-                    for qk, agg_id in rule['current_aggregate_id'].items():
-                        if agg_id == _id:
-                            rule['current_aggregate_id'].pop(qk)
-                            break
+                # Send the alert unless it's a future alert
+                if ts_now() > ts_to_dt(alert_time):
+                    aggregated_matches = self.get_aggregated_matches(_id)
+                    if aggregated_matches:
+                        matches = [match_body] + [agg_match['match_body'] for agg_match in aggregated_matches]
+                        self.alert(matches, rule, alert_time=alert_time)
+                    else:
+                        # If this rule isn't using aggregation, this must be a retry of a failed alert
+                        retried = False
+                        if not rule.get('aggregation'):
+                            retried = True
+                        self.alert([match_body], rule, alert_time=alert_time, retried=retried)
+
+                    if rule['current_aggregate_id']:
+                        for qk, agg_id in rule['current_aggregate_id'].items():
+                            if agg_id == _id:
+                                rule['current_aggregate_id'].pop(qk)
+                                break
 
-            # Delete it from the index
-            try:
-                self.writeback_es.delete(index=self.writeback_index, id=_id)
-            except ElasticsearchException:  # TODO: Give this a more relevant exception, try:except: is evil.
-                self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time))
+                # Delete it from the index
+                try:
+                    self.writeback_es.delete(index=self.writeback_index, id=_id)
+                except ElasticsearchException:  # TODO: Give this a more relevant exception, try:except: is evil.
+                    self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time))
 
-        # Send in memory aggregated alerts
-        for rule in self.rules:
-            if rule['agg_matches']:
-                for aggregation_key_value, aggregate_alert_time in rule['aggregate_alert_time'].items():
-                    if ts_now() > aggregate_alert_time:
-                        alertable_matches = [
-                            agg_match
-                            for agg_match
-                            in rule['agg_matches']
-                            if self.get_aggregation_key_value(rule, agg_match) == aggregation_key_value
-                        ]
-                        self.alert(alertable_matches, rule)
-                        rule['agg_matches'] = [
-                            agg_match
-                            for agg_match
-                            in rule['agg_matches']
-                            if self.get_aggregation_key_value(rule, agg_match) != aggregation_key_value
-                        ]
+            # Send in memory aggregated alerts
+            for rule in self.rules:
+                if rule['agg_matches']:
+                    for aggregation_key_value, aggregate_alert_time in rule['aggregate_alert_time'].items():
+                        if ts_now() > aggregate_alert_time:
+                            alertable_matches = [
+                                agg_match
+                                for agg_match
+                                in rule['agg_matches']
+                                if self.get_aggregation_key_value(rule, agg_match) == aggregation_key_value
+                            ]
+                            self.alert(alertable_matches, rule)
+                            rule['agg_matches'] = [
+                                agg_match
+                                for agg_match
+                                in rule['agg_matches']
+                                if self.get_aggregation_key_value(rule, agg_match) != aggregation_key_value
+                            ]
 
     def get_aggregated_matches(self, _id):
         """ Removes and returns all matches from writeback_es that have aggregate_id == _id """
@@ -1591,82 +1594,91 @@ def find_pending_aggregate_alert(self, rule, aggregation_key_value=None):
     def add_aggregated_alert(self, match, rule):
         """ Save a match as a pending aggregate alert to Elasticsearch. """
 
-        # Optionally include the 'aggregation_key' as a dimension for aggregations
-        aggregation_key_value = self.get_aggregation_key_value(rule, match)
-
-        if (not rule['current_aggregate_id'].get(aggregation_key_value) or
-                ('aggregate_alert_time' in rule and aggregation_key_value in rule['aggregate_alert_time'] and rule[
-                    'aggregate_alert_time'].get(aggregation_key_value) < ts_to_dt(lookup_es_key(match, rule['timestamp_field'])))):
-
-            # ElastAlert may have restarted while pending alerts exist
-            pending_alert = self.find_pending_aggregate_alert(rule, aggregation_key_value)
-            if pending_alert:
-                alert_time = ts_to_dt(pending_alert['_source']['alert_time'])
-                rule['aggregate_alert_time'][aggregation_key_value] = alert_time
-                agg_id = pending_alert['_id']
-                rule['current_aggregate_id'] = {aggregation_key_value: agg_id}
-                elastalert_logger.info(
-                    'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
-                        rule['name'],
-                        agg_id,
-                        aggregation_key_value,
-                        alert_time
-                    )
-                )
-            else:
-                # First match, set alert_time
-                alert_time = ''
-                if isinstance(rule['aggregation'], dict) and rule['aggregation'].get('schedule'):
-                    croniter._datetime_to_timestamp = cronite_datetime_to_timestamp  # For Python 2.6 compatibility
-                    try:
-                        iter = croniter(rule['aggregation']['schedule'], ts_now())
-                        alert_time = unix_to_dt(iter.get_next())
-                    except Exception as e:
-                        self.handle_error("Error parsing aggregate send time Cron format %s" % (e), rule['aggregation']['schedule'])
-                else:
-                    try:
-                        if rule.get('aggregate_by_match_time', False):
-                            match_time = ts_to_dt(lookup_es_key(match, rule['timestamp_field']))
-                            alert_time = match_time + rule['aggregation']
-                        else:
-                            alert_time = ts_now() + rule['aggregation']
-                    except Exception as e:
-                        self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation'])
-
-                rule['aggregate_alert_time'][aggregation_key_value] = alert_time
-                agg_id = None
-                elastalert_logger.info(
-                    'New aggregation for %s, aggregation_key: %s. next alert at %s.' % (rule['name'], aggregation_key_value, alert_time)
-                )
-        else:
-            # Already pending aggregation, use existing alert_time
-            alert_time = rule['aggregate_alert_time'].get(aggregation_key_value)
-            agg_id = rule['current_aggregate_id'].get(aggregation_key_value)
-            elastalert_logger.info(
-                'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
-                    rule['name'],
-                    agg_id,
-                    aggregation_key_value,
-                    alert_time
-                )
-            )
-
-        alert_body = self.get_alert_body(match, rule, False, alert_time)
-        if agg_id:
-            alert_body['aggregate_id'] = agg_id
-        if aggregation_key_value:
-            alert_body['aggregation_key'] = aggregation_key_value
-        res = self.writeback('elastalert', alert_body, rule)
-
-        # If new aggregation, save _id
-        if res and not agg_id:
-            rule['current_aggregate_id'][aggregation_key_value] = res['_id']
-
-        # Couldn't write the match to ES, save it in memory for now
-        if not res:
-            rule['agg_matches'].append(match)
-
-        return res
+        with self.alert_lock:
+            # Optionally include the 'aggregation_key' as a dimension for aggregations
+            aggregation_key_value = self.get_aggregation_key_value(rule, match)
+
+            # This is a fallback option in case this change to using ts_now() interferes with the behavior current
+            # users are accustomed to. It is not documented because it likely won't be needed. If no one reports
+            # a problem we can remove this fallback option in a future release.
+            if rule.get('aggregation_alert_time_compared_with_timestamp_field', False):
+                compare_dt = lookup_es_key(match, rule['timestamp_field'])
+            else:
+                compare_dt = ts_now()
+
+            if (not rule['current_aggregate_id'].get(aggregation_key_value) or
+                    ('aggregate_alert_time' in rule and aggregation_key_value in rule['aggregate_alert_time'] and rule[
+                        'aggregate_alert_time'].get(aggregation_key_value) < ts_to_dt(compare_dt))):
+
+                # ElastAlert may have restarted while pending alerts exist
+                pending_alert = self.find_pending_aggregate_alert(rule, aggregation_key_value)
+                if pending_alert:
+                    alert_time = ts_to_dt(pending_alert['_source']['alert_time'])
+                    rule['aggregate_alert_time'][aggregation_key_value] = alert_time
+                    agg_id = pending_alert['_id']
+                    rule['current_aggregate_id'] = {aggregation_key_value: agg_id}
+                    elastalert_logger.info(
+                        'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
+                            rule['name'],
+                            agg_id,
+                            aggregation_key_value,
+                            alert_time
+                        )
+                    )
+                else:
+                    # First match, set alert_time
+                    alert_time = ''
+                    if isinstance(rule['aggregation'], dict) and rule['aggregation'].get('schedule'):
+                        croniter._datetime_to_timestamp = cronite_datetime_to_timestamp  # For Python 2.6 compatibility
+                        try:
+                            iter = croniter(rule['aggregation']['schedule'], ts_now())
+                            alert_time = unix_to_dt(iter.get_next())
+                        except Exception as e:
+                            self.handle_error("Error parsing aggregate send time Cron format %s" % (e), rule['aggregation']['schedule'])
+                    else:
+                        try:
+                            if rule.get('aggregate_by_match_time', False):
+                                match_time = ts_to_dt(lookup_es_key(match, rule['timestamp_field']))
+                                alert_time = match_time + rule['aggregation']
+                            else:
+                                alert_time = ts_now() + rule['aggregation']
+                        except Exception as e:
+                            self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation'])
+
+                    rule['aggregate_alert_time'][aggregation_key_value] = alert_time
+                    agg_id = None
+                    elastalert_logger.info(
+                        'New aggregation for %s, aggregation_key: %s. next alert at %s.' % (rule['name'], aggregation_key_value, alert_time)
+                    )
+            else:
+                # Already pending aggregation, use existing alert_time
+                alert_time = rule['aggregate_alert_time'].get(aggregation_key_value)
+                agg_id = rule['current_aggregate_id'].get(aggregation_key_value)
+                elastalert_logger.info(
+                    'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
+                        rule['name'],
+                        agg_id,
+                        aggregation_key_value,
+                        alert_time
+                    )
+                )
+
+            alert_body = self.get_alert_body(match, rule, False, alert_time)
+            if agg_id:
+                alert_body['aggregate_id'] = agg_id
+            if aggregation_key_value:
+                alert_body['aggregation_key'] = aggregation_key_value
+            res = self.writeback('elastalert', alert_body, rule)
+
+            # If new aggregation, save _id
+            if res and not agg_id:
+                rule['current_aggregate_id'][aggregation_key_value] = res['_id']
+
+            # Couldn't write the match to ES, save it in memory for now
+            if not res:
+                rule['agg_matches'].append(match)
+
+            return res
 
     def silence(self, silence_cache_key=None):
         """ Silence an alert for a period of time. --silence and --rule must be passed as args. """
1 change: 1 addition & 0 deletions elastalert/schema.yaml
@@ -236,6 +236,7 @@ properties:
             type: string
         - type: string
     aggregation: *timeframe
+    aggregation_alert_time_compared_with_timestamp_field: {type: boolean}
    realert: *timeframe
    realert_key: {type: string}
    exponential_realert: *timeframe
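The code comment in elastalert.py above describes this new flag as an undocumented fallback: when set, add_aggregated_alert compares the pending aggregate's alert_time against the match's timestamp field again instead of ts_now(). A hypothetical rule excerpt using it might look like the following; every field other than the new option is a standard ElastAlert 2 rule setting, and the values are made up for illustration.

```yaml
# Hypothetical rule excerpt, not taken from the repository.
name: example-aggregated-frequency-rule
type: frequency
index: logs-*
num_events: 50
timeframe:
  minutes: 5
# Batch matches and send one combined alert roughly every 15 minutes.
aggregation:
  minutes: 15
# New optional flag validated by the schema entry above; defaults to false,
# which keeps the ts_now() comparison introduced in this PR.
aggregation_alert_time_compared_with_timestamp_field: true
```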
2 changes: 1 addition & 1 deletion tests/Dockerfile-test
@@ -1,4 +1,4 @@
-FROM python:3-slim
+FROM python:3.11-slim
 
 RUN apt update && apt upgrade -y
 RUN apt install -y gcc libffi-dev