Skip to content

Commit

Permalink
Merge branch 'main' into add-athena-session-token
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana authored Nov 9, 2023
2 parents 3c85872 + 9001c98 commit 06aa561
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 4 deletions.
27 changes: 27 additions & 0 deletions cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,30 @@
"LoadMode",
"TestBehavior",
]

"""
Required provider info for using Airflow config for configuration
"""


def get_provider_info():
return {
"package-name": "astronomer-cosmos", # Required
"name": "Astronomer Cosmos", # Required
"description": "Astronomer Cosmos is a library for rendering dbt workflows in Airflow. Contains dags, task groups, and operators.", # Required
"versions": [__version__], # Required
"config": {
"cosmos": {
"description": None,
"options": {
"propagate_logs": {
"description": "Enable log propagation from Cosmos custom logger\n",
"version_added": "1.3.0a1",
"type": "boolean",
"example": None,
"default": "True",
},
},
},
},
}
5 changes: 5 additions & 0 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import copy
import inspect
from typing import Any, Callable

Expand Down Expand Up @@ -118,6 +119,10 @@ def __init__(
# If we are using the old interface, we should migrate it to the new interface
# This is safe to do now since we have validated which config interface we're using
if project_config.dbt_project_path:
# We copy the configuration so the change does not affect other DAGs or TaskGroups
# that may reuse the same original configuration
render_config = copy.deepcopy(render_config)
execution_config = copy.deepcopy(execution_config)
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

Expand Down
5 changes: 5 additions & 0 deletions cosmos/log.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations
import logging

from airflow.configuration import conf
from airflow.utils.log.colored_log import CustomTTYColoredFormatter


Expand All @@ -23,9 +24,13 @@ def get_logger(name: str | None = None) -> logging.Logger:
By using this logger, we introduce a (yellow) astronomer-cosmos string into the project's log messages:
[2023-08-09T14:20:55.532+0100] {subprocess.py:94} INFO - (astronomer-cosmos) - 13:20:55 Completed successfully
"""
propagateLogs: bool = True
if conf.has_option("cosmos", "propagate_logs"):
propagateLogs = conf.getboolean("cosmos", "propagate_logs")
logger = logging.getLogger(name)
formatter: logging.Formatter = CustomTTYColoredFormatter(fmt=LOG_FORMAT) # type: ignore
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.propagate = propagateLogs
return logger
26 changes: 22 additions & 4 deletions dev/dags/basic_cosmos_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@
An example DAG that uses Cosmos to render a dbt project as a TaskGroup.
"""
import os

from datetime import datetime
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, RenderConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
Expand All @@ -23,6 +24,8 @@
),
)

shared_execution_config = ExecutionConfig()


@dag(
schedule_interval="@daily",
Expand All @@ -35,19 +38,34 @@ def basic_cosmos_task_group() -> None:
"""
pre_dbt = EmptyOperator(task_id="pre_dbt")

jaffle_shop = DbtTaskGroup(
group_id="test_123",
customers = DbtTaskGroup(
group_id="customers",
project_config=ProjectConfig(
(DBT_ROOT_PATH / "jaffle_shop").as_posix(),
),
render_config=RenderConfig(select=["path:seeds/raw_customers.csv"]),
execution_config=shared_execution_config,
operator_args={"install_deps": True},
profile_config=profile_config,
default_args={"retries": 2},
)

orders = DbtTaskGroup(
group_id="orders",
project_config=ProjectConfig(
(DBT_ROOT_PATH / "jaffle_shop").as_posix(),
),
render_config=RenderConfig(select=["path:seeds/raw_orders.csv"]),
execution_config=shared_execution_config,
operator_args={"install_deps": True},
profile_config=profile_config,
default_args={"retries": 2},
)

post_dbt = EmptyOperator(task_id="post_dbt")

pre_dbt >> jaffle_shop >> post_dbt
pre_dbt >> customers >> post_dbt
pre_dbt >> orders >> post_dbt


basic_cosmos_task_group()
1 change: 1 addition & 0 deletions docs/configuration/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ Cosmos offers a number of configuration options to customize its behavior. For m
Selecting & Excluding <selecting-excluding>
Operator Args <operator-args>
Compiled SQL <compiled-sql>
Logging <logging>
19 changes: 19 additions & 0 deletions docs/configuration/logging.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
.. _logging:

Logging
====================

Cosmos uses a custom logger implementation so that all log messages are clearly tagged with ``(astronomer-cosmos)``. By default this logger has propagation enabled.

In some environments (for example when running Celery workers) this can cause duplicated log messages to appear in the logs. In this case log propagation can be disabled via airflow configuration using the boolean option ``propagate_logs`` under a ``cosmos`` section.

.. code-block:: cfg
[cosmos]
propagate_logs = False
or

.. code-block:: python
AIRFLOW__COSMOS__PROPAGATE_LOGS = "False"
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ kubernetes = [
]


[project.entry-points.cosmos]
provider_info = "cosmos:get_provider_info"

[project.urls]
Homepage = "https://github.com/astronomer/astronomer-cosmos"
Documentation = "https://astronomer.github.io/astronomer-cosmos"
Expand Down
18 changes: 18 additions & 0 deletions tests/test_log.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging

from cosmos import get_provider_info
from cosmos.log import get_logger
from airflow.configuration import conf


def test_get_logger():
Expand All @@ -12,3 +14,19 @@ def test_get_logger():
assert custom_logger.propagate is True
assert custom_logger.handlers[0].formatter.__class__.__name__ == "CustomTTYColoredFormatter"
assert custom_string in custom_logger.handlers[0].formatter._fmt


def test_propagate_logs_conf():
if not conf.has_section("cosmos"):
conf.add_section("cosmos")
conf.set("cosmos", "propagate_logs", "False")
custom_logger = get_logger("cosmos-log")
assert custom_logger.propagate is False


def test_get_provider_info():
provider_info = get_provider_info()
assert "cosmos" in provider_info.get("config").keys()
assert "options" in provider_info.get("config").get("cosmos").keys()
assert "propagate_logs" in provider_info.get("config").get("cosmos").get("options").keys()
assert provider_info["config"]["cosmos"]["options"]["propagate_logs"]["type"] == "boolean"

0 comments on commit 06aa561

Please sign in to comment.