Skip to content

Commit

Permalink
feat(crons): Implement simple incident occurence dispatching
Browse files Browse the repository at this point in the history
Simply deserializes the IncidentOccurrence messages and dispatches the
incident occurrence.

Does not yet determine if the clock tick should cause the dispatch to be
delayed.
  • Loading branch information
evanpurkhiser committed Nov 15, 2024
1 parent 985a324 commit 293092e
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 17 deletions.
36 changes: 34 additions & 2 deletions src/sentry/monitors/consumers/incident_occurrences_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import logging
from collections.abc import Mapping
from datetime import UTC, datetime
from typing import TypeGuard

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
Expand All @@ -12,6 +14,8 @@
from sentry_kafka_schemas.schema_types.monitors_incident_occurrences_v1 import IncidentOccurrence

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.monitors.logic.incident_occurrence import create_incident_occurrence
from sentry.monitors.models import MonitorCheckIn, MonitorIncident

logger = logging.getLogger(__name__)

Expand All @@ -21,11 +25,39 @@


def process_incident_occurrence(message: Message[KafkaPayload | FilteredPayload]):
"""
Process a incident occurrence message. This will immediately dispatch an
issue occurrence via create_incident_occurrence.
"""
assert not isinstance(message.payload, FilteredPayload)
assert isinstance(message.value, BrokerValue)

# wrapper: IncidentOccurrence = MONITORS_INCIDENT_OCCURRENCES.decode(message.payload.value)
# TODO(epurkhiser): Do something with issue occurrence
wrapper: IncidentOccurrence = MONITORS_INCIDENT_OCCURRENCES.decode(message.payload.value)

try:
incident = MonitorIncident.objects.get(id=int(wrapper["incident_id"]))
except MonitorIncident.DoesNotExist:
logger.exception("missing_incident")
return

# previous_checkin_ids includes the failed_checkin_id
checkins = MonitorCheckIn.objects.filter(id__in=wrapper["previous_checkin_ids"])
checkins_map: dict[int, MonitorCheckIn] = {checkin.id: checkin for checkin in checkins}

failed_checkin = checkins_map.get(int(wrapper["failed_checkin_id"]))
previous_checkins = [checkins_map.get(int(id)) for id in wrapper["previous_checkin_ids"]]

def has_all(checkins: list[MonitorCheckIn | None]) -> TypeGuard[list[MonitorCheckIn]]:
return None not in checkins

# Unlikely, but if we can't find all the check-ins we can't produce an occurence
if failed_checkin is None or not has_all(previous_checkins):
logger.error("missing_check_ins")
return

received = datetime.fromtimestamp(wrapper["received_ts"], UTC)

create_incident_occurrence(failed_checkin, previous_checkins, incident, received)


class MonitorIncidentOccurenceStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
Expand Down
148 changes: 133 additions & 15 deletions tests/sentry/monitors/consumers/test_incident_occurrence_consumer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
from datetime import datetime
from unittest import mock

Expand All @@ -11,6 +12,14 @@
MONITORS_INCIDENT_OCCURRENCES,
MonitorIncidentOccurenceStrategyFactory,
)
from sentry.monitors.models import (
CheckInStatus,
MonitorCheckIn,
MonitorEnvironment,
MonitorIncident,
MonitorStatus,
)
from sentry.testutils.cases import TestCase

partition = Partition(Topic("test"), 0)

Expand All @@ -35,19 +44,128 @@ def sned_incident_occurrence(
consumer.submit(Message(value))


def test_simple():
# XXX(epurkhiser): Doesn't really test anything yet
ts = timezone.now().replace(second=0, microsecond=0)

consumer = create_consumer()
sned_incident_occurrence(
consumer,
ts,
{
"clock_tick_ts": 1617895645,
"received_ts": 1617895650,
"failed_checkin_id": 123456,
"incident_id": 987654,
"previous_checkin_ids": [111222, 333444, 55666],
},
class MonitorsIncidentOccurrenceConsumerTestCase(TestCase):
@mock.patch(
"sentry.monitors.consumers.incident_occurrences_consumer.create_incident_occurrence"
)
def test_simple(self, mock_create_incident_occurrence):
ts = timezone.now().replace(second=0, microsecond=0)

monitor = self.create_monitor()
monitor_environment = MonitorEnvironment.objects.create(
monitor=monitor,
environment_id=self.environment.id,
status=MonitorStatus.ERROR,
)

last_checkin = timezone.now()
trace_id = uuid.uuid4()

failed_checkin = MonitorCheckIn.objects.create(
monitor=monitor,
monitor_environment=monitor_environment,
project_id=self.project.id,
status=CheckInStatus.ERROR,
trace_id=trace_id,
date_added=last_checkin,
)
incident = MonitorIncident.objects.create(
monitor=monitor,
monitor_environment=monitor_environment,
starting_checkin=failed_checkin,
starting_timestamp=last_checkin,
)

consumer = create_consumer()
sned_incident_occurrence(
consumer,
ts,
{
"clock_tick_ts": int(ts.timestamp()),
"received_ts": int(last_checkin.timestamp()),
"incident_id": incident.id,
"failed_checkin_id": failed_checkin.id,
"previous_checkin_ids": [failed_checkin.id],
},
)

assert mock_create_incident_occurrence.call_count == 1
assert mock_create_incident_occurrence.mock_calls[0] == mock.call(
failed_checkin,
[failed_checkin],
incident,
last_checkin.replace(microsecond=0),
)

@mock.patch("sentry.monitors.consumers.incident_occurrences_consumer.logger")
def test_missing_data(self, mock_logger):
ts = timezone.now().replace(second=0, microsecond=0)

monitor = self.create_monitor()
monitor_environment = MonitorEnvironment.objects.create(
monitor=monitor,
environment_id=self.environment.id,
status=MonitorStatus.ERROR,
)

last_checkin = timezone.now()
trace_id = uuid.uuid4()

failed_checkin = MonitorCheckIn.objects.create(
monitor=monitor,
monitor_environment=monitor_environment,
project_id=self.project.id,
status=CheckInStatus.ERROR,
trace_id=trace_id,
date_added=last_checkin,
)
incident = MonitorIncident.objects.create(
monitor=monitor,
monitor_environment=monitor_environment,
starting_checkin=failed_checkin,
starting_timestamp=last_checkin,
)

consumer = create_consumer()

# Send with bad incident id
sned_incident_occurrence(
consumer,
ts,
{
"clock_tick_ts": int(ts.timestamp()),
"received_ts": int(last_checkin.timestamp()),
"incident_id": 1234,
"failed_checkin_id": failed_checkin.id,
"previous_checkin_ids": [failed_checkin.id],
},
)
mock_logger.exception.assert_called_with("missing_incident")

# Send with bad failed_checkin_id
sned_incident_occurrence(
consumer,
ts,
{
"clock_tick_ts": int(ts.timestamp()),
"received_ts": int(last_checkin.timestamp()),
"incident_id": incident.id,
"failed_checkin_id": 1234,
"previous_checkin_ids": [failed_checkin.id],
},
)
mock_logger.error.assert_called_with("missing_check_ins")

# Send with bad previous_checkin_ids
sned_incident_occurrence(
consumer,
ts,
{
"clock_tick_ts": int(ts.timestamp()),
"received_ts": int(last_checkin.timestamp()),
"incident_id": incident.id,
"failed_checkin_id": failed_checkin.id,
"previous_checkin_ids": [123],
},
)
mock_logger.error.assert_called_with("missing_check_ins")

0 comments on commit 293092e

Please sign in to comment.