Skip to content

ref: Add types to store and symbolication #29014

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
5f68c45
ref: Move all symbolicator-related code in store.py to a separate file
relaxolotl Sep 29, 2021
a2212cd
Fix tests
loewenheim Oct 1, 2021
7fd4abc
add new symbolication-specific tasks to celery imports
relaxolotl Oct 4, 2021
5242505
ref: Add types to store and symbolication
loewenheim Oct 1, 2021
8867315
s/Event/Any
loewenheim Oct 1, 2021
984f1de
restore project and kwargs arguments
loewenheim Oct 5, 2021
4d8ef4f
Remove some Any types
loewenheim Oct 5, 2021
1c5503a
Remove unused type ignore
loewenheim Oct 6, 2021
6575788
Correctly(?) import Event
loewenheim Oct 6, 2021
dfb5a43
Merge branch 'master' into ref/split-store
loewenheim Oct 6, 2021
435de3a
style(lint): Auto commit lint changes
getsantry[bot] Oct 6, 2021
bc785e7
Merge branch 'ref/split-store' into ref/store-symbolication-types
loewenheim Oct 6, 2021
0bf9d91
Merge branch 'ref/split-store' into ref/store-symbolication-types
loewenheim Oct 6, 2021
14a708b
style(lint): Auto commit lint changes
getsantry[bot] Oct 6, 2021
e753266
Remove unused import
loewenheim Oct 6, 2021
b0f7f82
Fix import
loewenheim Oct 6, 2021
5183bf8
Merge branch 'ref/split-store' into ref/store-symbolication-types
loewenheim Oct 6, 2021
0190ab3
Fix submit_symbolicate
loewenheim Oct 6, 2021
6f1fd90
Correctly type kwargs
loewenheim Oct 6, 2021
a7936e2
Fix None issue
loewenheim Oct 6, 2021
04340b2
Make some cache_keys non-optional
loewenheim Oct 7, 2021
ee0a866
Merge branch 'master' into ref/split-store
loewenheim Oct 13, 2021
ac786e5
Merge branch 'ref/split-store' into ref/store-symbolication-types
loewenheim Oct 13, 2021
7d20062
Merge branch 'master' into ref/split-store
loewenheim Oct 13, 2021
5a0b223
Merge branch 'ref/split-store' into ref/store-symbolication-types
loewenheim Oct 13, 2021
427f948
style(lint): Auto commit lint changes
getsantry[bot] Oct 13, 2021
329c4c3
remove internal marker from do_process_event
relaxolotl Oct 15, 2021
f186a75
split tests
relaxolotl Oct 15, 2021
7b50506
Merge remote-tracking branch 'origin/master' into ref/split-store
relaxolotl Oct 15, 2021
f1a5c8d
Merge branch 'ref/split-store' into ref/store-symbolication-types
loewenheim Oct 15, 2021
18299f5
fix test
loewenheim Oct 15, 2021
55ccfeb
Merge branch 'ref/split-store' into ref/store-symbolication-types
loewenheim Oct 15, 2021
6072277
Merge remote-tracking branch 'origin/master' into ref/store-symbolication-types
relaxolotl Oct 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ files = src/sentry/api/bases/external_actor.py,
src/sentry/spans/**/*.py,
src/sentry/tasks/app_store_connect.py,
src/sentry/tasks/low_priority_symbolication.py,
src/sentry/tasks/store.py,
src/sentry/tasks/symbolication.py,
src/sentry/tasks/update_user_reports.py,
src/sentry/unmerge.py,
src/sentry/utils/appleconnect/,
Expand Down
146 changes: 103 additions & 43 deletions src/sentry/tasks/store.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from datetime import datetime
from time import time
from typing import Any, Callable, Dict, List, Optional

import sentry_sdk
from django.conf import settings
Expand All @@ -12,6 +13,7 @@
from sentry.constants import DEFAULT_STORE_NORMALIZER_ARGS
from sentry.datascrubbing import scrub_data
from sentry.eventstore import processing
from sentry.eventstore.processing.base import Event
from sentry.killswitches import killswitch_matches_context
from sentry.models import Activity, Organization, Project, ProjectOption
from sentry.stacktraces.processing import process_stacktraces, should_process_for_stacktraces
Expand All @@ -33,9 +35,13 @@ class RetryProcessing(Exception):
pass


@metrics.wraps("should_process")
def should_process(data):
@metrics.wraps("should_process")  # type: ignore
def should_process(data: CanonicalKeyDict) -> bool:
    """Return True if this event payload needs any processing at all.

    Thin, metrics-wrapped entry point; the actual decision logic lives in
    ``_should_process_inner``.
    """
    needs_processing: bool = _should_process_inner(data)
    return needs_processing


def _should_process_inner(data: CanonicalKeyDict) -> bool:
from sentry.plugins.base import plugins

if data.get("type") == "transaction":
Expand All @@ -59,13 +65,13 @@ def should_process(data):


def submit_process(
project,
from_reprocessing,
cache_key,
event_id,
start_time,
data_has_changed=None,
):
project: Optional[Project],
from_reprocessing: bool,
cache_key: str,
event_id: Optional[str],
start_time: Optional[int],
data_has_changed: bool = False,
) -> None:
task = process_event_from_reprocessing if from_reprocessing else process_event
task.delay(
cache_key=cache_key,
Expand All @@ -75,7 +81,14 @@ def submit_process(
)


def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, data):
def submit_save_event(
project_id: int,
from_reprocessing: bool,
cache_key: Optional[str],
event_id: Optional[str],
start_time: Optional[int],
data: Optional[Event],
) -> None:
if cache_key:
data = None

Expand All @@ -90,7 +103,14 @@ def submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_
)


def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, project):
def _do_preprocess_event(
cache_key: str,
data: Optional[Event],
start_time: Optional[int],
event_id: Optional[str],
process_task: Callable[[Optional[str], Optional[int], Optional[str], bool], None],
project: Optional[Project],
) -> None:
from sentry.lang.native.processing import should_process_with_symbolicator
from sentry.tasks.symbolication import should_demote_symbolication, submit_symbolicate

Expand Down Expand Up @@ -147,15 +167,20 @@ def _do_preprocess_event(cache_key, data, start_time, event_id, process_task, pr
submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, original_data)


@instrumented_task(
@instrumented_task( # type: ignore
name="sentry.tasks.store.preprocess_event",
queue="events.preprocess_event",
time_limit=65,
soft_time_limit=60,
)
def preprocess_event(
cache_key=None, data=None, start_time=None, event_id=None, project=None, **kwargs
):
cache_key: str,
data: Optional[Event] = None,
start_time: Optional[int] = None,
event_id: Optional[str] = None,
project: Optional[Project] = None,
**kwargs: Any,
) -> None:
return _do_preprocess_event(
cache_key=cache_key,
data=data,
Expand All @@ -166,15 +191,20 @@ def preprocess_event(
)


@instrumented_task(
@instrumented_task( # type: ignore
name="sentry.tasks.store.preprocess_event_from_reprocessing",
queue="events.reprocessing.preprocess_event",
time_limit=65,
soft_time_limit=60,
)
def preprocess_event_from_reprocessing(
cache_key=None, data=None, start_time=None, event_id=None, project=None, **kwargs
):
cache_key: str,
data: Optional[Event] = None,
start_time: Optional[int] = None,
event_id: Optional[str] = None,
project: Optional[Project] = None,
**kwargs: Any,
) -> None:
return _do_preprocess_event(
cache_key=cache_key,
data=data,
Expand All @@ -185,13 +215,13 @@ def preprocess_event_from_reprocessing(
)


@instrumented_task(
@instrumented_task( # type: ignore
name="sentry.tasks.store.retry_process_event",
queue="sleep",
time_limit=(60 * 5) + 5,
soft_time_limit=60 * 5,
)
def retry_process_event(process_task_name, task_kwargs, **kwargs):
def retry_process_event(process_task_name: str, task_kwargs: Dict[str, Any], **kwargs: Any) -> None:
"""
The only purpose of this task is be enqueued with some ETA set. This is
essentially an implementation of ETAs on top of Celery's existing ETAs, but
Expand All @@ -210,14 +240,14 @@ def retry_process_event(process_task_name, task_kwargs, **kwargs):


def do_process_event(
cache_key,
start_time,
event_id,
process_task,
data=None,
data_has_changed=None,
from_symbolicate=False,
):
cache_key: str,
start_time: Optional[int],
event_id: Optional[str],
process_task: Callable[[Optional[str], Optional[int], Optional[str], bool], None],
data: Optional[Event] = None,
data_has_changed: bool = False,
from_symbolicate: bool = False,
) -> None:
from sentry.plugins.base import plugins

if data is None:
Expand All @@ -237,7 +267,7 @@ def do_process_event(

event_id = data["event_id"]

def _continue_to_save_event():
def _continue_to_save_event() -> None:
from_reprocessing = process_task is process_event_from_reprocessing
submit_save_event(project_id, from_reprocessing, cache_key, event_id, start_time, data)

Expand All @@ -259,7 +289,7 @@ def _continue_to_save_event():
"organization", Organization.objects.get_from_cache(id=project.organization_id)
)

has_changed = bool(data_has_changed)
has_changed = data_has_changed

with sentry_sdk.start_span(op="tasks.store.process_event.get_reprocessing_revision"):
# Fetch the reprocessing revision
Expand Down Expand Up @@ -376,13 +406,19 @@ def _continue_to_save_event():
return _continue_to_save_event()


@instrumented_task(
@instrumented_task( # type: ignore
name="sentry.tasks.store.process_event",
queue="events.process_event",
time_limit=65,
soft_time_limit=60,
)
def process_event(cache_key, start_time=None, event_id=None, data_has_changed=None, **kwargs):
def process_event(
cache_key: str,
start_time: Optional[int] = None,
event_id: Optional[str] = None,
data_has_changed: bool = False,
**kwargs: Any,
) -> None:
"""
Handles event processing (for those events that need it)

Expand All @@ -402,15 +438,19 @@ def process_event(cache_key, start_time=None, event_id=None, data_has_changed=No
)


@instrumented_task(
@instrumented_task( # type: ignore
name="sentry.tasks.store.process_event_from_reprocessing",
queue="events.reprocessing.process_event",
time_limit=65,
soft_time_limit=60,
)
def process_event_from_reprocessing(
cache_key, start_time=None, event_id=None, data_has_changed=None, **kwargs
):
cache_key: str,
start_time: Optional[int] = None,
event_id: Optional[str] = None,
data_has_changed: bool = False,
**kwargs: Any,
) -> None:
return do_process_event(
cache_key=cache_key,
start_time=start_time,
Expand All @@ -420,7 +460,9 @@ def process_event_from_reprocessing(
)


def delete_raw_event(project_id, event_id, allow_hint_clear=False):
def delete_raw_event(
project_id: int, event_id: Optional[str], allow_hint_clear: bool = False
) -> None:
set_current_event_project(project_id)

if event_id is None:
Expand Down Expand Up @@ -448,8 +490,14 @@ def delete_raw_event(project_id, event_id, allow_hint_clear=False):


def create_failed_event(
cache_key, data, project_id, issues, event_id, start_time=None, reprocessing_rev=None
):
cache_key: str,
data: Optional[Event],
project_id: int,
issues: List[Dict[str, str]],
event_id: Optional[str],
start_time: Optional[int] = None,
reprocessing_rev: Any = None,
) -> bool:
"""If processing failed we put the original data from the cache into a
raw event. Returns `True` if a failed event was inserted
"""
Expand Down Expand Up @@ -538,8 +586,13 @@ def create_failed_event(


def _do_save_event(
cache_key=None, data=None, start_time=None, event_id=None, project_id=None, **kwargs
):
cache_key: Optional[str] = None,
data: Optional[Event] = None,
start_time: Optional[int] = None,
event_id: Optional[str] = None,
project_id: Optional[int] = None,
**kwargs: Any,
) -> None:
"""
Saves an event to the database.
"""
Expand Down Expand Up @@ -643,7 +696,9 @@ def _do_save_event(
time_synthetic_monitoring_event(data, project_id, start_time)


def time_synthetic_monitoring_event(data, project_id, start_time):
def time_synthetic_monitoring_event(
data: Event, project_id: int, start_time: Optional[int]
) -> bool:
"""
For special events produced by the recurring synthetic monitoring
functions, emit timing metrics for:
Expand Down Expand Up @@ -691,13 +746,18 @@ def time_synthetic_monitoring_event(data, project_id, start_time):
return True


@instrumented_task(
@instrumented_task( # type: ignore
name="sentry.tasks.store.save_event",
queue="events.save_event",
time_limit=65,
soft_time_limit=60,
)
def save_event(
cache_key=None, data=None, start_time=None, event_id=None, project_id=None, **kwargs
):
cache_key: Optional[str] = None,
data: Optional[Event] = None,
start_time: Optional[int] = None,
event_id: Optional[str] = None,
project_id: Optional[int] = None,
**kwargs: Any,
) -> None:
_do_save_event(cache_key, data, start_time, event_id, project_id, **kwargs)
Loading