
Add deferred pagination mode to GenericTransfer #44809

Draft
wants to merge 25 commits into main

Conversation

@dabla (Contributor) commented Dec 10, 2024

As explained in my Airflow Medium blog post, I've refactored the GenericTransfer to support deferred paginated reads.

When dealing with large datasets, the whole dataset no longer needs to be read into memory before being persisted, which could otherwise lead to out-of-memory errors on the worker executing the code.

I also took the opportunity to introduce an SQLExecuteQueryTrigger in the common SQL provider. It allows the GenericTransfer to handle the paginated reads in deferred mode, decoupling the reads from the writes: instead of continuously blocking the worker, the operator can offload each read to the triggerer while persisting the previous page in the meantime.
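For illustration, here is a minimal sketch of what such a trigger could look like, following Airflow's standard BaseTrigger/TriggerEvent contract. The class name, classpath, and `_fetch_page` helper below are illustrative, not necessarily the exact implementation in this PR:

```python
from __future__ import annotations

from collections.abc import AsyncIterator
from typing import Any

from airflow.triggers.base import BaseTrigger, TriggerEvent


class PaginatedSQLTrigger(BaseTrigger):  # hypothetical name
    """Runs one paginated read on the triggerer and hands the rows back."""

    def __init__(self, conn_id: str, sql: str) -> None:
        super().__init__()
        self.conn_id = conn_id
        self.sql = sql

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # Triggers must be serializable so the triggerer can re-instantiate them.
        return (
            "example.triggers.PaginatedSQLTrigger",  # illustrative classpath
            {"conn_id": self.conn_id, "sql": self.sql},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # In a real trigger the query runs against the database here; the
        # operator's execute_complete() then persists the returned page and
        # defers again for the next one.
        rows = await self._fetch_page()
        yield TriggerEvent({"rows": rows})

    async def _fetch_page(self) -> list[Any]:
        # Placeholder for the actual database call.
        return []
```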

Once the dialects PR is done, we could improve how the GenericTransfer builds the paginated SQL queries across different databases. At the moment the paginated SQL query can be customized through the paginated_sql_statement_format parameter. The read size can be specified through the chunk_size parameter (maybe a better name would be preferable here, but I'll let you decide what it's best called). If no chunk_size is specified, the original implementation is used and everything is read and persisted in one go.
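For illustration, a task using these parameters could look like the sketch below. The parameter names chunk_size and paginated_sql_statement_format follow the description above; the exact signature and the LIMIT/OFFSET format string are assumptions and may differ per database dialect:

```python
from airflow.operators.generic_transfer import GenericTransfer

transfer = GenericTransfer(
    task_id="transfer_orders",
    source_conn_id="source_db",
    destination_conn_id="target_db",
    sql="SELECT * FROM orders",
    destination_table="orders",
    # Read and persist 10,000 rows at a time instead of the whole table at once.
    chunk_size=10_000,
    # Assumed format: base query, then page size and offset are interpolated.
    paginated_sql_statement_format="{} LIMIT {} OFFSET {}",
)
```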

Last but not least, I've moved the test code for testing deferrable operators out of the Microsoft Azure provider and into the common test utils, so it can be reused across multiple modules.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

…d reads (in deferred mode) and introduce a SQLExecuteQueryTrigger
@dabla (Contributor, Author) commented Dec 11, 2024

The following dependency check is failing in breeze:

pytest.param(
    ("providers/src/airflow/providers/standard/operators/bash.py",),
    {
        "selected-providers-list-as-string": "common.compat standard",
        "all-python-versions": "['3.9']",
        "all-python-versions-list-as-string": "3.9",
        "python-versions": "['3.9']",
        "python-versions-list-as-string": "3.9",
        "ci-image-build": "true",
        "prod-image-build": "false",
        "needs-helm-tests": "false",
        "run-tests": "true",
        "run-amazon-tests": "false",
        "docs-build": "true",
        "run-kubernetes-tests": "false",
        "skip-pre-commits": "identity,lint-helm-chart,mypy-airflow,mypy-dev,mypy-docs,mypy-providers,mypy-task-sdk,"
        "ts-compile-format-lint-ui,ts-compile-format-lint-www",
        "upgrade-to-newer-dependencies": "false",
        "core-test-types-list-as-string": "Always Core Serialization",
        "providers-test-types-list-as-string": "Providers[common.compat] Providers[standard]",
        "needs-mypy": "true",
        "mypy-checks": "['mypy-providers']",
    },
    id="Providers standard tests and Serialization tests to run when airflow bash.py changed",
),

@eladkal @potiuk This error is logical: I needed to add the common SQL provider dependency, since the GenericTransfer now requires it for the newly introduced SQLExecuteQueryTrigger that enables the deferred paging mechanism.

But after some reflection, it still feels illogical to me that the GenericTransfer operator is part of the standard provider package, unless it is meant to allow more than just transferring data from database to database. If not, it would be more logical for it to reside in the common SQL provider, or am I missing something?

I've been going through the code and checked the implementations of the get_records and insert_rows methods, which were all implemented by hooks extending the DbApiHook, but I suspect the DbApiHook was introduced after the GenericTransfer already existed.
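For context, the classic one-shot transfer path I'm referring to roughly amounts to the following sketch, using the DbApiHook methods mentioned above (hook wiring simplified, function name illustrative):

```python
from airflow.providers.common.sql.hooks.sql import DbApiHook


def one_shot_transfer(
    source_hook: DbApiHook, dest_hook: DbApiHook, sql: str, destination_table: str
) -> None:
    # Everything is read into memory at once -- the behavior the chunked,
    # deferred mode in this PR is meant to avoid for large datasets.
    rows = source_hook.get_records(sql)
    dest_hook.insert_rows(table=destination_table, rows=rows)
```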
