Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(crons): Refactor to record_clock_tick_volume_metric #80605

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions src/sentry/monitors/clock_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.monitors.system_incidents import safe_evaluate_tick_decision
from sentry.utils import metrics, redis
from sentry.utils.arroyo_producer import SingletonProducer
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
Expand Down Expand Up @@ -64,13 +63,7 @@ def _dispatch_tick(ts: datetime):
# XXX(epurkhiser): Unclear what we want to do if we're not using kafka
return

volume_anomaly_result = safe_evaluate_tick_decision(ts)

message: ClockTick = {
"ts": ts.timestamp(),
"volume_anomaly_result": volume_anomaly_result.value,
}
payload = KafkaPayload(None, CLOCK_TICK_CODEC.encode(message), [])
payload = KafkaPayload(None, CLOCK_TICK_CODEC.encode({"ts": ts.timestamp()}), [])

topic = get_topic_definition(Topic.MONITORS_CLOCK_TICK)["real_topic_name"]
_clock_tick_producer.produce(ArroyoTopic(topic), payload)
Expand Down
22 changes: 5 additions & 17 deletions src/sentry/monitors/consumers/clock_tick_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.monitors.clock_tasks.check_missed import dispatch_check_missing
from sentry.monitors.clock_tasks.check_timeout import dispatch_check_timeout
from sentry.monitors.clock_tasks.mark_unknown import dispatch_mark_unknown
from sentry.monitors.system_incidents import record_clock_tick_volume_metric
from sentry.monitors.types import TickVolumeAnomolyResult

logger = logging.getLogger(__name__)
Expand All @@ -30,27 +30,15 @@ def process_clock_tick(message: Message[KafkaPayload | FilteredPayload]):
wrapper: ClockTick = MONITORS_CLOCK_TICK_CODEC.decode(message.payload.value)
ts = datetime.fromtimestamp(wrapper["ts"], tz=timezone.utc)

volume_anomaly_result = TickVolumeAnomolyResult.from_str(
wrapper.get("volume_anomaly_result", "normal")
)
record_clock_tick_volume_metric(ts)

logger.info(
"process_clock_tick",
extra={"reference_datetime": str(ts), "volume_anomaly_result": volume_anomaly_result.value},
extra={"reference_datetime": str(ts)},
)

dispatch_check_missing(ts, volume_anomaly_result)

# When the tick is anomalys we are unable to mark timeouts, since it is
# possible that a OK check-in was sent completing an earlier in-progress
# check-in during a period of data-loss. In this scenario instead we need
# to mark ALL in-progress check-ins as unknown, since they may time-out in
# the future if we lost the in-progress check-in.
match volume_anomaly_result:
case TickVolumeAnomolyResult.NORMAL:
dispatch_check_timeout(ts)
case TickVolumeAnomolyResult.ABNORMAL:
dispatch_mark_unknown(ts)
dispatch_check_missing(ts, TickVolumeAnomolyResult.NORMAL)
dispatch_check_timeout(ts)


class MonitorClockTickStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
Expand Down
60 changes: 31 additions & 29 deletions src/sentry/monitors/system_incidents.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
from django.conf import settings

from sentry import options
from sentry.monitors.types import TickVolumeAnomolyResult
from sentry.utils import metrics, redis

logger = logging.getLogger("sentry")

# This key is used to record historical date about the volume of check-ins.
MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{}"
MONITOR_VOLUME_HISTORY = "sentry.monitors.volume_history:{ts}"

# This key is used to record the metric volume metric for the tick.
MONITOR_TICK_METRIC = "sentry.monitors.volume_metric:{ts}"

# When fetching historic volume data to make a decision whether we have lost
# data this value will determine how many historic volume data-points we fetch
Expand All @@ -50,29 +52,29 @@ def update_check_in_volume(ts_list: Sequence[datetime]):

# Group timestamps down to the minute
for reference_ts, count in Counter(_make_reference_ts(ts) for ts in ts_list).items():
key = MONITOR_VOLUME_HISTORY.format(reference_ts)
key = MONITOR_VOLUME_HISTORY.format(ts=reference_ts)

pipeline = redis_client.pipeline()
pipeline.incr(key, amount=count)
pipeline.expire(key, MONITOR_VOLUME_RETENTION)
pipeline.execute()


def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:
def record_clock_tick_volume_metric(tick: datetime) -> None:
"""
When the clock is ticking, we may decide this tick is invalid and should
result in unknown misses and marking all in-progress check-ins as having an
unknown result.
Look at the historic volume of check-ins for this tick over the last
MONITOR_VOLUME_RETENTION period and record a "tick metric". The specific
metric we are recording is percentage deviation from the mean historic
volume for each minute.

We do this by looking at the historic volume of check-ins for the
particular minute boundary we just crossed.
This metric will be used when making a decision to determine if a
particular tick is in an incident state or operating normally.

XXX(epurkhiser): This is currently in development and no decision is made
to mark unknowns, instead we are only recording metrics for each clock tick
NOTE that this records a metric for the tick timestamp that we just ticked
over. So when ticking at 12:01 the metric is recorded for 12:00.
"""
if not options.get("crons.tick_volume_anomaly_detection"):
# Detection not enabled. All ticks are considered normal
return TickVolumeAnomolyResult.NORMAL
return

redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

Expand All @@ -92,19 +94,19 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:

# Bulk fetch volume counts
volumes = redis_client.mget(
MONITOR_VOLUME_HISTORY.format(_make_reference_ts(ts)) for ts in historic_timestamps
MONITOR_VOLUME_HISTORY.format(ts=_make_reference_ts(ts)) for ts in historic_timestamps
)

past_minute_volume = _int_or_none(volumes.pop(0))
historic_volume: list[int] = [int(v) for v in volumes if v is not None]

# Can't make any decisions if we didn't have data for the past minute
if past_minute_volume is None:
return TickVolumeAnomolyResult.NORMAL
return

# We need AT LEAST two data points to calculate standard deviation
if len(historic_volume) < 2:
return TickVolumeAnomolyResult.NORMAL
return

# Record some statistics about the past_minute_volume volume in comparison
# to the historic_volume data
Expand All @@ -123,7 +125,7 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:
z_score = 0.0

# Percentage deviation from the mean for our past minutes volume
pct_deviation = (abs(past_minute_volume - historic_mean) / historic_mean) * 100
pct_deviation = (past_minute_volume - historic_mean) / historic_mean * 100
Comment on lines -126 to +128
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's where we adjust the pct mean deviation to allow negative values


metrics.gauge(
"monitors.task.clock_tick.historic_volume_stdev_pct",
Expand All @@ -134,8 +136,6 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:
metrics.gauge("monitors.task.volume_history.z_score", z_score, sample_rate=1.0)
metrics.gauge("monitors.task.volume_history.pct_deviation", pct_deviation, sample_rate=1.0)

# XXX(epurkhiser): We're not actually making any decisions with this data
# just yet.
logger.info(
"monitors.system_incidents.volume_history",
extra={
Expand All @@ -149,19 +149,21 @@ def evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:
},
)

# XXX(epurkhiser): No decision is made yet, all ticks are normal
return TickVolumeAnomolyResult.NORMAL
key = MONITOR_TICK_METRIC.format(ts=_make_reference_ts(past_ts))
redis_client.set(key, pct_deviation)
redis_client.expire(key, MONITOR_VOLUME_RETENTION)


def safe_evaluate_tick_decision(tick: datetime) -> TickVolumeAnomolyResult:
try:
return evaluate_tick_decision(tick)
except Exception:
logging.exception("monitors.system_incidents.evaluate_tick_decision_failed")
def get_clock_tick_volume_metric(tick: datetime) -> float | None:
"""
Retrieve the volume metric for a specific clock tick.
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

# If there are any problems evaluating the tick volume, fallback to
# reporting the tick as NORMAL.
return TickVolumeAnomolyResult.NORMAL
if value := redis_client.get(MONITOR_TICK_METRIC.format(ts=_make_reference_ts(tick))):
return float(value)
else:
return None


def _make_reference_ts(ts: datetime):
Expand Down
46 changes: 10 additions & 36 deletions tests/sentry/monitors/consumers/test_clock_tick_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from arroyo.types import BrokerValue, Message, Partition, Topic
from django.test.utils import override_settings
from django.utils import timezone
from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick

from sentry.monitors.clock_dispatch import try_monitor_clock_tick
from sentry.monitors.consumers.clock_tasks_consumer import MonitorClockTasksStrategyFactory
Expand Down Expand Up @@ -40,23 +39,27 @@ def create_consumer():

@mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_missing")
@mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_timeout")
def test_simple(mock_dispatch_check_timeout, mock_dispatch_check_missing):
@mock.patch("sentry.monitors.consumers.clock_tick_consumer.record_clock_tick_volume_metric")
def test_simple(
mock_record_clock_tick_volume_metric,
mock_dispatch_check_timeout,
mock_dispatch_check_missing,
):
consumer = create_consumer()

ts = timezone.now().replace(second=0, microsecond=0)

message: ClockTick = {
"ts": ts.timestamp(),
"volume_anomaly_result": TickVolumeAnomolyResult.NORMAL.value,
}
value = BrokerValue(
KafkaPayload(b"fake-key", MONITORS_CLOCK_TICK_CODEC.encode(message), []),
KafkaPayload(b"fake-key", MONITORS_CLOCK_TICK_CODEC.encode({"ts": ts.timestamp()}), []),
partition,
1,
ts,
)
consumer.submit(Message(value))

assert mock_record_clock_tick_volume_metric.call_count == 1
assert mock_record_clock_tick_volume_metric.mock_calls[0] == mock.call(ts)

assert mock_dispatch_check_timeout.call_count == 1
assert mock_dispatch_check_timeout.mock_calls[0] == mock.call(ts)

Expand All @@ -67,35 +70,6 @@ def test_simple(mock_dispatch_check_timeout, mock_dispatch_check_missing):
)


@mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_check_missing")
@mock.patch("sentry.monitors.consumers.clock_tick_consumer.dispatch_mark_unknown")
def test_simple_abnormal(mock_dispatch_mark_unknown, mock_dispatch_check_missing):
consumer = create_consumer()

ts = timezone.now().replace(second=0, microsecond=0)

message: ClockTick = {
"ts": ts.timestamp(),
"volume_anomaly_result": TickVolumeAnomolyResult.ABNORMAL.value,
}
value = BrokerValue(
KafkaPayload(b"fake-key", MONITORS_CLOCK_TICK_CODEC.encode(message), []),
partition,
1,
ts,
)
consumer.submit(Message(value))

assert mock_dispatch_mark_unknown.call_count == 1
assert mock_dispatch_mark_unknown.mock_calls[0] == mock.call(ts)

assert mock_dispatch_check_missing.call_count == 1
assert mock_dispatch_check_missing.mock_calls[0] == mock.call(
ts,
TickVolumeAnomolyResult.ABNORMAL,
)


class MonitorsClockTickEndToEndTest(TestCase):
@override_settings(SENTRY_EVENTSTREAM="sentry.eventstream.kafka.KafkaEventStream")
def test_end_to_end(self):
Expand Down
2 changes: 0 additions & 2 deletions tests/sentry/monitors/test_clock_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from sentry_kafka_schemas.schema_types.monitors_clock_tick_v1 import ClockTick

from sentry.monitors.clock_dispatch import _dispatch_tick, try_monitor_clock_tick
from sentry.monitors.types import TickVolumeAnomolyResult
from sentry.testutils.helpers.options import override_options
from sentry.utils import json

Expand Down Expand Up @@ -149,7 +148,6 @@ def test_dispatch_to_kafka(clock_tick_producer_mock):

message: ClockTick = {
"ts": now.timestamp(),
"volume_anomaly_result": TickVolumeAnomolyResult.NORMAL.value,
}
clock_tick_producer_mock.produce.assert_called_with(
Topic("clock-tick-test-topic"),
Expand Down
Loading
Loading