diff --git a/CHANGELOG.md b/CHANGELOG.md
index cc0038cd1f..3973c219f7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,7 @@
 **Internal**:
 
 - Emit outcomes for rate limited attachments. ([#951](https://github.com/getsentry/relay/pull/951))
+- Remove timestamp from metrics text protocol. ([#972](https://github.com/getsentry/relay/pull/972))
 - Add max, min, sum, and count to gauge metrics. ([#974](https://github.com/getsentry/relay/pull/974))
 
 ## 21.3.1
diff --git a/Cargo.lock b/Cargo.lock
index c6f4e340bb..ad622ba177 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3120,6 +3120,7 @@ dependencies = [
  "schemars",
  "sentry-types 0.20.1",
  "serde",
+ "serde_test",
 ]
 
 [[package]]
@@ -3810,6 +3811,15 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "serde_test"
+version = "1.0.125"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4bb5fef7eaf5a97917567183607ac4224c5b451c15023930f23b937cce879fe"
+dependencies = [
+ "serde",
+]
+
 [[package]]
 name = "serde_urlencoded"
 version = "0.5.5"
diff --git a/relay-common/Cargo.toml b/relay-common/Cargo.toml
index 3103bdbf49..818be3b4ad 100644
--- a/relay-common/Cargo.toml
+++ b/relay-common/Cargo.toml
@@ -26,6 +26,9 @@ sentry-types = "0.20.0"
 schemars = { version = "0.8.1", features = ["uuid", "chrono"], optional = true }
 serde = { version = "1.0.114", features = ["derive"] }
 
+[dev-dependencies]
+serde_test = "1.0.125"
+
 [features]
 jsonschema = ["schemars"]
 default = []
diff --git a/relay-common/src/time.rs b/relay-common/src/time.rs
index dbf904d32b..39ba896e42 100644
--- a/relay-common/src/time.rs
+++ b/relay-common/src/time.rs
@@ -3,6 +3,7 @@
 use std::fmt;
 use std::time::{Duration, Instant, SystemTime};
 
+use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 
 /// Converts an `Instant` into a `SystemTime`.
@@ -146,12 +147,114 @@ impl Serialize for UnixTimestamp {
     }
 }
 
+struct UnixTimestampVisitor;
+
+impl<'de> serde::de::Visitor<'de> for UnixTimestampVisitor {
+    type Value = UnixTimestamp;
+
+    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+        formatter.write_str("a non-negative timestamp or datetime string")
+    }
+
+    fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        Ok(UnixTimestamp::from_secs(v))
+    }
+
+    fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        if v < 0.0 || v > u64::MAX as f64 {
+            return Err(E::custom("timestamp out-of-range"));
+        }
+
+        Ok(UnixTimestamp::from_secs(v.trunc() as u64))
+    }
+
+    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
+    where
+        E: serde::de::Error,
+    {
+        let datetime = v.parse::<DateTime<Utc>>().map_err(E::custom)?;
+        let timestamp = datetime.timestamp();
+
+        if timestamp >= 0 {
+            Ok(UnixTimestamp(timestamp as u64))
+        } else {
+            Err(E::custom("timestamp out-of-range"))
+        }
+    }
+}
+
 impl<'de> Deserialize<'de> for UnixTimestamp {
     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     where
         D: serde::Deserializer<'de>,
     {
-        let secs = u64::deserialize(deserializer)?;
-        Ok(Self::from_secs(secs))
+        deserializer.deserialize_any(UnixTimestampVisitor)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serde_test::{assert_de_tokens, assert_de_tokens_error, assert_tokens, Token};
+
+    #[test]
+    fn test_parse_timestamp_int() {
+        assert_tokens(&UnixTimestamp::from_secs(123), &[Token::U64(123)]);
+    }
+
+    #[test]
+    fn test_parse_timestamp_neg_int() {
+        assert_de_tokens_error::<UnixTimestamp>(
+            &[Token::I64(-1)],
+            "invalid type: integer `-1`, expected a non-negative timestamp or datetime string",
+        );
+    }
+
+    #[test]
+    fn test_parse_timestamp_float() {
+        assert_de_tokens(&UnixTimestamp::from_secs(123), &[Token::F64(123.4)]);
+    }
+
+    #[test]
+    fn test_parse_timestamp_large_float() {
+        assert_de_tokens_error::<UnixTimestamp>(
+            &[Token::F64(2.0 * (u64::MAX as f64))],
+            "timestamp out-of-range",
+        );
+    }
+
+    #[test]
+    fn test_parse_timestamp_neg_float() {
+        assert_de_tokens_error::<UnixTimestamp>(&[Token::F64(-1.0)], "timestamp out-of-range");
+    }
+
+    #[test]
+    fn test_parse_timestamp_str() {
+        assert_de_tokens(
+            &UnixTimestamp::from_secs(123),
+            &[Token::Str("1970-01-01T00:02:03Z")],
+        );
+    }
+
+    #[test]
+    fn test_parse_timestamp_other() {
+        assert_de_tokens_error::<UnixTimestamp>(
+            &[Token::Bool(true)],
+            "invalid type: boolean `true`, expected a non-negative timestamp or datetime string",
+        );
+    }
+
+    #[test]
+    fn test_parse_datetime_bogus() {
+        assert_de_tokens_error::<UnixTimestamp>(
+            &[Token::Str("adf3rt546")],
+            "input contains invalid characters",
+        );
     }
 }
diff --git a/relay-metrics/src/aggregation.rs b/relay-metrics/src/aggregation.rs
index 14b0b3dc3a..2bf6c68b47 100644
--- a/relay-metrics/src/aggregation.rs
+++ b/relay-metrics/src/aggregation.rs
@@ -1211,7 +1211,7 @@ mod tests {
             })
             .and_then(|_| {
                 // Wait until flush delay has passed
-                relay_test::delay(Duration::from_millis(1100)).map_err(|_| ())
+                relay_test::delay(Duration::from_millis(1500)).map_err(|_| ())
             })
             .and_then(|_| {
                 // After the flush delay has passed, the receiver should have the bucket:
diff --git a/relay-metrics/src/lib.rs b/relay-metrics/src/lib.rs
index d35e121b69..8cea97227f 100644
--- a/relay-metrics/src/lib.rs
+++ b/relay-metrics/src/lib.rs
@@ -11,13 +11,28 @@
 //! looks like this:
 //!
 //! ```text
-//! endpoint.response_time@ms:57|d|'1615889449|#route:user_index
-//! endpoint.hits:1|c|'1615889449|#route:user_index
+//! endpoint.response_time@ms:57|d|#route:user_index
+//! endpoint.hits:1|c|#route:user_index
 //! ```
 //!
 //! The metric type is part of its signature just like the unit. Therefore, it is allowed to reuse a
 //! metric name for multiple metric types, which will result in multiple metrics being recorded.
 //!
+//! # Metric Envelopes
+//!
+//! To send one or more metrics to Relay, the raw protocol is enclosed in an envelope item of type
+//! `metrics`:
+//!
+//! ```text
+//! {}
+//! {"type": "metrics", "timestamp": 1615889440, ...}
+//! endpoint.response_time@ms:57|d|#route:user_index
+//! ...
+//! ```
+//!
+//! The timestamp in the item header is used to send backdated metrics. If it is omitted,
+//! the `received` time of the envelope is assumed.
+//!
 //! # Aggregation
 //!
 //! Relay accumulates all metrics in [time buckets](Bucket) before sending them onwards. Aggregation
diff --git a/relay-metrics/src/protocol.rs b/relay-metrics/src/protocol.rs
index b3a8516829..77d72bbc6a 100644
--- a/relay-metrics/src/protocol.rs
+++ b/relay-metrics/src/protocol.rs
@@ -1,5 +1,5 @@
 use std::collections::BTreeMap;
-use std::fmt::{self, Write};
+use std::fmt;
 use std::iter::FusedIterator;
 
 use hash32::{FnvHasher, Hasher};
@@ -285,23 +285,6 @@ fn parse_tags(string: &str) -> Option<BTreeMap<String, String>> {
     Some(map)
 }
 
-/// Parses a UNIX timestamp from the given string.
-///
-/// The timestamp can be represented as floating point number, in which case it is truncated.
-fn parse_timestamp(string: &str) -> Option<UnixTimestamp> {
-    if let Ok(int) = string.parse() {
-        Some(UnixTimestamp::from_secs(int))
-    } else if let Ok(float) = string.parse::<f64>() {
-        if float < 0f64 {
-            None
-        } else {
-            Some(UnixTimestamp::from_secs(float.trunc() as u64))
-        }
-    } else {
-        None
-    }
-}
-
 /// A single metric value representing the payload sent from clients.
 ///
 /// As opposed to bucketed metric aggregations, this single metrics always represent a single
@@ -312,15 +295,15 @@ fn parse_timestamp(string: &str) -> Option<UnixTimestamp> {
 /// # Submission Protocol
 ///
 /// ```text
-/// <name>[@unit]:<value>|<type>|'<timestamp>|#<tag_key>:<tag_value>,<tag>
+/// <name>[@unit]:<value>|<type>|#<tag_key>:<tag_value>,<tag>
 /// ```
 ///
 /// See the field documentation on this struct for more information on the components. An example
 /// submission looks like this:
 ///
 /// ```text
-/// endpoint.response_time@ms:57|d|'1615889449|#route:user_index
-/// endpoint.hits:1|c|'1615889449|#route:user_index
+/// endpoint.response_time@ms:57|d|#route:user_index
+/// endpoint.hits:1|c|#route:user_index
 /// ```
 ///
 /// To parse a submission payload, use [`Metric::parse_all`].
@@ -328,7 +311,8 @@ fn parse_timestamp(string: &str) -> Option<UnixTimestamp> {
 /// # JSON Representation
 ///
 /// In addition to the submission protocol, metrics can be represented as structured data in JSON.
-/// Field values are the same with a single exception: The timestamp is required in JSON notation.
+/// In addition to the field values from the submission protocol, a timestamp is added to every
+/// metric (see [crate documentation](crate)).
 ///
 /// ```json
 /// {
@@ -361,8 +345,7 @@ fn parse_timestamp(string: &str) -> Option<UnixTimestamp> {
 /// {
 ///   "name": "endpoint.users",
 ///   "value": 4267882815,
-///   "type": "s",
-///   "timestamp": 4711
+///   "type": "s"
 /// }
 /// ```
@@ -391,9 +374,9 @@ pub struct Metric {
     pub value: MetricValue,
     /// The timestamp for this metric value.
     ///
-    /// In the SDK protocol, timestamps are optional and preceded with a single quote `'`. Supply a
-    /// default timestamp to [`Metric::parse`] or [`Metric::parse_all`] to insert a default
-    /// timestamp.
+    /// If a timestamp is not supplied in the item header of the envelope, the
+    /// default timestamp supplied to [`Metric::parse`] or [`Metric::parse_all`]
+    /// is associated with the metric.
     pub timestamp: UnixTimestamp,
     /// A list of tags adding dimensions to the metric for filtering and aggregation.
     ///
@@ -423,10 +406,8 @@ impl Metric {
         };
 
         for component in components {
-            match component.chars().next() {
-                Some('#') => metric.tags = parse_tags(component.get(1..)?)?,
-                Some('\'') => metric.timestamp = parse_timestamp(component.get(1..)?)?,
-                _ => (),
+            if let Some('#') = component.chars().next() {
+                metric.tags = parse_tags(component.get(1..)?)?;
             }
         }
 
@@ -477,56 +458,6 @@ impl Metric {
     pub fn parse_all(slice: &[u8], timestamp: UnixTimestamp) -> ParseMetrics<'_> {
         ParseMetrics { slice, timestamp }
     }
-
-    /// Serializes the metric to the raw protocol.
-    ///
-    /// See the [`Metric`] for more information on the protocol.
-    ///
-    /// # Example
-    ///
-    /// ```
-    /// use std::collections::BTreeMap;
-    /// use relay_metrics::{Metric, MetricUnit, MetricValue, MetricType, UnixTimestamp};
-    ///
-    /// let metric = Metric {
-    ///     name: "hits".to_owned(),
-    ///     unit: MetricUnit::None,
-    ///     value: MetricValue::Counter(1.0),
-    ///     timestamp: UnixTimestamp::from_secs(1615889449),
-    ///     tags: BTreeMap::new(),
-    /// };
-    ///
-    /// assert_eq!(metric.serialize(), "hits:1|c|'1615889449");
-    /// ```
-    pub fn serialize(&self) -> String {
-        let mut string = self.name.clone();
-
-        if self.unit != MetricUnit::None {
-            write!(string, "@{}", self.unit).ok();
-        }
-
-        write!(
-            string,
-            ":{}|{}|'{}",
-            self.value,
-            self.value.ty(),
-            self.timestamp
-        )
-        .ok();
-
-        for (index, (key, value)) in self.tags.iter().enumerate() {
-            match index {
-                0 => write!(string, "|#{}", key).ok(),
-                _ => write!(string, ",{}", key).ok(),
-            };
-
-            if !value.is_empty() {
-                write!(string, ":{}", value).ok();
-            }
-        }
-
-        string
-    }
 }
 
 /// Iterator over parsed metrics returned from [`Metric::parse_all`].
@@ -673,22 +604,6 @@ mod tests {
         assert_eq!(metric.unit, MetricUnit::Duration(DurationPrecision::Second));
     }
 
-    #[test]
-    fn test_parse_timestamp() {
-        let s = "foo:17.5|d|'1337";
-        let timestamp = UnixTimestamp::from_secs(0xffff_ffff);
-        let metric = Metric::parse(s.as_bytes(), timestamp).unwrap();
-        assert_eq!(metric.timestamp, UnixTimestamp::from_secs(1337));
-    }
-
-    #[test]
-    fn test_parse_timestamp_float() {
-        let s = "foo:17.5|d|'1337.666";
-        let timestamp = UnixTimestamp::from_secs(0xffff_ffff);
-        let metric = Metric::parse(s.as_bytes(), timestamp).unwrap();
-        assert_eq!(metric.timestamp, UnixTimestamp::from_secs(1337));
-    }
-
     #[test]
     fn test_parse_tags() {
         let s = "foo:17.5|d|#foo,bar:baz";
@@ -726,57 +641,6 @@ mod tests {
         assert!(metric.is_err());
     }
 
-    #[test]
-    fn test_serialize_basic() {
-        let metric = Metric {
-            name: "foo".to_owned(),
-            unit: MetricUnit::None,
-            value: MetricValue::Counter(42.),
-            timestamp: UnixTimestamp::from_secs(4711),
-            tags: BTreeMap::new(),
-        };
-
-        assert_eq!(metric.serialize(), "foo:42|c|'4711");
-    }
-
-    #[test]
-    fn test_serialize_unit() {
-        let metric = Metric {
-            name: "foo".to_owned(),
-            unit: MetricUnit::Duration(DurationPrecision::Second),
-            value: MetricValue::Counter(42.),
-            timestamp: UnixTimestamp::from_secs(4711),
-            tags: BTreeMap::new(),
-        };
-
-        assert_eq!(metric.serialize(), "foo@s:42|c|'4711");
-    }
-
-    #[test]
-    fn test_serialize_tags() {
-        let mut tags = BTreeMap::new();
-        tags.insert("empty".to_owned(), "".to_owned());
-        tags.insert("full".to_owned(), "value".to_owned());
-
-        let metric = Metric {
-            name: "foo".to_owned(),
-            unit: MetricUnit::None,
-            value: MetricValue::Counter(42.),
-            timestamp: UnixTimestamp::from_secs(4711),
-            tags,
-        };
-
-        assert_eq!(metric.serialize(), "foo:42|c|'4711|#empty,full:value");
-    }
-
-    #[test]
-    fn test_roundtrip() {
-        let s = "foo@s:17.5|d|'1337|#bar,foo:baz";
-        let timestamp = UnixTimestamp::from_secs(0xffff_ffff);
-        let metric = Metric::parse(s.as_bytes(), timestamp).unwrap();
-        assert_eq!(metric.serialize(), s);
-    }
-
     #[test]
     fn test_serde_json() {
         let json = r#"{
diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs
index 1272c94da3..ecc4b990da 100644
--- a/relay-server/src/actors/events.rs
+++ b/relay-server/src/actors/events.rs
@@ -1336,7 +1336,7 @@ impl Handler<ProcessMetrics> for EventProcessor {
         } = message;
 
         let received = relay_common::instant_to_date_time(start_time);
-        let timestamp = UnixTimestamp::from_secs(received.timestamp() as u64);
+        let default_timestamp = UnixTimestamp::from_secs(received.timestamp() as u64);
 
         let clock_drift_processor =
             ClockDriftProcessor::new(sent_at, received).at_least(MINIMUM_CLOCK_DRIFT);
@@ -1344,6 +1344,7 @@ impl Handler<ProcessMetrics> for EventProcessor {
         for item in items {
             let payload = item.payload();
             if item.ty() == ItemType::Metrics {
+                let timestamp = item.timestamp().unwrap_or(default_timestamp);
                 let metrics = Metric::parse_all(&payload, timestamp).filter_map(|result| {
                     let mut metric = result.ok()?;
                     clock_drift_processor.process_timestamp(&mut metric.timestamp);
diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs
index b5c1095d64..d2ccb05d62 100644
--- a/relay-server/src/envelope.rs
+++ b/relay-server/src/envelope.rs
@@ -38,6 +38,7 @@ use std::io::{self, Write};
 
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use failure::Fail;
+use relay_common::UnixTimestamp;
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use smallvec::SmallVec;
@@ -362,6 +363,13 @@ pub struct ItemHeaders {
     #[serde(default, skip_serializing_if = "Option::is_none")]
"Option::is_none")] sample_rates: Option, + /// A custom timestamp associated with the item. + /// + /// For metrics, this field can be used to backdate a submission. + /// The given timestamp determines the bucket into which the metric will be aggregated. + #[serde(default, skip_serializing_if = "Option::is_none")] + timestamp: Option, + /// Other attributes for forward compatibility. #[serde(flatten)] other: BTreeMap, @@ -385,6 +393,7 @@ impl Item { filename: None, rate_limited: false, sample_rates: None, + timestamp: None, other: BTreeMap::new(), }, payload: Bytes::new(), @@ -482,6 +491,11 @@ impl Item { } } + /// Get custom timestamp for this item. Currently used to backdate metrics. + pub fn timestamp(&self) -> Option { + self.headers.timestamp + } + /// Returns the specified header value, if present. pub fn get_header(&self, name: &K) -> Option<&Value> where @@ -1276,6 +1290,24 @@ mod tests { assert_eq!(meta.dsn(), request_meta().dsn()); } + #[test] + fn test_parse_request_sent_at() { + let bytes = Bytes::from("{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\", \"sent_at\": \"1970-01-01T00:02:03Z\"}"); + let envelope = Envelope::parse_request(bytes, request_meta()).unwrap(); + let sent_at = envelope.sent_at().unwrap(); + + // DSN should be assumed from the request. + assert_eq!(sent_at.timestamp(), 123); + } + + #[test] + fn test_parse_request_sent_at_null() { + let bytes = + Bytes::from("{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\", \"sent_at\": null}"); + let envelope = Envelope::parse_request(bytes, request_meta()).unwrap(); + assert!(envelope.sent_at().is_none()); + } + #[test] fn test_parse_request_no_origin() { let bytes = Bytes::from("{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\",\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}"); diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py index 2b9e0d24ea..28f486d22e 100644 --- a/tests/integration/fixtures/__init__.py +++ b/tests/integration/fixtures/__init__.py @@ -175,10 +175,14 @@ def send_session_aggregates(self, project_id, payload): envelope.add_item(Item(payload=PayloadRef(json=payload), type="sessions")) self.send_envelope(project_id, envelope) - def send_metrics(self, project_id, payload): + def send_metrics(self, project_id, payload, timestamp=None): envelope = Envelope() envelope.add_item( - Item(payload=PayloadRef(bytes=payload.encode()), type="metrics") + Item( + payload=PayloadRef(bytes=payload.encode()), + type="metrics", + headers=None if timestamp is None else {"timestamp": timestamp}, + ) ) self.send_envelope(project_id, envelope) diff --git a/tests/integration/test_metrics.py b/tests/integration/test_metrics.py index efbf18b2f9..a0d050bf25 100644 --- a/tests/integration/test_metrics.py +++ b/tests/integration/test_metrics.py @@ -14,8 +14,8 @@ def test_metrics(mini_sentry, relay): mini_sentry.add_basic_project_config(project_id) timestamp = int(datetime.now(tz=timezone.utc).timestamp()) - metrics_payload = f"foo:42|c|'{timestamp}\nbar:17|c|'{timestamp}" - relay.send_metrics(project_id, metrics_payload) + metrics_payload = f"foo:42|c\nbar:17|c" + relay.send_metrics(project_id, metrics_payload, timestamp) envelope = mini_sentry.captured_events.get(timeout=2) assert len(envelope.items) == 1 @@ -30,6 +30,28 @@ def test_metrics(mini_sentry, relay): ] +def test_metrics_backdated(mini_sentry, relay): + relay = relay(mini_sentry, options=TEST_CONFIG) + + project_id = 42 + mini_sentry.add_basic_project_config(project_id) + + timestamp = 
+    timestamp = int(datetime.now(tz=timezone.utc).timestamp()) - 24 * 60 * 60
+    metrics_payload = f"foo:42|c"
+    relay.send_metrics(project_id, metrics_payload, timestamp)
+
+    envelope = mini_sentry.captured_events.get(timeout=2)
+    assert len(envelope.items) == 1
+
+    metrics_item = envelope.items[0]
+    assert metrics_item.type == "metric_buckets"
+
+    received_metrics = metrics_item.get_bytes()
+    assert json.loads(received_metrics.decode()) == [
+        {"timestamp": timestamp, "name": "foo", "value": 42.0, "type": "c"},
+    ]
+
+
 def test_metrics_with_processing(mini_sentry, relay_with_processing, metrics_consumer):
     relay = relay_with_processing(options=TEST_CONFIG)
     metrics_consumer = metrics_consumer()
@@ -38,8 +60,8 @@ def test_metrics_with_processing(mini_sentry, relay_with_processing, metrics_con
     mini_sentry.add_full_project_config(project_id)
 
     timestamp = int(datetime.now(tz=timezone.utc).timestamp())
-    metrics_payload = f"foo:42|c|'{timestamp}\nbar@s:17|c|'{timestamp}"
-    relay.send_metrics(project_id, metrics_payload)
+    metrics_payload = f"foo:42|c\nbar@s:17|c"
+    relay.send_metrics(project_id, metrics_payload, timestamp)
 
     metric = metrics_consumer.get_metric()
 
@@ -88,10 +110,10 @@ def test_metrics_full(mini_sentry, relay, relay_with_processing, metrics_consume
 
     # Send two events to downstream and one to upstream
     timestamp = int(datetime.now(tz=timezone.utc).timestamp())
-    downstream.send_metrics(project_id, f"foo:7|c|'{timestamp}")
-    downstream.send_metrics(project_id, f"foo:5|c|'{timestamp}")
+    downstream.send_metrics(project_id, f"foo:7|c", timestamp)
+    downstream.send_metrics(project_id, f"foo:5|c", timestamp)
 
-    upstream.send_metrics(project_id, f"foo:3|c|'{timestamp}")
+    upstream.send_metrics(project_id, f"foo:3|c", timestamp)
 
     metric = metrics_consumer.get_metric(timeout=4)
     metric.pop("timestamp")
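For illustration, a minimal sketch of how a client could backdate a metric submission under the changed protocol, mirroring the send_metrics fixture in this diff. The sentry_sdk.envelope import path and the literal timestamp value are assumptions; the Envelope, Item, and PayloadRef helpers and the headers={"timestamp": ...} item header are the ones the integration fixtures above already use.

from sentry_sdk.envelope import Envelope, Item, PayloadRef  # assumed import path

# The statsd-style text payload no longer carries timestamps; the optional
# "timestamp" item header backdates the whole submission. When it is omitted,
# Relay falls back to the envelope's received time.
payload = "endpoint.response_time@ms:57|d|#route:user_index\nendpoint.hits:1|c|#route:user_index"

envelope = Envelope()
envelope.add_item(
    Item(
        payload=PayloadRef(bytes=payload.encode()),
        type="metrics",
        headers={"timestamp": 1615889440},  # UNIX seconds; hypothetical value
    )
)
# The envelope can then be submitted the same way the integration fixture's
# send_envelope() submits it.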