Fix task-level audit logs missing SUCCESS/RUNNING events in Airflow 3.1.x #60680
closes: #58381
Summary
The audit log does not capture events for task lifecycle state transitions such as running and success. According to the documentation, it should capture system-generated events including "task lifecycle state transitions (queued, running, success, failed)". As noted in the linked issue, task-level success and running events were logged in Airflow 2.10.x.
In Airflow 3, task state is set to `running` through the execution API endpoint `/{task_instance_id}/run` (`ti_run`), and subsequent task state updates are handled by the endpoint `/{task_instance_id}/state` (`ti_update_state`). The logic to insert an audit log entry appears to be missing when these endpoints update task instance states. To log the state transition events, we need to write log entries to the database while these two functions process the state update. However, if we also want to cover the "queued" event, we need to add similar logic in the function below:
airflow/airflow-core/src/airflow/jobs/scheduler_job_runner.py
Lines 412 to 426 in 06fbabb
Current (Basic) Approach
Since both `ti_run` and `ti_update_state` already execute a SELECT query at the start to fetch task metadata, the audit log entry is constructed through `TaskInstanceKey` by copying the data from the query result with the updated state. Because this insert is performed through the same `session`, if I understand correctly, the operation is atomic and can ensure consistency between the actual task state and the audit log.

Some Issues

- The `logical_date` field is empty. In `ti_run`, it could probably be collected from the DAG Run query, but in `ti_update_state` an extra query is required. The `owner` field is also empty if it is not explicitly passed, and the `extra` field needs to be explicitly constructed while only `hostname` is available.
- In the scheduler, a `TaskInstance` object is available which contains `logical_date`. This is more ideal, as the resulting log is closer to what Airflow 2's audit log contains, but the `extra` field still needs to be constructed. Making the endpoint logging behave like the scheduler's would require extra queries to gather that information.
- The logging behavior now differs depending on where the insert is implemented. I am thinking about a way to make this implementation more unified and to ensure consistency between the actual task state and the log entry.
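The same-session idea can be illustrated with a minimal, self-contained sketch. This uses plain `sqlite3` with simplified stand-in tables, not Airflow's actual models or the real `log` table schema; the table and column names here are hypothetical and only demonstrate why performing the state update and the audit insert in one transaction keeps them consistent:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
# Simplified stand-ins for the task_instance and audit log tables.
conn.execute("CREATE TABLE task_instance (id TEXT PRIMARY KEY, state TEXT)")
conn.execute("CREATE TABLE log (event TEXT, task_id TEXT, extra TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('ti-1', 'queued')")
conn.commit()


def update_state_with_audit(conn, ti_id, new_state, hostname):
    # Both statements run in one transaction: if either fails, neither the
    # state change nor the audit row is persisted, so the audit log can
    # never disagree with the stored task state.
    with conn:
        conn.execute(
            "UPDATE task_instance SET state = ? WHERE id = ?",
            (new_state, ti_id),
        )
        conn.execute(
            "INSERT INTO log (event, task_id, extra) VALUES (?, ?, ?)",
            (new_state, ti_id, json.dumps({"hostname": hostname})),
        )


update_state_with_audit(conn, "ti-1", "running", "worker-1")
state = conn.execute(
    "SELECT state FROM task_instance WHERE id = 'ti-1'"
).fetchone()[0]
events = [row[0] for row in conn.execute("SELECT event FROM log")]
print(state, events)  # running ['running']
```

In the actual implementation the equivalent would be adding the audit `Log` row to the same SQLAlchemy `session` that `ti_run`/`ti_update_state` already use, so both writes commit (or roll back) together.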
Current Implementation
Airflow 2 Audit Log
Was generative AI tooling used to co-author this PR?
Generated-by: [Antigravity, Claude Opus 4.5] following the guidelines
`{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in `airflow-core/newsfragments`.