diff --git a/CHANGELOG.md b/CHANGELOG.md index c804152847..0ed1ce39fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ - Relay is now compatible with CentOS 7 and Red Hat Enterprise Linux 7 onward (kernel version _2.6.32_), depending on _glibc 2.17_ or newer. The `crash-handler` feature, which is currently enabled in the build published to DockerHub, additionally requires _curl 7.29_ or newer. ([#1279](https://github.com/getsentry/relay/pull/1279)) - Optionally start relay with `--upstream-dsn` to pass a Sentry DSN instead of the URL. This can be convenient when starting Relay in environments close to an SDK, where a DSN is already available. ([#1277](https://github.com/getsentry/relay/pull/1277)) - Add a new runtime mode `--aws-runtime-api=$AWS_LAMBDA_RUNTIME_API` that integrates Relay with the AWS Extensions API lifecycle. ([#1277](https://github.com/getsentry/relay/pull/1277)) -- Add ReplayRecording ItemType. ([#1236](https://github.com/getsentry/relay/pull/1236)) +- Add Replay ItemTypes. ([#1236](https://github.com/getsentry/relay/pull/1236), [#1239](https://github.com/getsentry/relay/pull/1239)) **Bug Fixes**: diff --git a/relay-common/src/constants.rs b/relay-common/src/constants.rs index 956a47f8f5..d496f82a10 100644 --- a/relay-common/src/constants.rs +++ b/relay-common/src/constants.rs @@ -109,6 +109,8 @@ pub enum DataCategory { Session = 5, /// A profile Profile = 6, + /// Session Replays + Replay = 7, /// Any other data category not known by this Relay. 
#[serde(other)] Unknown = -1, @@ -124,6 +126,7 @@ impl DataCategory { "security" => Self::Security, "attachment" => Self::Attachment, "session" => Self::Session, + "replay" => Self::Replay, "profile" => Self::Profile, _ => Self::Unknown, } @@ -138,6 +141,7 @@ impl DataCategory { Self::Security => "security", Self::Attachment => "attachment", Self::Session => "session", + Self::Replay => "replay", Self::Profile => "profile", Self::Unknown => "unknown", } diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 6a50ae90d5..1d28c285e7 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -783,7 +783,9 @@ pub enum KafkaTopic { Metrics, /// Profiles Profiles, - /// ReplayRecording, large blobs sent by the replay sdk + /// ReplayEvents, breadcrumb + session updates for replays + ReplayEvents, + /// ReplayRecordings, large blobs sent by the replay sdk ReplayRecordings, } @@ -807,6 +809,8 @@ pub struct TopicAssignments { pub metrics: TopicAssignment, /// Stacktrace topic name pub profiles: TopicAssignment, + /// Replay Events topic name. + pub replay_events: TopicAssignment, /// Recordings topic name. 
pub replay_recordings: TopicAssignment, } @@ -823,6 +827,7 @@ impl TopicAssignments { KafkaTopic::Sessions => &self.sessions, KafkaTopic::Metrics => &self.metrics, KafkaTopic::Profiles => &self.profiles, + KafkaTopic::ReplayEvents => &self.replay_events, KafkaTopic::ReplayRecordings => &self.replay_recordings, } } @@ -839,6 +844,7 @@ impl Default for TopicAssignments { sessions: "ingest-sessions".to_owned().into(), metrics: "ingest-metrics".to_owned().into(), profiles: "profiles".to_owned().into(), + replay_events: "ingest-replay-events".to_owned().into(), replay_recordings: "ingest-replay-recordings".to_owned().into(), } } diff --git a/relay-quotas/src/quota.rs b/relay-quotas/src/quota.rs index 66e50ca7d3..a4f94f00c3 100644 --- a/relay-quotas/src/quota.rs +++ b/relay-quotas/src/quota.rs @@ -103,6 +103,7 @@ impl CategoryUnit { DataCategory::Default | DataCategory::Error | DataCategory::Transaction + | DataCategory::Replay | DataCategory::Security | DataCategory::Profile => Some(Self::Count), DataCategory::Attachment => Some(Self::Bytes), diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index 879fb9171a..053fc249f4 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -1006,10 +1006,11 @@ impl EnvelopeProcessor { }); } - fn process_replay_recordings(&self, state: &mut ProcessEnvelopeState) { + /// Remove replays if the feature flag is not enabled + fn process_replays(&self, state: &mut ProcessEnvelopeState) { let replays_enabled = state.project_state.has_feature(Feature::Replays); state.envelope.retain_items(|item| match item.ty() { - ItemType::ReplayRecording => replays_enabled, + ItemType::ReplayEvent | ItemType::ReplayRecording => replays_enabled, _ => true, }); } @@ -1329,6 +1330,7 @@ impl EnvelopeProcessor { ItemType::MetricBuckets => false, ItemType::ClientReport => false, ItemType::Profile => false, + ItemType::ReplayEvent => false, ItemType::ReplayRecording => false, // Without 
knowing more, `Unknown` items are allowed to be repeated ItemType::Unknown(_) => false, @@ -1843,7 +1845,7 @@ impl EnvelopeProcessor { self.process_client_reports(state); self.process_user_reports(state); self.process_profiles(state); - self.process_replay_recordings(state); + self.process_replays(state); if state.creates_event() { if_processing!({ diff --git a/relay-server/src/actors/project.rs b/relay-server/src/actors/project.rs index 4c034cf18e..f67ff2acd9 100644 --- a/relay-server/src/actors/project.rs +++ b/relay-server/src/actors/project.rs @@ -54,6 +54,7 @@ pub enum Feature { /// Enables ingestion and normalization of profiles. #[serde(rename = "organizations:profiling")] Profiling, + /// Enables ingestion of Session Replays (Replay Recordings and Replay Events) #[serde(rename = "organizations:session-replay")] Replays, diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs index 6e3c842950..689f8b70a2 100644 --- a/relay-server/src/actors/store.rs +++ b/relay-server/src/actors/store.rs @@ -59,6 +59,7 @@ struct Producers { sessions: Producer, metrics: Producer, profiles: Producer, + replay_events: Producer, replay_recordings: Producer, } @@ -77,6 +78,7 @@ impl Producers { KafkaTopic::Sessions => Some(&self.sessions), KafkaTopic::Metrics => Some(&self.metrics), KafkaTopic::Profiles => Some(&self.profiles), + KafkaTopic::ReplayEvents => Some(&self.replay_events), KafkaTopic::ReplayRecordings => Some(&self.replay_recordings), } } @@ -140,6 +142,11 @@ impl StoreForwarder { &mut reused_producers, KafkaTopic::ReplayRecordings, )?, + replay_events: make_producer( + &*config, + &mut reused_producers, + KafkaTopic::ReplayEvents, + )?, }; Ok(Self { config, producers }) @@ -445,6 +452,28 @@ impl StoreForwarder { Ok(()) } + fn produce_replay_event( + &self, + replay_id: EventId, + project_id: ProjectId, + start_time: Instant, + item: &Item, + ) -> Result<(), StoreError> { + let message = ReplayEventKafkaMessage { + replay_id, + project_id, + 
start_time: UnixTimestamp::from_instant(start_time).as_secs(), + payload: item.payload(), + }; + relay_log::trace!("Sending replay event to Kafka"); + self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?; + metric!( + counter(RelayCounters::ProcessingMessageProduced) += 1, + event_type = "replay_event" + ); + Ok(()) + } + fn produce_replay_recording_chunks( &self, replay_id: EventId, @@ -588,6 +617,17 @@ struct EventKafkaMessage { /// Attachments that are potentially relevant for processing. attachments: Vec, } +#[derive(Clone, Debug, Serialize)] +struct ReplayEventKafkaMessage { + /// Raw event payload. + payload: Bytes, + /// Time at which the event was received by Relay. + start_time: u64, + /// The event id. + replay_id: EventId, + /// The project id for the current event. + project_id: ProjectId, +} /// Container payload for chunks of attachments. #[derive(Debug, Serialize)] @@ -713,6 +753,7 @@ enum KafkaMessage { Session(SessionKafkaMessage), Metric(MetricKafkaMessage), Profile(ProfileKafkaMessage), + ReplayEvent(ReplayEventKafkaMessage), ReplayRecording(ReplayRecordingKafkaMessage), ReplayRecordingChunk(ReplayRecordingChunkKafkaMessage), } @@ -727,6 +768,7 @@ impl KafkaMessage { KafkaMessage::Session(_) => "session", KafkaMessage::Metric(_) => "metric", KafkaMessage::Profile(_) => "profile", + KafkaMessage::ReplayEvent(_) => "replay_event", KafkaMessage::ReplayRecording(_) => "replay_recording", KafkaMessage::ReplayRecordingChunk(_) => "replay_recording_chunk", } @@ -742,6 +784,7 @@ impl KafkaMessage { Self::Session(_message) => Uuid::nil(), // Explicit random partitioning for sessions Self::Metric(_message) => Uuid::nil(), // TODO(ja): Determine a partitioning key Self::Profile(_message) => Uuid::nil(), + Self::ReplayEvent(message) => message.replay_id.0, Self::ReplayRecording(message) => message.replay_id.0, Self::ReplayRecordingChunk(message) => message.replay_id.0, }; @@ -879,7 +922,12 @@ impl Handler for StoreForwarder { 
event_type = "replay_recording" ); } - + ItemType::ReplayEvent => self.produce_replay_event( + event_id.ok_or(StoreError::NoEventId)?, + scoping.project_id, + start_time, + item, + )?, _ => {} } } diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 50eec3b33c..4a10aa19fc 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -103,6 +103,8 @@ pub enum ItemType { ClientReport, /// Profile event payload encoded in JSON Profile, + /// Replay metadata and breadcrumb payload + ReplayEvent, /// Replay Recording data ReplayRecording, /// A new item type that is yet unknown by this version of Relay. @@ -144,6 +146,7 @@ impl fmt::Display for ItemType { Self::MetricBuckets => write!(f, "metric_buckets"), Self::ClientReport => write!(f, "client_report"), Self::Profile => write!(f, "profile"), + Self::ReplayEvent => write!(f, "replay_event"), Self::ReplayRecording => write!(f, "replay_recording"), Self::Unknown(s) => s.fmt(f), } @@ -169,6 +172,7 @@ impl std::str::FromStr for ItemType { "metric_buckets" => Self::MetricBuckets, "client_report" => Self::ClientReport, "profile" => Self::Profile, + "replay_event" => Self::ReplayEvent, "replay_recording" => Self::ReplayRecording, other => Self::Unknown(other.to_owned()), }) @@ -625,6 +629,7 @@ impl Item { | ItemType::Metrics | ItemType::MetricBuckets | ItemType::ClientReport + | ItemType::ReplayEvent | ItemType::ReplayRecording | ItemType::Profile => false, @@ -647,6 +652,7 @@ impl Item { ItemType::RawSecurity => true, ItemType::UnrealReport => true, ItemType::UserReport => true, + ItemType::ReplayEvent => true, ItemType::Session => false, ItemType::Sessions => false, ItemType::Metrics => false, diff --git a/relay-server/src/utils/rate_limits.rs b/relay-server/src/utils/rate_limits.rs index 8836d4be42..44a4e6219c 100644 --- a/relay-server/src/utils/rate_limits.rs +++ b/relay-server/src/utils/rate_limits.rs @@ -104,6 +104,7 @@ fn infer_event_category(item: &Item) -> Option { 
ItemType::FormData => None, ItemType::UserReport => None, ItemType::Profile => None, + ItemType::ReplayEvent => None, ItemType::ReplayRecording => None, ItemType::ClientReport => None, ItemType::Unknown(_) => None, diff --git a/relay-server/src/utils/sizes.rs b/relay-server/src/utils/sizes.rs index 227360bc32..90863514be 100644 --- a/relay-server/src/utils/sizes.rs +++ b/relay-server/src/utils/sizes.rs @@ -25,6 +25,7 @@ pub fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> bool ItemType::Event | ItemType::Transaction | ItemType::Security + | ItemType::ReplayEvent | ItemType::RawSecurity | ItemType::FormData => event_size += item.len(), ItemType::Attachment | ItemType::UnrealReport | ItemType::ReplayRecording => { diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 14f4c75cb7..91193ee636 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -26,6 +26,7 @@ replay_recordings_consumer, sessions_consumer, metrics_consumer, + replay_events_consumer, ) # noqa diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py index 6948c86927..78f63996c2 100644 --- a/tests/integration/fixtures/__init__.py +++ b/tests/integration/fixtures/__init__.py @@ -166,7 +166,6 @@ def send_envelope(self, project_id, envelope, headers=None, dsn_key_idx=0): "X-Sentry-Auth": self.get_auth_header(project_id, dsn_key_idx), **(headers or {}), } - response = self.post(url, headers=headers, data=envelope.serialize()) response.raise_for_status() @@ -196,6 +195,14 @@ def send_transaction(self, project_id, payload, item_headers=None): self.send_envelope(project_id, envelope) + def send_replay_event(self, project_id, payload, item_headers=None): + envelope = Envelope() + envelope.add_item(Item(payload=PayloadRef(json=payload), type="replay_event")) + if envelope.headers is None: + envelope.headers = {} + + self.send_envelope(project_id, envelope) + def send_session_aggregates(self, project_id, 
payload): envelope = Envelope() envelope.add_item(Item(payload=PayloadRef(json=payload), type="sessions")) diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index f042c01f16..482c46dfc6 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -51,6 +51,7 @@ def inner(options=None): "outcomes": get_topic_name("outcomes"), "sessions": get_topic_name("sessions"), "metrics": get_topic_name("metrics"), + "replay_events": get_topic_name("replay_events"), "replay_recordings": get_topic_name("replay_recordings"), } @@ -286,6 +287,11 @@ def replay_recordings_consumer(kafka_consumer): return lambda: ReplayRecordingsConsumer(*kafka_consumer("replay_recordings")) +@pytest.fixture +def replay_events_consumer(kafka_consumer): + return lambda: ReplayEventsConsumer(*kafka_consumer("replay_events")) + + class MetricsConsumer(ConsumerBase): def get_metric(self, timeout=None): message = self.poll(timeout=timeout) @@ -360,3 +366,14 @@ def get_individual_replay(self): v = msgpack.unpackb(message.value(), raw=False, use_list=False) assert v["type"] == "replay_recording", v["type"] return v + + +class ReplayEventsConsumer(ConsumerBase): + def get_replay_event(self): + message = self.poll() + assert message is not None + assert message.error() is None + + event = msgpack.unpackb(message.value(), raw=False, use_list=False) + assert event["type"] == "replay_event" + return json.loads(event["payload"].decode("utf8")), event diff --git a/tests/integration/test_replay_events.py b/tests/integration/test_replay_events.py new file mode 100644 index 0000000000..384711d1f9 --- /dev/null +++ b/tests/integration/test_replay_events.py @@ -0,0 +1,57 @@ +def generate_replay_event(): + return { + "replay_id": "d2132d31b39445f1938d7e21b6bf0ec4", + "seq_id": 0, + "type": "replay_event", + "transaction": "/organizations/:orgId/performance/:eventSlug/", + "start_timestamp": 1597976392.6542819, + "timestamp": 
1597976400.6189718, + "contexts": { + "trace": { + "trace_id": "4C79F60C11214EB38604F4AE0781BFB2", + "span_id": "FA90FDEAD5F74052", + "type": "trace", + } + }, + } + + +def test_replay_event_with_processing( + mini_sentry, relay_with_processing, replay_events_consumer +): + relay = relay_with_processing() + mini_sentry.add_basic_project_config( + 42, extra={"config": {"features": ["organizations:session-replay"]}} + ) + + replay_events_consumer = replay_events_consumer() + + replay_item = generate_replay_event() + + relay.send_replay_event(42, replay_item) + + replay_event, _ = replay_events_consumer.get_replay_event() + assert ( + replay_event["transaction"] == "/organizations/:orgId/performance/:eventSlug/" + ) + assert "trace" in replay_event["contexts"] + assert replay_event["seq_id"] == 0 + + +def test_replay_events_without_processing(mini_sentry, relay_chain): + relay = relay_chain(min_relay_version="latest") + + project_id = 42 + mini_sentry.add_basic_project_config( + project_id, extra={"config": {"features": ["organizations:session-replay"]}} + ) + + replay_item = generate_replay_event() + + relay.send_replay_event(42, replay_item) + + envelope = mini_sentry.captured_events.get(timeout=1) + assert len(envelope.items) == 1 + + replay_event = envelope.items[0] + assert replay_event.type == "replay_event"