
feat(monitors): Ingestion for basic check-in payloads #1886

Merged
merged 10 commits into from
Mar 2, 2023
12 changes: 12 additions & 0 deletions Cargo.lock


4 changes: 4 additions & 0 deletions relay-common/src/constants.rs
@@ -120,6 +120,8 @@ pub enum DataCategory {
/// This is the category for transaction payloads that were accepted and stored in full. In
/// contrast, `transaction` only guarantees that metrics have been accepted for the transaction.
TransactionIndexed = 9,
/// Cron monitor checkins.
Cron = 10,
//
// IMPORTANT: After adding a new entry to DataCategory, go to the `relay-cabi` subfolder and run
// `make header` to regenerate the C-binding. This allows using the data category from Python.
@@ -145,6 +147,7 @@ impl DataCategory {
"replay" => Self::Replay,
"transaction_processed" => Self::TransactionProcessed,
"transaction_indexed" => Self::TransactionIndexed,
"cron" => Self::Cron,
_ => Self::Unknown,
}
}
@@ -163,6 +166,7 @@ impl DataCategory {
Self::Replay => "replay",
Self::TransactionProcessed => "transaction_processed",
Self::TransactionIndexed => "transaction_indexed",
Self::Cron => "cron",
Self::Unknown => "unknown",
}
}
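Taken together, the two match arms above give `DataCategory` a lossy string round-trip: known names map to variants and back, while anything unrecognized collapses to `Unknown`. A small Python stand-in (an illustration of the behavior, not Relay's API; the category set is abbreviated) sketches this:

```python
# Abbreviated stand-in for DataCategory::from_name / DataCategory::name.
# Known names round-trip; unrecognized input parses to "unknown".
KNOWN = {"replay", "transaction_processed", "transaction_indexed", "cron"}

def from_name(name: str) -> str:
    return name if name in KNOWN else "unknown"

print(from_name("cron"))     # cron
print(from_name("cronjob"))  # unknown
```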
8 changes: 8 additions & 0 deletions relay-config/src/config.rs
@@ -524,6 +524,8 @@ struct Limits {
max_attachments_size: ByteSize,
/// The maximum combined size for all client reports in an envelope or request.
max_client_reports_size: ByteSize,
/// The maximum payload size for a cron monitor checkin.
max_checkin_size: ByteSize,
/// The maximum payload size for an entire envelope. Individual limits still apply.
max_envelope_size: ByteSize,
/// The maximum number of session items per envelope.
@@ -571,6 +573,7 @@ impl Default for Limits {
max_attachment_size: ByteSize::mebibytes(100),
max_attachments_size: ByteSize::mebibytes(100),
max_client_reports_size: ByteSize::kibibytes(4),
max_checkin_size: ByteSize::kibibytes(100),
max_envelope_size: ByteSize::mebibytes(100),
max_session_count: 100,
max_api_payload_size: ByteSize::mebibytes(20),
@@ -1690,6 +1693,11 @@ impl Config {
self.values.limits.max_client_reports_size.as_bytes()
}

/// Returns the maximum payload size of a cron monitor checkin in bytes.
pub fn max_checkin_size(&self) -> usize {
self.values.limits.max_checkin_size.as_bytes()
}
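Operators could then tune this limit like the other entries in `limits`. The YAML below is a hypothetical config excerpt: the key name is taken from the `Limits` struct above, but the exact value syntax and nesting are assumptions.

```yaml
# Hypothetical Relay config excerpt (key from the serde field name; the
# default above corresponds to 100 KiB).
limits:
  max_checkin_size: 100KB
```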

/// Returns the maximum size of an envelope payload in bytes.
///
/// Individual item size limits still apply.
19 changes: 19 additions & 0 deletions relay-crons/Cargo.toml
@@ -0,0 +1,19 @@
[package]
name = "relay-crons"
authors = ["Sentry <oss@sentry.io>"]
description = "Crons processing for Relay"
homepage = "https://getsentry.github.io/relay/"
repository = "https://github.com/getsentry/relay"
version = "23.2.0"
edition = "2021"
license-file = "../LICENSE"
publish = false

[dependencies]
relay-common = { path = "../relay-common" }
serde = { version = "1.0.114", features = ["derive"] }
serde_json = "1.0.55"
thiserror = "1.0.38"

[dev-dependencies]
similar-asserts = "1.4.2"
108 changes: 108 additions & 0 deletions relay-crons/src/lib.rs
@@ -0,0 +1,108 @@
//! Crons protocol and processing for Sentry.
//!
//! [Cron Monitors] allow you to monitor the uptime and performance of any scheduled, recurring job
//! in Sentry. Once implemented, they allow you to get alerts and metrics that help you solve errors,
//! detect timeouts, and prevent disruptions to your service.
//!
//! # API
//!
//! The public API documentation is available on [Sentry Docs](https://docs.sentry.io/api/crons/).
//!
//! [cron monitors]: https://docs.sentry.io/product/crons/

#![doc(
html_logo_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png",
html_favicon_url = "https://raw.githubusercontent.com/getsentry/relay/master/artwork/relay-icon.png"
)]
#![warn(missing_docs)]

use relay_common::Uuid;
use serde::{Deserialize, Serialize};

/// Error returned from [`process_checkin`].
#[derive(Debug, thiserror::Error)]
pub enum ProcessCheckinError {
/// Failed to deserialize the payload.
#[error("failed to deserialize checkin")]
Json(#[from] serde_json::Error),
}

/// The status of a cron monitor checkin.
#[derive(Clone, Copy, Debug, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
enum CheckinStatus {
/// Checkin had no issues during execution.
Ok,
/// Checkin failed or otherwise had some issues.
Error,
/// Checkin is expected to complete.
InProgress,
/// Monitor did not check in on time.
Missed,
/// No status was passed.
#[serde(other)]
Unknown,
}

fn uuid_simple<S>(uuid: &Uuid, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
uuid.as_simple().serialize(serializer)
}

/// The cron monitor checkin payload.
#[derive(Debug, Deserialize, Serialize)]
struct Checkin {
/// Unique identifier of this checkin.
#[serde(serialize_with = "uuid_simple")]
checkin_id: Uuid, // TODO(ja): roundtrip without slashes

/// Identifier of the monitor for this checkin.
#[serde(serialize_with = "uuid_simple")]
monitor_id: Uuid,
Review comment (Member): I believe this is going to become monitor_slug as I mentioned to you. Let me get confirmation for getsentry/sentry#45166 first though.

Reply (Member, Author): Sounds good. I'd propose we merge as-is and follow up with a set of PRs for Relay + Sentry once this has cleared up.


/// Status of this checkin. Defaults to `"unknown"`.
status: CheckinStatus,
Review comment (Member): Don't we need a #[serde(default)] here to derive the default in case the field is missing from the payload?

Reply (@jan-auer, Member Author, Feb 28, 2023): I've had this discussion with @cleptric. The current endpoint requires this field. We're thinking about making it optional as long as we can still break this API. @evanpurkhiser, would you like it to be optional right away?


/// Duration of this check in seconds since it started.
#[serde(default, skip_serializing_if = "Option::is_none")]
duration: Option<f64>,
}

/// Normalizes a cron monitor checkin payload.
///
/// Parses the payload, applies normalization (such as replacing statuses that cannot be
/// ingested), and returns the re-serialized payload. Fails if the payload does not
/// deserialize into a checkin.
pub fn process_checkin(payload: &[u8]) -> Result<Vec<u8>, ProcessCheckinError> {
let mut checkin = serde_json::from_slice::<Checkin>(payload)?;

// Missed status cannot be ingested, this is computed on the server.
if checkin.status == CheckinStatus::Missed {
Review comment (Member): I wonder if there will be cases we want to allow the user to send missed checkins. This is fine for now.

checkin.status = CheckinStatus::Unknown;
}

let serialized = serde_json::to_vec(&checkin)?;
Ok(serialized)
}
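The normalization step is small enough to sketch end to end. The Python stand-in below (an illustration of the logic, not the Relay API) parses the check-in, coerces the server-computed `missed` status to `unknown`, and re-serializes:

```python
import json

def process_checkin(payload: bytes) -> bytes:
    """Sketch of the normalization in relay_crons::process_checkin."""
    checkin = json.loads(payload)
    # "missed" cannot be ingested from clients; it is computed on the server.
    if checkin.get("status") == "missed":
        checkin["status"] = "unknown"
    return json.dumps(checkin).encode("utf-8")

normalized = process_checkin(b'{"status": "missed", "duration": 21.0}')
print(json.loads(normalized)["status"])  # unknown
```

The real implementation additionally rejects payloads that fail to deserialize, which is what surfaces as `ProcessCheckinError::Json`.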

#[cfg(test)]
mod tests {
use super::*;
use similar_asserts::assert_eq;

#[test]
fn test_json_roundtrip() {
let json = r#"{
"checkin_id": "a460c25ff2554577b920fcfacae4e5eb",
"monitor_id": "4dc8556e039245c7bd569f8cf513ea42",
"status": "in_progress",
"duration": 21.0
}"#;

let checkin = serde_json::from_str::<Checkin>(json).unwrap();
let serialized = serde_json::to_string_pretty(&checkin).unwrap();

assert_eq!(json, serialized);
}
}
9 changes: 8 additions & 1 deletion relay-kafka/src/config.rs
@@ -39,14 +39,16 @@ pub enum KafkaTopic {
ReplayEvents,
/// ReplayRecordings, large blobs sent by the replay sdk
ReplayRecordings,
/// Cron monitor checkins.
Crons,
}

impl KafkaTopic {
/// Returns an iterator over the variants of [`KafkaTopic`].
/// It will have to be adjusted if new variants are added.
pub fn iter() -> std::slice::Iter<'static, Self> {
use KafkaTopic::*;
static TOPICS: [KafkaTopic; 11] = [
static TOPICS: [KafkaTopic; 12] = [
Events,
Attachments,
Transactions,
@@ -58,6 +60,7 @@ impl KafkaTopic {
Profiles,
ReplayEvents,
ReplayRecordings,
Crons,
];
TOPICS.iter()
}
@@ -93,6 +96,8 @@ pub struct TopicAssignments {
pub replay_events: TopicAssignment,
/// Recordings topic name.
pub replay_recordings: TopicAssignment,
/// Cron monitor checkins.
pub crons: TopicAssignment,
}

impl TopicAssignments {
@@ -113,6 +118,7 @@ impl TopicAssignments {
KafkaTopic::Profiles => &self.profiles,
KafkaTopic::ReplayEvents => &self.replay_events,
KafkaTopic::ReplayRecordings => &self.replay_recordings,
KafkaTopic::Crons => &self.crons,
}
}
}
@@ -132,6 +138,7 @@ impl Default for TopicAssignments {
profiles: "profiles".to_owned().into(),
replay_events: "ingest-replay-events".to_owned().into(),
replay_recordings: "ingest-replay-recordings".to_owned().into(),
crons: "ingest-crons".to_owned().into(),
}
}
}
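With the default assignment above, check-ins are routed to an `ingest-crons` topic. Assuming the configuration keys mirror the `TopicAssignments` field names (the exact nesting under `topics` is an assumption), an operator override might look like:

```yaml
# Hypothetical topic assignment override; the default from the code above
# is "ingest-crons".
topics:
  crons: ingest-crons-custom
```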
3 changes: 2 additions & 1 deletion relay-quotas/src/quota.rs
@@ -107,7 +107,8 @@ impl CategoryUnit {
| DataCategory::Security
| DataCategory::Profile
| DataCategory::TransactionProcessed
| DataCategory::TransactionIndexed => Some(Self::Count),
| DataCategory::TransactionIndexed
| DataCategory::Cron => Some(Self::Count),
DataCategory::Attachment => Some(Self::Bytes),
DataCategory::Session => Some(Self::Batched),
DataCategory::Unknown => None,
2 changes: 2 additions & 0 deletions relay-server/Cargo.toml
@@ -20,6 +20,7 @@ ssl = [
]
processing = [
"dep:minidump",
"dep:relay-crons",
"dep:relay-profiling",
"dep:symbolic-common",
"dep:symbolic-unreal",
@@ -57,6 +58,7 @@ relay-auth = { path = "../relay-auth" }
relay-aws-extension = { path = "../relay-aws-extension" }
relay-common = { path = "../relay-common" }
relay-config = { path = "../relay-config" }
relay-crons = { path = "../relay-crons", optional = true }
relay-filter = { path = "../relay-filter" }
relay-general = { path = "../relay-general" }
relay-kafka = { path = "../relay-kafka", optional = true }
27 changes: 26 additions & 1 deletion relay-server/src/actors/processor.rs
@@ -1020,7 +1020,7 @@ impl EnvelopeProcessorService {
}
}

/// Remove profiles from the envelope if the feature flag is not enabled
/// Remove profiles from the envelope if the feature flag is not enabled.
fn filter_profiles(&self, state: &mut ProcessEnvelopeState) {
let profiling_enabled = state.project_state.has_feature(Feature::Profiling);
state.envelope.retain_items(|item| match item.ty() {
@@ -1029,6 +1029,28 @@
});
}

/// Normalize cron monitor checkins and remove invalid ones.
#[cfg(feature = "processing")]
fn process_checkins(&self, state: &mut ProcessEnvelopeState) {
state.envelope.retain_items(|item| {
if item.ty() != &ItemType::Checkin {
return true;
}

match relay_crons::process_checkin(&item.payload()) {
Ok(processed) => {
item.set_payload(ContentType::Json, processed);
true
}
Err(error) => {
// TODO: Track an outcome.
Review comment (Member): Are we OK with not tracking outcomes here?

Reply (Member Author): Yes, the backend also doesn't track outcomes yet, and it's not yet clear whether we want to track per check-in update or only per check-in.

relay_log::debug!("dropped invalid cron checkin: {}", LogError(&error));
false
}
}
})
}

/// Process profiles and set the profile ID in the profile context on the transaction if successful
#[cfg(feature = "processing")]
fn process_profiles(&self, state: &mut ProcessEnvelopeState) {
@@ -1567,6 +1589,8 @@ impl EnvelopeProcessorService {
ItemType::Profile => false,
ItemType::ReplayEvent => false,
ItemType::ReplayRecording => false,
ItemType::Checkin => false,

// Without knowing more, `Unknown` items are allowed to be repeated
ItemType::Unknown(_) => false,
}
@@ -2235,6 +2259,7 @@ impl EnvelopeProcessorService {
self.enforce_quotas(state)?;
// We need the event parsed in order to set the profile context on it
self.process_profiles(state);
self.process_checkins(state);
});

if state.has_event() {