diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2e4bf241..2e46383b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
 - [Helm] Fix commonLabels indentation for the deployment template - [#1250](https://github.com/jertel/elastalert2/pull/1250) - @dan-duffy
 - Add support for Kibana 8.10 for Kibana Discover - [#1277](https://github.com/jertel/elastalert2/pull/1277) - @nsano-rururu
 - Upgrade pylint 2.17.4 to 2.17.5, pytest 7.3.1 to 7.4.2, sphinx 6.2.1 to 7.2.6, sphinx_rtd_theme 1.2.2 to 1.3.0 - [#1278](https://github.com/jertel/elastalert2/pull/1278) - @nsano-rururu
+- Fix issue with aggregated alerts not being sent - [#1285](https://github.com/jertel/elastalert2/pull/1285) - @jertel
 
 # 2.13.2
 
diff --git a/Dockerfile b/Dockerfile
index 56ca1b34..a09b6706 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3-slim as builder
+FROM python:3.11-slim as builder
 
 LABEL description="ElastAlert 2 Official Image"
 LABEL maintainer="Jason Ertel"
@@ -10,7 +10,7 @@ RUN mkdir -p /opt/elastalert && \
     pip install setuptools wheel && \
     python setup.py sdist bdist_wheel
 
-FROM python:3-slim
+FROM python:3.11-slim
 
 ARG GID=1000
 ARG UID=1000
diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py
index ae8c93bd..28de6e8e 100755
--- a/elastalert/elastalert.py
+++ b/elastalert/elastalert.py
@@ -183,6 +183,8 @@ def __init__(self, args):
         if self.args.silence:
             self.silence()
 
+        self.alert_lock = threading.Lock()
+
     @staticmethod
     def get_index(rule, starttime=None, endtime=None):
         """ Gets the index for a rule. If strftime is set and starttime and endtime
@@ -1486,72 +1488,73 @@ def find_recent_pending_alerts(self, time_limit):
         return []
 
     def send_pending_alerts(self):
-        pending_alerts = self.find_recent_pending_alerts(self.alert_time_limit)
-        for alert in pending_alerts:
-            _id = alert['_id']
-            alert = alert['_source']
-            try:
-                rule_name = alert.pop('rule_name')
-                alert_time = alert.pop('alert_time')
-                match_body = alert.pop('match_body')
-            except KeyError:
-                # Malformed alert, drop it
-                continue
+        with self.alert_lock:
+            pending_alerts = self.find_recent_pending_alerts(self.alert_time_limit)
+            for alert in pending_alerts:
+                _id = alert['_id']
+                alert = alert['_source']
+                try:
+                    rule_name = alert.pop('rule_name')
+                    alert_time = alert.pop('alert_time')
+                    match_body = alert.pop('match_body')
+                except KeyError:
+                    # Malformed alert, drop it
+                    continue
 
-            # Find original rule
-            for rule in self.rules:
-                if rule['name'] == rule_name:
-                    break
-            else:
-                # Original rule is missing, keep alert for later if rule reappears
-                continue
+                # Find original rule
+                for rule in self.rules:
+                    if rule['name'] == rule_name:
+                        break
+                else:
+                    # Original rule is missing, keep alert for later if rule reappears
+                    continue
 
-            # Set current_es for top_count_keys query
-            self.thread_data.current_es = elasticsearch_client(rule)
+                # Set current_es for top_count_keys query
+                self.thread_data.current_es = elasticsearch_client(rule)
+
+                # Send the alert unless it's a future alert
+                if ts_now() > ts_to_dt(alert_time):
+                    aggregated_matches = self.get_aggregated_matches(_id)
+                    if aggregated_matches:
+                        matches = [match_body] + [agg_match['match_body'] for agg_match in aggregated_matches]
+                        self.alert(matches, rule, alert_time=alert_time)
+                    else:
+                        # If this rule isn't using aggregation, this must be a retry of a failed alert
+                        retried = False
+                        if not rule.get('aggregation'):
+                            retried = True
+                        self.alert([match_body], rule, alert_time=alert_time, retried=retried)
+
+                    if rule['current_aggregate_id']:
+                        for qk, agg_id in rule['current_aggregate_id'].items():
+                            if agg_id == _id:
+                                rule['current_aggregate_id'].pop(qk)
+                                break
 
-            # Send the alert unless it's a future alert
-            if ts_now() > ts_to_dt(alert_time):
-                aggregated_matches = self.get_aggregated_matches(_id)
-                if aggregated_matches:
-                    matches = [match_body] + [agg_match['match_body'] for agg_match in aggregated_matches]
-                    self.alert(matches, rule, alert_time=alert_time)
-                else:
-                    # If this rule isn't using aggregation, this must be a retry of a failed alert
-                    retried = False
-                    if not rule.get('aggregation'):
-                        retried = True
-                    self.alert([match_body], rule, alert_time=alert_time, retried=retried)
-
-                if rule['current_aggregate_id']:
-                    for qk, agg_id in rule['current_aggregate_id'].items():
-                        if agg_id == _id:
-                            rule['current_aggregate_id'].pop(qk)
-                            break
-
-            # Delete it from the index
-            try:
-                self.writeback_es.delete(index=self.writeback_index, id=_id)
-            except ElasticsearchException:  # TODO: Give this a more relevant exception, try:except: is evil.
-                self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time))
+                # Delete it from the index
+                try:
+                    self.writeback_es.delete(index=self.writeback_index, id=_id)
+                except ElasticsearchException:  # TODO: Give this a more relevant exception, try:except: is evil.
+                    self.handle_error("Failed to delete alert %s at %s" % (_id, alert_time))
 
-        # Send in memory aggregated alerts
-        for rule in self.rules:
-            if rule['agg_matches']:
-                for aggregation_key_value, aggregate_alert_time in rule['aggregate_alert_time'].items():
-                    if ts_now() > aggregate_alert_time:
-                        alertable_matches = [
-                            agg_match
-                            for agg_match
-                            in rule['agg_matches']
-                            if self.get_aggregation_key_value(rule, agg_match) == aggregation_key_value
-                        ]
-                        self.alert(alertable_matches, rule)
-                        rule['agg_matches'] = [
-                            agg_match
-                            for agg_match
-                            in rule['agg_matches']
-                            if self.get_aggregation_key_value(rule, agg_match) != aggregation_key_value
-                        ]
+            # Send in memory aggregated alerts
+            for rule in self.rules:
+                if rule['agg_matches']:
+                    for aggregation_key_value, aggregate_alert_time in rule['aggregate_alert_time'].items():
+                        if ts_now() > aggregate_alert_time:
+                            alertable_matches = [
+                                agg_match
+                                for agg_match
+                                in rule['agg_matches']
+                                if self.get_aggregation_key_value(rule, agg_match) == aggregation_key_value
+                            ]
+                            self.alert(alertable_matches, rule)
+                            rule['agg_matches'] = [
+                                agg_match
+                                for agg_match
+                                in rule['agg_matches']
+                                if self.get_aggregation_key_value(rule, agg_match) != aggregation_key_value
+                            ]
 
     def get_aggregated_matches(self, _id):
         """ Removes and returns all matches from writeback_es that have aggregate_id == _id """
@@ -1591,20 +1594,66 @@ def find_pending_aggregate_alert(self, rule, aggregation_key_value=None):
 
     def add_aggregated_alert(self, match, rule):
         """ Save a match as a pending aggregate alert to Elasticsearch. """
-        # Optionally include the 'aggregation_key' as a dimension for aggregations
-        aggregation_key_value = self.get_aggregation_key_value(rule, match)
-
-        if (not rule['current_aggregate_id'].get(aggregation_key_value) or
-                ('aggregate_alert_time' in rule and aggregation_key_value in rule['aggregate_alert_time'] and rule[
-                    'aggregate_alert_time'].get(aggregation_key_value) < ts_to_dt(lookup_es_key(match, rule['timestamp_field'])))):
+        with self.alert_lock:
+            # Optionally include the 'aggregation_key' as a dimension for aggregations
+            aggregation_key_value = self.get_aggregation_key_value(rule, match)
 
-            # ElastAlert may have restarted while pending alerts exist
-            pending_alert = self.find_pending_aggregate_alert(rule, aggregation_key_value)
-            if pending_alert:
-                alert_time = ts_to_dt(pending_alert['_source']['alert_time'])
-                rule['aggregate_alert_time'][aggregation_key_value] = alert_time
-                agg_id = pending_alert['_id']
-                rule['current_aggregate_id'] = {aggregation_key_value: agg_id}
+            # This is a fallback option in case this change to using ts_now() interferes with the behavior current
+            # users are accustomed to. It is not documented because it likely won't be needed. If no one reports
+            # a problem we can remove this fallback option in a future release.
+            if rule.get('aggregation_alert_time_compared_with_timestamp_field', False):
+                compare_dt = lookup_es_key(match, rule['timestamp_field'])
+            else:
+                compare_dt = ts_now()
+
+            if (not rule['current_aggregate_id'].get(aggregation_key_value) or
+                    ('aggregate_alert_time' in rule and aggregation_key_value in rule['aggregate_alert_time'] and rule[
+                        'aggregate_alert_time'].get(aggregation_key_value) < ts_to_dt(compare_dt))):
+
+                # ElastAlert may have restarted while pending alerts exist
+                pending_alert = self.find_pending_aggregate_alert(rule, aggregation_key_value)
+                if pending_alert:
+                    alert_time = ts_to_dt(pending_alert['_source']['alert_time'])
+                    rule['aggregate_alert_time'][aggregation_key_value] = alert_time
+                    agg_id = pending_alert['_id']
+                    rule['current_aggregate_id'] = {aggregation_key_value: agg_id}
+                    elastalert_logger.info(
+                        'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
+                            rule['name'],
+                            agg_id,
+                            aggregation_key_value,
+                            alert_time
+                        )
+                    )
+                else:
+                    # First match, set alert_time
+                    alert_time = ''
+                    if isinstance(rule['aggregation'], dict) and rule['aggregation'].get('schedule'):
+                        croniter._datetime_to_timestamp = cronite_datetime_to_timestamp  # For Python 2.6 compatibility
+                        try:
+                            iter = croniter(rule['aggregation']['schedule'], ts_now())
+                            alert_time = unix_to_dt(iter.get_next())
+                        except Exception as e:
+                            self.handle_error("Error parsing aggregate send time Cron format %s" % (e), rule['aggregation']['schedule'])
+                    else:
+                        try:
+                            if rule.get('aggregate_by_match_time', False):
+                                match_time = ts_to_dt(lookup_es_key(match, rule['timestamp_field']))
+                                alert_time = match_time + rule['aggregation']
+                            else:
+                                alert_time = ts_now() + rule['aggregation']
+                        except Exception as e:
+                            self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation'])
+
+                    rule['aggregate_alert_time'][aggregation_key_value] = alert_time
+                    agg_id = None
+                    elastalert_logger.info(
+                        'New aggregation for %s, aggregation_key: %s. next alert at %s.' % (rule['name'], aggregation_key_value, alert_time)
+                    )
+            else:
+                # Already pending aggregation, use existing alert_time
+                alert_time = rule['aggregate_alert_time'].get(aggregation_key_value)
+                agg_id = rule['current_aggregate_id'].get(aggregation_key_value)
                 elastalert_logger.info(
                     'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
                         rule['name'],
@@ -1613,60 +1662,23 @@ def add_aggregated_alert(self, match, rule):
                         alert_time
                     )
                 )
-            else:
-                # First match, set alert_time
-                alert_time = ''
-                if isinstance(rule['aggregation'], dict) and rule['aggregation'].get('schedule'):
-                    croniter._datetime_to_timestamp = cronite_datetime_to_timestamp  # For Python 2.6 compatibility
-                    try:
-                        iter = croniter(rule['aggregation']['schedule'], ts_now())
-                        alert_time = unix_to_dt(iter.get_next())
-                    except Exception as e:
-                        self.handle_error("Error parsing aggregate send time Cron format %s" % (e), rule['aggregation']['schedule'])
-                else:
-                    try:
-                        if rule.get('aggregate_by_match_time', False):
-                            match_time = ts_to_dt(lookup_es_key(match, rule['timestamp_field']))
-                            alert_time = match_time + rule['aggregation']
-                        else:
-                            alert_time = ts_now() + rule['aggregation']
-                    except Exception as e:
-                        self.handle_error("[add_aggregated_alert]Error parsing aggregate send time format %s" % (e), rule['aggregation'])
 
-                rule['aggregate_alert_time'][aggregation_key_value] = alert_time
-                agg_id = None
-                elastalert_logger.info(
-                    'New aggregation for %s, aggregation_key: %s. next alert at %s.' % (rule['name'], aggregation_key_value, alert_time)
-                )
-        else:
-            # Already pending aggregation, use existing alert_time
-            alert_time = rule['aggregate_alert_time'].get(aggregation_key_value)
-            agg_id = rule['current_aggregate_id'].get(aggregation_key_value)
-            elastalert_logger.info(
-                'Adding alert for %s to aggregation(id: %s, aggregation_key: %s), next alert at %s' % (
-                    rule['name'],
-                    agg_id,
-                    aggregation_key_value,
-                    alert_time
-                )
-            )
-
-        alert_body = self.get_alert_body(match, rule, False, alert_time)
-        if agg_id:
-            alert_body['aggregate_id'] = agg_id
-        if aggregation_key_value:
-            alert_body['aggregation_key'] = aggregation_key_value
-        res = self.writeback('elastalert', alert_body, rule)
+            alert_body = self.get_alert_body(match, rule, False, alert_time)
+            if agg_id:
+                alert_body['aggregate_id'] = agg_id
+            if aggregation_key_value:
+                alert_body['aggregation_key'] = aggregation_key_value
+            res = self.writeback('elastalert', alert_body, rule)
 
-        # If new aggregation, save _id
-        if res and not agg_id:
-            rule['current_aggregate_id'][aggregation_key_value] = res['_id']
+            # If new aggregation, save _id
+            if res and not agg_id:
+                rule['current_aggregate_id'][aggregation_key_value] = res['_id']
 
-        # Couldn't write the match to ES, save it in memory for now
-        if not res:
-            rule['agg_matches'].append(match)
+            # Couldn't write the match to ES, save it in memory for now
+            if not res:
+                rule['agg_matches'].append(match)
 
-        return res
+            return res
 
     def silence(self, silence_cache_key=None):
         """ Silence an alert for a period of time. --silence and --rule must be passed as args. """
diff --git a/elastalert/schema.yaml b/elastalert/schema.yaml
index c112d2df..b24bfb83 100644
--- a/elastalert/schema.yaml
+++ b/elastalert/schema.yaml
@@ -236,6 +236,7 @@ properties:
           type: string
       - type: string
   aggregation: *timeframe
+  aggregation_alert_time_compared_with_timestamp_field: {type: boolean}
   realert: *timeframe
   realert_key: {type: string}
   exponential_realert: *timeframe
diff --git a/tests/Dockerfile-test b/tests/Dockerfile-test
index bda0c7d1..bf2a1051 100644
--- a/tests/Dockerfile-test
+++ b/tests/Dockerfile-test
@@ -1,4 +1,4 @@
-FROM python:3-slim
+FROM python:3.11-slim
 
 RUN apt update && apt upgrade -y
 RUN apt install -y gcc libffi-dev
diff --git a/tests/base_test.py b/tests/base_test.py
index c2d48e16..0c427f29 100644
--- a/tests/base_test.py
+++ b/tests/base_test.py
@@ -332,7 +332,7 @@ def test_match_with_enhancements_first(ea):
     assert add_alert.call_count == 0
 
 
-def test_agg_matchtime(ea):
+def test_agg_matchtime_timestamp_field(ea):
     ea.max_aggregation = 1337
     hits_timestamps = ['2014-09-26T12:34:45', '2014-09-26T12:40:45', '2014-09-26T12:47:45']
     alerttime1 = dt_to_ts(ts_to_dt(hits_timestamps[0]) + datetime.timedelta(minutes=10))
@@ -344,6 +344,7 @@
         ea.rules[0]['aggregate_by_match_time'] = True
         ea.rules[0]['aggregation'] = datetime.timedelta(minutes=10)
         ea.rules[0]['type'].matches = [{'@timestamp': h} for h in hits_timestamps]
+        ea.rules[0]['aggregation_alert_time_compared_with_timestamp_field'] = True
         ea.run_rule(ea.rules[0], END, START)
 
     # Assert that the three matches were added to Elasticsearch
@@ -419,7 +420,7 @@
     assert call3['aggregate_id'] == 'ABCD'
 
 
-def test_agg_cron(ea):
+def test_agg_cron_timestamp_field(ea):
     ea.max_aggregation = 1337
     hits_timestamps = ['2014-09-26T12:34:45', '2014-09-26T12:40:45', '2014-09-26T12:47:45']
     hits = generate_hits(hits_timestamps)
@@ -434,6 +435,7 @@
                                    dt_to_unix(ts_to_dt('2014-09-26T13:04:00'))]
             ea.rules[0]['aggregation'] = {'schedule': '*/5 * * * *'}
             ea.rules[0]['type'].matches = [{'@timestamp': h} for h in hits_timestamps]
+            ea.rules[0]['aggregation_alert_time_compared_with_timestamp_field'] = True
             ea.run_rule(ea.rules[0], END, START)
 
     # Assert that the three matches were added to Elasticsearch
@@ -456,6 +458,68 @@
     assert 'aggregate_id' not in call3
 
 
+def test_agg_matchtime(ea):
+    ea.max_aggregation = 1337
+    hits_timestamps = ['2014-09-26T12:34:45', '2014-09-26T12:40:45', '2014-09-26T12:47:45']
+    alerttime1 = dt_to_ts(ts_to_dt(hits_timestamps[0]) + datetime.timedelta(minutes=10))
+    hits = generate_hits(hits_timestamps)
+    ea.thread_data.current_es.search.return_value = hits
+
+    match_time = ts_to_dt('2014-09-26T12:48:00Z')
+
+    with mock.patch('elastalert.elastalert.ts_now', return_value=match_time):
+        with mock.patch('elastalert.elastalert.elasticsearch_client') as mock_es:
+            # Aggregate first two, query over full range
+            mock_es.return_value = ea.thread_data.current_es
+            ea.rules[0]['aggregate_by_match_time'] = True
+            ea.rules[0]['aggregation'] = datetime.timedelta(minutes=10)
+            ea.rules[0]['type'].matches = [{'@timestamp': h} for h in hits_timestamps]
+            ea.rules[0]['aggregation_alert_time_compared_with_timestamp_field'] = True
+            ea.run_rule(ea.rules[0], END, START)
+
+        # Assert that the three matches were added to Elasticsearch
+        call1 = ea.writeback_es.index.call_args_list[0][1]['body']
+        call2 = ea.writeback_es.index.call_args_list[1][1]['body']
+        call3 = ea.writeback_es.index.call_args_list[2][1]['body']
+        assert call1['match_body']['@timestamp'] == '2014-09-26T12:34:45'
+        assert not call1['alert_sent']
+        assert 'aggregate_id' not in call1
+        assert call1['alert_time'] == alerttime1
+
+        assert call2['match_body']['@timestamp'] == '2014-09-26T12:40:45'
+        assert not call2['alert_sent']
+        assert call2['aggregate_id'] == 'ABCD'
+
+        assert call3['match_body']['@timestamp'] == '2014-09-26T12:47:45'
+        assert not call3['alert_sent']
+        assert 'aggregate_id' not in call3
+
+        # First call - Find all pending alerts (only entries without agg_id)
+        # Second call - Find matches with agg_id == 'ABCD'
+        # Third call - Find matches with agg_id == 'CDEF'
+        ea.writeback_es.search.side_effect = [{'hits': {'hits': [{'_id': 'ABCD', '_index': 'wb', '_source': call1},
+                                                                 {'_id': 'CDEF', '_index': 'wb', '_source': call3}]}},
+                                              {'hits': {'hits': [{'_id': 'BCDE', '_index': 'wb', '_source': call2}]}},
+                                              {'hits': {'total': 0, 'hits': []}}]
+
+        with mock.patch('elastalert.elastalert.elasticsearch_client') as mock_es:
+            ea.send_pending_alerts()
+            # Assert that current_es was refreshed from the aggregate rules
+            assert mock_es.called_with(host='', port='')
+            assert mock_es.call_count == 2
+        assert_alerts(ea, [hits_timestamps[:2], hits_timestamps[2:]])
+
+        call1 = ea.writeback_es.search.call_args_list[7][1]['body']
+        call2 = ea.writeback_es.search.call_args_list[8][1]['body']
+        call3 = ea.writeback_es.search.call_args_list[9][1]['body']
+        call4 = ea.writeback_es.search.call_args_list[10][1]['body']
+
+        assert 'alert_time' in call2['query']['bool']['filter']['range']
+        assert call3['query']['query_string']['query'] == 'aggregate_id:"ABCD"'
+        assert call4['query']['query_string']['query'] == 'aggregate_id:"CDEF"'
+        assert ea.writeback_es.search.call_args_list[9][1]['size'] == 1337
+
+
 def test_agg_no_writeback_connectivity(ea):
     """ Tests that if writeback_es throws an exception, the matches will be added to 'agg_matches' and when run again,
     that they will be passed again to add_aggregated_alert """