diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py index 0c7a24055..13b9304d3 100644 --- a/tasks/tests/unit/test_upload_task.py +++ b/tasks/tests/unit/test_upload_task.py @@ -1163,6 +1163,7 @@ def test_run_impl_currently_processing(self, dbsession, mocker, mock_redis): mocked_run_impl_within_lock = mocker.patch.object( UploadTask, "run_impl_within_lock", return_value=True ) + mock_redis.keys[f"uploads/{commit.repoid}/{commit.commitid}"] = ["something"] task = UploadTask() task.request.retries = 0 with pytest.raises(Retry): @@ -1182,6 +1183,7 @@ def test_run_impl_currently_processing_second_retry( mocked_run_impl_within_lock = mocker.patch.object( UploadTask, "run_impl_within_lock", return_value={"some": "value"} ) + mock_redis.keys[f"uploads/{commit.repoid}/{commit.commitid}"] = ["something"] task = UploadTask() task.request.retries = 1 result = task.run_impl(dbsession, commit.repoid, commit.commitid) diff --git a/tasks/upload.py b/tasks/upload.py index ad17fbd17..2a861c787 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -291,7 +291,14 @@ def run_impl( report_type=ReportType(report_type), report_code=report_code, ) - lock_name = upload_context.lock_name("upload") + + if not upload_context.has_pending_jobs(): + log.info("No pending jobs. Upload task is done.") + return { + "was_setup": False, + "was_updated": False, + "tasks_were_scheduled": False, + } if upload_context.is_currently_processing() and self.request.retries == 0: log.info( @@ -300,18 +307,40 @@ def run_impl( repoid=repoid, commit=commitid, report_type=report_type, - has_pending_jobs=upload_context.has_pending_jobs(), ), ) upload_context.prepare_kwargs_for_retry(kwargs) self.retry(countdown=60, kwargs=kwargs) + if retry_countdown := _should_debounce_processing(upload_context): + log.info( + "Retrying due to very recent uploads.", + extra=dict( + repoid=upload_context.repoid, + commit=upload_context.commitid, + report_type=upload_context.report_type.value, + countdown=retry_countdown, + ), + ) + upload_context.prepare_kwargs_for_retry(kwargs) + self.retry(countdown=retry_countdown, kwargs=kwargs) + + lock_name = upload_context.lock_name("upload") try: with upload_context.redis_connection.lock( lock_name, timeout=max(300, self.hard_time_limit_task), blocking_timeout=5, ): + # Check whether a different `Upload` task has "stolen" our uploads + if not upload_context.has_pending_jobs(): + log.info("No pending jobs. Upload task is done.") + return { + "was_setup": False, + "was_updated": False, + "tasks_were_scheduled": False, + } + return self.run_impl_within_lock( db_session, upload_context, @@ -379,27 +408,6 @@ def run_impl_within_lock( report_code=upload_context.report_code, ), ) - if not upload_context.has_pending_jobs(): - log.info("No pending jobs. Upload task is done.") - return { - "was_setup": False, - "was_updated": False, - "tasks_were_scheduled": False, - } - - if retry_countdown := _should_debounce_processing(upload_context): - log.info( - "Retrying due to very recent uploads.", - extra=dict( - repoid=upload_context.repoid, - commit=upload_context.commitid, - report_type=upload_context.report_type.value, - countdown=retry_countdown, - ), - ) - upload_context.prepare_kwargs_for_retry(kwargs) - self.retry(countdown=retry_countdown, kwargs=kwargs) - repoid = upload_context.repoid commitid = upload_context.commitid report_type = upload_context.report_type @@ -529,6 +537,7 @@ def run_impl_within_lock( normalized_arguments["upload_pk"] = upload.id_ argument_list.append(normalized_arguments) + if argument_list: db_session.commit() self.schedule_task(