
Experimental BQ support to run dbt models with ExecutionMode.AIRFLOW_ASYNC #1230

Merged
merged 51 commits into from
Oct 3, 2024

Conversation

pankajastro
Contributor

@pankajastro pankajastro commented Sep 29, 2024

This work has been inspired by the talk "Airflow at Monzo: Evolving our data platform as the bank scales" by @jonathanrainer @ed-sparkes given at Airflow Summit 2023: https://airflowsummit.org/sessions/2023/airflow-at-monzo-evolving-our-data-platform-as-the-bank-scales/.

Enable BQ users to run dbt models (full_refresh) asynchronously. This releases the Airflow worker node from waiting while the transformation (I/O) happens in the data warehouse (some transformations can take hours, according to customers), increasing overall Airflow task throughput (more information: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html). As part of this change, we introduce the capability of running the actual SQL transformations without using the dbt command. This avoids creating subprocesses in the worker node (ExecutionMode.LOCAL with InvocationMode.SUBPROCESS and ExecutionMode.VIRTUALENV) and the overhead of creating a Kubernetes Pod to execute the dbt command (ExecutionMode.KUBERNETES), which can prevent issues related to memory and CPU usage.

This PR takes advantage of an already-implemented async operator in the Airflow repo by extending it in the Cosmos async operator. It also utilizes the pre-compiled SQL generated as part of PR #1224, downloading the generated SQL from a remote location (S3/GCS), which allows us to decouple from dbt during task execution.

Details

  • Expose get_profile_type on ProfileConfig: This aids in database selection
  • Add async_op_args: A high-level parameter to forward arguments to the upstream (Airflow) operator. (This may change within this PR itself.) The async operator params are processed as kwargs via the operator_args parameter
  • Implement DbtRunAirflowAsyncOperator: This initializes the Airflow Operator, retrieves the SQL query at task runtime from a remote location, modifies the query as needed, and triggers the upstream execute method.
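The DbtRunAirflowAsyncOperator flow described above can be sketched in plain Python. The classes below are stubs that only mirror the pattern (fetch pre-compiled SQL at runtime, adapt it, then delegate to the upstream operator's execute); the table name and the fetcher callable are placeholders, not the actual Cosmos implementation:

```python
class BigQueryInsertJobOperator:
    """Stand-in for the Airflow operator; the real one submits the job and defers."""

    def __init__(self, configuration=None, **kwargs):
        self.configuration = configuration or {}

    def execute(self, context):
        # Here we just echo the query instead of running it on BigQuery.
        return self.configuration["query"]["query"]


class DbtRunAirflowAsyncOperator(BigQueryInsertJobOperator):
    def __init__(self, remote_sql_fetcher, full_refresh=True, **kwargs):
        super().__init__(**kwargs)
        self.remote_sql_fetcher = remote_sql_fetcher  # e.g. downloads from S3/GCS
        self.full_refresh = full_refresh

    def execute(self, context):
        sql = self.remote_sql_fetcher()  # compiled SQL fetched at task runtime
        if self.full_refresh:
            # full_refresh recreates the table; CREATE OR REPLACE mimics dbt-core
            sql = f"CREATE OR REPLACE TABLE my_dataset.my_model AS {sql}"
        self.configuration = {"query": {"query": sql, "useLegacySql": False}}
        return super().execute(context)  # trigger the upstream execute method


op = DbtRunAirflowAsyncOperator(remote_sql_fetcher=lambda: "SELECT 1 AS id")
print(op.execute(context={}))
# -> CREATE OR REPLACE TABLE my_dataset.my_model AS SELECT 1 AS id
```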

Limitations

  • This feature only works when using Airflow 2.8 and above
  • The async execution only works for BigQuery
  • The async execution only supports running dbt models (other dbt resources, such as seeds, sources, snapshots, tests, are run using the ExecutionMode.LOCAL)
  • This will work only if the user sets full_refresh=True in operator_args (which means tables will be dropped before being populated, as implemented in dbt-core)
  • Users need to use ProfileMapping in ProfileConfig, since Cosmos relies on having the connection (credentials) to be able to run the transformation in BQ without dbt-core
  • Users must provide the BQ location in operator_args (this is a limitation from the BigQueryInsertJobOperator that is being used to implement the native Airflow asynchronous support)
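Put together, the operator_args required by the last two limitations look like the following (the region value is a placeholder, not a recommendation):

```python
# Illustrative operator_args satisfying the limitations listed above:
operator_args = {
    "full_refresh": True,       # required: tables are dropped before being repopulated
    "location": "us-central1",  # required by BigQueryInsertJobOperator; placeholder value
}
print(sorted(operator_args))  # ['full_refresh', 'location']
```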

Testing

We have added a new dbt project to the repository to facilitate asynchronous task execution. The goal is to accelerate development without disrupting or requiring fixes for the existing tests. Also, we have added a DAG for end-to-end testing: https://github.com/astronomer/astronomer-cosmos/blob/bd6657a29b111510fc34b2baf0bcc0d65ec0e5b9/dev/dags/simple_dag_async.py

Configuration

Users need to configure the parameters below to execute deferrable tasks in Cosmos.

Example DAG: https://github.com/astronomer/astronomer-cosmos/blob/bd6657a29b111510fc34b2baf0bcc0d65ec0e5b9/dev/dags/simple_dag_async.py

Installation

You can leverage async operator support by installing an additional dependency:

astronomer-cosmos[dbt-bigquery, google]

Documentation

The PR also documents the limitations and usage of Airflow async execution in Cosmos.

Related Issue(s)

Related to: #1120
Closes: #1134

Breaking Change?

No

Notes

This is an experimental feature, and as such, it may undergo breaking changes. We encourage users to share their experiences and feedback to improve it further.

We'd love support and feedback so we can define the next steps.

Checklist

  • I have made corresponding changes to the documentation (if required)
  • I have added tests that prove my fix is effective or that my feature works

Credits

This was a result of teamwork and effort:

Co-authored-by: Pankaj Koti pankajkoti699@gmail.com
Co-authored-by: Tatiana Al-Chueyr tatiana.alchueyr@gmail.com

Future Work

@pankajastro pankajastro force-pushed the execute-async-task branch 2 times, most recently from 35e58b6 to 407d311 Compare September 30, 2024 09:32
Remove print stmt

Fix query
Fix query

Remove oss execute method code
tatiana pushed a commit that referenced this pull request Sep 30, 2024
This PR is the groundwork for the implementation of
`ExecutionMode.AIRFLOW_ASYNC`
(#1120), which -
once all other epic tasks are completed - will enable asynchronous
execution of dbt resources using Apache Airflow’s deferrable operators.
As part of this work, this PR introduces a new option to the enum
`ExecutionMode` : `AIRFLOW_ASYNC`. When this execution mode is used,
Cosmos now creates a setup task that will pre-compile the dbt project
SQL and make it available to the remaining dbt tasks. This PR, however,
does not yet leverage Airflow's deferrable operators. If users use
`ExecutionMode.AIRFLOW_ASYNC` they will actually be running
`ExecutionMode.LOCAL` operators with this change. The PR (#1230) has a
first experimental version of using deferrable operators for task
execution.

## Setup task as the groundwork for a new execution mode:
`ExecutionMode.AIRFLOW_ASYNC`
- Adds a new operator, `DbtCompileAirflowAsyncOperator`, as a root
task (analogous to a setup task) in the DAG. It runs the dbt compile
command and uploads the compiled SQL files to a remote storage
location, so that subsequent tasks can fetch these compiled SQL files
from remote storage and run them asynchronously using Airflow's
deferrable operators.
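Conceptually, the setup task does something like the sketch below. This is a stdlib-only illustration: a local directory stands in for the remote store, the `dbt compile` invocation is left as a comment, and all paths are made up for the demo; the real task uses Airflow's Object Storage for the upload.

```python
import shutil
import tempfile
from pathlib import Path


def compile_and_upload(project_dir: Path, remote_target: Path) -> list:
    """Copy every compiled .sql file under target/compiled/ to the remote path."""
    # In the real setup task, `dbt compile` runs first, e.g.:
    # subprocess.run(["dbt", "compile", "--project-dir", str(project_dir)], check=True)
    compiled = project_dir / "target" / "compiled"
    uploaded = []
    for sql_file in sorted(compiled.rglob("*.sql")):
        dest = remote_target / sql_file.relative_to(compiled)
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(sql_file, dest)  # stand-in for an S3/GCS/Azure upload
        uploaded.append(dest)
    return uploaded


# Demo with a fake compiled model (no dbt required):
project = Path(tempfile.mkdtemp())
model_dir = project / "target" / "compiled" / "jaffle_shop" / "models"
model_dir.mkdir(parents=True)
(model_dir / "my_model.sql").write_text("SELECT 1")
remote = Path(tempfile.mkdtemp())
print([p.name for p in compile_and_upload(project, remote)])  # ['my_model.sql']
```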

## Airflow Configurations:
- `remote_target_path`: Introduces a configurable path to store
dbt-generated files remotely, supporting any storage scheme that works
with Airflow’s Object Store (e.g., S3, GCS, Azure Blob).
- `remote_target_path_conn_id`: Allows specifying a custom connection ID
for the remote target path, defaulting to the scheme’s associated
Airflow connection if not set.
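For illustration, these settings could look like the fragment below. The `[cosmos]` section name follows how Cosmos reads its settings from the Airflow configuration, but the bucket path and connection ID are placeholders:

```ini
[cosmos]
remote_target_path = s3://my-bucket/dbt/target
remote_target_path_conn_id = my_aws_conn
```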

## Example DAG for CI Testing:
Introduces an example DAG (`simple_dag_async.py`) demonstrating how to
use the new execution mode. (As mentioned earlier, with this PR alone the
DAG still runs with ExecutionMode.LOCAL operators.)
This DAG is integrated into the CI pipeline to run integration tests and
aims at verifying the functionality of `ExecutionMode.AIRFLOW_ASYNC`
as the implementation gets added, starting with the experimental
implementation in #1230.

## Unit & Integration Tests:
- Adds comprehensive unit and integration tests to ensure correct
behavior.
- Tests include validation for successful uploads, error handling for
misconfigured remote paths, and scenarios where `remote_target_path` is
not set.

## Documentation:
- Adds detailed documentation explaining how to configure and set the
`ExecutionMode.AIRFLOW_ASYNC`.

## Scope & Limitations of the feature being introduced:
1. This feature is meant to be released as Experimental and is also
marked so in the documentation.
2. Currently, it has been scoped for only dbt models to be executed
asynchronously (being worked upon in PR #1230), while other resource
types would be run synchronously.
3. `BigQuery` will be the only supported target database for this
execution mode (being worked upon in PR #1230).

Thus, this PR enhances Cosmos by providing the groundwork for more
efficient execution of long-running dbt resources.

## Additional Notes:
- This feature is planned to be introduced in Cosmos v1.7.0.

related: #1134
Base automatically changed from poc-dbt-compile-task to main September 30, 2024 21:57

netlify bot commented Sep 30, 2024

Deploy Preview for sunny-pastelito-5ecb04 ready!

Name Link
🔨 Latest commit 78bc069
🔍 Latest deploy log https://app.netlify.com/sites/sunny-pastelito-5ecb04/deploys/66fb24b60ca64b00085c5976
😎 Deploy Preview https://deploy-preview-1230--sunny-pastelito-5ecb04.netlify.app


netlify bot commented Sep 30, 2024

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit a0cb147
🔍 Latest deploy log https://app.netlify.com/sites/sunny-pastelito-5ecb04/deploys/66fe73da6f73b00008c98193

@tatiana
Collaborator

tatiana commented Oct 2, 2024

On the issue:

There are two issues we have to solve before merging this PR

  1. CI is getting stuck when trying to run
    tests/test_example_dags.py::test_example_dag[simple_dag_async]

We identified that Cosmos was using its own `dag.test`, which was buggy when used with Airflow deferrable operators. We changed it to use Airflow's implementation when users are on Airflow >= 2.5, and that seems to have resolved the issue:
16a87ea (CI jobs were no longer stuck)

Collaborator

@tatiana tatiana left a comment


Incredible work, @pankajastro @pankajkoti ! 🎉

To run dbt transformations without dbt while leveraging Airflow asynchronous processing is far from trivial - and this PR is a significant step in the right direction.

@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Oct 2, 2024

codecov bot commented Oct 2, 2024

Codecov Report

Attention: Patch coverage is 89.74359% with 12 lines in your changes missing coverage. Please review.

Project coverage is 95.73%. Comparing base (225d6e9) to head (a0cb147).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
cosmos/config.py 36.36% 7 Missing ⚠️
cosmos/operators/airflow_async.py 94.04% 5 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1230      +/-   ##
==========================================
- Coverage   95.93%   95.73%   -0.20%     
==========================================
  Files          67       67              
  Lines        3885     3965      +80     
==========================================
+ Hits         3727     3796      +69     
- Misses        158      169      +11     


@jlaneve
Collaborator

jlaneve commented Oct 2, 2024

I haven't read through the code but this feature looks so awesome - it's something we've been talking about since day 1 so it's incredible to see it come to life!

@pankajastro
Contributor Author

Thank you so much, @pankajkoti and @tatiana, for taking this to completion. I truly appreciate your support and hard work 🙏

@tatiana tatiana merged commit 111d430 into main Oct 3, 2024
66 of 68 checks passed
@tatiana tatiana deleted the execute-async-task branch October 3, 2024 16:13
tatiana pushed a commit that referenced this pull request Oct 3, 2024
Following up on the documentation added in PRs #1224 and #1230, this PR
refactors the documentation for Async Execution mode, particularly the
limitations section.

It also addresses a couple of un-rendered items in the scheduling.rst
file, caused by missing blank lines after the code-block directive.
tatiana added a commit that referenced this pull request Oct 4, 2024
New Features

* Introduction of experimental support to run dbt BQ models using Airflow deferrable operators by @pankajkoti @pankajastro @tatiana in #1224 #1230.
  This is a first step in this journey and we would really appreciate feedback from the community.

  For more information, check the documentation: https://astronomer.github.io/astronomer-cosmos/getting_started/execution-modes.html#airflow-async-experimental

  This work has been inspired by the talk "Airflow at Monzo: Evolving our data platform as the bank scales" by
  @jonathanrainer @ed-sparkes given at Airflow Summit 2023: https://airflowsummit.org/sessions/2023/airflow-at-monzo-evolving-our-data-platform-as-the-bank-scales/.

* Support using ``DatasetAlias`` and fix orphaning unreferenced dataset by @tatiana in #1217 #1240

  Documentation: https://astronomer.github.io/astronomer-cosmos/configuration/scheduling.html#data-aware-scheduling

* Add GCP_CLOUD_RUN_JOB execution mode by @ags-de #1153

  Learn more about it: https://astronomer.github.io/astronomer-cosmos/getting_started/gcp-cloud-run-job.html

Enhancements

* Create single virtualenv when ``DbtVirtualenvBaseOperator`` has ``virtualenv_dir=None`` and ``is_virtualenv_dir_temporary=True`` by @kesompochy in #1200
* Consistently handle build and imports in ``cosmos/__init__.py`` by @tatiana in #1215
* Add enum constants to init for direct import by @fabiomx in #1184

Bug fixes

* URL encode dataset names to support multibyte characters by @t0momi219 in #1198
* Fix invalid argument (``full_refresh``) passed to DbtTestAwsEksOperator (and others) by @johnhoran in #1175
* Fix ``printer_width`` arg type in ``DbtProfileConfigVars`` by @jessicaschueler in #1191
* Fix task owner fallback by @jmaicher in #1195

Docs

* Add scarf to readme and docs for website analytics by @cmarteepants in #1221
* Add ``virtualenv_dir`` param to ``ExecutionConfig`` docs by @pankajkoti in #1173
* Give credits to @LennartKloppenburg in CHANGELOG.rst by @tatiana #1174
* Refactor docs for async mode execution by @pankajkoti in #1241

Others

* Remove PR branch added for testing a change in CI in #1224 by @pankajkoti in #1233
* Fix CI wrt broken coverage upload artifact @pankajkoti in #1210
* Fix CI issues - Upgrade actions/upload-artifact & actions/download-artifact to v4 and set min version for packaging by @pankajkoti in #1208
* Resolve CI failures for Apache Airflow 2.7 jobs by @pankajkoti in #1182
* CI: Update GCP manifest file path based on new secret update by @pankajkoti in #1237
* Pre-commit hook updates in #1176, #1186, #1201, #1219, #1231
tatiana added a commit that referenced this pull request Oct 4, 2024
**New Features**

* Support using ``DatasetAlias`` and fix orphaning unreferenced dataset
by @tatiana in #1217 #1240

Documentation:
https://astronomer.github.io/astronomer-cosmos/configuration/scheduling.html#data-aware-scheduling

* Add GCP_CLOUD_RUN_JOB execution mode by @ags-de #1153

Learn more about it:
https://astronomer.github.io/astronomer-cosmos/getting_started/gcp-cloud-run-job.html

* Introduction of experimental support to run dbt BQ models using
Airflow deferrable operators by @pankajkoti @pankajastro @tatiana in
#1224 #1230.

This is the first step in the journey of running dbt resources with
native Airflow, and we would appreciate feedback from the community.

For more information, check the documentation:
https://astronomer.github.io/astronomer-cosmos/getting_started/execution-modes.html#airflow-async-experimental

This work has been inspired by the talk "Airflow at Monzo: Evolving our
data platform as the bank scales" by
@jonathanrainer @ed-sparkes given at Airflow Summit 2023:
https://airflowsummit.org/sessions/2023/airflow-at-monzo-evolving-our-data-platform-as-the-bank-scales/.


**Enhancements**

* Create single virtualenv when ``DbtVirtualenvBaseOperator`` has
``virtualenv_dir=None`` and ``is_virtualenv_dir_temporary=True`` by
@kesompochy in #1200
* Consistently handle build and imports in ``cosmos/__init__.py`` by
@tatiana in #1215
* Add enum constants to init for direct import by @fabiomx in #1184

**Bug fixes**

* URL encode dataset names to support multibyte characters by @t0momi219
in #1198
* Fix invalid argument (``full_refresh``) passed to
DbtTestAwsEksOperator (and others) by @johnhoran in #1175
* Fix ``printer_width`` arg type in ``DbtProfileConfigVars`` by
@jessicaschueler in #1191
* Fix task owner fallback by @jmaicher in #1195

**Docs**

* Add scarf to readme and docs for website analytics by @cmarteepants in
#1221
* Add ``virtualenv_dir`` param to ``ExecutionConfig`` docs by
@pankajkoti in #1173
* Give credits to @LennartKloppenburg in CHANGELOG.rst by @tatiana #1174
* Refactor docs for async mode execution by @pankajkoti in #1241

Others

* Remove PR branch added for testing a change in CI in #1224 by
@pankajkoti in #1233
* Fix CI wrt broken coverage upload artifact @pankajkoti in #1210
* Fix CI issues - Upgrade actions/upload-artifact &
actions/download-artifact to v4 and set min version for packaging by
@pankajkoti in #1208
* Resolve CI failures for Apache Airflow 2.7 jobs by @pankajkoti in
#1182
* CI: Update GCP manifest file path based on new secret update by
@pankajkoti in #1237
* Pre-commit hook updates in #1176, #1186, #1201, #1219, #1231

---------

Co-authored-by: Pankaj Koti <pankajkoti699@gmail.com>
pankajastro added a commit that referenced this pull request Jan 27, 2025

Pluggable Async Operator Interface 
------------------------------------

This PR enhances the initial async operator support in Cosmos,
as introduced in [PR #1230](#1230). The
changes decouple the DbtRunAirflowAsyncOperator
from BigQueryInsertJobOperator, making it more flexible and allowing
support for async operators with other data sources in the future.
It introduces the DbtRunAirflowAsyncFactoryOperator class,
which dynamically selects the parent class containing
the async operator implementation based on the dbt profile.

I’ve added a template for implementing the Databricks async operator.

~~ATM moved async operator-related code to the path
`cosmos/operators/_async/`, but open for suggestion~~
After discussing this with the team, I have moved the async operator-related
code to the path `cosmos/operators/_asynchronous/`.

## Design principle 

Introduced the `DbtRunAirflowAsyncFactoryOperator` class that uses a
Factory Method design pattern combined with dynamic inheritance at
runtime.
- **Factory Method:** The `create_async_operator()` method generates a
specific async operator class based on the profile_config provided. This
allows the operator to adapt to different types of async operator at
runtime.
- **Dynamic Inheritance:** The class dynamically changes its base class
(__bases__) to use the async operator class created in the factory
method. This ensures the correct async class is used during execution.
- **Execution:** The execute() method calls the `super().execute()` to
trigger the execution logic, but it dynamically uses the appropriate
operator class for async behavior.
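The Factory Method plus dynamic inheritance idea can be sketched in plain Python. All classes below are stand-ins (not the real Cosmos/Airflow operators), and reassigning `__class__` is used here as a stdlib-friendly variant of the `__bases__` swap the PR describes:

```python
class AsyncBigqueryOperator:
    """Stand-in parent holding the BigQuery-specific async logic."""

    def execute(self, context):
        return "ran via BigQuery async path"


class AsyncDatabricksOperator:
    """Stand-in parent for a hypothetical Databricks async implementation."""

    def execute(self, context):
        return "ran via Databricks async path"


class FactoryAsyncOperator:
    _registry = {
        "bigquery": AsyncBigqueryOperator,
        "databricks": AsyncDatabricksOperator,
    }

    def __init__(self, profile_type):
        parent = self.create_async_operator(profile_type)
        # Dynamic inheritance: swap in the chosen parent at runtime so that
        # super().execute() resolves to the warehouse-specific implementation.
        self.__class__ = type(
            f"FactoryAsyncOperator_{profile_type}",
            (FactoryAsyncOperator, parent),
            {},
        )

    @classmethod
    def create_async_operator(cls, profile_type):
        # Factory Method: pick the async operator class from the dbt profile type.
        return cls._registry[profile_type]

    def execute(self, context):
        return super().execute(context)


print(FactoryAsyncOperator("bigquery").execute(context={}))
# -> ran via BigQuery async path
print(FactoryAsyncOperator("databricks").execute(context={}))
# -> ran via Databricks async path
```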

### Class hierarchy
```
                              BigQueryInsertJobOperator
                                         |
DbtRunLocalOperator   DbtRunAirflowAsyncBigqueryOperator   DbtRunAirflowAsyncDatabricksOperator   (parent classes injected at runtime)
              \               /
         DbtRunAirflowAsyncFactoryOperator
                      |
            DbtRunAirflowAsyncOperator
``` 

## How to add a new async db operator
- Implement the operator at the path `cosmos/operators/_asynchronous/`
- The operator module should follow the format:
`cosmos.operators._asynchronous.{profile_type}.{dbt_class}{_snake_case_to_camelcase(execution_mode)}{profile_type.capitalize()}Operator`
- For a detailed example, I have added a dummy implementation for
`Databricks`
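As an illustration of how this naming convention resolves, the helper below mirrors `_snake_case_to_camelcase` but is written from scratch here, not taken from the Cosmos codebase:

```python
def snake_case_to_camelcase(value):
    # "airflow_async" -> "AirflowAsync"
    return "".join(part.title() for part in value.split("_"))


def resolve_async_operator_path(profile_type, dbt_class, execution_mode):
    """Build the module path and class name per the Cosmos naming convention."""
    module = f"cosmos.operators._asynchronous.{profile_type}"
    class_name = (
        f"{dbt_class}{snake_case_to_camelcase(execution_mode)}"
        f"{profile_type.capitalize()}Operator"
    )
    return module, class_name


module, class_name = resolve_async_operator_path(
    profile_type="bigquery", dbt_class="DbtRun", execution_mode="airflow_async"
)
print(module)      # cosmos.operators._asynchronous.bigquery
print(class_name)  # DbtRunAirflowAsyncBigqueryOperator
```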

## Example DAG
```python
import os
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=GoogleCloudServiceAccountDictProfileMapping(
        conn_id="gcp_gs_conn", profile_args={"dataset": "release_17", "project": "astronomer-dag-authoring"}
    ),
)


# [START airflow_async_execution_mode_example]
simple_dag_async = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(
        DBT_ROOT_PATH / "original_jaffle_shop",
    ),
    profile_config=profile_config,
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.AIRFLOW_ASYNC,
    ),
    render_config=RenderConfig(
        select=["path:models"],
        # test_behavior=TestBehavior.NONE
    ),
    # normal dag parameters
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="simple_dag_async",
    tags=["simple"],
    operator_args={"full_refresh": True, "location": "northamerica-northeast1"},
)
# [END airflow_async_execution_mode_example]
``` 

<img width="1687" alt="Screenshot 2025-01-23 at 3 30 48 PM"
src="https://github.com/user-attachments/assets/bd7a13cc-7a52-4d55-947e-23a618b47e68"
/>

**Graph View**
<img width="1688" alt="Screenshot 2025-01-27 at 3 41 14 PM"
src="https://github.com/user-attachments/assets/d74a1851-39d9-4575-b9cb-11c286094643"
/>


closes: #1238
closes: #1239
Labels
area:config Related to configuration, like YAML files, environment variables, or executer configuration area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc area:profile Related to ProfileConfig, like Athena, BigQuery, Clickhouse, Spark, Trino, etc dbt:run Primarily related to dbt run command or functionality lgtm This PR has been approved by a maintainer profile:bigquery Related to BigQuery ProfileConfig size:XL This PR changes 500-999 lines, ignoring generated files.
Development

Successfully merging this pull request may close these issues.

PoC on ExecutionMode.LOCAL_AIRFLOW_ASYNC
6 participants