Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions airflow-core/src/airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,19 @@ class BaseExecutor(LoggingMixin):
name: None | ExecutorName = None
callback_sink: BaseCallbackSink | None = None

@staticmethod
def get_db_manager() -> str | None:
"""
Return the DB manager class path for this executor, if any.

Override this method in executor subclasses that require their own
database tables to be managed during db operations like reset/migrate.

Returns:
str | None: Full module path to the DB manager class, or None if not needed.
"""
return None

@cached_property
def jwt_generator(self) -> JWTGenerator:
from airflow.api_fastapi.auth.tokens import (
Expand Down
15 changes: 15 additions & 0 deletions airflow-core/src/airflow/utils/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class RunDBManager(LoggingMixin):

def __init__(self):
from airflow.api_fastapi.app import create_auth_manager
from airflow.executors.executor_loader import ExecutorLoader

super().__init__()
self._managers: list[BaseDBManager] = []
Expand All @@ -179,6 +180,20 @@ def __init__(self):
auth_manager_db_manager = create_auth_manager().get_db_manager()
if auth_manager_db_manager and auth_manager_db_manager not in managers:
managers.append(auth_manager_db_manager)
# Add DB managers specified by configured executors (if any)
try:
executor_names = ExecutorLoader.get_executor_names(validate_teams=False)
for executor_name in executor_names:
try:
executor_cls, _ = ExecutorLoader.import_executor_cls(executor_name)
if hasattr(executor_cls, "get_db_manager"):
executor_db_manager = executor_cls.get_db_manager()
if executor_db_manager and executor_db_manager not in managers:
managers.append(executor_db_manager)
except Exception:
self.log.debug("Could not load DB manager from executor %s", executor_name)
except Exception:
self.log.debug("Could not load executor DB managers")
for module in managers:
manager = import_string(module)
self._managers.append(manager)
Expand Down
54 changes: 54 additions & 0 deletions providers/edge3/src/airflow/providers/edge3/alembic.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[alembic]
script_location = %(here)s/migrations
sqlalchemy.url =

[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
38 changes: 38 additions & 0 deletions providers/edge3/src/airflow/providers/edge3/cli/db_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from airflow import settings
from airflow.providers.edge3.models.db import EdgeDBManager
from airflow.utils.providers_configuration_loader import providers_configuration_loaded


@providers_configuration_loaded
def resetdb(args):
"""Reset the Edge metadata database."""
print(f"DB: {settings.engine.url!r}")
if not (args.yes or input("This will drop Edge tables. Proceed? (y/n)").upper() == "Y"):
raise SystemExit("Cancelled")
EdgeDBManager(settings.Session()).resetdb(skip_init=args.skip_init)


@providers_configuration_loaded
def migratedb(args):
"""Migrate the Edge metadata database."""
print(f"DB: {settings.engine.url!r}")
EdgeDBManager(settings.Session()).initdb()
print("Edge database migration complete.")
30 changes: 29 additions & 1 deletion providers/edge3/src/airflow/providers/edge3/cli/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@

from typing import TYPE_CHECKING

from airflow.cli.cli_config import ARG_PID, ARG_VERBOSE, ActionCommand, Arg, GroupCommand, lazy_load_command
from airflow.cli.cli_config import (
ARG_DB_SKIP_INIT,
ARG_PID,
ARG_VERBOSE,
ActionCommand,
Arg,
GroupCommand,
lazy_load_command,
)
from airflow.configuration import conf

if TYPE_CHECKING:
Expand Down Expand Up @@ -231,6 +239,21 @@
),
]

EDGE_DB_COMMANDS: tuple[ActionCommand, ...] = (
ActionCommand(
name="migrate",
help="Migrates the Edge metadata database to the latest version",
func=lazy_load_command("airflow.providers.edge3.cli.db_command.migratedb"),
args=(ARG_VERBOSE,),
),
ActionCommand(
name="reset",
help="Reset the Edge metadata database",
func=lazy_load_command("airflow.providers.edge3.cli.db_command.resetdb"),
args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_VERBOSE),
),
)


def get_edge_cli_commands() -> list[GroupCommand]:
return [
Expand All @@ -243,6 +266,11 @@ def get_edge_cli_commands() -> list[GroupCommand]:
),
subcommands=EDGE_COMMANDS,
),
GroupCommand(
name="edge-db",
help="Manage Edge database",
subcommands=EDGE_DB_COMMANDS,
),
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ def __init__(self, parallelism: int = PARALLELISM):
super().__init__(parallelism=parallelism)
self.last_reported_state: dict[TaskInstanceKey, TaskInstanceState] = {}

@staticmethod
def get_db_manager() -> str | None:
"""Return the DB manager class path for Edge executor."""
return "airflow.providers.edge3.models.db.EdgeDBManager"

def _check_db_schema(self, engine: Engine) -> None:
"""
Check if already existing table matches the newest table schema.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
117 changes: 117 additions & 0 deletions providers/edge3/src/airflow/providers/edge3/migrations/env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import contextlib
from logging import getLogger
from logging.config import fileConfig

from alembic import context

from airflow import settings
from airflow.providers.edge3.models.db import EdgeDBManager

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

version_table = EdgeDBManager.version_table_name

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if not getLogger().handlers and config.config_file_name:
fileConfig(config.config_file_name, disable_existing_loggers=False)

# add your model's MetaData object here
# for 'autogenerate' support
target_metadata = EdgeDBManager.metadata


def include_object(_, name, type_, *args):
if type_ == "table" and name not in target_metadata.tables:
return False
return True


def run_migrations_offline():
"""
Run migrations in 'offline' mode.

This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.

Calls to context.execute() here emit the given string to the
script output.
"""
context.configure(
url=settings.SQL_ALCHEMY_CONN,
target_metadata=target_metadata,
literal_binds=True,
compare_type=True,
compare_server_default=True,
render_as_batch=True,
version_table=version_table,
include_object=include_object,
)

with context.begin_transaction():
context.run_migrations()


def run_migrations_online():
"""
Run migrations in 'online' mode.

In this scenario we need to create an Engine
and associate a connection with the context.
"""

def process_revision_directives(context, revision, directives):
if getattr(config.cmd_opts, "autogenerate", False):
script = directives[0]
if script.upgrade_ops and script.upgrade_ops.is_empty():
directives[:] = []
print("No change detected in ORM schema, skipping revision.")

with contextlib.ExitStack() as stack:
connection = config.attributes.get("connection", None)

if not connection:
connection = stack.push(settings.engine.connect())

context.configure(
connection=connection,
transaction_per_migration=True,
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
include_object=include_object,
render_as_batch=True,
process_revision_directives=process_revision_directives,
version_table=version_table,
)

with context.begin_transaction():
context.run_migrations()


if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
Loading
Loading