Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions providers/edge3/docs/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ instance. The commands are:
- ``airflow edge remote-edge-worker-update-maintenance-comment``: Updates the maintenance comment for a remote edge worker
- ``airflow edge remote-edge-worker-exit-maintenance``: Request a remote edge worker to exit maintenance mode
- ``airflow edge shutdown-remote-edge-worker``: Shuts down a remote edge worker gracefully
- ``airflow edge shutdown-all-workers``: Request graceful shutdown of all registered edge workers
- ``airflow edge remove-remote-edge-worker``: Remove a worker instance from the cluster
- ``airflow edge add-worker-queues``: Add queues to an edge worker
- ``airflow edge remove-worker-queues``: Remove queues from an edge worker
43 changes: 43 additions & 0 deletions providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,37 @@ def remote_worker_request_shutdown(args) -> None:
logger.info("Requested shutdown of Edge Worker host %s by %s.", args.edge_hostname, getuser())


@cli_utils.action_cli(check_db=False)
@providers_configuration_loaded
def shutdown_all_workers(args) -> None:
"""Request graceful shutdown of all edge workers."""
_check_valid_db_connection()
if not (
args.yes
or input("This will shutdown all active edge workers, this cannot be undone! Proceed? (y/n)").upper()
== "Y"
):
raise SystemExit("Cancelled")

from airflow.providers.edge3.models.edge_worker import get_registered_edge_hosts, request_shutdown

all_hosts = list(get_registered_edge_hosts())
if not all_hosts:
logger.info("No edge workers found to shutdown.")
return

shutdown_count = 0
for host in all_hosts:
try:
request_shutdown(host.worker_name)
logger.info("Requested shutdown of Edge Worker host %s", host.worker_name)
shutdown_count += 1
except Exception as e:
logger.error("Failed to shutdown Edge Worker host %s: %s", host.worker_name, e)

logger.info("Requested shutdown of %d edge workers by %s.", shutdown_count, getuser())


@cli_utils.action_cli(check_db=False)
@providers_configuration_loaded
def add_worker_queues(args) -> None:
Expand Down Expand Up @@ -468,6 +499,12 @@ def remove_worker_queues(args) -> None:
ARG_STDERR = Arg(("--stderr",), help="Redirect stderr to this file if run in daemon mode")
ARG_STDOUT = Arg(("--stdout",), help="Redirect stdout to this file if run in daemon mode")
ARG_LOG_FILE = Arg(("-l", "--log-file"), help="Location of the log file if run in daemon mode")
ARG_YES = Arg(
("-y", "--yes"),
help="Skip confirmation prompt and proceed with shutdown",
action="store_true",
default=False,
)

EDGE_COMMANDS: list[ActionCommand] = [
ActionCommand(
Expand Down Expand Up @@ -581,4 +618,10 @@ def remove_worker_queues(args) -> None:
ARG_QUEUES_MANAGE,
),
),
ActionCommand(
name="shutdown-all-workers",
help=shutdown_all_workers.__doc__,
func=shutdown_all_workers,
args=(ARG_YES,),
),
]