Skip to content

Commit

Permalink
Check for pending uploads outside of lock (#628)
Browse files Browse the repository at this point in the history
  • Loading branch information
Swatinem authored Aug 26, 2024
1 parent f9b65c7 commit 93c03f9
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 23 deletions.
2 changes: 2 additions & 0 deletions tasks/tests/unit/test_upload_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,7 @@ def test_run_impl_currently_processing(self, dbsession, mocker, mock_redis):
mocked_run_impl_within_lock = mocker.patch.object(
UploadTask, "run_impl_within_lock", return_value=True
)
mock_redis.keys[f"uploads/{commit.repoid}/{commit.commitid}"] = ["something"]
task = UploadTask()
task.request.retries = 0
with pytest.raises(Retry):
Expand All @@ -1182,6 +1183,7 @@ def test_run_impl_currently_processing_second_retry(
mocked_run_impl_within_lock = mocker.patch.object(
UploadTask, "run_impl_within_lock", return_value={"some": "value"}
)
mock_redis.keys[f"uploads/{commit.repoid}/{commit.commitid}"] = ["something"]
task = UploadTask()
task.request.retries = 1
result = task.run_impl(dbsession, commit.repoid, commit.commitid)
Expand Down
55 changes: 32 additions & 23 deletions tasks/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,14 @@ def run_impl(
report_type=ReportType(report_type),
report_code=report_code,
)
lock_name = upload_context.lock_name("upload")

if not upload_context.has_pending_jobs():
log.info("No pending jobs. Upload task is done.")
return {
"was_setup": False,
"was_updated": False,
"tasks_were_scheduled": False,
}

if upload_context.is_currently_processing() and self.request.retries == 0:
log.info(
Expand All @@ -300,18 +307,40 @@ def run_impl(
repoid=repoid,
commit=commitid,
report_type=report_type,
has_pending_jobs=upload_context.has_pending_jobs(),
),
)
upload_context.prepare_kwargs_for_retry(kwargs)
self.retry(countdown=60, kwargs=kwargs)

if retry_countdown := _should_debounce_processing(upload_context):
log.info(
"Retrying due to very recent uploads.",
extra=dict(
repoid=upload_context.repoid,
commit=upload_context.commitid,
report_type=upload_context.report_type.value,
countdown=retry_countdown,
),
)
upload_context.prepare_kwargs_for_retry(kwargs)
self.retry(countdown=retry_countdown, kwargs=kwargs)

lock_name = upload_context.lock_name("upload")
try:
with upload_context.redis_connection.lock(
lock_name,
timeout=max(300, self.hard_time_limit_task),
blocking_timeout=5,
):
# Check whether a different `Upload` task has "stolen" our uploads
if not upload_context.has_pending_jobs():
log.info("No pending jobs. Upload task is done.")
return {
"was_setup": False,
"was_updated": False,
"tasks_were_scheduled": False,
}

return self.run_impl_within_lock(
db_session,
upload_context,
Expand Down Expand Up @@ -379,27 +408,6 @@ def run_impl_within_lock(
report_code=upload_context.report_code,
),
)
if not upload_context.has_pending_jobs():
log.info("No pending jobs. Upload task is done.")
return {
"was_setup": False,
"was_updated": False,
"tasks_were_scheduled": False,
}

if retry_countdown := _should_debounce_processing(upload_context):
log.info(
"Retrying due to very recent uploads.",
extra=dict(
repoid=upload_context.repoid,
commit=upload_context.commitid,
report_type=upload_context.report_type.value,
countdown=retry_countdown,
),
)
upload_context.prepare_kwargs_for_retry(kwargs)
self.retry(countdown=retry_countdown, kwargs=kwargs)

repoid = upload_context.repoid
commitid = upload_context.commitid
report_type = upload_context.report_type
Expand Down Expand Up @@ -529,6 +537,7 @@ def run_impl_within_lock(

normalized_arguments["upload_pk"] = upload.id_
argument_list.append(normalized_arguments)

if argument_list:
db_session.commit()
self.schedule_task(
Expand Down

0 comments on commit 93c03f9

Please sign in to comment.