Skip to content

Commit

Permalink
ref(crons): Include message_type in kafka message
Browse files Browse the repository at this point in the history
There are two types of messages that end up in the ingest-monitors kafka
topic, "check_in" (the ones produced here in relay) and "clock_pulse"
messages, which are produced externally and are intended to ensure the
clock continues to run even when ingestion volume drops.

Currently we have some shim logic in our producer which handles adding
in this message type when it is missing

https://github.com/getsentry/sentry/blob/07522eb8c43828deee6a74de2bb72cf989ec907d/src/sentry/monitors/consumers/monitor_consumer.py#L591-L595

This change introduces the `message_type` key for the check-in messages
so we no longer need to shim this.
  • Loading branch information
evanpurkhiser committed Nov 14, 2023
1 parent a6e3c40 commit 066b953
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
4 changes: 4 additions & 0 deletions relay-server/src/actors/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,7 @@ impl StoreService {
item: &Item,
) -> Result<(), StoreError> {
let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
message_type: "check_in".into(),
project_id,
retention_days,
start_time: UnixTimestamp::from_instant(start_time).as_secs(),
Expand Down Expand Up @@ -1028,6 +1029,9 @@ struct CheckInKafkaMessage {
#[serde(skip)]
routing_key_hint: Option<Uuid>,

/// Discriminate tag used to differentiate these messages from clock pulse messages that are
/// also placed into the topic.
message_type: String,
/// Raw event payload.
payload: Bytes,
/// Time at which the event was received by Relay.
Expand Down
4 changes: 4 additions & 0 deletions tests/integration/test_monitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def test_monitors_with_processing(
relay.send_check_in(42, check_in)

check_in, message = monitors_consumer.get_check_in()
assert message["message_type"] == "check_in"
assert message["start_time"] is not None
assert message["project_id"] == 42
assert check_in == {
Expand Down Expand Up @@ -72,6 +73,7 @@ def test_monitor_endpoint_get_with_processing(
assert response.status_code == 202

check_in, message = monitors_consumer.get_check_in()
assert message["message_type"] == "check_in"
assert message["start_time"] is not None
assert message["project_id"] == project_id
assert check_in == {
Expand Down Expand Up @@ -102,6 +104,7 @@ def test_monitor_endpoint_post_auth_basic_with_processing(
assert response.status_code == 202

check_in, message = monitors_consumer.get_check_in()
assert message["message_type"] == "check_in"
assert message["start_time"] is not None
assert message["project_id"] == project_id
assert check_in == {
Expand Down Expand Up @@ -130,6 +133,7 @@ def test_monitor_endpoint_embedded_auth_with_processing(
assert response.status_code == 202

check_in, message = monitors_consumer.get_check_in()
assert message["message_type"] == "check_in"
assert message["start_time"] is not None
assert message["project_id"] == project_id
assert check_in == {
Expand Down

0 comments on commit 066b953

Please sign in to comment.