diff --git a/src/sentry/monitors/clock_dispatch.py b/src/sentry/monitors/clock_dispatch.py index c94fbfdc3ef092..844e1307edca4e 100644 --- a/src/sentry/monitors/clock_dispatch.py +++ b/src/sentry/monitors/clock_dispatch.py @@ -10,7 +10,6 @@ from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick from sentry.conf.types.kafka_definition import Topic, get_topic_codec -from sentry.monitors.system_incidents import safe_evaluate_tick_decision from sentry.utils import metrics, redis from sentry.utils.arroyo_producer import SingletonProducer from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition @@ -64,13 +63,7 @@ def _dispatch_tick(ts: datetime): # XXX(epurkhiser): Unclear what we want to do if we're not using kafka return - volume_anomaly_result = safe_evaluate_tick_decision(ts) - - message: ClockTick = { - "ts": ts.timestamp(), - "volume_anomaly_result": volume_anomaly_result.value, - } - payload = KafkaPayload(None, CLOCK_TICK_CODEC.encode(message), []) + payload = KafkaPayload(None, CLOCK_TICK_CODEC.encode({"ts": ts.timestamp()}), []) topic = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["real_topic_name"] _clock_tick_producer.produce(ArroyoTopic(topic), payload) diff --git a/src/sentry/monitors/consumers/clock_tick_consumer.py b/src/sentry/monitors/consumers/clock_tick_consumer.py index 35890ea638be5f..b40abaf1de4b3f 100644 --- a/src/sentry/monitors/consumers/clock_tick_consumer.py +++ b/src/sentry/monitors/consumers/clock_tick_consumer.py @@ -15,7 +15,7 @@ from sentry.conf.types.kafka_definition import Topic, get_topic_codec from sentry.monitors.clock_tasks.check_missed import dispatch_check_missing from sentry.monitors.clock_tasks.check_timeout import dispatch_check_timeout -from sentry.monitors.clock_tasks.mark_unknown import dispatch_mark_unknown +from sentry.monitors.system_incidents import record_clock_tick_volume_metric from sentry.monitors.types import TickVolumeAnomolyResult logger = logging.getLogger(__name__) @@ -30,27 +30,15 @@ def process_clock_tick(message: Message[KafkaPayload | FilteredPayload]): wrapper: ClockTick = MONITORS_CLOCK_TICK_CODEC.decode(message.payload.value) ts = datetime.fromtimestamp(wrapper["ts"], tz=timezone.utc) - volume_anomaly_result = TickVolumeAnomolyResult.from_str( - wrapper.get("volume_anomaly_result", "normal") - ) + record_clock_tick_volume_metric(ts) logger.info( "process_clock_tick", - extra={"reference_datetime": str(ts), "volume_anomaly_result": volume_anomaly_result.value}, + extra={"reference_datetime": str(ts)}, ) - dispatch_check_missing(ts, volume_anomaly_result) - - # When the tick is anomalys we are unable to mark timeouts, since it is - # possible that a OK check-in was sent completing an earlier in-progress - # check-in during a period of data-loss. In this scenario instead we need - # to mark ALL in-progress check-ins as unknown, since they may time-out in - # the future if we lost the in-progress check-in. - match volume_anomaly_result: - case TickVolumeAnomolyResult.NORMAL: - dispatch_check_timeout(ts) - case TickVolumeAnomolyResult.ABNORMAL: - dispatch_mark_unknown(ts) + dispatch_check_missing(ts, TickVolumeAnomolyResult.NORMAL) + dispatch_check_timeout(ts) class MonitorClockTickStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): diff --git a/src/sentry/monitors/system_incidents.py b/src/sentry/monitors/system_incidents.py index 5ee9b16240d3b9..6a339098a8e67a 100644 --- a/src/sentry/monitors/system_incidents.py +++ b/src/sentry/monitors/system_incidents.py @@ -17,13 +17,15 @@ from django.conf import settings from sentry import options -from sentry.monitors.types import TickVolumeAnomolyResult from sentry.utils import metrics, redis logger = logging.getLogger("sentry") # This key is used to record historical date about the volume of check-ins. -MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{}" +MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{ts}" + +# This key is used to record the metric volume metric for the tick. +MONITOR_TICK_METRIC = "sentry.monitors.volume_metric:{ts}" # When fetching historic volume data to make a decision whether we have lost # data this value will determine how many historic volume data-points we fetch @@ -50,7 +52,7 @@ def update_check_in_volume(ts_list: Sequence[datetime]): # Group timestamps down to the minute for reference_ts, count in Counter(_make_reference_ts(ts) for ts in ts_list).items(): - key = MONITOR_VOLUME_HISTORY.format(reference_ts) + key = MONITOR_VOLUME_HISTORY.format(ts=reference_ts) pipeline = redis_client.pipeline() pipeline.incr(key, amount=count) @@ -58,21 +60,21 @@ def update_check_in_volume(ts_list: Sequence[datetime]): pipeline.execute() -def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult: +def record_clock_tick_volume_metric(tick: datetime) -> None: """ - When the clock is ticking, we may decide this tick is invalid and should - result in unknown misses and marking all in-progress check-ins as having an - unknown result. + Look at the historic volume of check-ins for this tick over the last + MONITOR_VOLUME_RETENTION period and record a "tick metric". The specific + metric we are recording is percentage deviation from the mean historic + volume for each minute. - We do this by looking at the historic volume of check-ins for the - particular minute boundary we just crossed. + This metric will be used when making a decision to determine if a + particular tick is in an incident state or operating normally. - XXX(epurkhiser): This is currently in development and no decision is made - to mark unknowns, instead we are only recording metrics for each clock tick + NOTE that this records a metric for the tick timestamp that we just ticked + over. So when ticking at 12:01 the metric is recorded for 12:00. """ if not options.get("crons.tick_volume_anomaly_detection"): - # Detection not enabled. All ticks are considered normal - return TickVolumeAnomolyResult.NORMAL + return redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER) @@ -92,7 +94,7 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult: # Bulk fetch volume counts volumes = redis_client.mget( - MONITOR_VOLUME_HISTORY.format(_make_reference_ts(ts)) for ts in historic_timestamps + MONITOR_VOLUME_HISTORY.format(ts=_make_reference_ts(ts)) for ts in historic_timestamps ) past_minute_volume = _int_or_none(volumes.pop(0)) @@ -100,11 +102,11 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult: # Can't make any decisions if we didn't have data for the past minute if past_minute_volume is None: - return TickVolumeAnomolyResult.NORMAL + return # We need AT LEAST two data points to calculate standard deviation if len(historic_volume) < 2: - return TickVolumeAnomolyResult.NORMAL + return # Record some statistics about the past_minute_volume volume in comparison # to the historic_volume data @@ -123,7 +125,7 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult: z_score = 0.0 # Percentage deviation from the mean for our past minutes volume - pct_deviation = (abs(past_minute_volume - historic_mean) / historic_mean) * 100 + pct_deviation = (past_minute_volume - historic_mean) / historic_mean * 100 metrics.gauge( "monitors.task.clock_tick.historic_volume_stdev_pct", @@ -134,8 +136,6 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult: metrics.gauge("monitors.task.volume_history.z_score", z_score, sample_rate=1.0) metrics.gauge("monitors.task.volume_history.pct_deviation", pct_deviation, sample_rate=1.0) - # XXX(epurkhiser): We're not actually making any decisions with this data - # just yet. logger.info( "monitors.system_incidents.volume_history", extra={ @@ -149,19 +149,21 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult: }, ) - # XXX(epurkhiser): No decision is made yet, all ticks are normal - return TickVolumeAnomolyResult.NORMAL + key = MONITOR_TICK_METRIC.format(ts=_make_reference_ts(past_ts)) + redis_client.set(key, pct_deviation) + redis_client.expire(key, MONITOR_VOLUME_RETENTION) -def safe_evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult: - try: - return evaluate_tick_decision(tick) - except Exception: - logging.exception("monitors.system_incidents.evaluate_tick_decision_failed") +def get_clock_tick_volume_metric(tick: datetime) -> float | None: + """ + Retrieve the volume metric for a specific clock tick. + """ + redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER) - # If there are any problems evaluating the tick volume, fallback to - # reporting the tick as NORMAL. - return TickVolumeAnomolyResult.NORMAL + if value := redis_client.get(MONITOR_TICK_METRIC.format(ts=_make_reference_ts(tick))): + return float(value) + else: + return None def _make_reference_ts(ts: datetime): diff --git a/tests/sentry/monitors/consumers/test_clock_tick_consumer.py b/tests/sentry/monitors/consumers/test_clock_tick_consumer.py index ea677e87b0b256..8bad29a4ce5d40 100644 --- a/tests/sentry/monitors/consumers/test_clock_tick_consumer.py +++ b/tests/sentry/monitors/consumers/test_clock_tick_consumer.py @@ -9,7 +9,6 @@ from arroyo.types import BrokerValue, Message, Partition, Topic from django.test.utils import override_settings from django.utils import timezone -from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick from sentry.monitors.clock_dispatch import try_monitor_clock_tick from sentry.monitors.consumers.clock_tasks_consumer import MonitorClockTasksStrategyFactory @@ -40,23 +39,27 @@ def create_consumer(): @mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_missing") @mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_timeout") -def test_simple(mock_dispatch_check_timeout, mock_dispatch_check_missing): +@mock.patch("sentry.monitors.consumers.clock_tick_consumer.record_clock_tick_volume_metric") +def test_simple( + mock_record_clock_tick_volume_metric, + mock_dispatch_check_timeout, + mock_dispatch_check_missing, +): consumer = create_consumer() ts = timezone.now().replace(second=0, microsecond=0) - message: ClockTick = { - "ts": ts.timestamp(), - "volume_anomaly_result": TickVolumeAnomolyResult.NORMAL.value, - } value = BrokerValue( - KafkaPayload(b"fake-key", MONITORS_CLOCK_TICK_CODEC.encode(message), []), + KafkaPayload(b"fake-key", MONITORS_CLOCK_TICK_CODEC.encode({"ts": ts.timestamp()}), []), partition, 1, ts, ) consumer.submit(Message(value)) + assert mock_record_clock_tick_volume_metric.call_count == 1 + assert mock_record_clock_tick_volume_metric.mock_calls[0] == mock.call(ts) + assert mock_dispatch_check_timeout.call_count == 1 assert mock_dispatch_check_timeout.mock_calls[0] == mock.call(ts) @@ -67,35 +70,6 @@ def test_simple(mock_dispatch_check_timeout, mock_dispatch_check_missing): ) -@mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_missing") -@mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_mark_unknown") -def test_simple_abnormal(mock_dispatch_mark_unknown, mock_dispatch_check_missing): - consumer = create_consumer() - - ts = timezone.now().replace(second=0, microsecond=0) - - message: ClockTick = { - "ts": ts.timestamp(), - "volume_anomaly_result": TickVolumeAnomolyResult.ABNORMAL.value, - } - value = BrokerValue( - KafkaPayload(b"fake-key", MONITORS_CLOCK_TICK_CODEC.encode(message), []), - partition, - 1, - ts, - ) - consumer.submit(Message(value)) - - assert mock_dispatch_mark_unknown.call_count == 1 - assert mock_dispatch_mark_unknown.mock_calls[0] == mock.call(ts) - - assert mock_dispatch_check_missing.call_count == 1 - assert mock_dispatch_check_missing.mock_calls[0] == mock.call( - ts, - TickVolumeAnomolyResult.ABNORMAL, - ) - - class MonitorsClockTickEndToEndTest(TestCase): @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream") def test_end_to_end(self): diff --git a/tests/sentry/monitors/test_clock_dispatch.py b/tests/sentry/monitors/test_clock_dispatch.py index f458a3419c57a7..1bf58c035f882d 100644 --- a/tests/sentry/monitors/test_clock_dispatch.py +++ b/tests/sentry/monitors/test_clock_dispatch.py @@ -8,7 +8,6 @@ from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick from sentry.monitors.clock_dispatch import _dispatch_tick, try_monitor_clock_tick -from sentry.monitors.types import TickVolumeAnomolyResult from sentry.testutils.helpers.options import override_options from sentry.utils import json @@ -149,7 +148,6 @@ def test_dispatch_to_kafka(clock_tick_producer_mock): message: ClockTick = { "ts": now.timestamp(), - "volume_anomaly_result": TickVolumeAnomolyResult.NORMAL.value, } clock_tick_producer_mock.produce.assert_called_with( Topic("clock-tick-test-topic"), diff --git a/tests/sentry/monitors/test_system_incidents.py b/tests/sentry/monitors/test_system_incidents.py index d490402cc25518..74f59f372a6d04 100644 --- a/tests/sentry/monitors/test_system_incidents.py +++ b/tests/sentry/monitors/test_system_incidents.py @@ -10,12 +10,15 @@ MONITOR_VOLUME_DECISION_STEP, MONITOR_VOLUME_HISTORY, MONITOR_VOLUME_RETENTION, - evaluate_tick_decision, + get_clock_tick_volume_metric, + record_clock_tick_volume_metric, update_check_in_volume, ) from sentry.testutils.helpers.options import override_options from sentry.utils import redis +redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER) + def fill_historic_volume( start: datetime, length: timedelta, step: timedelta, counts: Sequence[int] @@ -62,7 +65,7 @@ def test_update_check_in_volume(): def make_key(offset: timedelta) -> str: ts = now.replace(second=0, microsecond=0) + offset - return MONITOR_VOLUME_HISTORY.format(int(ts.timestamp())) + return MONITOR_VOLUME_HISTORY.format(ts=int(ts.timestamp())) minute_0 = redis_client.get(make_key(timedelta())) minute_1 = redis_client.get(make_key(timedelta(minutes=1))) @@ -78,10 +81,10 @@ def make_key(offset: timedelta) -> str: @mock.patch("sentry.monitors.system_incidents.logger") @mock.patch("sentry.monitors.system_incidents.metrics") @override_options({"crons.tick_volume_anomaly_detection": True}) -def test_evaluate_tick_decision_simple(metrics, logger): +def test_record_clock_tiock_volume_metric_simple(metrics, logger): tick = timezone.now().replace(second=0, microsecond=0) - # This is the timestamp we're looking at just after the tick + # This is the timestamp we're looking at just before the tick past_ts = tick - timedelta(minutes=1) # Fill histroic volume data for earlier minutes. @@ -95,7 +98,7 @@ def test_evaluate_tick_decision_simple(metrics, logger): # Record a volume of 200 for the timestamp we are considerng update_check_in_volume([past_ts] * 165) - evaluate_tick_decision(tick) + record_clock_tick_volume_metric(tick) logger.info.assert_called_with( "monitors.system_incidents.volume_history", @@ -125,15 +128,16 @@ def test_evaluate_tick_decision_simple(metrics, logger): 1.3513513513513442, sample_rate=1.0, ) + assert get_clock_tick_volume_metric(past_ts) == 1.3513513513513442 @mock.patch("sentry.monitors.system_incidents.logger") @mock.patch("sentry.monitors.system_incidents.metrics") @override_options({"crons.tick_volume_anomaly_detection": True}) -def test_evaluate_tick_decision_volume_drop(metrics, logger): +def test_record_clock_tiock_volume_metric_volume_drop(metrics, logger): tick = timezone.now().replace(second=0, microsecond=0) - # This is the timestamp we're looking at just after the tick + # This is the timestamp we're looking at just before the tick past_ts = tick - timedelta(minutes=1) # Fill histroic volume data for earlier minutes. @@ -147,7 +151,7 @@ def test_evaluate_tick_decision_volume_drop(metrics, logger): # Record a volume much lower than what we had been recording previously update_check_in_volume([past_ts] * 6_000) - evaluate_tick_decision(tick) + record_clock_tick_volume_metric(tick) # Note that the pct_deviation and z_score are extremes logger.info.assert_called_with( @@ -157,7 +161,7 @@ def test_evaluate_tick_decision_volume_drop(metrics, logger): "evaluation_minute": past_ts.strftime("%H:%M"), "history_count": 30, "z_score": -19.816869917656856, - "pct_deviation": 52.0, + "pct_deviation": -52.0, "historic_mean": 12500, "historic_stdev": 328.0033641543204, }, @@ -175,39 +179,41 @@ def test_evaluate_tick_decision_volume_drop(metrics, logger): ) metrics.gauge.assert_any_call( "monitors.task.volume_history.pct_deviation", - 52.0, + -52.0, sample_rate=1.0, ) + assert get_clock_tick_volume_metric(past_ts) == -52.0 @mock.patch("sentry.monitors.system_incidents.logger") @mock.patch("sentry.monitors.system_incidents.metrics") @override_options({"crons.tick_volume_anomaly_detection": True}) -def test_evaluate_tick_decision_low_history(metrics, logger): +def test_record_clock_tiock_volume_metric_low_history(metrics, logger): tick = timezone.now().replace(second=0, microsecond=0) - # This is the timestamp we're looking at just after the tick + # This is the timestamp we're looking at just before the tick past_ts = tick - timedelta(minutes=1) # Only add one historic value (and the current value being evaluated) update_check_in_volume([past_ts - MONITOR_VOLUME_DECISION_STEP] * 900) update_check_in_volume([past_ts] * 900) - evaluate_tick_decision(tick) + record_clock_tick_volume_metric(tick) # We should do nothing because there was not enough daata to make any # calculation assert not logger.info.called assert not metrics.gauge.called + assert get_clock_tick_volume_metric(past_ts) is None @mock.patch("sentry.monitors.system_incidents.logger") @mock.patch("sentry.monitors.system_incidents.metrics") @override_options({"crons.tick_volume_anomaly_detection": True}) -def test_evaluate_tick_decision_uniform(metrics, logger): +def test_record_clock_tiock_volume_metric_uniform(metrics, logger): tick = timezone.now().replace(second=0, microsecond=0) - # This is the timestamp we're looking at just after the tick + # This is the timestamp we're looking at just before the tick past_ts = tick - timedelta(minutes=1) # Fill with a uniform history (all values the same). This will give us a @@ -221,7 +227,7 @@ def test_evaluate_tick_decision_uniform(metrics, logger): ) update_check_in_volume([past_ts] * 1000) - evaluate_tick_decision(tick) + record_clock_tick_volume_metric(tick) logger.info.assert_called_with( "monitors.system_incidents.volume_history", @@ -251,3 +257,4 @@ def test_evaluate_tick_decision_uniform(metrics, logger): 0.0, sample_rate=1.0, ) + assert get_clock_tick_volume_metric(past_ts) == 0.0