Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transaction on_commit before signals for alert group actions #3731

Merged
merged 20 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

### Changed

- Ensure alert group log records are committed to the DB before signals about them are sent, by @mderynck ([#3731](https://github.com/grafana/oncall/pull/3731))

### Fixed

- Address `SlackAPIRatelimitError` exceptions in `apps.slack.tasks.send_message_to_thread_if_bot_not_in_channel` task
Expand Down
431 changes: 192 additions & 239 deletions engine/apps/alerts/models/alert_group.py

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions engine/apps/alerts/models/alert_group_log_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,12 @@ def get_step_specific_info(self):
step_specific_info = json.loads(self.step_specific_info)
return step_specific_info

def delete(self, *args, **kwargs):
    """Delete this log record, emitting a debug log entry first.

    Any positional or keyword arguments are forwarded unchanged to the
    parent ``delete()`` so callers can use the parent's full signature.

    Returns:
        Whatever the parent ``delete()`` returns.
    """
    # The two literals are concatenated by the parser, so the first one must
    # end with a space; otherwise the message reads "...deletedalert_group=...".
    logger.debug(
        "alert_group_log_record for alert_group deleted "
        f"alert_group={self.alert_group.pk} log_id={self.pk}"
    )
    return super().delete(*args, **kwargs)


@receiver(post_save, sender=AlertGroupLogRecord)
def listen_for_alertgrouplogrecord(sender, instance, created, *args, **kwargs):
Expand Down
2 changes: 2 additions & 0 deletions engine/apps/alerts/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from .custom_button_result import custom_button_result # noqa: F401
from .custom_webhook_result import custom_webhook_result # noqa: F401
from .delete_alert_group import delete_alert_group # noqa: F401
from .delete_alert_group import finish_delete_alert_group # noqa: F401
from .delete_alert_group import send_alert_group_signal_for_delete # noqa: F401
from .distribute_alert import distribute_alert # noqa: F401
from .escalate_alert_group import escalate_alert_group # noqa: F401
from .invite_user_to_join_incident import invite_user_to_join_incident # noqa: F401
Expand Down
1 change: 1 addition & 0 deletions engine/apps/alerts/tasks/acknowledge_reminder.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def acknowledge_reminder_task(alert_group_pk: int, unacknowledge_process_id: str
log_record = alert_group.log_records.create(
type=AlertGroupLogRecord.TYPE_ACK_REMINDER_TRIGGERED, author=alert_group.acknowledged_by_user
)
task_logger.info(f"created log record {log_record.pk}, sending signal...")
transaction.on_commit(partial(send_alert_group_signal.delay, log_record.pk))


Expand Down
33 changes: 30 additions & 3 deletions engine/apps/alerts/tasks/delete_alert_group.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from celery.utils.log import get_task_logger
from django.conf import settings

from apps.alerts.signals import alert_group_action_triggered_signal
from apps.slack.errors import SlackAPIRatelimitError
from common.custom_celery_tasks import shared_dedicated_queue_retry_task

Expand All @@ -10,7 +11,7 @@
@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def delete_alert_group(alert_group_pk, user_pk):
def delete_alert_group(alert_group_pk: int, user_pk: int) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not for this PR, but wondering if this still needs to be a task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can keep it for now. Something could go wrong in the stop-escalation part of the delete, and keeping it as a task means we could retry here.

from apps.alerts.models import AlertGroup
from apps.user_management.models import User

Expand All @@ -25,9 +26,35 @@ def delete_alert_group(alert_group_pk, user_pk):
return

logger.debug(f"User {user} is deleting alert group {alert_group} (channel: {alert_group.channel})")
alert_group.delete_by_user(user)


@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def send_alert_group_signal_for_delete(alert_group_pk: int, log_record_pk: int) -> None:
try:
alert_group.delete_by_user(user)
alert_group_action_triggered_signal.send(
sender=send_alert_group_signal_for_delete,
log_record=log_record_pk,
force_sync=True,
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, we trigger the action triggered signal and queue the cleanup task after that. Makes sense 👍

except SlackAPIRatelimitError as e:
# Handle Slack API ratelimit raised in apps.slack.scenarios.distribute_alerts.DeleteGroupStep.process_signal
delete_alert_group.apply_async((alert_group_pk, user_pk), countdown=e.retry_after)
send_alert_group_signal_for_delete.apply_async((alert_group_pk, log_record_pk), countdown=e.retry_after)
return

finish_delete_alert_group.apply_async((alert_group_pk,))


@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
)
def finish_delete_alert_group(alert_group_pk: int) -> None:
    """Look up the alert group by primary key and call ``finish_delete_by_user()`` on it.

    If no alert group with that primary key is found, a debug message is
    logged and nothing else happens.

    Args:
        alert_group_pk: primary key of the alert group to finish deleting.
    """
    # Imported inside the task body, as the original does.
    from apps.alerts.models import AlertGroup

    group = AlertGroup.objects.filter(pk=alert_group_pk).first()
    if group:
        group.finish_delete_by_user()
    else:
        logger.debug(f"Alert group id={alert_group_pk} not found, already deleted")
4 changes: 3 additions & 1 deletion engine/apps/alerts/tasks/send_alert_group_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
from apps.alerts.signals import alert_group_action_triggered_signal
from common.custom_celery_tasks import shared_dedicated_queue_retry_task

from .task_logger import task_logger


@shared_dedicated_queue_retry_task(
    autoretry_for=(Exception,), retry_backoff=True, max_retries=0 if settings.DEBUG else None
)
def send_alert_group_signal(log_record_id):
    """Send ``alert_group_action_triggered_signal`` for the given log record id.

    Logs the log record id before sending, then prints the wall-clock time
    the whole call took to stdout.

    Args:
        log_record_id: passed to the signal receivers as ``log_record``.
    """
    began_at = time.time()
    task_logger.info(f"sending signal for log record {log_record_id}")

    alert_group_action_triggered_signal.send(
        sender=send_alert_group_signal,
        log_record=log_record_id,
    )

    # Timing output goes to stdout via print, not through task_logger.
    elapsed_seconds = time.time() - began_at
    print("--- %s seconds ---" % elapsed_seconds)
82 changes: 63 additions & 19 deletions engine/apps/alerts/tests/test_alert_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from apps.alerts.incident_appearance.renderers.phone_call_renderer import AlertGroupPhoneCallRenderer
from apps.alerts.models import AlertGroup, AlertGroupLogRecord
from apps.alerts.tasks import wipe
from apps.alerts.tasks.delete_alert_group import delete_alert_group
from apps.alerts.tasks.delete_alert_group import (
delete_alert_group,
finish_delete_alert_group,
send_alert_group_signal_for_delete,
)
from apps.slack.client import SlackClient
from apps.slack.errors import SlackAPIMessageNotFoundError, SlackAPIRatelimitError
from apps.slack.models import SlackMessage
Expand Down Expand Up @@ -85,9 +89,9 @@ def test_delete(
make_alert,
make_slack_message,
make_resolution_note_slack_message,
django_capture_on_commit_callbacks,
):
"""test alert group deleting"""

organization, slack_team_identity = make_organization_with_slack_team_identity()
user = make_user(organization=organization)

Expand Down Expand Up @@ -119,7 +123,20 @@ def test_delete(
assert alert_group.slack_messages.count() == 1
assert alert_group.resolution_note_slack_messages.count() == 2

delete_alert_group(alert_group.pk, user.pk)
with patch(
"apps.alerts.tasks.delete_alert_group.send_alert_group_signal_for_delete.delay", return_value=None
) as mock_send_alert_group_signal:
with django_capture_on_commit_callbacks(execute=True):
delete_alert_group(alert_group.pk, user.pk)
assert mock_send_alert_group_signal.call_count == 1

with patch(
"apps.alerts.tasks.delete_alert_group.finish_delete_alert_group.apply_async", return_value=None
) as mock_finish_delete_alert_group:
send_alert_group_signal_for_delete(*mock_send_alert_group_signal.call_args.args)
assert mock_finish_delete_alert_group.call_count == 1

finish_delete_alert_group(alert_group.pk)

assert not alert_group.alerts.exists()
assert not alert_group.slack_messages.exists()
Expand All @@ -140,10 +157,10 @@ def test_delete(


@pytest.mark.parametrize("api_method", ["reactions_remove", "chat_delete"])
@patch.object(delete_alert_group, "apply_async")
@patch.object(send_alert_group_signal_for_delete, "apply_async")
@pytest.mark.django_db
def test_delete_slack_ratelimit(
mock_delete_alert_group,
mock_send_alert_group_signal_for_delete,
api_method,
make_organization_with_slack_team_identity,
make_user,
Expand All @@ -152,6 +169,7 @@ def test_delete_slack_ratelimit(
make_alert,
make_slack_message,
make_resolution_note_slack_message,
django_capture_on_commit_callbacks,
):
organization, slack_team_identity = make_organization_with_slack_team_identity()
user = make_user(organization=organization)
Expand Down Expand Up @@ -180,17 +198,31 @@ def test_delete_slack_ratelimit(
ts="test2_ts",
)

with patch.object(
SlackClient,
api_method,
side_effect=SlackAPIRatelimitError(
response=build_slack_response({"ok": False, "error": "ratelimited"}, headers={"Retry-After": 42})
),
):
delete_alert_group(alert_group.pk, user.pk)
with patch(
"apps.alerts.tasks.delete_alert_group.send_alert_group_signal_for_delete.delay", return_value=None
) as mock_send_alert_group_signal:
with django_capture_on_commit_callbacks(execute=True):
delete_alert_group(alert_group.pk, user.pk)
assert mock_send_alert_group_signal.call_count == 1

with patch(
"apps.alerts.tasks.delete_alert_group.finish_delete_alert_group.apply_async", return_value=None
) as mock_finish_delete_alert_group:
with patch.object(
SlackClient,
api_method,
side_effect=SlackAPIRatelimitError(
response=build_slack_response({"ok": False, "error": "ratelimited"}, headers={"Retry-After": 42})
),
):
send_alert_group_signal_for_delete(*mock_send_alert_group_signal.call_args.args)

assert mock_finish_delete_alert_group.call_count == 0

# Check task is retried gracefully
mock_delete_alert_group.assert_called_once_with((alert_group.pk, user.pk), countdown=42)
mock_send_alert_group_signal_for_delete.assert_called_once_with(
mock_send_alert_group_signal.call_args.args, countdown=42
)


@pytest.mark.parametrize("api_method", ["reactions_remove", "chat_delete"])
Expand Down Expand Up @@ -582,7 +614,7 @@ def test_filter_active_alert_groups(
@patch("apps.alerts.models.AlertGroup.hard_delete")
@patch("apps.alerts.models.AlertGroup.un_attach_by_delete")
@patch("apps.alerts.models.AlertGroup.stop_escalation")
@patch("apps.alerts.models.alert_group.alert_group_action_triggered_signal")
@patch("apps.alerts.tasks.delete_alert_group.alert_group_action_triggered_signal")
@pytest.mark.django_db
def test_delete_by_user(
mock_alert_group_action_triggered_signal,
Expand All @@ -592,6 +624,7 @@ def test_delete_by_user(
make_organization_and_user,
make_alert_receive_channel,
make_alert_group,
django_capture_on_commit_callbacks,
):
organization, user = make_organization_and_user()
alert_receive_channel = make_alert_receive_channel(organization)
Expand All @@ -603,20 +636,31 @@ def test_delete_by_user(

assert alert_group.log_records.filter(type=AlertGroupLogRecord.TYPE_DELETED).count() == 0

alert_group.delete_by_user(user)
with patch(
"apps.alerts.tasks.delete_alert_group.send_alert_group_signal_for_delete.delay", return_value=None
) as mock_send_alert_group_signal:
with django_capture_on_commit_callbacks(execute=True):
delete_alert_group(alert_group.pk, user.pk)

assert mock_send_alert_group_signal.call_count == 1
assert alert_group.log_records.filter(type=AlertGroupLogRecord.TYPE_DELETED).count() == 1
deleted_log_record = alert_group.log_records.get(type=AlertGroupLogRecord.TYPE_DELETED)

alert_group.stop_escalation.assert_called_once_with()

with patch(
"apps.alerts.tasks.delete_alert_group.finish_delete_alert_group.apply_async", return_value=None
) as mock_finish_delete_alert_group:
send_alert_group_signal_for_delete(*mock_send_alert_group_signal.call_args.args)
assert mock_finish_delete_alert_group.call_count == 1

mock_alert_group_action_triggered_signal.send.assert_called_once_with(
sender=alert_group.delete_by_user,
sender=send_alert_group_signal_for_delete,
log_record=deleted_log_record.pk,
action_source=None,
force_sync=True,
)

finish_delete_alert_group(alert_group.pk)

alert_group.hard_delete.assert_called_once_with()

for dependent_alert_group in dependent_alert_groups:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def mock_apply_async(monkeypatch):


@patch("apps.alerts.models.alert_group_log_record.tasks.send_update_log_report_signal.apply_async")
@patch("apps.alerts.models.alert_group.alert_group_action_triggered_signal.send")
@patch("apps.alerts.tasks.send_alert_group_signal.alert_group_action_triggered_signal.send")
@pytest.mark.django_db
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
def test_update_metric_alert_groups_total_cache_on_action(
Expand Down Expand Up @@ -142,7 +142,7 @@ def get_called_arg_index_and_compare_results(update_expected_result):


@patch("apps.alerts.models.alert_group_log_record.tasks.send_update_log_report_signal.apply_async")
@patch("apps.alerts.models.alert_group.alert_group_action_triggered_signal.send")
@patch("apps.alerts.tasks.send_alert_group_signal.alert_group_action_triggered_signal.send")
@pytest.mark.django_db
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
def test_update_metric_alert_groups_response_time_cache_on_action(
Expand Down
32 changes: 26 additions & 6 deletions engine/apps/slack/representatives/alert_group_representative.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from celery.utils.log import get_task_logger
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist

from apps.alerts.constants import ActionSource
from apps.alerts.representative import AlertGroupAbstractRepresentative
Expand Down Expand Up @@ -49,14 +50,20 @@ def on_create_alert_slack_representative_async(alert_pk):


@shared_dedicated_queue_retry_task(
autoretry_for=(Exception,), retry_backoff=True, max_retries=1 if settings.DEBUG else None
autoretry_for=(Exception,),
retry_backoff=True,
dont_autoretry_for=(ObjectDoesNotExist,),
max_retries=1 if settings.DEBUG else None,
)
def on_alert_group_action_triggered_async(log_record_id):
from apps.alerts.models import AlertGroupLogRecord

logger.debug(f"SLACK representative: get log record {log_record_id}")
try:
log_record = AlertGroupLogRecord.objects.get(pk=log_record_id)
except AlertGroupLogRecord.DoesNotExist as e:
logger.warning(f"SLACK representative: log record {log_record_id} never created or has been deleted")
raise e

log_record = AlertGroupLogRecord.objects.get(pk=log_record_id)
alert_group_id = log_record.alert_group_id
logger.debug(f"Start on_alert_group_action_triggered for alert_group {alert_group_id}, log record {log_record_id}")
instance = AlertGroupSlackRepresentative(log_record)
Expand Down Expand Up @@ -145,16 +152,25 @@ def on_alert_group_action_triggered(cls, **kwargs):
from apps.alerts.models import AlertGroupLogRecord

log_record = kwargs["log_record"]
action_source = kwargs.get("action_source")
force_sync = kwargs.get("force_sync", False)
if isinstance(log_record, AlertGroupLogRecord):
log_record_id = log_record.pk
else:
log_record_id = log_record

if action_source == ActionSource.SLACK or force_sync:
try:
log_record = AlertGroupLogRecord.objects.get(pk=log_record_id)
except AlertGroupLogRecord.DoesNotExist:
logger.warning(
f"on_alert_group_action_triggered: log record {log_record_id} never created or has been deleted"
)
return

if log_record.action_source == ActionSource.SLACK or force_sync:
logger.debug(f"SLACK on_alert_group_action_triggered: sync {log_record_id} {force_sync}")
on_alert_group_action_triggered_async(log_record_id)
else:
logger.debug(f"SLACK on_alert_group_action_triggered: async {log_record_id} {force_sync}")
on_alert_group_action_triggered_async.apply_async((log_record_id,))

@classmethod
Expand All @@ -167,7 +183,11 @@ def on_alert_group_update_log_report(cls, **kwargs):
alert_group_id = alert_group.pk
else:
alert_group_id = alert_group
alert_group = AlertGroup.objects.get(pk=alert_group_id)
try:
alert_group = AlertGroup.objects.get(pk=alert_group_id)
except AlertGroup.DoesNotExist as e:
logger.warning(f"SLACK update log report: alert group {alert_group_id} has been deleted")
raise e

logger.debug(
f"Received alert_group_update_log_report signal in SLACK representative for alert_group {alert_group_id}"
Expand Down
7 changes: 6 additions & 1 deletion engine/apps/telegram/alert_group_representative.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,13 @@ def on_alert_group_action(self):
def on_alert_group_update_log_report(cls, **kwargs):
logger.info("AlertGroupTelegramRepresentative UPDATE LOG REPORT SIGNAL")
alert_group = kwargs["alert_group"]

if not isinstance(alert_group, AlertGroup):
alert_group = AlertGroup.objects.get(pk=alert_group)
try:
alert_group = AlertGroup.objects.get(pk=alert_group)
except AlertGroup.DoesNotExist as e:
logger.warning(f"Telegram update log report: alert group {alert_group} has been deleted")
raise e

messages_to_edit = alert_group.telegram_messages.filter(
message_type__in=(
Expand Down
Loading
Loading