feat(replays): add replay events ItemType #1239

Merged on Jun 17, 2022 · 36 commits

Commits
b910e80
feat(replays): add ItemType for recordings
JoshFerge Apr 22, 2022
fbb22ef
Merge branch 'master' into jferg/replay-recordings
JoshFerge Apr 27, 2022
d81ad3a
name change recording -> payload
JoshFerge Apr 27, 2022
e9972c5
feat(replays): add replay events itemtype
JoshFerge Apr 22, 2022
39cfd11
pr feedback
JoshFerge Apr 27, 2022
3d0d2ea
fix merge
JoshFerge Apr 27, 2022
7915d55
Update CHANGELOG.md
JoshFerge Apr 27, 2022
b58a4e3
drop events if ff not enabled
JoshFerge May 2, 2022
e9b9590
Merge branch 'master' into jferg/replay-recordings
JoshFerge May 2, 2022
1c78bf9
Merge branch 'jferg/replay-recordings' into jferg/replay-events-itemtype
JoshFerge May 2, 2022
4152676
add payload to ff condition
JoshFerge May 2, 2022
45d0531
fix itemtype reference
JoshFerge May 2, 2022
874051c
fix itemtype reference
JoshFerge May 3, 2022
dc27892
add replay items to from_str method
JoshFerge May 3, 2022
8167775
Merge branch 'master' into jferg/replay-recordings
JoshFerge May 4, 2022
6c78fb1
Merge branch 'jferg/replay-recordings' into jferg/replay-events-itemtype
JoshFerge May 4, 2022
6ff5c7b
Merge branch 'master' into jferg/replay-recordings
JoshFerge May 10, 2022
ed70621
fix merge conflicts
JoshFerge May 10, 2022
55248e8
remove ReplayEvent EventType
JoshFerge May 12, 2022
db13fef
update schema snapshot
JoshFerge May 25, 2022
39ee7e7
Merge branch 'master' into jferg/replay-recordings
JoshFerge Jun 1, 2022
0611228
rename payloads -> recordings
JoshFerge Jun 1, 2022
e62d36a
checkpoint: get envelope replay test working in principle
JoshFerge Jun 2, 2022
35a0755
small fixups
JoshFerge Jun 2, 2022
404371f
add second test for without processing
JoshFerge Jun 2, 2022
88b7cb7
separate out recording chunks
JoshFerge Jun 3, 2022
3e3882c
set defaults for recording attachment chunks
JoshFerge Jun 3, 2022
6c79661
merge recordings PR updates
JoshFerge Jun 6, 2022
e864666
event_id -> replay_id
JoshFerge Jun 7, 2022
a54ef48
Merge branch 'master' into jferg/replay-recordings
jan-auer Jun 9, 2022
6240aac
used ChunkedReplayRecording type
JoshFerge Jun 9, 2022
a8fd742
specify to use min latest relay version
JoshFerge Jun 9, 2022
ca921a0
merge upstream, pr feedback
JoshFerge Jun 9, 2022
6ebbd73
black
JoshFerge Jun 9, 2022
92b522d
remove errant test file
JoshFerge Jun 9, 2022
2f1f9db
fix merge conflicts
JoshFerge Jun 13, 2022
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -9,7 +9,8 @@
- Relay is now compatible with CentOS 7 and Red Hat Enterprise Linux 7 onward (kernel version _2.6.32_), depending on _glibc 2.17_ or newer. The `crash-handler` feature, which is currently enabled in the build published to DockerHub, additionally requires _curl 7.29_ or newer. ([#1279](https://github.com/getsentry/relay/pull/1279))
- Optionally start relay with `--upstream-dsn` to pass a Sentry DSN instead of the URL. This can be convenient when starting Relay in environments close to an SDK, where a DSN is already available. ([#1277](https://github.com/getsentry/relay/pull/1277))
- Add a new runtime mode `--aws-runtime-api=$AWS_LAMBDA_RUNTIME_API` that integrates Relay with the AWS Extensions API lifecycle. ([#1277](https://github.com/getsentry/relay/pull/1277))
- Add ReplayRecording ItemType. ([#1236](https://github.com/getsentry/relay/pull/1236))
- Add Replay ItemTypes. ([#1236](https://github.com/getsentry/relay/pull/1236), [#1239](https://github.com/getsentry/relay/pull/1239))

**Bug Fixes**:

4 changes: 4 additions & 0 deletions relay-common/src/constants.rs
@@ -109,6 +109,8 @@ pub enum DataCategory {
Session = 5,
/// A profile
Profile = 6,
/// Session Replays
Replay = 7,
/// Any other data category not known by this Relay.
#[serde(other)]
Unknown = -1,
@@ -124,6 +126,7 @@ impl DataCategory {
"security" => Self::Security,
"attachment" => Self::Attachment,
"session" => Self::Session,
"replay" => Self::Replay,
"profile" => Self::Profile,
_ => Self::Unknown,
}
@@ -138,6 +141,7 @@
Self::Security => "security",
Self::Attachment => "attachment",
Self::Session => "session",
Self::Replay => "replay",
Self::Profile => "profile",
Self::Unknown => "unknown",
}
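The new category slots into Relay's existing string↔enum mapping, and the two added match arms keep that mapping symmetric. A self-contained sketch of the roundtrip they guarantee (a simplified stand-in, not Relay's actual `DataCategory`):

```rust
/// Simplified stand-in for Relay's `DataCategory`, illustrative only.
#[derive(Debug, PartialEq)]
enum DataCategory {
    Session,
    Replay,
    Profile,
    Unknown,
}

impl DataCategory {
    /// Mirrors the `from_name` arm added in the diff.
    fn from_name(name: &str) -> Self {
        match name {
            "session" => Self::Session,
            "replay" => Self::Replay,
            "profile" => Self::Profile,
            _ => Self::Unknown,
        }
    }

    /// Mirrors the `name` arm added in the diff.
    fn name(&self) -> &'static str {
        match self {
            Self::Session => "session",
            Self::Replay => "replay",
            Self::Profile => "profile",
            Self::Unknown => "unknown",
        }
    }
}

fn main() {
    // Both directions must agree, otherwise quotas keyed by the
    // category name would not round-trip through serialization.
    assert_eq!(DataCategory::from_name("replay"), DataCategory::Replay);
    assert_eq!(DataCategory::Replay.name(), "replay");
}
```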
8 changes: 7 additions & 1 deletion relay-config/src/config.rs
@@ -783,7 +783,9 @@ pub enum KafkaTopic {
Metrics,
/// Profiles
Profiles,
/// ReplayRecording, large blobs sent by the replay sdk
/// ReplayEvents, breadcrumb + session updates for replays
ReplayEvents,
/// ReplayRecordings, large blobs sent by the replay sdk
ReplayRecordings,
}

@@ -807,6 +809,8 @@ pub struct TopicAssignments {
pub metrics: TopicAssignment,
/// Stacktrace topic name
pub profiles: TopicAssignment,
/// Replay Events topic name.
pub replay_events: TopicAssignment,
/// Recordings topic name.
pub replay_recordings: TopicAssignment,
}
@@ -823,6 +827,7 @@ impl TopicAssignments {
KafkaTopic::Sessions => &self.sessions,
KafkaTopic::Metrics => &self.metrics,
KafkaTopic::Profiles => &self.profiles,
KafkaTopic::ReplayEvents => &self.replay_events,
KafkaTopic::ReplayRecordings => &self.replay_recordings,
}
}
@@ -839,6 +844,7 @@ impl Default for TopicAssignments {
sessions: "ingest-sessions".to_owned().into(),
metrics: "ingest-metrics".to_owned().into(),
profiles: "profiles".to_owned().into(),
replay_events: "ingest-replay-events".to_owned().into(),
replay_recordings: "ingest-replay-recordings".to_owned().into(),
}
}
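Topic routing for replay events follows the same pattern as the existing topics: `KafkaTopic` selects a field of `TopicAssignments`, and the default assignment names the new `ingest-replay-events` topic. A condensed, runnable sketch of that lookup (plain `String`s stand in for Relay's `TopicAssignment` type):

```rust
// Condensed sketch of the topic lookup added in the diff; plain
// `String`s stand in for Relay's `TopicAssignment` type.
enum KafkaTopic {
    ReplayEvents,
    ReplayRecordings,
}

struct TopicAssignments {
    replay_events: String,
    replay_recordings: String,
}

impl Default for TopicAssignments {
    fn default() -> Self {
        Self {
            replay_events: "ingest-replay-events".to_owned(),
            replay_recordings: "ingest-replay-recordings".to_owned(),
        }
    }
}

impl TopicAssignments {
    /// Mirrors the `topic_name` dispatch in the diff.
    fn topic_name(&self, topic: KafkaTopic) -> &str {
        match topic {
            KafkaTopic::ReplayEvents => &self.replay_events,
            KafkaTopic::ReplayRecordings => &self.replay_recordings,
        }
    }
}

fn main() {
    let topics = TopicAssignments::default();
    assert_eq!(topics.topic_name(KafkaTopic::ReplayEvents), "ingest-replay-events");
}
```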
1 change: 1 addition & 0 deletions relay-quotas/src/quota.rs
@@ -103,6 +103,7 @@ impl CategoryUnit {
DataCategory::Default
| DataCategory::Error
| DataCategory::Transaction
| DataCategory::Replay
| DataCategory::Security
| DataCategory::Profile => Some(Self::Count),
DataCategory::Attachment => Some(Self::Bytes),
8 changes: 5 additions & 3 deletions relay-server/src/actors/envelopes.rs
@@ -1006,10 +1006,11 @@ impl EnvelopeProcessor {
});
}

fn process_replay_recordings(&self, state: &mut ProcessEnvelopeState) {
/// Remove replays if the feature flag is not enabled
fn process_replays(&self, state: &mut ProcessEnvelopeState) {
let replays_enabled = state.project_state.has_feature(Feature::Replays);
state.envelope.retain_items(|item| match item.ty() {
ItemType::ReplayRecording => replays_enabled,
ItemType::ReplayEvent | ItemType::ReplayRecording => replays_enabled,
_ => true,
});
}
@@ -1329,6 +1330,7 @@
ItemType::MetricBuckets => false,
ItemType::ClientReport => false,
ItemType::Profile => false,
ItemType::ReplayEvent => false,
ItemType::ReplayRecording => false,
// Without knowing more, `Unknown` items are allowed to be repeated
ItemType::Unknown(_) => false,
@@ -1843,7 +1845,7 @@
self.process_client_reports(state);
self.process_user_reports(state);
self.process_profiles(state);
self.process_replay_recordings(state);
self.process_replays(state);

if state.creates_event() {
if_processing!({
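The renamed `process_replays` gate is a single `retain_items` pass: when the project does not have the `organizations:session-replay` feature, both replay item types are dropped before they can reach the store. A self-contained sketch of the filter (a plain `Vec` stands in for the envelope):

```rust
#[derive(Debug, PartialEq)]
enum ItemType {
    Event,
    ReplayEvent,
    ReplayRecording,
}

/// Keeps replay items only when the feature flag is enabled,
/// mirroring `process_replays` from the diff.
fn process_replays(items: &mut Vec<ItemType>, replays_enabled: bool) {
    items.retain(|item| match item {
        ItemType::ReplayEvent | ItemType::ReplayRecording => replays_enabled,
        _ => true,
    });
}

fn main() {
    let mut items = vec![
        ItemType::Event,
        ItemType::ReplayEvent,
        ItemType::ReplayRecording,
    ];
    process_replays(&mut items, false);
    // With the flag off, only the non-replay item survives.
    assert_eq!(items, vec![ItemType::Event]);
}
```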
1 change: 1 addition & 0 deletions relay-server/src/actors/project.rs
@@ -54,6 +54,7 @@ pub enum Feature {
/// Enables ingestion and normalization of profiles.
#[serde(rename = "organizations:profiling")]
Profiling,
/// Enables ingestion of Session Replays (Replay Recordings and Replay Events)
#[serde(rename = "organizations:session-replay")]
Replays,

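Projects opt in through this flag in their project config. A hypothetical fragment matching the serde rename above (the surrounding structure is assumed, not taken from this diff):

```json
{
  "config": {
    "features": ["organizations:session-replay"]
  }
}
```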
50 changes: 49 additions & 1 deletion relay-server/src/actors/store.rs
@@ -59,6 +59,7 @@ struct Producers {
sessions: Producer,
metrics: Producer,
profiles: Producer,
replay_events: Producer,
replay_recordings: Producer,
}

@@ -77,6 +78,7 @@ impl Producers {
KafkaTopic::Sessions => Some(&self.sessions),
KafkaTopic::Metrics => Some(&self.metrics),
KafkaTopic::Profiles => Some(&self.profiles),
KafkaTopic::ReplayEvents => Some(&self.replay_events),
KafkaTopic::ReplayRecordings => Some(&self.replay_recordings),
}
}
@@ -140,6 +142,11 @@ impl StoreForwarder {
&mut reused_producers,
KafkaTopic::ReplayRecordings,
)?,
replay_events: make_producer(
&*config,
&mut reused_producers,
KafkaTopic::ReplayEvents,
)?,
};

Ok(Self { config, producers })
@@ -445,6 +452,28 @@ impl StoreForwarder {
Ok(())
}

fn produce_replay_event(
&self,
replay_id: EventId,
project_id: ProjectId,
start_time: Instant,
item: &Item,
) -> Result<(), StoreError> {
let message = ReplayEventKafkaMessage {
replay_id,
project_id,
start_time: UnixTimestamp::from_instant(start_time).as_secs(),
payload: item.payload(),
};
relay_log::trace!("Sending replay event to Kafka");
self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = "replay_event"
);
Ok(())
}

fn produce_replay_recording_chunks(
&self,
replay_id: EventId,
@@ -588,6 +617,17 @@ struct EventKafkaMessage {
/// Attachments that are potentially relevant for processing.
attachments: Vec<ChunkedAttachment>,
}
#[derive(Clone, Debug, Serialize)]
struct ReplayEventKafkaMessage {
/// Raw event payload.
payload: Bytes,
/// Time at which the event was received by Relay.
start_time: u64,
/// The event id.
replay_id: EventId,
/// The project id for the current event.
project_id: ProjectId,
}

/// Container payload for chunks of attachments.
#[derive(Debug, Serialize)]
@@ -713,6 +753,7 @@ enum KafkaMessage {
Session(SessionKafkaMessage),
Metric(MetricKafkaMessage),
Profile(ProfileKafkaMessage),
ReplayEvent(ReplayEventKafkaMessage),
ReplayRecording(ReplayRecordingKafkaMessage),
ReplayRecordingChunk(ReplayRecordingChunkKafkaMessage),
}
@@ -727,6 +768,7 @@ impl KafkaMessage {
KafkaMessage::Session(_) => "session",
KafkaMessage::Metric(_) => "metric",
KafkaMessage::Profile(_) => "profile",
KafkaMessage::ReplayEvent(_) => "replay_event",
KafkaMessage::ReplayRecording(_) => "replay_recording",
KafkaMessage::ReplayRecordingChunk(_) => "replay_recording_chunk",
}
@@ -742,6 +784,7 @@
Self::Session(_message) => Uuid::nil(), // Explicit random partitioning for sessions
Self::Metric(_message) => Uuid::nil(), // TODO(ja): Determine a partitioning key
Self::Profile(_message) => Uuid::nil(),
Self::ReplayEvent(message) => message.replay_id.0,
Self::ReplayRecording(message) => message.replay_id.0,
Self::ReplayRecordingChunk(message) => message.replay_id.0,
};
@@ -879,7 +922,12 @@ impl Handler<StoreEnvelope> for StoreForwarder {
event_type = "replay_recording"
);
}

ItemType::ReplayEvent => self.produce_replay_event(
event_id.ok_or(StoreError::NoEventId)?,
scoping.project_id,
start_time,
item,
)?,
_ => {}
}
}
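For a sense of what the producer emits on the replay-events topic, the sketch below serializes the `ReplayEventKafkaMessage` shape as JSON for readability; it assumes `serde` (with derive) and `serde_json` as dependencies, whereas Relay's producer encodes the same structure as msgpack (which is why the test consumer further down calls `msgpack.unpackb`). Note that `replay_id` is also the Kafka partition key, so all messages for one replay land on the same partition:

```rust
use serde::Serialize; // assumes serde (derive) and serde_json as dependencies

// Field-for-field sketch of `ReplayEventKafkaMessage` from the diff,
// with simple types standing in for `Bytes`, `EventId`, and `ProjectId`.
#[derive(Serialize)]
struct ReplayEventKafkaMessage<'a> {
    /// Raw replay event JSON, forwarded unparsed.
    payload: &'a str,
    /// Seconds since the Unix epoch when Relay received the item.
    start_time: u64,
    /// Replay UUID; doubles as the Kafka partition key.
    replay_id: &'a str,
    /// Numeric project id from the envelope's scoping.
    project_id: u64,
}

fn main() {
    let message = ReplayEventKafkaMessage {
        payload: r#"{"type":"replay_event"}"#,
        start_time: 1_655_000_000,
        replay_id: "515539018c9b4260a6f999572f1661ee",
        project_id: 42,
    };
    println!("{}", serde_json::to_string_pretty(&message).unwrap());
}
```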
6 changes: 6 additions & 0 deletions relay-server/src/envelope.rs
@@ -103,6 +103,8 @@ pub enum ItemType {
ClientReport,
/// Profile event payload encoded in JSON
Profile,
/// Replay metadata and breadcrumb payload
ReplayEvent,
/// Replay Recording data
ReplayRecording,
/// A new item type that is yet unknown by this version of Relay.
@@ -144,6 +146,7 @@ impl fmt::Display for ItemType {
Self::MetricBuckets => write!(f, "metric_buckets"),
Self::ClientReport => write!(f, "client_report"),
Self::Profile => write!(f, "profile"),
Self::ReplayEvent => write!(f, "replay_event"),
Self::ReplayRecording => write!(f, "replay_recording"),
Self::Unknown(s) => s.fmt(f),
}
@@ -169,6 +172,7 @@ impl std::str::FromStr for ItemType {
"metric_buckets" => Self::MetricBuckets,
"client_report" => Self::ClientReport,
"profile" => Self::Profile,
"replay_event" => Self::ReplayEvent,
"replay_recording" => Self::ReplayRecording,
other => Self::Unknown(other.to_owned()),
})
@@ -625,6 +629,7 @@ impl Item {
| ItemType::Metrics
| ItemType::MetricBuckets
| ItemType::ClientReport
| ItemType::ReplayEvent
| ItemType::ReplayRecording
| ItemType::Profile => false,

@@ -647,6 +652,7 @@
ItemType::RawSecurity => true,
ItemType::UnrealReport => true,
ItemType::UserReport => true,
ItemType::ReplayEvent => true,
ItemType::Session => false,
ItemType::Sessions => false,
ItemType::Metrics => false,
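On the wire, an SDK submits the new item inside a standard Sentry envelope: a JSON envelope header, an item header whose `type` is the `replay_event` string registered above, then the payload. Note that `produce_replay_event` takes the envelope header's `event_id` as the replay id. A hypothetical minimal envelope (IDs and payload fields invented; with `length` omitted from the item header, the payload runs to the next newline):

```
{"event_id":"515539018c9b4260a6f999572f1661ee"}
{"type":"replay_event","content_type":"application/json"}
{"type":"replay_event","replay_id":"515539018c9b4260a6f999572f1661ee"}
```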
1 change: 1 addition & 0 deletions relay-server/src/utils/rate_limits.rs
@@ -104,6 +104,7 @@ fn infer_event_category(item: &Item) -> Option<DataCategory> {
ItemType::FormData => None,
ItemType::UserReport => None,
ItemType::Profile => None,
ItemType::ReplayEvent => None,
ItemType::ReplayRecording => None,
ItemType::ClientReport => None,
ItemType::Unknown(_) => None,
1 change: 1 addition & 0 deletions relay-server/src/utils/sizes.rs
@@ -25,6 +25,7 @@ pub fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> bool
ItemType::Event
| ItemType::Transaction
| ItemType::Security
| ItemType::ReplayEvent
| ItemType::RawSecurity
| ItemType::FormData => event_size += item.len(),
ItemType::Attachment | ItemType::UnrealReport | ItemType::ReplayRecording => {
1 change: 1 addition & 0 deletions tests/integration/conftest.py
@@ -26,6 +26,7 @@
replay_recordings_consumer,
sessions_consumer,
metrics_consumer,
replay_events_consumer,
) # noqa


9 changes: 8 additions & 1 deletion tests/integration/fixtures/__init__.py
@@ -166,7 +166,6 @@ def send_envelope(self, project_id, envelope, headers=None, dsn_key_idx=0):
"X-Sentry-Auth": self.get_auth_header(project_id, dsn_key_idx),
**(headers or {}),
}

response = self.post(url, headers=headers, data=envelope.serialize())
response.raise_for_status()

@@ -196,6 +195,14 @@ def send_transaction(self, project_id, payload, item_headers=None):

self.send_envelope(project_id, envelope)

def send_replay_event(self, project_id, payload, item_headers=None):
envelope = Envelope()
envelope.add_item(Item(payload=PayloadRef(json=payload), type="replay_event"))
if envelope.headers is None:
envelope.headers = {}

self.send_envelope(project_id, envelope)

def send_session_aggregates(self, project_id, payload):
envelope = Envelope()
envelope.add_item(Item(payload=PayloadRef(json=payload), type="sessions"))
17 changes: 17 additions & 0 deletions tests/integration/fixtures/processing.py
@@ -51,6 +51,7 @@ def inner(options=None):
"outcomes": get_topic_name("outcomes"),
"sessions": get_topic_name("sessions"),
"metrics": get_topic_name("metrics"),
"replay_events": get_topic_name("replay_events"),
"replay_recordings": get_topic_name("replay_recordings"),
}

@@ -286,6 +287,11 @@ def replay_recordings_consumer(kafka_consumer):
return lambda: ReplayRecordingsConsumer(*kafka_consumer("replay_recordings"))


@pytest.fixture
def replay_events_consumer(kafka_consumer):
return lambda: ReplayEventsConsumer(*kafka_consumer("replay_events"))


class MetricsConsumer(ConsumerBase):
def get_metric(self, timeout=None):
message = self.poll(timeout=timeout)
@@ -360,3 +366,14 @@ def get_individual_replay(self):
v = msgpack.unpackb(message.value(), raw=False, use_list=False)
assert v["type"] == "replay_recording", v["type"]
return v


class ReplayEventsConsumer(ConsumerBase):
def get_replay_event(self):
message = self.poll()
assert message is not None
assert message.error() is None

event = msgpack.unpackb(message.value(), raw=False, use_list=False)
assert event["type"] == "replay_event"
return json.loads(event["payload"].decode("utf8")), event
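Taken together with the `send_replay_event` helper above, these fixtures support an end-to-end processing test. A hypothetical sketch (fixture names follow Relay's integration suite, but the config wiring and payload fields are assumptions, not part of this diff):

```python
import uuid


def test_replay_event_produced(mini_sentry, relay_with_processing, replay_events_consumer):
    # Enable the feature flag so process_replays() keeps the item.
    project_id = 42
    mini_sentry.add_full_project_config(project_id)
    mini_sentry.project_configs[project_id]["config"]["features"] = [
        "organizations:session-replay"
    ]

    relay = relay_with_processing()
    events_consumer = replay_events_consumer()

    replay_id = uuid.uuid4().hex
    relay.send_replay_event(project_id, {"replay_id": replay_id})

    # get_replay_event() returns (decoded JSON payload, outer Kafka message).
    payload, message = events_consumer.get_replay_event()
    assert message["type"] == "replay_event"
    assert payload["replay_id"] == replay_id
```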