Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix compound query_key and mixed mappings #1330

Merged
merged 16 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
- [Docs] Extend FAQ / troubleshooting section with information on Elasticsearch RBAC - [#1324](https://github.com/jertel/elastalert2/pull/1324) - @chr-b
- Upgrade to Python 3.12 - [#1327](https://github.com/jertel/elastalert2/pull/1327) - @jertel
- Correction in IRIS and GELF alerter [#1331](https://github.com/jertel/elastalert2/pull/1331) - @malinkinsa
- Fix handling of compound_query_key values - [#1330](https://github.com/jertel/elastalert2/pull/1330) - @jmacdone
- Fix handling of raw_query_key and query_key values ending with .keyword - [#1330](https://github.com/jertel/elastalert2/pull/1330) - @jmacdone
- [Docs] Fix broken search function caused by sphinx upgrade a few releases ago - [#1332](https://github.com/jertel/elastalert2/pull/1332) - @jertel
- [Docs] Fix mismatch for parameter iris_customer_id - [1334](https://github.com/jertel/elastalert2/pull/1334) @malinkinsa
- [IRIS] Make parameter iris_customer_id optional with default value - [1334](https://github.com/jertel/elastalert2/pull/1334) @malinkinsa
Expand Down
52 changes: 35 additions & 17 deletions elastalert/elastalert.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import os
import random
import re
import signal
import sys
import threading
Expand Down Expand Up @@ -463,25 +464,42 @@ def get_hits_count(self, rule, starttime, endtime, index):
)
return {endtime: res['count']}

@staticmethod
def query_key_filters(rule: dict, qk_value_csv: str):
    """Yield Elasticsearch ``term`` filters matching a rule's query key(s).

    Generator: yields one ``{'term': {field: value}}`` dict per query key.
    (Previously annotated ``-> dict``, which was wrong for a generator.)

    :param rule: Rule configuration. ``compound_query_key`` (a list of field
        names) takes precedence; otherwise the single ``query_key`` string is
        used. A rule with neither yields nothing.
    :param qk_value_csv: The comma-separated query key value(s) as inserted
        by process_hits(), or None, in which case nothing is yielded.
    """
    if qk_value_csv is None:
        return

    # Split on comma followed by zero or more whitespace characters. It's
    # expected to be commaspace separated. However 76ab593 suggests there
    # are cases when it is only comma and not commaspace
    qk_values = re.split(r',\s*', qk_value_csv)
    end = '.keyword'

    query_keys = []
    try:
        query_keys = rule['compound_query_key']
    except KeyError:
        query_key = rule.get('query_key')
        if query_key is not None:
            query_keys.append(query_key)

    if len(qk_values) != len(query_keys):
        # A value that itself contains a comma splits into too many pieces;
        # warn so the resulting mismatched filters can be diagnosed.
        msg = (f"Received {len(qk_values)} value(s) for {len(query_keys)} key(s)."
               f" Did '{qk_value_csv}' have a value with a comma?"
               " See https://github.com/jertel/elastalert2/pull/1330#issuecomment-1849962106")
        elastalert_logger.warning(msg)

    # NOTE: zip() silently truncates to the shorter sequence on a mismatch.
    for key, value in zip(query_keys, qk_values):
        if rule.get('raw_count_keys', True):
            # Target the non-analyzed multi-field unless the key already does.
            if not key.endswith(end):
                key += end
        yield {'term': {key: value}}

def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=None):
rule_filter = copy.copy(rule['filter'])
if qk:
qk_list = qk.split(",")
end = '.keyword'

if len(qk_list) == 1:
qk = qk_list[0]
filter_key = rule['query_key']
if rule.get('raw_count_keys', True) and not rule['query_key'].endswith(end):
filter_key = add_keyword_postfix(filter_key)
rule_filter.extend([{'term': {filter_key: qk}}])
else:
filter_keys = rule['compound_query_key']
for i in range(len(filter_keys)):
key_with_postfix = filter_keys[i]
if rule.get('raw_count_keys', True) and not key.endswith(end):
key_with_postfix = add_keyword_postfix(key_with_postfix)
rule_filter.extend([{'term': {key_with_postfix: qk_list[i]}}])

for filter in self.query_key_filters(rule=rule, qk_value_csv=qk):
rule_filter.append(filter)

base_query = self.get_query(
rule_filter,
Expand Down
25 changes: 24 additions & 1 deletion elastalert/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def new_get_event_ts(ts_field):
return lambda event: lookup_es_key(event[0], ts_field)


def _find_es_dict_by_key(lookup_dict, term):
def _find_es_dict_by_key(lookup_dict: dict, term: str, string_multi_field_name: str = ".keyword") -> tuple[dict, str]:
""" Performs iterative dictionary search based upon the following conditions:

1. Subkeys may either appear behind a full stop (.) or at one lookup_dict level lower in the tree.
Expand All @@ -64,8 +64,31 @@ def _find_es_dict_by_key(lookup_dict, term):
element which is the last subkey used to access the target specified by the term. None is
returned for both if the key can not be found.
"""

    # For compound fieldnames added by ElastAlert.process_hits()
    #
    # For example, when query_key is a list of fieldnames it will insert a term
    # 'key_1,other_fieldname,a_third_name'
    # and if the rule is set for raw_query_keys, the query_key values may end
    # with .keyword, so it will insert instead something like
    # 'key_1_ip,other_fieldname_number,a_third_name.keyword'
    # and we need to check for that synthetic compound fieldname, including the
    # .keyword suffix, before continuing
    #
    # Of course, it also handles the happy path: non-ambiguous fieldnames like
    # 'ip_address' and 'src_displayname' that don't have . or [] characters
if term in lookup_dict:
return lookup_dict, term

# If not synthetically added by ElastAlert, matching documents will not have
# .keyword fieldnames, even if a .keyword fieldname was used as a term in
# the search
# e.g. {"term": {"description.keyword": "Target Description Here"}}
# will return a document with {"_source": {"description": "Target Description Here"}}
term = term.removesuffix(string_multi_field_name)
if term in lookup_dict:
return lookup_dict, term

# If the term does not match immediately, perform iterative lookup:
# 1. Split the search term into tokens
# 2. Recurrently concatenate these together to traverse deeper into the dictionary,
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
'elastalert=elastalert.elastalert:main']},
packages=find_packages(exclude=["tests"]),
package_data={'elastalert': ['schema.yaml', 'es_mappings/**/*.json']},
python_requires='>=3.9',
install_requires=[
'apscheduler>=3.10.4,<4.0',
'aws-requests-auth>=0.4.3',
Expand Down
189 changes: 189 additions & 0 deletions tests/hits_terms_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import pytest
from datetime import datetime, timedelta

from elastalert.util import dt_to_ts
from elastalert.elastalert import ElastAlerter

# I like the dictionary whitespace the way it is, thank you
# but I'm not going to tag all the lines with #noqa: E201
# flake8: noqa

@pytest.fixture
def example_agg_response():
    """A canned Elasticsearch terms-aggregation response with 9 total hits."""
    bucket_counts = [
        ('10.0.4.174', 2),
        ('10.0.4.241', 2),
        ('10.0.4.76', 1),
        ('10.0.4.123', 1),
        ('10.0.4.156', 1),
        ('10.0.4.231', 1),
        ('10.0.4.248', 1),
    ]
    return {
        'took': 1,
        'timed_out': False,
        '_shards': {'total': 3, 'successful': 3, 'skipped': 0, 'failed': 0},
        'hits': {
            'total': {'value': 9, 'relation': 'eq'},
            'max_score': None,
            'hits': []},
        'aggregations': {
            'counts': {
                'doc_count_error_upper_bound': 0,
                'sum_other_doc_count': 0,
                'buckets': [{'key': ip, 'doc_count': n} for ip, n in bucket_counts]}}
    }


def _mock_query_key_option_loader(rule):
'''
So, some copypasta from loaders.load_options,

if query_key is a string:
no compound_query_key is created
if query_key is a list:
if len() > 1:
compound_query_key is created
query_key is replaced with ",".join() of the original query_key values
if len() == 1:
the query_key list with one string is normalilzed back to just a string
if len() == 0:
somehow it was an empty list and query_keys is silently dropped from the config
'''
raw_query_key = rule.get('query_key')
if isinstance(raw_query_key, list):
if len(raw_query_key) > 1:
rule['compound_query_key'] = raw_query_key
rule['query_key'] = ','.join(raw_query_key)
elif len(raw_query_key) == 1:
rule['query_key'] = raw_query_key[0]
else:
del rule['query_key']


# Exercises ElastAlerter.get_hits_terms() across the combinations of rule
# query_key configuration (compound list, single-item list, plain string,
# absent) and the separators process_hits() may use when joining values.
@pytest.mark.parametrize(
    ["qk_value", "query_key"],

    # scenario A: 3 query keys
    [ ( ['172.16.1.10', '/api/v1/endpoint-foo', 'us-east-2'],
        ['server_ip', 'service_name', 'region'] ),

    # scenario B: 2 query keys
    ( ['172.16.1.10', '/api/v1/endpoint-foo'],
        ['server_ip', 'service_name'] ),

    # scenario C: 1 query key, but it was given as a list of one fieldname in the rule options
    # as of this writing, 707b2a5 shouldn't allow this to happen, but here is a test regardless
    ( ['172.16.1.10'],
        ['server_ip'] ),

    # scenario D: 1 query key, given as a string
    ( ['172.16.1.10'],
        'server_ip' ),

    # scenario E: no query key
    ( None,
        None )
    ],
)
@pytest.mark.parametrize("query_key_values_separator", [",", ", ", ", ", ",\t"])
def test_get_hits_terms_with_factored_out_filters(ea, example_agg_response, qk_value, query_key, query_key_values_separator):

    if query_key is not None:
        ea.rules[0]['query_key'] = query_key

    # emulate the rule['compound_query_key'] creation logic which prob should be
    # factored out of loaders.load_options() instead of copypasta'd for the test
    _mock_query_key_option_loader(ea.rules[0])

    try:
        # ElastAlert.process_hits() is expected to insert the fieldname values
        # from _hits as a commaspace csv
        qk_csv = query_key_values_separator.join(qk_value)
    except TypeError:
        # scenario E: qk_value is None, so there is no csv to build
        qk_csv = None
    index = 'foo-2023-13-13'  # lousy Smarch weather
    top_term_key = 'client_ip'

    endtime = datetime.now()
    starttime = endtime - timedelta(hours=1)
    # get_hits_terms() should hand this aggregation response back unchanged
    ea.thread_data.current_es.search.return_value = example_agg_response

    hit_terms = ea.get_hits_terms(
        rule=ea.rules[0],
        starttime=starttime,
        endtime=endtime,
        index=index,
        key=top_term_key,
        qk = qk_csv,
        size=None
    )
    assert endtime in hit_terms
    assert hit_terms[endtime] == example_agg_response['aggregations']['counts']['buckets']

    # Reconstruct the filters get_hits_terms() should have sent with the
    # search: the time range plus one .keyword term filter per query key.
    expected_filters = [
        {'range': {'@timestamp': { 'gt': dt_to_ts(starttime), 'lte': dt_to_ts(endtime)}}}
    ]
    try:
        cqk = ea.rules[0]['compound_query_key']
        for fieldname, value in zip(cqk, qk_value):
            filter = {'term': {f'{fieldname}.keyword': value}}
            expected_filters.append(filter)
    except KeyError:
        # not a compound, eh? it must be a string of a single fieldname
        try:
            fieldname = ea.rules[0]['query_key']
            filter = {'term': {f'{fieldname}.keyword': qk_value[0]}}
            expected_filters.append(filter)
        except KeyError:
            pass  # maybe the rule never had a query_key, or it was an empty list and purged

    expected_query = {
        'query': {'bool': {'filter': {'bool': {'must': expected_filters}}}},
        # 50 hard coded in get_hits_terms as a default for size=None
        'aggs': {'counts': {'terms': {'field': top_term_key, 'size': 50, 'min_doc_count': 1}}}
    }
    ea.thread_data.current_es.search.assert_called_with(index=index,body=expected_query, size=0, ignore_unavailable=True)


def test_query_key_filters_single_query_key():
    # A plain string query_key should produce a single .keyword term filter.
    rule = { 'query_key': 'a_single_key_as_a_string' }
    qk_value_csv = 'a single value'
    filters = list(ElastAlerter.query_key_filters(rule, qk_value_csv))
    # Use double quotes for the f-string so the single-quoted subscript nests
    # legally: f'{rule['query_key']}' is a SyntaxError before Python 3.12
    # (PEP 701), and setup.py still advertises python_requires='>=3.9'.
    expected_filters = [{'term': {f"{rule['query_key']}.keyword": qk_value_csv}}]
    assert filters == expected_filters

@pytest.mark.parametrize("query_key_values_separator", [",", ", ", ", ", ",\t"])
def test_query_key_filters_compound_query_key(query_key_values_separator):
    # A compound query key pairs each fieldname with the corresponding csv
    # value, whatever whitespace happens to follow each comma.
    values = ['combined value', 'by commaspace']
    rule = {
        'query_key': 'compound,key',
        'compound_query_key': ['compound', 'key'],
    }
    qk_value_csv = query_key_values_separator.join(values)
    filters = list(ElastAlerter.query_key_filters(rule, qk_value_csv))
    assert filters == [
        {'term': {'compound.keyword': 'combined value'}},
        {'term': {'key.keyword': 'by commaspace'}},
    ]

def test_query_key_filters_brittle_query_key_value_logs_warning(caplog):
    # A value that itself contains a commaspace splits into one piece too
    # many; the key/value count mismatch should be logged as a warning.
    rule = {
        'query_key': 'university,state',
        'compound_query_key': ['university', 'state'],
    }
    # uh oh, a commaspace we didn't expect
    qk_value_csv = 'California State University, San Bernardino, California'
    list(ElastAlerter.query_key_filters(rule, qk_value_csv))
    record = caplog.records[0]
    assert record.levelname == "WARNING"
    assert 'Received 3 value(s) for 2 key(s).' in record.message

def test_query_key_filters_none_values():
    # No csv value at all means no term filters are generated.
    rule = {'query_key': 'something'}
    assert list(ElastAlerter.query_key_filters(rule, None)) == []

def test_query_key_filters_unexpected_passed_values_for_a_rule_without_query_keys(caplog):
    # A value arriving for a rule that configured no query_key yields no
    # filters, but the 1-value/0-key mismatch is still logged.
    filters = list(ElastAlerter.query_key_filters({}, 'value'))
    assert filters == []
    record = caplog.records[0]
    assert record.levelname == "WARNING"
    assert 'Received 1 value(s) for 0 key(s).' in record.message
6 changes: 6 additions & 0 deletions tests/util_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def test_looking_up_nested_keys(ea):
}

assert lookup_es_key(record, 'Fields.ts') == expected
assert lookup_es_key(record, 'Fields.ts.keyword') == expected


def test_looking_up_nested_composite_keys(ea):
Expand All @@ -131,6 +132,7 @@ def test_looking_up_nested_composite_keys(ea):
}

assert lookup_es_key(record, 'Fields.ts.value') == expected
assert lookup_es_key(record, 'Fields.ts.value.keyword') == expected


def test_looking_up_arrays(ea):
Expand All @@ -148,10 +150,14 @@ def test_looking_up_arrays(ea):
assert lookup_es_key(record, 'flags[0]') == 1
assert lookup_es_key(record, 'flags[1]') == 2
assert lookup_es_key(record, 'objects[0]foo') == 'bar'
assert lookup_es_key(record, 'objects[0]foo.keyword') == 'bar'
assert lookup_es_key(record, 'objects[1]foo[0]bar') == 'baz'
assert lookup_es_key(record, 'objects[2]foo.bar') == 'baz'
assert lookup_es_key(record, 'objects[2]foo.bar.keyword') == 'baz'
assert lookup_es_key(record, 'objects[1]foo[1]bar') is None
assert lookup_es_key(record, 'objects[1]foo[1]bar.keyword') is None
assert lookup_es_key(record, 'objects[1]foo[0]baz') is None
assert lookup_es_key(record, 'objects[1]foo[0]baz.keyword') is None
assert lookup_es_key(record, 'nested.foo[0]') == 'bar'
assert lookup_es_key(record, 'nested.foo[1]') == 'baz'

Expand Down