Skip to content

Commit

Permalink
Review Feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheffl committed Dec 1, 2024
1 parent 17d76f1 commit 0dc22ee
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions providers/src/airflow/providers/edge/worker_api/routes/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from ast import literal_eval
from typing import Annotated

from sqlalchemy import select
from sqlalchemy import select, update

from airflow.providers.edge.models.edge_job import EdgeJobModel
from airflow.providers.edge.worker_api.auth import jwt_token_authorization_rest
Expand Down Expand Up @@ -113,15 +113,15 @@ def state(
session: SessionDep,
) -> None:
"""Update the state of a job running on the edge worker."""
query = select(EdgeJobModel).where(
EdgeJobModel.dag_id == dag_id,
EdgeJobModel.task_id == task_id,
EdgeJobModel.run_id == run_id,
EdgeJobModel.map_index == map_index,
EdgeJobModel.try_number == try_number,
query = (
update(EdgeJobModel)
.where(
EdgeJobModel.dag_id == dag_id,
EdgeJobModel.task_id == task_id,
EdgeJobModel.run_id == run_id,
EdgeJobModel.map_index == map_index,
EdgeJobModel.try_number == try_number,
)
.values(state=state, last_update=timezone.utcnow())
)
job: EdgeJobModel = session.scalar(query)
if job:
job.state = state
job.last_update = timezone.utcnow()
session.commit()
session.execute(query)

0 comments on commit 0dc22ee

Please sign in to comment.