ES6 writeback index fix + extra features #2168

Closed
wants to merge 15 commits into from
2 changes: 1 addition & 1 deletion elastalert/config.py
@@ -55,7 +55,7 @@ def load_conf(args, defaults=None, overwrites=None):
if required_globals - frozenset(conf.keys()):
raise EAException('%s must contain %s' % (filename, ', '.join(required_globals - frozenset(conf.keys()))))

conf.setdefault('writeback_alias', 'elastalert_alerts')
conf.setdefault('writeback_alias', 'elastalert')
conf.setdefault('max_query_size', 10000)
conf.setdefault('scroll_keepalive', '30s')
conf.setdefault('disable_rules_on_error', True)
54 changes: 42 additions & 12 deletions elastalert/elastalert.py
@@ -146,6 +146,7 @@ def __init__(self, args):
self.from_addr = self.conf.get('from_addr', 'ElastAlert')
self.smtp_host = self.conf.get('smtp_host', 'localhost')
self.max_aggregation = self.conf.get('max_aggregation', 10000)
self.limit_execution_margin = self.conf.get('limit_execution_margin', 10)
Member comment: This doesn't look like it's referenced anywhere.

self.buffer_time = self.conf['buffer_time']
self.silence_cache = {}
self.rule_hashes = self.rules_loader.get_hashes(self.conf, self.args.rule)
@@ -255,17 +256,25 @@ def get_query(filters, starttime=None, endtime=None, sort=True, timestamp_field=
query['sort'] = [{timestamp_field: {'order': 'desc' if desc else 'asc'}}]
return query

def get_terms_query(self, query, size, field, five=False):
def get_terms_query(self, query, size, field, five=False, use_keyword_postfix=False):
""" Takes a query generated by get_query and outputs a aggregation query """
query_element = query['query']
if 'sort' in query_element:
query_element.pop('sort')
agg_query={'counts': {'composite':{'sources': [],'size': size}}}
fields = field.split(",")
for field in fields:
if use_keyword_postfix:
agg_query['counts']['composite']['sources'].append({field: {"terms": {"field": field+".keyword"}}})
else:
agg_query['counts']['composite']['sources'].append({field: {"terms": {"field": field}}})

if not five:
query_element['filtered'].update({'aggs': {'counts': {'terms': {'field': field, 'size': size}}}})
query_element['filtered'].update({'aggs': agg_query})
Member comment: Will this work on older versions?

aggs_query = {'aggs': query_element}
else:
aggs_query = query
aggs_query['aggs'] = {'counts': {'terms': {'field': field, 'size': size}}}
aggs_query['aggs'] = agg_query
return aggs_query
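
For illustration, with field="src_ip,dest_ip", size=50 and use_keyword_postfix=True, the loop above would build an aggregation body roughly like the following (a hand-written sketch of the expected output; the field names and size are made up):

# Sketch of the composite aggregation produced for field="src_ip,dest_ip",
# size=50, use_keyword_postfix=True.
agg_query = {
    'counts': {
        'composite': {
            'sources': [
                {'src_ip': {'terms': {'field': 'src_ip.keyword'}}},
                {'dest_ip': {'terms': {'field': 'dest_ip.keyword'}}},
            ],
            'size': 50,
        }
    }
}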

def get_aggregation_query(self, query, rule, query_key, terms_size, timestamp_field='@timestamp'):
@@ -517,7 +526,10 @@ def get_hits_terms(self, rule, starttime, endtime, index, key, qk=None, size=Non
)
if size is None:
size = rule.get('terms_size', 50)
query = self.get_terms_query(base_query, size, key, rule['five'])
use_keyword_suffix = False
if "use_keyword_suffix" in rule:
use_keyword_suffix = rule["use_keyword_suffix"]
query = self.get_terms_query(base_query, size, key, rule['five'], use_keyword_suffix)
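
The same lookup could be written more compactly with dict.get; a purely stylistic sketch, equivalent to the three lines above:

# Falls back to False when the rule does not set the option.
use_keyword_suffix = rule.get("use_keyword_suffix", False)
query = self.get_terms_query(base_query, size, key, rule['five'], use_keyword_suffix)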

try:
if not rule['five']:
@@ -657,6 +669,12 @@ def run_query(self, rule, start=None, end=None, scroll=False):
rule_inst.add_aggregation_data(data)
else:
rule_inst.add_data(data)
if isinstance(rule['type'], FlatlineRule):
if "writeback_events_up_enabled" in rule_inst.rules and rule_inst.rules["writeback_events_up_enabled"] and len(rule_inst.events_up) !=0:
Member comment: Can you replace "writeback_events_up_enabled" in rule_inst.rules and rule_inst.rules["writeback_events_up_enabled"] with rule_inst.get("writeback_events_up_enabled")?
Also add a space between != and 0.

for matches in rule_inst.events_up:
alert_body = self.get_alert_body(matches, rule, False, ts_now(), None, rule_inst.rules["event_up_rule_name_suffix"])
self.writeback('elastalert', alert_body, rule)
rule_inst.events_up=[]
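
Following the review comment above: the reviewer suggests rule_inst.get(...), though in the surrounding code the option appears to live in rule_inst.rules, so a sketch of the simplified condition might look like this (same behaviour, just shorter):

# Write back one document per recovered event when the option is enabled.
if rule_inst.rules.get("writeback_events_up_enabled") and rule_inst.events_up:
    for matches in rule_inst.events_up:
        alert_body = self.get_alert_body(matches, rule, False, ts_now(), None,
                                          rule_inst.rules["event_up_rule_name_suffix"])
        self.writeback('elastalert', alert_body, rule)
    rule_inst.events_up = []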

try:
if rule.get('scroll_id') and self.thread_data.num_hits < self.thread_data.total_hits:
@@ -866,6 +884,8 @@ def run_rule(self, rule, endtime, starttime=None):
else:
self.set_starttime(rule, endtime)

if 'use_timeframe_as_max_limit' in rule and rule['use_timeframe_as_max_limit']:
rule['startime']= max(ts_now() - rule['timeframe'], rule['starttime'])
rule['original_starttime'] = rule['starttime']
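
If the intent of use_timeframe_as_max_limit is to stop a rule from querying further back than its own timeframe, note that the new clamp above assigns to the key 'startime' while the rest of the code reads 'starttime'; assuming that intent, a corrected sketch would be:

# Clamp the query start so it never reaches further back than the rule's timeframe.
if rule.get('use_timeframe_as_max_limit'):
    rule['starttime'] = max(ts_now() - rule['timeframe'], rule['starttime'])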

# Don't run if starttime was set to the future
@@ -878,17 +898,24 @@
self.thread_data.num_dupes = 0
self.thread_data.cumulative_hits = 0
segment_size = self.get_segment_size(rule)

tmp_endtime = rule['starttime']

while endtime - rule['starttime'] > segment_size:
tmp_endtime = tmp_endtime + segment_size
query_all_timeFrame_at_once = 'query_all_timeframe_at_once' in rule and rule['query_all_timeframe_at_once']
if query_all_timeFrame_at_once:
tmp_endtime = ts_now()


while query_all_timeFrame_at_once or endtime - rule['starttime'] > segment_size:
if not query_all_timeFrame_at_once:
tmp_endtime = tmp_endtime + segment_size
if not self.run_query(rule, rule['starttime'], tmp_endtime):
return 0
self.thread_data.cumulative_hits += self.thread_data.num_hits
self.thread_data.num_hits = 0
rule['starttime'] = tmp_endtime
rule['type'].garbage_collect(tmp_endtime)
if query_all_timeFrame_at_once:
break
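
Put differently, when query_all_timeframe_at_once is set the loop runs exactly once, querying the whole window from rule['starttime'] up to ts_now() and then breaking; otherwise it keeps the original segment-by-segment walk. A condensed, illustrative sketch of that control flow (error handling and hit bookkeeping omitted):

# Condensed view of the loop above.
if query_all_timeFrame_at_once:
    self.run_query(rule, rule['starttime'], ts_now())
    rule['type'].garbage_collect(ts_now())
else:
    while endtime - rule['starttime'] > segment_size:
        tmp_endtime = tmp_endtime + segment_size
        self.run_query(rule, rule['starttime'], tmp_endtime)
        rule['starttime'] = tmp_endtime
        rule['type'].garbage_collect(tmp_endtime)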

if rule.get('aggregation_query_element'):
if endtime - tmp_endtime == segment_size:
@@ -899,7 +926,7 @@
return 0
else:
endtime = tmp_endtime
else:
elif not query_all_timeFrame_at_once:
if not self.run_query(rule, rule['starttime'], endtime):
return 0
self.thread_data.cumulative_hits += self.thread_data.num_hits
@@ -1223,6 +1250,7 @@ def handle_config_change(self):
elastalert_logger.info("Background configuration change check run at %s" % (pretty_ts(ts_now())))

def handle_rule_execution(self, rule):

self.thread_data.alerts_sent = 0
next_run = datetime.datetime.utcnow() + rule['run_every']
# Set endtime based on the rule's delay
@@ -1248,9 +1276,9 @@ def handle_rule_execution(self, rule):
if rule.get('limit_execution_coverage'):
rule['next_min_starttime'] = rule['next_starttime']
if not rule['has_run_once']:
rule['has_run_once'] = True
self.reset_rule_schedule(rule)
return

rule['has_run_once'] = True
try:
num_matches = self.run_rule(rule, endtime, rule.get('initial_starttime'))
@@ -1261,9 +1289,9 @@
else:
old_starttime = pretty_ts(rule.get('original_starttime'), rule.get('use_local_time'))
elastalert_logger.info("Ran %s from %s to %s: %s query hits (%s already seen), %s matches,"
" %s alerts sent" % (rule['name'], old_starttime, pretty_ts(endtime, rule.get('use_local_time')),
" %s alerts sent; cumulative hits %s" % (rule['name'], old_starttime, pretty_ts(endtime, rule.get('use_local_time')),
self.thread_data.num_hits, self.thread_data.num_dupes, num_matches,
self.thread_data.alerts_sent))
self.thread_data.alerts_sent, self.thread_data.cumulative_hits))
self.thread_data.alerts_sent = 0

if next_run < datetime.datetime.utcnow():
@@ -1537,14 +1565,16 @@ def send_alert(self, matches, rule, alert_time=None, retried=False):
if res and not agg_id:
agg_id = res['_id']

def get_alert_body(self, match, rule, alert_sent, alert_time, alert_exception=None):
def get_alert_body(self, match, rule, alert_sent, alert_time, alert_exception=None, rule_name_suffix=None):
body = {
'match_body': match,
'rule_name': rule['name'],
'alert_info': rule['alert'][0].get_info() if not self.debug else {},
'alert_sent': alert_sent,
'alert_time': alert_time
}
if rule_name_suffix != None:
body['rule_name']+=rule_name_suffix
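
Minor style point on the suffix handling above: comparisons against None are conventionally written with is not, as in this equivalent sketch:

# Same behaviour as the check above, using identity comparison with None.
if rule_name_suffix is not None:
    body['rule_name'] += rule_name_suffix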

if self.add_metadata_alert:
body['category'] = rule['category']
55 changes: 52 additions & 3 deletions elastalert/ruletypes.py
@@ -2,7 +2,8 @@
import copy
import datetime
import sys

reload(sys)
sys.setdefaultencoding('utf8')
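
Note that reload() as a builtin and sys.setdefaultencoding() exist only on Python 2 (site.py removes setdefaultencoding at startup, which is why reload(sys) is needed first), so this module would no longer import on Python 3. If the hack has to stay temporarily, a guarded sketch could look like this (illustrative only):

# Python 2 only: restore setdefaultencoding before using it; skipped entirely on Python 3.
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding('utf8')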
from blist import sortedlist
from util import add_raw_postfix
from util import dt_to_ts
@@ -235,8 +236,16 @@ def add_terms_data(self, terms):
for bucket in buckets:
event = ({self.ts_field: timestamp,
self.rules['query_key']: bucket['key']}, bucket['doc_count'])
self.occurrences.setdefault(bucket['key'], EventWindow(self.rules['timeframe'], getTimestamp=self.get_ts)).append(event)
self.check_for_match(bucket['key'])
composite_key =bucket['key']
#to deal with composite key as it is dict not string, concat all key in one string separate by ,
if type(composite_key) is dict:
composite_key_as_string=""
for single_key in self.rules['query_key'].split(","):
composite_key_as_string+= composite_key[single_key]+", "
composite_key_as_string= composite_key_as_string[:-2]
composite_key = composite_key_as_string
self.occurrences.setdefault(composite_key, EventWindow(self.rules['timeframe'], getTimestamp=self.get_ts)).append(event)
self.check_for_match(composite_key)
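
The concatenation above can also be expressed as a join over the configured query keys; an equivalent sketch (assuming every sub-key is present in the bucket and its value is a string):

# Build "val1, val2, ..." in query_key order; same result as the loop above.
if isinstance(composite_key, dict):
    composite_key = ", ".join(composite_key[k] for k in self.rules['query_key'].split(","))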

def add_data(self, data):
if 'query_key' in self.rules:
@@ -559,6 +568,11 @@ def __init__(self, *args):

# Dictionary mapping query keys to the first events
self.first_event = {}
self.event_triggered_at_once_per_key={}
self.events_up=[]




def check_for_match(self, key, end=True):
# This function gets called between every added document with end=True after the last
@@ -568,6 +582,12 @@ def check_for_match(self, key, end=True):

most_recent_ts = self.get_ts(self.occurrences[key].data[-1])
if self.first_event.get(key) is None:
if key not in self.event_triggered_at_once_per_key:
self.event_triggered_at_once_per_key[key]=True
print("First boot for key "+key)
else:
print("service up "+key)
self.add_event_up(copy.deepcopy(self.occurrences[key].data[-1][0]))
self.first_event[key] = most_recent_ts
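
A small sketch of how the two print calls above could be routed through the project's logger instead of stdout (assuming elastalert_logger is importable from util in this module):

# Log first-boot and recovery events rather than printing to stdout.
if key not in self.event_triggered_at_once_per_key:
    self.event_triggered_at_once_per_key[key] = True
    elastalert_logger.info("First boot for key %s" % key)
else:
    elastalert_logger.info("Service up for key %s" % key)
    self.add_event_up(copy.deepcopy(self.occurrences[key].data[-1][0]))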

# Don't check for matches until timeframe has elapsed
@@ -580,6 +600,7 @@
# Do a deep-copy, otherwise we lose the datetime type in the timestamp field of the last event
event = copy.deepcopy(self.occurrences[key].data[-1][0])
event.update(key=key, count=count)
event = self.decompose_key_into_multiple_fields(event, self.rules['query_key'])
self.add_match(event)

if not self.rules.get('forget_keys'):
@@ -593,6 +614,34 @@
# Forget about this key until we see it again
self.first_event.pop(key)
self.occurrences.pop(key)
def decompose_key_into_multiple_fields(self, event, query_key):
if 'key' in event:
fieldNames = query_key.split(",")
key_fields = event['key'].split(",")
for sub_key_idx in range(len(fieldNames)):
event[fieldNames[sub_key_idx]]= key_fields[sub_key_idx]
return event
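
For readability, the same decomposition could be written with zip; an equivalent sketch (behaviour unchanged, assuming field names and key parts line up one-to-one):

# Split "a,b" query_key names and the matching "v1,v2" key string back into individual fields.
def decompose_key_into_multiple_fields(self, event, query_key):
    if 'key' in event:
        for name, value in zip(query_key.split(","), event['key'].split(",")):
            event[name] = value
    return event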

def add_event_up(self, event):
""" This function is called on all matching events. Rules use it to add
extra information about the context of a match. Event is a dictionary
containing terms directly from Elasticsearch and alerts will report
all of the information.

:param event: The matching event, a dictionary of terms.
"""
# Convert datetime's back to timestamps
ts = self.rules.get('timestamp_field')
if ts in event:
event[ts] = dt_to_ts(event[ts])
for key in event.keys():
sub_keys = key.split(",")
if len(sub_keys) <2:
continue
for sub_key_idx in range(len(sub_keys)):
event[sub_keys[sub_key_idx]]= event[key][sub_keys[sub_key_idx]]
del event[key]
self.events_up.append(copy.deepcopy(event))
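
One caveat in the loop above: deleting entries from event while iterating event.keys() only works on Python 2, where keys() returns a list; under Python 3 the keys would need to be snapshotted first. A defensive sketch:

# Iterate over a copy of the keys so deleting composite entries is also safe on Python 3.
for key in list(event.keys()):
    sub_keys = key.split(",")
    if len(sub_keys) < 2:
        continue
    for sub_key in sub_keys:
        event[sub_key] = event[key][sub_key]
    del event[key]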

def get_match_str(self, match):
ts = match[self.rules['timestamp_field']]