task-sql-decorator: Introducing the @task.sql decorator #60851
kaxil merged 15 commits into apache:main
Conversation
kaxil
left a comment
Looks good to me but worth checking how OL works with it.
@kaxil, what is the best way to test this? Do we have a framework in place for this already?
There is
providers/common/sql/src/airflow/providers/common/sql/decorators/sql.py
kacpermuda
left a comment
Looks good, thanks!
From the OL perspective, the OL methods from `SQLExecuteQueryOperator` are executed, since `PythonOperator` does not have them. With the added type check in `BaseSQLOperator`'s OL method, the `*_on_start` method does nothing, since we do not know the SQL text before execution, and `*_on_complete` behaves as if the query was executed with the usual `SQLExecuteQueryOperator`, which is good.
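To illustrate the guard pattern being described, here is a hedged sketch in plain Python; this is not the actual provider code, and `StubOperator`/`lineage_on_start` are hypothetical stand-ins. The on-start hook emits nothing while the SQL text is unknown (it only exists after the decorated function runs), while on-complete can report the query that was actually executed:

```python
# Hypothetical sketch (not the actual Airflow provider code) of the
# type-check guard described above: the on-start lineage hook emits
# nothing when the SQL text is not yet known, because for @task.sql the
# query only exists after the decorated function has run.
class StubOperator:
    sql = None  # for @task.sql, unknown until execute() has run


def lineage_on_start(operator):
    # Type check: bail out unless we already have concrete SQL text.
    if not isinstance(getattr(operator, "sql", None), str):
        return None  # no SQL yet -> emit no lineage on start
    return {"sql": operator.sql}


op = StubOperator()
print(lineage_on_start(op))  # None before execution
op.sql = "SELECT 1;"         # set once the function has returned
print(lineage_on_start(op))  # {'sql': 'SELECT 1;'}
```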
mobuchowski
left a comment
I was going to look at OpenLineage impl, but @kacpermuda already did this - so I just have a few drive-by comments :)
providers/common/sql/tests/system/common/sql/example_sql_execute_query.py
…0851)

For DAG authors familiar with writing Python functions to do "something", the `@task` decorator is one of the most popular tools for authoring logic in Airflow. There are several other decorators, such as `@task.bash`, `@task.kubernetes`, etc. that extend this functionality. However, there is no `@task.sql` decorator.

This PR introduces the `@task.sql` decorator. This decorator is a wrapper around the `SQLExecuteQueryOperator`; however, the value returned from the Python function is the SQL query that is executed. Here's an example usage:

```python
from datetime import datetime

from airflow.sdk import DAG, task

with DAG(
    dag_id="sql_deco",
    start_date=datetime(2025, 1, 1),
    schedule="@once",
) as dag:

    @task.sql(
        conn_id="task-sql-decorator"  # Transient connection defined in the UI
    )
    def task_1():
        return "SELECT 1;"

    task_1()
```
Description
For DAG authors familiar with writing Python functions to do "something", the `@task` decorator is one of the most popular tools for authoring logic in Airflow. There are several other decorators, such as `@task.bash`, `@task.kubernetes`, etc. that extend this functionality. However, there is no `@task.sql` decorator.

This PR introduces the `@task.sql` decorator. This decorator is a wrapper around the `SQLExecuteQueryOperator`; however, the value returned from the Python function is the SQL query that is executed.

Here's an example usage:
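The mechanism can be illustrated with a toy sketch in plain Python. This is not Airflow's actual implementation; `sql_task` is a hypothetical stand-in showing only the pattern, where the decorated function's return value becomes the SQL that the wrapper would hand to the executor:

```python
# Toy sketch (NOT the actual Airflow implementation) of the pattern
# behind @task.sql: the decorated function returns SQL text, and the
# wrapper pairs that text with a connection id for execution.
def sql_task(conn_id):
    def decorator(fn):
        def run():
            query = fn()  # the function's return value is the SQL to run
            # A real implementation would execute `query` against the
            # connection identified by conn_id (e.g. via
            # SQLExecuteQueryOperator); here we just return both.
            return {"conn_id": conn_id, "sql": query}
        return run
    return decorator


@sql_task(conn_id="task-sql-decorator")
def task_1():
    return "SELECT 1;"


print(task_1())  # {'conn_id': 'task-sql-decorator', 'sql': 'SELECT 1;'}
```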
Testing
To run the unit tests that were authored for this decorator, the command below can be used:
The DAG above was also used to validate the functionality of the `@task.sql` decorator.
Other Notes
There will be additional documentation and examples that come out of this initial pull request, with the goal of providing parity between this new decorator and the `SQLExecuteQueryOperator`. For now, this PR just provides the bare bones.