From 066b95377334d95259cd559d831cdb49f160abb9 Mon Sep 17 00:00:00 2001 From: Evan Purkhiser Date: Tue, 14 Nov 2023 13:26:15 -0800 Subject: [PATCH] ref(crons): Include message_type in kafka message There are two types of messages that end up in the ingest-monitors kafka topic, "check_in" (the ones produced here in relay) and "clock_pulse" messages, which are produced externally and are intended to ensure the clock continues to run even when ingestion volume drops. Currently we have some shim logic in our producer which handles adding in this message type when it is missing https://github.com/getsentry/sentry/blob/07522eb8c43828deee6a74de2bb72cf989ec907d/src/sentry/monitors/consumers/monitor_consumer.py#L591-L595 This change introduces the `message_type` key for the check-in messages so we no longer need to shim this. --- relay-server/src/actors/store.rs | 4 ++++ tests/integration/test_monitors.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs index 829195478f6..da0d40de8d8 100644 --- a/relay-server/src/actors/store.rs +++ b/relay-server/src/actors/store.rs @@ -720,6 +720,7 @@ impl StoreService { item: &Item, ) -> Result<(), StoreError> { let message = KafkaMessage::CheckIn(CheckInKafkaMessage { + message_type: "check_in".into(), project_id, retention_days, start_time: UnixTimestamp::from_instant(start_time).as_secs(), @@ -1028,6 +1029,9 @@ struct CheckInKafkaMessage { #[serde(skip)] routing_key_hint: Option, + /// Discriminate tag used to differentiate these messages from clock pulse messages that are + /// also placed into the topic. + message_type: String, /// Raw event payload. payload: Bytes, /// Time at which the event was received by Relay. diff --git a/tests/integration/test_monitors.py b/tests/integration/test_monitors.py index 939c00d8f33..34a05be63a0 100644 --- a/tests/integration/test_monitors.py +++ b/tests/integration/test_monitors.py @@ -43,6 +43,7 @@ def test_monitors_with_processing( relay.send_check_in(42, check_in) check_in, message = monitors_consumer.get_check_in() + assert message["message_type"] == "check_in" assert message["start_time"] is not None assert message["project_id"] == 42 assert check_in == { @@ -72,6 +73,7 @@ def test_monitor_endpoint_get_with_processing( assert response.status_code == 202 check_in, message = monitors_consumer.get_check_in() + assert message["message_type"] == "check_in" assert message["start_time"] is not None assert message["project_id"] == project_id assert check_in == { @@ -102,6 +104,7 @@ def test_monitor_endpoint_post_auth_basic_with_processing( assert response.status_code == 202 check_in, message = monitors_consumer.get_check_in() + assert message["message_type"] == "check_in" assert message["start_time"] is not None assert message["project_id"] == project_id assert check_in == { @@ -130,6 +133,7 @@ def test_monitor_endpoint_embedded_auth_with_processing( assert response.status_code == 202 check_in, message = monitors_consumer.get_check_in() + assert message["message_type"] == "check_in" assert message["start_time"] is not None assert message["project_id"] == project_id assert check_in == {