Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions cms/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,8 @@

import os

from openedx.core.lib.celery.routers import AlternateEnvironmentRouter

# Set the default Django settings module for the 'celery' program
# and then instantiate the Celery singleton.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cms.envs.production')
from openedx.core.lib.celery import APP # pylint: disable=wrong-import-position,unused-import

# Import after autodiscovery has had a chance to connect to the import_module signal
# so celery doesn't miss any apps getting installed.
from django.conf import settings # pylint: disable=wrong-import-position,wrong-import-order


class Router(AlternateEnvironmentRouter):
"""
An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues.
"""

@property
def alternate_env_tasks(self):
"""
Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
"""
# The tasks below will be routed to the default lms queue.
return {
'completion_aggregator.tasks.update_aggregators': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms',
}

@property
def explicit_queues(self):
"""
Defines specific queues for tasks to run in (typically outside of the cms environment),
as a dict of form { task_name: queue_name }.
"""
return {
'lms.djangoapps.grades.tasks.compute_all_grades_for_course': settings.POLICY_CHANGE_GRADES_ROUTING_KEY,
}
6 changes: 3 additions & 3 deletions cms/djangoapps/contentstore/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def clone_instance(instance, field_values):
return instance


@task()
@task
@set_code_owner_attribute
def rerun_course(source_course_key_string, destination_course_key_string, user_id, fields=None):
"""
Expand Down Expand Up @@ -170,7 +170,7 @@ def _parse_time(time_isoformat):
).replace(tzinfo=UTC)


@task(routing_key=settings.UPDATE_SEARCH_INDEX_JOB_QUEUE)
@task
@set_code_owner_attribute
def update_search_index(course_id, triggered_time_isoformat):
""" Updates course search index. """
Expand All @@ -195,7 +195,7 @@ def update_search_index(course_id, triggered_time_isoformat):
LOGGER.debug(u'Search indexing successful for complete course %s', course_id)


@task()
@task
@set_code_owner_attribute
def update_library_index(library_id, triggered_time_isoformat):
""" Updates course search index. """
Expand Down
19 changes: 18 additions & 1 deletion cms/envs/production.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def get_env_setting(setting):
DEFAULT_PRIORITY_QUEUE: {}
}

CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
CELERY_ROUTES = "openedx.core.lib.celery.routers.route_task"

# STATIC_URL_BASE specifies the base url to use for static files
STATIC_URL_BASE = ENV_TOKENS.get('STATIC_URL_BASE', None)
Expand Down Expand Up @@ -574,3 +574,20 @@ def get_env_setting(setting):
LOGO_URL_PNG = ENV_TOKENS.get('LOGO_URL_PNG', LOGO_URL_PNG)
LOGO_TRADEMARK_URL = ENV_TOKENS.get('LOGO_TRADEMARK_URL', LOGO_TRADEMARK_URL)
FAVICON_URL = ENV_TOKENS.get('FAVICON_URL', FAVICON_URL)

######################## CELERY ROTUING ########################

# Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
ALTERNATE_ENV_TASKS = {
'completion_aggregator.tasks.update_aggregators': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms',
}

# Defines the task -> alternate worker queue to be used when routing.
EXPLICIT_QUEUES = {
'lms.djangoapps.grades.tasks.compute_all_grades_for_course': {
'queue': POLICY_CHANGE_GRADES_ROUTING_KEY},
'cms.djangoapps.contentstore.tasks.update_search_index': {
'queue': UPDATE_SEARCH_INDEX_JOB_QUEUE},
}
4 changes: 1 addition & 3 deletions common/djangoapps/entitlements/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
from common.djangoapps.entitlements.models import CourseEntitlement

LOGGER = get_task_logger(__name__)
# Under cms the following setting is not defined, leading to errors during tests.
ROUTING_KEY = getattr(settings, 'ENTITLEMENTS_EXPIRATION_ROUTING_KEY', None)

# Maximum number of retries before giving up on awarding credentials.
# For reference, 11 retries with exponential backoff yields a maximum waiting
# time of 2047 seconds (about 30 minutes). Setting this to None could yield
Expand All @@ -23,7 +22,6 @@
@task(
bind=True,
ignore_result=True,
routing_key=ROUTING_KEY,
name='entitlements.expire_old_entitlements',
)
@set_code_owner_attribute
Expand Down
15 changes: 0 additions & 15 deletions lms/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,10 @@
Taken from: https://celery.readthedocs.org/en/latest/django/first-steps-with-django.html
"""


import os

from openedx.core.lib.celery.routers import AlternateEnvironmentRouter

# Set the default Django settings module for the 'celery' program
# and then instantiate the Celery singleton.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lms.envs.production')
from openedx.core.lib.celery import APP # pylint: disable=wrong-import-position,unused-import


class Router(AlternateEnvironmentRouter):
"""
An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues.
"""

@property
def alternate_env_tasks(self):
"""
Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
"""
return {}
3 changes: 0 additions & 3 deletions lms/djangoapps/bulk_email/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)

total_recipients = combined_set.count()

routing_key = settings.BULK_EMAIL_ROUTING_KEY

# Weird things happen if we allow empty querysets as input to emailing subtasks
# The task appears to hang at "0 out of 0 completed" and never finishes.
if total_recipients == 0:
Expand All @@ -218,7 +216,6 @@ def _create_send_email_subtask(to_list, initial_subtask_status):
initial_subtask_status.to_dict(),
),
task_id=subtask_id,
routing_key=routing_key,
)
return new_subtask

Expand Down
3 changes: 1 addition & 2 deletions lms/djangoapps/discussion/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@


DEFAULT_LANGUAGE = 'en'
ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)


@task(base=LoggedTask)
Expand All @@ -62,7 +61,7 @@ class ResponseNotification(BaseMessageType):
pass


@task(base=LoggedTask, routing_key=ROUTING_KEY)
@task(base=LoggedTask)
@set_code_owner_attribute
def send_ace_message(context):
context['course_id'] = CourseKey.from_string(context['course_id'])
Expand Down
9 changes: 4 additions & 5 deletions lms/djangoapps/email_marketing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

log = logging.getLogger(__name__)
SAILTHRU_LIST_CACHE_KEY = "email.marketing.cache"
ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)


@task(bind=True, routing_key=ACE_ROUTING_KEY)
@task(bind=True)
@set_code_owner_attribute
def get_email_cookies_via_sailthru(self, user_email, post_parms):
"""
Expand Down Expand Up @@ -63,7 +62,7 @@ def get_email_cookies_via_sailthru(self, user_email, post_parms):
return None


@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY)
@task(bind=True, default_retry_delay=3600, max_retries=24)
@set_code_owner_attribute
def update_user(self, sailthru_vars, email, site=None, new_user=False, activation=False):
"""
Expand Down Expand Up @@ -146,7 +145,7 @@ def is_default_site(site):
return not site or site.get('id') == settings.SITE_ID


@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY)
@task(bind=True, default_retry_delay=3600, max_retries=24)
@set_code_owner_attribute
def update_user_email(self, new_email, old_email):
"""
Expand Down Expand Up @@ -307,7 +306,7 @@ def _retryable_sailthru_error(error):
return code == 9 or code == 43


@task(bind=True, routing_key=ACE_ROUTING_KEY)
@task(bind=True)
@set_code_owner_attribute
def update_course_enrollment(self, email, course_key, mode, site=None):
"""Adds/updates Sailthru when a user adds to cart/purchases/upgrades a course
Expand Down
2 changes: 1 addition & 1 deletion lms/djangoapps/gating/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
log = logging.getLogger(__name__)


@task()
@task
@set_code_owner_attribute
def task_evaluate_subsection_completion_milestones(course_id, block_id, user_id):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def enqueue_all_shuffled_tasks(self, options):
"""
Enqueue all tasks, in shuffled order.
"""
task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
task_options = {'queue': options['routing_key']} if options.get('routing_key') else {}
for seq_id, kwargs in enumerate(self._shuffled_task_kwargs(options)):
kwargs['seq_id'] = seq_id
result = tasks.compute_grades_for_course_v2.apply_async(kwargs=kwargs, **task_options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,19 @@ def test_tasks_fired(self, estimate_first_attempted, mock_task):
# Order doesn't matter, but can't use a set because dicts aren't hashable
expected = [
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[0], 0)
},),
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[0], 2)
},),
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[3], 0)
},),
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[3], 2)
},),
]
Expand Down
8 changes: 3 additions & 5 deletions lms/djangoapps/grades/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
SUBSECTION_GRADE_TIMEOUT_SECONDS = 300


@task(base=LoggedPersistOnFailureTask, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY)
@task(base=LoggedPersistOnFailureTask)
@set_code_owner_attribute
def compute_all_grades_for_course(**kwargs):
"""
Expand All @@ -74,7 +74,7 @@ def compute_all_grades_for_course(**kwargs):
'batch_size': batch_size,
})
compute_grades_for_course_v2.apply_async(
kwargs=kwargs, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
kwargs=kwargs, queue=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
)


Expand Down Expand Up @@ -138,7 +138,6 @@ def compute_grades_for_course(course_key, offset, batch_size, **kwargs): # pyli
time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS,
max_retries=2,
default_retry_delay=RETRY_DELAY_SECONDS,
routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
)
@set_code_owner_attribute
def recalculate_course_and_subsection_grades_for_user(self, **kwargs): # pylint: disable=unused-argument
Expand Down Expand Up @@ -179,8 +178,7 @@ def recalculate_course_and_subsection_grades_for_user(self, **kwargs): # pylint
base=LoggedPersistOnFailureTask,
time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS,
max_retries=2,
default_retry_delay=RETRY_DELAY_SECONDS,
routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY
default_retry_delay=RETRY_DELAY_SECONDS
)
@set_code_owner_attribute
def recalculate_subsection_grade_v3(self, **kwargs):
Expand Down
7 changes: 3 additions & 4 deletions lms/djangoapps/instructor_task/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ def send_bulk_course_email(entry_id, _xmodule_instance_args):
@task(
name='lms.djangoapps.instructor_task.tasks.calculate_problem_responses_csv.v2',
base=BaseInstructorTask,
routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY,
)
@set_code_owner_attribute
def calculate_problem_responses_csv(entry_id, xmodule_instance_args):
Expand All @@ -182,7 +181,7 @@ def calculate_problem_responses_csv(entry_id, xmodule_instance_args):
return run_main_task(entry_id, task_fn, action_name)


@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
@task(base=BaseInstructorTask)
@set_code_owner_attribute
def calculate_grades_csv(entry_id, xmodule_instance_args):
"""
Expand All @@ -199,7 +198,7 @@ def calculate_grades_csv(entry_id, xmodule_instance_args):
return run_main_task(entry_id, task_fn, action_name)


@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
@task(base=BaseInstructorTask)
@set_code_owner_attribute
def calculate_problem_grade_report(entry_id, xmodule_instance_args):
"""
Expand Down Expand Up @@ -269,7 +268,7 @@ def calculate_may_enroll_csv(entry_id, xmodule_instance_args):
return run_main_task(entry_id, task_fn, action_name)


@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
@task(base=BaseInstructorTask)
@set_code_owner_attribute
def generate_certificates(entry_id, xmodule_instance_args):
"""
Expand Down
5 changes: 1 addition & 4 deletions lms/djangoapps/verify_student/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
from common.djangoapps.edxmako.shortcuts import render_to_string
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers

ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)
SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY = getattr(settings, 'SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY', None)
log = logging.getLogger(__name__)


Expand Down Expand Up @@ -74,7 +72,7 @@ def after_return(self, status, retval, task_id, args, kwargs, einfo):
)


@task(routing_key=ACE_ROUTING_KEY)
@task
@set_code_owner_attribute
def send_verification_status_email(context):
"""
Expand All @@ -101,7 +99,6 @@ def send_verification_status_email(context):
bind=True,
default_retry_delay=settings.SOFTWARE_SECURE_REQUEST_RETRY_DELAY,
max_retries=settings.SOFTWARE_SECURE_RETRY_MAX_ATTEMPTS,
routing_key=SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY,
)
@set_code_owner_attribute
def send_request_to_ss_for_user(self, user_verification_id, copy_id_photo_from):
Expand Down
Loading