Add non-digest batch mode (#39)
By default (digest mode), all batch events are grouped by owner and source_type,
and the email will look like:
> here are your X new issues, and by the way, you have these Y old ones.

This PR adds a non-digest mode, so the router receives only those events that are new or need a reminder.
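
Enabling the new mode for a given source type is done through `set_config`, as in the test added in this PR. A minimal sketch, assuming `Comet` is importable from `comet_core.app` and using a hypothetical source type name `my_source`:

```python
import json
from datetime import timedelta

from comet_core.app import Comet

app = Comet()
app.register_parser("my_source", json)  # "my_source" is a hypothetical source type

# Per-source-type overrides are merged on top of the default batch_config.
app.set_config(
    "my_source",
    {
        "communication_digest_mode": False,   # route only new events and reminders
        "new_threshold": timedelta(days=14),  # a report this long after the previous one counts as a regression
    },
)
```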
Anton Sapozhnikov authored Mar 27, 2020
1 parent 40a8bb1 commit e7da88e
Showing 6 changed files with 247 additions and 24 deletions.
69 changes: 51 additions & 18 deletions comet_core/app.py
@@ -155,12 +155,25 @@ def __init__(self, database_uri="sqlite://"):

self.database_uri = database_uri
self.batch_config = {
"wait_for_more": timedelta(seconds=3),
"communication_digest_mode": True,
# By default (communication_digest_mode=True), all batch events are grouped by owner and source_type,
# and the email will look like:
# here are your X new issues, and by the way, you have these Y old ones.
# In non-digest mode,
# the router will receive only those events that are new or need a reminder.
"escalation_reminder_cadence": timedelta(days=7),
# `escalation_reminder_cadence` defines how often to send escalation reminders
"escalation_time": timedelta(seconds=10),
# `escalation_time` defines how soon an event should be escalated (it takes ignore_fingerprints into account)
"max_wait": timedelta(seconds=4),
# `max_wait` defines the amount of time to wait since the earliest event in an attempt to catch the whole batch
"new_threshold": timedelta(days=7),
# `new_threshold` defines the amount of time since the latest report of a given fingerprint after which a new
# report is treated as a regression of the detected issue
"owner_reminder_cadence": timedelta(days=7),
"escalation_time": timedelta(seconds=10),
"escalation_reminder_cadence": timedelta(days=7),
# `owner_reminder_cadence` defines how often to send reminders
"wait_for_more": timedelta(seconds=3),
# `wait_for_more` defines the amount of time to wait since the latest event
}
self.specific_configs = {}

@@ -377,7 +390,7 @@ def decorator(func):

self.escalators.add(source_types, func)

# pylint: disable=too-many-branches
# pylint: disable=too-many-branches, too-many-locals, too-many-nested-blocks, too-many-statements
def process_unprocessed_events(self):
"""Checks the database for unprocessed events and processes them.
@@ -387,20 +400,20 @@ def process_unprocessed_events(self):
same escalation recipient recently. All ignored events will be skipped for the above, but marked as processed.
Config options we care about:
source_type_config['owner_reminder_cadence']:
source_type_config['notifications_send_emails']
source_type_config['communication_digest_mode'],
source_type_config['escalation_reminder_cadence'],
source_type_config['escalation_time'],
source_type_config['escalation_reminder_cadence']
source_type_config['recipient_override']
source_type_config['email_subject']:
source_type_config['wait_for_more']:
source_type_config['max_wait']:
source_type_config['max_wait'],
source_type_config['new_threshold'],
source_type_config['owner_reminder_cadence'],
source_type_config['wait_for_more']
"""

LOG.debug("Processing unprocessed events")

# pylint: disable=consider-iterating-dictionary
for source_type in self.parsers.keys():
source_type_config = self.batch_config
source_type_config = self.batch_config.copy()
if source_type in self.specific_configs:
source_type_config.update(self.specific_configs[source_type])

@@ -451,14 +464,34 @@ def process_unprocessed_events(self):
for owner, events in events_by_owner.items():
owner_reminder_cadence = source_type_config["owner_reminder_cadence"]

if any([event.new for event in events]) or self.data_store.check_any_issue_needs_reminder(
owner_reminder_cadence, events
):
events_to_remind = []
if source_type_config["communication_digest_mode"]:
if any([event.new for event in events]) or self.data_store.check_any_issue_needs_reminder(
owner_reminder_cadence, events
):
events_to_remind = events
else:
fingerprints_to_remind = self.data_store.get_any_issues_need_reminder(
owner_reminder_cadence, events
)
if fingerprints_to_remind:
for e in events:
if e.fingerprint in fingerprints_to_remind:
e.reminder = True
events_to_remind.append(e)

for e in events:
if e.new and not e.fingerprint in fingerprints_to_remind:
events_to_remind.append(e)

if events_to_remind:
try:
self._route_events(owner, events, source_type)
self.data_store.update_processed_at_timestamp_to_now(events)
self._route_events(owner, events_to_remind, source_type)
self.data_store.update_processed_at_timestamp_to_now(events_to_remind)
except CometCouldNotSendException:
LOG.error(f"Could not send alert to {owner}: {events}")
LOG.error(f"Could not send alert to {owner}: {events_to_remind}")

self.data_store.update_processed_at_timestamp_to_now([e for e in events if e not in events_to_remind])

LOG.info("events-processed", extra={"events": len(events), "source-type": source_type, "owner": owner})

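The switch from `self.batch_config` to `self.batch_config.copy()` in `process_unprocessed_events` fixes a subtle sharing bug: the per-source `update()` used to mutate the shared defaults, so one source type's overrides leaked into every other source type. A standalone illustration with plain dicts (not the actual Comet code):

```python
defaults = {"communication_digest_mode": True}

# Old behaviour: binding the same dict object, then updating it in place.
leaky = defaults
leaky.update({"communication_digest_mode": False})
assert defaults["communication_digest_mode"] is False  # the shared defaults were clobbered

# New behaviour: work on an independent copy, leaving the defaults intact.
defaults = {"communication_digest_mode": True}
safe = defaults.copy()
safe.update({"communication_digest_mode": False})
assert defaults["communication_digest_mode"] is True
```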
28 changes: 27 additions & 1 deletion comet_core/data_store.py
@@ -43,7 +43,7 @@ def remove_duplicate_events(event_record_list):
return list(events_hash_table.values())


class DataStore:
class DataStore: # pylint: disable=too-many-public-methods
"""Abstraction of the comet storage layer.
Args:
@@ -156,6 +156,32 @@ def check_any_issue_needs_reminder(self, search_timedelta, records):
return max(timestamps)[0] <= datetime.utcnow() - search_timedelta
return False

def get_any_issues_need_reminder(self, search_timedelta, records):
"""Returns all the `fingerprints` having corresponding `event` table entries with the latest `sent_at`
more then search_timedelta ago.
NOTE: if all database records for a fingerprint given in the `records` list have the sent_at values set to Null,
then this fingerprint will be treated as NOT needing a reminder, which might be unintuitive.
Args:
search_timedelta (datetime.timedelta): reminder interval
records (list): list of EventRecord objects to check
Returns:
list: list of fingerprints that represent issues that need to be reminded about
"""
fingerprints = [record.fingerprint for record in records]
fingerprints_to_remind = (
self.session.query(func.max(EventRecord.sent_at).label("sent_at"), EventRecord.fingerprint)
.filter(EventRecord.fingerprint.in_(fingerprints) & EventRecord.sent_at.isnot(None))
.group_by(EventRecord.fingerprint)
.all()
)
result = []
deltat = datetime.utcnow() - search_timedelta
for f in fingerprints_to_remind:
if f.sent_at <= deltat:
result.append(f.fingerprint)

return result

def update_timestamp_column_to_now(self, records, column_name):
"""Update the `column_name` of the provided `EventRecord`s to datetime now
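For reference, a minimal usage sketch of the new `get_any_issues_need_reminder` query against an in-memory store; record construction mirrors the test added below, and the fingerprint value is just an example:

```python
from datetime import datetime, timedelta

from comet_core.data_store import DataStore
from comet_core.model import EventRecord

store = DataStore("sqlite://")

stale = EventRecord(sent_at=datetime.utcnow() - timedelta(days=9), source_type="example")
stale.fingerprint = "fp-1"
store.add_record(stale)

# "fp-1" was last sent out more than 7 days ago, so it is due for a reminder.
assert store.get_any_issues_need_reminder(timedelta(days=7), [stale]) == ["fp-1"]
```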
4 changes: 2 additions & 2 deletions comet_core/fingerprint.py
@@ -14,9 +14,9 @@

"""Helper function to compute the fingerprint of alerts."""

import collections
import hmac
import json
from collections.abc import Iterable
from copy import deepcopy
from hashlib import sha256, shake_256

@@ -62,7 +62,7 @@ def filter_dict(orig_dict, blacklist):
for item in blacklist:
if isinstance(item, str) and item in orig_dict:
del orig_dict[item]
elif isinstance(item, collections.Iterable):
elif isinstance(item, Iterable):
pointer = orig_dict
for sub in item[:-1]:
pointer = pointer.get(sub, {})
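The import change in `fingerprint.py` is needed because the `collections.Iterable` alias has been deprecated since Python 3.3 and was removed in Python 3.10; the ABC must now come from `collections.abc`. A small sketch of the check `filter_dict` relies on (the blacklist entries are illustrative):

```python
from collections.abc import Iterable

# Nested blacklist entries such as ["details", "owner"] are Iterables...
assert isinstance(["details", "owner"], Iterable)
# ...and so are plain strings, which is why filter_dict tests isinstance(item, str) first.
assert isinstance("owner", Iterable)
assert not isinstance(42, Iterable)
```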
2 changes: 1 addition & 1 deletion setup.py
@@ -16,7 +16,7 @@

setuptools.setup(
name="comet-core",
version="2.5.1",
version="2.6.0",
url="https://github.com/spotify/comet-core",
author="Spotify Platform Security",
author_email="wasabi@spotify.com",
113 changes: 111 additions & 2 deletions tests/test_app.py
@@ -25,13 +25,13 @@
from comet_core.model import EventRecord, IgnoreFingerprintRecord


@freeze_time("2018-05-09 09:00:00")
# pylint: disable=missing-docstring
def test_process_unprocessed_events():
def test_process_unprocessed_events_digest_mode():
app = Comet()
app.register_parser("datastoretest", json)
app.register_parser("datastoretest2", json)
app.register_parser("datastoretest3", json)
app.register_parser("datastoretest4", json)

app.set_config("datastoretest2", {})

@@ -91,11 +91,120 @@ def test_process_unprocessed_events():
)

app.process_unprocessed_events()

# specific_router is expected to be called once, for datastoretest2
assert specific_router.call_count == 1
# the generic router is expected to be called twice, for source types datastoretest2 and datastoretest3
assert router.call_count == 2
# and the user must be check_user
assert router.call_args[0][2][0].owner == check_user
# due to the default escalation_time=10 seconds, all three events (id=2,3,4) must be escalated
assert escalator.call_count == 3

app.data_store.add_record(
EventRecord(
id=5,
received_at=datetime.utcnow() - timedelta(days=2),
source_type="datastoretest",
owner=check_user,
data={},
fingerprint="f1",
)
)
app.process_unprocessed_events()

# f1 is expected to be processed, but not sent out
assert app.data_store.get_latest_event_with_fingerprint("f1").processed_at
assert not app.data_store.get_latest_event_with_fingerprint("f1").sent_at


# pylint: disable=missing-docstring
def test_process_unprocessed_events_non_digest_mode():
app = Comet()
app.register_parser("datastoretest4", json)

check_user = "an_owner"
router = mock.Mock()
escalator = mock.Mock()
app.register_router(func=router)
app.register_escalator(func=escalator)

app.set_config("datastoretest4", {"communication_digest_mode": False, "new_threshold": timedelta(days=14)})

app.data_store.add_record(
EventRecord(
id=6,
received_at=datetime.utcnow() - timedelta(days=8),
source_type="datastoretest4",
sent_at=datetime.utcnow() - timedelta(days=8),
processed_at=datetime.utcnow() - timedelta(days=8),
owner=check_user,
data={},
fingerprint="f5",
)
)

app.data_store.add_record(
EventRecord(
id=7,
received_at=datetime.utcnow() - timedelta(days=2),
source_type="datastoretest4",
owner=check_user,
data={},
fingerprint="f5",
)
)

app.data_store.add_record(
EventRecord(
id=8,
received_at=datetime.utcnow(),
source_type="datastoretest4",
owner=check_user,
data={},
fingerprint="f6",
)
)

app.data_store.add_record(
EventRecord(
id=9,
received_at=datetime.utcnow() - timedelta(days=2),
source_type="datastoretest4",
sent_at=datetime.utcnow() - timedelta(days=2),
processed_at=datetime.utcnow() - timedelta(days=2),
owner=check_user,
data={},
fingerprint="f7",
)
)

app.data_store.add_record(
EventRecord(
id=10,
received_at=datetime.utcnow(),
source_type="datastoretest4",
owner=check_user,
data={},
fingerprint="f7",
)
)

# f5 is expected to be reminded
# f6 is expected to be new and sent as well
# f7 is NOT expected to be reminded
before_calling = router.call_count
app.process_unprocessed_events()
assert app.data_store.get_latest_event_with_fingerprint("f5").processed_at
assert app.data_store.get_latest_event_with_fingerprint("f6").processed_at
assert app.data_store.get_latest_event_with_fingerprint("f7").processed_at
assert router.call_count == before_calling + 1

sent_fingerprints = [e.fingerprint for e in router.call_args[0][2]]
assert "f5" in sent_fingerprints
assert "f6" in sent_fingerprints
assert "f7" not in sent_fingerprints


def test_event_container():
container = EventContainer("test", {})
55 changes: 55 additions & 0 deletions tests/test_data_store.py
@@ -163,6 +163,61 @@ def test_update_processed_at_timestamp_to_now(data_store_with_test_events):
assert isinstance(record.processed_at, datetime)


def test_get_any_issues_need_reminder():
data_store = comet_core.data_store.DataStore("sqlite://")

test_fingerprint1 = "f1"
test_fingerprint2 = "f2"
test_fingerprint3 = "f3"

one_a = EventRecord(sent_at=datetime.utcnow() - timedelta(days=9), source_type="datastoretest")
one_a.fingerprint = test_fingerprint1
one_b = EventRecord(sent_at=datetime.utcnow() - timedelta(days=3), source_type="datastoretest")
one_b.fingerprint = test_fingerprint1

two_a = EventRecord(sent_at=datetime.utcnow() - timedelta(days=10), source_type="datastoretest")
two_a.fingerprint = test_fingerprint2
two_b = EventRecord(sent_at=datetime.utcnow() - timedelta(days=8), source_type="datastoretest")
two_b.fingerprint = test_fingerprint2

two_c = EventRecord(source_type="datastoretest") # sent_at NULL
two_c.fingerprint = test_fingerprint2

three_a = EventRecord(source_type="datastoretest") # sent_at NULL
three_a.fingerprint = test_fingerprint3

data_store.add_record(one_a)
data_store.add_record(two_a)
data_store.add_record(two_b)
data_store.add_record(two_c)
data_store.add_record(three_a)

# issue \ time --->
# 1 --------a------|-------------->
# 2 ----a-------b--|--------------> (2c sent_at == NULL)
# 3 ---------------|--------------> (3a sent_at == NULL)
# ^
# -7days

result = data_store.get_any_issues_need_reminder(timedelta(days=7), [one_a, two_a, three_a])
assert len(result) == 2
assert test_fingerprint2 in result
assert test_fingerprint1 in result

data_store.add_record(one_b)

# issue \ time --->
# 1 --------a------|-----b-------->
# 2 ----a-------b--|--------------> (2c sent_at == NULL)
# 3 ---------------|--------------> (3a sent_at == NULL)
# ^
# -7days

result = data_store.get_any_issues_need_reminder(timedelta(days=7), [one_a, two_a, three_a])
assert len(result) == 1
assert test_fingerprint2 in result


def test_check_any_issue_needs_reminder():
data_store = comet_core.data_store.DataStore("sqlite://")
