diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py index 10e3f65f65f4b..066c3269e277b 100644 --- a/airflow-core/src/airflow/executors/base_executor.py +++ b/airflow-core/src/airflow/executors/base_executor.py @@ -154,6 +154,19 @@ class BaseExecutor(LoggingMixin): name: None | ExecutorName = None callback_sink: BaseCallbackSink | None = None + @staticmethod + def get_db_manager() -> str | None: + """ + Return the DB manager class path for this executor, if any. + + Override this method in executor subclasses that require their own + database tables to be managed during db operations like reset/migrate. + + Returns: + str | None: Full module path to the DB manager class, or None if not needed. + """ + return None + @cached_property def jwt_generator(self) -> JWTGenerator: from airflow.api_fastapi.auth.tokens import ( diff --git a/airflow-core/src/airflow/utils/db_manager.py b/airflow-core/src/airflow/utils/db_manager.py index 93959b5bf68bf..6c8c4f9d98973 100644 --- a/airflow-core/src/airflow/utils/db_manager.py +++ b/airflow-core/src/airflow/utils/db_manager.py @@ -167,6 +167,7 @@ class RunDBManager(LoggingMixin): def __init__(self): from airflow.api_fastapi.app import create_auth_manager + from airflow.executors.executor_loader import ExecutorLoader super().__init__() self._managers: list[BaseDBManager] = [] @@ -179,6 +180,20 @@ def __init__(self): auth_manager_db_manager = create_auth_manager().get_db_manager() if auth_manager_db_manager and auth_manager_db_manager not in managers: managers.append(auth_manager_db_manager) + # Add DB managers specified by configured executors (if any) + try: + executor_names = ExecutorLoader.get_executor_names(validate_teams=False) + for executor_name in executor_names: + try: + executor_cls, _ = ExecutorLoader.import_executor_cls(executor_name) + if hasattr(executor_cls, "get_db_manager"): + executor_db_manager = executor_cls.get_db_manager() + if executor_db_manager and executor_db_manager not in managers: + managers.append(executor_db_manager) + except Exception: + self.log.debug("Could not load DB manager from executor %s", executor_name) + except Exception: + self.log.debug("Could not load executor DB managers") for module in managers: manager = import_string(module) self._managers.append(manager) diff --git a/providers/edge3/src/airflow/providers/edge3/alembic.ini b/providers/edge3/src/airflow/providers/edge3/alembic.ini new file mode 100644 index 0000000000000..32601bd33a3f2 --- /dev/null +++ b/providers/edge3/src/airflow/providers/edge3/alembic.ini @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[alembic] +script_location = %(here)s/migrations +sqlalchemy.url = + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/providers/edge3/src/airflow/providers/edge3/cli/db_command.py b/providers/edge3/src/airflow/providers/edge3/cli/db_command.py new file mode 100644 index 0000000000000..0f3012094bdf4 --- /dev/null +++ b/providers/edge3/src/airflow/providers/edge3/cli/db_command.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow import settings +from airflow.providers.edge3.models.db import EdgeDBManager +from airflow.utils.providers_configuration_loader import providers_configuration_loaded + + +@providers_configuration_loaded +def resetdb(args): + """Reset the Edge metadata database.""" + print(f"DB: {settings.engine.url!r}") + if not (args.yes or input("This will drop Edge tables. Proceed? (y/n)").upper() == "Y"): + raise SystemExit("Cancelled") + EdgeDBManager(settings.Session()).resetdb(skip_init=args.skip_init) + + +@providers_configuration_loaded +def migratedb(args): + """Migrate the Edge metadata database.""" + print(f"DB: {settings.engine.url!r}") + EdgeDBManager(settings.Session()).initdb() + print("Edge database migration complete.") diff --git a/providers/edge3/src/airflow/providers/edge3/cli/definition.py b/providers/edge3/src/airflow/providers/edge3/cli/definition.py index 5cd611cc3a438..c0fa9e57a011c 100644 --- a/providers/edge3/src/airflow/providers/edge3/cli/definition.py +++ b/providers/edge3/src/airflow/providers/edge3/cli/definition.py @@ -18,7 +18,15 @@ from typing import TYPE_CHECKING -from airflow.cli.cli_config import ARG_PID, ARG_VERBOSE, ActionCommand, Arg, GroupCommand, lazy_load_command +from airflow.cli.cli_config import ( + ARG_DB_SKIP_INIT, + ARG_PID, + ARG_VERBOSE, + ActionCommand, + Arg, + GroupCommand, + lazy_load_command, +) from airflow.configuration import conf if TYPE_CHECKING: @@ -231,6 +239,21 @@ ), ] +EDGE_DB_COMMANDS: tuple[ActionCommand, ...] = ( + ActionCommand( + name="migrate", + help="Migrates the Edge metadata database to the latest version", + func=lazy_load_command("airflow.providers.edge3.cli.db_command.migratedb"), + args=(ARG_VERBOSE,), + ), + ActionCommand( + name="reset", + help="Reset the Edge metadata database", + func=lazy_load_command("airflow.providers.edge3.cli.db_command.resetdb"), + args=(ARG_YES, ARG_DB_SKIP_INIT, ARG_VERBOSE), + ), +) + def get_edge_cli_commands() -> list[GroupCommand]: return [ @@ -243,6 +266,11 @@ def get_edge_cli_commands() -> list[GroupCommand]: ), subcommands=EDGE_COMMANDS, ), + GroupCommand( + name="edge-db", + help="Manage Edge database", + subcommands=EDGE_DB_COMMANDS, + ), ] diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py index 60392062469bc..44230842b27e1 100644 --- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py +++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py @@ -61,6 +61,11 @@ def __init__(self, parallelism: int = PARALLELISM): super().__init__(parallelism=parallelism) self.last_reported_state: dict[TaskInstanceKey, TaskInstanceState] = {} + @staticmethod + def get_db_manager() -> str | None: + """Return the DB manager class path for Edge executor.""" + return "airflow.providers.edge3.models.db.EdgeDBManager" + def _check_db_schema(self, engine: Engine) -> None: """ Check if already existing table matches the newest table schema. diff --git a/providers/edge3/src/airflow/providers/edge3/migrations/__init__.py b/providers/edge3/src/airflow/providers/edge3/migrations/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/edge3/src/airflow/providers/edge3/migrations/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/edge3/src/airflow/providers/edge3/migrations/env.py b/providers/edge3/src/airflow/providers/edge3/migrations/env.py new file mode 100644 index 0000000000000..697c51cd7a552 --- /dev/null +++ b/providers/edge3/src/airflow/providers/edge3/migrations/env.py @@ -0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import contextlib +from logging import getLogger +from logging.config import fileConfig + +from alembic import context + +from airflow import settings +from airflow.providers.edge3.models.db import EdgeDBManager + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +version_table = EdgeDBManager.version_table_name + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if not getLogger().handlers and config.config_file_name: + fileConfig(config.config_file_name, disable_existing_loggers=False) + +# add your model's MetaData object here +# for 'autogenerate' support +target_metadata = EdgeDBManager.metadata + + +def include_object(_, name, type_, *args): + if type_ == "table" and name not in target_metadata.tables: + return False + return True + + +def run_migrations_offline(): + """ + Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + """ + context.configure( + url=settings.SQL_ALCHEMY_CONN, + target_metadata=target_metadata, + literal_binds=True, + compare_type=True, + compare_server_default=True, + render_as_batch=True, + version_table=version_table, + include_object=include_object, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online(): + """ + Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. + """ + + def process_revision_directives(context, revision, directives): + if getattr(config.cmd_opts, "autogenerate", False): + script = directives[0] + if script.upgrade_ops and script.upgrade_ops.is_empty(): + directives[:] = [] + print("No change detected in ORM schema, skipping revision.") + + with contextlib.ExitStack() as stack: + connection = config.attributes.get("connection", None) + + if not connection: + connection = stack.push(settings.engine.connect()) + + context.configure( + connection=connection, + transaction_per_migration=True, + target_metadata=target_metadata, + compare_type=True, + compare_server_default=True, + include_object=include_object, + render_as_batch=True, + process_revision_directives=process_revision_directives, + version_table=version_table, + ) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/providers/edge3/src/airflow/providers/edge3/migrations/versions/0001_initial_edge_tables.py b/providers/edge3/src/airflow/providers/edge3/migrations/versions/0001_initial_edge_tables.py new file mode 100644 index 0000000000000..437acfa6d7cad --- /dev/null +++ b/providers/edge3/src/airflow/providers/edge3/migrations/versions/0001_initial_edge_tables.py @@ -0,0 +1,110 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Initial Edge3 provider tables. + +Revision ID: 0001_initial_edge_tables +Revises: +Create Date: 2024-01-01 00:00:00.000000 +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import mysql + +# revision identifiers, used by Alembic. +revision = "0001_initial_edge_tables" +down_revision = None +branch_labels = None +depends_on = None +airflow_version = "3.0.0" + + +def upgrade(): + """Create Edge3 tables.""" + # Create edge_worker table + op.create_table( + "edge_worker", + sa.Column("worker_name", sa.String(64), nullable=False), + sa.Column("state", sa.String(20), nullable=True), + sa.Column("maintenance_comment", sa.String(1024), nullable=True), + sa.Column("queues", sa.String(256), nullable=True), + sa.Column("first_online", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("last_update", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("jobs_active", sa.Integer(), nullable=True, default=0), + sa.Column("jobs_taken", sa.Integer(), nullable=True, default=0), + sa.Column("jobs_success", sa.Integer(), nullable=True, default=0), + sa.Column("jobs_failed", sa.Integer(), nullable=True, default=0), + sa.Column("sysinfo", sa.String(256), nullable=True), + sa.PrimaryKeyConstraint("worker_name", name=op.f("edge_worker_pkey")), + ) + + # Create edge_job table + op.create_table( + "edge_job", + sa.Column("dag_id", sa.String(250), nullable=False), + sa.Column("task_id", sa.String(250), nullable=False), + sa.Column("run_id", sa.String(250), nullable=False), + sa.Column("map_index", sa.Integer(), nullable=False, server_default=sa.text("-1")), + sa.Column("try_number", sa.Integer(), nullable=False, default=0), + sa.Column("state", sa.String(20), nullable=True), + sa.Column("queue", sa.String(256), nullable=True), + sa.Column("concurrency_slots", sa.Integer(), nullable=True), + sa.Column("command", sa.String(2048), nullable=True), + sa.Column("queued_dttm", sa.TIMESTAMP(timezone=True), nullable=True), + sa.Column("edge_worker", sa.String(64), nullable=True), + sa.Column("last_update", sa.TIMESTAMP(timezone=True), nullable=True), + sa.PrimaryKeyConstraint( + "dag_id", "task_id", "run_id", "map_index", "try_number", name=op.f("edge_job_pkey") + ), + ) + op.create_index("rj_order", "edge_job", ["state", "queued_dttm", "queue"], unique=False) + + # Create edge_logs table + op.create_table( + "edge_logs", + sa.Column("dag_id", sa.String(250), nullable=False), + sa.Column("task_id", sa.String(250), nullable=False), + sa.Column("run_id", sa.String(250), nullable=False), + sa.Column("map_index", sa.Integer(), nullable=False, server_default=sa.text("-1")), + sa.Column("try_number", sa.Integer(), nullable=False, default=0), + sa.Column("log_chunk_time", sa.TIMESTAMP(timezone=True), nullable=False), + sa.Column( + "log_chunk_data", + sa.Text().with_variant(mysql.MEDIUMTEXT(), "mysql"), + nullable=False, + ), + sa.PrimaryKeyConstraint( + "dag_id", + "task_id", + "run_id", + "map_index", + "try_number", + "log_chunk_time", + name=op.f("edge_logs_pkey"), + ), + ) + + +def downgrade(): + """Drop Edge3 tables.""" + op.drop_table("edge_logs") + op.drop_index("rj_order", table_name="edge_job") + op.drop_table("edge_job") + op.drop_table("edge_worker") diff --git a/providers/edge3/src/airflow/providers/edge3/migrations/versions/__init__.py b/providers/edge3/src/airflow/providers/edge3/migrations/versions/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/providers/edge3/src/airflow/providers/edge3/migrations/versions/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/providers/edge3/src/airflow/providers/edge3/models/base.py b/providers/edge3/src/airflow/providers/edge3/models/base.py new file mode 100644 index 0000000000000..d09a90953932b --- /dev/null +++ b/providers/edge3/src/airflow/providers/edge3/models/base.py @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from sqlalchemy import MetaData +from sqlalchemy.orm import registry + +from airflow.models.base import _get_schema, naming_convention +from airflow.utils.sqlalchemy import is_sqlalchemy_v1 + +edge_metadata = MetaData(schema=_get_schema(), naming_convention=naming_convention) +edge_mapper_registry = registry(metadata=edge_metadata) + +if TYPE_CHECKING: + EdgeBase = Any +else: + EdgeBase = edge_mapper_registry.generate_base() + if not is_sqlalchemy_v1(): + EdgeBase.__allow_unmapped__ = True diff --git a/providers/edge3/src/airflow/providers/edge3/models/db.py b/providers/edge3/src/airflow/providers/edge3/models/db.py new file mode 100644 index 0000000000000..484e29ae8a3df --- /dev/null +++ b/providers/edge3/src/airflow/providers/edge3/models/db.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from pathlib import Path + +from airflow.providers.edge3.models.base import edge_metadata +from airflow.utils.db_manager import BaseDBManager + +PACKAGE_DIR = Path(__file__).parents[1] + +_REVISION_HEADS_MAP: dict[str, str] = { + "1.0.0": "0001_initial_edge_tables", +} + + +class EdgeDBManager(BaseDBManager): + """Manages Edge3 provider database.""" + + metadata = edge_metadata + version_table_name = "alembic_version_edge" + migration_dir = (PACKAGE_DIR / "migrations").as_posix() + alembic_file = (PACKAGE_DIR / "alembic.ini").as_posix() + supports_table_dropping = True + revision_heads_map = _REVISION_HEADS_MAP diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_job.py b/providers/edge3/src/airflow/providers/edge3/models/edge_job.py index ccf21de848fcd..6ea22b27a94f1 100644 --- a/providers/edge3/src/airflow/providers/edge3/models/edge_job.py +++ b/providers/edge3/src/airflow/providers/edge3/models/edge_job.py @@ -26,15 +26,16 @@ ) from sqlalchemy.orm import Mapped -from airflow.models.base import Base, StringID +from airflow.models.base import StringID from airflow.models.taskinstancekey import TaskInstanceKey from airflow.providers.common.compat.sdk import timezone from airflow.providers.common.compat.sqlalchemy.orm import mapped_column +from airflow.providers.edge3.models.base import EdgeBase from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.sqlalchemy import UtcDateTime -class EdgeJobModel(Base, LoggingMixin): +class EdgeJobModel(EdgeBase, LoggingMixin): """ A job which is queued, waiting or running on a Edge Worker. diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_logs.py b/providers/edge3/src/airflow/providers/edge3/models/edge_logs.py index f4366756841b9..dd7f516668df0 100644 --- a/providers/edge3/src/airflow/providers/edge3/models/edge_logs.py +++ b/providers/edge3/src/airflow/providers/edge3/models/edge_logs.py @@ -26,13 +26,14 @@ from sqlalchemy.dialects.mysql import MEDIUMTEXT from sqlalchemy.orm import Mapped -from airflow.models.base import Base, StringID +from airflow.models.base import StringID from airflow.providers.common.compat.sqlalchemy.orm import mapped_column +from airflow.providers.edge3.models.base import EdgeBase from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.sqlalchemy import UtcDateTime -class EdgeLogsModel(Base, LoggingMixin): +class EdgeLogsModel(EdgeBase, LoggingMixin): """ Temporary collected logs from a Edge Worker while job runs on remote site. diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py index 9816166a12d88..94f2197c11b05 100644 --- a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py +++ b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py @@ -26,9 +26,9 @@ from sqlalchemy import Integer, String, delete, select from sqlalchemy.orm import Mapped -from airflow.models.base import Base from airflow.providers.common.compat.sdk import AirflowException, Stats, timezone from airflow.providers.common.compat.sqlalchemy.orm import mapped_column +from airflow.providers.edge3.models.base import EdgeBase from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.providers_configuration_loader import providers_configuration_loaded from airflow.utils.session import NEW_SESSION, provide_session @@ -83,7 +83,7 @@ class EdgeWorkerState(str, Enum): """Worker was shut down in maintenance mode. It will be in maintenance mode when restarted.""" -class EdgeWorkerModel(Base, LoggingMixin): +class EdgeWorkerModel(EdgeBase, LoggingMixin): """A Edge Worker instance which reports the state and health.""" __tablename__ = "edge_worker"