2 changes: 1 addition & 1 deletion airflow-core/docs/img/airflow_erd.sha256
@@ -1 +1 @@
5f382d018612b61b547d05402c6988bc5b9959caa7e494dbb8775a3a5c425e31
1c79db933fe961f2d23605e1dcf73125923ea39bbd719c800e8adc1aa5bedb52
4,316 changes: 2,158 additions & 2,158 deletions airflow-core/docs/img/airflow_erd.svg
10 changes: 5 additions & 5 deletions airflow-core/src/airflow/api_fastapi/core_api/app.py
@@ -33,11 +33,11 @@
from airflow.api_fastapi.auth.tokens import get_signing_key
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.settings import AIRFLOW_PATH

log = logging.getLogger(__name__)

PY313 = sys.version_info >= (3, 13)
_PY313 = sys.version_info >= (3, 13)
_AIRFLOW_PATH = Path(__file__).parents[3]


def init_views(app: FastAPI) -> None:
@@ -50,7 +50,7 @@ def init_views(app: FastAPI) -> None:

dev_mode = os.environ.get("DEV_MODE", str(False)) == "true"

directory = Path(AIRFLOW_PATH) / ("airflow/ui/dev" if dev_mode else "airflow/ui/dist")
directory = _AIRFLOW_PATH / ("airflow/ui/dev" if dev_mode else "airflow/ui/dist")

# During python tests or when the backend is run without having the frontend build
# those directories might not exist. App should not fail initializing in those scenarios.
@@ -61,7 +61,7 @@ def init_views(app: FastAPI) -> None:
if dev_mode:
app.mount(
"/static/i18n/locales",
StaticFiles(directory=Path(AIRFLOW_PATH) / "airflow/ui/public/i18n/locales"),
StaticFiles(directory=_AIRFLOW_PATH / "airflow/ui/public/i18n/locales"),
name="dev_i18n_static",
)

@@ -124,7 +124,7 @@ def init_flask_plugins(app: FastAPI) -> None:
try:
from airflow.providers.fab.www.app import create_app
except ImportError:
if PY313:
if _PY313:
log.info(
"Some Airflow 2 plugins have been detected in your environment. Currently FAB provider "
"does not support Python 3.13, so you cannot use Airflow 2 plugins with Airflow 3 until "
Expand Down
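As a reference for the hunks above: `app.py` now derives the package root from `__file__` and mounts static assets only when the build directory exists, instead of importing `AIRFLOW_PATH` from `airflow.settings`. A minimal sketch of that pattern, assuming a hypothetical `init_static` helper and `_PACKAGE_ROOT` name (only the mount paths and constants are taken from the diff):

```python
import sys
from pathlib import Path

from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles

# Module-local constants replacing imports from airflow.settings.
# _PY313 gates the FAB-provider fallback further down in app.py on Python 3.13.
_PY313 = sys.version_info >= (3, 13)
# parents[3] climbs from .../api_fastapi/core_api/app.py to the airflow package root;
# the index depends on where the module sits in the tree.
_PACKAGE_ROOT = Path(__file__).parents[3]


def init_static(app: FastAPI, dev_mode: bool = False) -> None:
    """Mount UI assets, tolerating a missing build directory (e.g. during tests)."""
    directory = _PACKAGE_ROOT / ("airflow/ui/dev" if dev_mode else "airflow/ui/dist")
    if directory.exists():
        app.mount("/ui", StaticFiles(directory=directory, html=True), name="ui")
```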
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@
@providers_configuration_loaded
def create_backfill(args) -> None:
"""Create backfill job or dry run for a DAG or list of DAGs using regex."""
logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT)
signal.signal(signal.SIGTERM, sigint_handler)
console = AirflowConsole()

Expand Down
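The backfill command above (and the matching change in `task_command.py` just below) hard-codes `logging.INFO` for CLI logging rather than reading `settings.LOGGING_LEVEL`. A minimal sketch of that setup; the format string here is an assumed stand-in, since the real value still comes from `settings.SIMPLE_LOG_FORMAT`:

```python
import logging
import signal

# Assumed stand-in; Airflow reads this from settings.SIMPLE_LOG_FORMAT.
SIMPLE_LOG_FORMAT = "%(asctime)s %(levelname)s - %(message)s"


def sigint_handler(signum, frame) -> None:
    """Exit quietly on SIGINT/SIGTERM, as the CLI handler does."""
    raise SystemExit(0)


def configure_cli_logging() -> None:
    # CLI commands now log at INFO unconditionally instead of reading a settings constant.
    logging.basicConfig(level=logging.INFO, format=SIMPLE_LOG_FORMAT)
    signal.signal(signal.SIGTERM, sigint_handler)
```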
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/cli/commands/task_command.py
@@ -468,7 +468,7 @@ def task_render(args, dag: DAG | None = None) -> None:
@providers_configuration_loaded
def task_clear(args) -> None:
"""Clear all task instances or only those matched by regex for a DAG(s)."""
logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
logging.basicConfig(level=logging.INFO, format=settings.SIMPLE_LOG_FORMAT)
if args.dag_id and not args.bundle_name and not args.dag_regex and not args.task_regex:
dags = [get_dag_by_file_location(args.dag_id)]
else:
Expand Down
8 changes: 6 additions & 2 deletions airflow-core/src/airflow/dag_processing/collection.py
@@ -208,17 +208,21 @@ def _serialize_dag_capturing_errors(

We can't place them directly in import_errors, as this may be retried, and work the next time
"""
from airflow import settings
from airflow.models.dagcode import DagCode
from airflow.models.serialized_dag import SerializedDagModel

# Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint(
"core", "min_serialized_dag_update_interval", fallback=30
)

try:
# We can't use bulk_write_to_db as we want to capture each error individually
dag_was_updated = SerializedDagModel.write_dag(
dag,
bundle_name=bundle_name,
bundle_version=bundle_version,
min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
min_update_interval=MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
session=session,
)
if not dag_was_updated:
Expand Down
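`collection.py` above now reads `min_serialized_dag_update_interval` from `conf` at the point of use instead of importing `settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL`. A sketch of the write-rate limiting that interval drives; the `last_updated` argument is a hypothetical stand-in for the timestamp check that actually happens inside `SerializedDagModel.write_dag`:

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

from airflow.configuration import conf


def should_rewrite_serialized_dag(last_updated: datetime | None) -> bool:
    """Skip the DB write if the serialized row was refreshed within the configured interval."""
    # Read the interval where it is used, not at settings import time.
    min_update_interval = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)
    if last_updated is None:
        return True
    return datetime.now(timezone.utc) - last_updated >= timedelta(seconds=min_update_interval)
```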
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1612,7 +1612,7 @@ def _do_scheduling(self, session: Session) -> int:
"""
# Put a check in place to make sure we don't commit unexpectedly
with prohibit_commit(session) as guard:
if settings.USE_JOB_SCHEDULE:
if conf.getboolean("scheduler", "use_job_schedule", fallback=True):
self._create_dagruns_for_dags(guard, session)

self._start_queued_dagruns(session)
Expand Down
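Same pattern in the scheduler: `use_job_schedule` is read from `conf` where the decision is made, so overriding the option (for example in tests) takes effect without reloading `airflow.settings`. A minimal sketch with a hypothetical callback:

```python
from typing import Callable

from airflow.configuration import conf


def maybe_create_dagruns(create_dagruns: Callable[[], None]) -> None:
    """Run scheduled dag-run creation only when the scheduler option allows it."""
    # fallback=True preserves the old settings.USE_JOB_SCHEDULE default.
    if conf.getboolean("scheduler", "use_job_schedule", fallback=True):
        create_dagruns()
```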
Original file line number Diff line number Diff line change
@@ -34,7 +34,6 @@
from alembic import context, op
from sqlalchemy import text

from airflow import settings
from airflow.migrations.db_types import TIMESTAMP

# revision identifiers, used by Alembic.
@@ -71,6 +70,8 @@ def upgrade():


def _move_offending_dagruns():
from airflow.utils.db import AIRFLOW_MOVED_TABLE_PREFIX

select_null_logical_date_query = "select * from dag_run where logical_date is null"

conn = op.get_bind()
@@ -83,7 +84,7 @@ def _move_offending_dagruns():

# Copy offending data to a new table. This can be done directly in Postgres
# and SQLite with create-from-select; MySQL needs a special case.
offending_table_name = f"{settings.AIRFLOW_MOVED_TABLE_PREFIX}__3_0_0__offending_dag_run"
offending_table_name = f"{AIRFLOW_MOVED_TABLE_PREFIX}__3_0_0__offending_dag_run"
if conn.dialect.name == "mysql":
op.execute(f"create table {offending_table_name} like dag_run")
op.execute(f"insert into {offending_table_name} {select_null_logical_date_query}")
Expand Down
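For context on the migration hunk above: Postgres and SQLite can copy the offending `dag_run` rows with a single create-from-select, while MySQL clones the table definition first and then copies the rows. A sketch of that dialect split; the prefix value matches the constant dropped from `settings.py`, and the Alembic `op` handle is assumed to be available as in any migration module:

```python
from alembic import op

# In the real migration this is imported from airflow.utils.db.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"


def move_offending_dagruns() -> None:
    """Copy dag_run rows with a NULL logical_date into a side table before tightening the column."""
    select_null_logical_date_query = "select * from dag_run where logical_date is null"
    offending_table_name = f"{AIRFLOW_MOVED_TABLE_PREFIX}__3_0_0__offending_dag_run"

    conn = op.get_bind()
    if conn.dialect.name == "mysql":
        # MySQL needs a special case: clone the table structure, then copy the rows.
        op.execute(f"create table {offending_table_name} like dag_run")
        op.execute(f"insert into {offending_table_name} {select_null_logical_date_query}")
    else:
        # Postgres and SQLite support create-from-select directly.
        op.execute(f"create table {offending_table_name} as {select_null_logical_date_query}")
```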
10 changes: 7 additions & 3 deletions airflow-core/src/airflow/models/serialized_dag.py
@@ -34,6 +34,7 @@
from sqlalchemy_utils import UUIDType

from airflow._shared.timezones import timezone
from airflow.configuration import conf
from airflow.models.asset import (
AssetAliasModel,
AssetModel,
@@ -46,7 +47,7 @@
from airflow.serialization.dag_dependency import DagDependency
from airflow.serialization.definitions.assets import SerializedAssetUniqueKey as UKey
from airflow.serialization.serialized_objects import DagSerialization
from airflow.settings import COMPRESS_SERIALIZED_DAGS, json
from airflow.settings import json
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.sqlalchemy import UtcDateTime, get_dialect_name, mapped_column
@@ -62,6 +63,9 @@

log = logging.getLogger(__name__)

# If set to True, serialized DAGs is compressed before writing to DB,
_COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)


class _DagDependenciesResolver:
"""Resolver that resolves dag dependencies to include asset id and assets link to asset aliases."""
@@ -329,7 +333,7 @@ def __init__(self, dag: LazyDeserializedDAG) -> None:
# partially ordered json data
dag_data_json = json.dumps(dag_data, sort_keys=True).encode("utf-8")

if COMPRESS_SERIALIZED_DAGS:
if _COMPRESS_SERIALIZED_DAGS:
self._data = None
self._data_compressed = zlib.compress(dag_data_json)
else:
@@ -620,7 +624,7 @@ def get_dag_dependencies(cls, session: Session = NEW_SESSION) -> dict[str, list[
"""
load_json: Callable
data_col_to_select: ColumnElement[Any] | InstrumentedAttribute[bytes | None]
if COMPRESS_SERIALIZED_DAGS is False:
if _COMPRESS_SERIALIZED_DAGS is False:
dialect = get_dialect_name(session)
if dialect in ["sqlite", "mysql"]:
data_col_to_select = func.json_extract(cls._data, "$.dag.dag_dependencies")
Expand Down
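In `serialized_dag.py` the compression switch is now a private module constant read from `conf` when the module is imported. A sketch of the write/read paths it gates, with plain `data` / `data_compressed` values standing in for the model columns:

```python
from __future__ import annotations

import json
import zlib

from airflow.configuration import conf

# Mirrors the new module-level flag in serialized_dag.py.
_COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)


def encode_dag_data(dag_data: dict) -> tuple[str | None, bytes | None]:
    """Return (json_text, compressed_bytes); exactly one of the two is populated."""
    dag_data_json = json.dumps(dag_data, sort_keys=True).encode("utf-8")
    if _COMPRESS_SERIALIZED_DAGS:
        return None, zlib.compress(dag_data_json)
    return dag_data_json.decode("utf-8"), None


def decode_dag_data(data: str | None, data_compressed: bytes | None) -> dict:
    """Inverse of encode_dag_data, whichever column was populated."""
    if data_compressed is not None:
        return json.loads(zlib.decompress(data_compressed))
    return json.loads(data or "{}")
```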
109 changes: 45 additions & 64 deletions airflow-core/src/airflow/settings.py
@@ -58,15 +58,15 @@
from airflow.utils.orm_event_handlers import setup_event_handlers
from airflow.utils.sqlalchemy import is_sqlalchemy_v1

USE_PSYCOPG3: bool
_USE_PSYCOPG3: bool
try:
from importlib.util import find_spec

is_psycopg3 = find_spec("psycopg") is not None

USE_PSYCOPG3 = is_psycopg3 and not is_sqlalchemy_v1()
_USE_PSYCOPG3 = is_psycopg3 and not is_sqlalchemy_v1()
except (ImportError, ModuleNotFoundError):
USE_PSYCOPG3 = False
_USE_PSYCOPG3 = False

if TYPE_CHECKING:
from sqlalchemy.engine import Engine
@@ -107,23 +107,13 @@
]
)

LOGGING_LEVEL = logging.INFO

LOG_FORMAT = conf.get("logging", "log_format")
SIMPLE_LOG_FORMAT = conf.get("logging", "simple_log_format")

SQL_ALCHEMY_CONN: str | None = None
SQL_ALCHEMY_CONN_ASYNC: str | None = None
PLUGINS_FOLDER: str | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
"""
Mapping of sync scheme to async scheme.

:meta private:
"""

engine: Engine | None = None
Session: scoped_session | None = None
# NonScopedSession creates global sessions and is not safe to use in multi-threaded environment without
@@ -246,6 +236,9 @@ def load_policy_plugins(pm: pluggy.PluginManager):


def _get_async_conn_uri_from_sync(sync_uri):
AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
"""Mapping of sync scheme to async scheme."""

scheme, rest = sync_uri.split(":", maxsplit=1)
scheme = scheme.split("+", maxsplit=1)[0]
aiolib = AIO_LIBS_MAPPING.get(scheme)
@@ -327,9 +320,6 @@ def get_bind(
pass


AIRFLOW_PATH = os.path.dirname(os.path.dirname(__file__))


def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
"""Determine whether the database connection URI specifies a relative path."""
# Check for non-empty connection string:
@@ -455,22 +445,21 @@ def clean_in_fork():
register_at_fork(after_in_child=clean_in_fork)


DEFAULT_ENGINE_ARGS: dict[str, dict[str, Any]] = {
"postgresql": (
{
"executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
}
| (
{}
if USE_PSYCOPG3
else {"executemany_mode": "values_plus_batch", "executemany_batch_page_size": 2000}
)
)
}


def prepare_engine_args(disable_connection_pool=False, pool_class=None):
"""Prepare SQLAlchemy engine args."""
DEFAULT_ENGINE_ARGS: dict[str, dict[str, Any]] = {
"postgresql": (
{
"executemany_values_page_size" if is_sqlalchemy_v1() else "insertmanyvalues_page_size": 10000,
}
| (
{}
if _USE_PSYCOPG3
else {"executemany_mode": "values_plus_batch", "executemany_batch_page_size": 2000}
)
)
}

default_args = {}
for dialect, default in DEFAULT_ENGINE_ARGS.items():
if SQL_ALCHEMY_CONN.startswith(dialect):
@@ -654,16 +643,39 @@ def prepare_syspath_for_config_and_plugins():

def __getattr__(name: str):
"""Handle deprecated module attributes."""
if name == "MASK_SECRETS_IN_LOGS":
import warnings
import warnings

from airflow.exceptions import RemovedInAirflow4Warning

if name == "MASK_SECRETS_IN_LOGS":
warnings.warn(
"settings.MASK_SECRETS_IN_LOGS has been removed. This shim returns default value of False. "
"Use SecretsMasker.enable_log_masking(), disable_log_masking(), or is_log_masking_enabled() instead.",
DeprecationWarning,
RemovedInAirflow4Warning,
stacklevel=2,
)
return False
if name == "WEB_COLORS":
warnings.warn(
"settings.WEB_COLORS has been removed. This shim returns default value. "
"Please upgrade your provider or integration.",
RemovedInAirflow4Warning,
stacklevel=2,
)
return {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}
if name == "EXECUTE_TASKS_NEW_PYTHON_INTERPRETER":
warnings.warn(
"settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER has been removed. This shim returns default value. "
"Please upgrade your provider or integration.",
RemovedInAirflow4Warning,
stacklevel=2,
)
return not hasattr(os, "fork") or conf.getboolean(
"core",
"execute_tasks_new_python_interpreter",
fallback=False,
)

raise AttributeError(f"module '{__name__}' has no attribute '{name}'")


@@ -736,26 +748,6 @@ def initialize():
atexit.register(dispose_orm)


# Const stuff
WEB_COLORS = {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"}

# Updating serialized DAG can not be faster than a minimum interval to reduce database
# write rate.
MIN_SERIALIZED_DAG_UPDATE_INTERVAL = conf.getint("core", "min_serialized_dag_update_interval", fallback=30)

# If set to True, serialized DAGs is compressed before writing to DB,
COMPRESS_SERIALIZED_DAGS = conf.getboolean("core", "compress_serialized_dags", fallback=False)

CAN_FORK = hasattr(os, "fork")

EXECUTE_TASKS_NEW_PYTHON_INTERPRETER = not CAN_FORK or conf.getboolean(
"core",
"execute_tasks_new_python_interpreter",
fallback=False,
)

USE_JOB_SCHEDULE = conf.getboolean("scheduler", "use_job_schedule", fallback=True)

# By default Airflow plugins are lazily-loaded (only loaded when required). Set it to False,
# if you want to load plugins whenever 'airflow' is invoked via cli or loaded from module.
LAZY_LOAD_PLUGINS: bool = conf.getboolean("core", "lazy_load_plugins", fallback=True)
@@ -765,15 +757,4 @@
# loaded from module.
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

# Executors can set this to true to configure logging correctly for
# containerized executors.
IS_EXECUTOR_CONTAINER = bool(os.environ.get("AIRFLOW_IS_EXECUTOR_CONTAINER", ""))
IS_K8S_EXECUTOR_POD = bool(os.environ.get("AIRFLOW_IS_K8S_EXECUTOR_POD", ""))
"""Will be True if running in kubernetes executor pod."""

HIDE_SENSITIVE_VAR_CONN_FIELDS = conf.getboolean("core", "hide_sensitive_var_conn_fields")

# Prefix used to identify tables holding data moved during migration.
AIRFLOW_MOVED_TABLE_PREFIX = "_airflow_moved"

DAEMON_UMASK: str = conf.get("core", "daemon_umask", fallback="0o077")
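Most of the `settings.py` diff replaces public module constants with a PEP 562 module-level `__getattr__` that warns and returns the old default, so third-party code importing the removed names keeps working while emitting `RemovedInAirflow4Warning`. A generic sketch of that shim, using a stand-in warning class since the real one lives in `airflow.exceptions`:

```python
# deprecated_settings_shim.py - illustrative module, not the real airflow.settings
import warnings
from typing import Any


class RemovedInNextMajorWarning(DeprecationWarning):
    """Stand-in for airflow.exceptions.RemovedInAirflow4Warning."""


# Removed module constants and the default each access should still return.
_REMOVED_DEFAULTS: dict[str, Any] = {
    "MASK_SECRETS_IN_LOGS": False,
    "WEB_COLORS": {"LIGHTBLUE": "#4d9de0", "LIGHTORANGE": "#FF9933"},
}


def __getattr__(name: str) -> Any:
    """PEP 562 hook: called only for attributes not found through normal lookup."""
    if name in _REMOVED_DEFAULTS:
        warnings.warn(
            f"settings.{name} has been removed; this shim returns its old default value. "
            "Please upgrade your provider or integration.",
            RemovedInNextMajorWarning,
            stacklevel=2,
        )
        return _REMOVED_DEFAULTS[name]
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```

Importing this module and touching `MASK_SECRETS_IN_LOGS` emits the warning and returns `False`; any name not in the table still raises `AttributeError` as before.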
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/utils/cli.py
@@ -383,7 +383,7 @@ def setup_logging(filename):
formatter = logging.Formatter(settings.SIMPLE_LOG_FORMAT)
handler.setFormatter(formatter)
root.addHandler(handler)
root.setLevel(settings.LOGGING_LEVEL)
root.setLevel(logging.INFO)

return handler.stream

Expand Down