Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions cms/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,8 @@

import os

from openedx.core.lib.celery.routers import AlternateEnvironmentRouter

# Set the default Django settings module for the 'celery' program
# and then instantiate the Celery singleton.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cms.envs.production')
from openedx.core.lib.celery import APP # pylint: disable=wrong-import-position,unused-import

# Import after autodiscovery has had a chance to connect to the import_module signal
# so celery doesn't miss any apps getting installed.
from django.conf import settings # pylint: disable=wrong-import-position,wrong-import-order


class Router(AlternateEnvironmentRouter):
"""
An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues.
"""

@property
def alternate_env_tasks(self):
"""
Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
"""
# The tasks below will be routed to the default lms queue.
return {
'completion_aggregator.tasks.update_aggregators': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms',
}

@property
def explicit_queues(self):
"""
Defines specific queues for tasks to run in (typically outside of the cms environment),
as a dict of form { task_name: queue_name }.
"""
return {
'lms.djangoapps.grades.tasks.compute_all_grades_for_course': settings.POLICY_CHANGE_GRADES_ROUTING_KEY,
}
9 changes: 6 additions & 3 deletions cms/djangoapps/contentstore/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ def clone_instance(instance, field_values):
return instance


@task()
@task
@set_code_owner_attribute
def rerun_course(source_course_key_string, destination_course_key_string, user_id, fields=None):
"""
Reruns a course in a new celery task.
Expand Down Expand Up @@ -168,7 +169,8 @@ def _parse_time(time_isoformat):
).replace(tzinfo=UTC)


@task(routing_key=settings.UPDATE_SEARCH_INDEX_JOB_QUEUE)
@task
@set_code_owner_attribute
def update_search_index(course_id, triggered_time_isoformat):
""" Updates course search index. """
try:
Expand All @@ -192,7 +194,8 @@ def update_search_index(course_id, triggered_time_isoformat):
LOGGER.debug(u'Search indexing successful for complete course %s', course_id)


@task()
@task
@set_code_owner_attribute
def update_library_index(library_id, triggered_time_isoformat):
""" Updates course search index. """
try:
Expand Down
19 changes: 18 additions & 1 deletion cms/envs/production.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def get_env_setting(setting):
DEFAULT_PRIORITY_QUEUE: {}
}

CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
CELERY_ROUTES = "openedx.core.lib.celery.routers.route_task"

# STATIC_URL_BASE specifies the base url to use for static files
STATIC_URL_BASE = ENV_TOKENS.get('STATIC_URL_BASE', None)
Expand Down Expand Up @@ -573,3 +573,20 @@ def get_env_setting(setting):
LOGO_URL_PNG = ENV_TOKENS.get('LOGO_URL_PNG', LOGO_URL_PNG)
LOGO_TRADEMARK_URL = ENV_TOKENS.get('LOGO_TRADEMARK_URL', LOGO_TRADEMARK_URL)
FAVICON_URL = ENV_TOKENS.get('FAVICON_URL', FAVICON_URL)

######################## CELERY ROTUING ########################

# Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
ALTERNATE_ENV_TASKS = {
'completion_aggregator.tasks.update_aggregators': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms',
'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms',
}

# Defines the task -> alternate worker queue to be used when routing.
EXPLICIT_QUEUES = {
'lms.djangoapps.grades.tasks.compute_all_grades_for_course': {
'queue': POLICY_CHANGE_GRADES_ROUTING_KEY},
'cms.djangoapps.contentstore.tasks.update_search_index': {
'queue': UPDATE_SEARCH_INDEX_JOB_QUEUE},
}
4 changes: 1 addition & 3 deletions common/djangoapps/entitlements/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
from common.djangoapps.entitlements.models import CourseEntitlement

LOGGER = get_task_logger(__name__)
# Under cms the following setting is not defined, leading to errors during tests.
ROUTING_KEY = getattr(settings, 'ENTITLEMENTS_EXPIRATION_ROUTING_KEY', None)

# Maximum number of retries before giving up on awarding credentials.
# For reference, 11 retries with exponential backoff yields a maximum waiting
# time of 2047 seconds (about 30 minutes). Setting this to None could yield
Expand All @@ -22,7 +21,6 @@
@task(
bind=True,
ignore_result=True,
routing_key=ROUTING_KEY,
name='entitlements.expire_old_entitlements',
)
def expire_old_entitlements(self, start, end, logid='...'):
Expand Down
15 changes: 0 additions & 15 deletions lms/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,10 @@
Taken from: https://celery.readthedocs.org/en/latest/django/first-steps-with-django.html
"""


import os

from openedx.core.lib.celery.routers import AlternateEnvironmentRouter

# Set the default Django settings module for the 'celery' program
# and then instantiate the Celery singleton.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lms.envs.production')
from openedx.core.lib.celery import APP # pylint: disable=wrong-import-position,unused-import


class Router(AlternateEnvironmentRouter):
"""
An implementation of AlternateEnvironmentRouter, for routing tasks to non-cms queues.
"""

@property
def alternate_env_tasks(self):
"""
Defines alternate environment tasks, as a dict of form { task_name: alternate_queue }
"""
return {}
3 changes: 0 additions & 3 deletions lms/djangoapps/bulk_email/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)

total_recipients = combined_set.count()

routing_key = settings.BULK_EMAIL_ROUTING_KEY

# Weird things happen if we allow empty querysets as input to emailing subtasks
# The task appears to hang at "0 out of 0 completed" and never finishes.
if total_recipients == 0:
Expand All @@ -217,7 +215,6 @@ def _create_send_email_subtask(to_list, initial_subtask_status):
initial_subtask_status.to_dict(),
),
task_id=subtask_id,
routing_key=routing_key,
)
return new_subtask

Expand Down
4 changes: 2 additions & 2 deletions lms/djangoapps/discussion/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@


DEFAULT_LANGUAGE = 'en'
ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)


@task(base=LoggedTask)
Expand All @@ -60,7 +59,8 @@ class ResponseNotification(BaseMessageType):
pass


@task(base=LoggedTask, routing_key=ROUTING_KEY)
@task(base=LoggedTask)
@set_code_owner_attribute
def send_ace_message(context):
context['course_id'] = CourseKey.from_string(context['course_id'])

Expand Down
13 changes: 8 additions & 5 deletions lms/djangoapps/email_marketing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

log = logging.getLogger(__name__)
SAILTHRU_LIST_CACHE_KEY = "email.marketing.cache"
ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)


@task(bind=True, routing_key=ACE_ROUTING_KEY)
@task(bind=True)
@set_code_owner_attribute
def get_email_cookies_via_sailthru(self, user_email, post_parms):
"""
Adds/updates Sailthru cookie information for a new user.
Expand Down Expand Up @@ -61,7 +61,8 @@ def get_email_cookies_via_sailthru(self, user_email, post_parms):
return None


@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY)
@task(bind=True, default_retry_delay=3600, max_retries=24)
@set_code_owner_attribute
def update_user(self, sailthru_vars, email, site=None, new_user=False, activation=False):
"""
Adds/updates Sailthru profile information for a user.
Expand Down Expand Up @@ -143,7 +144,8 @@ def is_default_site(site):
return not site or site.get('id') == settings.SITE_ID


@task(bind=True, default_retry_delay=3600, max_retries=24, routing_key=ACE_ROUTING_KEY)
@task(bind=True, default_retry_delay=3600, max_retries=24)
@set_code_owner_attribute
def update_user_email(self, new_email, old_email):
"""
Adds/updates Sailthru when a user email address is changed
Expand Down Expand Up @@ -303,7 +305,8 @@ def _retryable_sailthru_error(error):
return code == 9 or code == 43


@task(bind=True, routing_key=ACE_ROUTING_KEY)
@task(bind=True)
@set_code_owner_attribute
def update_course_enrollment(self, email, course_key, mode, site=None):
"""Adds/updates Sailthru when a user adds to cart/purchases/upgrades a course
Args:
Expand Down
3 changes: 2 additions & 1 deletion lms/djangoapps/gating/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
log = logging.getLogger(__name__)


@task()
@task
@set_code_owner_attribute
def task_evaluate_subsection_completion_milestones(course_id, block_id, user_id):
"""
Updates users' milestones related to completion of a subsection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def enqueue_all_shuffled_tasks(self, options):
"""
Enqueue all tasks, in shuffled order.
"""
task_options = {'routing_key': options['routing_key']} if options.get('routing_key') else {}
task_options = {'queue': options['routing_key']} if options.get('routing_key') else {}
for seq_id, kwargs in enumerate(self._shuffled_task_kwargs(options)):
kwargs['seq_id'] = seq_id
result = tasks.compute_grades_for_course_v2.apply_async(kwargs=kwargs, **task_options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,19 +92,19 @@ def test_tasks_fired(self, estimate_first_attempted, mock_task):
# Order doesn't matter, but can't use a set because dicts aren't hashable
expected = [
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[0], 0)
},),
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[0], 2)
},),
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[3], 0)
},),
({
'routing_key': 'key',
'queue': 'key',
'kwargs': _kwargs(self.course_keys[3], 2)
},),
]
Expand Down
9 changes: 4 additions & 5 deletions lms/djangoapps/grades/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
SUBSECTION_GRADE_TIMEOUT_SECONDS = 300


@task(base=LoggedPersistOnFailureTask, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY)
@task(base=LoggedPersistOnFailureTask)
@set_code_owner_attribute
def compute_all_grades_for_course(**kwargs):
"""
Compute grades for all students in the specified course.
Expand All @@ -69,7 +70,7 @@ def compute_all_grades_for_course(**kwargs):
'batch_size': batch_size,
})
compute_grades_for_course_v2.apply_async(
kwargs=kwargs, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
kwargs=kwargs, queue=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
)


Expand Down Expand Up @@ -131,7 +132,6 @@ def compute_grades_for_course(course_key, offset, batch_size, **kwargs): # pyli
time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS,
max_retries=2,
default_retry_delay=RETRY_DELAY_SECONDS,
routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY
)
def recalculate_course_and_subsection_grades_for_user(self, **kwargs): # pylint: disable=unused-argument
"""
Expand Down Expand Up @@ -171,8 +171,7 @@ def recalculate_course_and_subsection_grades_for_user(self, **kwargs): # pylint
base=LoggedPersistOnFailureTask,
time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS,
max_retries=2,
default_retry_delay=RETRY_DELAY_SECONDS,
routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY
default_retry_delay=RETRY_DELAY_SECONDS
)
def recalculate_subsection_grade_v3(self, **kwargs):
"""
Expand Down
10 changes: 6 additions & 4 deletions lms/djangoapps/instructor_task/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ def send_bulk_course_email(entry_id, _xmodule_instance_args):
@task(
name='lms.djangoapps.instructor_task.tasks.calculate_problem_responses_csv.v2',
base=BaseInstructorTask,
routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY,
)
def calculate_problem_responses_csv(entry_id, xmodule_instance_args):
"""
Expand All @@ -175,7 +174,8 @@ def calculate_problem_responses_csv(entry_id, xmodule_instance_args):
return run_main_task(entry_id, task_fn, action_name)


@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
@task(base=BaseInstructorTask)
@set_code_owner_attribute
def calculate_grades_csv(entry_id, xmodule_instance_args):
"""
Grade a course and push the results to an S3 bucket for download.
Expand All @@ -191,7 +191,8 @@ def calculate_grades_csv(entry_id, xmodule_instance_args):
return run_main_task(entry_id, task_fn, action_name)


@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
@task(base=BaseInstructorTask)
@set_code_owner_attribute
def calculate_problem_grade_report(entry_id, xmodule_instance_args):
"""
Generate a CSV for a course containing all students' problem
Expand Down Expand Up @@ -256,7 +257,8 @@ def calculate_may_enroll_csv(entry_id, xmodule_instance_args):
return run_main_task(entry_id, task_fn, action_name)


@task(base=BaseInstructorTask, routing_key=settings.GRADES_DOWNLOAD_ROUTING_KEY)
@task(base=BaseInstructorTask)
@set_code_owner_attribute
def generate_certificates(entry_id, xmodule_instance_args):
"""
Grade students and generate certificates.
Expand Down
6 changes: 2 additions & 4 deletions lms/djangoapps/verify_student/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
from common.djangoapps.edxmako.shortcuts import render_to_string
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers

ACE_ROUTING_KEY = getattr(settings, 'ACE_ROUTING_KEY', None)
SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY = getattr(settings, 'SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY', None)
log = logging.getLogger(__name__)


Expand Down Expand Up @@ -73,7 +71,8 @@ def after_return(self, status, retval, task_id, args, kwargs, einfo):
)


@task(routing_key=ACE_ROUTING_KEY)
@task
@set_code_owner_attribute
def send_verification_status_email(context):
"""
Spins a task to send verification status email to the learner
Expand All @@ -99,7 +98,6 @@ def send_verification_status_email(context):
bind=True,
default_retry_delay=settings.SOFTWARE_SECURE_REQUEST_RETRY_DELAY,
max_retries=settings.SOFTWARE_SECURE_RETRY_MAX_ATTEMPTS,
routing_key=SOFTWARE_SECURE_VERIFICATION_ROUTING_KEY,
)
def send_request_to_ss_for_user(self, user_verification_id, copy_id_photo_from):
"""
Expand Down
Loading