Skip to content

Commit

Permalink
Make dag arg optional for CosmosTaskGroup (#119)
Browse files Browse the repository at this point in the history
Closes: #118

Typically when authoring with TaskGroups in Airflow, users will not
explicitly pass a DAG object but rather set the TaskGroup within a DAG
context manager. Making the `dag` arg optional in the CosmosTaskGroup
will allow the same authoring practice. In Airflow, if a `dag` arg is
not passed to a TaskGroup, behind the scenes
`airflow.models.DagContext.get_current_dag()` is called to retrieve the
DAG object. Since CosmosTaskGroup pushes the `dag` arg to its super
class, and early enough in construction, the same functional behavior is
triggered.

Co-authored-by: Chris Hronek <31361051+pohek321@users.noreply.github.com>
  • Loading branch information
josh-fell and chrishronek authored Feb 6, 2023
1 parent 8cbaec2 commit 0818719
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 13 deletions.
7 changes: 3 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

.. |fury| image:: https://badge.fury.io/py/astronomer-cosmos.svg
:target: https://badge.fury.io/py/astronomer-cosmos

.. |ossrank| image:: https://img.shields.io/endpoint?url=https://ossrank.com/shield/2121
:target: https://ossrank.com/shield/2121

.. |downloads| image:: https://img.shields.io/pypi/dm/astronomer-cosmos.svg
:target: https://img.shields.io/pypi/dm/astronomer-cosmos

Expand Down Expand Up @@ -67,7 +67,7 @@ Simiarly, you can render an Airflow TaskGroups using the ``DbtTaskGroup`` class.
dag_id="extract_dag",
start_date=datetime(2022, 11, 27),
schedule="@daily",
) as dag:
):
e1 = EmptyOperator(task_id="ingestion_workflow")
Expand All @@ -78,7 +78,6 @@ Simiarly, you can render an Airflow TaskGroups using the ``DbtTaskGroup`` class.
dbt_args={
"schema": "public",
},
dag=dag,
)
e2 = EmptyOperator(task_id="some_extraction")
Expand Down
2 changes: 1 addition & 1 deletion cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CosmosTaskGroup(TaskGroup):
def __init__(
self,
cosmos_group: Group,
dag: DAG,
dag: Optional[DAG] = None,
*args: Any,
**kwargs: Any,
) -> None:
Expand Down
5 changes: 2 additions & 3 deletions docs/dbt/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ Create a DAG and import the :class:`cosmos.providers.dbt.DbtTaskGroup` class. Th
dbt_args={
"schema": "public",
},
dag=dag,
)
e2 = EmptyOperator(task_id="some_extraction")
Expand All @@ -62,5 +61,5 @@ The ``DbtTaskGroup`` operator will automatically generate a TaskGroup with the t

.. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/dbt_dag.png
:width: 800
dbt's default jaffle_shop project rendered as a TaskGroup in Airflow

dbt's default jaffle_shop project rendered as a TaskGroup in Airflow
1 change: 0 additions & 1 deletion docs/dbt/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ The :class:`cosmos.providers.dbt.DbtTaskGroup` class can be used to render a tas
dbt_args={
"schema": "public",
},
dag=dag,
)
e2 = EmptyOperator(task_id="some_extraction")
Expand Down
4 changes: 1 addition & 3 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ Create a DAG and import the ``DbtTaskGroup`` operator. The ``DbtTaskGroup`` oper
dbt_args={
"schema": "public",
},
dag=dag,
)
e2 = EmptyOperator(task_id="some_extraction")
Expand All @@ -77,7 +76,7 @@ The ``DbtTaskGroup`` operator will automatically generate a TaskGroup with the t

.. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/dbt_dag.png
:width: 800

dbt's default jaffle_shop project rendered as a TaskGroup in Airflow


Expand All @@ -97,4 +96,3 @@ Cosmos operates on a few guiding principles:
- **Flexible**: Cosmos is not opinionated in that it does not enforce a specific rendering method for third-party systems; users can decide whether they'd like to render their workflow as a DAG, TaskGroup, or individual task.
- **Extensible**: Cosmos is designed to be extensible. Users can add their own parsers and operators to support their own workflows.
- **Modular**: Cosmos is designed to be modular. Users can install only the dependencies they need for their workflows.

1 change: 0 additions & 1 deletion examples/dags/jaffle_shop.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
"dbt_executable_path": "/usr/local/airflow/dbt_venv/bin/dbt",
},
test_behavior="after_all",
dag=dag,
)

post_dbt_workflow = EmptyOperator(task_id="post_dbt_workflow")
Expand Down

0 comments on commit 0818719

Please sign in to comment.