Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(profiling): Support inbound filters for profiles #4176

Merged
merged 11 commits into from
Oct 30, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
- Allow profile chunks without release. ([#4155](https://github.com/getsentry/relay/pull/4155))
- Add validation for timestamps sent from the future. ([#4163](https://github.com/getsentry/relay/pull/4163))

**Features:**

- Support inbound filters for profiles. ([#4176](https://github.com/getsentry/relay/pull/4176))

**Internal:**

- Add a metric that counts span volume in the root project for dynamic sampling (`c:spans/count_per_root_project@none`). ([#4134](https://github.com/getsentry/relay/pull/4134))
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions relay-profiling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ chrono = { workspace = true }
data-encoding = { workspace = true }
itertools = { workspace = true }
relay-base-schema = { workspace = true }
relay-dynamic-config = { workspace = true }
relay-event-schema = { workspace = true }
relay-filter = { workspace = true }
relay-log = { workspace = true }
relay-metrics = { workspace = true }
relay-protocol = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_path_to_error = { workspace = true }
thiserror = { workspace = true }
url = { workspace = true }

[dev-dependencies]
insta = { workspace = true }
4 changes: 4 additions & 0 deletions relay-profiling/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use relay_filter::FilterStatKey;

use thiserror::Error;

#[derive(Debug, Error)]
Expand Down Expand Up @@ -32,6 +34,8 @@ pub enum ProfileError {
DurationIsTooLong,
#[error("duration is zero")]
DurationIsZero,
#[error("filtered profile")]
Filtered(FilterStatKey),
}

impl ProfileError {
Expand Down
114 changes: 108 additions & 6 deletions relay-profiling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,15 @@
//! Relay will forward those profiles encoded with `msgpack` after unpacking them if needed and push a message on Kafka.

use std::error::Error;
use std::net::IpAddr;
use std::time::Duration;
use url::Url;

use relay_base_schema::project::ProjectId;
use relay_event_schema::protocol::{Event, EventId};
use relay_dynamic_config::GlobalConfig;
use relay_event_schema::protocol::{Csp, Event, EventId, Exception, LogEntry, Values};
use relay_filter::{Filterable, ProjectFiltersConfig};
use relay_protocol::{Getter, Val};
use serde::Deserialize;
use serde_json::Deserializer;

Expand Down Expand Up @@ -75,10 +80,55 @@ struct MinimalProfile {
#[serde(alias = "profile_id", alias = "chunk_id")]
event_id: ProfileId,
platform: String,
release: Option<String>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will we filter on more fields in the future? Users might be surprised when they configure a release filter, see that it works, and then try the same for ip_addr and it fails.

Copy link
Member Author

@Zylphrex Zylphrex Oct 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For ip_addr, I see that it's pulled from the user for other event types. Meaning for a backend event, it'll use the frontend user IP. Is this a strict requirement? Because for continuous profiles, we can only send the ip of the server in some cases.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far the filter has only been applied to the end-user IP like you said, so in this case I would leave it empty (None) for continuous profiles.

#[serde(default)]
version: sample::Version,
}

impl Filterable for MinimalProfile {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly related to this PR, but it might be worth defaulting these methods in the trait.

fn csp(&self) -> Option<&Csp> {
None
}

fn exceptions(&self) -> Option<&Values<Exception>> {
None
}

fn ip_addr(&self) -> Option<&str> {
None
}

fn logentry(&self) -> Option<&LogEntry> {
None
}

fn release(&self) -> Option<&str> {
self.release.as_ref().map(|release| release.as_str())
Zylphrex marked this conversation as resolved.
Show resolved Hide resolved
}

fn transaction(&self) -> Option<&str> {
None
}

fn url(&self) -> Option<Url> {
None
}

fn user_agent(&self) -> Option<&str> {
None
}
}

impl Getter for MinimalProfile {
fn get_value(&self, path: &str) -> Option<Val<'_>> {
match path.strip_prefix("event.")? {
"release" => self.release.as_ref().map(|release| release.as_str().into()),
"platform" => Some(self.platform.as_str().into()),
_ => None,
}
}
}

fn minimal_profile_from_json(
payload: &[u8],
) -> Result<MinimalProfile, serde_path_to_error::Error<serde_json::Error>> {
Expand Down Expand Up @@ -139,7 +189,13 @@ pub fn parse_metadata(payload: &[u8], project_id: ProjectId) -> Result<ProfileId
Ok(profile.event_id)
}

pub fn expand_profile(payload: &[u8], event: &Event) -> Result<(ProfileId, Vec<u8>), ProfileError> {
pub fn expand_profile(
payload: &[u8],
event: &Event,
client_ip: Option<IpAddr>,
filter_settings: &ProjectFiltersConfig,
global_config: &GlobalConfig,
) -> Result<(ProfileId, Vec<u8>), ProfileError> {
let profile = match minimal_profile_from_json(payload) {
Ok(profile) => profile,
Err(err) => {
Expand All @@ -156,6 +212,16 @@ pub fn expand_profile(payload: &[u8], event: &Event) -> Result<(ProfileId, Vec<u
return Err(ProfileError::InvalidJson(err));
}
};

if let Err(filter_stat_key) = relay_filter::should_filter(
&profile,
client_ip,
filter_settings,
global_config.filters(),
) {
return Err(ProfileError::Filtered(filter_stat_key));
}

let transaction_metadata = extract_transaction_metadata(event);
let transaction_tags = extract_transaction_tags(event);
let processed_payload = match (profile.platform.as_str(), profile.version) {
Expand Down Expand Up @@ -200,7 +266,12 @@ pub fn expand_profile(payload: &[u8], event: &Event) -> Result<(ProfileId, Vec<u
}
}

pub fn expand_profile_chunk(payload: &[u8]) -> Result<Vec<u8>, ProfileError> {
pub fn expand_profile_chunk(
payload: &[u8],
client_ip: Option<IpAddr>,
filter_settings: &ProjectFiltersConfig,
global_config: &GlobalConfig,
) -> Result<Vec<u8>, ProfileError> {
let profile = match minimal_profile_from_json(payload) {
Ok(profile) => profile,
Err(err) => {
Expand All @@ -212,6 +283,16 @@ pub fn expand_profile_chunk(payload: &[u8]) -> Result<Vec<u8>, ProfileError> {
return Err(ProfileError::InvalidJson(err));
}
};

if let Err(filter_stat_key) = relay_filter::should_filter(
&profile,
client_ip,
filter_settings,
global_config.filters(),
) {
return Err(ProfileError::Filtered(filter_stat_key));
}

match (profile.platform.as_str(), profile.version) {
("android", _) => android::chunk::parse(payload),
(_, sample::Version::V2) => {
Expand Down Expand Up @@ -246,18 +327,39 @@ mod tests {
#[test]
fn test_expand_profile_with_version() {
let payload = include_bytes!("../tests/fixtures/sample/v1/valid.json");
assert!(expand_profile(payload, &Event::default()).is_ok());
assert!(expand_profile(
payload,
&Event::default(),
None,
&ProjectFiltersConfig::default(),
&GlobalConfig::default()
)
.is_ok());
}

#[test]
fn test_expand_profile_with_version_and_segment_id() {
let payload = include_bytes!("../tests/fixtures/sample/v1/segment_id.json");
assert!(expand_profile(payload, &Event::default()).is_ok());
assert!(expand_profile(
payload,
&Event::default(),
None,
&ProjectFiltersConfig::default(),
&GlobalConfig::default()
)
.is_ok());
}

#[test]
fn test_expand_profile_without_version() {
let payload = include_bytes!("../tests/fixtures/android/legacy/roundtrip.json");
assert!(expand_profile(payload, &Event::default()).is_ok());
assert!(expand_profile(
payload,
&Event::default(),
None,
&ProjectFiltersConfig::default(),
&GlobalConfig::default()
)
.is_ok());
}
}
1 change: 1 addition & 0 deletions relay-profiling/src/outcomes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ pub fn discard_reason(err: ProfileError) -> &'static str {
ProfileError::TooManyProfiles => "profiling_too_many_profiles",
ProfileError::DurationIsTooLong => "profiling_duration_is_too_long",
ProfileError::DurationIsZero => "profiling_duration_is_zero",
ProfileError::Filtered(_) => "profiling_filtered",
}
}
10 changes: 7 additions & 3 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1744,7 +1744,7 @@ impl EnvelopeProcessorService {
// Process profiles before dropping the transaction, if necessary.
// Before metric extraction to make sure the profile count is reflected correctly.
let profile_id = match keep_profiles {
true => profile::process(state),
true => profile::process(state, &global_config),
false => profile_id,
};
// Extract metrics here, we're about to drop the event/transaction.
Expand All @@ -1771,7 +1771,7 @@ impl EnvelopeProcessorService {

if_processing!(self.inner.config, {
// Process profiles before extracting metrics, to make sure they are removed if they are invalid.
let profile_id = profile::process(state);
let profile_id = profile::process(state, &global_config);
profile::transfer_id(state, profile_id);

// Always extract metrics in processing Relays for sampled items.
Expand Down Expand Up @@ -1811,7 +1811,11 @@ impl EnvelopeProcessorService {
) -> Result<(), ProcessingError> {
profile_chunk::filter(state);
if_processing!(self.inner.config, {
profile_chunk::process(state, &self.inner.config);
profile_chunk::process(
state,
&self.inner.global_config.current(),
&self.inner.config,
);
});
Ok(())
}
Expand Down
38 changes: 33 additions & 5 deletions relay-server/src/services/processor/profile.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
//! Profiles related processor code.
use std::net::IpAddr;

use relay_dynamic_config::Feature;
use relay_dynamic_config::{Feature, GlobalConfig};

use relay_base_schema::events::EventType;
use relay_config::Config;
use relay_event_schema::protocol::{Contexts, Event, ProfileContext};
use relay_filter::ProjectFiltersConfig;
use relay_profiling::{ProfileError, ProfileId};
use relay_protocol::Annotated;

Expand Down Expand Up @@ -88,7 +90,13 @@ pub fn transfer_id(
}

/// Processes profiles and set the profile ID in the profile context on the transaction if successful.
pub fn process(state: &mut ProcessEnvelopeState<TransactionGroup>) -> Option<ProfileId> {
pub fn process(
state: &mut ProcessEnvelopeState<TransactionGroup>,
global_config: &GlobalConfig,
) -> Option<ProfileId> {
let client_ip = state.managed_envelope.envelope().meta().client_addr();
let filter_settings = &state.project_info.config.filter_settings;

let profiling_enabled = state.project_info.has_feature(Feature::Profiling);
let mut profile_id = None;

Expand All @@ -104,7 +112,14 @@ pub fn process(state: &mut ProcessEnvelopeState<TransactionGroup>) -> Option<Pro
return ItemAction::DropSilently;
};

match expand_profile(item, event, &state.config) {
match expand_profile(
item,
event,
&state.config,
client_ip,
filter_settings,
global_config,
) {
Ok(id) => {
profile_id = Some(id);
ItemAction::Keep
Expand All @@ -119,8 +134,21 @@ pub fn process(state: &mut ProcessEnvelopeState<TransactionGroup>) -> Option<Pro
}

/// Transfers transaction metadata to profile and check its size.
fn expand_profile(item: &mut Item, event: &Event, config: &Config) -> Result<ProfileId, Outcome> {
match relay_profiling::expand_profile(&item.payload(), event) {
fn expand_profile(
item: &mut Item,
event: &Event,
config: &Config,
client_ip: Option<IpAddr>,
filter_settings: &ProjectFiltersConfig,
global_config: &GlobalConfig,
) -> Result<ProfileId, Outcome> {
match relay_profiling::expand_profile(
&item.payload(),
event,
client_ip,
filter_settings,
global_config,
) {
Ok((id, payload)) => {
if payload.len() <= config.max_profile_size() {
item.set_payload(ContentType::Json, payload);
Expand Down
22 changes: 19 additions & 3 deletions relay-server/src/services/processor/profile_chunk.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Profile chunks processor code.
use relay_dynamic_config::Feature;
use relay_dynamic_config::{Feature, GlobalConfig};

use crate::envelope::ItemType;
use crate::services::processor::ProcessEnvelopeState;
Expand All @@ -24,14 +24,27 @@ pub fn filter<G>(state: &mut ProcessEnvelopeState<G>) {

/// Processes profile chunks.
#[cfg(feature = "processing")]
pub fn process(state: &mut ProcessEnvelopeState<ProfileChunkGroup>, config: &Config) {
pub fn process(
state: &mut ProcessEnvelopeState<ProfileChunkGroup>,
global_config: &GlobalConfig,
config: &Config,
) {
let client_ip = state.managed_envelope.envelope().meta().client_addr();
let filter_settings = &state.project_info.config.filter_settings;

let continuous_profiling_enabled = state.project_info.has_feature(Feature::ContinuousProfiling);
state.managed_envelope.retain_items(|item| match item.ty() {
ItemType::ProfileChunk => {
if !continuous_profiling_enabled {
return ItemAction::DropSilently;
}
match relay_profiling::expand_profile_chunk(&item.payload()) {

match relay_profiling::expand_profile_chunk(
&item.payload(),
client_ip,
filter_settings,
global_config,
) {
Ok(payload) => {
if payload.len() <= config.max_profile_size() {
item.set_payload(ContentType::Json, payload);
Expand All @@ -44,6 +57,9 @@ pub fn process(state: &mut ProcessEnvelopeState<ProfileChunkGroup>, config: &Con
)))
}
}
Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
ItemAction::Drop(Outcome::Filtered(filter_stat_key))
}
Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),
Expand Down
Loading