Skip to content

Commit

Permalink
Fix job attribute update to account for mismatching columns between r…
Browse files Browse the repository at this point in the history
…ows to be updated
  • Loading branch information
ryuwd committed Dec 19, 2024
1 parent 97b3c58 commit fb55adc
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 8 deletions.
21 changes: 15 additions & 6 deletions diracx-db/src/diracx/db/sql/job/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any

from sqlalchemy import bindparam, delete, func, insert, select, update
from sqlalchemy import bindparam, delete, func, insert, select, update, case
from sqlalchemy.exc import IntegrityError, NoResultFound

if TYPE_CHECKING:
Expand Down Expand Up @@ -219,13 +219,22 @@ async def setJobAttributesBulk(self, jobData):
jobData[job_id].update(
{"LastUpdateTime": datetime.now(tz=timezone.utc)}
)
columns = set(key for attrs in jobData.values() for key in attrs.keys())
case_expressions = {
column: case(
*[
(Jobs.__table__.c.JobID == job_id, attrs[column])
for job_id, attrs in jobData.items() if column in attrs
],
else_=getattr(Jobs.__table__.c, column) # Retain original value
)
for column in columns
}

await self.conn.execute(
Jobs.__table__.update().where(
Jobs.__table__.c.JobID == bindparam("b_JobID")
),
[{"b_JobID": job_id, **attrs} for job_id, attrs in jobData.items()],
stmt = Jobs.__table__.update().values(**case_expressions).where(
Jobs.__table__.c.JobID.in_(jobData.keys())
)
await self.conn.execute(stmt)

async def getJobJDL(self, job_id: int, original: bool = False) -> str:
from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import extractJDL
Expand Down
4 changes: 2 additions & 2 deletions diracx-db/src/diracx/db/sql/utils/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,11 @@ def parse_jdl(job_id, job_jdl):
"failed": failed,
"success": {
job_id: {
"InputData": job_jdls[job_id],
"InputData": job_jdls.get(job_id, None),
**attribute_changes[job_id],
**set_status_result.model_dump(),
}
for job_id, set_status_result in set_job_status_result.success.items()
for job_id, set_status_result in set_job_status_result.success.items() if job_id not in failed
},
}

Expand Down

0 comments on commit fb55adc

Please sign in to comment.