diff --git a/CHANGELOG.md b/CHANGELOG.md index edcb4c3b1e..77e2d37e57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - Apply clock drift correction for timestamps that are too far in the past or future. This fixes a bug where broken transaction timestamps would lead to negative durations. ([#634](https://github.com/getsentry/relay/pull/634)) - Respond with status code `200 OK` to rate limited minidump and UE4 requests. Third party clients otherwise retry those requests, leading to even more load. ([#646](https://github.com/getsentry/relay/pull/646), [#647](https://github.com/getsentry/relay/pull/647)) - Ingested unreal crash reports no longer have a `misc_primary_cpu_brand` key with GPU information set in the Unreal context. ([#650](https://github.com/getsentry/relay/pull/650)) +- Fix ingestion of forwarded outcomes in processing Relays. Previously, `emit_outcomes` had to be set explicitly to enable this. ([#653](https://github.com/getsentry/relay/pull/653)) **Internal**: diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index eb5618c8cb..f52a0eacdb 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -1094,9 +1094,12 @@ impl Config { self.values.relay.tls_identity_password.as_deref() } - /// Returns the emit_outcomes flag + /// Returns whether this Relay should emit outcomes. + /// + /// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is + /// in processing mode. pub fn emit_outcomes(&self) -> bool { - self.values.outcomes.emit_outcomes + self.values.outcomes.emit_outcomes || self.values.processing.enabled } /// Returns the maximum number of outcomes that are batched before being sent diff --git a/tests/integration/fixtures/mini_sentry.py b/tests/integration/fixtures/mini_sentry.py index 1b306113a7..e4b3100ae1 100644 --- a/tests/integration/fixtures/mini_sentry.py +++ b/tests/integration/fixtures/mini_sentry.py @@ -156,6 +156,10 @@ def public_keys(): @app.route("/api/0/relays/outcomes/", methods=["POST"]) def outcomes(): + """ + Mock endpoint for outcomes. SENTRY DOES NOT IMPLEMENT THIS ENDPOINT! This is just used to + verify Relay's batching behavior. + """ relay_id = flask_request.headers["x-sentry-relay-id"] if relay_id not in authenticated_relays: abort(403, "relay not registered") diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index 1f60e64eb4..ca600da4a9 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -10,7 +10,9 @@ HOUR_MILLISEC = 1000 * 3600 -def test_outcomes_processing(relay_with_processing, kafka_consumer, mini_sentry): +def test_outcomes_processing( + relay_with_processing, kafka_consumer, mini_sentry, outcomes_consumer +): """ Tests outcomes are sent to the kafka outcome topic @@ -19,7 +21,8 @@ def test_outcomes_processing(relay_with_processing, kafka_consumer, mini_sentry) """ relay = relay_with_processing() relay.wait_relay_healthcheck() - outcomes = kafka_consumer("outcomes") + + outcomes_consumer = outcomes_consumer() # hack mini_sentry configures project 42 (remove the configuration so that we get an error for project 42) mini_sentry.project_configs[42] = None @@ -33,40 +36,23 @@ def test_outcomes_processing(relay_with_processing, kafka_consumer, mini_sentry) "extra": {"msg_text": message_text}, }, ) + start = datetime.utcnow() - # polling first message can take a few good seconds - outcome = outcomes.poll(timeout=20) + outcome = outcomes_consumer.get_outcome() end = datetime.utcnow() - assert outcome is not None - outcome = outcome.value() - outcome = json.loads(outcome) - # set defaults to allow for results that elide empty fields - default = { - "org_id": None, - "key_id": None, - "reason": None, - "event_id": None, - "remote_addr": None, - } - outcome = {**default, **outcome} - # deal with the timestamp separately ( we can't control it exactly) + assert outcome["project_id"] == 42 + assert outcome["event_id"] == event_id + assert outcome.get("org_id") is None + assert outcome.get("key_id") is None + assert outcome["outcome"] == 3 + assert outcome["reason"] == "project_id" + assert outcome["remote_addr"] == "127.0.0.1" + + # deal with the timestamp separately (we can't control it exactly) timestamp = outcome.get("timestamp") - del outcome["timestamp"] - assert timestamp is not None event_emission = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ") assert start <= event_emission <= end - # reconstruct the expected message without timestamp - expected = { - "project_id": 42, - "event_id": event_id, - "org_id": None, - "key_id": None, - "outcome": 3, - "reason": "project_id", - "remote_addr": "127.0.0.1", - } - assert outcome == expected def _send_event(relay): @@ -290,8 +276,14 @@ def test_outcome_source(relay, mini_sentry): assert outcome.get("source") == "my-layer" -@pytest.mark.parametrize("num_intermediate_relays", [1, 4]) -def test_outcome_forwarding(relay, mini_sentry, num_intermediate_relays): +@pytest.mark.parametrize("num_intermediate_relays", [1, 3]) +def test_outcome_forwarding( + mini_sentry, + relay, + relay_with_processing, + outcomes_consumer, + num_intermediate_relays, +): """ Tests that Relay forwards outcomes from a chain of relays @@ -300,7 +292,20 @@ def test_outcome_forwarding(relay, mini_sentry, num_intermediate_relays): are properly forwarded up to sentry. """ - config = { + processing_config = { + "outcomes": { + "emit_outcomes": False, # The default, overridden by processing.enabled: true + "batch_size": 1, + "batch_interval": 1, + "source": "processing-layer", + } + } + + # The innermost Relay needs to be in processing mode + upstream = relay_with_processing(processing_config) + upstream.wait_relay_healthcheck() + + intermediate_config = { "outcomes": { "emit_outcomes": True, "batch_size": 1, @@ -309,14 +314,13 @@ def test_outcome_forwarding(relay, mini_sentry, num_intermediate_relays): } } - upstream = mini_sentry # build a chain of identical relays for i in range(num_intermediate_relays): - intermediate_relay = relay(upstream, config) - upstream = intermediate_relay + upstream = relay(upstream, intermediate_config) + upstream.wait_relay_healthcheck() # mark the downstream relay so we can identify outcomes originating from it - config_downstream = deepcopy(config) + config_downstream = deepcopy(intermediate_config) config_downstream["outcomes"]["source"] = "downstream-layer" downstream_relay = relay(upstream, config_downstream) @@ -327,13 +331,7 @@ def test_outcome_forwarding(relay, mini_sentry, num_intermediate_relays): event_id = _send_event(downstream_relay) - outcomes_batch = mini_sentry.captured_outcomes.get(timeout=0.2) - assert mini_sentry.captured_outcomes.qsize() == 0 # we had only one batch - - outcomes = outcomes_batch.get("outcomes") - assert len(outcomes) == 1 - - outcome = outcomes[0] - + outcomes_consumer = outcomes_consumer() + outcome = outcomes_consumer.get_outcome() assert outcome.get("source") == "downstream-layer" assert outcome.get("event_id") == event_id