@oboki
Contributor

@oboki oboki commented Jan 31, 2025

This PR adds a STRAIGHT_JOIN hint to prevent an unintended full scan in get_sorted_triggers.

With the MySQL backend, when the trigger table contains many records, the optimizer picks task_instance as the driving table and scans it first, which leads to slow queries.

To test this, I first stopped the triggerer process so that new triggers stay unassigned, then generated some dummy data as shown below:

from airflow.models.trigger import Trigger
from airflow.utils.session import NEW_SESSION, provide_session

@provide_session
def add_trigger(session=NEW_SESSION):
    for _ in range(10):
        session.add(Trigger(classpath="", kwargs={"key": "value"}))
    session.commit()

if __name__ == "__main__":
    add_trigger()

This leaves the following 10 records with no triggerer assigned:

MySQL [airflow]> SELECT * FROM `trigger`;
+----+-----------+-----------------------------------------------+----------------------------+--------------+
| id | classpath | kwargs                                        | created_date               | triggerer_id |
+----+-----------+-----------------------------------------------+----------------------------+--------------+
| 41 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982334 |         NULL |
| 42 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982502 |         NULL |
| 43 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982536 |         NULL |
| 44 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982560 |         NULL |
| 45 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982583 |         NULL |
| 46 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982675 |         NULL |
| 47 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982696 |         NULL |
| 48 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982713 |         NULL |
| 49 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982731 |         NULL |
| 50 |           | {"__var": {"key": "value"}, "__type": "dict"} | 2025-01-31 03:00:02.982749 |         NULL |
+----+-----------+-----------------------------------------------+----------------------------+--------------+
10 rows in set (0.000 sec)

The execution plan of the current (as-is) query shows that it does a full scan of task_instance first, and this unnecessary scan slows the query down.

MySQL [airflow]> 
MySQL [airflow]> EXPLAIN
    -> SELECT `trigger`.id 
    -> FROM `trigger` INNER JOIN task_instance ON `trigger`.id = task_instance.trigger_id 
    -> WHERE `trigger`.triggerer_id IS NULL OR (`trigger`.triggerer_id NOT IN (SELECT job.id 
    -> FROM job 
    -> WHERE job.end_date IS NULL AND job.latest_heartbeat > '2025-01-31 00:00:00' AND job.job_type = 'TriggererJob')) ORDER BY coalesce(task_instance.priority_weight, 0) DESC, `trigger`.created_date 
    ->  LIMIT 998 FOR UPDATE SKIP LOCKED;
+----+-------------+---------------+------------+--------+------------------------+----------------+---------+----------------------------------+------+----------+----------------------------------------------+
| id | select_type | table         | partitions | type   | possible_keys          | key            | key_len | ref                              | rows | filtered | Extra                                        |
+----+-------------+---------------+------------+--------+------------------------+----------------+---------+----------------------------------+------+----------+----------------------------------------------+
|  1 | PRIMARY     | task_instance | NULL       | ALL    | ti_trigger_id          | NULL           | NULL    | NULL                             | 1354 |   100.00 | Using where; Using temporary; Using filesort |
|  1 | PRIMARY     | trigger       | NULL       | eq_ref | PRIMARY                | PRIMARY        | 4       | airflow.task_instance.trigger_id |    1 |   100.00 | Using where                                  |
|  2 | SUBQUERY    | job           | NULL       | range  | PRIMARY,job_type_heart | job_type_heart | 131     | NULL                             |    3 |    10.00 | Using index condition; Using where           |
+----+-------------+---------------+------------+--------+------------------------+----------------+---------+----------------------------------+------+----------+----------------------------------------------+
3 rows in set, 1 warning (0.001 sec)

After adding .prefix_with("STRAIGHT_JOIN", dialect="mysql"), the execution plan of the compiled SQL improves as shown below:

MySQL [airflow]> EXPLAIN
    -> SELECT STRAIGHT_JOIN `trigger`.id 
    -> FROM `trigger` INNER JOIN task_instance ON `trigger`.id = task_instance.trigger_id 
    -> WHERE `trigger`.triggerer_id IS NULL OR (`trigger`.triggerer_id NOT IN (SELECT job.id 
    -> FROM job 
    -> WHERE job.end_date IS NULL AND job.latest_heartbeat > '2025-01-31 00:00:00' AND job.job_type = 'TriggererJob')) ORDER BY coalesce(task_instance.priority_weight, 0) DESC, `trigger`.created_date 
    ->  LIMIT 998 FOR UPDATE SKIP LOCKED;
+----+--------------------+---------------+------------+-----------------+------------------------+---------+---------+------+------+----------+----------------------------------------------+
| id | select_type        | table         | partitions | type            | possible_keys          | key     | key_len | ref  | rows | filtered | Extra                                        |
+----+--------------------+---------------+------------+-----------------+------------------------+---------+---------+------+------+----------+----------------------------------------------+
|  1 | PRIMARY            | trigger       | NULL       | ALL             | PRIMARY                | NULL    | NULL    | NULL |   10 |   100.00 | Using where; Using temporary; Using filesort |
|  1 | PRIMARY            | task_instance | NULL       | ALL             | ti_trigger_id          | NULL    | NULL    | NULL | 1354 |   100.00 | Using where; Using join buffer (hash join)   |
|  2 | DEPENDENT SUBQUERY | job           | NULL       | unique_subquery | PRIMARY,job_type_heart | PRIMARY | 4       | func |    1 |     5.00 | Using where; Full scan on NULL key           |
+----+--------------------+---------------+------------+-----------------+------------------------+---------+---------+------+------+----------+----------------------------------------------+
3 rows in set, 1 warning (0.001 sec)
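
As a rough illustration of what the .prefix_with call described above does, here is a minimal sketch, assuming SQLAlchemy 1.4 or later. The two tables are hypothetical, stripped-down stand-ins for the real Airflow models, and the WHERE clause is simplified; this is not the actual get_sorted_triggers code. It only shows that the hint is rendered for the MySQL dialect and omitted for other dialects:

```python
from sqlalchemy import Column, ForeignKey, Integer, MetaData, Table, select
from sqlalchemy.dialects import mysql, postgresql

metadata = MetaData()

# Hypothetical, simplified stand-ins for the Airflow tables (assumption:
# only the columns needed for this illustration are declared).
trigger = Table(
    "trigger",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("triggerer_id", Integer, nullable=True),
)
task_instance = Table(
    "task_instance",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("trigger_id", Integer, ForeignKey("trigger.id")),
    Column("priority_weight", Integer),
)

# The prefix is attached once; dialect="mysql" restricts it to MySQL.
query = (
    select(trigger.c.id)
    .prefix_with("STRAIGHT_JOIN", dialect="mysql")
    .join(task_instance, trigger.c.id == task_instance.c.trigger_id)
    .where(trigger.c.triggerer_id.is_(None))
)

# Compile the same statement for two dialects without a database connection.
mysql_sql = str(query.compile(dialect=mysql.dialect()))
postgres_sql = str(query.compile(dialect=postgresql.dialect()))

print(mysql_sql)
print(postgres_sql)
```

Because the prefix is scoped to one dialect, the statement compiled for PostgreSQL is unaffected, which is why a single code path can serve both backends.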

With PostgreSQL, the execution plan stays the same and remains efficient regardless of the number of records in the trigger table, as shown below:

 Limit  (cost=113.21..121.68 rows=678 width=28)
   ->  LockRows  (cost=113.21..121.68 rows=678 width=28)
         ->  Sort  (cost=113.21..114.90 rows=678 width=28)
               Sort Key: (COALESCE(task_instance.priority_weight, 0)) DESC, trigger.created_date
               ->  Hash Join  (cost=19.21..81.32 rows=678 width=28)
                     Hash Cond: (task_instance.trigger_id = trigger.id)
                     ->  Seq Scan on task_instance  (cost=0.00..58.55 rows=1355 width=14)
                     ->  Hash  (cost=18.40..18.40 rows=65 width=18)
                           ->  Seq Scan on trigger  (cost=6.78..18.40 rows=65 width=18)
                                 Filter: ((triggerer_id IS NULL) OR (NOT (hashed SubPlan 1)))
                                 SubPlan 1
                                   ->  Seq Scan on job  (cost=0.00..6.78 rows=1 width=4)
                                         Filter: ((end_date IS NULL) AND (latest_heartbeat > '2025-01-31 00:00:00+00'::timestamp with time zone) AND ((job_type)::text = 'TriggererJob'::text))

@oboki
Contributor Author

oboki commented Mar 14, 2025

Could someone please take a look at this? @ashb @XD-DENG @dstandish @hussein-awala

@potiuk
Member

potiuk commented Mar 16, 2025

It does seem like a good idea:

STRAIGHT_JOIN is similar to JOIN, except that the left table is always read before the right table. This can be used for those (few) cases for which the join optimizer processes the tables in a suboptimal order.

https://dev.mysql.com/doc/refman/8.4/en/join.html

@ashb @XD-DENG @dstandish @hussein-awala -> anything against it?

@potiuk potiuk force-pushed the refactor-optimize-sql-in-get_sorted_triggers branch from ef8df34 to 7378b81 Compare March 16, 2025 16:01
@oboki
Contributor Author

oboki commented Apr 1, 2025

@potiuk

I would like to include this change in the upcoming 2.10.6 version. Is that possible? If yes, what would be the process?

@eladkal
Contributor

eladkal commented Apr 1, 2025

There are conflicts to resolve

@oboki oboki force-pushed the refactor-optimize-sql-in-get_sorted_triggers branch from 7378b81 to 8f66666 Compare April 1, 2025 14:30
@oboki
Contributor Author

oboki commented Apr 1, 2025

@eladkal

I missed the conflicts. I've resolved them now.

@oboki oboki force-pushed the refactor-optimize-sql-in-get_sorted_triggers branch from 8f66666 to 66eee24 Compare April 13, 2025 23:44
@oboki
Contributor Author

oboki commented Apr 17, 2025

Hello, this PR was approved but hasn't been merged for a while. Could someone please help with merging it? Thanks in advance!

@ashb @XD-DENG @dstandish @hussein-awala @potiuk @eladkal

@eladkal eladkal added this to the Airflow 3.0.1 milestone Apr 18, 2025
@oboki oboki requested a review from potiuk April 19, 2025 03:09
@ashb ashb merged commit 9f0ac9a into apache:main Apr 22, 2025
49 checks passed
kaxil pushed a commit that referenced this pull request Apr 23, 2025
…_triggers (#46303)

This PR adds a STRAIGHT_JOIN hint to prevent an unintended full scan of TaskInstance table in get_sorted_triggers.

With MySQL backend, if there are many records in the trigger, it causes the task_instance to be scanned first as the driving table, leading to slow queries.

(cherry picked from commit 9f0ac9a)
prabhusneha pushed a commit to astronomer/airflow that referenced this pull request Apr 25, 2025
…_triggers (apache#46303)

This PR adds a STRAIGHT_JOIN hint to prevent an unintended full scan of TaskInstance table in get_sorted_triggers.

With MySQL backend, if there are many records in the trigger, it causes the task_instance to be scanned first as the driving table, leading to slow queries.