Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Track outcome source #604

Merged
merged 22 commits into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ pub struct OverridableConfig {
pub secret_key: Option<String>,
/// The public key of the relay
pub public_key: Option<String>,
/// Outcome source
pub outcome_source: Option<String>,
}

/// The relay credentials
Expand Down Expand Up @@ -719,6 +721,9 @@ pub struct Outcomes {
/// The maximum time interval (in milliseconds) that an outcome may be batched
/// via http to the upstream (only applies to non processing relays)
pub max_outcome_interval_millsec: u64,
/// Defines the source string registered in the outcomes originating from
/// this Relay ( typically something like the region and or the layer )
RaduW marked this conversation as resolved.
Show resolved Hide resolved
pub source: Option<String>,
}

impl Default for Outcomes {
Expand All @@ -727,6 +732,7 @@ impl Default for Outcomes {
emit_outcomes: false,
max_outcome_batch_size: 1000,
max_outcome_interval_millsec: 500,
source: None,
}
}
}
Expand Down Expand Up @@ -834,7 +840,7 @@ impl Config {
/// command line parameters)
pub fn apply_override(
&mut self,
overrides: OverridableConfig,
mut overrides: OverridableConfig,
) -> Result<&mut Self, ConfigError> {
let relay = &mut self.values.relay;

Expand Down Expand Up @@ -909,6 +915,10 @@ impl Config {
} else {
None
};
let mut outcomes = &mut self.values.outcomes;
if overrides.outcome_source.is_some() {
outcomes.source = overrides.outcome_source.take();
}

if let Some(credentials) = &mut self.credentials {
//we have existing credentials we may override some entries
Expand Down Expand Up @@ -1080,6 +1090,11 @@ impl Config {
Duration::from_millis(self.values.outcomes.max_outcome_interval_millsec)
}

/// The originating source of the outcome
pub fn outcome_source(&self) -> Option<&str> {
self.values.outcomes.source.as_deref()
}

/// Returns the log level.
pub fn log_level_filter(&self) -> log::LevelFilter {
self.values.logging.level
Expand Down
17 changes: 12 additions & 5 deletions relay-server/src/actors/outcome.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,12 @@ pub struct TrackRawOutcome {
/// The client ip address.
#[serde(default, skip_serializing_if = "Option::is_none")]
remote_addr: Option<String>,
/// The source of the outcome (which Relay sent it)
#[serde(default, skip_serializing_if = "Option::is_none")]
source: Option<String>,
}

impl From<&TrackOutcome> for TrackRawOutcome {
fn from(msg: &TrackOutcome) -> Self {
impl TrackRawOutcome {
jan-auer marked this conversation as resolved.
Show resolved Hide resolved
fn from_outcome(msg: &TrackOutcome, config: &Config) -> Self {
jan-auer marked this conversation as resolved.
Show resolved Hide resolved
let reason = match msg.outcome.to_reason() {
None => None,
Some(reason) => Some(reason.to_string()),
Expand All @@ -299,6 +301,10 @@ impl From<&TrackOutcome> for TrackRawOutcome {
id => Some(id),
};

// since TrackOutcome objects come only from this Relay (and not any downstream
// Relays), set the source to whatever our current outcome source is.
let source = config.outcome_source().map(str::to_owned);

TrackRawOutcome {
timestamp,
org_id,
Expand All @@ -308,6 +314,7 @@ impl From<&TrackOutcome> for TrackRawOutcome {
reason,
event_id: msg.event_id,
remote_addr: msg.remote_addr.map(|addr| addr.to_string()),
source,
}
}
}
Expand Down Expand Up @@ -497,7 +504,7 @@ mod processing {
type Result = Result<(), OutcomeError>;

fn handle(&mut self, message: TrackOutcome, _ctx: &mut Self::Context) -> Self::Result {
self.handle(TrackRawOutcome::from(&message), _ctx)
self.handle(TrackRawOutcome::from_outcome(&message, &self.config), _ctx)
}
}

Expand Down Expand Up @@ -565,7 +572,7 @@ mod non_processing {
type Result = Result<(), OutcomeError>;

fn handle(&mut self, message: TrackOutcome, _ctx: &mut Self::Context) -> Self::Result {
self.handle(TrackRawOutcome::from(&message), _ctx)
self.handle(TrackRawOutcome::from_outcome(&message, &self.config), _ctx)
}
}
}
2 changes: 2 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ pub fn extract_config_args(matches: &ArgMatches) -> OverridableConfig {
id: matches.value_of("id").map(str::to_owned),
public_key: matches.value_of("public_key").map(str::to_owned),
secret_key: matches.value_of("secret_key").map(str::to_owned),
outcome_source: matches.value_of("source_id").map(str::to_owned),
}
}

Expand All @@ -104,6 +105,7 @@ pub fn extract_config_env_vars() -> OverridableConfig {
id: env::var("RELAY_ID").ok(),
public_key: env::var("RELAY_PUBLIC_KEY").ok(),
secret_key: env::var("RELAY_SECRET_KEY").ok(),
outcome_source: None, //already extracted in params
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/cliapp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ pub fn make_app() -> App<'static, 'static> {
.takes_value(true)
.long("redis-url")
.help("Redis server URL."),
)
.arg(
Arg::with_name("source_id")
.value_name("SOURCE_ID")
.takes_value(true)
.long("source-id")
.env("RELAY_SOURCE_ID")
.help("Names the current relay in the outcome source."),
),
)
.subcommand(
Expand Down
33 changes: 32 additions & 1 deletion tests/integration/test_outcome.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def test_outcomes_non_processing_max_batch_time(relay, mini_sentry):
for i in range(events_to_send):
event_id = _send_event(relay)
event_ids.add(event_id)
time.sleep(0.002) # sleep more than the batch time
time.sleep(0.01) # sleep more than the batch time

# we should get one batch per event sent
batches = []
Expand All @@ -186,6 +186,37 @@ def test_outcomes_non_processing_max_batch_time(relay, mini_sentry):
assert outcomes[0].get("event_id") in event_ids # a known event id


def test_outcome_source(relay, mini_sentry):
"""
Test that the source is picked from configuration and passed in outcomes
"""
config = {
"outcomes": {
"emit_outcomes": True,
"max_outcome_batch_size": 1,
"max_outcome_interval_millsec": 1,
"source": "my-layer",
}
}

relay = relay(mini_sentry, config)
relay.wait_relay_healthcheck()
# hack mini_sentry configures project 42 (remove the configuration so that we get an error for project 42)
mini_sentry.project_configs[42] = None

event_id = _send_event(relay)

outcomes_batch = mini_sentry.captured_outcomes.get(timeout=0.2)
assert mini_sentry.captured_outcomes.qsize() == 0 # we had only one batch

outcomes = outcomes_batch.get("outcomes")
assert len(outcomes) == 1

outcome = outcomes[0]

assert outcome.get("source") == "my-layer"


def test_outcomes_non_processing_batching(relay, mini_sentry):
"""
Test that outcomes are batched according to max size.
Expand Down