Race condition between Triggerer and Scheduler #23824

Closed
tanelk opened this issue May 20, 2022 · 7 comments · Fixed by #23846

Labels
area:core kind:bug This is a clearly a bug

Comments

tanelk (Contributor) commented May 20, 2022

Apache Airflow version

2.2.5

What happened

Deferrable tasks whose triggers fire instantly after the task is deferred might have their state set to FAILED by the scheduler.

The triggerer can fire the trigger and the scheduler can re-queue the task instance before the scheduler has had a chance to process the executor event from when the task instance was deferred.

What you think should happen instead

This code block should not run in this instance:

if ti.try_number == buffer_key.try_number and ti.state == State.QUEUED:
    Stats.incr('scheduler.tasks.killed_externally')
    msg = (
        "Executor reports task instance %s finished (%s) although the "
        "task says its %s. (Info: %s) Was the task killed externally?"
    )
    self.log.error(msg, ti, state, ti.state, info)
    # Get task from the Serialized DAG
    try:
        dag = self.dagbag.get_dag(ti.dag_id)
        task = dag.get_task(ti.task_id)
    except Exception:
        self.log.exception("Marking task instance %s as %s", ti, state)
        ti.set_state(state)
        continue
    ti.task = task
    if task.on_retry_callback or task.on_failure_callback:
        request = TaskCallbackRequest(
            full_filepath=ti.dag_model.fileloc,
            simple_task_instance=SimpleTaskInstance.from_ti(ti),
            msg=msg % (ti, state, ti.state, info),
        )
        self.executor.send_callback(request)
    else:
        ti.handle_failure(error=msg % (ti, state, ti.state, info), session=session)

How to reproduce

Most importantly, have a trigger that fires instantly. I'm not sure whether the executor type matters - I'm running the CeleryExecutor. Having two schedulers might also be relevant.
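
For illustration, an instantly-firing setup can be sketched roughly like this; the operator, trigger, and module path below are made up (the built-in TimeDeltaSensorAsync with a zero delta behaves the same way):

from datetime import datetime

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class InstantTrigger(BaseTrigger):
    """Hypothetical trigger that fires as soon as the triggerer picks it up."""

    def serialize(self):
        # The classpath must be importable by the triggerer process.
        return ("example_instant_trigger.InstantTrigger", {})

    async def run(self):
        # No waiting at all: emit the event immediately, so the triggerer can
        # complete the trigger before the scheduler has drained the executor
        # event produced by the original (deferring) worker run.
        yield TriggerEvent({"fired_at": datetime.utcnow().isoformat()})


class InstantDeferringOperator(BaseOperator):
    """Hypothetical operator that defers immediately in execute()."""

    def execute(self, context):
        self.defer(trigger=InstantTrigger(), method_name="execute_complete")

    def execute_complete(self, context, event=None):
        self.log.info("Trigger fired with event: %s", event)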

Operating System

Arch Linux

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

tanelk added the area:core and kind:bug labels on May 20, 2022
tanelk (Contributor, Author) commented May 20, 2022

One race condition was fixed in #21316, but this issue is unrelated to that one.

tanelk (Contributor, Author) commented May 20, 2022

Perhaps @ashb or @andrewgodwin have ideas on how to "synchronise" the schedulers and triggerers to avoid this situation.

wolfier (Contributor) commented Jun 2, 2022

I was able to replicate this issue with the following DAGs on my local machine with the LocalExecutor.

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync
from datetime import datetime, timedelta


for x in range(10):
    _dag_id = f"sense_dag_{x}"
    task_list = []
    with DAG(
        dag_id=_dag_id,
        schedule_interval="*/1 * * * *",
        start_date=datetime(2022, 1, 1),
        max_active_runs=1,
        # catchup=False,
    ) as dag:

        for y in range(10):
            task_list.append(
                TimeDeltaSensorAsync(
                    task_id=f"sense_{y}",
                    delta=timedelta(seconds=0),
                    retries=3,
                    retry_delay=timedelta(seconds=5)
                )
            )

    globals()[_dag_id] = dag

This issue becomes clearer when there are multiple retries, because the task log indicating that the task cannot run is recorded under the next try (the try number is incremented). Another clear indicator is the error output from scheduler_job.py, where the scheduler finds the task instance still queued instead of in a completed state.

[2022-05-27 16:55:43,906] {scheduler_job.py:669} ERROR - Executor reports task instance <TaskInstance: telescope_processor.live_nation.historic_sense_live_nation_gcs_objects scheduled__2022-05-03T00:00:00+00:00 [queued]> finished (success) although the task says its queued. (Info: None) Was the task killed externally?

I believe the purpose of the check is to ensure that the task is not executed again before the scheduler acknowledges the event sent by the executor, which is only emitted once a task execution completes, whether it succeeded or failed. The check prevents tasks from getting stuck in the queued state. This behaviour is outlined in AIRFLOW-1641 and was fixed in #2715.

With the introduction of triggerers, there is a scenario where a task isn't actually stuck in queued: it was placed in queued because it was moved to the scheduled state after its trigger completed.

This is roughly how the task instance state flows for deferred tasks.

  1. The Celery execution of the task instance completes with celery state success (source).
  2. The trigger instance is assigned to a triggerer (source).
  3. The trigger instance completes and the task instance state is set to scheduled (source).
  4. The task instance key is removed from the running set and an executor event is added to the event buffer with task state success (source), because the celery execution returned with celery state success.
  5. The scheduler processes the executor event of the deferred task instance (now in the scheduled task state from step 3) with the success task state (from step 4).
  6. The task instance follows the task lifecycle as normal, from scheduled -> queued.
  7. The task is executed on a worker.

However, given how long it can take for the scheduler to get around to processing executor events in the scheduler loop, steps 5 and 6 can be flipped. This means the task instance is in the queued state when the scheduler processes the executor event of the deferred task instance. Since the state is queued, the condition is met and the task is marked as failed / up for retry.
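
To make the failure mode concrete, here is a purely illustrative sketch (made-up values, not Airflow objects) of why the guard quoted in the issue description fires under this interleaving; the key point is that resuming from a trigger is still the same try, so the try number recorded in the event buffer still matches the task instance's:

# Illustrative values only - not real Airflow objects.
buffer_try_number = 1   # try number recorded when the worker exited after deferring (step 4)
ti_try_number = 1       # unchanged: resuming from a trigger does not start a new try
ti_state = "queued"     # the trigger already fired and the scheduler re-queued the task (steps 3 and 6)

# Same shape as the scheduler's guard quoted above:
if ti_try_number == buffer_try_number and ti_state == "queued":
    # Both conditions hold even though nothing was killed, so a healthy,
    # freshly re-queued task instance is failed (or sent into retry) with the
    # "Was the task killed externally?" error.
    print("scheduler.tasks.killed_externally")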

wolfier (Contributor) commented Jun 2, 2022

I wonder if it's possible to have an in-between state to indicate that the task has completed its execution but the scheduler has not yet processed the corresponding executor event in the event buffer.
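
Purely as an illustration of that idea (the extra state below is hypothetical, not an actual Airflow state), the lifecycle could gain one more hop:

from enum import Enum


class IllustrativeTaskState(str, Enum):
    # Existing Airflow states involved in the scenario above.
    DEFERRED = "deferred"
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    # Hypothetical in-between state from this suggestion: the task has finished
    # its (deferring) execution, but the scheduler has not yet processed the
    # corresponding executor event from the event buffer.
    AWAITING_EVENT_ACK = "awaiting_event_ack"


# With such a state the flow for a fast trigger would be
# deferred -> awaiting_event_ack -> scheduled -> queued, and the
# "killed externally" guard could skip task instances that are still
# waiting for their executor event to be acknowledged.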

tanelk (Contributor, Author) commented Jun 3, 2022

@wolfier I think your analysis is spot on.
I have an open PR (#23846) that I believe fixes this issue, but I have not tested it beyond unit tests. Could you try applying that patch and running your test DAG again?

RNHTTR (Contributor) commented Dec 20, 2022

I'm seeing a possible regression (or at least a similar manifestation of this issue) in 2.4.3 using ExternalTaskSensorAsync

Taragolis (Contributor) commented

@RNHTTR it would be better to open a new issue with additional details and steps to reproduce it.

However, it could also be a problem with ExternalTaskSensorAsync, which is not part of Airflow or the community providers; in that case it would be better to open an issue in astronomer/astronomer-providers.

As far as I know, many deferrable operators use asgiref.sync.sync_to_async to communicate with the synchronous parts of Airflow (most of Airflow's code is synchronous), such as Connections, Variables, configuration, and reads/writes against the Airflow metadata database, and this approach could have some side effects. But that is just my assumption.
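
For context, that pattern inside a trigger's async run() looks roughly like the following sketch; the trigger class, module path, and Variable key are made up, while sync_to_async and Variable.get are real APIs:

from asgiref.sync import sync_to_async

from airflow.models import Variable
from airflow.triggers.base import BaseTrigger, TriggerEvent


class ExampleSyncBridgeTrigger(BaseTrigger):
    """Hypothetical trigger showing the sync_to_async pattern described above."""

    def serialize(self):
        return ("example_sync_bridge.ExampleSyncBridgeTrigger", {})

    async def run(self):
        # Variable.get is synchronous (it reads from the metadata database), so it
        # is wrapped with sync_to_async to avoid blocking the triggerer's event loop.
        threshold = await sync_to_async(Variable.get)("some_threshold", default_var="0")
        yield TriggerEvent({"threshold": threshold})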
