-
-
Notifications
You must be signed in to change notification settings - Fork 4.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(crons): Add
mark_unknown
clock tick task (#79735)
This task will be used when the clock ticks with the volume_anomaly_result set to `abnormal`. In this scenario we must mark ALL in-progress check-ins as "unknown", since we cannot be sure that the completing check-in was not sent during the lost data that caused the volume drop of check-ins. Part of GH-79328 --------- Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com> Co-authored-by: Josh Ferge <josh.ferge@sentry.io>
- Loading branch information
1 parent
471aa94
commit f033b12
Showing
2 changed files
with
162 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
from datetime import datetime | ||
|
||
from arroyo.backends.kafka import KafkaPayload | ||
from sentry_kafka_schemas.schema_types.monitors_clock_tasks_v1 import MarkUnknown | ||
|
||
from sentry.monitors.models import CheckInStatus, MonitorCheckIn | ||
from sentry.utils import metrics | ||
|
||
from .producer import MONITORS_CLOCK_TASKS_CODEC, produce_task | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
# This is the MAXIMUM number of pending MONITOR CHECKINS this job will check. | ||
# | ||
# NOTE: Keep an eye on this limit. As monitor usage grows, so does the | ||
# number of pending check-ins that this job has to check. | ||
CHECKINS_LIMIT = 10_000 | ||
|
||
|
||
def dispatch_mark_unknown(ts: datetime):
    """
    Fan out MarkUnknown tasks for a clock tick that was processed while an
    anomaly was detected in the volume of check-ins.

    Every check-in that is still in-progress and was added at or before `ts`
    (capped at CHECKINS_LIMIT rows) can no longer be known to complete, since
    data loss has likely occurred. One MarkUnknown message per such check-in
    is dispatched into monitors-clock-tasks.
    """
    in_progress = MonitorCheckIn.objects.filter(
        status=CheckInStatus.IN_PROGRESS,
        date_added__lte=ts,
    )
    pending_rows = list(in_progress.values("id", "monitor_environment_id")[:CHECKINS_LIMIT])

    metrics.gauge(
        "sentry.monitors.tasks.check_unknown.count",
        len(pending_rows),
        sample_rate=1.0,
    )

    # Anything started before an unknown tick has to be marked as unknown: the
    # closing OK check-in may have been sent during the data-loss window, in
    # which case the check-in would otherwise time out after we've recovered.
    tick_timestamp = ts.timestamp()
    for row in pending_rows:
        environment_id = row["monitor_environment_id"]
        task: MarkUnknown = {
            "type": "mark_unknown",
            "ts": tick_timestamp,
            "monitor_environment_id": environment_id,
            "checkin_id": row["id"],
        }
        # The message key is the monitor environment id rendered as bytes.
        produce_task(
            KafkaPayload(
                str(environment_id).encode(),
                MONITORS_CLOCK_TASKS_CODEC.encode(task),
                [],
            )
        )
|
||
|
||
def mark_checkin_unknown(checkin_id: int, ts: datetime) -> None:
    """
    Move a single check-in into the UNKNOWN status.

    The update only matches a row that is still IN_PROGRESS, so a check-in
    that has already left that status is not modified. `ts` is accepted but
    not used by this function.
    """
    logger.info("checkin_unknown", extra={"checkin_id": checkin_id})

    still_in_progress = MonitorCheckIn.objects.filter(
        id=checkin_id,
        status=CheckInStatus.IN_PROGRESS,
    )
    still_in_progress.update(status=CheckInStatus.UNKNOWN)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
from datetime import timedelta | ||
from unittest import mock | ||
|
||
from arroyo.backends.kafka import KafkaPayload | ||
from django.utils import timezone | ||
from sentry_kafka_schemas.schema_types.monitors_clock_tasks_v1 import MarkUnknown | ||
|
||
from sentry.monitors.clock_tasks.mark_unknown import dispatch_mark_unknown, mark_checkin_unknown | ||
from sentry.monitors.clock_tasks.producer import MONITORS_CLOCK_TASKS_CODEC | ||
from sentry.monitors.models import ( | ||
CheckInStatus, | ||
Monitor, | ||
MonitorCheckIn, | ||
MonitorEnvironment, | ||
MonitorStatus, | ||
MonitorType, | ||
ScheduleType, | ||
) | ||
from sentry.testutils.cases import TestCase | ||
|
||
|
||
class MonitorClockTasksMarkUnknownTest(TestCase):
    """Exercises dispatch_mark_unknown and mark_checkin_unknown together."""

    @mock.patch("sentry.monitors.clock_tasks.mark_unknown.produce_task")
    def test_mark_unknown(self, mock_produce_task):
        """
        An in-progress check-in produces exactly one MarkUnknown task, and
        handling that task flips the check-in to UNKNOWN without touching the
        monitor environment's status.
        """
        organization = self.create_organization()
        project = self.create_project(organization=organization)

        tick = timezone.now().replace(hour=0, minute=0, second=0, microsecond=0)

        # Daily crontab schedule
        daily_config = {
            "schedule_type": ScheduleType.CRONTAB,
            "schedule": "0 0 * * *",
            "max_runtime": None,
            "checkin_margin": None,
        }
        monitor = Monitor.objects.create(
            organization_id=organization.id,
            project_id=project.id,
            type=MonitorType.CRON_JOB,
            config=daily_config,
        )

        # XXX(epurkhiser): Arbitrarily large id to make sure we can correctly
        # use the monitor_environment.id as the partition key
        monitor_environment = MonitorEnvironment.objects.create(
            id=62702371781194950,
            monitor=monitor,
            environment_id=self.environment.id,
            last_checkin=tick,
            next_checkin=tick + timedelta(hours=24),
            next_checkin_latest=tick + timedelta(hours=24, minutes=1),
            status=MonitorStatus.OK,
        )

        # This check-in would time out 30 minutes after the tick
        checkin = MonitorCheckIn.objects.create(
            monitor=monitor,
            monitor_environment=monitor_environment,
            project_id=project.id,
            status=CheckInStatus.IN_PROGRESS,
            date_added=tick,
            date_updated=tick,
            timeout_at=tick + timedelta(minutes=30),
        )

        expected_message: MarkUnknown = {
            "type": "mark_unknown",
            "ts": tick.timestamp(),
            "monitor_environment_id": checkin.monitor_environment_id,
            "checkin_id": checkin.id,
        }
        expected_payload = KafkaPayload(
            str(monitor_environment.id).encode(),
            MONITORS_CLOCK_TASKS_CODEC.encode(expected_message),
            [],
        )

        dispatch_mark_unknown(tick)

        # Exactly one task is produced, and it targets this environment
        assert mock_produce_task.call_args_list == [mock.call(expected_payload)]

        mark_checkin_unknown(checkin.id, tick)

        # The check-in has been moved to UNKNOWN
        unknown_checkins = MonitorCheckIn.objects.filter(
            id=checkin.id,
            status=CheckInStatus.UNKNOWN,
        )
        assert unknown_checkins.exists()

        # The monitor environment's status is left as it was
        ok_environments = MonitorEnvironment.objects.filter(
            id=monitor_environment.id,
            status=MonitorStatus.OK,
        )
        assert ok_environments.exists()