Skip to content

Commit

Permalink
Merge pull request #1189 from jertel/eql
Browse files Browse the repository at this point in the history
add support for EQL queries
  • Loading branch information
nsano-rururu authored Jun 7, 2023
2 parents 4d28e5a + 1d2598c commit af24e21
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 2 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
- TBD

## New features
- TBD
- Add initial support for EQL - [#1189](https://github.com/jertel/elastalert2/pull/1189) - @jertel

## Other changes
- Add support for Kibana 8.8 for Kibana Discover - [#1184](https://github.com/jertel/elastalert2/pull/1184) - @nsano-rururu
Expand Down
27 changes: 27 additions & 0 deletions docs/source/recipes/writing_filters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,30 @@ Below is a more complex example for Elasticsearch 7.x, provided by a `community
uid: charlie
- match:
menu_item: "burrito pasta salad pizza"

EQL (Event Query Language)
**************************

EQL is partially supported as of version 2.12.0. To use EQL, include a filter item as follows::

filter:
- eql: any where machine.os == "win 8"

Note that only one ``eql`` filter can be defined in a filter.

It is also possible to use standard query filters in combination with EQL filters::

filter:
- eql: any where machine.os == "win 8"
- query:
query_string:
query: "test.field: 123"

EQL is only partially supported due to the following limitations:

- Cannot be used with aggregation rule types.
- Cannot be used with blacklist/whitelist rule types.
- Cannot be used with percentage match rule types.
- Cannot be used with ``use_count_query`` property.
- Does not support scrolling, so large result sets may have unexpected results. Be sure to filter your queries thoroughly to avoid returning excessive numbers of events.
- Not supported with OpenSearch
83 changes: 82 additions & 1 deletion elastalert/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
import copy
import time
import elastalert.eql as eql

from elasticsearch import Elasticsearch
from elasticsearch import RequestsHttpConnection
Expand Down Expand Up @@ -74,3 +74,84 @@ def resolve_writeback_index(self, writeback_index, doc_type):
elif doc_type == 'elastalert_error':
return writeback_index + '_error'
return writeback_index


@query_params(
"_source",
"_source_excludes",
"_source_includes",
"allow_no_indices",
"allow_partial_search_results",
"analyze_wildcard",
"analyzer",
"batched_reduce_size",
"ccs_minimize_roundtrips",
"default_operator",
"df",
"docvalue_fields",
"expand_wildcards",
"explain",
"from_",
"ignore_throttled",
"ignore_unavailable",
"lenient",
"max_concurrent_shard_requests",
"pre_filter_shard_size",
"preference",
"q",
"request_cache",
"rest_total_hits_as_int",
"routing",
"scroll",
"search_type",
"seq_no_primary_term",
"size",
"sort",
"stats",
"stored_fields",
"suggest_field",
"suggest_mode",
"suggest_size",
"suggest_text",
"terminate_after",
"timeout",
"track_scores",
"track_total_hits",
"typed_keys",
"version",
)
def search(self, body=None, index=None, doc_type=None, params=None, headers=None):
# This implementation of search is nearly identical to the base class with the following exceptions:
# 1. If the request body contains an EQL query, the body will be restructured to support the EQL API.
# 2. The path will be set to the EQL API endpoint, if #1 is true.
# 3. The scroll and _source_includes params will be dropped if #1 is true, since the EQL API doesn't support them.
# 4. The size param will be moved to a body parameter instead of a top-level param if #1 is true.
# 5. The results will be converted from EQL API format into the standard search format.

# from is a reserved word so it cannot be used, use from_ instead
if "from_" in params:
params["from"] = params.pop("from_")

path = _make_path(index, doc_type, "_search")
eql_body = eql.format_request(body)
if eql_body is not None:
path = path.replace('/_search', '/_eql/search')
body = eql_body
if 'size' in params:
body['size'] = int(params.pop('size'))
if 'scroll' in params:
params.pop('scroll')
if '_source_includes' in params:
params.pop('_source_includes')

results = self.transport.perform_request(
"POST",
path,
params=params,
headers=headers,
body=body,
)

eql.format_results(results);

return results
51 changes: 51 additions & 0 deletions elastalert/eql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# A dict containing EQL would resemble the following:
# {'query': {'bool': {'filter': {'bool': {'must': [{'range': {'@timestamp': {'gt': 'yyyy...', 'lte': 'yyyy...'}}}, {'eql': 'process where process.name == "regsvr32.exe"'}]}}}}, 'sort': [{'@timestamp': {'order': 'asc'}}]}
def format_request(body):
query = body.get('query')
if not query:
return None

query_bool = query.get('bool')
if not query_bool:
return None

filter = query_bool.get('filter')
if not filter:
return None

filter_bool = filter.get('bool')
if not filter_bool:
return None

filter_bool_must = filter_bool.get('must')
if not filter_bool_must:
return None

other_filters = []
eql = None
for f in filter_bool_must:
if f.get('eql'):
eql = f['eql']
else:
other_filters.append(f)

if eql:
new_body = {'filter': { 'bool': { 'must': other_filters }}, 'query': eql}
return new_body

return None

def format_results(results):
hits = results.get('hits')
if not hits:
return results

events = hits.get('events')
if events is None:
return results

# relabel events as hits, for consistency
events = hits.pop('events')
hits['hits'] = events

return results
127 changes: 127 additions & 0 deletions tests/eql_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import elastalert.eql as eql
from elastalert import ElasticSearchClient
from unittest import mock


def test_format_request_without_eql():
assert eql.format_request({}) is None
assert eql.format_request({'query': {}}) is None
assert eql.format_request({'query': {'bool': {}}}) is None
assert eql.format_request({'query': {'bool': {'filter': {}}}}) is None
assert eql.format_request({'query': {'bool': {'filter': {'bool': {}}}}}) is None
assert eql.format_request({'query': {'bool': {'filter': {'bool': {'must': []}}}}}) is None
assert eql.format_request({'query': {'bool': {'filter': {'bool': {'must': [{'foo': 'bar'}]}}}}}) is None


def test_format_request_with_eql():
body = {
'query': {
'bool': {
'filter': {
'bool': {
'must': [
{'eql': 'test query'},
{'other': 'other filter'},
]
}
}
}
}
}
expected_body = {'filter': {'bool': {'must': [{'other': 'other filter'}]}}, 'query': 'test query'}
assert eql.format_request(body) == expected_body


def eql_body():
body = {
'query': {
'bool': {
'filter': {
'bool': {
'must': [
{'eql': 'test query'},
{'other': 'other filter'},
{'eql': 'newer query'},
]
}
}
}
}
}
return body


def test_format_request_with_excessive_eql():
body = eql_body()
expected_body = {'filter': {'bool': {'must': [{'other': 'other filter'}]}}, 'query': 'newer query'}
assert eql.format_request(body) == expected_body


def test_format_results_without_events():
expected_results = {'hits': {'hits': []}}
results = expected_results
eql.format_results(results) == expected_results


def test_format_results_with_events():
expected_results = {'hits': {'events': [{'foo': 'bar'}]}}
results = {'hits': {'hits': [{'foo': 'bar'}]}}
eql.format_results(results) == expected_results


def init_client():
conn = {
'es_host': '',
'es_hosts': [],
'es_port': 123,
'es_url_prefix': '',
'use_ssl': False,
'verify_certs': False,
'ca_certs': [],
'ssl_show_warn': False,
'http_auth': '',
'headers': [],
'es_conn_timeout': 0,
'send_get_body_as': '',
'client_cert': '',
'client_key': ''
}
return ElasticSearchClient(conn)


def test_search_without_eql():
es_client = init_client()

expected_params = {'from': True, 'size': 12, 'scroll': True, '_source_includes': True}
expected_headers = {}
expected_body = {}
results = {}
es_client.transport = mock.Mock()
es_client.transport.perform_request.return_value = results

body = {}
params = {'from_': True, 'size': 12, 'scroll': True, '_source_includes': True}
es_client.search(body=body, index='test', params=params)
es_client.transport.perform_request.assert_called_with('POST', '/test/_search',
params=expected_params,
headers=expected_headers,
body=expected_body)


def test_search_with_eql():
es_client = init_client()

expected_params = {'from': True}
expected_headers = {}
expected_body = {'filter': {'bool': {'must': [{'other': 'other filter'}]}}, 'query': 'newer query', 'size': 12}
results = {}
es_client.transport = mock.Mock()
es_client.transport.perform_request.return_value = results

body = eql_body()
params = {'from_': True, 'size': 12, 'scroll': True, '_source_includes': True}
results = es_client.search(body=body, index='test', params=params)
es_client.transport.perform_request.assert_called_with('POST', '/test/_eql/search',
params=expected_params,
headers=expected_headers,
body=expected_body)

0 comments on commit af24e21

Please sign in to comment.