Skip to content

Commit

Permalink
fix(store): Normalize future_timestamp under 1 hour
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-auer committed Jul 8, 2020
1 parent 69b6bd5 commit 24a7043
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 22 deletions.
29 changes: 18 additions & 11 deletions relay-general/src/store/clock_drift.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::time::Duration;

use chrono::{DateTime, Duration as SignedDuration, Utc};

use crate::processor::{ProcessValue, ProcessingState, Processor};
use crate::protocol::{Event, SessionUpdate};
use crate::types::{Error, ErrorKind, Meta, ProcessingResult, Timestamp};

/// The minimum clock drift for correction to apply.
const MINIMUM_CLOCK_DRIFT_SECS: i64 = 55 * 60;

/// A signed correction that contains the sender's timestamp as well as the drift to the receiver.
#[derive(Clone, Copy, Debug)]
struct ClockCorrection {
Expand All @@ -20,8 +19,8 @@ impl ClockCorrection {
Self { sent_at, drift }
}

fn at_least(self, lower_bound: SignedDuration) -> Option<Self> {
if self.drift.num_seconds().abs() >= lower_bound.num_seconds().abs() {
fn at_least(self, lower_bound: Duration) -> Option<Self> {
if self.drift.num_seconds().abs() as u64 >= lower_bound.as_secs() {
Some(self)
} else {
None
Expand Down Expand Up @@ -60,10 +59,7 @@ impl ClockDriftProcessor {
/// If no `sent_at` timestamp is provided, then clock drift correction is disabled. The drift is
/// calculated from the signed difference between the receiver's and the sender's timestamp.
pub fn new(sent_at: Option<DateTime<Utc>>, received_at: DateTime<Utc>) -> Self {
let correction = sent_at.and_then(|sent_at| {
ClockCorrection::new(sent_at, received_at)
.at_least(SignedDuration::seconds(MINIMUM_CLOCK_DRIFT_SECS))
});
let correction = sent_at.map(|sent_at| ClockCorrection::new(sent_at, received_at));

Self {
received_at,
Expand All @@ -72,6 +68,16 @@ impl ClockDriftProcessor {
}
}

/// Limits clock drift correction to a minimum duration.
///
/// If the detected clock drift is lower than the given duration, no correction is performed and
/// `is_drifted` returns `false`. By default, there is no lower bound and every drift is
/// corrected.
pub fn at_least(mut self, lower_bound: Duration) -> Self {
self.correction = self.correction.and_then(|c| c.at_least(lower_bound));
self
}

/// Use the given error kind for the attached eventerror instead of the default
/// `ErrorKind::ClockDrift`.
pub fn error_kind(mut self, kind: ErrorKind) -> Self {
Expand Down Expand Up @@ -208,7 +214,8 @@ mod tests {
let now = end + drift;

// The event was sent and received with minimal delay, which should not correct
let mut processor = ClockDriftProcessor::new(Some(end), now);
let mut processor =
ClockDriftProcessor::new(Some(end), now).at_least(Duration::from_secs(3600));
let mut event = create_transaction(start, end);
process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();

Expand Down Expand Up @@ -240,7 +247,7 @@ mod tests {
let start = Utc.ymd(2000, 1, 1).and_hms(0, 0, 0);
let end = Utc.ymd(2000, 1, 2).and_hms(0, 0, 0);

let drift = -SignedDuration::days(1);
let drift = -SignedDuration::seconds(60);
let now = end + drift;

// The event was sent and received with delay
Expand Down
63 changes: 58 additions & 5 deletions relay-general/src/store/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1503,15 +1503,15 @@ fn test_future_timestamp() {
use insta::assert_ron_snapshot;

let mut event = Annotated::new(Event {
timestamp: Annotated::new(Utc.ymd(1970, 1, 1).and_hms(0, 0, 0)),
timestamp: Annotated::new(Utc.ymd(2000, 1, 3).and_hms(0, 2, 0)),
..Default::default()
});

let mut processor = NormalizeProcessor::new(
Arc::new(StoreConfig {
received_at: Some(Utc.ymd(2000, 1, 3).and_hms(0, 0, 0)),
max_secs_in_past: Some(0),
max_secs_in_future: Some(0),
max_secs_in_past: Some(30 * 24 * 3600),
max_secs_in_future: Some(60),
..Default::default()
}),
None,
Expand All @@ -1534,9 +1534,9 @@ fn test_future_timestamp() {
"": Meta(Some(MetaInner(
err: [
[
"past_timestamp",
"future_timestamp",
{
"sdk_time": "1970-01-01T00:00:00+00:00",
"sdk_time": "2000-01-03T00:02:00+00:00",
"server_time": "2000-01-03T00:00:00+00:00",
},
],
Expand All @@ -1547,3 +1547,56 @@ fn test_future_timestamp() {
}
"###);
}

#[test]
fn test_past_timestamp() {
use crate::types::SerializableAnnotated;

use chrono::TimeZone;
use insta::assert_ron_snapshot;

let mut event = Annotated::new(Event {
timestamp: Annotated::new(Utc.ymd(2000, 1, 3).and_hms(0, 0, 0)),
..Default::default()
});

let mut processor = NormalizeProcessor::new(
Arc::new(StoreConfig {
received_at: Some(Utc.ymd(2000, 3, 3).and_hms(0, 0, 0)),
max_secs_in_past: Some(30 * 24 * 3600),
max_secs_in_future: Some(60),
..Default::default()
}),
None,
);
process_value(&mut event, &mut processor, ProcessingState::root()).unwrap();

assert_ron_snapshot!(SerializableAnnotated(&event), {
".event_id" => "[event-id]",
}, @r###"
{
"event_id": "[event-id]",
"level": "error",
"type": "default",
"logger": "",
"platform": "other",
"timestamp": 952041600,
"received": 952041600,
"_meta": {
"timestamp": {
"": Meta(Some(MetaInner(
err: [
[
"past_timestamp",
{
"sdk_time": "2000-01-03T00:00:00+00:00",
"server_time": "2000-03-03T00:00:00+00:00",
},
],
],
))),
},
},
}
"###);
}
17 changes: 11 additions & 6 deletions relay-server/src/actors/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};

use actix::prelude::*;
use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Duration as SignedDuration, Utc};
use failure::Fail;
use futures::prelude::*;
use parking_lot::RwLock;
Expand Down Expand Up @@ -47,6 +47,9 @@ use {
relay_quotas::{DataCategory, RateLimitingError, RedisRateLimiter},
};

/// The minimum clock drift for correction to apply.
const MINIMUM_CLOCK_DRIFT: Duration = Duration::from_secs(55 * 60);

#[derive(Debug, Fail)]
pub enum QueueEnvelopeError {
#[fail(display = "Too many events (event_buffer_size reached)")]
Expand Down Expand Up @@ -279,7 +282,8 @@ impl EventProcessor {
let received = state.received_at;

let project_id = envelope.meta().project_id().value();
let clock_drift_processor = ClockDriftProcessor::new(envelope.sent_at(), received);
let clock_drift_processor =
ClockDriftProcessor::new(envelope.sent_at(), received).at_least(MINIMUM_CLOCK_DRIFT);
let client = envelope.meta().client().map(str::to_owned);

envelope.retain_items(|item| {
Expand Down Expand Up @@ -327,13 +331,13 @@ impl EventProcessor {
changed = true;
}

let max_age = Duration::seconds(self.config.max_session_secs_in_past());
let max_age = SignedDuration::seconds(self.config.max_session_secs_in_past());
if (received - session.started) > max_age || (received - session.timestamp) > max_age {
log::trace!("skipping session older than {} days", max_age.num_days());
return false;
}

let max_future = Duration::seconds(self.config.max_secs_in_future());
let max_future = SignedDuration::seconds(self.config.max_secs_in_future());
if (session.started - received) > max_age || (session.timestamp - received) > max_age {
log::trace!(
"skipping session more than {}s in the future",
Expand Down Expand Up @@ -820,7 +824,8 @@ impl EventProcessor {
None => None,
};

let mut processor = ClockDriftProcessor::new(sent_at, state.received_at);
let mut processor =
ClockDriftProcessor::new(sent_at, state.received_at).at_least(MINIMUM_CLOCK_DRIFT);
process_value(&mut state.event, &mut processor, ProcessingState::root())
.map_err(|_| ProcessingError::InvalidTransaction)?;

Expand Down

0 comments on commit 24a7043

Please sign in to comment.