Trigger die with DB deadlock between scheduler #27000

Closed
2 tasks done
NickYadance opened this issue Oct 12, 2022 · 7 comments

Comments

@NickYadance
Contributor

NickYadance commented Oct 12, 2022

Apache Airflow version

Other Airflow 2 version (2.3.2)

What happened

Discussion #22553 covers this problem but has no detailed trace, and #23639 is a similar issue. The triggerer occasionally dies because of a DB transaction deadlock. In my case it dies 5-6 times per day.

MySQL engine status

------------------------
LATEST DETECTED DEADLOCK
------------------------
2022-10-11 08:33:52 139737395513088
*** (1) TRANSACTION:
TRANSACTION 5858164555, ACTIVE 0 sec fetching rows
mysql tables in use 1, locked 1
LOCK WAIT 143 lock struct(s), heap size 24696, 2 row lock(s)
MySQL thread id 3080443983, OS thread handle 139736854193920, query id 89316169955 10.244.3.94 airflow Searching rows for update
UPDATE task_instance SET state='scheduled', trigger_id=NULL, next_method='__fail__', next_kwargs='{\"__var\": {\"error\": \"Trigger/execution timeout\"}, \"__type\": \"dict\"}' WHERE task_instance.state = 'deferred' AND task_instance.trigger_timeout < '2022-10-11 08:33:52.708635'

*** (1) HOLDS THE LOCK(S):
RECORD LOCKS space id 522 page no 750798 n bits 200 index ti_state of table `airflow`.`task_instance` trx id 5858164555 lock_mode X locks rec but not gap
Record lock, heap no 43 PHYSICAL RECORD: n_fields 5; compact format; info bits 0
 0: len 8; hex 6465666572726564; asc deferred;;
 1: len 30; hex 64642d72616e6b65722d64645f66656176345f735f725f6f6d6f655f6634; asc dagid; (total 42 bytes);
 2: len 19; hex 66675f6d61726b65725f73656e736f725f504c; asc taskid;;
 3: len 30; hex 7363686564756c65645f5f323032322d31302d31305432333a30303a3030; asc scheduled__2022-10-10T23:00:00; (total 36 bytes);
 4: len 4; hex 7fffffff; asc     ;;


*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 522 page no 676208 n bits 104 index PRIMARY of table `airflow`.`task_instance` trx id 5858164555 lock_mode X locks rec but not gap waiting
Record lock, heap no 35 PHYSICAL RECORD: n_fields 29; compact format; info bits 0
 0: len 30; hex 64642d72616e6b65722d64645f66656176345f735f725f6f6d6f655f6634; asc dagid; (total 42 bytes);
 1: len 19; hex 66675f6d61726b65725f73656e736f725f504c; asc taskid;;
 2: len 30; hex 7363686564756c65645f5f323032322d31302d31305432333a30303a3030; asc scheduled__2022-10-10T23:00:00; (total 36 bytes);
 3: len 4; hex 7fffffff; asc     ;;
 4: len 6; hex 00015d2c7f56; asc   ], V;;
 5: len 7; hex 010000024b1824; asc     K $;;
 6: len 7; hex 6344b21f0a3ee2; asc cD   > ;;
 7: SQL NULL;
 8: SQL NULL;
 9: len 9; hex 7363686564756c6564; asc scheduled;;
 10: len 4; hex 80000000; asc     ;;
 11: len 30; hex 616972666c6f772d63656c6572792d776f726b65722d342e616972666c6f; asc airflow-celery-worker-4.airflo; (total 71 bytes);
 12: len 4; hex 726f6f74; asc root;;
 13: len 4; hex 80a4fa25; asc    %;;
 14: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
 15: len 13; hex 63656c6572792d776f726b6572; asc celery-worker;;
 16: len 4; hex 80000005; asc     ;;
 17: len 22; hex 446570656e64656e63794d61726b657253656e736f72; asc Sensor;;
 18: len 7; hex 6345293d071e5a; asc cE)=  Z;;
 19: len 4; hex 8001e7ff; asc     ;;
 20: len 4; hex 80000002; asc     ;;
 21: len 5; hex 80047d942e; asc   } .;;
 22: len 4; hex 80000001; asc     ;;
 23: len 4; hex 808bc44c; asc    L;;
 24: len 30; hex 35633163383065342d383438632d343564312d383265652d356233633766; asc 5c1c80e4-848c-45d1-82ee-5b3c7f; (total 36 bytes);
 25: SQL NULL;
 26: len 5; hex 99ae180020; asc      ;;
 27: len 18; hex 657865637574655f6f6e5f74726967676572; asc execute_on_trigger;;
 28: len 30; hex 00020060001200050017000600001d000c5b005f5f7661725f5f74797065; asc    `             [ __var__type; (total 97 bytes);


*** (2) TRANSACTION:
TRANSACTION 5858164566, ACTIVE 0 sec updating or deleting
mysql tables in use 1, locked 1
LOCK WAIT 3 lock struct(s), heap size 1128, 2 row lock(s), undo log entries 1
MySQL thread id 3080393170, OS thread handle 139729109509888, query id 89316170062 10.244.3.92 airflow updating
UPDATE task_instance SET state='scheduled', trigger_id=NULL, next_kwargs='{\"__var\": {\"event\": {\"__var\": 1665477231.072732, \"__type\": \"datetime\"}}, \"__type\": \"dict\"}' WHERE task_instance.task_id = 'taskid' AND task_instance.dag_id = 'dagidwp_long-dag1' AND task_instance.run_id = 'scheduled__2022-10-10T23:00:00+00:00' AND task_instance.map_index = -1

*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 522 page no 676208 n bits 104 index PRIMARY of table `airflow`.`task_instance` trx id 5858164566 lock_mode X locks rec but not gap
Record lock, heap no 35 PHYSICAL RECORD: n_fields 29; compact format; info bits 0
 0: len 30; hex 64642d72616e6b65722d64645f66656176345f735f725f6f6d6f655f6634; asc dagid; (total 42 bytes);
 1: len 19; hex 66675f6d61726b65725f73656e736f725f504c; asc taskid;;
 2: len 30; hex 7363686564756c65645f5f323032322d31302d31305432333a30303a3030; asc scheduled__2022-10-10T23:00:00; (total 36 bytes);
 3: len 4; hex 7fffffff; asc     ;;
 4: len 6; hex 00015d2c7f56; asc   ], V;;
 5: len 7; hex 010000024b1824; asc     K $;;
 6: len 7; hex 6344b21f0a3ee2; asc cD   > ;;
 7: SQL NULL;
 8: SQL NULL;
 9: len 9; hex 7363686564756c6564; asc scheduled;;
 10: len 4; hex 80000000; asc     ;;
 11: len 30; hex 616972666c6f772d63656c6572792d776f726b65722d342e616972666c6f; asc airflow-celery-worker-4.airflo; (total 71 bytes);
 12: len 4; hex 726f6f74; asc root;;
 13: len 4; hex 80a4fa25; asc    %;;
 14: len 12; hex 64656661756c745f706f6f6c; asc default_pool;;
 15: len 13; hex 63656c6572792d776f726b6572; asc celery-worker;;
 16: len 4; hex 80000005; asc     ;;
 17: len 22; hex 446570656e64656e63794d61726b657253656e736f72; asc Sensor;;
 18: len 7; hex 6345293d071e5a; asc cE)=  Z;;
 19: len 4; hex 8001e7ff; asc     ;;
 20: len 4; hex 80000002; asc     ;;
 21: len 5; hex 80047d942e; asc   } .;;
 22: len 4; hex 80000001; asc     ;;
 23: len 4; hex 808bc44c; asc    L;;
 24: len 30; hex 35633163383065342d383438632d343564312d383265652d356233633766; asc 5c1c80e4-848c-45d1-82ee-5b3c7f; (total 36 bytes);
 25: SQL NULL;
 26: len 5; hex 99ae180020; asc      ;;
 27: len 18; hex 657865637574655f6f6e5f74726967676572; asc execute_on_trigger;;
 28: len 30; hex 00020060001200050017000600001d000c5b005f5f7661725f5f74797065; asc    `             [ __var__type; (total 97 bytes);


*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 522 page no 750798 n bits 200 index ti_state of table `airflow`.`task_instance` trx id 5858164566 lock_mode X locks rec but not gap waiting
Record lock, heap no 43 PHYSICAL RECORD: n_fields 5; compact format; info bits 0
 0: len 8; hex 6465666572726564; asc deferred;;
 1: len 30; hex 64642d72616e6b65722d64645f66656176345f735f725f6f6d6f655f6634; asc dagid; (total 42 bytes);
 2: len 19; hex 66675f6d61726b65725f73656e736f725f504c; asc taskid;;
 3: len 30; hex 7363686564756c65645f5f323032322d31302d31305432333a30303a3030; asc scheduled__2022-10-10T23:00:00; (total 36 bytes);
 4: len 4; hex 7fffffff; asc     ;;

*** WE ROLL BACK TRANSACTION (2)
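The `hex` fields in the engine status above are raw index key bytes, so decoding them shows which values each locked record carries. A minimal sketch, with a helper name (`decode_field`) invented for illustration and two hex strings copied from the log:

```python
def decode_field(hex_value: str) -> str:
    """Decode a hex-dumped InnoDB string field as ASCII text."""
    return bytes.fromhex(hex_value).decode("ascii")


# Field 0 of the ti_state index record that both transactions contend on.
print(decode_field("6465666572726564"))    # deferred
# Field 9 of the PRIMARY index record.
print(decode_field("7363686564756c6564"))  # scheduled
```

The secondary index entry still reads `deferred` while the clustered record already reads `scheduled`, which matches transaction 2 having one undo log entry while it waits on `ti_state`.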

Trigger exit log

 File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE task_instance SET state=%s, trigger_id=%s, next_kwargs=%s WHERE task_instance.task_id = %s AND task_instance.dag_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: (<TaskInstanceState.SCHEDULED: 'scheduled'>, None, '{"__var": {"event": {"__var": 1665477231.072732, "__type": "datetime"}}, "__type": "dict"}', 'taskid', 'dagid', 'scheduled__2022-10-10T23:00:00+00:00', -1)]
(Background on this error at: http://sqlalche.me/e/14/e3q8)
[2022-10-11 08:33:52,729] {triggerer_job.py:111} INFO - Waiting for triggers to clean up
[2022-10-11 08:33:53,569] {triggerer_job.py:117} INFO - Exited trigger loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1706, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/share/miniconda2/envs/airflow/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/cli/commands/triggerer_command.py", line 68, in triggerer
    job.run()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job.py", line 106, in _execute
    self._run_trigger_loop()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job.py", line 131, in _run_trigger_loop
    self.handle_events()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/triggerer_job.py", line 160, in handle_events
    Trigger.submit_event(trigger_id=trigger_id, event=event)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/miniconda2/envs/airflow/lib/python3.7/contextlib.py", line 119, in __exit__
    next(self.gen)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/session.py", line 33, in create_session
    session.commit()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1423, in commit
    self._transaction.commit(_to_root=self.future)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 829, in commit
    self._prepare_impl()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 808, in _prepare_impl
    self.session.flush()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3255, in flush
    self._flush(objects)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3395, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 72, in __exit__
    with_traceback=exc_tb,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 3355, in _flush
    flush_context.execute()
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 453, in execute
    rec.execute(self)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
    uow,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 239, in save_obj
    update,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 999, in _emit_update_statements
    statement, multiparams, execution_options=execution_options
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1520, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 314, in _execute_on_connection
    self, multiparams, params, execution_options
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1399, in _execute_clauseelement
    cache_hit=cache_hit,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1749, in _execute_context
    e, statement, parameters, cursor, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1930, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1706, in _execute_context
    cursor, statement, parameters, context
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 716, in do_execute
    cursor.execute(statement, parameters)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 206, in execute
    res = self._query(query)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 319, in _query
    db.query(q)
  File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/connections.py", line 254, in query
    _mysql.connection.query(self, query)
sqlalchemy.exc.OperationalError: (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE task_instance SET state=%s, trigger_id=%s, next_kwargs=%s WHERE task_instance.task_id = %s AND task_instance.dag_id = %s AND task_instance.run_id = %s AND task_instance.map_index = %s]
[parameters: (<TaskInstanceState.SCHEDULED: 'scheduled'>, None, '{"__var": {"event": {"__var": 1665477231.072732, "__type": "datetime"}}, "__type": "dict"}', 'taskid', 'dagid', 'scheduled__2022-10-10T23:00:00+00:00', -1)]
(Background on this error at: http://sqlalche.me/e/14/e3q8)

The query issued by submit_event (transaction 2 in the engine status) holds a row lock on the primary index (dag_id, task_id, run_id, map_index) and waits for the secondary index lock.

@classmethod
@provide_session
def submit_event(cls, trigger_id, event, session=None):
    """
    Takes an event from an instance of itself, and triggers all dependent
    tasks to resume.
    """
    for task_instance in session.query(TaskInstance).filter(
        TaskInstance.trigger_id == trigger_id, TaskInstance.state == State.DEFERRED
    ):
        # Add the event's payload into the kwargs for the task
        next_kwargs = task_instance.next_kwargs or {}
        next_kwargs["event"] = event.payload
        task_instance.next_kwargs = next_kwargs
        # Remove ourselves as its trigger
        task_instance.trigger_id = None
        # Finally, mark it as scheduled so it gets re-queued
        task_instance.state = State.SCHEDULED

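The query issued by check_trigger_timeouts (transaction 1) holds a row lock on the secondary index ti_state (on the state column), as the engine status shows, and waits for the primary index lock. Together the two waits cause the deadlock.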

def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
    """
    Looks at all tasks that are in the "deferred" state and whose trigger
    or execution timeout has passed, so they can be marked as failed.
    """
    num_timed_out_tasks = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.state == TaskInstanceState.DEFERRED,
            TaskInstance.trigger_timeout < timezone.utcnow(),
        )
        .update(
            # We have to schedule these to fail themselves so it doesn't
            # happen inside the scheduler.
            {
                "state": TaskInstanceState.SCHEDULED,
                "next_method": "__fail__",
                "next_kwargs": {"error": "Trigger/execution timeout"},
                "trigger_id": None,
            }
        )
    )
    if num_timed_out_tasks:
        self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
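The two lock waits reported in the engine status form a cycle, which is why InnoDB has to roll one transaction back. A small sketch of that reasoning as a wait-for graph; the `holds` and `waits` dictionaries are a hand-written summary of the log, and the function names are made up for illustration:

```python
# Summary of the LATEST DETECTED DEADLOCK section (index names from the log).
holds = {"trx1": "ti_state", "trx2": "PRIMARY"}
waits = {"trx1": "PRIMARY", "trx2": "ti_state"}


def wait_for_graph(holds, waits):
    """Map each waiting transaction to the transaction holding the lock it wants."""
    owner = {lock: trx for trx, lock in holds.items()}
    return {trx: owner[lock] for trx, lock in waits.items() if lock in owner}


def has_cycle(graph):
    """Follow wait-for edges from every node; revisiting a node means a deadlock."""
    for start in graph:
        seen = set()
        node = start
        while node in graph:
            if node in seen:
                return True
            seen.add(node)
            node = graph[node]
    return False


graph = wait_for_graph(holds, waits)
print(graph)             # {'trx1': 'trx2', 'trx2': 'trx1'}
print(has_cycle(graph))  # True
```

This only models one lock per transaction, which is all the log shows; it is not how InnoDB's detector is implemented.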

#22553 and #23639 propose different solutions for this:

  1. Add with_row_locks to the queries so the selected rows are locked up front, avoiding lock contention.
  2. Add a retry.

As for retries, other Trigger methods such as clean_unused already retry:

def clean_unused(cls, session=None):
    """
    Deletes all triggers that have no tasks/DAGs dependent on them
    (triggers have a one-to-many relationship to both)
    """
    # Update all task instances with trigger IDs that are not DEFERRED to remove them
    for attempt in run_with_db_retries():
        with attempt:
            session.query(TaskInstance).filter(
                TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
            ).update({TaskInstance.trigger_id: None})
    # Get all triggers that have no task instances depending on them...
    ids = [
        trigger_id
        for (trigger_id,) in (
            session.query(cls.id)
            .join(TaskInstance, cls.id == TaskInstance.trigger_id, isouter=True)
            .group_by(cls.id)
            .having(func.count(TaskInstance.trigger_id) == 0)
        )
    ]
    # ...and delete them (we can't do this in one query due to MySQL)
    session.query(Trigger).filter(Trigger.id.in_(ids)).delete(synchronize_session=False)
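For readers unfamiliar with the retry approach, here is a standard-library sketch of retrying on MySQL error 1213. It is not Airflow's `run_with_db_retries`; `DeadlockError`, `retry_on_deadlock`, and `flaky_update` are hypothetical names used only to show the shape of the idea:

```python
import time

MYSQL_DEADLOCK = 1213  # error code shown in the traceback above


class DeadlockError(Exception):
    """Hypothetical stand-in for a driver error carrying a MySQL error code."""

    def __init__(self, code):
        super().__init__(f"MySQL error {code}")
        self.code = code


def retry_on_deadlock(func, attempts=3, base_delay=0.01):
    """Call func(); on a deadlock error, back off and retry up to `attempts` times."""
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except DeadlockError as exc:
            # Re-raise anything that is not a deadlock, or the final failure.
            if exc.code != MYSQL_DEADLOCK or attempt == attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))


calls = []


def flaky_update():
    """Simulated transaction that deadlocks twice, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise DeadlockError(MYSQL_DEADLOCK)
    return "committed"


print(retry_on_deadlock(flaky_update))  # committed
```

In a real session the whole transaction has to be rolled back and re-run on each attempt, since MySQL discards the victim's work.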

What you think should happen instead

No response

How to reproduce

No response

Operating System

ubuntu

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@NickYadance added the area:core and kind:bug labels Oct 12, 2022
@uranusjr
Member

What is your Airflow version?

@NickYadance
Contributor Author

What is your Airflow version?

2.3.2

@NickYadance
Contributor Author

I'll close the issue because I didn't find an easy way to reproduce it. The lock contention between the two queries is quite hard to construct. The workaround is to reduce how often the scheduler checks for timed-out triggers, which avoids potential lock contention.

def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
    """
    Looks at all tasks that are in the "deferred" state and whose trigger
    or execution timeout has passed, so they can be marked as failed.
    """
    num_timed_out_tasks = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.state == TaskInstanceState.DEFERRED,
            TaskInstance.trigger_timeout < timezone.utcnow(),
        )
        .update(
            # We have to schedule these to fail themselves so it doesn't
            # happen inside the scheduler.
            {
                "state": TaskInstanceState.SCHEDULED,
                "next_method": "__fail__",
                "next_kwargs": {"error": "Trigger/execution timeout"},
                "trigger_id": None,
            }
        )
    )
    if num_timed_out_tasks:
        self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)

The configuration option trigger_timeout_check_interval defaults to 15. I raised it to a reasonably higher value and the deadlocks were greatly reduced.
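A rough sketch of what raising the interval changes, assuming the option lives in the `[scheduler]` section and is measured in seconds. The helper names are invented here, and the value 120 is only an illustrative choice, not the one used in this report:

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the AIRFLOW__{SECTION}__{KEY} environment variable name."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def checks_per_day(interval_seconds: float) -> int:
    """Number of timeout checks the scheduler would run in 24 hours."""
    return int(86400 // interval_seconds)


# Assumption: the option is in the [scheduler] section.
print(airflow_env_var("scheduler", "trigger_timeout_check_interval"))
# AIRFLOW__SCHEDULER__TRIGGER_TIMEOUT_CHECK_INTERVAL

print(checks_per_day(15))   # 5760 (default interval)
print(checks_per_day(120))  # 720  (illustrative higher interval)
```

Fewer checks mean fewer chances for the timeout UPDATE to overlap with a triggerer UPDATE, which lowers the deadlock rate without removing the underlying lock-order conflict.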

@potiuk potiuk reopened this Oct 23, 2022
@potiuk
Member

potiuk commented Oct 23, 2022

I will re-open this one. It has enough information to try to avoid the deadlock in the first place. The problem is that the Triggerer acquires the same locks as the scheduler but in a different sequence, so the right solution should be to change either the Triggerer (most likely) or the scheduler (rather unlikely) so that both apply the same lock sequence.

Most likely the Triggerer should attempt to lock DagRun first and only then update the task instance, or even avoid locking DagRun in the first place. I believe we fixed a very similar deadlock situation recently.
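Why a shared lock sequence removes the deadlock can be checked with a toy model: two transactions acquire locks in a fixed order and release everything at commit, and every interleaving is explored. This is a simplification with a made-up function name (`can_deadlock`); real InnoDB lock acquisition depends on the query plan and is more involved:

```python
def can_deadlock(seq_a, seq_b):
    """Return True if some interleaving leaves both transactions blocked.

    Each transaction takes the locks in its sequence in order and releases
    all of them once it has acquired the last one (commit).
    """
    seen = set()
    stack = [(0, 0)]  # (locks acquired by A, locks acquired by B)
    while stack:
        i, j = stack.pop()
        if (i, j) in seen:
            continue
        seen.add((i, j))
        a_done, b_done = i == len(seq_a), j == len(seq_b)
        if a_done and b_done:
            continue
        # A committed transaction no longer holds anything.
        held_a = set() if a_done else set(seq_a[:i])
        held_b = set() if b_done else set(seq_b[:j])
        moves = []
        if not a_done and seq_a[i] not in held_b:
            moves.append((i + 1, j))
        if not b_done and seq_b[j] not in held_a:
            moves.append((i, j + 1))
        if not moves:
            return True  # neither side can proceed
        stack.extend(moves)
    return False


# Opposite order, as in the engine status: deadlock is reachable.
print(can_deadlock(["ti_state", "PRIMARY"], ["PRIMARY", "ti_state"]))  # True
# Same order on both sides: one transaction simply waits for the other.
print(can_deadlock(["PRIMARY", "ti_state"], ["PRIMARY", "ti_state"]))  # False
```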

@potiuk
Member

potiuk commented Oct 23, 2022

I will take a look at this shortly (or maybe @ashb or @andrewgodwin might take a look at it before).

@github-actions

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Please recheck the report against the latest Airflow version and let us know if the issue is reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.


This issue has been closed because it has not received a response from the issue author.

@github-actions github-actions bot closed this as not planned Nov 25, 2023