Merge pull request #11525 from bjester/soud-enqueue-logging
Misc fixes for LOD syncing and upgrade Morango
bjester authored Nov 17, 2023
2 parents cb9d02d + 193a675 commit 0ce8716
Showing 7 changed files with 446 additions and 56 deletions.
4 changes: 3 additions & 1 deletion kolibri/core/auth/kolibri_plugin.py
@@ -27,7 +27,9 @@ def handle_initial(self, context):
"""
:type context: morango.sync.context.LocalSessionContext
"""
if context.is_receiver:
from kolibri.core.device.utils import device_provisioned

if context.is_receiver and device_provisioned():
is_pull = context.is_pull
is_push = context.is_push
sync_filter = str(context.filter)
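
The change above gates the sync hook on device provisioning, so the initial sync on a not-yet-provisioned device does not enqueue follow-up work. A minimal sketch of that guard, using hypothetical stand-ins for the sync context and for device_provisioned() rather than the real Kolibri/Morango APIs:

class FakeContext:
    def __init__(self, is_receiver, is_pull, is_push, sync_filter):
        self.is_receiver = is_receiver
        self.is_pull = is_pull
        self.is_push = is_push
        self.filter = sync_filter


def device_provisioned():
    # Stand-in for kolibri.core.device.utils.device_provisioned
    return False


def handle_initial(context):
    # Do nothing on unprovisioned devices, so provisioning-time syncs cannot
    # trigger follow-up tasks before device setup has finished.
    if context.is_receiver and device_provisioned():
        print("would enqueue cleanup for filter", str(context.filter))


handle_initial(FakeContext(True, True, False, "abc123"))  # no-op: not provisioned
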
47 changes: 26 additions & 21 deletions kolibri/core/auth/tasks.py
@@ -30,6 +30,7 @@
from kolibri.core.serializers import HexOnlyUUIDField
from kolibri.core.tasks.decorators import register_task
from kolibri.core.tasks.exceptions import JobNotFound
from kolibri.core.tasks.exceptions import JobRunning
from kolibri.core.tasks.job import JobStatus
from kolibri.core.tasks.job import Priority
from kolibri.core.tasks.job import State
@@ -424,6 +425,7 @@ def validate(self, data):
queue=soud_sync_queue,
priority=Priority.HIGH,
status_fn=status_fn,
long_running=True,
)
def soud_sync_processing():
# run processing
@@ -433,37 +435,39 @@ def soud_sync_processing():
if next_run is not None:
job = get_current_job()
job.retry_in(next_run)
else:
logger.info("Skipping enqueue of SoUD sync processing: no attempts remaining")


def enqueue_soud_sync_processing(force=False):
def enqueue_soud_sync_processing():
"""
Enqueue a task to process SoUD syncs, if necessary
"""
next_run = soud.get_time_to_next_attempt()
if next_run is None:
# No need to enqueue, as there is no next run
logger.info("Skipping enqueue of SoUD sync processing: no eligible syncs")
return

if force:
job_storage.cancel_if_exists(SOUD_SYNC_PROCESSING_JOB_ID)
else:
# Check if there is already an enqueued job
try:
converted_next_run = naive_utc_datetime(timezone.now() + next_run)
orm_job = job_storage.get_orm_job(SOUD_SYNC_PROCESSING_JOB_ID)
if (
orm_job.state == State.RUNNING
or orm_job.state == State.QUEUED
and orm_job.scheduled_time <= converted_next_run
):
# Already queued sooner or at the same time as the next run
return
# Otherwise, cancel the existing job, and re-enqueue
job_storage.cancel_if_exists(SOUD_SYNC_PROCESSING_JOB_ID)
except JobNotFound:
pass

soud_sync_processing.enqueue_in(next_run)
# Check if there is already an enqueued job
try:
converted_next_run = naive_utc_datetime(timezone.now() + next_run)
orm_job = job_storage.get_orm_job(SOUD_SYNC_PROCESSING_JOB_ID)
if (
orm_job.state not in (State.COMPLETED, State.FAILED, State.CANCELED)
and orm_job.scheduled_time <= converted_next_run
):
# Already queued sooner or at the same time as the next run
logger.info("Skipping enqueue of SoUD sync processing: scheduled sooner")
return
except JobNotFound:
pass

logger.info("Enqueuing SoUD sync processing in {}".format(next_run))
try:
soud_sync_processing.enqueue_in(next_run)
except JobRunning:
logger.info("Skipping enqueue of SoUD sync processing: already running")


@register_task(
@@ -578,6 +582,7 @@ def validate(self, data):
queue=soud_sync_queue,
permission_classes=[IsSuperAdmin() | NotProvisioned()],
status_fn=status_fn,
long_running=True,
)
def peeruserimport(command, **kwargs):
call_command(command, **kwargs)
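
A condensed sketch of the de-duplicating enqueue logic introduced above in enqueue_soud_sync_processing: skip when an unfinished job is already scheduled at or before the next attempt, otherwise enqueue, and tolerate the race where the job starts running in between. The job storage callables below are hypothetical stand-ins; only the control flow follows the diff.

import datetime
import logging

logger = logging.getLogger(__name__)

FINISHED_STATES = ("COMPLETED", "FAILED", "CANCELED")


class JobNotFound(Exception):
    pass


class JobRunning(Exception):
    pass


def maybe_enqueue(next_run, get_existing_job, enqueue_in):
    """Schedule processing in `next_run`, unless an unfinished job is already
    scheduled at or before that time."""
    if next_run is None:
        logger.info("Skipping enqueue: no eligible syncs")
        return
    target = datetime.datetime.utcnow() + next_run
    try:
        job = get_existing_job()
        if job.state not in FINISHED_STATES and job.scheduled_time <= target:
            # An unfinished job is already queued sooner or at the same time.
            logger.info("Skipping enqueue: scheduled sooner")
            return
    except JobNotFound:
        pass
    try:
        enqueue_in(next_run)
    except JobRunning:
        # Another caller started the job between the check and the enqueue.
        logger.info("Skipping enqueue: already running")
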
66 changes: 35 additions & 31 deletions kolibri/core/auth/test/test_auth_tasks.py
@@ -33,6 +33,7 @@
from kolibri.core.discovery.models import NetworkLocation
from kolibri.core.discovery.utils.network.errors import NetworkLocationNotFound
from kolibri.core.discovery.utils.network.errors import ResourceGoneError
from kolibri.core.tasks.exceptions import JobRunning
from kolibri.core.tasks.job import Job
from kolibri.core.tasks.job import State
from kolibri.utils.time_utils import naive_utc_datetime
@@ -692,31 +693,6 @@ def test_validate_and_create_sync_credentials_no_credentials(


class SoudTasksTestCase(TestCase):
@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
def test_enqueue_soud_sync_processing__none__forced(self, mock_soud, mock_task):
mock_soud.get_time_to_next_attempt.return_value = None
enqueue_soud_sync_processing(force=True)
mock_task.enqueue_in.assert_not_called()

@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
def test_enqueue_soud_sync_processing__now__forced(self, mock_soud, mock_task):
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=0)
enqueue_soud_sync_processing(force=True)
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=0))

@patch("kolibri.core.auth.tasks.job_storage")
@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
def test_enqueue_soud_sync_processing__future__forced(
self, mock_soud, mock_task, mock_job_storage
):
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=15)
enqueue_soud_sync_processing(force=True)
mock_job_storage.cancel_if_exists.assert_called_once_with("50")
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=15))

@patch("kolibri.core.auth.tasks.job_storage")
@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
@@ -728,7 +704,6 @@ def test_enqueue_soud_sync_processing__future__scheduled(
mock_job.state = State.QUEUED
mock_job.scheduled_time = naive_utc_datetime(timezone.now())
enqueue_soud_sync_processing()
mock_job_storage.cancel_if_exists.assert_not_called()
mock_task.enqueue_in.assert_not_called()

@patch("kolibri.core.auth.tasks.job_storage")
@@ -737,14 +712,11 @@ def test_enqueue_soud_sync_processing__future__scheduled(
def test_enqueue_soud_sync_processing__future__running(
self, mock_soud, mock_task, mock_job_storage
):
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(
seconds=-10
)
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=1)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.RUNNING
mock_job.scheduled_time = naive_utc_datetime(timezone.now())
enqueue_soud_sync_processing()
mock_job_storage.cancel_if_exists.assert_not_called()
mock_task.enqueue_in.assert_not_called()

@patch("kolibri.core.auth.tasks.job_storage")
@@ -760,7 +732,39 @@ def test_enqueue_soud_sync_processing__future__reschedule(
timezone.now() + datetime.timedelta(seconds=15)
)
enqueue_soud_sync_processing()
mock_job_storage.cancel_if_exists.assert_called_once_with("50")
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))

@patch("kolibri.core.auth.tasks.job_storage")
@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
def test_enqueue_soud_sync_processing__completed__enqueue(
self, mock_soud, mock_task, mock_job_storage
):
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=10)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.COMPLETED
# far in the past
mock_job.scheduled_time = naive_utc_datetime(
timezone.now() - datetime.timedelta(seconds=100)
)
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))

@patch("kolibri.core.auth.tasks.job_storage")
@patch("kolibri.core.auth.tasks.soud_sync_processing")
@patch("kolibri.core.auth.tasks.soud")
def test_enqueue_soud_sync_processing__race__already_running(
self, mock_soud, mock_task, mock_job_storage
):
mock_soud.get_time_to_next_attempt.return_value = datetime.timedelta(seconds=10)
mock_job = mock_job_storage.get_orm_job.return_value
mock_job.state = State.COMPLETED
# far in the past
mock_job.scheduled_time = naive_utc_datetime(
timezone.now() - datetime.timedelta(seconds=100)
)
mock_task.enqueue_in.side_effect = JobRunning()
enqueue_soud_sync_processing()
mock_task.enqueue_in.assert_called_once_with(datetime.timedelta(seconds=10))

@patch("kolibri.core.auth.tasks.get_current_job")
2 changes: 2 additions & 0 deletions kolibri/core/auth/test/test_hooks.py
@@ -4,13 +4,15 @@
from django.test import TestCase
from morango.sync.context import LocalSessionContext

from .helpers import provision_device
from kolibri.core.auth.kolibri_plugin import AuthSyncHook
from kolibri.core.auth.kolibri_plugin import CleanUpTaskOperation


@mock.patch("kolibri.core.auth.kolibri_plugin.cleanupsync")
class CleanUpTaskOperationTestCase(TestCase):
def setUp(self):
provision_device()
self.context = mock.MagicMock(
spec=LocalSessionContext(),
filter=uuid.uuid4().hex,
17 changes: 15 additions & 2 deletions kolibri/core/device/soud.py
@@ -318,7 +318,7 @@ def get_time_to_next_attempt():
)
if attempt_at is None:
return None
return datetime.timedelta(seconds=attempt_at - time.time())
return datetime.timedelta(seconds=max(attempt_at - time.time(), 0))


def attempt_execute_window():
@@ -332,6 +332,15 @@ def execute_syncs():
"""
Core SoUD sync processing logic that processes any syncs that
"""
# since there should only ever be one processing job running at a time, if we encounter any in
# the queue that are marked as syncing, we should reset their status to pending because it must
# mean that the previous job was terminated unexpectedly
SyncQueue.objects.filter(status=SyncQueueStatus.Syncing,).update(
status=SyncQueueStatus.Pending,
updated=time.time(),
keep_alive=0,
)

base_qs = (
get_eligible_syncs()
.order_by("attempt_at")
@@ -376,6 +385,10 @@ def execute_sync(context):
sync_queue.save()

try:
# context filters the network location to only those marked available
if not context.network_location:
raise NetworkLocation.DoesNotExist

call_command(
command,
user=context.user_id,
@@ -386,10 +399,10 @@
**resume_kwargs
)
except NetworkLocation.DoesNotExist:
# network location may have become unavailable
cleanup = True
logger.debug("{} Network location unavailable".format(context))
sync_queue.status = SyncQueueStatus.Pending
sync_queue.increment_and_backoff_next_attempt()
except Exception as e:
cleanup = True
if isinstance(e, MorangoResumeSyncError):
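
A rough sketch of the two behaviour changes to soud.py shown above, using plain dictionaries instead of the Django SyncQueue model: stale "Syncing" entries are reset to "Pending" before processing (only one processing job runs at a time, so they must be leftovers from a terminated run), and a sync whose network location is no longer available is kept pending and backed off instead of being cleaned up. The field names and backoff formula here are illustrative assumptions.

import time

PENDING, SYNCING = "Pending", "Syncing"


def reset_stale_entries(queue):
    # Anything still marked Syncing was left behind by a run that died mid-sync.
    for entry in queue:
        if entry["status"] == SYNCING:
            entry.update(status=PENDING, updated=time.time(), keep_alive=0)


def execute_sync(entry, network_location_available):
    if not network_location_available:
        # Network location may have become unavailable: keep the entry pending
        # and push the next attempt out with a simple exponential backoff.
        entry["attempts"] = entry.get("attempts", 0) + 1
        entry["status"] = PENDING
        entry["attempt_at"] = time.time() + 30 * (2 ** entry["attempts"])
        return
    # ...run the actual sync command for this entry here...


queue = [{"status": SYNCING}, {"status": PENDING}]
reset_stale_entries(queue)  # first entry goes back to Pending
execute_sync(queue[0], network_location_available=False)  # backs off, stays pending
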