diff --git a/CHANGELOG.md b/CHANGELOG.md
index f0d62f38..1cba9b00 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,8 @@
 - [Docs] Extend FAQ / troubleshooting section with information on Elasticsearch RBAC - [#1324](https://github.com/jertel/elastalert2/pull/1324) - @chr-b
 - Upgrade to Python 3.12 - [#1327](https://github.com/jertel/elastalert2/pull/1327) - @jertel
 - Correction in IRIS and GELF alerter [#1331](https://github.com/jertel/elastalert2/pull/1331) - @malinkinsa
+- Fix handling of compound_query_key values - [#1330](https://github.com/jertel/elastalert2/pull/1330) - @jmacdone
+- Fix handling of raw_query_key and query_key values ending with .keyword - [#1330](https://github.com/jertel/elastalert2/pull/1330) - @jmacdone
 - [Docs] Fix broken search function caused by sphinx upgrade a few releases ago - [#1332](https://github.com/jertel/elastalert2/pull/1332) - @jertel
 - [Docs] Fix mismatch for parameter iris_customer_id - [1334](https://github.com/jertel/elastalert2/pull/1334) @malinkinsa
 - [IRIS] Make parameter iris_customer_id optional with default value - [1334](https://github.com/jertel/elastalert2/pull/1334) @malinkinsa
diff --git a/elastalert/elastalert.py b/elastalert/elastalert.py
index d7a9db71..b8224aae 100755
--- a/elastalert/elastalert.py
+++ b/elastalert/elastalert.py
@@ -6,6 +6,7 @@
 import logging
 import os
 import random
+import re
 import signal
 import sys
 import threading
@@ -463,25 +464,42 @@ def get_hits_count(self, rule, starttime, endtime, index):
         )
         return {endtime: res['count']}
 
+    @staticmethod
+    def query_key_filters(rule: dict, qk_value_csv: str):
+        if qk_value_csv is None:
+            return
+
+        # Split on a comma followed by zero or more whitespace characters. The
+        # values are expected to be comma-space separated, but 76ab593 suggests
+        # there are cases where only a comma is used, without the space.
+        qk_values = re.split(r',\s*', qk_value_csv)
+        end = '.keyword'
+
+        query_keys = []
+        try:
+            query_keys = rule['compound_query_key']
+        except KeyError:
+            query_key = rule.get('query_key')
+            if query_key is not None:
+                query_keys.append(query_key)
+
+        if len(qk_values) != len(query_keys):
+            msg = (f"Received {len(qk_values)} value(s) for {len(query_keys)} key(s)."
+                   f" Did '{qk_value_csv}' have a value with a comma?"
+                   " See https://github.com/jertel/elastalert2/pull/1330#issuecomment-1849962106")
+            elastalert_logger.warning(msg)
+
+        for key, value in zip(query_keys, qk_values):
+            if rule.get('raw_count_keys', True):
+                if not key.endswith(end):
+                    key += end
+            yield {'term': {key: value}}
+
     def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=None):
         rule_filter = copy.copy(rule['filter'])
-        if qk:
-            qk_list = qk.split(",")
-            end = '.keyword'
-
-            if len(qk_list) == 1:
-                qk = qk_list[0]
-                filter_key = rule['query_key']
-                if rule.get('raw_count_keys', True) and not rule['query_key'].endswith(end):
-                    filter_key = add_keyword_postfix(filter_key)
-                rule_filter.extend([{'term': {filter_key: qk}}])
-            else:
-                filter_keys = rule['compound_query_key']
-                for i in range(len(filter_keys)):
-                    key_with_postfix = filter_keys[i]
-                    if rule.get('raw_count_keys', True) and not key.endswith(end):
-                        key_with_postfix = add_keyword_postfix(key_with_postfix)
-                    rule_filter.extend([{'term': {key_with_postfix: qk_list[i]}}])
+
+        for term_filter in self.query_key_filters(rule=rule, qk_value_csv=qk):
+            rule_filter.append(term_filter)
 
         base_query = self.get_query(
             rule_filter,
diff --git a/elastalert/util.py b/elastalert/util.py
index e8a9dc34..04c0f274 100644
--- a/elastalert/util.py
+++ b/elastalert/util.py
@@ -44,7 +44,7 @@ def new_get_event_ts(ts_field):
     return lambda event: lookup_es_key(event[0], ts_field)
 
 
-def _find_es_dict_by_key(lookup_dict, term):
+def _find_es_dict_by_key(lookup_dict: dict, term: str, string_multi_field_name: str = ".keyword") -> tuple[dict, str]:
     """ Performs iterative dictionary search based upon the following conditions:
 
     1. Subkeys may either appear behind a full stop (.) or at one lookup_dict level lower in the tree.
@@ -64,8 +64,31 @@
     element which is the last subkey used to access the target specified by the term. None is
     returned for both if the key can not be found.
     """
+
+    # For compound fieldnames added by ElastAlert.process_hits()
+    #
+    # For example, when query_key is a list of fieldnames it will insert a term
+    # 'key_1,other_fieldname,a_third_name'
+    # and if the rule is set for raw_query_keys, the query_key values may end
+    # with .keyword, so it will instead insert something like
+    # 'key_1_ip,other_fieldname_number,a_third_name.keyword'
+    # and we need to check for that synthetic compound fieldname, including the
+    # .keyword suffix, before continuing
+    #
+    # Of course, it also handles the happy path of non-ambiguous fieldnames like
+    # 'ip_address' and 'src_displayname' that don't have . or [] characters
     if term in lookup_dict:
         return lookup_dict, term
+
+    # If not synthetically added by ElastAlert, matching documents will not have
+    # .keyword fieldnames, even if a .keyword fieldname was used as a term in
+    # the search
+    # e.g. {"term": {"description.keyword": "Target Description Here"}}
+    # will return a document with {"_source": {"description": "Target Description Here"}}
+    term = term.removesuffix(string_multi_field_name)
+    if term in lookup_dict:
+        return lookup_dict, term
+
     # If the term does not match immediately, perform iterative lookup:
     # 1. Split the search term into tokens
     # 2. Recurrently concatenate these together to traverse deeper into the dictionary,
diff --git a/setup.py b/setup.py
index 825d06cf..8154534f 100644
--- a/setup.py
+++ b/setup.py
@@ -31,6 +31,7 @@
                         'elastalert=elastalert.elastalert:main']},
     packages=find_packages(exclude=["tests"]),
     package_data={'elastalert': ['schema.yaml', 'es_mappings/**/*.json']},
+    python_requires='>=3.9',
     install_requires=[
        'apscheduler>=3.10.4,<4.0',
        'aws-requests-auth>=0.4.3',
diff --git a/tests/hits_terms_test.py b/tests/hits_terms_test.py
new file mode 100644
index 00000000..0f683779
--- /dev/null
+++ b/tests/hits_terms_test.py
@@ -0,0 +1,189 @@
+import pytest
+from datetime import datetime, timedelta
+
+from elastalert.util import dt_to_ts
+from elastalert.elastalert import ElastAlerter
+
+# I like the dictionary whitespace the way it is, thank you
+# but I'm not going to tag all the lines with #noqa: E201
+# flake8: noqa
+
+@pytest.fixture
+def example_agg_response():
+    res = {
+        'took': 1,
+        'timed_out': False,
+        '_shards': {'total': 3, 'successful': 3, 'skipped': 0, 'failed': 0},
+        'hits': {
+            'total': {'value': 9, 'relation': 'eq'},
+            'max_score': None,
+            'hits': []},
+        'aggregations': {
+            'counts': {
+                'doc_count_error_upper_bound': 0,
+                'sum_other_doc_count': 0,
+                'buckets': [{'key': '10.0.4.174', 'doc_count': 2},
+                            {'key': '10.0.4.241', 'doc_count': 2},
+                            {'key': '10.0.4.76', 'doc_count': 1},
+                            {'key': '10.0.4.123', 'doc_count': 1},
+                            {'key': '10.0.4.156', 'doc_count': 1},
+                            {'key': '10.0.4.231', 'doc_count': 1},
+                            {'key': '10.0.4.248', 'doc_count': 1}]}}
+    }
+    return res
+
+
+def _mock_query_key_option_loader(rule):
+    '''
+    So, some copypasta from loaders.load_options:
+
+    if query_key is a string:
+        no compound_query_key is created
+    if query_key is a list:
+        if len() > 1:
+            compound_query_key is created
+            query_key is replaced with ",".join() of the original query_key values
+        if len() == 1:
+            the query_key list with one string is normalized back to just a string
+        if len() == 0:
+            somehow it was an empty list and query_key is silently dropped from the config
+    '''
+    raw_query_key = rule.get('query_key')
+    if isinstance(raw_query_key, list):
+        if len(raw_query_key) > 1:
+            rule['compound_query_key'] = raw_query_key
+            rule['query_key'] = ','.join(raw_query_key)
+        elif len(raw_query_key) == 1:
+            rule['query_key'] = raw_query_key[0]
+        else:
+            del rule['query_key']
+
+
+@pytest.mark.parametrize(
+    ["qk_value", "query_key"],
+
+    # scenario A: 3 query keys
+    [ ( ['172.16.1.10', '/api/v1/endpoint-foo', 'us-east-2'],
+        ['server_ip', 'service_name', 'region'] ),
+
+      # scenario B: 2 query keys
+      ( ['172.16.1.10', '/api/v1/endpoint-foo'],
+        ['server_ip', 'service_name'] ),
+
+      # scenario C: 1 query key, but it was given as a list of one fieldname in the rule options
+      # as of this writing, 707b2a5 shouldn't allow this to happen, but here is a test regardless
+      ( ['172.16.1.10'],
+        ['server_ip'] ),
+
+      # scenario D: 1 query key, given as a string
+      ( ['172.16.1.10'],
+        'server_ip' ),
+
+      # scenario E: no query key
+      ( None,
+        None )
+    ],
+)
+@pytest.mark.parametrize("query_key_values_separator", [",", ", ", ",  ", ",\t"])
+def test_get_hits_terms_with_factored_out_filters(ea, example_agg_response, qk_value, query_key, query_key_values_separator):
+
+    if query_key is not None:
+        ea.rules[0]['query_key'] = query_key
+
+    # emulate the rule['compound_query_key'] creation logic, which probably should be
+    # factored out of loaders.load_options() instead of copypasta'd for the test
+    _mock_query_key_option_loader(ea.rules[0])
+    try:
+        # ElastAlert.process_hits() is expected to insert the fieldname values
+        # from _hits as a comma-space separated string
+        qk_csv = query_key_values_separator.join(qk_value)
+    except TypeError:
+        qk_csv = None
+    index = 'foo-2023-13-13'  # lousy Smarch weather
+    top_term_key = 'client_ip'
+
+    endtime = datetime.now()
+    starttime = endtime - timedelta(hours=1)
+    ea.thread_data.current_es.search.return_value = example_agg_response
+
+    hit_terms = ea.get_hits_terms(
+        rule=ea.rules[0],
+        starttime=starttime,
+        endtime=endtime,
+        index=index,
+        key=top_term_key,
+        qk=qk_csv,
+        size=None
+    )
+    assert endtime in hit_terms
+    assert hit_terms[endtime] == example_agg_response['aggregations']['counts']['buckets']
+
+    expected_filters = [
+        {'range': {'@timestamp': { 'gt': dt_to_ts(starttime), 'lte': dt_to_ts(endtime)}}}
+    ]
+    try:
+        cqk = ea.rules[0]['compound_query_key']
+        for fieldname, value in zip(cqk, qk_value):
+            term_filter = {'term': {f'{fieldname}.keyword': value}}
+            expected_filters.append(term_filter)
+    except KeyError:
+        # not a compound, eh? it must be a string of a single fieldname
+        try:
+            fieldname = ea.rules[0]['query_key']
+            term_filter = {'term': {f'{fieldname}.keyword': qk_value[0]}}
+            expected_filters.append(term_filter)
+        except KeyError:
+            pass  # maybe the rule never had a query_key, or it was an empty list and purged
+
+    expected_query = {
+        'query': {'bool': {'filter': {'bool': {'must': expected_filters}}}},
+        # 50 is hard-coded in get_hits_terms as the default when size=None
+        'aggs': {'counts': {'terms': {'field': top_term_key, 'size': 50, 'min_doc_count': 1}}}
+    }
+    ea.thread_data.current_es.search.assert_called_with(index=index, body=expected_query, size=0, ignore_unavailable=True)
+
+
+def test_query_key_filters_single_query_key():
+    rule = { 'query_key': 'a_single_key_as_a_string' }
+    qk_value_csv = 'a single value'
+    filters = list(ElastAlerter.query_key_filters(rule, qk_value_csv))
+    expected_filters = [{'term': {f"{rule['query_key']}.keyword": qk_value_csv}}]
+    assert filters == expected_filters
+
+@pytest.mark.parametrize("query_key_values_separator", [",", ", ", ",  ", ",\t"])
+def test_query_key_filters_compound_query_key(query_key_values_separator):
+    rule = { 'query_key': 'compound,key',
+             'compound_query_key': ['compound', 'key'] }
+    qk_value_csv = query_key_values_separator.join( ['combined value', 'by commaspace'] )
+    filters = list(ElastAlerter.query_key_filters(rule, qk_value_csv))
+    expected_filters = [
+        {'term': {'compound.keyword': 'combined value'}},
+        {'term': {'key.keyword': 'by commaspace'}},
+    ]
+    assert filters == expected_filters
+
+def test_query_key_filters_brittle_query_key_value_logs_warning(caplog):
+    rule = { 'query_key': 'university,state',
+             'compound_query_key': ['university', 'state'] }
+    # uh oh, a comma-space we didn't expect inside a value
+    qk_value_csv = 'California State University, San Bernardino, California'
+    filters = list(ElastAlerter.query_key_filters(rule, qk_value_csv))
+    log = caplog.records[0]
+    assert log.levelname == "WARNING"
+    assert 'Received 3 value(s) for 2 key(s).' in log.message
+
+def test_query_key_filters_none_values():
+    rule = { 'query_key': 'something'}
+    qk_value_csv = None
+    filters = list(ElastAlerter.query_key_filters(rule, qk_value_csv))
+    assert len(filters) == 0
+
+def test_query_key_filters_unexpected_passed_values_for_a_rule_without_query_keys(caplog):
+    rule = { }
+    qk_value_csv = 'value'
+    filters = list(ElastAlerter.query_key_filters(rule, qk_value_csv))
+    assert len(filters) == 0
+    log = caplog.records[0]
+    assert log.levelname == "WARNING"
+    assert 'Received 1 value(s) for 0 key(s).' in log.message
\ No newline at end of file
diff --git a/tests/util_test.py b/tests/util_test.py
index dbb0e8f1..32dcbc26 100644
--- a/tests/util_test.py
+++ b/tests/util_test.py
@@ -117,6 +117,7 @@
     }
 
     assert lookup_es_key(record, 'Fields.ts') == expected
+    assert lookup_es_key(record, 'Fields.ts.keyword') == expected
 
 
 def test_looking_up_nested_composite_keys(ea):
@@ -131,6 +132,7 @@
     }
 
     assert lookup_es_key(record, 'Fields.ts.value') == expected
+    assert lookup_es_key(record, 'Fields.ts.value.keyword') == expected
 
 
 def test_looking_up_arrays(ea):
@@ -148,10 +150,14 @@
     assert lookup_es_key(record, 'flags[0]') == 1
     assert lookup_es_key(record, 'flags[1]') == 2
     assert lookup_es_key(record, 'objects[0]foo') == 'bar'
+    assert lookup_es_key(record, 'objects[0]foo.keyword') == 'bar'
    assert lookup_es_key(record, 'objects[1]foo[0]bar') == 'baz'
     assert lookup_es_key(record, 'objects[2]foo.bar') == 'baz'
+    assert lookup_es_key(record, 'objects[2]foo.bar.keyword') == 'baz'
     assert lookup_es_key(record, 'objects[1]foo[1]bar') is None
+    assert lookup_es_key(record, 'objects[1]foo[1]bar.keyword') is None
     assert lookup_es_key(record, 'objects[1]foo[0]baz') is None
+    assert lookup_es_key(record, 'objects[1]foo[0]baz.keyword') is None
     assert lookup_es_key(record, 'nested.foo[0]') == 'bar'
     assert lookup_es_key(record, 'nested.foo[1]') == 'baz'
 
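
The standalone sketch below approximates what the refactor above does, for readers who want to see the behavior without running the test suite. The helper names query_key_filters_sketch and find_by_key_sketch are hypothetical stand-ins: the real code is the ElastAlerter.query_key_filters staticmethod and the module-level _find_es_dict_by_key in elastalert/util.py, and the real generator additionally logs a warning when the number of values does not match the number of keys.

import re


def query_key_filters_sketch(rule, qk_value_csv):
    # Approximation of ElastAlerter.query_key_filters from the diff above:
    # split the comma(-space) separated values, pair them with the rule's
    # query key(s), and append .keyword unless raw_count_keys is disabled.
    if qk_value_csv is None:
        return
    qk_values = re.split(r',\s*', qk_value_csv)
    query_keys = rule.get('compound_query_key') or (
        [rule['query_key']] if rule.get('query_key') else [])
    for key, value in zip(query_keys, qk_values):
        if rule.get('raw_count_keys', True) and not key.endswith('.keyword'):
            key += '.keyword'
        yield {'term': {key: value}}


def find_by_key_sketch(lookup_dict, term, string_multi_field_name='.keyword'):
    # Approximation of the new fallback in _find_es_dict_by_key: try the exact
    # term first, then retry with the .keyword suffix stripped, because hit
    # documents use plain fieldnames even when the query used field.keyword.
    if term in lookup_dict:
        return lookup_dict, term
    term = term.removesuffix(string_multi_field_name)
    if term in lookup_dict:
        return lookup_dict, term
    return None, None


rule = {'query_key': 'server_ip,region', 'compound_query_key': ['server_ip', 'region']}
print(list(query_key_filters_sketch(rule, '172.16.1.10, us-east-2')))
# [{'term': {'server_ip.keyword': '172.16.1.10'}}, {'term': {'region.keyword': 'us-east-2'}}]

print(find_by_key_sketch({'description': 'Target Description Here'}, 'description.keyword'))
# ({'description': 'Target Description Here'}, 'description')

Yielding one term filter per key is what lets get_hits_terms drop the old single-key versus compound-key branching and simply append whatever the generator produces.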