Add rows processor to GenericTransfer #61143

Merged
potiuk merged 37 commits into apache:main from dabla:feature/add-rows-processor-generic-transfer
Feb 10, 2026

Contributor

@dabla dabla commented Jan 27, 2026


This PR extends the GenericTransfer operator with a rows_processor parameter, similar to SQLInsertRowsOperator.
The rows_processor allows users to post-process rows returned by the source hook before they are inserted into the destination hook.

This enables lightweight transformations (filtering, normalization, enrichment, etc.) without introducing an intermediate task, while keeping database-specific type handling encapsulated in the hook itself.

from airflow.providers.common.sql.operators.generic_transfer import GenericTransfer


def rows_processor(rows, **context):
    # Example: normalize string values before insertion
    return [
        tuple(value.strip() if isinstance(value, str) else value for value in row)
        for row in rows
    ]


GenericTransfer(
    task_id="transfer_data",
    source_conn_id="source_db",
    destination_conn_id="target_db",
    sql="SELECT id, name FROM source_table",
    destination_table="target_table",
    rows_processor=rows_processor,
)
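
The description also mentions filtering and enrichment. A minimal sketch of those two cases, assuming the same `(rows, **context)` callable signature as the example above and assuming the operator forwards Airflow context keys such as `logical_date` as keyword arguments (the helper names are hypothetical and not part of the PR):

```python
from datetime import datetime, timezone


def drop_incomplete_rows(rows, **context):
    """Filtering: keep only rows in which no column is None."""
    return [row for row in rows if all(value is not None for value in row)]


def add_load_timestamp(rows, **context):
    """Enrichment: append a load timestamp as an extra column.

    Assumption: ``logical_date`` is passed in as a keyword argument;
    fall back to the current UTC time when it is absent.
    """
    loaded_at = context.get("logical_date") or datetime.now(timezone.utc)
    return [tuple(row) + (loaded_at,) for row in rows]


def rows_processor(rows, **context):
    # Compose the steps: filter first, then enrich the surviving rows.
    return add_load_timestamp(drop_incomplete_rows(rows, **context), **context)


# Local check with in-memory rows; no database or Airflow install is needed.
sample_rows = [(1, " alice "), (2, None), (3, "carol")]
run_date = datetime(2026, 1, 27, tzinfo=timezone.utc)
processed = rows_processor(sample_rows, logical_date=run_date)
```

Because the enrichment step adds a column, the destination table would need a matching extra column for the insert to succeed.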

In addition, this PR makes SQLExecuteQueryTrigger non-blocking when running on Airflow 3.2 or higher.

Previously, the trigger had to remain blocking due to a bug in the CommsDecoder that could raise a “Response read out of order” error when handling asynchronous trigger events. With this issue fixed in Airflow 3.2, the trigger can now safely run in a non-blocking mode, improving triggerer scalability and resource usage.
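
The PR text does not show how the trigger avoids blocking. As a generic illustration of the idea only (not the actual `SQLExecuteQueryTrigger` code), the sketch below offloads a synchronous call to a worker thread with `asyncio.to_thread`, so that other coroutines on the same event loop keep running. `fetch_records_blocking` and `heartbeat` are hypothetical stand-ins for a database call and for other triggers sharing the loop.

```python
import asyncio
import time


def fetch_records_blocking():
    """Hypothetical stand-in for a synchronous database call."""
    time.sleep(0.2)
    return [(1, "alice"), (2, "bob")]


async def heartbeat(ticks):
    """Simulates other work sharing the same event loop."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.02)


async def run_non_blocking():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    # The blocking call runs in a worker thread, so the event loop stays
    # free to schedule the heartbeat coroutine in the meantime.
    records = await asyncio.to_thread(fetch_records_blocking)
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return records, len(ticks)


records, tick_count = asyncio.run(run_non_blocking())
```

If `fetch_records_blocking()` were called directly inside the coroutine instead, the heartbeat would not tick at all until the call returned, which is the scalability cost a blocking trigger imposes on the triggerer.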


@dabla dabla requested a review from ephraimbuddy January 28, 2026 19:17
Contributor

@Nataneljpwd Nataneljpwd left a comment

Looks great, I have left a few comments and questions, mainly nitpicks

Contributor

@Nataneljpwd Nataneljpwd left a comment

Looks great!
Only 1 question regarding the python interface file

Contributor Author

dabla commented Feb 1, 2026

Looks great! Only 1 question regarding the python interface file

Thanks for the review @Nataneljpwd, really appreciated, well done! I think I've fixed all the issues and am now waiting for CI/CD.

@dabla dabla requested a review from Nataneljpwd February 1, 2026 16:33
Contributor

@Nataneljpwd Nataneljpwd left a comment

Looks great. One minor nitpick on type hinting; other than that, all good.

Contributor

@Nataneljpwd Nataneljpwd left a comment

Looks great! Just need to make the CI pass.

@dabla dabla force-pushed the feature/add-rows-processor-generic-transfer branch from 9718eac to 9d3b2ef Compare February 2, 2026 13:57
@dabla dabla requested a review from Nataneljpwd February 2, 2026 18:21
Contributor Author

dabla commented Feb 3, 2026

@Nataneljpwd thanks for the review and for helping fix the mypy issues!

Contributor

@Nataneljpwd Nataneljpwd left a comment

Everything looks amazing, nice work!

@dabla dabla requested a review from shahar1 February 6, 2026 17:39
@potiuk potiuk merged commit d70216c into apache:main Feb 10, 2026
90 checks passed
Alok-kumar-priyadarshi pushed a commit to Alok-kumar-priyadarshi/airflow that referenced this pull request Feb 11, 2026
* refactor: Fixed docstring for rows_processor parameter in SQLInsertRowsOperator

* refactor: Added rows_processor parameter in GenericTransfer

* refactor: Make SQLExecuteQueryTrigger non-blocking from Airflow 3.2+

* refactor: Make inserting of rows in sync and deferred mode more DRY

* refactor: Updated GenericTransfer type

* refactor: Ignore mypy for _rows_processor

* refactor: Removed _process_rows and made rows_processor optional in constructor of GenericTransfer

* refactor: Removed _process_rows and made rows_processor optional in constructor of SQLInsertRowsOperator

* refactor: Made _get_records in trigger protected

* refactor: Updated typing of rows_processor in GenericTransfer interface

* refactor: Fixed typing of rows parameter in _insert_rows method

* refactor: Refactored unit test for SQLInsertRowsOperator

* refactor: Fixed typing of _insert_rows method

* refactor: Updated docstring for rows_processor parameter

* refactor: Changed typing of context parameter from Context to Any in rows_processor parameter

* refactor: Changed typing of rows_processor

* refactor: Refactored _insert_rows method in SQLInsertRowsOperator and removed override of render_template_fields

* refactor: Removed unused import of jinja2

* docs: Added comment to explain why rows_processor is typed like this

* Update providers/common/sql/src/airflow/providers/common/sql/operators/sql.py

Co-authored-by: Shahar Epstein <60007259+shahar1@users.noreply.github.com>

* refactor: Removed get_records

* refactor: Evaluate XComArgs first before evaluating rows

* refactor: Fixed check on rows in SQLInsertRowsOperator

---------

Co-authored-by: David Blain <david.blain@b-holding.be>
Co-authored-by: Shahar Epstein <60007259+shahar1@users.noreply.github.com>
Ratasa143 pushed a commit to Ratasa143/airflow that referenced this pull request Feb 15, 2026
choo121600 pushed a commit to choo121600/airflow that referenced this pull request Feb 22, 2026

4 participants