diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py index 20949283199074..859f0a07bcf159 100644 --- a/src/sentry/conf/server.py +++ b/src/sentry/conf/server.py @@ -26,7 +26,7 @@ from sentry.conf.types.role_dict import RoleDict from sentry.conf.types.sdk_config import ServerSdkConfig from sentry.utils import json # NOQA (used in getsentry config) -from sentry.utils.celery import crontab_with_minute_jitter +from sentry.utils.celery import crontab_with_minute_jitter, make_split_task_queues from sentry.utils.types import Type, type_from_value @@ -835,7 +835,16 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str: # Each route has a task name as key and a tuple containing a list of queues # and a default one as destination. The default one is used when the # rollout option is not active. -CELERY_SPLIT_QUEUE_TASK_ROUTES: Mapping[str, SplitQueueTaskRoute] = {} +CELERY_SPLIT_QUEUE_TASK_ROUTES: Mapping[str, SplitQueueTaskRoute] = { + "events.save_event_transaction": { + "default_queue": "events.save_event_transaction", + "queues_config": { + "total": 3, + "in_use": 3, + }, + } +} +SPLIT_TASK_QUEUES = make_split_task_queues(CELERY_SPLIT_QUEUE_TASK_ROUTES) # Mapping from queue name to split queues to be used by SplitQueueRouter. # This is meant to be used in those case where we have to specify the @@ -1266,12 +1275,12 @@ def SOCIAL_AUTH_DEFAULT_USERNAME() -> str: elif SILO_MODE == "REGION": CELERYBEAT_SCHEDULE_FILENAME = os.path.join(tempfile.gettempdir(), "sentry-celerybeat-region") CELERYBEAT_SCHEDULE = CELERYBEAT_SCHEDULE_REGION - CELERY_QUEUES = CELERY_QUEUES_REGION + CELERY_QUEUES = CELERY_QUEUES_REGION + SPLIT_TASK_QUEUES else: CELERYBEAT_SCHEDULE = {**CELERYBEAT_SCHEDULE_CONTROL, **CELERYBEAT_SCHEDULE_REGION} CELERYBEAT_SCHEDULE_FILENAME = os.path.join(tempfile.gettempdir(), "sentry-celerybeat") - CELERY_QUEUES = CELERY_QUEUES_REGION + CELERY_QUEUES_CONTROL + CELERY_QUEUES = CELERY_QUEUES_REGION + CELERY_QUEUES_CONTROL + SPLIT_TASK_QUEUES for queue in CELERY_QUEUES: queue.durable = False diff --git a/src/sentry/utils/celery.py b/src/sentry/utils/celery.py index 8573af313e64dd..f576a099bcb8a8 100644 --- a/src/sentry/utils/celery.py +++ b/src/sentry/utils/celery.py @@ -25,12 +25,12 @@ def _build_queues(base: str, quantity: int) -> Sequence[Queue]: return [Queue(name=name, routing_key=name) for name in build_queue_names(base, quantity)] -def make_split_task_queues(config: Mapping[str, SplitQueueTaskRoute]) -> Sequence[Queue]: +def make_split_task_queues(config: Mapping[str, SplitQueueTaskRoute]) -> list[Queue]: """ Generates the split queues definitions from the mapping between a task name and a config expressed as `SplitQueueTaskRoute`. """ - ret: MutableSequence[Queue] = [] + ret: list[Queue] = [] for conf in config.values(): if "queues_config" in conf: ret.extend(_build_queues(conf["default_queue"], conf["queues_config"]["total"]))