From baf909dcabd590dfd6736973a94a3af3008c549f Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Wed, 5 Apr 2023 17:22:26 +0200 Subject: [PATCH] Auto monitoring beat update (#1989) - Small update to support Celery 4 and 5 - Changed the name of the schedule shelf file that we patch to have the suffix `-patched-by-sentry-sdk` instead of `.new` so in case there is an error with this new shelf file somewhere the users know that it is patched by the sentry sdk. - Additionally some minor tweaks to make code more readable --- sentry_sdk/integrations/celery.py | 30 +++++++------- .../celery/test_celery_beat_crons.py | 39 ++++++++++++++----- 2 files changed, 44 insertions(+), 25 deletions(-) diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index d69dd467bb..9d312e2e14 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -3,6 +3,7 @@ import sys import shutil import functools +import tempfile from sentry_sdk.consts import OP from sentry_sdk._compat import reraise @@ -320,6 +321,11 @@ def sentry_workloop(*args, **kwargs): def _get_headers(task): # type: (Task) -> Dict[str, Any] headers = task.request.get("headers") or {} + + if "headers" in headers: + headers.update(headers["headers"]) + del headers["headers"] + return headers @@ -392,9 +398,11 @@ def _reinstall_patched_tasks(app, sender, add_updated_periodic_tasks): add_updated_periodic_task() # Start Celery Beat (with new (cloned) schedule, because old one is still in use) - new_schedule_filename = sender.schedule_filename + ".new" - shutil.copy2(sender.schedule_filename, new_schedule_filename) - app.Beat(schedule=new_schedule_filename).run() + cloned_schedule = tempfile.NamedTemporaryFile(suffix="-patched-by-sentry-sdk") + with open(sender.schedule_filename, "rb") as original_schedule: + shutil.copyfileobj(original_schedule, cloned_schedule) + + app.Beat(schedule=cloned_schedule.name).run() # Nested functions do not work as Celery hook receiver, @@ -480,9 +488,7 @@ def crons_task_before_run(sender, **kwargs): if "sentry-monitor-slug" not in headers: return - monitor_config = ( - headers["sentry-monitor-config"] if "sentry-monitor-config" in headers else {} - ) + monitor_config = headers.get("sentry-monitor-config", {}) start_timestamp_s = now() @@ -506,9 +512,7 @@ def crons_task_success(sender, **kwargs): if "sentry-monitor-slug" not in headers: return - monitor_config = ( - headers["sentry-monitor-config"] if "sentry-monitor-config" in headers else {} - ) + monitor_config = headers.get("sentry-monitor-config", {}) start_timestamp_s = headers["sentry-monitor-start-timestamp-s"] @@ -529,9 +533,7 @@ def crons_task_failure(sender, **kwargs): if "sentry-monitor-slug" not in headers: return - monitor_config = ( - headers["sentry-monitor-config"] if "sentry-monitor-config" in headers else {} - ) + monitor_config = headers.get("sentry-monitor-config", {}) start_timestamp_s = headers["sentry-monitor-start-timestamp-s"] @@ -552,9 +554,7 @@ def crons_task_retry(sender, **kwargs): if "sentry-monitor-slug" not in headers: return - monitor_config = ( - headers["sentry-monitor-config"] if "sentry-monitor-config" in headers else {} - ) + monitor_config = headers.get("sentry-monitor-config", {}) start_timestamp_s = headers["sentry-monitor-start-timestamp-s"] diff --git a/tests/integrations/celery/test_celery_beat_crons.py b/tests/integrations/celery/test_celery_beat_crons.py index 8c99faef39..fd90196c8e 100644 --- a/tests/integrations/celery/test_celery_beat_crons.py +++ b/tests/integrations/celery/test_celery_beat_crons.py @@ -1,3 +1,4 @@ +import tempfile import mock import pytest @@ -37,6 +38,20 @@ def test_get_headers(): assert _get_headers(fake_task) == {"bla": "blub"} + fake_task.request.update( + { + "headers": { + "headers": { + "tri": "blub", + "bar": "baz", + }, + "bla": "blub", + }, + } + ) + + assert _get_headers(fake_task) == {"bla": "blub", "tri": "blub", "bar": "baz"} + @pytest.mark.parametrize( "seconds, expected_tuple", @@ -273,16 +288,20 @@ def test_reinstall_patched_tasks(): add_updated_periodic_tasks = [mock.MagicMock(), mock.MagicMock(), mock.MagicMock()] - with mock.patch("sentry_sdk.integrations.celery.shutil.copy2") as mock_copy2: - _reinstall_patched_tasks(app, sender, add_updated_periodic_tasks) + mock_open = mock.Mock(return_value=tempfile.NamedTemporaryFile()) - sender.stop.assert_called_once_with() + with mock.patch("sentry_sdk.integrations.celery.open", mock_open): + with mock.patch( + "sentry_sdk.integrations.celery.shutil.copyfileobj" + ) as mock_copyfileobj: + _reinstall_patched_tasks(app, sender, add_updated_periodic_tasks) - add_updated_periodic_tasks[0].assert_called_once_with() - add_updated_periodic_tasks[1].assert_called_once_with() - add_updated_periodic_tasks[2].assert_called_once_with() + sender.stop.assert_called_once_with() - mock_copy2.assert_called_once_with( - "test_schedule_filename", "test_schedule_filename.new" - ) - fake_beat.run.assert_called_once_with() + add_updated_periodic_tasks[0].assert_called_once_with() + add_updated_periodic_tasks[1].assert_called_once_with() + add_updated_periodic_tasks[2].assert_called_once_with() + + mock_copyfileobj.assert_called_once() + + fake_beat.run.assert_called_once_with()