Commit

Merge branch 'main' of github.com:codecov/worker into 2686-add-checkpoint-with-repoid

adrian-codecov committed Oct 21, 2024
2 parents 2b92ba6 + 2ae6c6d commit 5514b97
Showing 11 changed files with 651 additions and 713 deletions.
2 changes: 1 addition & 1 deletion database/tests/factories/core.py
@@ -112,7 +112,7 @@ class Meta:
         ).hexdigest()
     )
     ci_passed = True
-    pullid = 1
+    pullid = None
     timestamp = datetime(2019, 2, 1, 17, 59, 47)
     author = factory.SubFactory(OwnerFactory)
     repository = factory.SubFactory(RepositoryFactory)
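
With pullid defaulting to None, CommitFactory now builds commits that are not attached to any pull, so tests that rely on a pull must set it explicitly. A minimal sketch of the effect (the test functions are hypothetical, assuming the CommitFactory defined in this file):

    def test_commit_has_no_pull_by_default():
        commit = CommitFactory()  # pullid now defaults to None
        assert commit.pullid is None

    def test_commit_with_explicit_pull():
        commit = CommitFactory(pullid=42)  # hypothetical: opt in per test
        assert commit.pullid == 42
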
21 changes: 6 additions & 15 deletions helpers/telemetry.py
@@ -68,39 +68,30 @@ def __init__(
         sentry_sdk.set_tag("owner_id", owner_id)
         sentry_sdk.set_tag("repo_id", repo_id)
         sentry_sdk.set_tag("commit_sha", commit_sha)
-        transaction = sentry_sdk.get_current_scope().transaction
-        if transaction is not None:
-            transaction.set_tag("owner_id", owner_id)
-            transaction.set_tag("repo_id", repo_id)
-            transaction.set_tag("commit_sha", commit_sha)
 
     def populate(self):
         if self.populated:
             return
 
-        repo = None
-        commit = None
         dbsession = get_db_session()
 
         if self.repo_id:
             if not self.owner_id:
-                repo = (
-                    dbsession.query(Repository)
+                self.owner_id = (
+                    dbsession.query(Repository.ownerid)
                     .filter(Repository.repoid == self.repo_id)
-                    .first()
+                    .first()[0]
                 )
-                self.owner_id = repo.ownerid
 
             if self.commit_sha and not self.commit_id:
-                commit = (
-                    dbsession.query(Commit)
+                self.commit_id = (
+                    dbsession.query(Commit.id_)
                     .filter(
                         Commit.repoid == self.repo_id,
                         Commit.commitid == self.commit_sha,
                     )
-                    .first()
+                    .first()[0]
                 )
-                self.commit_id = commit.id_
 
         self.populated = True
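
The populate() refactor above queries only the column it needs instead of loading a full ORM row; a single-column query yields one-element row tuples, hence the trailing [0]. A minimal sketch of the pattern (reusing the dbsession, Repository, and repo_id names from the diff); note that .first()[0] raises TypeError when no row matches, while SQLAlchemy's Query.scalar() returns None instead:

    # Single-column query: .first() returns a row tuple like (ownerid,), not a Repository.
    owner_id = (
        dbsession.query(Repository.ownerid)
        .filter(Repository.repoid == repo_id)
        .first()[0]  # assumes a matching repository row exists
    )

    # Equivalent via Query.scalar(), which yields None when there is no result:
    owner_id = (
        dbsession.query(Repository.ownerid)
        .filter(Repository.repoid == repo_id)
        .scalar()
    )
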
2 changes: 1 addition & 1 deletion requirements.in
@@ -1,5 +1,5 @@
 https://github.com/codecov/opentelem-python/archive/refs/tags/v0.0.4a1.tar.gz#egg=codecovopentelem
-https://github.com/codecov/shared/archive/f40874907d3ddbe6993c8016c3f09306cc14a9f8.tar.gz#egg=shared
+https://github.com/codecov/shared/archive/795495e3d290c2f946792ee1a35f45fba5913c1b.tar.gz#egg=shared
 https://github.com/codecov/test-results-parser/archive/ef39a0888acd62d02a316a852a15d755c74e78c6.tar.gz#egg=test-results-parser
 https://github.com/codecov/timestring/archive/d37ceacc5954dff3b5bd2f887936a98a668dda42.tar.gz#egg=timestring
 asgiref>=3.7.2
2 changes: 1 addition & 1 deletion requirements.txt
@@ -359,7 +359,7 @@ sentry-sdk[celery]==2.13.0
     # via
     #   -r requirements.in
     #   shared
-shared @ https://github.com/codecov/shared/archive/f40874907d3ddbe6993c8016c3f09306cc14a9f8.tar.gz
+shared @ https://github.com/codecov/shared/archive/795495e3d290c2f946792ee1a35f45fba5913c1b.tar.gz
     # via -r requirements.in
 six==1.16.0
     # via
73 changes: 41 additions & 32 deletions services/bundle_analysis/report.py
@@ -147,12 +147,13 @@ def _get_parent_commit(
             .first()
         )
 
+    @sentry_sdk.trace
     def _previous_bundle_analysis_report(
         self,
         bundle_loader: BundleAnalysisReportLoader,
         commit: Commit,
-        head_bundle_report: BundleAnalysisReport,
-    ) -> Optional[BundleAnalysisReport]:
+        head_bundle_report: BundleAnalysisReport | None,
+    ) -> BundleAnalysisReport | None:
         """
         Helper function to fetch the parent commit's BAR for the purpose of matching previous bundle's
         Assets to the current one being parsed.
@@ -180,9 +181,44 @@ def _previous_bundle_analysis_report(
 
         return bundle_loader.load(parent_commit_report.external_id)
 
+    @sentry_sdk.trace
+    def _attempt_init_from_previous_report(
+        self,
+        commit: Commit,
+        bundle_loader: BundleAnalysisReportLoader,
+    ) -> BundleAnalysisReport:
+        """Attempts to carry over the parent's bundle analysis report if the current commit doesn't have one.
+        Falls back to creating a fresh bundle analysis report if there is no previous report to carry over.
+        """
+        # load a new copy of the previous bundle report into a temp file
+        bundle_report = self._previous_bundle_analysis_report(
+            bundle_loader, commit, head_bundle_report=None
+        )
+        if bundle_report:
+            # query which bundle names have caching turned on
+            bundles_to_be_cached = CacheConfig.objects.filter(
+                is_caching=True,
+                repo_id=commit.repoid,
+            ).values_list("bundle_name", flat=True)
+
+            # For each bundle:
+            #   if caching is on, set bundle.is_cached to true
+            #   if caching is off, delete that bundle from the report
+            update_fields = {}
+            for bundle in bundle_report.bundle_reports():
+                if bundle.name in bundles_to_be_cached:
+                    update_fields[bundle.name] = True
+                else:
+                    bundle_report.delete_bundle_by_name(bundle.name)
+            if update_fields:
+                bundle_report.update_is_cached(update_fields)
+            return bundle_report
+        # fall back to creating a fresh bundle analysis report if there is no previous report to carry over
+        return BundleAnalysisReport()
+
     @sentry_sdk.trace
     def process_upload(
-        self, commit: Commit, upload: Upload, compare_sha: Optional[str] = None
+        self, commit: Commit, upload: Upload, compare_sha: str | None = None
     ) -> ProcessingResult:
         """
         Download and parse the data associated with the given upload and
@@ -195,37 +231,10 @@ def process_upload(
 
         # fetch existing bundle report from storage
        bundle_report = bundle_loader.load(commit_report.external_id)
-
-        # attempt to carry over parent bundle analysis report if commit doesn't have a report
         if bundle_report is None:
-            # load a new copy of the previous bundle report into temp file
-            bundle_report = self._previous_bundle_analysis_report(
-                bundle_loader, commit, head_bundle_report=bundle_report
+            bundle_report = self._attempt_init_from_previous_report(
+                commit, bundle_loader
             )
-            if bundle_report:
-                # query which bundle names has caching turned on
-                qs = CacheConfig.objects.filter(
-                    is_caching=True,
-                    repo_id=commit.repoid,
-                )
-                bundles_to_be_cached = [item.bundle_name for item in qs]
-
-                # For each bundle,
-                # if caching is on then update bundle.is_cached property to true
-                # if caching is off then delete that bundle from the report
-                update_fields = {}
-                for bundle in bundle_report.bundle_reports():
-                    if bundle.name in bundles_to_be_cached:
-                        update_fields[bundle.name] = True
-                    else:
-                        bundle_report.delete_bundle_by_name(bundle.name)
-                if update_fields:
-                    bundle_report.update_is_cached(update_fields)
-
-            # fallback to create a fresh bundle analysis report if there is no previous
-            # report to carry over from
-            if bundle_report is None:
-                bundle_report = BundleAnalysisReport()
 
         # download raw upload data to local tempfile
         _, local_path = tempfile.mkstemp()
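
A note on the caching lookup in the new helper: values_list("bundle_name", flat=True) returns a lazy QuerySet, so the membership checks above run against the database-backed result. Wrapping it in set() forces a single query up front and makes each check an in-memory O(1) lookup; a sketch of that variant (illustrative only, not part of this commit):

    # Evaluate the queryset once; a set gives O(1) membership checks.
    bundles_to_be_cached = set(
        CacheConfig.objects.filter(
            is_caching=True,
            repo_id=commit.repoid,
        ).values_list("bundle_name", flat=True)
    )

    update_fields = {}
    for bundle in bundle_report.bundle_reports():
        if bundle.name in bundles_to_be_cached:
            update_fields[bundle.name] = True  # bundle keeps its cached copy
        else:
            bundle_report.delete_bundle_by_name(bundle.name)  # drop uncached bundles
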
84 changes: 32 additions & 52 deletions tasks/base.py
@@ -24,7 +24,6 @@
     log_set_task_id,
     log_set_task_name,
 )
-from helpers.metrics import metrics
 from helpers.telemetry import MetricContext, TimeseriesTimer
 from helpers.timeseries import timeseries_enabled
 
@@ -51,9 +50,7 @@ def on_timeout(self, soft: bool, timeout: int):
         res = super().on_timeout(soft, timeout)
         if not soft:
             REQUEST_HARD_TIMEOUT_COUNTER.labels(task=self.name).inc()
-            metrics.incr(f"{self.metrics_prefix}.hardtimeout")
         REQUEST_TIMEOUT_COUNTER.labels(task=self.name).inc()
-        metrics.incr(f"{self.metrics_prefix}.timeout")
         return res
 
 
@@ -245,20 +242,13 @@ def _emit_queue_metrics(self):
         enqueued_time = datetime.fromisoformat(created_timestamp)
         now = datetime.now()
         delta = now - enqueued_time
-        metrics.timing(f"{self.metrics_prefix}.time_in_queue", delta)
 
         queue_name = self.request.get("delivery_info", {}).get("routing_key", None)
         time_in_queue_timer = TASK_TIME_IN_QUEUE.labels(
             task=self.name, queue=queue_name
         )  # TODO is None a valid label value
         time_in_queue_timer.observe(delta.total_seconds())
 
-        if queue_name:
-            metrics.timing(f"worker.queues.{queue_name}.time_in_queue", delta)
-            metrics.timing(
-                f"{self.metrics_prefix}.{queue_name}.time_in_queue", delta
-            )
-
     def run(self, *args, **kwargs):
         task = get_current_task()
 
@@ -279,39 +269,32 @@ def run(self, *args, **kwargs):
             owner_id=kwargs.get("ownerid"),
         )
 
-        with TimeseriesTimer(
-            metric_context, f"{self.metrics_prefix}.full_runtime", sync=True
-        ):
-            with self.task_full_runtime.time():  # Timer isn't tested
-                with metrics.timer(f"{self.metrics_prefix}.full"):
-                    db_session = get_db_session()
-                    try:
-                        with TimeseriesTimer(
-                            metric_context,
-                            f"{self.metrics_prefix}.core_runtime",
-                            sync=True,
-                        ):
-                            with self.task_core_runtime.time():  # Timer isn't tested
-                                with metrics.timer(f"{self.metrics_prefix}.run"):
-                                    return self.run_impl(db_session, *args, **kwargs)
-                    except (DataError, IntegrityError):
-                        log.exception(
-                            "Errors related to the constraints of database happened",
-                            extra=dict(task_args=args, task_kwargs=kwargs),
-                        )
-                        db_session.rollback()
-                        self._rollback_django()
-                        self.retry()
-                    except SQLAlchemyError as ex:
-                        self._analyse_error(ex, args, kwargs)
-                        db_session.rollback()
-                        self._rollback_django()
-                        self.retry()
-                    finally:
-                        log_set_task_name(None)
-                        log_set_task_id(None)
-                        self.wrap_up_dbsession(db_session)
-                        self._commit_django()
+        with self.task_full_runtime.time():  # Timer isn't tested
+            db_session = get_db_session()
+            try:
+                with TimeseriesTimer(
+                    metric_context, f"{self.metrics_prefix}.core_runtime", sync=True
+                ):
+                    with self.task_core_runtime.time():  # Timer isn't tested
+                        return self.run_impl(db_session, *args, **kwargs)
+            except (DataError, IntegrityError):
+                log.exception(
+                    "Errors related to the constraints of database happened",
+                    extra=dict(task_args=args, task_kwargs=kwargs),
+                )
+                db_session.rollback()
+                self._rollback_django()
+                self.retry()
+            except SQLAlchemyError as ex:
+                self._analyse_error(ex, args, kwargs)
+                db_session.rollback()
+                self._rollback_django()
+                self.retry()
+            finally:
+                log_set_task_name(None)
+                log_set_task_id(None)
+                self.wrap_up_dbsession(db_session)
+                self._commit_django()
 
     def wrap_up_dbsession(self, db_session):
         """
@@ -352,10 +335,9 @@ def wrap_up_dbsession(self, db_session):
                 )
         get_db_session.remove()
 
-    def on_retry(self, *args, **kwargs):
-        res = super().on_retry(*args, **kwargs)
+    def on_retry(self, exc, task_id, args, kwargs, einfo):
+        res = super().on_retry(exc, task_id, args, kwargs, einfo)
         self.task_retry_counter.inc()
-        metrics.incr(f"{self.metrics_prefix}.retries")
         metric_context = MetricContext(
             commit_sha=kwargs.get("commitid"),
             repo_id=kwargs.get("repoid"),
@@ -364,10 +346,9 @@ def on_retry(self, *args, **kwargs):
         metric_context.log_simple_metric(f"{self.metrics_prefix}.retry", 1.0)
         return res
 
-    def on_success(self, *args, **kwargs):
-        res = super().on_success(*args, **kwargs)
+    def on_success(self, retval, task_id, args, kwargs):
+        res = super().on_success(retval, task_id, args, kwargs)
         self.task_success_counter.inc()
-        metrics.incr(f"{self.metrics_prefix}.successes")
         metric_context = MetricContext(
             commit_sha=kwargs.get("commitid"),
             repo_id=kwargs.get("repoid"),
@@ -376,13 +357,12 @@ def on_success(self, *args, **kwargs):
         metric_context.log_simple_metric(f"{self.metrics_prefix}.success", 1.0)
         return res
 
-    def on_failure(self, *args, **kwargs):
+    def on_failure(self, exc, task_id, args, kwargs, einfo):
         """
         Includes SoftTimeoutLimitException, for example
         """
-        res = super().on_failure(*args, **kwargs)
+        res = super().on_failure(exc, task_id, args, kwargs, einfo)
         self.task_failure_counter.inc()
-        metrics.incr(f"{self.metrics_prefix}.failures")
         metric_context = MetricContext(
             commit_sha=kwargs.get("commitid"),
             repo_id=kwargs.get("repoid"),
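
For context on the signature changes to on_retry, on_success, and on_failure: Celery invokes these Task hooks with fixed positional arguments, and spelling them out (instead of *args, **kwargs) lets the handlers read the task's own keyword arguments from kwargs directly. A minimal sketch of the documented hook signatures (a simplified class, not this repo's full BaseCodecovTask):

    from celery import Task

    class InstrumentedTask(Task):
        # kwargs here is the dict of keyword arguments the task was invoked with,
        # which is why kwargs.get("repoid") works in the handlers above.
        def on_retry(self, exc, task_id, args, kwargs, einfo):
            print(f"retrying {task_id} for repo {kwargs.get('repoid')}: {exc!r}")
            return super().on_retry(exc, task_id, args, kwargs, einfo)

        def on_success(self, retval, task_id, args, kwargs):
            print(f"{task_id} succeeded with {retval!r}")
            return super().on_success(retval, task_id, args, kwargs)

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            print(f"{task_id} failed: {einfo}")
            return super().on_failure(exc, task_id, args, kwargs, einfo)
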