@@ -1,6 +1,7 @@
 import logging
 from datetime import datetime
 from time import time
+from typing import Any, Callable, Dict, List, Optional
 
 import sentry_sdk
 from django.conf import settings
@@ -12,6 +13,7 @@
 from sentry.constants import DEFAULT_STORE_NORMALIZER_ARGS
 from sentry.datascrubbing import scrub_data
 from sentry.eventstore import processing
+from sentry.eventstore.processing.base import Event
 from sentry.killswitches import killswitch_matches_context
 from sentry.models import Activity, Organization, Project, ProjectOption
 from sentry.stacktraces.processing import process_stacktraces, should_process_for_stacktraces
@@ -33,9 +35,13 @@ class RetryProcessing(Exception):
     pass
 
 
-@metrics.wraps("should_process")
-def should_process(data):
+@metrics.wraps("should_process")  # type: ignore
+def should_process(data: CanonicalKeyDict) -> bool:
     """Quick check if processing is needed at all."""
+    return _should_process_inner(data)
+
+
+def _should_process_inner(data: CanonicalKeyDict) -> bool:
     from sentry.plugins.base import plugins
 
     if data.get("type") == "transaction":
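The hunk above shows the pattern this PR applies to typed code sitting behind untyped decorators: the `# type: ignore` is likely there to silence mypy's untyped-decorator error, and splitting the body into an undecorated `_should_process_inner` keeps a helper whose signature mypy can see, since the decorator erases the wrapper's precise type. A minimal standalone sketch of the same pattern (the `wraps` stand-in and the body logic here are illustrative, not from the Sentry codebase):

from typing import Any, Callable


def wraps(key: str) -> Callable:
    # Stand-in for metrics.wraps: a decorator without precise type
    # annotations, so it erases the signature of whatever it wraps.
    def decorator(func: Callable) -> Callable:
        def timed(*args: Any, **kwargs: Any) -> Any:
            return func(*args, **kwargs)  # timing elided

        return timed

    return decorator


@wraps("should_process")  # type: ignore
def should_process(data: dict) -> bool:
    """Decorated entry point; the decorator erases its precise type."""
    return _should_process_inner(data)


def _should_process_inner(data: dict) -> bool:
    # Undecorated twin: fully type-checked, with a signature callers can rely on.
    return data.get("type") != "transaction"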
@@ -59,13 +65,13 @@ def should_process(data):
 
 
 def submit_process(
-    project,
-    from_reprocessing,
-    cache_key,
-    event_id,
-    start_time,
-    data_has_changed=None,
-):
+    project: Optional[Project],
+    from_reprocessing: bool,
+    cache_key: str,
+    event_id: Optional[str],
+    start_time: Optional[int],
+    data_has_changed: bool = False,
+) -> None:
     task = process_event_from_reprocessing if from_reprocessing else process_event
     task.delay(
         cache_key=cache_key,
@@ -75,7 +81,14 @@ def submit_process(
     )
 
 
-def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, data):
+def submit_save_event(
+    project_id: int,
+    from_reprocessing: bool,
+    cache_key: Optional[str],
+    event_id: Optional[str],
+    start_time: Optional[int],
+    data: Optional[Event],
+) -> None:
     if cache_key:
         data = None
 
@@ -90,7 +103,14 @@ def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_
     )
 
 
-def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, project):
+def _do_preprocess_event(
+    cache_key: str,
+    data: Optional[Event],
+    start_time: Optional[int],
+    event_id: Optional[str],
+    process_task: Callable[[Optional[str], Optional[int], Optional[str], bool], None],
+    project: Optional[Project],
+) -> None:
     from sentry.lang.native.processing import should_process_with_symbolicator
     from sentry.tasks.symbolication import should_demote_symbolication, submit_symbolicate
 
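For reference, `process_task` is typed here with a positional `Callable` rather than a named-parameter protocol, so any function of a compatible shape satisfies it. A minimal illustration of what the annotation accepts (the no-op task below is a hypothetical test double, not part of the PR):

from typing import Callable, Optional

# The annotation spelled out: a callable taking (cache_key, start_time,
# event_id, data_has_changed) positionally and returning None.
ProcessTask = Callable[[Optional[str], Optional[int], Optional[str], bool], None]


def _noop_process_task(
    cache_key: Optional[str],
    start_time: Optional[int],
    event_id: Optional[str],
    data_has_changed: bool,
) -> None:
    # Parameter names are irrelevant for positional Callable compatibility;
    # only the types and arity must line up.
    pass


task: ProcessTask = _noop_process_task  # accepted by mypy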
@@ -147,15 +167,20 @@ def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, pr
     submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, original_data)
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.preprocess_event",
     queue="events.preprocess_event",
     time_limit=65,
     soft_time_limit=60,
 )
 def preprocess_event(
-    cache_key=None, data=None, start_time=None, event_id=None, project=None, **kwargs
-):
+    cache_key: str,
+    data: Optional[Event] = None,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    project: Optional[Project] = None,
+    **kwargs: Any,
+) -> None:
     return _do_preprocess_event(
         cache_key=cache_key,
         data=data,
@@ -166,15 +191,20 @@ def preprocess_event(
     )
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.preprocess_event_from_reprocessing",
     queue="events.reprocessing.preprocess_event",
     time_limit=65,
     soft_time_limit=60,
 )
 def preprocess_event_from_reprocessing(
-    cache_key=None, data=None, start_time=None, event_id=None, project=None, **kwargs
-):
+    cache_key: str,
+    data: Optional[Event] = None,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    project: Optional[Project] = None,
+    **kwargs: Any,
+) -> None:
     return _do_preprocess_event(
         cache_key=cache_key,
         data=data,
@@ -185,13 +215,13 @@ def preprocess_event_from_reprocessing(
     )
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.retry_process_event",
     queue="sleep",
     time_limit=(60 * 5) + 5,
     soft_time_limit=60 * 5,
 )
-def retry_process_event(process_task_name, task_kwargs, **kwargs):
+def retry_process_event(process_task_name: str, task_kwargs: Dict[str, Any], **kwargs: Any) -> None:
     """
     The only purpose of this task is be enqueued with some ETA set. This is
     essentially an implementation of ETAs on top of Celery's existing ETAs, but
@@ -210,14 +240,14 @@ def retry_process_event(process_task_name, task_kwargs, **kwargs):
 
 
 def do_process_event(
-    cache_key,
-    start_time,
-    event_id,
-    process_task,
-    data=None,
-    data_has_changed=None,
-    from_symbolicate=False,
-):
+    cache_key: str,
+    start_time: Optional[int],
+    event_id: Optional[str],
+    process_task: Callable[[Optional[str], Optional[int], Optional[str], bool], None],
+    data: Optional[Event] = None,
+    data_has_changed: bool = False,
+    from_symbolicate: bool = False,
+) -> None:
     from sentry.plugins.base import plugins
 
     if data is None:
@@ -237,7 +267,7 @@ def do_process_event(
 
     event_id = data["event_id"]
 
-    def _continue_to_save_event():
+    def _continue_to_save_event() -> None:
         from_reprocessing = process_task is process_event_from_reprocessing
         submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, data)
 
@@ -259,7 +289,7 @@ def _continue_to_save_event():
             "organization", Organization.objects.get_from_cache(id=project.organization_id)
         )
 
-    has_changed = bool(data_has_changed)
+    has_changed = data_has_changed
 
     with sentry_sdk.start_span(op="tasks.store.process_event.get_reprocessing_revision"):
         # Fetch the reprocessing revision
@@ -376,13 +406,19 @@ def _continue_to_save_event():
     return _continue_to_save_event()
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.process_event",
     queue="events.process_event",
     time_limit=65,
     soft_time_limit=60,
 )
-def process_event(cache_key, start_time=None, event_id=None, data_has_changed=None, **kwargs):
+def process_event(
+    cache_key: str,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    data_has_changed: bool = False,
+    **kwargs: Any,
+) -> None:
     """
     Handles event processing (for those events that need it)
 
@@ -402,15 +438,19 @@ def process_event(cache_key, start_time=None, event_id=None, data_has_changed=No
     )
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.process_event_from_reprocessing",
     queue="events.reprocessing.process_event",
     time_limit=65,
     soft_time_limit=60,
 )
 def process_event_from_reprocessing(
-    cache_key, start_time=None, event_id=None, data_has_changed=None, **kwargs
-):
+    cache_key: str,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    data_has_changed: bool = False,
+    **kwargs: Any,
+) -> None:
     return do_process_event(
         cache_key=cache_key,
         start_time=start_time,
@@ -420,7 +460,9 @@ def process_event_from_reprocessing(
     )
 
 
-def delete_raw_event(project_id, event_id, allow_hint_clear=False):
+def delete_raw_event(
+    project_id: int, event_id: Optional[str], allow_hint_clear: bool = False
+) -> None:
     set_current_event_project(project_id)
 
     if event_id is None:
@@ -448,8 +490,14 @@ def delete_raw_event(project_id, event_id, allow_hint_clear=False):
 
 
 def create_failed_event(
-    cache_key, data, project_id, issues, event_id, start_time=None, reprocessing_rev=None
-):
+    cache_key: str,
+    data: Optional[Event],
+    project_id: int,
+    issues: List[Dict[str, str]],
+    event_id: Optional[str],
+    start_time: Optional[int] = None,
+    reprocessing_rev: Any = None,
+) -> bool:
     """If processing failed we put the original data from the cache into a
     raw event. Returns `True` if a failed event was inserted
     """
@@ -538,8 +586,13 @@ def create_failed_event(
 
 
 def _do_save_event(
-    cache_key=None, data=None, start_time=None, event_id=None, project_id=None, **kwargs
-):
+    cache_key: Optional[str] = None,
+    data: Optional[Event] = None,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    project_id: Optional[int] = None,
+    **kwargs: Any,
+) -> None:
     """
     Saves an event to the database.
     """
@@ -643,7 +696,9 @@ def _do_save_event(
         time_synthetic_monitoring_event(data, project_id, start_time)
 
 
-def time_synthetic_monitoring_event(data, project_id, start_time):
+def time_synthetic_monitoring_event(
+    data: Event, project_id: int, start_time: Optional[int]
+) -> bool:
     """
     For special events produced by the recurring synthetic monitoring
     functions, emit timing metrics for:
@@ -691,13 +746,18 @@ def time_synthetic_monitoring_event(data, project_id, start_time):
     return True
 
 
-@instrumented_task(
+@instrumented_task(  # type: ignore
     name="sentry.tasks.store.save_event",
     queue="events.save_event",
     time_limit=65,
     soft_time_limit=60,
 )
 def save_event(
-    cache_key=None, data=None, start_time=None, event_id=None, project_id=None, **kwargs
-):
+    cache_key: Optional[str] = None,
+    data: Optional[Event] = None,
+    start_time: Optional[int] = None,
+    event_id: Optional[str] = None,
+    project_id: Optional[int] = None,
+    **kwargs: Any,
+) -> None:
     _do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs)
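Two of the hunks above pair up: `data_has_changed` changes its default from `None` to `False`, which is what lets `has_changed = bool(data_has_changed)` become a plain assignment. Runtime behavior is unchanged, since `bool(None)` is `False`, but the stricter annotation now surfaces accidental `None` arguments at type-check time. A contrived before/after sketch (function names here are illustrative only):

def old_style(data_has_changed=None):
    # Before: a None default, coerced to bool at the point of use.
    return bool(data_has_changed)


def new_style(data_has_changed: bool = False) -> bool:
    # After: the annotation guarantees a bool, so no coercion is needed.
    return data_has_changed


new_style(True)    # fine
# new_style(None)  # now rejected by a type checker: None is not a bool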