Skip to content

Commit

Permalink
ref(crons): Include message_type in kafka message (#2723)
Browse files Browse the repository at this point in the history
There are two types of messages that end up in the ingest-monitors kafka
topic, "check_in" (the ones produced here in relay) and "clock_pulse"
messages, which are produced externally and are intended to ensure the
clock continues to run even when ingestion volume drops.

Currently we have some shim logic in our producer which handles adding
in this message type when it is missing


https://github.com/getsentry/sentry/blob/07522eb8c43828deee6a74de2bb72cf989ec907d/src/sentry/monitors/consumers/monitor_consumer.py#L591-L595

This change introduces the `message_type` key for the check-in messages
so we no longer need to shim this.

#skip-changelog
  • Loading branch information
evanpurkhiser authored Nov 27, 2023
1 parent ec0b7a5 commit c7b996c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
11 changes: 11 additions & 0 deletions relay-server/src/actors/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,7 @@ impl StoreService {
item: &Item,
) -> Result<(), StoreError> {
let message = KafkaMessage::CheckIn(CheckInKafkaMessage {
message_type: CheckInMessageType::CheckIn,
project_id,
retention_days,
start_time: UnixTimestamp::from_instant(start_time).as_secs(),
Expand Down Expand Up @@ -1023,11 +1024,21 @@ struct ProfileKafkaMessage {
payload: Bytes,
}

#[allow(dead_code)]
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckInMessageType {
ClockPulse,
CheckIn,
}

#[derive(Debug, Serialize)]
struct CheckInKafkaMessage {
#[serde(skip)]
routing_key_hint: Option<Uuid>,

/// Used by the consumer to discrinminate the message.
message_type: CheckInMessageType,
/// Raw event payload.
payload: Bytes,
/// Time at which the event was received by Relay.
Expand Down
4 changes: 4 additions & 0 deletions tests/integration/test_monitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def test_monitors_with_processing(
relay.send_check_in(42, check_in)

check_in, message = monitors_consumer.get_check_in()
assert message["message_type"] == "check_in"
assert message["start_time"] is not None
assert message["project_id"] == 42
assert check_in == {
Expand Down Expand Up @@ -72,6 +73,7 @@ def test_monitor_endpoint_get_with_processing(
assert response.status_code == 202

check_in, message = monitors_consumer.get_check_in()
assert message["message_type"] == "check_in"
assert message["start_time"] is not None
assert message["project_id"] == project_id
assert check_in == {
Expand Down Expand Up @@ -102,6 +104,7 @@ def test_monitor_endpoint_post_auth_basic_with_processing(
assert response.status_code == 202

check_in, message = monitors_consumer.get_check_in()
assert message["message_type"] == "check_in"
assert message["start_time"] is not None
assert message["project_id"] == project_id
assert check_in == {
Expand Down Expand Up @@ -130,6 +133,7 @@ def test_monitor_endpoint_embedded_auth_with_processing(
assert response.status_code == 202

check_in, message = monitors_consumer.get_check_in()
assert message["message_type"] == "check_in"
assert message["start_time"] is not None
assert message["project_id"] == project_id
assert check_in == {
Expand Down

0 comments on commit c7b996c

Please sign in to comment.