fix(server): Forward outcomes in processing mode (#653)
The outcome endpoint checks the `emit_outcomes` flag, which therefore needs to return
`true` in processing mode as well. This allows all components in Relay to check a
single flag.
jan-auer authored Jul 8, 2020
1 parent 69b6bd5 commit 7570ba0
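
The whole fix is one boolean in the config accessor. Below is a minimal, self-contained sketch of that logic; the struct and field names are simplified stand-ins, and the authoritative change is the `emit_outcomes()` accessor in the `relay-config/src/config.rs` diff further down:

```rust
// Sketch only: simplified stand-ins for the relay-config types.
struct OutcomesConfig {
    emit_outcomes: bool,
}

struct ProcessingConfig {
    enabled: bool,
}

struct Config {
    outcomes: OutcomesConfig,
    processing: ProcessingConfig,
}

impl Config {
    /// Outcomes are emitted if explicitly enabled, or if this Relay runs in processing mode.
    fn emit_outcomes(&self) -> bool {
        self.outcomes.emit_outcomes || self.processing.enabled
    }
}

fn main() {
    // A processing Relay with the default `emit_outcomes: false` still reports `true`.
    let config = Config {
        outcomes: OutcomesConfig {
            emit_outcomes: false,
        },
        processing: ProcessingConfig { enabled: true },
    };
    assert!(config.emit_outcomes());
}
```

This is why the CHANGELOG entry below notes that `emit_outcomes` no longer has to be set explicitly on processing Relays.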
Showing 4 changed files with 53 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
- Apply clock drift correction for timestamps that are too far in the past or future. This fixes a bug where broken transaction timestamps would lead to negative durations. ([#634](https://github.com/getsentry/relay/pull/634))
- Respond with status code `200 OK` to rate limited minidump and UE4 requests. Third party clients otherwise retry those requests, leading to even more load. ([#646](https://github.com/getsentry/relay/pull/646), [#647](https://github.com/getsentry/relay/pull/647))
- Ingested unreal crash reports no longer have a `misc_primary_cpu_brand` key with GPU information set in the Unreal context. ([#650](https://github.com/getsentry/relay/pull/650))
+- Fix ingestion of forwarded outcomes in processing Relays. Previously, `emit_outcomes` had to be set explicitly to enable this. ([#653](https://github.com/getsentry/relay/pull/653))

**Internal**:

7 changes: 5 additions & 2 deletions relay-config/src/config.rs
@@ -1094,9 +1094,12 @@ impl Config {
        self.values.relay.tls_identity_password.as_deref()
    }

-    /// Returns the emit_outcomes flag
+    /// Returns whether this Relay should emit outcomes.
+    ///
+    /// This is `true` either if `outcomes.emit_outcomes` is explicitly enabled, or if this Relay is
+    /// in processing mode.
    pub fn emit_outcomes(&self) -> bool {
-        self.values.outcomes.emit_outcomes
+        self.values.outcomes.emit_outcomes || self.values.processing.enabled
    }

    /// Returns the maximum number of outcomes that are batched before being sent
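
With the accessor covering processing mode, call sites only need the one flag. The snippet below is a hypothetical illustration of that call-site pattern, not the actual relay-server endpoint code; the function name and signature are made up:

```rust
// Hypothetical call site, not taken from relay-server: a component handling forwarded
// outcomes checks a single flag and needs no separate processing-mode test anymore.
fn accept_forwarded_outcomes(emit_outcomes: bool, batch: Vec<String>) -> Result<usize, String> {
    if !emit_outcomes {
        // Before this commit a processing Relay could land here unless
        // `outcomes.emit_outcomes` was also set explicitly.
        return Err("this Relay does not emit outcomes".to_owned());
    }
    // Hand the batch to whatever produces outcomes (Kafka topic or upstream HTTP).
    Ok(batch.len())
}

fn main() {
    let batch = vec![r#"{"outcome": 3, "reason": "project_id"}"#.to_owned()];
    assert_eq!(accept_forwarded_outcomes(true, batch.clone()), Ok(1));
    assert!(accept_forwarded_outcomes(false, batch).is_err());
}
```

The integration test changes below exercise exactly this: the innermost Relay of the chain runs in processing mode with `emit_outcomes` left at `False` and still forwards outcomes.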
4 changes: 4 additions & 0 deletions tests/integration/fixtures/mini_sentry.py
@@ -156,6 +156,10 @@ def public_keys():

@app.route("/api/0/relays/outcomes/", methods=["POST"])
def outcomes():
"""
Mock endpoint for outcomes. SENTRY DOES NOT IMPLEMENT THIS ENDPOINT! This is just used to
verify Relay's batching behavior.
"""
relay_id = flask_request.headers["x-sentry-relay-id"]
if relay_id not in authenticated_relays:
abort(403, "relay not registered")
88 changes: 43 additions & 45 deletions tests/integration/test_outcome.py
@@ -10,7 +10,9 @@
HOUR_MILLISEC = 1000 * 3600


-def test_outcomes_processing(relay_with_processing, kafka_consumer, mini_sentry):
+def test_outcomes_processing(
+    relay_with_processing, kafka_consumer, mini_sentry, outcomes_consumer
+):
    """
    Tests outcomes are sent to the kafka outcome topic
@@ -19,7 +21,8 @@ def test_outcomes_processing(relay_with_processing, kafka_consumer, mini_sentry)
"""
relay = relay_with_processing()
relay.wait_relay_healthcheck()
outcomes = kafka_consumer("outcomes")

outcomes_consumer = outcomes_consumer()
# hack mini_sentry configures project 42 (remove the configuration so that we get an error for project 42)
mini_sentry.project_configs[42] = None

@@ -33,40 +36,23 @@ def test_outcomes_processing(relay_with_processing, kafka_consumer, mini_sentry)
"extra": {"msg_text": message_text},
},
)

start = datetime.utcnow()
# polling first message can take a few good seconds
outcome = outcomes.poll(timeout=20)
outcome = outcomes_consumer.get_outcome()
end = datetime.utcnow()

assert outcome is not None
outcome = outcome.value()
outcome = json.loads(outcome)
# set defaults to allow for results that elide empty fields
default = {
"org_id": None,
"key_id": None,
"reason": None,
"event_id": None,
"remote_addr": None,
}
outcome = {**default, **outcome}
# deal with the timestamp separately ( we can't control it exactly)
assert outcome["project_id"] == 42
assert outcome["event_id"] == event_id
assert outcome.get("org_id") is None
assert outcome.get("key_id") is None
assert outcome["outcome"] == 3
assert outcome["reason"] == "project_id"
assert outcome["remote_addr"] == "127.0.0.1"

# deal with the timestamp separately (we can't control it exactly)
timestamp = outcome.get("timestamp")
del outcome["timestamp"]
assert timestamp is not None
event_emission = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
assert start <= event_emission <= end
# reconstruct the expected message without timestamp
expected = {
"project_id": 42,
"event_id": event_id,
"org_id": None,
"key_id": None,
"outcome": 3,
"reason": "project_id",
"remote_addr": "127.0.0.1",
}
assert outcome == expected


def _send_event(relay):
@@ -290,8 +276,14 @@ def test_outcome_source(relay, mini_sentry):
    assert outcome.get("source") == "my-layer"


-@pytest.mark.parametrize("num_intermediate_relays", [1, 4])
-def test_outcome_forwarding(relay, mini_sentry, num_intermediate_relays):
+@pytest.mark.parametrize("num_intermediate_relays", [1, 3])
+def test_outcome_forwarding(
+    mini_sentry,
+    relay,
+    relay_with_processing,
+    outcomes_consumer,
+    num_intermediate_relays,
+):
    """
    Tests that Relay forwards outcomes from a chain of relays
@@ -300,7 +292,20 @@ def test_outcome_forwarding(relay, mini_sentry, num_intermediate_relays):
    are properly forwarded up to sentry.
    """

-    config = {
+    processing_config = {
+        "outcomes": {
+            "emit_outcomes": False,  # The default, overridden by processing.enabled: true
+            "batch_size": 1,
+            "batch_interval": 1,
+            "source": "processing-layer",
+        }
+    }
+
+    # The innermost Relay needs to be in processing mode
+    upstream = relay_with_processing(processing_config)
+    upstream.wait_relay_healthcheck()
+
+    intermediate_config = {
        "outcomes": {
            "emit_outcomes": True,
            "batch_size": 1,
@@ -309,14 +314,13 @@
        }
    }

-    upstream = mini_sentry
    # build a chain of identical relays
    for i in range(num_intermediate_relays):
-        intermediate_relay = relay(upstream, config)
-        upstream = intermediate_relay
+        upstream = relay(upstream, intermediate_config)
+        upstream.wait_relay_healthcheck()

    # mark the downstream relay so we can identify outcomes originating from it
-    config_downstream = deepcopy(config)
+    config_downstream = deepcopy(intermediate_config)
    config_downstream["outcomes"]["source"] = "downstream-layer"

    downstream_relay = relay(upstream, config_downstream)
@@ -327,13 +331,7 @@ def test_outcome_forwarding(relay, mini_sentry, num_intermediate_relays):

    event_id = _send_event(downstream_relay)

-    outcomes_batch = mini_sentry.captured_outcomes.get(timeout=0.2)
-    assert mini_sentry.captured_outcomes.qsize() == 0  # we had only one batch
-
-    outcomes = outcomes_batch.get("outcomes")
-    assert len(outcomes) == 1
-
-    outcome = outcomes[0]
-
+    outcomes_consumer = outcomes_consumer()
+    outcome = outcomes_consumer.get_outcome()
    assert outcome.get("source") == "downstream-layer"
    assert outcome.get("event_id") == event_id
