
AlertMerger query optimizations #1030

Merged · 3 commits · Oct 4, 2019
Changes from 2 commits
38 changes: 26 additions & 12 deletions streamalert/alert_merger/main.py
@@ -60,6 +60,10 @@ class AlertMerger:
# Set the max payload size to slightly under that to account for the rest of the message.
MAX_LAMBDA_PAYLOAD_SIZE = 126000

# The maximum number of alerts that are loaded into memory for a single rule, during a single
# loop in the Alert Merger.
ALERT_GENERATOR_DEFAULT_LIMIT = 5000

@classmethod
def get_instance(cls):
"""Get an instance of the AlertMerger, using a cached version if possible."""
@@ -73,17 +77,32 @@ def __init__(self):
self.alert_proc_timeout = int(os.environ['ALERT_PROCESSOR_TIMEOUT_SEC'])
self.lambda_client = boto3.client('lambda')

def _get_alerts(self, rule_name):
"""Build a list of Alert instances triggered from the given rule name."""
alerts = []
# FIXME (derek.wang) Maybe make this configurable in the future
self._alert_generator_limit = self.ALERT_GENERATOR_DEFAULT_LIMIT

def _alert_generator(self, rule_name):
"""
Returns a generator that yields Alert instances triggered from the given rule name.

To limit memory consumption, the generator yields a maximum number of alerts, defined
by self._alert_generator_limit.
"""
alert_count = 0
for record in self.table.get_alert_records(rule_name, self.alert_proc_timeout):
Contributor

omit the alert_count and use something like the following:

for idx, record in enumerate(self.table.get_alert_records(rule_name, self.alert_proc_timeout), start=1):
    ...
    if idx >= self._alert_generator_limit:
        ...

Contributor Author

(I'll test this on my own but regardless—)
Does enumerate preserve the generator? It won't materialize the generator values up-front like list, right?

Contributor

@Ryxias yep it will preserve the generator underneath

Contributor

@Ryxias

def gen():
    for i in range(10):
        print('yielding', i)
        yield i


def main():
    iter = enumerate(gen(), start=1)
    print('created enumerator', iter)
    for i, v in iter:
        print('index: {}; value: {}'.format(i, v))


if __name__ == '__main__':
    main()

Output:

created enumerator <enumerate object at 0x10ddd1e60>
yielding 0
index: 1; value: 0
yielding 1
index: 2; value: 1
yielding 2
index: 3; value: 2
yielding 3
index: 4; value: 3
yielding 4
index: 5; value: 4
yielding 5
index: 6; value: 5
yielding 6
index: 7; value: 6
yielding 7
index: 8; value: 7
yielding 8
index: 9; value: 8
yielding 9
index: 10; value: 9

try:
alerts.append(Alert.create_from_dynamo_record(record))
yield Alert.create_from_dynamo_record(record)
alert_count += 1
except AlertCreationError:
LOGGER.exception('Invalid alert record %s', record)
continue

return alerts
if alert_count >= self._alert_generator_limit:
LOGGER.warning(
'Alert Merger reached alert limit of %d for rule "%s"',
self._alert_generator_limit,
rule_name
)
return

@staticmethod
def _merge_groups(alerts):
@@ -144,14 +163,9 @@ def dispatch(self):
merged_alerts = [] # List of newly created merge alerts
alerts_to_delete = [] # List of alerts which can be deleted

# TODO: Find a way to avoid a full table scan just to get rule names
for rule_name in self.table.rule_names():
alerts = self._get_alerts(rule_name)
if not alerts:
continue

for rule_name in self.table.rule_names_generator():
merge_enabled_alerts = []
for alert in alerts:
for alert in self._alert_generator(rule_name):
if alert.remaining_outputs:
# If an alert still has pending outputs, it needs to be sent immediately.
# For example, all alerts are sent to the default firehose now even if they will
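
As a point of reference for the discussion above, here is a minimal standalone sketch of the enumerate-based variant the reviewer suggests; it is not part of this diff. The names record_source, parse_record, and limit are stand-ins for self.table.get_alert_records(...), Alert.create_from_dynamo_record, and self._alert_generator_limit, and ValueError stands in for AlertCreationError.

import logging

LOGGER = logging.getLogger(__name__)


def limited_alerts(record_source, parse_record, limit):
    """Lazily yield parsed alerts, stopping after `limit` records.

    enumerate() only wraps the underlying iterable; records are still
    pulled one at a time, never materialized into a list up-front.
    """
    for idx, record in enumerate(record_source, start=1):
        try:
            yield parse_record(record)
        except ValueError:  # stand-in for AlertCreationError
            LOGGER.exception('Invalid alert record %s', record)

        if idx >= limit:
            LOGGER.warning('Reached alert limit of %d', limit)
            return

Note one behavioral difference: the enumerate index counts every record consumed, including invalid ones, while alert_count in the merged code only counts alerts that were successfully yielded.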
34 changes: 34 additions & 0 deletions streamalert/shared/alert_table.py
@@ -59,6 +59,10 @@ def _paginate(func, func_kwargs):
def rule_names(self):
"""Find all of the distinct rule names in the table.

@deprecated
Contributor

is this still used internally? I'd say, if not, just 86 it

Contributor Author

It's used in a variety of places; as far as I can tell, a lot of them are tests. My worry is that it's also used in a bunch of public methods, which other customers might be relying on... that said, I think removing it is a good idea, because using this method is just not a good idea. I'll hunt down the references in a separate PR

Contributor Author

Separate note: "just 86 it" 🤣

I watch too much food TV

Contributor

YOU or I watch too much!? I worked in a restaurant 😛

Contributor Author

I'm just surprised I understood the kitchen jargon
Too much Gordon Ramsay on YouTube
Yelling at people
ITS R AW
☠️

Contributor

LOL

This method is deprecated due to requiring a full table scan prior to returning any
records.

Returns:
set: Set of string rule names
"""
@@ -68,6 +72,36 @@ def rule_names(self):
}
return set(item['RuleName'] for item in self._paginate(self._table.scan, kwargs))

def rule_names_generator(self):
"""Returns a generator that yields unique rule_names of items on the table

Each unique name is yielded only once. Additionally, because the names are materialized over
several paginated calls, it is not 100% guaranteed to return every possible rule_name in
the Alert Table; if other operations are writing items to the DynamoDB table concurrently,
it is possible for certain names to be skipped.

Returns:
generator
Contributor
please include the type that the generator will yield

"""
kwargs = {
'ProjectionExpression': 'RuleName',
'Select': 'SPECIFIC_ATTRIBUTES',

# It is acceptable to use inconsistent reads here to reduce read capacity units
# consumed, as there is already no guarantee of consistency in the rule names due to
# pagination.
'ConsistentRead': False,
}

rule_names = set()
for item in self._paginate(self._table.scan, kwargs):
name = item['RuleName']
if name in rule_names:
continue

rule_names.add(name)
yield name

def get_alert_records(self, rule_name, alert_proc_timeout_sec):
"""Find all alerts for the given rule which need to be dispatched to the alert processor.

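
For context on the pagination and consistency trade-offs discussed in this file, below is a rough sketch of how a DynamoDB scan is typically paginated with boto3 by following LastEvaluatedKey. This is an assumption about what a helper like _paginate does, not the project's actual implementation, and the table name in the usage block is hypothetical.

import boto3


def scan_pages(table, scan_kwargs):
    """Yield items from a DynamoDB table scan one page at a time."""
    kwargs = dict(scan_kwargs)
    while True:
        response = table.scan(**kwargs)
        for item in response.get('Items', []):
            yield item

        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            return  # no more pages
        kwargs['ExclusiveStartKey'] = last_key


def unique_rule_names(table):
    """Mirror of rule_names_generator: yield each RuleName once, lazily."""
    kwargs = {
        'ProjectionExpression': 'RuleName',
        'Select': 'SPECIFIC_ATTRIBUTES',
        # Eventually consistent reads consume half the read capacity units
        'ConsistentRead': False,
    }
    seen = set()
    for item in scan_pages(table, kwargs):
        name = item['RuleName']
        if name not in seen:
            seen.add(name)
            yield name


if __name__ == '__main__':
    alert_table = boto3.resource('dynamodb').Table('alerts-table-name')  # hypothetical table name
    print(list(unique_rule_names(alert_table)))

Because names start flowing after the first scanned page, a caller such as the Alert Merger can begin work before the full scan completes, which is the advantage of rule_names_generator over the set-building rule_names.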
4 changes: 2 additions & 2 deletions tests/unit/streamalert/alert_merger/test_main.py
@@ -78,15 +78,15 @@ def teardown(self):
self.lambda_mock.stop()

@patch.object(main, 'LOGGER')
def test_get_alerts(self, mock_logger):
def test_alert_generator(self, mock_logger):
"""Alert Merger - Sorted Alerts - Invalid Alerts are Logged"""
records = [
Alert('test_rule', {}, {'output'}).dynamo_record(),
{'Nonsense': 'Record'}
]

with patch.object(self.merger.table, 'get_alert_records', return_value=records):
result = self.merger._get_alerts('test_rule')
result = list(self.merger._alert_generator('test_rule'))
# Valid record is returned
assert_equal(1, len(result))
assert_equal(records[0]['AlertID'], result[0].alert_id)
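
The change above only renames the existing test; the new limit behavior is not covered in this diff. A possible sketch of such a test, assuming it lives in the same test class as test_alert_generator (so self.merger, Alert, main, patch, and assert_equal are already in scope) and uses a hypothetical lowered limit:

@patch.object(main, 'LOGGER')
def test_alert_generator_limit(self, mock_logger):
    """Alert Merger - Alert Generator - Stops at the Configured Limit"""
    records = [
        Alert('test_rule', {}, {'output'}).dynamo_record() for _ in range(5)
    ]

    self.merger._alert_generator_limit = 3
    with patch.object(self.merger.table, 'get_alert_records', return_value=records):
        result = list(self.merger._alert_generator('test_rule'))

    # Only the first three alerts are yielded and the limit warning is logged
    assert_equal(3, len(result))
    mock_logger.warning.assert_called_once()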
4 changes: 4 additions & 0 deletions tests/unit/streamalert/shared/test_alert_table.py
@@ -74,6 +74,10 @@ def test_rule_names(self):
"""Alert Table - Rule Names From Table Scan"""
assert_equal({'even', 'odd'}, self.alert_table.rule_names())

def test_rule_names_generator(self):
"""Alert Table - Rule Names Generator From Table Scan"""
assert_equal({'even', 'odd'}, set(self.alert_table.rule_names_generator()))

def test_get_alert_records(self):
"""Alert Table - Pending Alerts From Table Query"""
result = list(self.alert_table.get_alert_records('odd', _ALERT_PROCESSOR_TIMEOUT_SEC))