Skip to content

Commit

Permalink
feat(crons): Implement simple incident occurrence dispatching
Browse files Browse the repository at this point in the history
Simply deserializes the IncidentOccurrence messages and dispatches the
incident occurrence.

Does not yet determine if the clock tick should cause the dispatch to be
delayed.
  • Loading branch information
evanpurkhiser committed Nov 14, 2024
1 parent 7eaeb8e commit ba25e24
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 17 deletions.
35 changes: 33 additions & 2 deletions src/sentry/monitors/consumers/incident_occurrences_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import logging
from collections.abc import Mapping
from datetime import UTC, datetime
from typing import TypeGuard

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
Expand All @@ -12,6 +14,8 @@
from sentry_kafka_schemas.schema_types.monitors_incident_occurrences_v1 import IncidentOccurrence

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.monitors.logic.incident_occurrence import create_incident_occurrence
from sentry.monitors.models import MonitorCheckIn, MonitorIncident

logger = logging.getLogger(__name__)

Expand All @@ -21,11 +25,38 @@


def process_incident_occurrence(message: Message[KafkaPayload | FilteredPayload]):
    """
    Process an incident occurrence message. This will immediately dispatch an
    issue occurrence via create_incident_occurrence.

    The payload is decoded with MONITORS_INCIDENT_OCCURRENCES.decode. The
    MonitorIncident and MonitorCheckIn rows it references are loaded and passed
    to create_incident_occurrence, together with `received_ts` converted to a
    UTC datetime.

    Returns early without dispatching when the incident does not exist or when
    any referenced check-in cannot be found; both cases are logged with the
    identifiers from the message.
    """
    assert not isinstance(message.payload, FilteredPayload)
    assert isinstance(message.value, BrokerValue)

    wrapper: IncidentOccurrence = MONITORS_INCIDENT_OCCURRENCES.decode(message.payload.value)

    # Coerce every identifier exactly once so the query, the lookups and the
    # log context all agree on the same integer values.
    incident_id = int(wrapper["incident_id"])
    failed_checkin_id = int(wrapper["failed_checkin_id"])
    previous_checkin_ids = [int(checkin_id) for checkin_id in wrapper["previous_checkin_ids"]]

    try:
        incident = MonitorIncident.objects.get(id=incident_id)
    except MonitorIncident.DoesNotExist:
        # Nothing to dispatch against; record which incident was missing.
        logger.info("missing_incident", extra={"incident_id": incident_id})
        return

    # previous_checkin_ids includes the failed_checkin_id
    checkins = MonitorCheckIn.objects.filter(id__in=previous_checkin_ids)
    checkins_map: dict[int, MonitorCheckIn] = {checkin.id: checkin for checkin in checkins}

    failed_checkin = checkins_map.get(failed_checkin_id)
    # Rebuild the list in message order; ids without a row become None.
    previous_checkins = [checkins_map.get(checkin_id) for checkin_id in previous_checkin_ids]

    def has_all(candidates: list[MonitorCheckIn | None]) -> TypeGuard[list[MonitorCheckIn]]:
        # Narrows the element type for the type checker when nothing is missing.
        return all(candidate is not None for candidate in candidates)

    # Unlikely, but if we can't find all the check-ins we can't produce an occurrence
    if failed_checkin is None or not has_all(previous_checkins):
        logger.error(
            "missing_check_ins",
            extra={
                "incident_id": incident_id,
                "failed_checkin_id": failed_checkin_id,
                "previous_checkin_ids": previous_checkin_ids,
            },
        )
        return

    received = datetime.fromtimestamp(wrapper["received_ts"], UTC)

    create_incident_occurrence(failed_checkin, previous_checkins, incident, received)


class MonitorIncidentOccurenceStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import uuid
from datetime import datetime
from unittest import mock

Expand All @@ -11,6 +12,16 @@
MONITORS_INCIDENT_OCCURRENCES,
MonitorIncidentOccurenceStrategyFactory,
)
from sentry.monitors.models import (
CheckInStatus,
Monitor,
MonitorCheckIn,
MonitorEnvironment,
MonitorIncident,
MonitorType,
ScheduleType,
)
from sentry.testutils.cases import TestCase

partition = Partition(Topic("test"), 0)

Expand All @@ -35,19 +46,66 @@ def sned_incident_occurrence(
consumer.submit(Message(value))


def test_simple():
# XXX(epurkhiser): Doesn't really test anything yet
ts = timezone.now().replace(second=0, microsecond=0)

consumer = create_consumer()
sned_incident_occurrence(
consumer,
ts,
{
"clock_tick_ts": 1617895645,
"received_ts": 1617895650,
"failed_checkin_id": 123456,
"incident_id": 987654,
"previous_checkin_ids": [111222, 333444, 55666],
},
class MonitorsIncidentOccurrenceConsumerTest(TestCase):
    """Tests for the incident occurrence consumer using real monitor rows."""

    @mock.patch(
        "sentry.monitors.consumers.incident_occurrences_consumer.create_incident_occurrence"
    )
    def test_simple(self, mock_create_incident_occurrence):
        """A message referencing existing rows dispatches one incident occurrence."""
        ts = timezone.now().replace(second=0, microsecond=0)

        monitor_config = {
            "schedule": [1, "month"],
            "schedule_type": ScheduleType.INTERVAL,
            "max_runtime": None,
            "checkin_margin": None,
        }
        monitor = Monitor.objects.create(
            name="test monitor",
            organization_id=self.organization.id,
            project_id=self.project.id,
            type=MonitorType.CRON_JOB,
            config=monitor_config,
        )
        monitor_environment = MonitorEnvironment.objects.create(
            monitor=monitor,
            environment_id=self.environment.id,
            status=monitor.status,
        )

        last_checkin = timezone.now()

        # A single failing check-in both starts the incident and is the only
        # entry in the previous check-in list.
        failed_checkin = MonitorCheckIn.objects.create(
            monitor=monitor,
            monitor_environment=monitor_environment,
            project_id=self.project.id,
            status=CheckInStatus.ERROR,
            trace_id=uuid.uuid4(),
            date_added=last_checkin,
        )
        incident = MonitorIncident.objects.create(
            monitor=monitor,
            monitor_environment=monitor_environment,
            starting_checkin=failed_checkin,
            starting_timestamp=last_checkin,
        )

        consumer = create_consumer()
        occurrence_message = {
            "clock_tick_ts": int(ts.timestamp()),
            "received_ts": int(last_checkin.timestamp()),
            "incident_id": incident.id,
            "failed_checkin_id": failed_checkin.id,
            "previous_checkin_ids": [failed_checkin.id],
        }
        sned_incident_occurrence(consumer, ts, occurrence_message)

        # received_ts is sent as whole seconds, so the datetime handed to
        # create_incident_occurrence has no sub-second component.
        expected_received = last_checkin.replace(microsecond=0)
        mock_create_incident_occurrence.assert_called_once_with(
            failed_checkin,
            [failed_checkin],
            incident,
            expected_received,
        )

0 comments on commit ba25e24

Please sign in to comment.