Commit e7ea910

ref(ACI): Move get_comparison_aggregation_value out of subscription processor (#91893)
Redo of #91783, but filtering out anomaly detection alerts (we haven't migrated them yet) and only accessing `detector` if we found it. See the latest two commits for these changes. Fixes https://sentry.sentry.io/issues/6616374058, https://sentry.sentry.io/issues/6616374079, and https://sentry.sentry.io/issues/6616374076
1 parent: 70c854b · commit: e7ea910
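
For context on the refactor: the helper no longer reads `self.alert_rule` and `self.subscription` off the processor; the call site in the diff below passes `snuba_query`, `organization_id`, `project_ids`, `comparison_delta`, and `alert_rule_id` explicitly, so the new detector processing flow can reuse it. A minimal sketch of the percent-change math the helper preserves (taken from the method removed below; the simplified name and signature here are illustrative, not the real helper's):

def comparison_percent_change(
    aggregation_value: float,
    comparison_aggregate: float | None,
    comparison_delta: int | None,
) -> float | None:
    # No comparison window configured: use the raw aggregation value.
    if comparison_delta is None:
        return aggregation_value
    # The comparison-window query produced no usable value: skip this update.
    if not comparison_aggregate:
        return None
    # Comparison alerts report the current value as a percentage of the
    # value from the window comparison_delta seconds earlier.
    return (aggregation_value / comparison_aggregate) * 100

For example, a current value of 30 against a comparison value of 20 yields 150.0, i.e. a 50% increase over the comparison period.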

File tree: 6 files changed (+210, −233 lines)

src/sentry/incidents/subscription_processor.py

Lines changed: 48 additions & 127 deletions
@@ -11,7 +11,6 @@
 from django.db import router, transaction
 from django.utils import timezone
 from sentry_redis_tools.retrying_cluster import RetryingRedisCluster
-from snuba_sdk import Column, Condition, Limit, Op

 from sentry import features
 from sentry.constants import ObjectStatus
@@ -42,7 +41,7 @@
 from sentry.incidents.tasks import handle_trigger_action
 from sentry.incidents.utils.metric_issue_poc import create_or_update_metric_issue
 from sentry.incidents.utils.process_update_helpers import (
-    get_aggregation_value_helper,
+    get_comparison_aggregation_value,
     get_crash_rate_alert_metrics_aggregation_value_helper,
 )
 from sentry.incidents.utils.types import (
@@ -51,20 +50,14 @@
     QuerySubscriptionUpdate,
 )
 from sentry.models.project import Project
-from sentry.search.eap.utils import add_start_end_conditions
 from sentry.seer.anomaly_detection.get_anomaly_data import get_anomaly_data_from_seer
 from sentry.seer.anomaly_detection.utils import anomaly_has_confidence, has_anomaly
 from sentry.snuba.dataset import Dataset
-from sentry.snuba.entity_subscription import (
-    ENTITY_TIME_COLUMNS,
-    get_entity_key_from_query_builder,
-    get_entity_subscription_from_snuba_query,
-)
-from sentry.snuba.models import QuerySubscription, SnubaQuery
+from sentry.snuba.models import QuerySubscription
 from sentry.snuba.subscriptions import delete_snuba_subscription
-from sentry.utils import metrics, redis, snuba_rpc
+from sentry.utils import metrics, redis
 from sentry.utils.dates import to_datetime
-from sentry.workflow_engine.models import DataPacket
+from sentry.workflow_engine.models import DataPacket, Detector
 from sentry.workflow_engine.processors.data_packet import process_data_packets

 logger = logging.getLogger(__name__)
@@ -216,106 +209,6 @@ def calculate_resolve_threshold(self, trigger: AlertRuleTrigger) -> float:
         threshold: float = trigger.alert_threshold + resolve_add
         return threshold

-    def get_comparison_aggregation_value(
-        self, subscription_update: QuerySubscriptionUpdate
-    ) -> float | None:
-        # NOTE (mifu67): we create this helper because we also use it in the new detector processing flow
-        aggregation_value = get_aggregation_value_helper(subscription_update)
-        if self.alert_rule.comparison_delta is None:
-            return aggregation_value
-
-        # For comparison alerts run a query over the comparison period and use it to calculate the
-        # % change.
-        delta = timedelta(seconds=self.alert_rule.comparison_delta)
-        end = subscription_update["timestamp"] - delta
-        snuba_query = self.subscription.snuba_query
-        start = end - timedelta(seconds=snuba_query.time_window)
-
-        entity_subscription = get_entity_subscription_from_snuba_query(
-            snuba_query,
-            self.subscription.project.organization_id,
-        )
-        dataset = Dataset(snuba_query.dataset)
-        query_type = SnubaQuery.Type(snuba_query.type)
-        project_ids = [self.subscription.project_id]
-
-        comparison_aggregate: None | float = None
-        if query_type == SnubaQuery.Type.PERFORMANCE and dataset == Dataset.EventsAnalyticsPlatform:
-            try:
-                rpc_time_series_request = entity_subscription.build_rpc_request(
-                    query=snuba_query.query,
-                    project_ids=project_ids,
-                    environment=snuba_query.environment,
-                    params={
-                        "organization_id": self.subscription.project.organization.id,
-                        "project_id": project_ids,
-                    },
-                    referrer="subscription_processor.comparison_query",
-                )
-
-                rpc_time_series_request = add_start_end_conditions(
-                    rpc_time_series_request, start, end
-                )
-
-                rpc_response = snuba_rpc.timeseries_rpc([rpc_time_series_request])[0]
-                if len(rpc_response.result_timeseries):
-                    comparison_aggregate = rpc_response.result_timeseries[0].data_points[0].data
-
-            except Exception:
-                logger.exception(
-                    "Failed to run RPC comparison query",
-                    extra={
-                        "alert_rule_id": self.alert_rule.id,
-                        "subscription_id": subscription_update.get("subscription_id"),
-                        "organization_id": self.alert_rule.organization_id,
-                    },
-                )
-                return None
-
-        else:
-            try:
-                # TODO: determine whether we need to include the subscription query_extra here
-                query_builder = entity_subscription.build_query_builder(
-                    query=snuba_query.query,
-                    project_ids=project_ids,
-                    environment=snuba_query.environment,
-                    params={
-                        "organization_id": self.subscription.project.organization.id,
-                        "project_id": project_ids,
-                        "start": start,
-                        "end": end,
-                    },
-                )
-                time_col = ENTITY_TIME_COLUMNS[get_entity_key_from_query_builder(query_builder)]
-                query_builder.add_conditions(
-                    [
-                        Condition(Column(time_col), Op.GTE, start),
-                        Condition(Column(time_col), Op.LT, end),
-                    ]
-                )
-                query_builder.limit = Limit(1)
-                results = query_builder.run_query(
-                    referrer="subscription_processor.comparison_query"
-                )
-                comparison_aggregate = list(results["data"][0].values())[0]
-
-            except Exception:
-                logger.exception(
-                    "Failed to run comparison query",
-                    extra={
-                        "alert_rule_id": self.alert_rule.id,
-                        "subscription_id": subscription_update.get("subscription_id"),
-                        "organization_id": self.alert_rule.organization_id,
-                    },
-                )
-                return None
-
-        if not comparison_aggregate:
-            metrics.incr("incidents.alert_rules.skipping_update_comparison_value_invalid")
-            return None
-
-        return (aggregation_value / comparison_aggregate) * 100
-
     def get_crash_rate_alert_metrics_aggregation_value(
         self, subscription_update: QuerySubscriptionUpdate
     ) -> float | None:
@@ -342,13 +235,22 @@ def get_crash_rate_alert_metrics_aggregation_value(
         self.reset_trigger_counts()
         return aggregation_value

-    def get_aggregation_value(self, subscription_update: QuerySubscriptionUpdate) -> float | None:
+    def get_aggregation_value(
+        self, subscription_update: QuerySubscriptionUpdate, comparison_delta: int | None = None
+    ) -> float | None:
         if self.subscription.snuba_query.dataset == Dataset.Metrics.value:
             aggregation_value = self.get_crash_rate_alert_metrics_aggregation_value(
                 subscription_update
             )
         else:
-            aggregation_value = self.get_comparison_aggregation_value(subscription_update)
+            aggregation_value = get_comparison_aggregation_value(
+                subscription_update=subscription_update,
+                snuba_query=self.subscription.snuba_query,
+                organization_id=self.subscription.project.organization.id,
+                project_ids=[self.subscription.project_id],
+                comparison_delta=comparison_delta,
+                alert_rule_id=self.alert_rule.id,
+            )

         return aggregation_value

@@ -366,14 +268,15 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
         if self.subscription.project.status != ObjectStatus.ACTIVE:
             metrics.incr("incidents.alert_rules.ignore_deleted_project")
             return
-        if dataset == "events" and not features.has(
-            "organizations:incidents", self.subscription.project.organization
-        ):
+
+        organization = self.subscription.project.organization
+
+        if dataset == "events" and not features.has("organizations:incidents", organization):
             # They have downgraded since these subscriptions have been created. So we just ignore updates for now.
             metrics.incr("incidents.alert_rules.ignore_update_missing_incidents")
             return
         elif dataset == "transactions" and not features.has(
-            "organizations:performance-view", self.subscription.project.organization
+            "organizations:performance-view", organization
         ):
             # They have downgraded since these subscriptions have been created. So we just ignore updates for now.
             metrics.incr("incidents.alert_rules.ignore_update_missing_incidents_performance")
@@ -390,8 +293,6 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
             metrics.incr("incidents.alert_rules.skipping_already_processed_update")
             return

-        aggregation_value = self.get_aggregation_value(subscription_update)
-
         self.last_update = subscription_update["timestamp"]

         if (
@@ -408,11 +309,33 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
                 },
             )

+        has_metric_alert_processing = features.has(
+            "organizations:workflow-engine-metric-alert-processing", organization
+        )
+        comparison_delta = None
+
+        if (
+            has_metric_alert_processing
+            and not self.alert_rule.detection_type == AlertRuleDetectionType.DYNAMIC
+        ):
+            try:
+                detector = Detector.objects.get(
+                    data_sources__source_id=str(self.subscription.id),
+                    data_sources__type=DATA_SOURCE_SNUBA_QUERY_SUBSCRIPTION,
+                )
+                comparison_delta = detector.config.get("comparison_delta")
+            except Detector.DoesNotExist:
+                logger.exception(
+                    "Detector not found", extra={"subscription_id": self.subscription.id}
+                )
+
+        else:
+            comparison_delta = self.alert_rule.comparison_delta
+
+        aggregation_value = self.get_aggregation_value(subscription_update, comparison_delta)
+
         if aggregation_value is not None:
-            if features.has(
-                "organizations:workflow-engine-metric-alert-processing",
-                self.subscription.project.organization,
-            ):
+            if has_metric_alert_processing:
                 packet = MetricDetectorUpdate(
                     entity=subscription_update.get("entity", ""),
                     subscription_id=subscription_update["subscription_id"],
@@ -438,10 +361,8 @@ def process_update(self, subscription_update: QuerySubscriptionUpdate) -> None:
                )

         has_anomaly_detection = features.has(
-            "organizations:anomaly-detection-alerts", self.subscription.project.organization
-        ) and features.has(
-            "organizations:anomaly-detection-rollout", self.subscription.project.organization
-        )
+            "organizations:anomaly-detection-alerts", organization
+        ) and features.has("organizations:anomaly-detection-rollout", organization)

         alert_triggered_tags = {
             "detection_type": self.alert_rule.detection_type,

src/sentry/incidents/tasks.py

Lines changed: 1 addition & 35 deletions
@@ -12,14 +12,10 @@
     IncidentStatus,
     IncidentStatusMethod,
 )
-from sentry.incidents.utils.constants import (
-    INCIDENTS_SNUBA_SUBSCRIPTION_TYPE,
-    SUBSCRIPTION_METRICS_LOGGER,
-)
+from sentry.incidents.utils.constants import INCIDENTS_SNUBA_SUBSCRIPTION_TYPE
 from sentry.incidents.utils.types import QuerySubscriptionUpdate
 from sentry.models.project import Project
 from sentry.silo.base import SiloMode
-from sentry.snuba.dataset import Dataset
 from sentry.snuba.models import QuerySubscription
 from sentry.snuba.query_subscriptions.consumer import register_subscriber
 from sentry.tasks.base import instrumented_task
@@ -31,36 +27,6 @@
 logger = logging.getLogger(__name__)


-@register_subscriber(SUBSCRIPTION_METRICS_LOGGER)
-def handle_subscription_metrics_logger(
-    subscription_update: QuerySubscriptionUpdate, subscription: QuerySubscription
-) -> None:
-    """
-    Logs results from a `QuerySubscription`.
-    """
-    from sentry.incidents.subscription_processor import SubscriptionProcessor
-
-    try:
-        if subscription.snuba_query.dataset == Dataset.Metrics.value:
-            processor = SubscriptionProcessor(subscription)
-            # XXX: Temporary hack so that we can extract these values without raising an exception
-            processor.reset_trigger_counts = lambda *arg, **kwargs: None  # type: ignore[method-assign]
-            aggregation_value = processor.get_aggregation_value(subscription_update)
-
-            logger.info(
-                "handle_subscription_metrics_logger.message",
-                extra={
-                    "subscription_id": subscription.id,
-                    "dataset": subscription.snuba_query.dataset,
-                    "snuba_subscription_id": subscription.subscription_id,
-                    "result": subscription_update,
-                    "aggregation_value": aggregation_value,
-                },
-            )
-    except Exception:
-        logger.exception("Failed to log subscription results")
-
-
 @register_subscriber(INCIDENTS_SNUBA_SUBSCRIPTION_TYPE)
 def handle_snuba_query_update(
     subscription_update: QuerySubscriptionUpdate, subscription: QuerySubscription
src/sentry/incidents/utils/constants.py

Lines changed: 0 additions & 1 deletion
@@ -1,3 +1,2 @@
 INCIDENTS_SNUBA_SUBSCRIPTION_TYPE = "incidents"
 INCIDENT_SNAPSHOT_BATCH_SIZE = 50
-SUBSCRIPTION_METRICS_LOGGER = "subscription_metrics_logger"
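
With `handle_subscription_metrics_logger` gone, no handler is registered under `SUBSCRIPTION_METRICS_LOGGER` any more, so the constant is deleted as well. An illustrative sketch of the one-handler-per-type registry pattern that `@register_subscriber` suggests (this is an assumption about the shape of `sentry.snuba.query_subscriptions.consumer`, not its actual code):

from collections.abc import Callable

# Maps a subscription-type string to exactly one handler function.
_registry: dict[str, Callable] = {}

def register_subscriber(subscriber_type: str) -> Callable:
    def decorator(func: Callable) -> Callable:
        # One handler per type: deleting a handler orphans its type
        # constant, which is why this commit removes both together.
        if subscriber_type in _registry:
            raise ValueError(f"{subscriber_type} is already registered")
        _registry[subscriber_type] = func
        return func
    return decorator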
