Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions providers/edge3/docs/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ subcommand

airflow edge worker

You can also start this worker in the background by running
it as a daemonized process. Additionally, you can redirect stdout
and stderr to their respective files.

Make sure to set the umask via the ``worker_umask`` option in the ``[edge]``
section to control the permissions of files newly created by workers.

.. code-block:: bash

airflow edge worker -D --stdout edge-worker.o.log --stderr edge-worker.e.log


Your worker should start picking up tasks as soon as they get fired in
its direction. To stop a worker running on a machine you can use:

Expand Down
12 changes: 12 additions & 0 deletions providers/edge3/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,15 @@ config:
type: integer
example: ~
default: "524288"
worker_umask:
description: |
The default umask to use for edge worker when run in daemon mode

This controls the file-creation mode mask which determines the initial value of file permission bits
for newly created files.

This value is treated as an octal-integer.
version_added: ~
type: string
default: ~
example: ~
47 changes: 41 additions & 6 deletions providers/edge3/src/airflow/providers/edge3/cli/edge_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

from airflow import __version__ as airflow_version, settings
from airflow.cli.cli_config import ARG_PID, ARG_VERBOSE, ActionCommand, Arg
from airflow.cli.commands.daemon_utils import run_command_with_daemon_option
from airflow.cli.simple_table import AirflowConsole
from airflow.configuration import conf
from airflow.providers.edge3 import __version__ as edge_provider_version
Expand Down Expand Up @@ -166,7 +167,6 @@ class _EdgeWorkerCli:
"""Flag if job processing should be completed and no new jobs fetched for maintenance mode. """
maintenance_comments: str | None = None
"""Comments for maintenance mode."""

edge_instance: _EdgeWorkerCli | None = None
"""Singleton instance of the worker."""

Expand All @@ -178,6 +178,7 @@ def __init__(
concurrency: int,
job_poll_interval: int,
heartbeat_interval: int,
daemon: bool = False,
):
self.pid_file_path = pid_file_path
self.job_poll_interval = job_poll_interval
Expand All @@ -186,6 +187,7 @@ def __init__(
self.queues = queues
self.concurrency = concurrency
self.free_concurrency = concurrency
self.daemon = daemon

_EdgeWorkerCli.edge_instance = self

Expand Down Expand Up @@ -342,7 +344,8 @@ def start(self):
if e.response.status_code == HTTPStatus.NOT_FOUND:
raise SystemExit("Error: API endpoint is not ready, please set [edge] api_enabled=True.")
raise SystemExit(str(e))
_write_pid_to_pidfile(self.pid_file_path)
if not self.daemon:
_write_pid_to_pidfile(self.pid_file_path)
signal.signal(signal.SIGINT, _EdgeWorkerCli.signal_handler)
signal.signal(SIG_STATUS, _EdgeWorkerCli.signal_handler)
signal.signal(signal.SIGTERM, self.shutdown_handler)
Expand All @@ -368,7 +371,8 @@ def start(self):
except EdgeWorkerVersionException:
logger.info("Version mismatch of Edge worker and Core. Quitting worker anyway.")
finally:
remove_existing_pidfile(self.pid_file_path)
if not self.daemon:
remove_existing_pidfile(self.pid_file_path)

def loop(self):
"""Run a loop of scheduling and monitoring tasks."""
Expand Down Expand Up @@ -492,10 +496,8 @@ def interruptible_sleep(self):
return


@cli_utils.action_cli(check_db=False)
@providers_configuration_loaded
def worker(args):
"""Start Airflow Edge Worker."""
def _launch_worker(args):
print(settings.HEADER)
print(EDGE_WORKER_HEADER)

Expand All @@ -506,10 +508,27 @@ def worker(args):
concurrency=args.concurrency,
job_poll_interval=conf.getint("edge", "job_poll_interval"),
heartbeat_interval=conf.getint("edge", "heartbeat_interval"),
daemon=args.daemon,
)
edge_worker.start()


@cli_utils.action_cli(check_db=False)
@providers_configuration_loaded
def worker(args):
    """
    Start Airflow Edge Worker.

    The worker is started through ``run_command_with_daemon_option`` with
    ``_launch_worker`` as the callback.

    The umask passed along is resolved in this order: the ``--umask`` CLI
    argument, then the ``[edge] worker_umask`` configuration option, and
    finally ``settings.DAEMON_UMASK`` as fallback.

    :param args: Parsed CLI arguments of the ``worker`` command.
    """
    # An explicit CLI value wins; the configuration is only consulted without one.
    effective_umask = args.umask
    if not effective_umask:
        effective_umask = conf.get("edge", "worker_umask", fallback=settings.DAEMON_UMASK)

    def _start_worker():
        # Bind the parsed arguments so the callback can be invoked without parameters.
        return _launch_worker(args)

    run_command_with_daemon_option(
        args=args,
        process_name=EDGE_WORKER_PROCESS_NAME,
        callback=_start_worker,
        should_setup_logging=True,
        pid_file=_pid_file_path(args.pid),
        umask=effective_umask,
    )


@cli_utils.action_cli(check_db=False)
@providers_configuration_loaded
def status(args):
Expand Down Expand Up @@ -787,6 +806,17 @@ def remote_worker_request_shutdown(args) -> None:
help="State of the edge worker",
)

# CLI arguments for running the edge worker as a daemon.
ARG_DAEMON = Arg(
    ("-D", "--daemon"),
    help="Daemonize instead of running in the foreground",
    action="store_true",
)
ARG_UMASK = Arg(
    ("-u", "--umask"),
    help="Set the umask of edge worker in daemon mode",
)
# Output redirection targets, only meaningful in daemon mode (see help texts).
ARG_STDERR = Arg(
    ("--stderr",),
    help="Redirect stderr to this file if run in daemon mode",
)
ARG_STDOUT = Arg(
    ("--stdout",),
    help="Redirect stdout to this file if run in daemon mode",
)
ARG_LOG_FILE = Arg(
    ("-l", "--log-file"),
    help="Location of the log file if run in daemon mode",
)

EDGE_COMMANDS: list[ActionCommand] = [
ActionCommand(
name=worker.__name__,
Expand All @@ -798,6 +828,11 @@ def remote_worker_request_shutdown(args) -> None:
ARG_EDGE_HOSTNAME,
ARG_PID,
ARG_VERBOSE,
ARG_DAEMON,
ARG_STDOUT,
ARG_STDERR,
ARG_LOG_FILE,
ARG_UMASK,
),
),
ActionCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ def get_provider_info():
"example": None,
"default": "524288",
},
"worker_umask": {
"description": "The default umask to use for edge worker when run in daemon mode\n\nThis controls the file-creation mode mask which determines the initial value of file permission bits\nfor newly created files.\n\nThis value is treated as an octal-integer.\n",
"version_added": None,
"type": "string",
"default": None,
"example": None,
},
},
}
},
Expand Down