EXECUTE_TASKS_NEW_PYTHON_INTERPRETER setting does not play nicely with elasticsearch #47420

@dstandish

Description

Elasticsearch (when configured to output json to stdout) naturally requires that the task logs actually be emitted to stdout as json.

Currently, when running with EXECUTE_TASKS_NEW_PYTHON_INTERPRETER set to true, we use check_output and do nothing with the output, so the task's log lines never reach the worker's stdout.

See celery_executor_utils.py method _execute_in_subprocess.
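
For reference, the current code is roughly shaped like this (a simplified sketch, not a verbatim copy of celery_executor_utils.py; the exact arguments are an assumption). check_output buffers the child's stdout in memory and the return value is discarded:

import os
import subprocess


def _execute_in_subprocess_current(command_to_exec, celery_task_id=None):
    env = os.environ.copy()
    if celery_task_id:
        env["external_executor_id"] = celery_task_id
    # The child's stdout is captured by check_output and then ignored, so
    # nothing the task prints ever reaches the json/Elasticsearch pipeline.
    subprocess.check_output(command_to_exec, close_fds=True, env=env)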

I first came up with a POC hack to "fix" this specifically for this use case:

def _run_and_stream(cmd, env):
    # Attach the child process directly to the real stdout/stderr so its json
    # log lines reach the worker's output stream unmodified.
    process = subprocess.Popen(
        cmd,
        stdout=sys.__stdout__,
        stderr=sys.__stderr__,
        text=True,
        env=env,
        close_fds=True,
    )

    # Poll until the child exits.
    while process.poll() is None:
        time.sleep(0.5)

    ret = process.returncode
    if ret != 0:
        # Raise the same exception check_output would, so the caller's
        # except clause actually fires on a nonzero exit code.
        raise subprocess.CalledProcessError(ret, cmd)
    return ret


def _execute_in_subprocess(command_to_exec: CommandType, celery_task_id: str | None = None) -> None:
    env = os.environ.copy()
    if celery_task_id:
        env["external_executor_id"] = celery_task_id
    try:
        _run_and_stream(command_to_exec, env=env)
    except subprocess.CalledProcessError as e:
        log.exception("[%s] execute_command encountered a CalledProcessError", celery_task_id)
        # Note: e.output is None here, since output is streamed rather than captured.
        log.error(e.output)
        msg = f"Celery command failed on host: {get_hostname()} with celery_task_id {celery_task_id}"
        raise AirflowException(msg)

I later found out about subprocess.run, which seems clearly better and would not require the _run_and_stream helper at all, though I have not tested this:

def _execute_in_subprocess(command_to_exec: CommandType, celery_task_id: str | None = None) -> None:
    env = os.environ.copy()
    if celery_task_id:
        env["external_executor_id"] = celery_task_id
    try:
        # check=True is needed so a nonzero exit code actually raises
        # CalledProcessError and reaches the except clause below.
        subprocess.run(command_to_exec, stdout=sys.__stdout__, stderr=sys.__stderr__, check=True, close_fds=True, env=env)
    except subprocess.CalledProcessError as e:
        log.exception("[%s] execute_command encountered a CalledProcessError", celery_task_id)
        # e.output is None here too, since nothing is captured.
        log.error(e.output)
        msg = f"Celery command failed on host: {get_hostname()} with celery_task_id {celery_task_id}"
        raise AirflowException(msg)

But testing would need to be done to ensure it doesn't have unintended consequences or break other configurations.

In these snippets I use sys.__stdout__ instead of sys.stdout because the latter is replaced by a logging proxy in the Celery worker context. That proxy adds a prefix (ForkPoolWorker-1 or something like it), so the output is no longer ndjson, and I suspect that would prevent the line from being parsed properly.
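
To illustrate the difference (a toy example; the exact prefix format the Celery logging proxy produces is an assumption here):

import json
import sys

record = json.dumps({"levelname": "INFO", "message": "task log line"})

# Written to the real stream, the line stays valid ndjson:
print(record, file=sys.__stdout__)

# Written through sys.stdout inside a Celery worker, the logging proxy may
# prepend something like "[... INFO/ForkPoolWorker-1]", so the line is no
# longer parseable as a json document:
print(record)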

The important thing would be to make sure this is OK for log handlers other than ES-with-json-stdout -- it might not be, and a different approach to handling the forwarding of stdout might be required to support all use cases cleanly; one possible shape for that is sketched below.
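
For example, a hypothetical, untested sketch (_run_and_tee is a made-up name) would be to tee the child's output: write each line straight through to sys.__stdout__ so ndjson stays intact, while also keeping a copy so e.output still has something useful for other handlers and for the error message:

import subprocess
import sys


def _run_and_tee(cmd, env):
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        env=env,
        close_fds=True,
    )
    captured = []
    for line in process.stdout:
        # Pass the line through unmodified so json log lines stay parseable.
        sys.__stdout__.write(line)
        captured.append(line)
    ret = process.wait()
    if ret != 0:
        # Attach the captured output so the caller's e.output is populated.
        raise subprocess.CalledProcessError(ret, cmd, output="".join(captured))
    return "".join(captured)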

Anyway, this is a path to pursue if making this combination work is valuable enough.

Last note: there's already some similar code in the standard task runner -- look at CAN_FORK; that path is used when the OS cannot fork.

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.
