From c79fd60aa4bdc6674dfd9cb0a8ec8181c62b77c5 Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 28 Aug 2025 16:29:56 +0200 Subject: [PATCH 01/10] refactor: Added SQLInsertRowsOperator --- .../providers/common/sql/operators/sql.py | 142 +++++++++++++++++- 1 file changed, 141 insertions(+), 1 deletion(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py index 0b884d97f0c26..37dfba57fdaff 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py @@ -23,7 +23,10 @@ from functools import cached_property from typing import TYPE_CHECKING, Any, ClassVar, NoReturn, SupportsAbs -from airflow.exceptions import AirflowException, AirflowFailException +import jinja2 + +from airflow import XComArg +from airflow.exceptions import AirflowException, AirflowFailException, AirflowSkipException from airflow.models import SkipMixin from airflow.providers.common.sql.hooks.handlers import fetch_all_handler, return_single_query_results from airflow.providers.common.sql.hooks.sql import DbApiHook @@ -1252,6 +1255,143 @@ def execute(self, context: Context): self.skip_all_except(context["ti"], follow_branch) +class SQLInsertRowsOperator(BaseSQLOperator): + """ + Insert rows (e.g. a collection of tuples) into a database table directly from an XCom or Python data + structure. + + :param table: the name of the table in which the rows will be inserted (templated). + :param conn_id: the connection ID used to connect to the database + :param schema: (optional) the name of schema in which the table is defined + :param database: name of database (e.g. schema) which overwrite the defined one in connection + :param columns: (optional) specify a list of columns being used for the insert when passing a list of + dictionaries. + :param ignore_columns: (optional) specify a list of columns being ignored for the insert. If no columns + where specified, the columns will be resolved dynamically from the metadata. + :param rows: the rows to insert into the table. Rows can be a list of tuples or a list of dictionaries. + When a list of dictionaries is provided, the column names are inferred from the dictionary keys and + will be matched with the column names, ignored columns will be filtered out. + :rows_processor: (optional) a function that will be applied to the rows before inserting them into the table. + :param preoperator: sql statement or list of statements to be executed prior to loading the data. (templated) + :param postoperator: sql statement or list of statements to be executed after loading the data. (templated) + :param insert_args: (optional) dictionary of additional arguments passed to the underlying hook's + `insert_rows` method. This allows you to configure options such as `replace`, `executemany`, + `fast_executemany`, and `autocommit`. + + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:SQLInsertRowsOperator` + """ + + template_fields: Sequence[str] = ( + "table_name", + "conn_id", + "schema", + "database", + "_columns", + "ignored_columns", + "preoperator", + "postoperator", + "insert_args", + ) + template_ext: Sequence[str] = (".sql",) + template_fields_renderers = {"preoperator": "sql"} + + def __init__( + self, + *, + table_name: str, + conn_id: str, + schema: str | None = None, + database: str | None = None, + columns: Iterable[str] | None = None, + ignored_columns: Iterable[str] | None = None, + rows: list[Any] | XComArg = None, + rows_processor: Callable[[Any, Context], Any] = lambda rows, **context: rows, + preoperator: str | list[str] | None = None, + postoperator: str | list[str] | None = None, + hook_params: dict | None = None, + insert_args: dict | None = None, + **kwargs, + ): + super().__init__( + conn_id=conn_id, + database=database, + hook_params=hook_params, + **kwargs, + ) + self.table_name = table_name + self.schema = schema + self._columns: list | None = list(columns) if columns else None + self.ignored_columns = set(ignored_columns or {}) + self.rows = rows or [] + self._rows_processor = rows_processor + self.preoperator = preoperator + self.postoperator = postoperator + self.insert_args = insert_args or {} + self.do_xcom_push = False + + def render_template_fields( + self, + context: Context, + jinja_env: jinja2.Environment | None = None, + ) -> None: + super().render_template_fields(context=context, jinja_env=jinja_env) + + if isinstance(self.rows, XComArg): + self.rows = self.rows.resolve(context=context) + + @property + def table_name_with_schema(self) -> str: + if self.schema is not None: + return f"{self.schema}.{self.table_name}" + return self.table_name + + @cached_property + def columns(self): + if self._columns is None: + self._columns = self.get_db_hook().dialect.get_column_names( + self.table_name_with_schema + ) + return self._columns + + @property + def column_names(self) -> list[str]: + if self.ignored_columns: + return [ + column for column in self.columns if column not in self.ignored_columns + ] + return self.columns + + def _process_rows(self, context: Context): + return self._rows_processor(context, self.rows) # type: ignore + + def execute(self, context: Context) -> Any: + if self.rows is None: + raise AirflowSkipException( + f"Skipping task {self.task_id} because rows is None." + ) + + self.log.debug("Table: %s", self.table_name_with_schema) + self.log.debug("Column names: %s", self.column_names) + if self.preoperator: + self.log.debug("Running preoperator") + self.log.debug(self.preoperator) + self.get_db_hook().run(self.preoperator) + rows = self._process_rows(context=context) + self.get_db_hook().insert_rows( + table=self.table_name_with_schema, + rows=rows, + target_fields=self.column_names, + **self.insert_args, + ) + if self.postoperator: + self.log.debug("Running postoperator") + self.log.debug(self.postoperator) + self.get_db_hook().run(self.postoperator) + + def _initialize_partition_clause(clause: str | None) -> str | None: """Ensure the partition_clause contains only valid patterns.""" if clause is None: From 7318e71cfd48b0c01dd841b6d89754cf7f40bd0d Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 28 Aug 2025 17:06:26 +0200 Subject: [PATCH 02/10] docs: Added documentation for the SQLInsertRowsOperator --- providers/common/sql/docs/operators.rst | 33 ++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/providers/common/sql/docs/operators.rst b/providers/common/sql/docs/operators.rst index fa62b6fc11f32..0d055447110f5 100644 --- a/providers/common/sql/docs/operators.rst +++ b/providers/common/sql/docs/operators.rst @@ -174,7 +174,38 @@ The below example demonstrates how to instantiate the SQLThresholdCheckOperator If the value returned by the query, is within the thresholds, the task passes. Otherwise, it fails. -.. _howto/operator:GenericTransfer: +.. _howto/operator:SQLInsertRowsOperator: + +Insert rows into Table +~~~~~~~~~~~~~~~~~~~~~~ + +Use the :class:`~airflow.providers.common.sql.operators.sql.SQLInsertRowsOperator` to insert rows into a database table +directly from Python data structures or an XCom. Parameters of the operator are: + +- ``table_name`` - name of the table in which the rows will be inserted (templated). +- ``conn_id`` - the Airflow connection ID used to connect to the database. +- ``schema`` (optional) - the schema in which the table is defined. +- ``database`` (optional) - name of the database which overrides the one defined in the connection. +- ``columns`` (optional) - list of columns to use for the insert when passing a list of dictionaries. +- ``ignored_columns`` (optional) - list of columns to ignore for the insert, if no columns are specified, + columns will be dynamically resolved from the metadata. +- ``rows`` - rows to insert, a list of tuples. +- ``rows_processor`` (optional) - a function applied to the rows before inserting them. +- ``preoperator`` (optional) - SQL statement or list of statements to execute before inserting data (templated). +- ``postoperator`` (optional) - SQL statement or list of statements to execute after inserting data (templated). +- ``hook_params`` (optional) - dictionary of additional parameters passed to the underlying hook. +- ``insert_args`` (optional) - dictionary of additional arguments passed to the hook's ``insert_rows`` method, + can include ``replace``, ``executemany``, ``fast_executemany``, ``autocommit``, and others supported by the hook. + +The example below shows how to instantiate the SQLInsertRowsOperator task. + +.. exampleinclude:: /../tests/system/common/sql/example_sql_insert_rows.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_sql_insert_rows] + :end-before: [END howto_operator_sql_insert_rows] + +.. _howto/operator:SQLInsertRowsOperator: Generic Transfer ~~~~~~~~~~~~~~~~ From be48ee7ceafa5b647dd78c23936d695a974d2e83 Mon Sep 17 00:00:00 2001 From: David Blain Date: Thu, 28 Aug 2025 17:06:40 +0200 Subject: [PATCH 03/10] docs: Added example file for the SQLInsertRowsOperator --- .../common/sql/example_sql_insert_rows.py | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 providers/common/sql/tests/system/common/sql/example_sql_insert_rows.py diff --git a/providers/common/sql/tests/system/common/sql/example_sql_insert_rows.py b/providers/common/sql/tests/system/common/sql/example_sql_insert_rows.py new file mode 100644 index 0000000000000..ba24082602e6a --- /dev/null +++ b/providers/common/sql/tests/system/common/sql/example_sql_insert_rows.py @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow import DAG +from airflow.providers.common.sql.operators.sql import SQLInsertRowsOperator +from airflow.utils.timezone import datetime + +AIRFLOW_DB_METADATA_TABLE = "ab_user" +connection_args = { + "conn_id": "airflow_db", + "conn_type": "Postgres", + "host": "postgres", + "schema": "postgres", + "login": "postgres", + "password": "postgres", + "port": 5432, +} + +with DAG( + "example_sql_insert_rows", + description="Example DAG for SQLInsertRowsOperator.", + default_args=connection_args, + start_date=datetime(2021, 1, 1), + schedule=None, + catchup=False, +) as dag: + """ + ### Example SQL insert rows DAG + + Runs the SQLInsertRowsOperator against the Airflow metadata DB. + """ + + # [START howto_operator_sql_insert_rows] + insert_rows = SQLInsertRowsOperator( + task_id="insert_rows", + table_name="actors", + columns=[ + "name", + "firstname", + "age", + ], + rows=[ + ("Stallone", "Sylvester", 78), + ("Statham", "Jason", 57), + ("Li", "Jet", 61), + ("Lundgren", "Dolph", 66), + ("Norris", "Chuck", 84), + ], + preoperator=[ + """ + CREATE TABLE IF NOT EXISTS actors ( + index BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + name TEXT NOT NULL, + firstname TEXT NOT NULL, + age BIGINT NOT NULL + ); + """, + "TRUNCATE TABLE actors;", + ], + postoperator="DROP TABLE IF EXISTS actors;", + insert_args={ + "commit_every": 1000, + "autocommit": False, + "executemany": True, + "fast_executemany": True, + }, + ) + # [END howto_operator_sql_insert_rows] + + +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) From 8ecd568bb57853b842cfa4bb7c8a842ae276ad04 Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 29 Aug 2025 23:14:23 +0200 Subject: [PATCH 04/10] refactor: Fixed doc GenericTransfer: --- providers/common/sql/docs/operators.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/common/sql/docs/operators.rst b/providers/common/sql/docs/operators.rst index 0d055447110f5..837a003739748 100644 --- a/providers/common/sql/docs/operators.rst +++ b/providers/common/sql/docs/operators.rst @@ -205,7 +205,7 @@ The example below shows how to instantiate the SQLInsertRowsOperator task. :start-after: [START howto_operator_sql_insert_rows] :end-before: [END howto_operator_sql_insert_rows] -.. _howto/operator:SQLInsertRowsOperator: +.. _howto/operator:GenericTransfer: Generic Transfer ~~~~~~~~~~~~~~~~ From 00475cb95280400bfe8637eada7d78fd2281cfc0 Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 29 Aug 2025 23:15:50 +0200 Subject: [PATCH 05/10] refactor: Fixed static checks SQLInsertRowsOperator --- .../airflow/providers/common/sql/operators/sql.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py index 37dfba57fdaff..429ac1eaab0db 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py @@ -1351,27 +1351,21 @@ def table_name_with_schema(self) -> str: @cached_property def columns(self): if self._columns is None: - self._columns = self.get_db_hook().dialect.get_column_names( - self.table_name_with_schema - ) + self._columns = self.get_db_hook().dialect.get_column_names(self.table_name_with_schema) return self._columns @property def column_names(self) -> list[str]: if self.ignored_columns: - return [ - column for column in self.columns if column not in self.ignored_columns - ] + return [column for column in self.columns if column not in self.ignored_columns] return self.columns def _process_rows(self, context: Context): return self._rows_processor(context, self.rows) # type: ignore def execute(self, context: Context) -> Any: - if self.rows is None: - raise AirflowSkipException( - f"Skipping task {self.task_id} because rows is None." - ) + if not self.rows: + raise AirflowSkipException(f"Skipping task {self.task_id} because no rows.") self.log.debug("Table: %s", self.table_name_with_schema) self.log.debug("Column names: %s", self.column_names) From 1877a18c54c13f0638c52f6fc47ec06948d2c776 Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 29 Aug 2025 23:20:09 +0200 Subject: [PATCH 06/10] refactor: Made conn_id optional in SQLInsertRowsOperator --- .../sql/src/airflow/providers/common/sql/operators/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py index 429ac1eaab0db..df8719e482460 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py @@ -1302,7 +1302,7 @@ def __init__( self, *, table_name: str, - conn_id: str, + conn_id: str | None = None, schema: str | None = None, database: str | None = None, columns: Iterable[str] | None = None, From d8d90f4b7993f609f99be299ffc962da331d2583 Mon Sep 17 00:00:00 2001 From: David Blain Date: Sat, 30 Aug 2025 08:17:48 +0200 Subject: [PATCH 07/10] refactor: Made rows optional in SQLInsertRowsOperator to fix PEP 484 issue --- .../sql/src/airflow/providers/common/sql/operators/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py index df8719e482460..9b0ce633d5442 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py @@ -1307,7 +1307,7 @@ def __init__( database: str | None = None, columns: Iterable[str] | None = None, ignored_columns: Iterable[str] | None = None, - rows: list[Any] | XComArg = None, + rows: list[Any] | XComArg | None = None, rows_processor: Callable[[Any, Context], Any] = lambda rows, **context: rows, preoperator: str | list[str] | None = None, postoperator: str | list[str] | None = None, From 793789a07cd8f41a40470f93b17306fb2dce0eea Mon Sep 17 00:00:00 2001 From: David Blain Date: Sat, 30 Aug 2025 08:19:46 +0200 Subject: [PATCH 08/10] refactor: Removed duplicate blank line in SQLInsertRowsOperator docstring --- .../common/sql/src/airflow/providers/common/sql/operators/sql.py | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py index 9b0ce633d5442..410bd7119c75d 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py @@ -1278,7 +1278,6 @@ class SQLInsertRowsOperator(BaseSQLOperator): `insert_rows` method. This allows you to configure options such as `replace`, `executemany`, `fast_executemany`, and `autocommit`. - .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:SQLInsertRowsOperator` From 6c5a7b4bb68f106c36bf97d9100845d9a8b85a05 Mon Sep 17 00:00:00 2001 From: David Blain Date: Sat, 30 Aug 2025 08:20:18 +0200 Subject: [PATCH 09/10] refactor: Moved jinja to type checking block --- .../sql/src/airflow/providers/common/sql/operators/sql.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py index 410bd7119c75d..112bdce76d46d 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py @@ -23,8 +23,6 @@ from functools import cached_property from typing import TYPE_CHECKING, Any, ClassVar, NoReturn, SupportsAbs -import jinja2 - from airflow import XComArg from airflow.exceptions import AirflowException, AirflowFailException, AirflowSkipException from airflow.models import SkipMixin @@ -34,6 +32,8 @@ from airflow.utils.helpers import merge_dicts if TYPE_CHECKING: + import jinja2 + from airflow.providers.openlineage.extractors import OperatorLineage from airflow.utils.context import Context From fd4920f205f882c6d8337da9daad12e52e504244 Mon Sep 17 00:00:00 2001 From: David Blain Date: Sat, 30 Aug 2025 17:22:30 +0200 Subject: [PATCH 10/10] refactor: Put summary on one line --- .../sql/src/airflow/providers/common/sql/operators/sql.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py index 112bdce76d46d..6caa2000059e9 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/sql.py @@ -1257,8 +1257,7 @@ def execute(self, context: Context): class SQLInsertRowsOperator(BaseSQLOperator): """ - Insert rows (e.g. a collection of tuples) into a database table directly from an XCom or Python data - structure. + Insert rows (e.g. a collection of tuples) into a database table directly from an XCom or Python data structure. :param table: the name of the table in which the rows will be inserted (templated). :param conn_id: the connection ID used to connect to the database