Skip to content

Commit

Permalink
refactor: move parts of test result processor and finisher into funct…
Browse files Browse the repository at this point in the history
…ions

Signed-off-by: joseph-sentry <joseph.sawaya@sentry.io>
  • Loading branch information
joseph-sentry committed Jan 15, 2024
1 parent facc22d commit 481bed4
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 87 deletions.
19 changes: 18 additions & 1 deletion services/test_results.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging
from hashlib import md5
from typing import Mapping, Sequence

from shared.torngit.exceptions import TorngitClientError
from test_results_parser import Outcome
from test_results_parser import Outcome, Testrun

from database.enums import ReportType
from database.models import Commit, CommitReport, RepositoryFlag, Upload
Expand Down Expand Up @@ -205,3 +206,19 @@ def insert_breaks(self, table_value):
)

return table_value


def testrun_to_dict(t: Testrun):
return {
"outcome": int(t.outcome),
"name": t.name,
"testsuite": t.testsuite,
"duration_seconds": t.duration,
"failure_message": t.failure_message,
}


def generate_env(flag_names, job_code):
return md5(
(" ".join(sorted(flag_names)) + " " + (job_code or "")).encode("utf-8")
).hexdigest()
161 changes: 99 additions & 62 deletions tasks/test_results_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,93 +89,63 @@ async def process_async_within_lock(

notify = True

if all(
(
testrun_list["successful"] is False
for result in previous_result
for testrun_list in result
)
):
if self.check_if_no_success(previous_result):
# every processor errored, nothing to notify on
return {"notify_attempted": False, "notify_succeeded": False}

test_dict = dict()
testrun_list = []

existing_tests = db_session.query(Test).filter(Test.repoid == repoid)
test_dict = self.get_test_dict(db_session, repoid)

existing_test_instances = (
db_session.query(TestInstance)
.join(Upload)
.join(CommitReport)
.join(Commit)
.filter(Commit.id_ == commit.id_)
.all()
existing_test_instance_by_test = self.get_existing_test_instance_by_test(
db_session, commit
)

for test in existing_tests:
test_dict[hash((test.testsuite, test.name))] = test

existing_test_instance_by_test = {
testrun.test.id: testrun for testrun in existing_test_instances
}

for result in previous_result:
# finish_individual_result
for testrun_dict_list in result:
if testrun_dict_list["successful"]:
for testrun in testrun_dict_list["testrun_list"]:
test_hash = hash((testrun["testsuite"], testrun["name"]))
if test_hash not in test_dict:
test = Test(
repoid=repoid,
name=testrun["name"],
testsuite=testrun["testsuite"],
env=testrun_dict_list["env"],
)
db_session.add(test)
test_dict[test_hash] = test
else:
test = test_dict[test_hash]
testsuite = testrun["testsuite"]
name = testrun["name"]
env = testrun_dict_list["env"]
run_number = testrun_dict_list["run_number"]
upload_id = testrun_dict_list["upload_id"]
duration_seconds = testrun["duration_seconds"]
outcome = testrun["outcome"]
failure_message = testrun["failure_message"]

test = self.get_or_create_test(
db_session, test_dict, testsuite, name, repoid, env
)

if test.id in existing_test_instance_by_test:
existing_run_number = existing_test_instance_by_test[
test.id
].upload.build_code
try:
if int(testrun_dict_list["run_number"]) > int(
existing_run_number
):
existing_test_instance_by_test[
test.id
].upload_id = testrun_dict_list["upload_id"]
existing_test_instance_by_test[
test.id
].duration_seconds = testrun["duration_seconds"]
existing_test_instance_by_test[
test.id
].outcome = testrun["outcome"]
existing_test_instance_by_test[
test.id
].failure_message = testrun["failure_message"]

except ValueError:
pass
self.try_overwrite_old_test_instance(
existing_test_instance_by_test,
test.id,
run_number,
upload_id,
duration_seconds,
outcome,
failure_message,
)
else:
# create_new_test_instance
ti = TestInstance(
test_id=test.id,
test=test,
upload_id=testrun_dict_list["upload_id"],
duration_seconds=testrun["duration_seconds"],
outcome=testrun["outcome"],
failure_message=testrun["failure_message"],
upload_id=upload_id,
duration_seconds=duration_seconds,
outcome=outcome,
failure_message=failure_message,
)
db_session.add(ti)
testrun_list.append(ti)
db_session.flush()

testrun_list += existing_test_instance_by_test.values()

if all([instance.outcome != Outcome.Failure for instance in testrun_list]):
if self.check_if_no_failures(testrun_list):
return {"notify_attempted": False, "notify_succeeded": False}

success = None
Expand All @@ -194,6 +164,73 @@ async def process_async_within_lock(

return {"notify_attempted": notify, "notify_succeeded": success}

def check_if_no_success(self, previous_result):
return all(
(
testrun_list["successful"] is False
for result in previous_result
for testrun_list in result
)
)

def get_or_create_test(self, db_session, test_dict, testsuite, name, repoid, env):
test_hash = hash((testsuite, name))
if test_hash not in test_dict:
test = Test(
repoid=repoid,
name=name,
testsuite=testsuite,
env=env,
)
db_session.add(test)
test_dict.update({test_hash: test})
else:
test = test_dict.get(test_dict)

return test

def try_overwrite_old_test_instance(
self,
test_map,
test_id,
run_number,
upload_id,
duration_seconds,
outcome,
failure_message,
):
existing_test_instance = test_map[test_id]
existing_run_number = existing_test_instance.upload.build_code

try:
if int(run_number) > int(existing_run_number):
existing_test_instance.upload_id = upload_id
existing_test_instance.duration_seconds = duration_seconds
existing_test_instance.outcome = outcome
existing_test_instance.failure_message = failure_message

except ValueError:
pass

def check_if_no_failures(self, testrun_list):
return all([instance.outcome != Outcome.Failure for instance in testrun_list])

def get_existing_test_instance_by_test(self, db_session, commit):
existing_test_instances = (
db_session.query(TestInstance)
.join(Upload)
.join(CommitReport)
.join(Commit)
.filter(Commit.id_ == commit.id_)
.all()
)

return {testrun.test.id: testrun for testrun in existing_test_instances}

def get_test_dict(self, db_session, repoid):
existing_tests = db_session.query(Test).filter(Test.repoid == repoid)
return {hash((test.testsuite, test.name)) for test in existing_tests}


RegisteredTestResultsFinisherTask = celery_app.register_task(TestResultsFinisherTask())
test_results_finisher_task = celery_app.tasks[RegisteredTestResultsFinisherTask.name]
29 changes: 5 additions & 24 deletions tasks/test_results_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@
import json
import logging
import zlib
from hashlib import md5
from io import BytesIO
from json import loads
from typing import List

from shared.celery_config import test_results_processor_task_name
from shared.config import get_config
from shared.yaml import UserYaml
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from test_results_parser import (
ParserError,
Testrun,
Expand All @@ -22,8 +18,8 @@

from app import celery_app
from database.models import Repository, Upload
from database.models.reports import Test, TestInstance
from services.archive import ArchiveService
from services.test_results import generate_env, testrun_to_dict
from services.yaml import read_yaml_field
from tasks.base import BaseCodecovTask

Expand Down Expand Up @@ -104,20 +100,14 @@ def process_individual_upload(self, repoid, commitid, upload_obj: Upload):
"successful": False,
}

env = generate_env(upload_obj.flag_names, upload_obj.job_code)

return {
"successful": True,
"upload_id": upload_obj.id,
"run_number": upload_obj.build_code,
"env": md5(
(
" ".join(sorted(upload_obj.flag_names))
+ " "
+ (upload_obj.job_code or "")
).encode("utf-8")
).hexdigest(),
"testrun_list": [
self.testrun_to_dict(testrun) for testrun in parsed_testruns
],
"env": env,
"testrun_list": [testrun_to_dict(testrun) for testrun in parsed_testruns],
}

def process_individual_arg(self, upload: Upload, repository) -> List[Testrun]:
Expand Down Expand Up @@ -188,15 +178,6 @@ def match_report(self, file_bytes):
def remove_space_from_line(self, line):
return bytes("".join(line.decode("utf-8").split()), "utf-8")

def testrun_to_dict(self, t: Testrun):
return {
"outcome": int(t.outcome),
"name": t.name,
"testsuite": t.testsuite,
"duration_seconds": t.duration,
"failure_message": t.failure_message,
}

def should_delete_archive(self, commit_yaml):
if get_config("services", "minio", "expire_raw_after_n_days"):
return True
Expand Down

0 comments on commit 481bed4

Please sign in to comment.