Skip to content

Conversation

@dabla
Copy link
Contributor

@dabla dabla commented Sep 24, 2025

Allow a list of SQL statements to be passed to the GenericTransfer just like the SQLExecuteQueryOperator to avoid the need to use partial/expand which is way more expensive than looping over the SQL-statements within the operator.

The use-case is inspired from this medium article where there it was still using partial/expand.

An example:

load_retrieved_files_to_db = GenericTransfer(
    task_id="load_retrieved_files_to_db",
    source_conn_id="duckdb_duckdb_memory",
    destination_conn_id=MSSQL_CONN_ID,
    destination_table="BRONZE.log_files",
    sql=retrieved_files.output.map(
       lambda file: f"""
       SELECT *
       FROM read_csv(
           '{file}',
           delim = ';',
           header = false,
           columns = {{
               'event_id': 'INTEGER',
               'event_time': 'TIMESTAMP',
               'event': 'VARCHAR',
               'result': 'VARCHAR',
               'info': 'VARCHAR',
               'user_id': 'VARCHAR',
               'origin': 'VARCHAR',
               'destination': 'VARCHAR',
               'session_id': 'INTEGER'
           }}
       );
    """
    ),
    insert_args={
        "target_fields": [
            "event_id",
            "event_time",
            "event",
            "result",
            "info",
            "user_id",
            "origin",
            "destination",
            "session_id",
        ],
        "commit_every": 5000,
        "executemany": True,
        "fast_executemany": True,
    },
)

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

…t like the SQLExecuteQueryOperator to avoid the need to use partial/expand which is way more expensive than looping over the SQL-statements within the operator
@dabla dabla requested a review from potiuk October 22, 2025 05:48
…s/generic_transfer.py

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
@dabla dabla requested a review from uranusjr October 28, 2025 10:27
@eladkal eladkal merged commit 7bec5e9 into apache:main Oct 29, 2025
81 checks passed
Lzzz666 pushed a commit to Lzzz666/airflow that referenced this pull request Oct 30, 2025
* refactor: Allow a list of SQL to be passed to the GenericTransfer just like the SQLExecuteQueryOperator to avoid the need to use partial/expand which is way more expensive than looping over the SQL-statements within the operator

* refactor: Reformatted execute method in GenericTransfer

* Update providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>

* refactor: Reverted list to str for sql parameter in test GenericTransfer

* refactor: Move check of sql parameter in constructor of GenericTransfer

* refactor: Added unit test for GenericTransfer when multiple SQL statements are defined

* Revert "refactor: Move check of sql parameter in constructor of GenericTransfer"

This reverts commit 881f447.

---------

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants