Perform parallel report merging within the Report lock #762

Merged · 1 commit · Oct 7, 2024
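In brief, judging from the diff below: when the parallel upload pipeline is active, the finisher now merges the incremental reports and saves the combined result while holding the same Redis lock used for upload processing (`UPLOAD_PROCESSING_LOCK_NAME`); in the serial path the lock is swapped for a no-op `contextlib.nullcontext()`, so the merge-and-save block is identical in both modes.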
tasks/upload_finisher.py (114 changes: 67 additions & 47 deletions)
@@ -1,3 +1,4 @@
+import contextlib
 import functools
 import json
 import logging
@@ -6,6 +7,7 @@
 from enum import Enum
 
 from redis.exceptions import LockError
+from redis.lock import Lock
 from shared.celery_config import (
     compute_comparison_task_name,
     notify_task_name,
@@ -42,7 +44,7 @@
 from tasks.base import BaseCodecovTask
 from tasks.parallel_verification import parallel_verification_task
 from tasks.upload_clean_labels_index import task_name as clean_labels_index_task_name
-from tasks.upload_processor import UploadProcessorTask
+from tasks.upload_processor import UPLOAD_PROCESSING_LOCK_NAME, UploadProcessorTask
 
 log = logging.getLogger(__name__)
 
@@ -148,62 +150,70 @@ def run_impl(
             ],
         }
 
-        report_service = ReportService(commit_yaml)
-        report = self.merge_incremental_reports(
-            commit_yaml,
-            repository,
-            commit,
-            report_service,
-            processing_results,
-            parallel_processing,
-        )
-
-        log.info(
-            "Saving combined report",
-            extra=dict(
-                repoid=repoid,
-                commit=commitid,
-                processing_results=processing_results,
-                parent_task=self.request.parent_id,
-            ),
-        )
-
-        if parallel_processing is ParallelProcessing.PARALLEL:
-            pr = processing_results["processings_so_far"][0]["arguments"].get("pr")
-            processor_task = UploadProcessorTask()
-            processor_task.save_report_results(
-                db_session,
-                report_service,
-                repository,
-                commit,
-                report,
-                pr,
-                report_code,
-            )
-
-        else:
-            parallel_paths = report_service.save_parallel_report_to_archive(
-                commit, report, report_code
-            )
-            # now that we've built the report and stored it to GCS, we have what we need to
-            # compare the results with the current upload pipeline. We end execution of the
-            # finisher task here so that we don't cause any additional side-effects
-
-            # The verification task that will compare the results of the serial flow and
-            # the parallel flow, and log the result to determine if parallel flow is
-            # working properly.
-            parallel_verification_task.apply_async(
-                kwargs=dict(
-                    repoid=repoid,
-                    commitid=commitid,
-                    commit_yaml=commit_yaml,
-                    report_code=report_code,
-                    parallel_paths=parallel_paths,
-                    processing_results=processing_results,
-                ),
-            )
-
-        return
+        report_lock = (
+            acquire_report_lock(repoid, commitid, self.hard_time_limit_task)
+            if parallel_processing is ParallelProcessing.PARALLEL
+            else contextlib.nullcontext()
+        )
+
+        with report_lock:
+            report_service = ReportService(commit_yaml)
+            report = self.merge_incremental_reports(
+                commit_yaml,
+                repository,
+                commit,
+                report_service,
+                processing_results,
+                parallel_processing,
+            )
+
+            log.info(
+                "Saving combined report",
+                extra=dict(
+                    repoid=repoid,
+                    commit=commitid,
+                    processing_results=processing_results,
+                    parent_task=self.request.parent_id,
+                ),
+            )
+
+            if parallel_processing is ParallelProcessing.PARALLEL:
+                pr = processing_results["processings_so_far"][0]["arguments"].get(
+                    "pr"
+                )
+                processor_task = UploadProcessorTask()
+                processor_task.save_report_results(
+                    db_session,
+                    report_service,
+                    repository,
+                    commit,
+                    report,
+                    pr,
+                    report_code,
+                )
+
+            else:
+                parallel_paths = report_service.save_parallel_report_to_archive(
+                    commit, report, report_code
+                )
+                # now that we've built the report and stored it to GCS, we have what we need to
+                # compare the results with the current upload pipeline. We end execution of the
+                # finisher task here so that we don't cause any additional side-effects
+
+                # The verification task that will compare the results of the serial flow and
+                # the parallel flow, and log the result to determine if parallel flow is
+                # working properly.
+                parallel_verification_task.apply_async(
+                    kwargs=dict(
+                        repoid=repoid,
+                        commitid=commitid,
+                        commit_yaml=commit_yaml,
+                        report_code=report_code,
+                        parallel_paths=parallel_paths,
+                        processing_results=processing_results,
+                    ),
+                )
+
+            return
 
         lock_name = f"upload_finisher_lock_{repoid}_{commitid}"
         redis_connection = get_redis_connection()
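The change above leans on a small Python idiom: pick either a real lock or `contextlib.nullcontext()` up front, so a single `with` block serves both modes. A minimal sketch of the idiom, with a `threading.Lock` standing in for the Redis lock (`merge_reports` and its body are illustrative, not from this codebase):

```python
import contextlib
import threading

# Stand-in for the Redis lock; any context manager works in its place.
_lock = threading.Lock()

def merge_reports(parallel: bool) -> None:
    # Choose the guard once, so the body below is identical in both modes.
    guard = _lock if parallel else contextlib.nullcontext()
    with guard:
        # ... merge and save the combined report under the guard ...
        print("merging", "with" if parallel else "without", "the lock")

merge_reports(parallel=True)   # runs while holding the lock
merge_reports(parallel=False)  # nullcontext: enters and exits without locking
```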
@@ -654,6 +664,16 @@ def merge_report(cumulative_report: Report, obj):
 upload_finisher_task = celery_app.tasks[RegisteredUploadTask.name]
 
 
+def acquire_report_lock(repoid: int, commitid: str, hard_time_limit: int) -> Lock:
+    lock_name = UPLOAD_PROCESSING_LOCK_NAME(repoid, commitid)
+    redis_connection = get_redis_connection()
+    return redis_connection.lock(
+        lock_name,
+        timeout=max(60 * 5, hard_time_limit),
+        blocking_timeout=5,
+    )
+
+
 # TODO: maybe move this to `shared` if it turns out to be a better place for this
 def change_sessionid(report: Report, old_id: int, new_id: int):
     """