Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for pending uploads outside of lock #628

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tasks/tests/unit/test_upload_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,7 @@ def test_run_impl_currently_processing(self, dbsession, mocker, mock_redis):
mocked_run_impl_within_lock = mocker.patch.object(
UploadTask, "run_impl_within_lock", return_value=True
)
mock_redis.keys[f"uploads/{commit.repoid}/{commit.commitid}"] = ["something"]
task = UploadTask()
task.request.retries = 0
with pytest.raises(Retry):
Expand All @@ -1182,6 +1183,7 @@ def test_run_impl_currently_processing_second_retry(
mocked_run_impl_within_lock = mocker.patch.object(
UploadTask, "run_impl_within_lock", return_value={"some": "value"}
)
mock_redis.keys[f"uploads/{commit.repoid}/{commit.commitid}"] = ["something"]
task = UploadTask()
task.request.retries = 1
result = task.run_impl(dbsession, commit.repoid, commit.commitid)
Expand Down
55 changes: 32 additions & 23 deletions tasks/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,14 @@
report_type=ReportType(report_type),
report_code=report_code,
)
lock_name = upload_context.lock_name("upload")

if not upload_context.has_pending_jobs():
log.info("No pending jobs. Upload task is done.")
return {
"was_setup": False,
"was_updated": False,
"tasks_were_scheduled": False,
}

if upload_context.is_currently_processing() and self.request.retries == 0:
log.info(
Expand All @@ -300,18 +307,40 @@
repoid=repoid,
commit=commitid,
report_type=report_type,
has_pending_jobs=upload_context.has_pending_jobs(),
),
)
upload_context.prepare_kwargs_for_retry(kwargs)
self.retry(countdown=60, kwargs=kwargs)

if retry_countdown := _should_debounce_processing(upload_context):
log.info(
"Retrying due to very recent uploads.",
extra=dict(
repoid=upload_context.repoid,
commit=upload_context.commitid,
report_type=upload_context.report_type.value,
countdown=retry_countdown,
),
)
upload_context.prepare_kwargs_for_retry(kwargs)
self.retry(countdown=retry_countdown, kwargs=kwargs)

lock_name = upload_context.lock_name("upload")
try:
with upload_context.redis_connection.lock(
lock_name,
timeout=max(300, self.hard_time_limit_task),
blocking_timeout=5,
):
# Check whether a different `Upload` task has "stolen" our uploads
if not upload_context.has_pending_jobs():
log.info("No pending jobs. Upload task is done.")
return {

Check warning on line 338 in tasks/upload.py

View check run for this annotation

Codecov Notifications / codecov/patch

tasks/upload.py#L337-L338

Added lines #L337 - L338 were not covered by tests
"was_setup": False,
"was_updated": False,
"tasks_were_scheduled": False,
}

return self.run_impl_within_lock(
db_session,
upload_context,
Expand Down Expand Up @@ -379,27 +408,6 @@
report_code=upload_context.report_code,
),
)
if not upload_context.has_pending_jobs():
log.info("No pending jobs. Upload task is done.")
return {
"was_setup": False,
"was_updated": False,
"tasks_were_scheduled": False,
}

if retry_countdown := _should_debounce_processing(upload_context):
log.info(
"Retrying due to very recent uploads.",
extra=dict(
repoid=upload_context.repoid,
commit=upload_context.commitid,
report_type=upload_context.report_type.value,
countdown=retry_countdown,
),
)
upload_context.prepare_kwargs_for_retry(kwargs)
self.retry(countdown=retry_countdown, kwargs=kwargs)

repoid = upload_context.repoid
commitid = upload_context.commitid
report_type = upload_context.report_type
Expand Down Expand Up @@ -529,6 +537,7 @@

normalized_arguments["upload_pk"] = upload.id_
argument_list.append(normalized_arguments)

if argument_list:
db_session.commit()
self.schedule_task(
Expand Down
Loading