Skip to content

Commit

Permalink
Use periodic task for heartbeats (#2723)
Browse files Browse the repository at this point in the history
# What this PR does

## Which issue(s) this PR fixes

## Checklist

- [ ] Unit, integration, and e2e (if applicable) tests updated
- [ ] Documentation added (or `pr:no public docs` PR label added if not
required)
- [ ] `CHANGELOG.md` updated (or `pr:no changelog` PR label added if not
required)

---------

Co-authored-by: Joey Orlando <joey.orlando@grafana.com>
Co-authored-by: Michael Derynck <michael.derynck@grafana.com>
  • Loading branch information
3 people authored Aug 10, 2023
1 parent 638c9a3 commit fd19dd4
Show file tree
Hide file tree
Showing 15 changed files with 237 additions and 228 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Add stack slug to organization options for direct paging Slash command by @vadimkerr ([#2743](https://github.com/grafana/oncall/pull/2743))
- Avoid creating (or notifying about) potential event splits resulting from untaken swap requests ([#2748](https://github.com/grafana/oncall/pull/2748))
- Refactor heartbeats into a periodic task ([2723](https://github.com/grafana/oncall/pull/2723))

### Fixed

Expand Down
86 changes: 17 additions & 69 deletions engine/apps/heartbeat/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@

from django.conf import settings
from django.core.validators import MinLengthValidator
from django.db import models, transaction
from django.db import models
from django.utils import timezone

from apps.integrations.tasks import create_alert
from common.public_primary_keys import generate_public_primary_key, increase_public_primary_key_length

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -43,10 +42,26 @@ class IntegrationHeartBeat(models.Model):

created_at = models.DateTimeField(auto_now_add=True)
timeout_seconds = models.IntegerField(default=0)

last_heartbeat_time = models.DateTimeField(default=None, null=True)
"""
Stores the latest received heartbeat signal time
"""

last_checkup_task_time = models.DateTimeField(default=None, null=True)
"""
Deprecated. This field is not used. TODO: remove it
"""

actual_check_up_task_id = models.CharField(max_length=100)
"""
Deprecated. Stored the latest scheduled `integration_heartbeat_checkup` task id. TODO: remove it
"""

previous_alerted_state_was_life = models.BooleanField(default=True)
"""
Last status of the heartbeat. Determines if integration was alive on latest checkup
"""

public_primary_key = models.CharField(
max_length=20,
Expand Down Expand Up @@ -83,73 +98,6 @@ def status(self) -> bool:
def link(self) -> str:
return urljoin(self.alert_receive_channel.integration_url, "heartbeat/")

@classmethod
def perform_heartbeat_check(cls, heartbeat_id: int, task_request_id: str) -> None:
with transaction.atomic():
heartbeats = cls.objects.filter(pk=heartbeat_id).select_for_update()
if len(heartbeats) == 0:
logger.info(f"Heartbeat {heartbeat_id} not found {task_request_id}")
return
heartbeat = heartbeats[0]
if task_request_id == heartbeat.actual_check_up_task_id:
heartbeat.check_heartbeat_state_and_save()
else:
logger.info(f"Heartbeat {heartbeat_id} is not actual {task_request_id}")

def check_heartbeat_state_and_save(self) -> bool:
"""
Use this method if you want just check heartbeat status.
"""
state_changed = self.check_heartbeat_state()
if state_changed:
self.save(update_fields=["previous_alerted_state_was_life"])
return state_changed

def check_heartbeat_state(self) -> bool:
"""
Actually checking heartbeat.
Use this method if you want to do changes of heartbeat instance while checking its status.
( See IntegrationHeartBeatAPIView.post() for example )
"""
state_changed = False
if self.is_expired:
if self.previous_alerted_state_was_life:
self.on_heartbeat_expired()
self.previous_alerted_state_was_life = False
state_changed = True
else:
if not self.previous_alerted_state_was_life:
self.on_heartbeat_restored()
self.previous_alerted_state_was_life = True
state_changed = True
return state_changed

def on_heartbeat_restored(self) -> None:
create_alert.apply_async(
kwargs={
"title": self.alert_receive_channel.heartbeat_restored_title,
"message": self.alert_receive_channel.heartbeat_restored_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": self.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": self.alert_receive_channel.heartbeat_restored_payload,
},
)

def on_heartbeat_expired(self) -> None:
create_alert.apply_async(
kwargs={
"title": self.alert_receive_channel.heartbeat_expired_title,
"message": self.alert_receive_channel.heartbeat_expired_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": self.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": self.alert_receive_channel.heartbeat_expired_payload,
},
)

# Insight logs
@property
def insight_logs_type_verbal(self) -> str:
Expand Down
128 changes: 88 additions & 40 deletions engine/apps/heartbeat/tasks.py
Original file line number Diff line number Diff line change
@@ -1,57 +1,105 @@
from time import perf_counter
import datetime

from celery.utils.log import get_task_logger
from django.conf import settings
from django.db import transaction
from django.db.models import DateTimeField, DurationField, ExpressionWrapper, F
from django.db.models.functions import Cast
from django.utils import timezone

from apps.heartbeat.models import IntegrationHeartBeat
from apps.integrations.tasks import create_alert
from common.custom_celery_tasks import shared_dedicated_queue_retry_task
from settings.base import DatabaseTypes

logger = get_task_logger(__name__)


@shared_dedicated_queue_retry_task()
def integration_heartbeat_checkup(heartbeat_id: int) -> None:
from apps.heartbeat.models import IntegrationHeartBeat

IntegrationHeartBeat.perform_heartbeat_check(heartbeat_id, integration_heartbeat_checkup.request.id)

def check_heartbeats() -> str:
"""
Periodic task to check heartbeats status change and create alerts (or auto-resolve alerts) if needed
"""
# Heartbeat is considered enabled if it
# * has timeout_seconds set to non-zero (non-default) value,
# * received at least one checkup (last_heartbeat_time set to non-null value)\

@shared_dedicated_queue_retry_task()
def process_heartbeat_task(alert_receive_channel_pk):
start = perf_counter()
from apps.heartbeat.models import IntegrationHeartBeat
def _get_timeout_expression() -> ExpressionWrapper:
if settings.DATABASES["default"]["ENGINE"] == f"django.db.backends.{DatabaseTypes.POSTGRESQL}":
# DurationField: When used on PostgreSQL, the data type used is an interval
# https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield
return ExpressionWrapper(datetime.timedelta(seconds=1) * F("timeout_seconds"), output_field=DurationField())
else:
# DurationField: ...Otherwise a bigint of microseconds is used...
# microseconds = seconds * 10**6
# https://docs.djangoproject.com/en/3.2/ref/models/fields/#durationfield
return ExpressionWrapper(F("timeout_seconds") * 10**6, output_field=DurationField())

enabled_heartbeats = (
IntegrationHeartBeat.objects.filter(last_heartbeat_time__isnull=False)
.exclude(timeout_seconds=0)
.annotate(period_start=(Cast(timezone.now() - _get_timeout_expression(), DateTimeField())))
)
with transaction.atomic():
heartbeats = IntegrationHeartBeat.objects.filter(
alert_receive_channel__pk=alert_receive_channel_pk,
).select_for_update()
if len(heartbeats) == 0:
logger.info(f"Integration Heartbeat for alert_receive_channel {alert_receive_channel_pk} was not found.")
return
else:
heartbeat = heartbeats[0]
heartbeat_selected = perf_counter()
logger.info(
f"IntegrationHeartBeat selected for alert_receive_channel {alert_receive_channel_pk} in {heartbeat_selected - start}"
# Heartbeat is considered expired if it
# * is enabled,
# * is not already expired,
# * last check in was before the timeout period start
expired_heartbeats = enabled_heartbeats.select_for_update().filter(
last_heartbeat_time__lte=F("period_start"), previous_alerted_state_was_life=True
)
task = integration_heartbeat_checkup.apply_async(
(heartbeat.pk,),
countdown=heartbeat.timeout_seconds + 1,
)
is_touched = heartbeat.last_heartbeat_time is not None
heartbeat.actual_check_up_task_id = task.id
heartbeat.last_heartbeat_time = timezone.now()
update_fields = ["actual_check_up_task_id", "last_heartbeat_time"]
task_started = perf_counter()
logger.info(
f"heartbeat_checkup task started for alert_receive_channel {alert_receive_channel_pk} in {task_started - start}"
# Schedule alert creation for each expired heartbeat after transaction commit
for heartbeat in expired_heartbeats:
transaction.on_commit(
lambda: create_alert.apply_async(
kwargs={
"title": heartbeat.alert_receive_channel.heartbeat_expired_title,
"message": heartbeat.alert_receive_channel.heartbeat_expired_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": heartbeat.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": heartbeat.alert_receive_channel.heartbeat_expired_payload,
},
)
)
# Update previous_alerted_state_was_life to False
expired_count = expired_heartbeats.update(previous_alerted_state_was_life=False)
with transaction.atomic():
# Heartbeat is considered restored if it
# * is enabled,
# * last check in was after the timeout period start,
# * was is alerted state (previous_alerted_state_was_life is False), i.e. was expired
restored_heartbeats = enabled_heartbeats.select_for_update().filter(
last_heartbeat_time__gte=F("period_start"), previous_alerted_state_was_life=False
)
if is_touched:
state_changed = heartbeat.check_heartbeat_state()
state_checked = perf_counter()
logger.info(
f"state checked for alert_receive_channel {alert_receive_channel_pk} in {state_checked - start}"
# Schedule auto-resolve alert creation for each expired heartbeat after transaction commit
for heartbeat in restored_heartbeats:
transaction.on_commit(
lambda: create_alert.apply_async(
kwargs={
"title": heartbeat.alert_receive_channel.heartbeat_restored_title,
"message": heartbeat.alert_receive_channel.heartbeat_restored_message,
"image_url": None,
"link_to_upstream_details": None,
"alert_receive_channel_pk": heartbeat.alert_receive_channel.pk,
"integration_unique_data": {},
"raw_request_data": heartbeat.alert_receive_channel.heartbeat_restored_payload,
},
)
)
if state_changed:
update_fields.append("previous_alerted_state_was_life")
heartbeat.save(update_fields=update_fields)
restored_count = restored_heartbeats.update(previous_alerted_state_was_life=True)
return f"Found {expired_count} expired and {restored_count} restored heartbeats"


@shared_dedicated_queue_retry_task()
def integration_heartbeat_checkup(heartbeat_id: int) -> None:
"""Deprecated. TODO: Remove this task after this task cleared from queue"""
pass


@shared_dedicated_queue_retry_task()
def process_heartbeat_task(alert_receive_channel_pk):
IntegrationHeartBeat.objects.filter(
alert_receive_channel__pk=alert_receive_channel_pk,
).update(last_heartbeat_time=timezone.now())
Loading

0 comments on commit fd19dd4

Please sign in to comment.