feat(crons): Record historic check-in volume counts

Part of GH-79328

evanpurkhiser committed Oct 22, 2024
1 parent 1545321 commit a878462

Showing 4 changed files with 155 additions and 5 deletions.
55 changes: 53 additions & 2 deletions src/sentry/monitors/clock_dispatch.py
@@ -1,7 +1,9 @@
from __future__ import annotations

import logging
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Sequence

from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
@@ -22,6 +24,12 @@
# This key is used to store the hashmap of Mapping[PartitionKey, Timestamp]
MONITOR_TASKS_PARTITION_CLOCKS = "sentry.monitors.partition_clocks"

# This key is used to record historical data about the volume of check-ins.
MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{}"

# We record 30 days' worth of historical data for each minute of check-ins.
MONITOR_VOLUME_RETENTION = timedelta(days=30)

CLOCK_TICK_CODEC: Codec[ClockTick] = get_topic_codec(Topic.MONITORS_CLOCK_TICK)


@@ -70,6 +78,50 @@ def _dispatch_tick(ts: datetime):
    _clock_tick_producer.produce(ArroyoTopic(topic), payload)


def _make_reference_ts(ts: datetime):
    """
    Produce a timestamp number with the seconds and microseconds removed
    """
    return int(ts.replace(second=0, microsecond=0).timestamp())


def update_check_in_volume(ts: datetime):
    """
    Increment a counter for this particular timestamp trimmed down to the
    minute.

    This counter will be used as historical data to help indicate if we may
    have had some data loss (due to an incident) and would want to tick our
    clock in a mode where misses and time-outs are created as "unknown".
    """
    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

    reference_ts = _make_reference_ts(ts)
    key = MONITOR_VOLUME_HISTORY.format(reference_ts)

    pipeline = redis_client.pipeline()
    pipeline.incr(key, amount=1)
    pipeline.expire(key, MONITOR_VOLUME_RETENTION)
    pipeline.execute()


def bulk_update_check_in_volume(ts_list: Sequence[datetime]):
    """
    Increment counters for a list of check-in timestamps. Each timestamp will
    be trimmed to the minute and grouped appropriately.
    """
    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

    # Group timestamps down to the minute
    for reference_ts, count in Counter(_make_reference_ts(ts) for ts in ts_list).items():
        key = MONITOR_VOLUME_HISTORY.format(reference_ts)

        pipeline = redis_client.pipeline()
        pipeline.incr(key, amount=count)
        pipeline.expire(key, MONITOR_VOLUME_RETENTION)
        pipeline.execute()


def try_monitor_clock_tick(ts: datetime, partition: int):
    """
    Handles triggering the monitor tasks when we've rolled over the minute.
@@ -84,8 +136,7 @@ def try_monitor_clock_tick(ts: datetime, partition: int):

    # Trim the timestamp seconds off, these tasks are run once per minute and
    # should have their timestamp clamped to the minute.
    reference_datetime = ts.replace(second=0, microsecond=0)
    reference_ts = int(reference_datetime.timestamp())
    reference_ts = _make_reference_ts(ts)

    # Store the current clock value for this partition.
    redis_client.zadd(
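
As an aside (not part of the diff): the bucketing above means any two check-ins inside the same wall-clock minute increment the same Redis key. A minimal sketch, using hypothetical timestamps:

from datetime import datetime, timezone

def _make_reference_ts(ts: datetime) -> int:
    # Floor to the minute, then take the unix timestamp
    return int(ts.replace(second=0, microsecond=0).timestamp())

a = datetime(2024, 10, 22, 12, 30, 5, tzinfo=timezone.utc)
b = datetime(2024, 10, 22, 12, 30, 59, tzinfo=timezone.utc)

# Both land in the 12:30 bucket, so they share one counter key
assert _make_reference_ts(a) == _make_reference_ts(b) == 1729600200
print("sentry.monitors.volume_history:{}".format(_make_reference_ts(a)))
# sentry.monitors.volume_history:1729600200
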
12 changes: 11 additions & 1 deletion src/sentry/monitors/consumers/monitor_consumer.py
@@ -28,7 +28,11 @@
from sentry.db.postgres.transactions import in_test_hide_transaction_boundary
from sentry.killswitches import killswitch_matches_context
from sentry.models.project import Project
from sentry.monitors.clock_dispatch import try_monitor_clock_tick
from sentry.monitors.clock_dispatch import (
    bulk_update_check_in_volume,
    try_monitor_clock_tick,
    update_check_in_volume,
)
from sentry.monitors.constants import PermitCheckInStatus
from sentry.monitors.logic.mark_failed import mark_failed
from sentry.monitors.logic.mark_ok import mark_ok
@@ -992,6 +996,9 @@ def process_batch(executor: ThreadPoolExecutor, message: Message[ValuesBatch[Kaf
    ]
    wait(futures)

    # Update check-in volume for the entire batch we've just processed
    bulk_update_check_in_volume(item.timestamp for item in batch)

    # Attempt to trigger monitor tasks across processed partitions
    for partition, ts in latest_partition_ts.items():
        try:
@@ -1009,6 +1016,9 @@ def process_single(message: Message[KafkaPayload | FilteredPayload]):
    ts = message.value.timestamp
    partition = message.value.partition.index

    if wrapper["message_type"] != "clock_pulse":
        update_check_in_volume(ts)

    try:
        try_monitor_clock_tick(ts, partition)
    except Exception:
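
Note that the counters are write-only in this commit; reading them back to spot suspiciously low volume during a clock tick is left to the rest of GH-79328. A sketch of what a reader could look like, reusing only names introduced above (the helper itself is hypothetical):

from datetime import datetime

from django.conf import settings

from sentry.monitors.clock_dispatch import MONITOR_VOLUME_HISTORY, _make_reference_ts
from sentry.utils import redis

def check_in_volume_for_minute(ts: datetime) -> int | None:
    """Read back the recorded check-in count for the minute containing ts."""
    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
    value = redis_client.get(MONITOR_VOLUME_HISTORY.format(_make_reference_ts(ts)))
    return int(value) if value is not None else None
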
35 changes: 35 additions & 0 deletions tests/sentry/monitors/consumers/test_monitor_consumer.py
@@ -1016,6 +1016,13 @@ def test_organization_killswitch(self):

        assert not MonitorCheckIn.objects.filter(guid=self.guid).exists()

    @mock.patch("sentry.monitors.consumers.monitor_consumer.update_check_in_volume")
    def test_monitor_update_check_in_volume(self, update_check_in_volume):
        monitor = self._create_monitor(slug="my-monitor")

        self.send_checkin(monitor.slug)
        assert update_check_in_volume.call_count == 1

    @mock.patch("sentry.monitors.consumers.monitor_consumer.try_monitor_clock_tick")
    def test_monitor_tasks_trigger(self, try_monitor_clock_tick):
        monitor = self._create_monitor(slug="my-monitor")
@@ -1038,6 +1045,34 @@ def test_monitor_tasks_trigger(self, try_monitor_clock_tick):
        logger.exception.assert_called_with("Failed to trigger monitor tasks")
        try_monitor_clock_tick.side_effect = None

    @mock.patch("sentry.monitors.consumers.monitor_consumer.bulk_update_check_in_volume")
    def test_parallel_monitor_update_check_in_volume(self, bulk_update_check_in_volume):
        factory = StoreMonitorCheckInStrategyFactory(mode="parallel", max_batch_size=4)
        commit = mock.Mock()
        consumer = factory.create_with_partitions(commit, {self.partition: 0})

        monitor = self._create_monitor(slug="my-monitor")

        now = datetime.now().replace(second=5)

        # Send enough check-ins to process the batch
        self.send_checkin(monitor.slug, consumer=consumer, ts=now)
        self.send_checkin(monitor.slug, consumer=consumer, ts=now + timedelta(seconds=10))
        self.send_checkin(monitor.slug, consumer=consumer, ts=now + timedelta(seconds=30))
        self.send_checkin(monitor.slug, consumer=consumer, ts=now + timedelta(minutes=1))

        # One final check-in will trigger the batch to process (but will not
        # yet be processed itself)
        self.send_checkin(monitor.slug, consumer=consumer, ts=now + timedelta(minutes=2))

        assert bulk_update_check_in_volume.call_count == 1
        assert list(bulk_update_check_in_volume.call_args_list[0][0][0]) == [
            now,
            now + timedelta(seconds=10),
            now + timedelta(seconds=30),
            now + timedelta(minutes=1),
        ]

    @mock.patch("sentry.monitors.consumers.monitor_consumer.try_monitor_clock_tick")
    def test_parallel_monitor_task_triggers(self, try_monitor_clock_tick):
        factory = StoreMonitorCheckInStrategyFactory(mode="parallel", max_batch_size=4)
58 changes: 56 additions & 2 deletions tests/sentry/monitors/test_clock_dispatch.py
@@ -3,11 +3,18 @@

from arroyo import Topic
from arroyo.backends.kafka import KafkaPayload
from django.conf import settings
from django.test.utils import override_settings
from django.utils import timezone

from sentry.monitors.clock_dispatch import _dispatch_tick, try_monitor_clock_tick
from sentry.utils import json
from sentry.monitors.clock_dispatch import (
    MONITOR_VOLUME_HISTORY,
    _dispatch_tick,
    bulk_update_check_in_volume,
    try_monitor_clock_tick,
    update_check_in_volume,
)
from sentry.utils import json, redis


@mock.patch("sentry.monitors.clock_dispatch._dispatch_tick")
@@ -143,3 +150,50 @@ def test_dispatch_to_kafka(clock_tick_producer_mock):
Topic("clock-tick-test-topic"),
KafkaPayload(None, json.dumps({"ts": now.timestamp()}).encode("utf-8"), []),
)


def test_update_check_in_volume():
    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
    now = timezone.now().replace(second=5)

    update_check_in_volume(now)
    update_check_in_volume(now + timedelta(seconds=5))
    update_check_in_volume(now + timedelta(minutes=1))

    def make_key(offset: timedelta) -> str:
        ts = now.replace(second=0, microsecond=0) + offset
        return MONITOR_VOLUME_HISTORY.format(int(ts.timestamp()))

    minute_0 = redis_client.get(make_key(timedelta()))
    minute_1 = redis_client.get(make_key(timedelta(minutes=1)))

    assert minute_0 == "2"
    assert minute_1 == "1"


def test_bulk_update_check_in_volume():
    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

    now = timezone.now().replace(second=5)
    items = [
        now,
        now + timedelta(seconds=10),
        now + timedelta(seconds=30),
        now + timedelta(minutes=1),
        now + timedelta(minutes=3),
    ]
    bulk_update_check_in_volume(items)

    def make_key(offset: timedelta) -> str:
        ts = now.replace(second=0, microsecond=0) + offset
        return MONITOR_VOLUME_HISTORY.format(int(ts.timestamp()))

    minute_0 = redis_client.get(make_key(timedelta()))
    minute_1 = redis_client.get(make_key(timedelta(minutes=1)))
    minute_2 = redis_client.get(make_key(timedelta(minutes=2)))
    minute_3 = redis_client.get(make_key(timedelta(minutes=3)))

    assert minute_0 == "3"
    assert minute_1 == "1"
    assert minute_2 is None
    assert minute_3 == "1"

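One thing these tests don't assert is that the 30-day retention was applied. A possible addition at the end of test_bulk_update_check_in_volume, assuming MONITOR_VOLUME_RETENTION is also imported from sentry.monitors.clock_dispatch:

    # The expire() in the pipeline should have set a TTL on each key
    ttl = redis_client.ttl(make_key(timedelta()))
    assert 0 < ttl <= int(MONITOR_VOLUME_RETENTION.total_seconds())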