feat(crons): Refactor to record_clock_tick_volume_metric
This changes how we're using the volume history. Previously we intended to
use the volume history to make a decision for a specific tick. We can't do
that, since we'll actually need to look at historic volume metrics to
determine whether we've entered an incident or just had an abnormality in
mean deviation.

- The clock_dispatch no longer includes a volume_anomaly_result. I will
  remove this from sentry-kafka-schemas in an upcoming PR.

- Instead of evaluating a tick decision during dispatch, we now record
  the metrics for the timestamp we just ticked past into redis. This
  happens while processing the clock tick in the clock_tick_consumer.

- The clock_tick_consumer no longer reads the volume_anomaly_result into
  a TickVolumeAnomolyResult. We'll still do something like this in the
  future, since we'll be evaluating a tick decision based on the tick
  metrics and will need to dispatch mark_unknown when entering an
  incident, but for now I've removed this logic.

- I've also updated the pct_deviation metric (which is the one recorded
  into the redis key) so it is no longer an absolute value. We want to
  know which direction we've deviated in, since we do not want to produce
  an incident when volume has increased (see the sketch after this list).

- I've removed safe_evaluate_tick_decision rather than creating a
  safe_record_clock_tick_volume_metric, since we're now running this
  logic in a consumer, which can backlog if we do have some kind of
  issue. The wrapper only existed because it was in a hot path that could
  fail in an unrecoverable way. We've also had this code running for a
  while now with no problems, so it's safe to not be overly cautious.
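
A minimal sketch of the signed deviation change described in the
pct_deviation bullet (the volume numbers are hypothetical; the real
computation lives in record_clock_tick_volume_metric):

    import statistics

    # Hypothetical check-in counts for this minute across previous days
    historic_volume = [1000, 980, 1020, 990, 1010]
    past_minute_volume = 600

    historic_mean = statistics.mean(historic_volume)

    # Signed rather than absolute: a drop in volume yields a negative
    # deviation, while an increase yields a positive one, which should
    # never be treated as a potential incident.
    pct_deviation = (past_minute_volume - historic_mean) / historic_mean * 100
    print(f"{pct_deviation:+.1f}%")  # -40.0%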
evanpurkhiser committed Nov 12, 2024
1 parent cf3d29a commit 777ce1d
Showing 6 changed files with 71 additions and 108 deletions.
9 changes: 1 addition & 8 deletions src/sentry/monitors/clock_dispatch.py
@@ -10,7 +10,6 @@
 from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick
 
 from sentry.conf.types.kafka_definition import Topic, get_topic_codec
-from sentry.monitors.system_incidents import safe_evaluate_tick_decision
 from sentry.utils import metrics, redis
 from sentry.utils.arroyo_producer import SingletonProducer
 from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
@@ -64,13 +63,7 @@ def _dispatch_tick(ts: datetime):
         # XXX(epurkhiser): Unclear what we want to do if we're not using kafka
         return
 
-    volume_anomaly_result = safe_evaluate_tick_decision(ts)
-
-    message: ClockTick = {
-        "ts": ts.timestamp(),
-        "volume_anomaly_result": volume_anomaly_result.value,
-    }
-    payload = KafkaPayload(None, CLOCK_TICK_CODEC.encode(message), [])
+    payload = KafkaPayload(None, CLOCK_TICK_CODEC.encode({"ts": ts.timestamp()}), [])
 
     topic = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["real_topic_name"]
     _clock_tick_producer.produce(ArroyoTopic(topic), payload)
22 changes: 5 additions & 17 deletions src/sentry/monitors/consumers/clock_tick_consumer.py
@@ -15,7 +15,7 @@
 from sentry.conf.types.kafka_definition import Topic, get_topic_codec
 from sentry.monitors.clock_tasks.check_missed import dispatch_check_missing
 from sentry.monitors.clock_tasks.check_timeout import dispatch_check_timeout
-from sentry.monitors.clock_tasks.mark_unknown import dispatch_mark_unknown
+from sentry.monitors.system_incidents import record_clock_tick_volume_metric
 from sentry.monitors.types import TickVolumeAnomolyResult
 
 logger = logging.getLogger(__name__)
@@ -30,27 +30,15 @@ def process_clock_tick(message: Message[KafkaPayload | FilteredPayload]):
     wrapper: ClockTick = MONITORS_CLOCK_TICK_CODEC.decode(message.payload.value)
     ts = datetime.fromtimestamp(wrapper["ts"], tz=timezone.utc)
 
-    volume_anomaly_result = TickVolumeAnomolyResult.from_str(
-        wrapper.get("volume_anomaly_result", "normal")
-    )
+    record_clock_tick_volume_metric(ts)
 
     logger.info(
         "process_clock_tick",
-        extra={"reference_datetime": str(ts), "volume_anomaly_result": volume_anomaly_result.value},
+        extra={"reference_datetime": str(ts)},
     )
 
-    dispatch_check_missing(ts, volume_anomaly_result)
-
-    # When the tick is anomalys we are unable to mark timeouts, since it is
-    # possible that a OK check-in was sent completing an earlier in-progress
-    # check-in during a period of data-loss. In this scenario instead we need
-    # to mark ALL in-progress check-ins as unknown, since they may time-out in
-    # the future if we lost the in-progress check-in.
-    match volume_anomaly_result:
-        case TickVolumeAnomolyResult.NORMAL:
-            dispatch_check_timeout(ts)
-        case TickVolumeAnomolyResult.ABNORMAL:
-            dispatch_mark_unknown(ts)
+    dispatch_check_missing(ts, TickVolumeAnomolyResult.NORMAL)
+    dispatch_check_timeout(ts)
 
 
 class MonitorClockTickStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
61 changes: 32 additions & 29 deletions src/sentry/monitors/system_incidents.py
@@ -17,13 +17,15 @@
 from django.conf import settings
 
 from sentry import options
-from sentry.monitors.types import TickVolumeAnomolyResult
 from sentry.utils import metrics, redis
 
 logger = logging.getLogger("sentry")
 
 # This key is used to record historical date about the volume of check-ins.
-MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{}"
+MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{ts}"
+
+# This key is used to record the metric volume metric for the tick.
+MONITOR_TICK_METRIC = "sentry.monitors.volume_metric:{ts}"
 
 # When fetching historic volume data to make a decision whether we have lost
 # data this value will determine how many historic volume data-points we fetch
@@ -50,29 +52,29 @@ def update_check_in_volume(ts_list: Sequence[datetime]):
 
     # Group timestamps down to the minute
     for reference_ts, count in Counter(_make_reference_ts(ts) for ts in ts_list).items():
-        key = MONITOR_VOLUME_HISTORY.format(reference_ts)
+        key = MONITOR_VOLUME_HISTORY.format(ts=reference_ts)
 
         pipeline = redis_client.pipeline()
         pipeline.incr(key, amount=count)
         pipeline.expire(key, MONITOR_VOLUME_RETENTION)
        pipeline.execute()
 
 
-def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:
+def record_clock_tick_volume_metric(tick: datetime) -> None:
     """
-    When the clock is ticking, we may decide this tick is invalid and should
-    result in unknown misses and marking all in-progress check-ins as having an
-    unknown result.
+    Look at the historic volume of check-ins for this tick over the last
+    MONITOR_VOLUME_RETENTION period and record a "tick metric". The specific
+    metric we are recording is percentage deviation from the mean historic
+    volume for each minute.
 
-    We do this by looking at the historic volume of check-ins for the
-    particular minute boundary we just crossed.
+    This metric will be used when making a decision to determine if a
+    particular tick is in an incident state or operating normally.
 
-    XXX(epurkhiser): This is currently in development and no decision is made
-    to mark unknowns, instead we are only recording metrics for each clock tick
+    NOTE that this records a metric for the tick timestamp that we just ticked
+    over. So when ticking at 12:01 the metric is recorded for 12:00.
     """
     if not options.get("crons.tick_volume_anomaly_detection"):
-        # Detection not enabled. All ticks are considered normal
-        return TickVolumeAnomolyResult.NORMAL
+        return
 
     redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
 
@@ -92,19 +94,19 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:
 
     # Bulk fetch volume counts
     volumes = redis_client.mget(
-        MONITOR_VOLUME_HISTORY.format(_make_reference_ts(ts)) for ts in historic_timestamps
+        MONITOR_VOLUME_HISTORY.format(ts=_make_reference_ts(ts)) for ts in historic_timestamps
     )
 
     past_minute_volume = _int_or_none(volumes.pop(0))
     historic_volume: list[int] = [int(v) for v in volumes if v is not None]
 
     # Can't make any decisions if we didn't have data for the past minute
     if past_minute_volume is None:
-        return TickVolumeAnomolyResult.NORMAL
+        return
 
     # We need AT LEAST two data points to calculate standard deviation
     if len(historic_volume) < 2:
-        return TickVolumeAnomolyResult.NORMAL
+        return
 
     # Record some statistics about the past_minute_volume volume in comparison
     # to the historic_volume data
@@ -123,7 +125,7 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:
         z_score = 0.0
 
     # Percentage deviation from the mean for our past minutes volume
-    pct_deviation = (abs(past_minute_volume - historic_mean) / historic_mean) * 100
+    pct_deviation = (past_minute_volume - historic_mean) / historic_mean * 100
 
     metrics.gauge(
         "monitors.task.clock_tick.historic_volume_stdev_pct",
@@ -134,8 +136,6 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:
     metrics.gauge("monitors.task.volume_history.z_score", z_score, sample_rate=1.0)
     metrics.gauge("monitors.task.volume_history.pct_deviation", pct_deviation, sample_rate=1.0)
 
-    # XXX(epurkhiser): We're not actually making any decisions with this data
-    # just yet.
     logger.info(
         "monitors.system_incidents.volume_history",
         extra={
@@ -149,19 +149,22 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:
         },
     )
 
-    # XXX(epurkhiser): No decision is made yet, all ticks are normal
-    return TickVolumeAnomolyResult.NORMAL
+    redis_client.set(
+        MONITOR_TICK_METRIC.format(ts=_make_reference_ts(past_ts)),
+        pct_deviation,
+    )
 
 
-def safe_evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:
-    try:
-        return evaluate_tick_decision(tick)
-    except Exception:
-        logging.exception("monitors.system_incidents.evaluate_tick_decision_failed")
+def get_clock_tick_volume_metric(tick: datetime) -> float | None:
+    """
+    Retrieve the volume metric for a specific clock tick.
+    """
+    redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)
 
-    # If there are any problems evaluating the tick volume, fallback to
-    # reporting the tick as NORMAL.
-    return TickVolumeAnomolyResult.NORMAL
+    if value := redis_client.get(MONITOR_TICK_METRIC.format(ts=_make_reference_ts(tick))):
+        return float(value)
+    else:
+        return None
 
 
 def _make_reference_ts(ts: datetime):
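
As a rough illustration of how these recorded per-tick metrics might later
feed an incident decision (that evaluation is explicitly not part of this
commit; the helper below and its five-minute window are assumptions for the
sake of example):

    from datetime import datetime, timedelta

    from sentry.monitors.system_incidents import get_clock_tick_volume_metric

    def recent_tick_metrics(tick: datetime, window: int = 5) -> list[float | None]:
        # Hypothetical helper: collect the pct_deviation recorded for each
        # of the `window` ticks up to and including this one. A None entry
        # means no metric was recorded for that minute.
        return [
            get_clock_tick_volume_metric(tick - timedelta(minutes=offset))
            for offset in range(window - 1, -1, -1)
        ]

For a tick at 12:05 this would return the metrics recorded for 12:01 through
12:05, oldest first.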
46 changes: 10 additions & 36 deletions tests/sentry/monitors/consumers/test_clock_tick_consumer.py
@@ -9,7 +9,6 @@
 from arroyo.types import BrokerValue, Message, Partition, Topic
 from django.test.utils import override_settings
 from django.utils import timezone
-from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick
 
 from sentry.monitors.clock_dispatch import try_monitor_clock_tick
 from sentry.monitors.consumers.clock_tasks_consumer import MonitorClockTasksStrategyFactory
@@ -40,23 +39,27 @@ def create_consumer():
 
 @mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_missing")
 @mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_timeout")
-def test_simple(mock_dispatch_check_timeout, mock_dispatch_check_missing):
+@mock.patch("sentry.monitors.consumers.clock_tick_consumer.record_clock_tick_volume_metric")
+def test_simple(
+    mock_record_clock_tick_volume_metric,
+    mock_dispatch_check_timeout,
+    mock_dispatch_check_missing,
+):
     consumer = create_consumer()
 
     ts = timezone.now().replace(second=0, microsecond=0)
 
-    message: ClockTick = {
-        "ts": ts.timestamp(),
-        "volume_anomaly_result": TickVolumeAnomolyResult.NORMAL.value,
-    }
     value = BrokerValue(
-        KafkaPayload(b"fake-key", MONITORS_CLOCK_TICK_CODEC.encode(message), []),
+        KafkaPayload(b"fake-key", MONITORS_CLOCK_TICK_CODEC.encode({"ts": ts.timestamp()}), []),
         partition,
         1,
         ts,
     )
     consumer.submit(Message(value))
 
+    assert mock_record_clock_tick_volume_metric.call_count == 1
+    assert mock_record_clock_tick_volume_metric.mock_calls[0] == mock.call(ts)
+
     assert mock_dispatch_check_timeout.call_count == 1
     assert mock_dispatch_check_timeout.mock_calls[0] == mock.call(ts)
 
@@ -67,35 +70,6 @@ def test_simple(mock_dispatch_check_timeout, mock_dispatch_check_missing):
     )
 
 
-@mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_missing")
-@mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_mark_unknown")
-def test_simple_abnormal(mock_dispatch_mark_unknown, mock_dispatch_check_missing):
-    consumer = create_consumer()
-
-    ts = timezone.now().replace(second=0, microsecond=0)
-
-    message: ClockTick = {
-        "ts": ts.timestamp(),
-        "volume_anomaly_result": TickVolumeAnomolyResult.ABNORMAL.value,
-    }
-    value = BrokerValue(
-        KafkaPayload(b"fake-key", MONITORS_CLOCK_TICK_CODEC.encode(message), []),
-        partition,
-        1,
-        ts,
-    )
-    consumer.submit(Message(value))
-
-    assert mock_dispatch_mark_unknown.call_count == 1
-    assert mock_dispatch_mark_unknown.mock_calls[0] == mock.call(ts)
-
-    assert mock_dispatch_check_missing.call_count == 1
-    assert mock_dispatch_check_missing.mock_calls[0] == mock.call(
-        ts,
-        TickVolumeAnomolyResult.ABNORMAL,
-    )
-
-
 class MonitorsClockTickEndToEndTest(TestCase):
     @override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
     def test_end_to_end(self):
2 changes: 0 additions & 2 deletions tests/sentry/monitors/test_clock_dispatch.py
@@ -8,7 +8,6 @@
 from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick
 
 from sentry.monitors.clock_dispatch import _dispatch_tick, try_monitor_clock_tick
-from sentry.monitors.types import TickVolumeAnomolyResult
 from sentry.testutils.helpers.options import override_options
 from sentry.utils import json
 
@@ -149,7 +148,6 @@ def test_dispatch_to_kafka(clock_tick_producer_mock):
 
     message: ClockTick = {
         "ts": now.timestamp(),
-        "volume_anomaly_result": TickVolumeAnomolyResult.NORMAL.value,
     }
     clock_tick_producer_mock.produce.assert_called_with(
         Topic("clock-tick-test-topic"),