Skip to content

Commit

Permalink
feat: prepare to split save_event_transaction queue into 3 (#78868)
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara authored Oct 9, 2024
1 parent 3a80ca6 commit abfe52e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 6 deletions.
17 changes: 13 additions & 4 deletions src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from sentry.conf.types.role_dict import RoleDict
from sentry.conf.types.sdk_config import ServerSdkConfig
from sentry.utils import json # NOQA (used in getsentry config)
from sentry.utils.celery import crontab_with_minute_jitter
from sentry.utils.celery import crontab_with_minute_jitter, make_split_task_queues
from sentry.utils.types import Type, type_from_value


Expand Down Expand Up @@ -835,7 +835,16 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
# Each route has a task name as key and a tuple containing a list of queues
# and a default one as destination. The default one is used when the
# rollout option is not active.
CELERY_SPLIT_QUEUE_TASK_ROUTES: Mapping[str, SplitQueueTaskRoute] = {}
CELERY_SPLIT_QUEUE_TASK_ROUTES: Mapping[str, SplitQueueTaskRoute] = {
"events.save_event_transaction": {
"default_queue": "events.save_event_transaction",
"queues_config": {
"total": 3,
"in_use": 3,
},
}
}
SPLIT_TASK_QUEUES = make_split_task_queues(CELERY_SPLIT_QUEUE_TASK_ROUTES)

# Mapping from queue name to split queues to be used by SplitQueueRouter.
# This is meant to be used in those case where we have to specify the
Expand Down Expand Up @@ -1266,12 +1275,12 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str:
elif SILO_MODE == "REGION":
CELERYBEAT_SCHEDULE_FILENAME = os.path.join(tempfile.gettempdir(), "sentry-celerybeat-region")
CELERYBEAT_SCHEDULE = CELERYBEAT_SCHEDULE_REGION
CELERY_QUEUES = CELERY_QUEUES_REGION
CELERY_QUEUES = CELERY_QUEUES_REGION + SPLIT_TASK_QUEUES

else:
CELERYBEAT_SCHEDULE = {**CELERYBEAT_SCHEDULE_CONTROL, **CELERYBEAT_SCHEDULE_REGION}
CELERYBEAT_SCHEDULE_FILENAME = os.path.join(tempfile.gettempdir(), "sentry-celerybeat")
CELERY_QUEUES = CELERY_QUEUES_REGION + CELERY_QUEUES_CONTROL
CELERY_QUEUES = CELERY_QUEUES_REGION + CELERY_QUEUES_CONTROL + SPLIT_TASK_QUEUES

for queue in CELERY_QUEUES:
queue.durable = False
Expand Down
4 changes: 2 additions & 2 deletions src/sentry/utils/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ def _build_queues(base: str, quantity: int) -> Sequence[Queue]:
return [Queue(name=name, routing_key=name) for name in build_queue_names(base, quantity)]


def make_split_task_queues(config: Mapping[str, SplitQueueTaskRoute]) -> Sequence[Queue]:
def make_split_task_queues(config: Mapping[str, SplitQueueTaskRoute]) -> list[Queue]:
"""
Generates the split queues definitions from the mapping between
a task name and a config expressed as `SplitQueueTaskRoute`.
"""
ret: MutableSequence[Queue] = []
ret: list[Queue] = []
for conf in config.values():
if "queues_config" in conf:
ret.extend(_build_queues(conf["default_queue"], conf["queues_config"]["total"]))
Expand Down

0 comments on commit abfe52e

Please sign in to comment.